精华内容
下载资源
问答
  • 一、ELK应用背景: 二、ELK平台概述: 三、ELK核心组件: 四、案例:部署ELK分布式日志分析平台: 五、案例:应用ELK日志分析平台进行分析nginx日志:
  • 由于微服务是隔离的,因此它们不共享数据库和日志文件,因此实时搜索,分析和查看日志数据变得充满挑战。 这就是ELK堆栈的救援之处。 2. ELK 它是三个开源产品的集合: 弹性搜索是基于JSON的NoSQL数据库 Logst....

    elk 聚合日志

    1.简介

    随着微服务的使用,创建稳定的分布式应用程序和摆脱许多遗留问题变得很容易。

    但是微服务的使用也带来了一些挑战, 分布式日志管理就是其中之一。

    由于微服务是隔离的,因此它们不共享数据库和日志文件,因此实时搜索,分析和查看日志数据变得充满挑战。

    这就是ELK堆栈的救援之处。

    2. ELK

    它是三个开源产品的集合:

    • 弹性搜索是基于JSON的NoSQL数据库
    • Logstash一个日志管道工具,可从各种来源获取输入,执行不同的转换并将数据导出到各种目标(此处为弹性搜索)
    • Kibana是可视化层,可在弹性搜索的基础上工作

    请参考下面给出的架构:



    ELK堆栈

    日志存储从微服务中获取日志。

    提取的日志将转换为JSON并提供给弹性搜索。

    开发人员可以使用Kibana查看弹性搜索中存在的日志。

    3.安装ELK

    ELK是基于Java的。

    在安装ELK之前,必须确保已JAVA_HOMEPATH ,并且已使用JDK 1.8完成安装。

    3.1 Elasticsearch

    • 可以从下载页面下载最新版本的Elasticsearch,并将其解压缩到任何文件夹中
    • 可以使用bin\elasticsearch.bat从命令提示符处执行它
    • 默认情况下,它将从http:// localhost:9200开始

    3.2基巴纳

    • 可以从下载页面下载最新版本的Kibana,并且可以将其提取到任何文件夹中
    • 可以使用bin\kibana.bat在命令提示符下执行它
    • 成功启动后,Kibana将在默认端口5601上启动,并且Kibana UI将位于http:// localhost:5601

    3.3 Logstash

    • 可以从下载页面下载最新版本的Logstash,并将其解压缩到任何文件夹中
    • 根据配置说明创建一个文件cst_logstash.conf
    • 可以使用bin/logstash -f cst_logstash.conf在命令提示符下执行以启动logstash

    4.创建一个示例微服务组件

    创建微服务是必需的,以便logstash可以指向API日志。

    下面的清单显示了示例微服务的代码。

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.xyz.app</groupId>
    	<artifactId>ArtDemo1001_Rest_Controller_Full_Deployment_Logging</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<!-- Add Spring repositories -->
    	<!-- (you don't need this if you are using a .RELEASE version) -->
    	<repositories>
    		<repository>
    			<id>spring-snapshots</id>
    			<url>http://repo.spring.io/snapshot</url>
    			<snapshots>
    				<enabled>true</enabled>
    			</snapshots>
    		</repository>
    		<repository>
    			<id>spring-milestones</id>
    			<url>http://repo.spring.io/milestone</url>
    		</repository>
    	</repositories>
    	<pluginRepositories>
    		<pluginRepository>
    			<id>spring-snapshots</id>
    			<url>http://repo.spring.io/snapshot</url>
    		</pluginRepository>
    		<pluginRepository>
    			<id>spring-milestones</id>
    			<url>http://repo.spring.io/milestone</url>
    		</pluginRepository>
    	</pluginRepositories>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.2.RELEASE</version>
    	</parent>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Dalston.SR3</spring-cloud.version>
    	</properties>
    	<!-- Add typical dependencies for a web application -->
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    	</dependencies>
    
    	<!-- Package as an executable jar -->
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    </project>

    上面的pom.xml代码已配置了基于Spring Boot的项目所需的依赖项。

    EmployeeDAO.java

    package com.xyz.app.dao;
    
    import java.util.Collection;
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    import org.springframework.stereotype.Repository;
    
    import com.xyz.app.model.Employee;
    
    @Repository
    public class EmployeeDAO {
    	/**
    	 * Map is used to Replace the Database 
    	 * */
    	static public Map<Integer,Employee> mapOfEmloyees = 
                     new LinkedHashMap<Integer,Employee>();
    	static int count=10004;
    	static
    	{
    		mapOfEmloyees.put(10001, new Employee("Jack",10001,12345.6,1001));
    		mapOfEmloyees.put(10002, new Employee("Justin",10002,12355.6,1002));
    		mapOfEmloyees.put(10003, new Employee("Eric",10003,12445.6,1003));
    	}
    	
    	/**
    	 * Returns all the Existing Employees
    	 * */
    	public Collection getAllEmployee(){
    		return mapOfEmloyees.values();			
    	}
    	
    
    	/**Get Employee details using EmployeeId .
    	 * Returns an Employee object response with Data if Employee is Found
    	 * Else returns a null
    	 * */
    	public Employee getEmployeeDetailsById(int id){
    		return mapOfEmloyees.get(id);
    	}
    	/**Create Employee details.
    	 * Returns auto-generated Id
    	 * */
    	public Integer addEmployee(Employee employee){
    		count++;
    		employee.setEmployeeId(count);
    		mapOfEmloyees.put(count, employee);
    		return count;
    	}
    	
    	/**Update the Employee details,
    	 * Receives the Employee Object and returns the updated Details  
    	 * */
    	public Employee updateEmployee (Employee employee){
    		mapOfEmloyees.put(employee.getEmployeeId(), employee);
    		return employee;
    	}
    	/**Delete the Employee details,
    	 * Receives the EmployeeID and returns the deleted employee's Details  
    	 * */
    	public Employee removeEmployee (int id){
    		Employee emp= mapOfEmloyees.remove(id);
    		return emp;
    	}
    	
    }

    上面的代码表示应用程序的DAO层。

    CRUD操作在包含Employee对象的Map集合上执行,以避免数据库依赖性并保持应用程序轻巧。

    EmployeeController.java

    package com.xyz.app.controller;
    
    import java.util.Collection;
    
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.xyz.app.dao.EmployeeDAO;
    import com.xyz.app.model.Employee;
    
    @RestController
    public class EmployeeController {
    	
    	@Autowired 
    	private EmployeeDAO employeeDAO;
    	
    	
    	public static Logger logger = Logger.getLogger(EmployeeController.class);
    	
    	/** Method is used to get all the employee details and return the same 
    	 */ 
    	@RequestMapping(value="emp/controller/getDetails",method=RequestMethod.GET,produces=MediaType.APPLICATION_JSON_VALUE)
    	public ResponseEntity<Collection> getEmployeeDetails(){
    		logger.info("From Producer method[getEmployeeDetails] start");
    			logger.debug("From Producer method[getEmployeeDetails] start");
    			Collection  listEmployee =employeeDAO.getAllEmployee();
    			logger.debug("From Producer method[getEmployeeDetails] start");
    		logger.info("From Producer method[getEmployeeDetails] end");
    		return new ResponseEntity<Collection>(listEmployee, HttpStatus.OK);
    	}
    	/** Method finds an employee using employeeId and returns the found Employee 
    		If no employee is not existing corresponding to the employeeId, 
    		then null is returned with HttpStatus.INTERNAL_SERVER_ERROR as status
    	 */ 
    	@RequestMapping(value="emp/controller/getDetailsById/{id}",method=RequestMethod.GET,produces=MediaType.APPLICATION_JSON_VALUE)
    	public ResponseEntity getEmployeeDetailByEmployeeId(@PathVariable("id") int myId){
    		logger.info("From Producer method[getEmployeeDetailByEmployeeId] start");
    		Employee employee = employeeDAO.getEmployeeDetailsById(myId);
    		if(employee!=null)
    		{
    			logger.info("From Producer method[getEmployeeDetailByEmployeeId] end");
    			return new ResponseEntity(employee,HttpStatus.OK);
    		}
    		else
    		{
    			logger.info("From Producer method[getEmployeeDetailByEmployeeId] end");
    			return new ResponseEntity(HttpStatus.NOT_FOUND);
    		}
    		
    	}
    	
    	/** Method creates an employee and returns the auto-generated employeeId */ 
    	@RequestMapping(value="/emp/controller/addEmp",
    			method=RequestMethod.POST,
    			consumes=MediaType.APPLICATION_JSON_VALUE,
    			produces=MediaType.TEXT_HTML_VALUE)
    	public ResponseEntity addEmployee(@RequestBody Employee employee){
    		logger.info("From Producer method[addEmployee] start");
    			logger.debug("From Producer method[addEmployee] start");
    			int empId= employeeDAO.addEmployee(employee);
    			logger.debug("From Producer method[addEmployee] start");
    		logger.info("From Producer method[addEmployee] end");
    		return new ResponseEntity("Employee added successfully with id:"+empId,HttpStatus.CREATED);
    	}
    
    	/** Method updates an employee and returns the updated Employee 
     		If Employee to be updated is not existing, then null is returned with 
     		HttpStatus.INTERNAL_SERVER_ERROR as status
    	 */ 
    	@RequestMapping(value="/emp/controller/updateEmp",
    			method=RequestMethod.PUT,
    			consumes=MediaType.APPLICATION_JSON_VALUE,
    			produces=MediaType.APPLICATION_JSON_VALUE)
    	public ResponseEntity updateEmployee(@RequestBody Employee employee){
    		logger.info("From Producer method[updateEmployee] start");
    		if(employeeDAO.getEmployeeDetailsById(employee.getEmployeeId())==null){
    			Employee employee2=null;
    			return new ResponseEntity(employee2,HttpStatus.INTERNAL_SERVER_ERROR);
    		}
    		System.out.println(employee);
    		employeeDAO.updateEmployee(employee);
    		logger.info("From Producer method[updateEmployee] end");
    		return new ResponseEntity(employee,HttpStatus.OK);
    	}
    	
    	/** Method deletes an employee using employeeId and returns the deleted Employee 
    	 	If Employee to be deleted is not existing, then null is returned with 
    	 	HttpStatus.INTERNAL_SERVER_ERROR as status
    	 */ 
    	@RequestMapping(value="/emp/controller/deleteEmp/{id}",
    			method=RequestMethod.DELETE,
    			produces=MediaType.APPLICATION_JSON_VALUE)
    	public ResponseEntity deleteEmployee(@PathVariable("id") int myId){
    		logger.info("From Producer method[deleteEmployee] start");
    		if(employeeDAO.getEmployeeDetailsById(myId)==null){
    			Employee employee2=null;
    			return new ResponseEntity(employee2,HttpStatus.INTERNAL_SERVER_ERROR);
    		}
    		Employee employee = employeeDAO.removeEmployee(myId);
    		System.out.println("Removed: "+employee);
    		logger.info("From Producer method[deleteEmployee] end");
    		return new ResponseEntity(employee,HttpStatus.OK);
    	}
    }

    上面的代码代表具有请求处理程序的应用程序的控制器层。

    请求处理程序调用DAO层函数并执行CRUD操作。

    application.properties

    server.port = 8090
     logging.level.com.xyz.app.controller.EmployeeController=DEBUG
     #name of the log file to be created
     #same file will be given as input to logstash
     logging.file=app.log
     spring.application.name = producer

    上面的代码代表为基于Spring Boot的应用程序配置的属性。

    5. Logstash配置

    如3.3节所述,需要为logstash创建配置文件。

    logstash将使用此配置文件从微服务日志中获取输入。

    日志被转换为JSON并馈入elasticsearch。

    cst_logstash.conf

    input {
      file {
        # If more than one log files from different microservices have to be tracked then a comma-separated list of log files can 
        # be provided
        path => ["PATH-TO-UPDATE/app.log"]
        codec => multiline {
          pattern => "^%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}.*"
          negate => "true"
          what => "previous"
        }
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
      # Sending properly parsed log events to elasticsearch
      elasticsearch {
        hosts => ["localhost:9200"]
      }
    }

    上面的logstash配置文件侦听日志文件,并将日志消息推送到弹性搜索。

    注意 :根据您的设置更改日志路径。

    6.执行与输出

    6.1为日志执行微服务

    可以使用clean install spring-boot:run部署Spring Boot应用程序,并可以从浏览器或邮递员客户端访问以下URL: http:// localhost:8090 / emp / controller / getDetails

    这将击中微服务并在微服务方面生成日志。

    这些日志将由logstash读取,并推送到弹性搜索中,此外,可以使用Kibana进行后续步骤来查看这些日志。

    6.2在Kibana上查看输出的步骤

    Kibana Index Creation- 1
    • 单击下一步,将显示以下屏幕
    Kibana Index Creation- 2

    选择上面突出显示的选项,然后单击“创建索引模式”

    • 从左侧菜单中选择“发现”选项后,页面显示如下:
    在Kibana-1上查看日志
    • 可以根据上面突出显示的属性来可视化和过滤日志。 将鼠标悬停在任何属性上后,将显示该属性的“添加”按钮。 在这里,选择消息属性视图如下所示:
    在Kibana- 2上查看日志

    7.参考

    8.下载Eclipse项目

    下载您可以在此处下载此示例的完整源代码: microservice

    翻译自: https://www.javacodegeeks.com/2018/12/log-aggregation-using-elk-stack.html

    elk 聚合日志

    展开全文
  • ELK-日志分析系统

    万次阅读 2020-09-23 17:14:47
    为什么要建立日志分析系统: 当我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、...

    微信搜索:“二十同学” 公众号,欢迎关注一条不一样的成长之路

    为什么要建立日志分析系统:

    当我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。

    解决办法是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

    一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

    一个完整的集中式日志系统,需要包含以下几个主要特点:

    • 收集-能够采集多种来源的日志数据
    • 传输-能够稳定的把日志数据传输到中央系统
    • 存储-如何存储日志数据
    • 分析-可以支持 UI 分析
    • 警告-能够提供错误报告,监控机制

    ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。

    ELK简介:

    ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

    Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

    Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

    Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

    Kibana示例图

    Filebeat隶属于Beats。目前Beats包含四种工具:

    1. Packetbeat(搜集网络流量数据)
    2. Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据)
    3. Filebeat(搜集文件数据)
    4. Winlogbeat(搜集 Windows 事件日志数据)

    ELK用途:

    传统意义上,ELK是作为替代Splunk的一个开源解决方案。Splunk 是日志分析领域的领导者。日志分析并不仅仅包括系统产生的错误日志,异常,也包括业务逻辑,或者任何文本类的分析。而基于日志的分析,能够在其上产生非常多的解决方案,譬如:

    1.问题排查。我们常说,运维和开发这一辈子无非就是和问题在战斗,所以这个说起来很朴实的四个字,其实是沉甸甸的。很多公司其实不缺钱,就要稳定,而要稳定,就要运维和开发能够快速的定位问题,甚至防微杜渐,把问题杀死在摇篮里。日志分析技术显然问题排查的基石。基于日志做问题排查,还有一个很帅的技术,叫全链路追踪,比如阿里的eagleeye 或者Google的dapper,也算是日志分析技术里的一种。
    2.监控和预警。 日志,监控,预警是相辅相成的。基于日志的监控,预警使得运维有自己的机械战队,大大节省人力以及延长运维的寿命。
    3.关联事件。多个数据源产生的日志进行联动分析,通过某种分析算法,就能够解决生活中各个问题。比如金融里的风险欺诈等。这个可以可以应用到无数领域了,取决于你的想象力。
    4.数据分析。 这个对于数据分析师,还有算法工程师都是有所裨益的。

    官方文档:

    Filebeat:

    https://www.elastic.co/cn/products/beats/filebeat
    https://www.elastic.co/guide/en/beats/filebeat/5.6/index.html

    Logstash:
    https://www.elastic.co/cn/products/logstash
    https://www.elastic.co/guide/en/logstash/5.6/index.html

    Kibana:

    https://www.elastic.co/cn/products/kibana

    https://www.elastic.co/guide/en/kibana/5.5/index.html

    Elasticsearch:
    https://www.elastic.co/cn/products/elasticsearch
    https://www.elastic.co/guide/en/elasticsearch/reference/5.6/index.html

    elasticsearch中文社区:
    https://elasticsearch.cn/

    ELK架构图:

    架构图一:

    这是最简单的一种ELK架构方式。优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。

    此架构由Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置Kibana Web方便的对日志查询,并根据数据生成报表。

    架构图二:

    此种架构引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。

    架构图三:

    此种架构将收集端logstash替换为beats,更灵活,消耗资源更少,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。

    Filebeat工作原理:

    Filebeat由两个主要组件组成:prospectors 和 harvesters。这两个组件协同工作将文件变动发送到指定的输出中。

    Harvester(收割机):负责读取单个文件内容。每个文件会启动一个Harvester,每个Harvester会逐行读取各个文件,并将文件内容发送到制定输出中。Harvester负责打开和关闭文件,意味在Harvester运行的时候,文件描述符处于打开状态,如果文件在收集中被重命名或者被删除,Filebeat会继续读取此文件。所以在Harvester关闭之前,磁盘不会被释放。默认情况filebeat会保持文件打开的状态,直到达到close_inactive(如果此选项开启,filebeat会在指定时间内将不再更新的文件句柄关闭,时间从harvester读取最后一行的时间开始计时。若文件句柄被关闭后,文件发生变化,则会启动一个新的harvester。关闭文件句柄的时间不取决于文件的修改时间,若此参数配置不当,则可能发生日志不实时的情况,由scan_frequency参数决定,默认10s。Harvester使用内部时间戳来记录文件最后被收集的时间。例如:设置5m,则在Harvester读取文件的最后一行之后,开始倒计时5分钟,若5分钟内文件无变化,则关闭文件句柄。默认5m)。

    Prospector(勘测者):负责管理Harvester并找到所有读取源。

    filebeat.prospectors:
    
    - input_type: log
    
      paths:
    
        - /apps/logs/*/info.log

    Prospector会找到/apps/logs/*目录下的所有info.log文件,并为每个文件启动一个Harvester。Prospector会检查每个文件,看Harvester是否已经启动,是否需要启动,或者文件是否可以忽略。若Harvester关闭,只有在文件大小发生变化的时候Prospector才会执行检查。只能检测本地的文件。

    Filebeat如何记录文件状态:

    将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并再可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。

    Filebeat如何保证事件至少被输出一次:

    Filebeat之所以能保证事件至少被传递到配置的输出一次,没有数据丢失,是因为filebeat将每个事件的传递状态保存在文件中。在未得到输出方确认时,filebeat会尝试一直发送,直到得到回应。若filebeat在传输过程中被关闭,则不会再关闭之前确认所有时事件。任何在filebeat关闭之前为确认的时间,都会在filebeat重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置shutdown_timeout参数来设置关闭之前的等待事件回应的时间(默认禁用)。

     

    Logstash工作原理:

    Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。

     

    Input:输入数据到logstash。

    一些常用的输入为:

    file:从文件系统的文件中读取,类似于tail -f命令

    syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析

    redis:从redis service中读取

    beats:从filebeat中读取

    Filters:数据中间处理,对数据进行操作。

    一些常用的过滤器为:

    grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。

    官方提供的grok表达式:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
    grok在线调试:https://grokdebug.herokuapp.com/

    mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等。

    drop:丢弃一部分events不进行处理。

    clone:拷贝 event,这个过程中也可以添加或移除字段。

    geoip:添加地理信息(为前台kibana图形化展示使用)

    Outputs:outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

    一些常见的outputs为:

    elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询。

    file:将event数据保存到文件中。

    graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。

    Codecs:codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

    一些常见的codecs:

    json:使用json格式对数据进行编码/解码。

    multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息。

    展开全文
  • 主要介绍了springboot向elk日志实现过程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • ELK收集日志

    2020-11-19 15:49:38
    1.如果内存紧张的话可以把logstash(收集日志)做到另外一台主机上 2.先开两台主机 3.在133上面解压包,把下面的解压了 4.修改es配置文件 vim /etc/elasticsearch/elasticsearch.yml 5.修改kibana配置文件 ...

    1.如果内存紧张的话可以把logstash(收集日志)做到另外一台主机上

    2.先开两台主机

    在这里插入图片描述

    3.在133上面解压包,把下面的解压了

    在这里插入图片描述

    4.修改es配置文件 vim /etc/elasticsearch/elasticsearch.yml

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    5.修改kibana配置文件 vim /etc/kibana/kibana.yml

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    6.想在哪个上面收集日志就在那个上面安装logstash 我这里把他安装到134上

    7.vim /etc/logstash/conf.d/messages.conf

    【1】
    我这个是监控两个 分别是系统日志和apache

    input {
       file {
           path => "/var/log/messages"                要监控日志的绝对路径
           type => "system-log"                       自己定义的名字
           start_position => "beginning"             
     }
       file {
           path => "/var/log/httpd/*_log"            
           type => "httpd-log"
           start_position => "beginning"
     }
    }
    output {
       if [type] == "system-log" {
           elasticsearch {
           hosts => "192.168.242.133:9200"                 把上面收集的日志信息发送到133主机上的es上
           index => "systemc-log_%{+YYYY.MM.dd}"           给上面的名字加个日期
     }
    }
       if [type] == "httpd-log" {
           elasticsearch {
           hosts => "192.168.242.133:9200"
           index => "httpd-log_%{+YYYY.MM.dd}"
     }
    }
    }
    
    

    重启logstash systemctl restart logstash

    出现9600 就可以
    在这里插入图片描述

    8.查看添加索引

    在这里插入图片描述

    展开全文
  • 构建ELK海量日志分析平台

    千人学习 2019-02-21 10:33:06
    本课程重点构建ELK海量日志分析平台,包括Filebeat多数据源采集、打标记、多行异常信息整合,Logstash数据解析、过滤、清洗,ElasticSearch对数据进行存储、搜索以及分析,Kibana实现大数据分析和数据可视化。...
  • ELK实现日志采集

    2020-07-18 14:28:37
    ELK实现操作日志记录 本文使用Filebeat监听程序日志,将日志输出给Logstash并保存到Elasticsearch 中,通过Kibana可视化界面展示日志 Elasticsearch和Kibaba安装 作者使用docker一键安装部署Elasticsearch集群和...

    ELK实现日志采集

    本文使用Filebeat采集日志,将日志输出给Logstash并保存到Elasticsearch 中,通过Kibana可视化界面展示日志
    在这里插入图片描述

    一、Elasticsearch和Kibaba安装

    作者使用docker一键安装部署Elasticsearch集群和kibana
    docker和docker-compose安装脚本:

    https://download.csdn.net/download/qq_36266502/12642914
    

    程序中resources/docker/目录下也有

    Elasticsearch和Kibana安装脚本:

    创建docker-compose文件

    vim docker-compose.yml
    
    version: '2.2'
    services:
      cerebro:
        image: lmenezes/cerebro:0.8.3
        container_name: cerebro
        ports:
          - "9000:9000"
        command:
          - -Dhosts.0.host=http://elasticsearch:9200
        networks:
          - es7net
      kibana:
        image: docker.elastic.co/kibana/kibana:7.4.2
        container_name: kibana7
        environment:
          - I18N_LOCALE=zh-CN
          - XPACK_GRAPH_ENABLED=true
          - TIMELION_ENABLED=true
          - XPACK_MONITORING_COLLECTION_ENABLED="true"
        ports:
          - "5601:5601"
        networks:
          - es7net
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
        container_name: es7_01
        environment:
          - cluster.name=eslearn
          - node.name=es7_01
          - bootstrap.memory_lock=true
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
          - discovery.seed_hosts=es7_01,es7_02
          - cluster.initial_master_nodes=es7_01,es7_02
        ulimits:
          memlock:
            soft: -1
            hard: -1
        volumes:
          - es7data1:/usr/share/elasticsearch/data
        ports:
          - 9200:9200
        networks:
          - es7net
      elasticsearch2:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
        container_name: es7_02
        environment:
          - cluster.name=eslearn
          - node.name=es7_02
          - bootstrap.memory_lock=true
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
          - discovery.seed_hosts=es7_01,es7_02
          - cluster.initial_master_nodes=es7_01,es7_02
        ulimits:
          memlock:
            soft: -1
            hard: -1
        volumes:
          - es7data2:/usr/share/elasticsearch/data
        networks:
          - es7net
    volumes:
      es7data1:
        driver: local
      es7data2:
        driver: local
    
    networks:
      es7net:
        driver: bridge
    
    

    启动和停止命令:

    启动:
    docker-compose up -d
    停止:
    docker-compose down
    

    二、Logstash安装配置和启动

    Logstash安装:

    Logstash配置:
    作者使用grok表达式来格式化程序 输出的日志。

    Grok 是 Logstash 最重要的插件。你可以在 grok 里直接使用或应用预定义的表达式名称,grok 支持把预定义的 grok 表达式 写入到文件中,官方提供的预定义 grok 表达式见:
    https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
    最直观的grok语法使用参见:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html,这里不做赘述。

    进入Logstash根目录的config目录:

    cp logstash-sample.conf logstash.conf
    vim logstash.conf
    
    input {
        beats {
            port => "5044"  ##指定监听端口
        }
    }
    filter {
    	## 这里使用grok表达式
        grok {
            patterns_dir => ["./patterns"] ## 引用自定义的表达式配置文件,内容详见下面
            match => { "message" => "%{DATE1:time1} %{LEVEL:level1} %{THREAD:thread1} %{JAVA_SOURCE:source1} - %{JAVALOGMESSAGE:doc}" }
        }
        ## 移除掉某些字段
        mutate{
            remove_field => ["ecs","host","agent","@timestamp","@version","input","tags"]
        }
    }
    output{
      elasticsearch {
        hosts =>  ['192.168.59.110:9200'] ##ES地址,集群使用逗号隔开
        index => "elk-log" ##ES索引名称
      }
    }
    

    文章中filter.grok 将grok表达式配置在外部,用文件保存,patterns内容:

    DATE1    [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}
    LEVEL   (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
    JAVA_SOURCE [a-zA-Z.<>():0-9]*
    THREAD [\s\S]*
    

    验证日志输出 是否匹配grok表达式

    1. 编写java程序,配置logback.xml日志输出格式
      注:如果不想用程序,可以使用echo XXX >> 日志.log,日志格式下一步有
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
    	<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
    	<pattern>%date %level [%thread] %logger{60} - %msg%n</pattern>
    </encoder>
    
    1. 在Kibana开发工具中调试grok

    日志样例输出格式:

    2020-07-18 13:24:34,471 ERROR [http-nio-8080-exec-1] com.zhenglei.controller.TestControler - 55.3.244.1 GET /index.html 15824 9
    

    Grok 模式:

    %{DATE1:time1} %{LEVEL:level1} %{THREAD:thread1} %{JAVA_SOURCE:source1} - %{JAVALOGMESSAGE:doc}
    

    自定义模式:

    DATE1    [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}
    LEVEL   (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
    JAVA_SOURCE [a-zA-Z.<>():0-9]*
    THREAD [\s\S]*
    

    最后点击“模拟”,输出结构化数据 如下图:
    在这里插入图片描述
    Logstash启动:
    在logstash根目录下执行

    nohup /bin/logstash -f /config/logstash.yml &
    

    三、Filebeat安装配置和启动

    Filebeat安装:

    Filebeat配置:
    在filebeat根目录创建filebeat-demo.yml文件:

    filebeat.inputs:
      - type: log
        paths:
        - /usr/local/elk/logs/*.log ##监听的日志目录
        multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' ##用于匹配多行的正则表达式
        multiline.negate: true ## 是否匹配pattern的行合并到上一行
        multiline.match: after ## 匹配pattern后,与前面(before)还是后面(after)的内容合并为一条日志
    
    output.logstash:
        hosts: ["127.0.0.1:5044"] ##输出到logstash地址,端口号和logstash配置中保持一致
    

    Filebeat启动:
    在filebeat根目录下执行

    nohup ./filebeat -e -c filebeat-demo.yml &
    

    四、查看ES日志

    1. 启动Logstash
    2. 启动Filebeat
    3. 启动日志输出程序(文章最后会给程序地址)
    4. 查看Kibana中的日志,如下图:
      在这里插入图片描述

    文中程序地址:https://github.com/2767131402/demo.git

    展开全文
  • ELK分布式日志收集系统的搭建

    万次阅读 2020-03-30 14:44:48
    ELK分布式日志收集系统的搭建 filebeat+logstash+elasticsearch+kibana搭建一个分布式的日志收集系统
  • ELK分布式日志收集系统

    千次阅读 2020-07-31 15:25:15
    ELK分布式日志收集系统 一、传统系统日志收集的问题 在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常...
  • ELK前端日志分析 监控系统
  • ELK分布式日志解决方案

    千次阅读 2020-09-28 17:43:32
    ELK,包含三个产品,Elasticsearch、Logstash、Kibana。 在整个方案中,他们起到的作用如下: 应用服务 生产日志,通过Logger产生日志并输出。 Logstash 收集日志,通过http接收应用服务产生的日志。 Elastic...
  • ELKstack日志收集系统

    千次阅读 2016-01-05 09:35:53
    简介  ELKstack是由Elasticsearch、Logstash、Kibana三个开源软件组合而成的一款集分析、搜索、图形展示于一身的实施日志收集系统。 各个组件的功能如下: Elasticsearch:日志分布式存储、搜索工具,支持集群...
  • 清理elk日志

    千次阅读 2019-09-09 14:54:41
    对于免费版本的elk,默认情况会保留全部数据,导致数据列越来越多,如下3个月的累计数据量为27G,平均每月增加9G,需要定期清理elk日志; 解决办法 1)创建脚本 vi rm_es_log.sh chmod +x rm_es_log.sh 默认只保留...
  • 经过调研,决定尝试选用ELK分布式日志系统,如果能替代阿里云产品,则可以节省一些费用;同时也可以解决日志存数据库带来的性能问题。一举两得。 节前就曾在公司的虚拟机上尝试搭建ELK环境,可是因为版本问题一直有...
  • ELK 日志分析

    千次阅读 2020-11-22 21:49:42
    ELK二、ELK 简介三、实验部署3.1 准备工作3.2 下载并安装软件包3.3 安装 JDK(java)环境工具3.4 配置 elasticsearch3.4.1 新建 elasticsearch 用户并启动3.4.2 查看进程是否启动成功3.4.3 若出现错误可以查看日志...
  • ELK-日志收集工具nxlog

    千次阅读 2019-04-24 13:54:17
    日志收集工具nxlog 文章目录日志收集工具nxlog前言安装语法宏变量通用模块指令格式Module 模块名FlowControlInputType:指定输入类型OutputType:指定输出类型Exec: 执行命令Schedule:定时器EveryFirstExecWhenRoute...
  • 文章目录一、ELK简介1、认识ELK2、ELK架构图3、ELFK架构ELFK架构图二、ELK分布式日志平台搭建三、ELK收集标准输入日志,并在web界面显示 作为运维人员,其价值不在于部署大量服务,而在于排错及服务性能优化,优化...
  • 一、ELK日志分析系统详解 日志分析是运维工程师解决系统故障、发现问题的主要手段。日志包含多种类型,包括程序日志、系统日志以及安全日志等。通过对日志的分析,既可以做到未雨绸缪、预防故障的发生,又可以在故障...
  • ELK解决日志换行问题

    千次阅读 2020-03-22 22:42:48
    我们在部署elk完后,在kibana上发现很多ERROR日志换行情况,结果会在kibana上看到多行的error日志 我们常出现的ERROR日志如下 [INFO][2020-03-22 22:37:05,064][org.apache.commons.httpclient....
  • ELK日志分布式日志平台搭建

    千次阅读 2018-09-29 13:13:33
    首先先搭建好ELK中的ElasticSearch存储数据 和Logstash采集数据 ElasticSearch 本地快速搭建与使用 es的独立性也很强,它本身是一个分布式的搜索引擎,通过倒排序索引对存储的数据进行索引。 当输入查询关键字...
  • 下面我们ELK开源的日志管理来管理httpd日志 ELK介绍 在开源的日志管理方案中,最出名的莫过于 ELK 了。ELK 是三个软件的合称:Elasticsearch、Logstash、Kibana。 Elasticsearch 一个近乎实时...
  • 1.ELK是有Elastic公司的三个组件配合进行日志收集,分别是: ElasticSearch:用于存储日志信息。 Logstash:用于收集、处理和转发日志信息。 Kibana:提供可搜索的Web可视化界面。 当然,现在很多都配合着Beats...
  • ELK日志分析

    2020-12-07 20:57:52
    简介: 在大型运维环境中,管理员通常面对大量的服务器,对于这些服务器的维护,一个很重要的工作就是...一、ELK日志分析 1、ELK的作用和特点 1)ELK的作用 集中化日志存储管理和分析 2)ELK特点 开源 方便配置高可用
  • ELK分布式日志收集搭建和使用 大型系统分布式日志采集系统ELK 全框架 SpringBootSecurity 1、传统系统日志收集的问题 2、Logstash操作工作原理 3、分布式日志收集ELK原理 4、Elasticsearch+Logstash+Kiabana整合 5...
  • ELK前端日志分析、监控系统 前端日志与后端日志不同,具有很强的自定义特性,不像后端的接口日志、服务器日志格式比较固定,大部分成熟的后端框架都有非常完善的日志系统,借助一些分析框架,就可以实现日志的监控...
  • 基于elk日志分析平台(一)简介
  • ELK搭建日志分析系统

    千次阅读 2019-03-06 14:35:02
    记录ELK的搭建过程和遇到的问题, 所有下载链接都在官方下载地址 建议服务器配置高一些,例如2HG ES安装 下载后上传到对应的服务器位置 解压 : tar -xzvf file.tar.gz 配置磁盘,当磁盘大于95%,就只能读取,不...
  • ELK日志分析系统

    2019-12-18 19:46:27
    ELK日志分析系统 ELK日志分析系统简介 日志服务器 提高安全性 集中存放日志 缺陷 对日志的分析困难 收集数据:LogstashAgent 建立索引:ElasticSearchCluster 数据可视乎:KilbanaServer 简单的结果拓扑 ELK...
  • Elastic Stack(ELK日志系统简介及搭建 Elastic Stack(ELK日志系统后续使用及问题解决 kibana7.2添加登录及权限 kibana监控logstash 一、需求: 在我们的项目中有日志是一个必不可少的东西,但是日志的检索...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 39,129
精华内容 15,651
关键字:

elk查看日志