精华内容
下载资源
问答
  • ELK+Kafka+Beats实现海量日志收集平台(一) 一、应用场景  利用ELK+Kafka+Beats来实现一个统一日志平台,它是一款针对大规模分布式系统日志的统一采集、 存储、分析的APM 工具。在分布式系统中,有大量的服务...

                ELK+Kafka+Beats实现海量日志收集平台(一)

    目录

    一、应用场景

    二、实现原理


    一、应用场景

      利用ELK+Kafka+Beats来实现一个统一日志平台,它是一款针对大规模分布式系统日志的统一采集、存储、分析的APM 工具。在分布式系统中,有大量的服务部署在不通的服务器上,客服端的一个请求查询,就可能会调用后端多个服务,每个服务之间可能会相互调用或一个服务又会调用其它服务,最终才将请求的结果返回,汇总展现到前端页面上。假若这其中的某个环节发生异常,开发运维人员很难准确定位这个问题到底是由哪个服务调用造成的, 统一日志平台的作用就是追踪每个请求的完整调用链路,收集调用链路上每个服务的性能、日志数据,方便开发运维人员能够快速发现问题,定位问题。

      统一日志平台通过采集模块、传输模块、存储模块、分析模块实现日志数据的统一采集、存储和分析,结构图如下:

              

    二、实现原理

      “ ELK”是三个开源项目的缩写:Elasticsearch,Logstash和Kibana。也称ELK Stack,能够可靠,安全地从任何来源以任何格式获取数据,然后进行实时搜索,分析和可视化。Elasticsearch是搜索和分析引擎,开源的,分布式,RESTful,基于JSON的搜索引擎。它易于使用,可扩展且灵活。Logstash是服务器端的数据处理管道,它同时从多个源中提取数据,进行转换,然后将其发送到类似Elasticsearch的“存储”中。Kibana允许用户在Elasticsearch中使用图表将数据可视化。

            

      Beats 是一个免费且开放的平台,集合了多种单一用途数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。Beats可以将数据直接发送到Elasticsearch或通过 Logstash 发送,然后在Logstash中可以进一步处理和过滤数据,然后再在Kibana中进行可视化 。

    Beats架构图如下:

                  

      要实现海量日志数据收集分析,首先要解决的问题就是如何处理海量的数据信息,本案例中利用Kafka结合Beats、Logstash来实现分布式消息队列平台,其中采用Beats来采集日志数据,也就相当于是Kafka消息队列中的Producer来生产消息,然后将消息发送到Kafka(相当于消息队列的Broker),然后将日志数据发送到Logstash(扮演消费者-Consumer)进行分析过滤等处理。再把从Logstash中处理之后的数据存储到Elasticsearch中,最终通过Kibana来可视化日志数据。

    该过程架构图如下:

        

             其中Beats主要有以下几种:

                   Filebeat      : 用于收集日志文件

                   Winlogbeat : 用于收集Windows事件日志

                   Metricbeat  : 用于指标

                   Packetbeat : 用于收集网络流量数据

      由于我们要采集日志来进行分析管理,所以我们使用Beats中的filebeat来进行日志采集

      通过上面的架构思路,大致清楚了日志收集分析显示到底要干什么事儿?接下来再通过下图进一步将

    该流程具体梳理下

           

      本案例通过编写一个简单的SpringBoot工程来生产日志数据,也就是图中的Log4j2 Appender来作为filebeats的数据源(filebeats要从哪儿获取日志文件),使用Log4j2来进行日志记录而不是Spring自带的Slf4j记录是因为Log4j2的性能要优于Slf4j。图中把生成的日志分为了all.log、error.log 两类日志,all.log用于记录应用服务产生的所有日志记录,error.log主要用于记录warn、error两类的错误日志。error.log日志只有当应用服务报错的时候才进行记录,这样以便日后进行分析告警。

      ELK官方参考文档

      ElasticSearch官网文档 https://www.elastic.co/cn/elasticsearch/

      Logstash官网文档 https://www.elastic.co/cn/logstash

      Kibana 官网文档  https://www.elastic.co/cn/kibana

      Beats 官网文档 https://www.elastic.co/cn/beats/

           下一章: ELK+Kafka+Beats实现海量日志收集平台(二)

    展开全文
  • ELK + KAFKA

    2019-09-23 08:03:44
    1,ELK 日志收集 E: Elasticsearch L: Logstash K: Kibana 为什么用Elasticsearch 来存储日志:因为Elastic...2,为什么需要elk+kafka实现分布式日志收集。 因为单纯的使用elk 日志收集,进行io读取,容易数据丢...

    1,ELK 日志收集

    E: Elasticsearch

    L: Logstash

    K: Kibana

    为什么用Elasticsearch 来存储日志:因为Elasticsearch方便扩展,本来就是处理PB级别的数据,可以存储足够多的日志数据。

    原理图大致如下:

    2,为什么需要elk+kafka实现分布式日志收集。

    因为单纯的使用elk 日志收集,进行io读取,容易数据丢失,logstash太多了,扩展不好。非实时性。

    3, Logstash输入来源有那些?

    本地文件、kafka、数据库、mongdb、redis

    4,环境搭建

        第一步:搭建Kafka 环境

           

     

    转载于:https://www.cnblogs.com/pickKnow/p/11453740.html

    展开全文
  • ELK+Kafka+Beats实现海量日志收集平台(二) 目录 三、环境搭建 四、部署demo工程项目 五、测试 三、环境搭建  通过上一小节应用场景和实现原理的介绍,接下来实现所需环境搭建及说明 架构图如下所示:   ...

                ELK+Kafka+Beats实现海量日志收集平台(二)

    目录

    三、环境搭建

    四、部署demo工程项目

    五、测试


    三、环境搭建

      通过上一小节应用场景和实现原理的介绍,接下来实现所需环境搭建及说明架构图如下所示:

      

      环境说明:

      192.168.232.6 : 部署了demo项目(用于产生数据日志)

                 filebeat-6.6.0 

      192.168.232.3 : Kafka (单体)(Zookeeper:192.168.232.3~5)

      192.168.232.4 : Kibana

      192.168.232.7 : Logstash (单体)

       ES集群:192.168.232.8~10

       1、filebeat安装配置参考https://www.cnblogs.com/jhtian/p/13731230.html

       2、Kafka安装配置参考https://www.cnblogs.com/jhtian/p/13708679.html

       3、Logstash安装配置参考https://www.cnblogs.com/jhtian/p/13744753.html

       4、ES集群搭建可参考https://www.cnblogs.com/jhtian/p/12703651.html

       5、Kibana安装可参考https://www.cnblogs.com/jhtian/p/13785029.html

    四、部署demo工程项目

      项目结构图如下,分别调用项目的 /index、/error两个方法分别打印正常、错误日志(warn及以上级别日志)到logs文件夹中,作为filebeat读取数据的来源。

         

    web访问类文件

    indexAction.java

     1 package com.tianjh.demo.web;
     2 
     3 import com.tianjh.demo.util.SetMDC;
     4 import lombok.extern.slf4j.Slf4j;
     5 import org.springframework.web.bind.annotation.RequestMapping;
     6 import org.springframework.web.bind.annotation.RestController;
     7 
     8 @Slf4j
     9 @RestController
    10 public class indexAction {
    11 
    12     @RequestMapping(value = "/index")
    13     public String index() {
    14         SetMDC.putMDC();
    15         log.info("这是一条模拟error日志打印");
    16         log.info("这是一条模拟warn日志打印");
    17         log.info("这是一条模拟info日志打印");
    18         return "hello word";
    19     }
    20 
    21     @RequestMapping(value = "/err")
    22     public String error() {
    23         SetMDC.putMDC();
    24         try {
    25             int a = 5/0;
    26         } catch (Exception e) {
    27             log.error("算术异常", e);
    28         }
    29         return "error";
    30     }
    31 }
    

    工具类Utils

    FastJsonConvertUtil.java

      1 package com.tianjh.demo.util;
      2 
      3 import java.util.ArrayList;
      4 import java.util.List;
      5 
      6 import com.alibaba.fastjson.JSON;
      7 import com.alibaba.fastjson.JSONObject;
      8 import com.alibaba.fastjson.serializer.SerializerFeature;
      9 
     10 import lombok.extern.slf4j.Slf4j;
     11 
     12 /**
     13  * $FastJsonConvertUtil
     14  * @author hezhuo.bai
     15  * @since 2019年1月15日 下午4:53:28
     16  */
     17 @Slf4j
     18 public class FastJsonConvertUtil {
     19 
     20     private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
     21             SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
     22 
     23     /**
     24      * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
     25      * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
     26      * @author hezhuo.bai
     27      * @since 2019年1月15日 下午4:53:49 
     28      * @param data JSON字符串
     29      * @param clzss 转换对象
     30      * @return T
     31      */
     32     public static <T> T convertJSONToObject(String data, Class<T> clzss) {
     33         try {
     34             T t = JSON.parseObject(data, clzss);
     35             return t;
     36         } catch (Exception e) {
     37             log.error("convertJSONToObject Exception", e);
     38             return null;
     39         }
     40     }
     41     
     42     /**
     43      * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
     44      * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
     45      * @author hezhuo.bai
     46      * @since 2019年1月15日 下午4:54:32
     47      * @param data JSONObject对象
     48      * @param clzss 转换对象
     49      * @return T
     50      */
     51     public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
     52         try {
     53             T t = JSONObject.toJavaObject(data, clzss);
     54             return t;
     55         } catch (Exception e) {
     56             log.error("convertJSONToObject Exception", e);
     57             return null;
     58         }
     59     }
     60 
     61     /**
     62      * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
     63      * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
     64      * @author hezhuo.bai
     65      * @since 2019年1月15日 下午4:54:50
     66      * @param data JSON字符串数组
     67      * @param clzss 转换对象
     68      * @return List<T>集合对象
     69      */
     70     public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
     71         try {
     72             List<T> t = JSON.parseArray(data, clzss);
     73             return t;
     74         } catch (Exception e) {
     75             log.error("convertJSONToArray Exception", e);
     76             return null;
     77         }
     78     }
     79 
     80     /**
     81      * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
     82      * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
     83      * @author hezhuo.bai
     84      * @since 2019年1月15日 下午4:55:11
     85      * @param data List<JSONObject>
     86      * @param clzss 转换对象
     87      * @return List<T>集合对象
     88      */
     89     public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
     90         try {
     91             List<T> t = new ArrayList<T>();
     92             for (JSONObject jsonObject : data) {
     93                 t.add(convertJSONToObject(jsonObject, clzss));
     94             }
     95             return t;
     96         } catch (Exception e) {
     97             log.error("convertJSONToArray Exception", e);
     98             return null;
     99         }
    100     }
    101 
    102     /**
    103      * <B>方法名称:</B>将对象转为JSON字符串<BR>
    104      * <B>概要说明:</B>将对象转为JSON字符串<BR>
    105      * @author hezhuo.bai
    106      * @since 2019年1月15日 下午4:55:41
    107      * @param obj 任意对象
    108      * @return JSON字符串
    109      */
    110     public static String convertObjectToJSON(Object obj) {
    111         try {
    112             String text = JSON.toJSONString(obj);
    113             return text;
    114         } catch (Exception e) {
    115             log.error("convertObjectToJSON Exception", e);
    116             return null;
    117         }
    118     }
    119     
    120     /**
    121      * <B>方法名称:</B>将对象转为JSONObject对象<BR>
    122      * <B>概要说明:</B>将对象转为JSONObject对象<BR>
    123      * @author hezhuo.bai
    124      * @since 2019年1月15日 下午4:55:55
    125      * @param obj 任意对象
    126      * @return JSONObject对象
    127      */
    128     public static JSONObject convertObjectToJSONObject(Object obj){
    129         try {
    130             JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
    131             return jsonObject;
    132         } catch (Exception e) {
    133             log.error("convertObjectToJSONObject Exception", e);
    134             return null;
    135         }        
    136     }
    137 
    138     public static String convertObjectToJSONWithNullValue(Object obj) {
    139         try {
    140             String text = JSON.toJSONString(obj, featuresWithNullValue);
    141             return text;
    142         } catch (Exception e) {
    143             log.error("convertObjectToJSONWithNullValue Exception", e);
    144             return null;
    145         }
    146     }
    147 }
    

    NetUtil.java

      1 package com.tianjh.demo.util;
      2 
      3 import java.lang.management.ManagementFactory;
      4 import java.lang.management.RuntimeMXBean;
      5 import java.net.InetAddress;
      6 import java.net.NetworkInterface;
      7 import java.net.SocketAddress;
      8 import java.net.UnknownHostException;
      9 import java.nio.channels.SocketChannel;
     10 import java.util.Enumeration;
     11 import java.util.regex.Matcher;
     12 import java.util.regex.Pattern;
     13 
     14 /**
     15  * $NetUtil
     16  * 获取本机地址 端口号的工具类
     17  *  * @author hezhuo.bai
     18  *  * @since 2019年1月15日 下午4:53:28
     19  */
     20 public class NetUtil {   
     21     
     22     public static String normalizeAddress(String address){
     23         String[] blocks = address.split("[:]");
     24         if(blocks.length > 2){
     25             throw new IllegalArgumentException(address + " is invalid");
     26         }
     27         String host = blocks[0];
     28         int port = 80;
     29         if(blocks.length > 1){
     30             port = Integer.valueOf(blocks[1]);
     31         } else {
     32             address += ":"+port; //use default 80
     33         } 
     34         String serverAddr = String.format("%s:%d", host, port);
     35         return serverAddr;
     36     }
     37     
     38     public static String getLocalAddress(String address){
     39         String[] blocks = address.split("[:]");
     40         if(blocks.length != 2){
     41             throw new IllegalArgumentException(address + " is invalid address");
     42         } 
     43         String host = blocks[0];
     44         int port = Integer.valueOf(blocks[1]);
     45         
     46         if("0.0.0.0".equals(host)){
     47             return String.format("%s:%d",NetUtil.getLocalIp(), port);
     48         }
     49         return address;
     50     }
     51     
     52     private static int matchedIndex(String ip, String[] prefix){
     53         for(int i=0; i<prefix.length; i++){
     54             String p = prefix[i];
     55             if("*".equals(p)){ //*, assumed to be IP
     56                 if(ip.startsWith("127.") ||
     57                    ip.startsWith("10.") ||    
     58                    ip.startsWith("172.") ||
     59                    ip.startsWith("192.")){
     60                     continue;
     61                 }
     62                 return i;
     63             } else {
     64                 if(ip.startsWith(p)){
     65                     return i;
     66                 }
     67             } 
     68         }
     69         
     70         return -1;
     71     }
     72     
     73     public static String getLocalIp(String ipPreference) {
     74         if(ipPreference == null){
     75             ipPreference = "*>10>172>192>127";
     76         }
     77         String[] prefix = ipPreference.split("[> ]+");
     78         try {
     79             Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
     80             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
     81             String matchedIp = null;
     82             int matchedIdx = -1;
     83             while (interfaces.hasMoreElements()) {
     84                 NetworkInterface ni = interfaces.nextElement();
     85                 Enumeration<InetAddress> en = ni.getInetAddresses(); 
     86                 while (en.hasMoreElements()) {
     87                     InetAddress addr = en.nextElement();
     88                     String ip = addr.getHostAddress();  
     89                     Matcher matcher = pattern.matcher(ip);
     90                     if (matcher.matches()) {  
     91                         int idx = matchedIndex(ip, prefix);
     92                         if(idx == -1) continue;
     93                         if(matchedIdx == -1){
     94                             matchedIdx = idx;
     95                             matchedIp = ip;
     96                         } else {
     97                             if(matchedIdx>idx){
     98                                 matchedIdx = idx;
     99                                 matchedIp = ip;
    100                             }
    101                         }
    102                     } 
    103                 } 
    104             } 
    105             if(matchedIp != null) return matchedIp;
    106             return "127.0.0.1";
    107         } catch (Exception e) { 
    108             return "127.0.0.1";
    109         }
    110     }
    111     
    112     public static String getLocalIp() {
    113         return getLocalIp("*>10>172>192>127");
    114     }
    115     
    116     public static String remoteAddress(SocketChannel channel){
    117         SocketAddress addr = channel.socket().getRemoteSocketAddress();
    118         String res = String.format("%s", addr);
    119         return res;
    120     }
    121     
    122     public static String localAddress(SocketChannel channel){
    123         SocketAddress addr = channel.socket().getLocalSocketAddress();
    124         String res = String.format("%s", addr);
    125         return addr==null? res: res.substring(1);
    126     }
    127     
    128     public static String getPid(){
    129         RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    130         String name = runtime.getName();
    131         int index = name.indexOf("@");
    132         if (index != -1) {
    133             return name.substring(0, index);
    134         }
    135         return null;
    136     }
    137     
    138     public static String getLocalHostName() {
    139         try {
    140             return (InetAddress.getLocalHost()).getHostName();
    141         } catch (UnknownHostException uhe) {
    142             String host = uhe.getMessage();
    143             if (host != null) {
    144                 int colon = host.indexOf(':');
    145                 if (colon > 0) {
    146                     return host.substring(0, colon);
    147                 }
    148             }
    149             return "UnknownHost";
    150         }
    151     }
    152 }

    SetMDC.java

     1 package com.tianjh.demo.util;
     2 
     3 import org.jboss.logging.MDC;
     4 import org.springframework.context.EnvironmentAware;
     5 import org.springframework.core.env.Environment;
     6 import org.springframework.stereotype.Component;
     7 
     8 @Component
     9 public class SetMDC implements EnvironmentAware {
    10 
    11     private static Environment environment;
    12     
    13     @Override
    14     public void setEnvironment(Environment environment) {
    15         SetMDC.environment = environment;
    16     }
    17     
    18     public static void putMDC() {
    19         MDC.put("hostName", NetUtil.getLocalHostName());
    20         MDC.put("ip", NetUtil.getLocalIp());
    21         MDC.put("applicationName", environment.getProperty("spring.application.name"));
    22     }
    23 
    24 }
    

    及关键配置文件

    pom.xml

    1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     4     <modelVersion>4.0.0</modelVersion>
     5     <parent>
     6         <groupId>org.springframework.boot</groupId>
     7         <artifactId>spring-boot-starter-parent</artifactId>
     8         <version>2.1.5.RELEASE</version>
     9         <relativePath/> <!-- lookup parent from repository -->
    10     </parent>
    11     <groupId>com.tianjh</groupId>
    12     <artifactId>demo</artifactId>
    13     <version>0.0.1-SNAPSHOT</version>
    14     <name>demo</name>
    15     <description>Demo project for Spring Boot</description>
    16 
    17     <properties>
    18         <java.version>1.8</java.version>
    19     </properties>
    20 
    21 
    22     <dependencies>
    23         <dependency>
    24             <groupId>org.springframework.boot</groupId>
    25             <artifactId>spring-boot-starter-web</artifactId>
    26             <!--     排除spring-boot-starter-logging -->
    27             <exclusions>
    28                 <exclusion>
    29                     <groupId>org.springframework.boot</groupId>
    30                     <artifactId>spring-boot-starter-logging</artifactId>
    31                 </exclusion>
    32             </exclusions>
    33         </dependency>
    34 
    35         <dependency>
    36             <groupId>org.springframework.boot</groupId>
    37             <artifactId>spring-boot-starter-test</artifactId>
    38             <scope>test</scope>
    39         </dependency>
    40         <dependency>
    41             <groupId>org.projectlombok</groupId>
    42             <artifactId>lombok</artifactId>
    43         </dependency>
    44         <!-- log4j2 -->
    45         <dependency>
    46             <groupId>org.springframework.boot</groupId>
    47             <artifactId>spring-boot-starter-log4j2</artifactId>
    48         </dependency>
    49         <dependency>
    50             <groupId>com.lmax</groupId>
    51             <artifactId>disruptor</artifactId>
    52             <version>3.3.4</version>
    53         </dependency>
    54 
    55         <dependency>
    56             <groupId>com.alibaba</groupId>
    57             <artifactId>fastjson</artifactId>
    58             <version>1.2.58</version>
    59         </dependency>
    60 
    61     </dependencies>
    62 
    63     <build>
    64         <finalName>demo</finalName>
    65         <!-- 打包时包含properties、xml -->
    66         <resources>
    67             <resource>
    68                 <directory>src/main/java</directory>
    69                 <includes>
    70                     <include>**/*.properties</include>
    71                     <include>**/*.xml</include>
    72                 </includes>
    73                 <!-- 是否替换资源中的属性-->
    74                 <filtering>true</filtering>
    75             </resource>
    76             <resource>
    77                 <directory>src/main/resources</directory>
    78             </resource>
    79         </resources>
    80 
    81         <plugins>
    82             <plugin>
    83                 <groupId>org.springframework.boot</groupId>
    84                 <artifactId>spring-boot-maven-plugin</artifactId>
    85                 <configuration>
    86                     <mainClass>com.tianjh.demo.Application</mainClass>
    87                 </configuration>
    88             </plugin>
    89         </plugins>
    90     </build>
    91 
    92 </project>

    Spring配置文件

    1 server.servlet.context-path=/
    2 server.port=8001
    3 
    4 spring.application.name=demo
    5 spring.http.encoding.charset=UTF-8
    6 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    7 spring.jackson.time-zone=GMT+8
    8 spring.jackson.default-property-inclusion=NON_NULL

    日志配置文件

    1 <?xml version="1.0" encoding="UTF-8"?>
     2 <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
     3     <Properties>
     4         <!--        日志要输出的文件-->
     5         <Property name="LOG_HOME">logs</Property>
     6         <!--        项目名称-->
     7         <property name="FILE_NAME">demo</property>
     8         <!--        日志输出格式-->
     9         <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
    10             [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    11         </property>
    12     </Properties>
    13     <Appenders>
    14         <Console name="CONSOLE" target="SYSTEM_OUT">
    15             <PatternLayout pattern="${patternLayout}"/>
    16         </Console>
    17         <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/all-${FILE_NAME}.log"
    18                                  filePattern="${LOG_HOME}/all-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
    19             <PatternLayout pattern="${patternLayout}"/>
    20             <Policies>
    21                 <TimeBasedTriggeringPolicy interval="1"/>
    22                 <SizeBasedTriggeringPolicy size="500MB"/>
    23             </Policies>
    24             <DefaultRolloverStrategy max="20"/>
    25         </RollingRandomAccessFile>
    26         <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/err-${FILE_NAME}.log"
    27                                  filePattern="${LOG_HOME}/err-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
    28             <PatternLayout pattern="${patternLayout}"/>
    29             <Filters>
    30                 <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
    31             </Filters>
    32             <Policies>
    33                 <TimeBasedTriggeringPolicy interval="1"/>
    34                 <SizeBasedTriggeringPolicy size="500MB"/>
    35             </Policies>
    36             <DefaultRolloverStrategy max="20"/>
    37         </RollingRandomAccessFile>
    38     </Appenders>
    39     <Loggers>
    40         <!-- 业务相关 异步logger -->
    41         <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
    42             <AppenderRef ref="appAppender"/>
    43         </AsyncLogger>
    44         <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
    45             <AppenderRef ref="errorAppender"/>
    46         </AsyncLogger>
    47         <Root level="info">
    48             <Appender-Ref ref="CONSOLE"/>
    49             <Appender-Ref ref="appAppender"/>
    50             <AppenderRef ref="errorAppender"/>
    51         </Root>
    52     </Loggers>
    53 </Configuration>

     打包上传到192.168.232.6这台服务器进行运行

    运行之后调用该项目的index方法

     

    在项目指定的文件夹里生成了咋们所要的日志文件

    参考前面的链接安装好所有环境后,filebeat、kafka、Logstash、es都应该配置好了接下来就结合filebeat(生产者-Producer)、kafka(broker)、Logstash(消费者-Consumer)实现fielbeat从demo.jar项目输出的日志文件logs下读取all-demo.log 、err-demo.log两个日志文件,然后把相应日志数据发送到kafka中,再由Logstash到Kafka中获取数据进行消费。

    在这个过程中需要在kafka中新增两个topic 

    ./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic all-log-demo --partitions 1 --replication-factor 1 
    
    ./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic err-log-demo --partitions 1 --replication-factor 1 

    也就是采集到的all-demo.log日志数据放入topic:all-log-demo 中,而采集到的err-demo.log日志数据放入topic:err-log-demo 中。

    五、测试

    启动demo.jar、filebeat、kafka、logstash

    启动demo.jar之前先删除掉之前测试的日志文件all-demo.log 、err-demo.log

    启动demo.jar

    启动kafka

     

     启动Logstash

     

    随后通过浏览器访问:

    http://192.168.232.6:8001/err

    http://192.168.232.6:8001/index 两个地址来调用index、err方法打印日志文件。

    demo.jar 项目控制台输出了日志---调用index方法

    demo.jar 项目控制台输出了日志---调用err方法

    在调用上述两个方法之后,filebeat会将日志数据发送到kafka,通过使用如下的命令进行查看消费情况:

     ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group all-log-group 
    
     ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group err-log-group 

    在kafka上也有相应的日志消费记录情况,( all-log-group、err-log-group )是在Logstash中进行配置的。

     

    通过前面的几个流程之后,现在日志数据到达了kafka-broker之上,现在就需要用logstash来进行消费数据,logstash上也实时进行数据消费,如下图所示是全量日志过滤:

    all-log-demo.log 日志

     

    err-log-demo.log 日志

    通过上述的一系列操作,简单的实现了日志数据的生成、采集、过滤。这其中最为重要也最核心的地方就是kafka,利用kafka的高性能来缓存filebeat生成的海量数据,从而让logstash慢慢的进行消费,当然上面的例子并不能体现出kafka处理海量数据的能力。

    下一章: ELK+Kafka+Beats实现海量日志收集平台(三)

     

    展开全文
  • ELK+Kafka+Beats实现海量日志收集平台(三) 目录 六、将日志数据存储到Elasticsearch 七、Kibana展示  六、将日志数据存储到Elasticsearch  通过前面的步骤实现了日志数据的生产、收集和过滤。接下来就将...

              ELK+Kafka+Beats实现海量日志收集平台(三)

    目录

    六、将日志数据存储到Elasticsearch

    七、Kibana展示  


    六、将日志数据存储到Elasticsearch

      通过前面的步骤实现了日志数据的生产、收集和过滤。接下来就将收集之后的日志数据信息持久化到ElasticSearch上,然后在结合Kibana最终显示。

      启动ElasticSearch集群,之前提供的ES集群安装链接是7.6.2版本,所以要自己替换版本到6.6.0进行安装,6.6.0的安装配置和7.6.2版本的有些区别,配置信息如下:  

           

      搭建好之后的信息如下:

      

      Elasticsearch Head 插件显示如下:

      

    七、Kibana展示  

      启动Kibana,进如Kibana安装过后的目录下使用如下命令启动

         nohup ../bin/kibana & 

      紧接着来修改一下logstash的配置文件,在之前的基础上我们将收集到的日志数据输出控制台上了,现在我们要将日志数据存储到ES集群上。打开logstash配置文件添加如下:

      

      在配置文件中,配置了输出到Elasticsearch上。重启logstash,可以看到启动日志输出如下表明logstash连接ES成功。

      

      接着通过访问 /err 、/index两个方法来产生日志信息,随后利用Elasticsearch head插件查看ES集群信息如下:

      

      发现建立了两个索引,err-log-demo-* 、all-log-demo-* 这两个索引分别就是之前设置的错误日志索引和全量日志索引。

      再进入到Kibana界面查看,安装下列步骤进入界面

      

      随后,看到如下界面:

      

      创建index,比如all-log-demo* 如果有对应的索引就会进行匹配

      

      接着下一步:

      

      同理创建err-log-demo*,随后按照如下操作进行查看:

      

      查看的是err-log-demo*,这里其实就记录了我们访问demo.jar 的 /err 方法产生的那条错误日志。

      

      至此,咋们的ELK环境搭建及日志收集彻底实现了。

    展开全文
  • elk + kafka + filebeat搭建日志分析系统-附件资源
  • SpringBoot 整合 ELK+Kafka 所需镜像及 docker-compose.yml 集合打包,用于提供某篇文章中所提及的 Ubuntu 镜像资源
  • elk+kafka+zookeeper整体架构,安装部署 kafka架构组件概念详解 Zookeeper 在 Kafka 中的作用
  • ELK + Kafka + Filebeat学习

    千次阅读 2018-05-16 11:06:35
    ELK + Kafka + Filebeat学习目录:1、ELK + Kafka + Filebeat架构2、为什么要使用ELK3、验证流程:Filebeat-&gt;Logstash-&gt;Kafka-&gt;Logstash-&gt;Elasticsearch-&gt;Kibana1 ELK + Kafka ...
  • ELK+kafka+filebeat搭建生产ELFK集群ELK 架构介绍 集群服务版本 服务 版本 java 1.8.0_221 elasticsearch 7.10.1 filebeat 7.10.1 kibana 7.10.1 logstash 7.10.1 cerebro 0.9.2-1 kafka 2.12-...
  • ELK+kafka

    千次阅读 2019-06-04 18:28:36
    上面架构图分为五层,详细解释如下: 第一层,数据采集层 ... logstash服务把接收到的日志通过格式处理,转存到本地的kafka broker+zookeeper集群中。 第三层,数据转发层 这个单独的logstash节点会...
  • elk + Kafka 日志分析

    2020-12-31 16:31:04
    ELK + Kafka 日志分析 Kafka 日志接收 + 异步处理 Logstash 获取日志 + 发送日志 Elasticsearch 存储日志 + 分析日志 Kibana 查询展示 创建容器的命令 docker run -d --name kgc_kafka -p 9092:9092 --network kgc_...
  • 日志收集系统ELK+kafka部署 文章目录日志收集系统ELK+kafka部署一、系统准备1.1 部署简介1.2 升级内核1.3 内核优化1.4 安装NTP服务1.5 关闭swap分区二、部署ISCSI2.1 配置数据网络-iscsi服务端2.2 配置数据网络-...
  • ELK+kafka+filebeat7.8.0监控nginx和tomcat 环境介绍: 安装软件 主机名 IP地址 系统版本 Elasticsearch/kafka es-kafka 10.8.161.2 centos7.4--4G head/Kibana/kafka head-kibana-...
  • 折腾ELK+kafka+zk

    2019-05-02 15:14:00
    回顾前大半年: 1.kubespray搭建K8S集群 2.openVPN 搭建 ...致使我们成都的项目一直稳定运行,上周被安排去上海协助公司将项目迁移到公司云平台,一周接触到openshift,ELK+kafka+zk+prometheus上海的...
  • Spring Boot+ELK+Kafka

    2020-04-15 00:13:37
    (1)ELK+Kafka (2) 第三方 Elatsicsearch 1.分布式搜索引擎 2.特性 (1)分布式实时全文搜索引擎 (2)分布式实时分析搜索引擎 (3)分布式实时大数据处理引擎 Kibana 1. Elastic Stack 成员 2.数据分析和可视...
  • ELK+Kafka分布式日志

    2019-11-05 22:45:15
    ELK+Kafka分布式日志 将所有的日志发送到一个平台 Elasticsearch 搜索引擎 Logstash 日志框架 Kibana 图形化界面 Kafka 消息中间件 多个应用系统将日志发送到一个平台Kafka上,通过Logstash(输入输出)...
  • ELK+Kafka搭建#####部署图------------#####服务节点对应图elk0 192.168.182.50 filebeats 日志收集客户端elk1 192.168.182.51 kafka+zkelk2 192.168.182.52 kafka+zk+es(data_node)elk3 192.168.182.53 kafka+zk+es...
  • ELK+Kafka实现日志收集系统 背景: 主要针对,分布式项目!对多个模块的日志进行管理… 在传统项目中,如果在生产环境中,有多台不同的服务器集群, 如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上...
  • elk+kafka搭建教程

    2020-09-18 17:06:07
    elk+kafka搭建教程一、搭建kafka集群二、启动elasticsearch1、elasticsearch集群2、elasticsearch单机配置文件如下3、启动elasticsearch三、启动filebeat1、修改配置文件filebeat.yml2、filebeat重载功能实现(不用...
  • ELK7.5+KAFKA超详细安装文档和资源包
  • 0x00 概述 关于如何搭建ELK部分,请参考... 该篇用户为非root,使用用户为“elk”。 基于以前ELK架构的基础,结合Kafka队列,实现了ELK+Kafka集群,整体架构如下: # 1. 两台es组成es集群;( 以下对elasti...
  • ELK+kafka+filebeat企业内部日志分析系统 elk:日志搜集平台 ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BLCPmQgN-...
  • 内部日志分析系统ELK+kafka+filebeat搭建 如需看各个名词的含义可以点击下方链接: https://blog.csdn.net/weixin_46081669/article/details/113446902 本篇文章为学习过程中的汇总,可能写的会很乱,这里先行道歉!...
  • ELK+kafka集群实战

    2020-11-03 20:39:35
    ELK+Kafka集群 前言 前言 业务层可以直接写入到kafka队列中,不用担心elasticsearch的写入效率问题。 图示 Kafka Apache kafka是消息中间件的一种,是一种分布式的,基于发布/订阅的消息系统。能实现一个为处理实时...
  • ELK(13):ELK+kafka+filebeat(海量日志) https://blog.51cto.com/cloumn/blog/240 https://www.cnblogs.com/whych/p/9958188.html 使用filebeat收集日志直接写入kafka,然后再由logstash从kafka读取写到elastic...
  • ELK+KAFKA+XPACK+filebeat安装使用教程(史上最全)(上)一、架构图:二、主机信息三、基础环境配置四、安装并配置Elasticsearch五、安装Elasticsearch-head插件六、安装Kibana七、安装Logstash八、安装zookeeper和...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,476
精华内容 590
关键字:

elk+kafka