精华内容
下载资源
问答
  • Logstash配置文件详解

    2021-03-03 15:34:38
    logstash-使用 logstash pipeline 包含两...在生产环境使用logstash,一般使用都将配置写入文件里面,然后启动logstash。 具体参照官网:https://www.elastic.co/guide/en/logstash/7.1/index.html 处理nginx日志 # vi

    logstash-使用

    logstash pipeline 包含两个必须的元素:input和output,和一个可选元素:filter。

    从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他)。

    在生产环境使用logstash,一般使用都将配置写入文件里面,然后启动logstash。

    具体参照官网:https://www.elastic.co/guide/en/logstash/7.1/index.html

    处理nginx日志

    # vim nginx_access.conf
    
    input{
        file{
            path => "/var/log/nginx/access.log"
            start_position => "beginning"
            type => "nginx_access_log"
        }
    }
    filter{
        grok{
            match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"}
            match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""}
        }
        if [request] {
            urldecode {
                field => "request"
            }
           ruby {
               init => "@kname = ['url_path','url_arg']"
               code => "
                   new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                   event.append(new_event)"
           }
            if [url_arg] {
                ruby {
                   init => "@kname = ['key', 'value']"
                   code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
                    }
            }
        }
        geoip{
            source => "clientip"
        }
        useragent{
            source => "user_agent"
            target => "ua"
            remove_field => "user_agent"
        }
        date {
            match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
            locale => "en"
        }
        mutate{
            remove_field => ["message","timestamp","request","url_arg"]
        }
    }
    output{
        elasticsearch {      
            hosts => "localhost:9200"
            index => "nginx-access-log-%{+YYYY.MM.dd}"   
        }
    #  stdout {       
    #     codec => rubydebug    
    #  }
    }
    

    如果是想测试配置文件写的是否正确,用下面这个方式启动测试一下

    /usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/nginx.conf  #测试配置文件
    Configuration OK
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_access.conf  #启动logstash
    

    启动logstash

    # systemctl start logstash
    

    input plugin 让logstash可以读取特定的事件源。

    官网:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

    事件源可以是从stdin屏幕输入读取,可以从file指定的文件,也可以从es,filebeat,kafka,redis等读取

    • stdin 标准输入

    • file

      从文件读取数据

      file{
          path => ['/var/log/nginx/access.log']  #要输入的文件路径
          type => 'nginx_access_log'
          start_position => "beginning"
      }
      # path  可以用/var/log/*.log,/var/log/**/*.log,如果是/var/log则是/var/log/*.log
      # type 通用选项. 用于激活过滤器
      # start_position 选择logstash开始读取文件的位置,begining或者end。
      还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网
      
    • syslog

      通过网络将系统日志消息读取为事件

      syslog{
          port =>"514" 
          type => "syslog"
      }
      # port 指定监听端口(同时建立TCP/UDP514端口的监听)
      
      #从syslogs读取需要实现配置rsyslog:
      # cat /etc/rsyslog.conf   加入一行
      *.* @172.17.128.200:514   #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取
      # service rsyslog restart   #重启日志服务
      
    • beats

      从Elastic beats接收事件

      beats {
          port => 5044   #要监听的端口
      }
      # 还有host等选项
      
      # 从beat读取需要先配置beat端,从beat输出到logstash。
      # vim /etc/filebeat/filebeat.yml 
      ..........
      output.logstash:
      hosts: ["localhost:5044"]
      
    • kafka

      将 kafka topic 中的数据读取为事件

      kafka{
          bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
          topics => ["access_log"]
          group_id => "logstash-file"
          codec => "json"
      }
      
      kafka{
          bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
          topics => ["weixin_log","user_log"]  
          codec => "json"
      }
      
      # bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。
      # topics  要订阅的主题列表,kafka topics
      # group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id
      # codec 通用选项,用于输入数据的编解码器。
      

    还有很多的input插件类型,可以参考官方文档来配置。

    filter plugin 过滤器插件,对事件执行中间处理

    • grok

      解析文本并构造 。把非结构化日志数据通过正则解析成结构化和可查询化

      grok {
                  match => {"message"=>"^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
              }
      匹配nginx日志
      # 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
      #220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
      
    • 注意这里grok 可以有多个match匹配规则,如果前面的匹配失败可以使用后面的继续匹配。例如

       grok {
                  match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
                  match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]       
              }
      

    ​ grok 语法:%{SYNTAX:SEMANTIC} 即 %{正则:自定义字段名}

    ​ 官方提供了很多正则的grok pattern可以直接使用 :https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns

    ​ grok debug工具: http://grokdebug.herokuapp.com

    正则表达式调试工具: https://www.debuggex.com/

    需要用到较多的正则知识,参考文档有:https://www.jb51.net/tools/zhengze.html

    ​ 自定义模式: (?<字段名>the pattern)

    ​ 例如: 匹配 2018/06/27 14:00:54

    ​ (?\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d)

    ​ 得到结果: “datetime”: “2018/06/27 14:00:54”

    • date 日期解析 解析字段中的日期,然后转存到@timestamp

      [2018-07-04 17:43:35,503]
      grok{
            match => {"message"=>"%{DATA:raw_datetime}"}
      }
      date{
             match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
              remove_field =>["raw_datetime"]
      }
      
      #将raw_datetime存到@timestamp 然后删除raw_datetime
      
      #24/Jul/2018:18:15:05 +0800
      date {
            match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z]
      }
      
    • mutate 对字段做处理 重命名、删除、替换和修改字段。

      • covert

        类型转换。类型包括:integer,float,integer_eu,float_eu,string和boolean

        filter{
            mutate{
        #     covert => ["response","integer","bytes","float"]  #数组的类型转换
                convert => {"message"=>"integer"}
            }
        }
        #测试------->
        {
                  "host" => "localhost",
               "message" => 123,    #没带“”,int类型
            "@timestamp" => 2018-06-26T02:51:08.651Z,
              "@version" => "1"
        }
        
      • split

        使用分隔符把字符串分割成数组

        mutate{
            split => {"message"=>","}
        }
        #---------->
        aaa,bbb
        {
            "@timestamp" => 2018-06-26T02:40:19.678Z,
              "@version" => "1",
                  "host" => "localhost",
               "message" => [
                [0] "aaa",
                [1] "bbb"
            ]}
        192,128,1,100
        {
                "host" => "localhost",
             "message" => [
              [0] "192",
              [1] "128",
              [2] "1",
              [3] "100"
         ],
          "@timestamp" => 2018-06-26T02:45:17.877Z,
            "@version" => "1"
        }
        
      • merge

        合并字段 。数组和字符串 ,字符串和字符串

        filter{
            mutate{
                add_field => {"field1"=>"value1"}
            }
            mutate{ 
                  split => {"message"=>"."}   #把message字段按照.分割
            }
            mutate{
                merge => {"message"=>"field1"}   #将filed1字段加入到message字段
            }
        }
        #--------------->
        abc
        {
               "message" => [
                [0] "abc,"
                [1] "value1"
            ],
            "@timestamp" => 2018-06-26T03:38:57.114Z,
                "field1" => "value1",
              "@version" => "1",
                  "host" => "localhost"
        }
        
        abc,.123
        {
               "message" => [
                [0] "abc,",
                [1] "123",
                [2] "value1"
            ],
            "@timestamp" => 2018-06-26T03:38:57.114Z,
                "field1" => "value1",
              "@version" => "1",
                  "host" => "localhost"
        }
        
      • rename

        对字段重命名

        filter{
            mutate{
                rename => {"message"=>"info"}
            }
        }
        #-------->
        123
        {
            "@timestamp" => 2018-06-26T02:56:00.189Z,
                  "info" => "123",
              "@version" => "1",
                  "host" => "localhost"
        }
        
      • remove_field

        ​ 移除字段

        mutate {
            remove_field => ["message","datetime"]
        }
        
      • join

        用分隔符连接数组,如果不是数组则不做处理

        mutate{
                split => {"message"=>":"}
        }
        mutate{
                join => {"message"=>","}
        }
        ------>
        abc:123
        {
            "@timestamp" => 2018-06-26T03:55:41.426Z,
               "message" => "abc,123",
                  "host" => "localhost",
              "@version" => "1"
        }
        aa:cc
        {
            "@timestamp" => 2018-06-26T03:55:47.501Z,
               "message" => "aa,cc",
                  "host" => "localhost",
              "@version" => "1"
        }
        
      • gsub

        用正则或者字符串替换字段值。仅对字符串有效

        mutate{
                gsub => ["message","/","_"]   #用_替换/
            }
        
        ------>
        a/b/c/
        {
              "@version" => "1",
               "message" => "a_b_c_",
                  "host" => "localhost",
            "@timestamp" => 2018-06-26T06:20:10.811Z
        }
        
      • update

        更新字段。如果字段不存在,则不做处理

        mutate{
                add_field => {"field1"=>"value1"}
            }
            mutate{
                update => {"field1"=>"v1"}
                update => {"field2"=>"v2"}    #field2不存在 不做处理
            }
        ---------------->
        {
            "@timestamp" => 2018-06-26T06:26:28.870Z,
                "field1" => "v1",
                  "host" => "localhost",
              "@version" => "1",
               "message" => "a"
        }
        
      • replace

        更新字段。如果字段不存在,则创建

         mutate{
                add_field => {"field1"=>"value1"}
            }
            mutate{
                replace => {"field1"=>"v1"}
                replace => {"field2"=>"v2"}
            }
        ---------------------->
        {
               "message" => "1",
                  "host" => "localhost",
            "@timestamp" => 2018-06-26T06:28:09.915Z,
                "field2" => "v2",        #field2不存在,则新建
              "@version" => "1",
                "field1" => "v1"
        }
        
    • geoip

      根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息

       geoip {
                  source => "clientip"
                  database =>"/tmp/GeoLiteCity.dat"
              }
      
    • ruby

      ruby插件可以执行任意Ruby代码

      filter{
          urldecode{
              field => "message"
          }
          ruby {
              init => "@kname = ['url_path','url_arg']"
              code => " 
                  new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))]) 
                  event.append(new_event)"
          }
          if [url_arg]{
              kv{
                  source => "url_arg"
                  field_split => "&"
                  target => "url_args"
                  remove_field => ["url_arg","message"]
              }
          }
      }
      # ruby插件
      # 以?为分隔符,将request字段分成url_path和url_arg
      -------------------->
      www.test.com?test
      {
             "url_arg" => "test",
                "host" => "localhost",
            "url_path" => "www.test.com",
             "message" => "www.test.com?test",  
            "@version" => "1",
          "@timestamp" =>  2018-06-26T07:31:04.887Z
      }
      www.test.com?title=elk&content=学习elk
      {
            "url_args" => {
                "title" => "elk",
              "content" => "学习elk"
          },
                "host" => "localhost",
            "url_path" => "www.test.com",
            "@version" => "1",
          "@timestamp" =>  2018-06-26T07:33:54.507Z
      }
      
    • urldecode

      用于解码被编码的字段,可以解决URL中 中文乱码的问题

       urldecode{
              field => "message"
          }
      
      # field :指定urldecode过滤器要转码的字段,默认值是"message"
      # charset(缺省): 指定过滤器使用的编码.默认UTF-8
      
    • kv

      通过指定分隔符将字符串分割成key/value

      kv{
              prefix => "url_"   #给分割后的key加前缀
              target => "url_ags"    #将分割后的key-value放入指定字段
              source => "message"   #要分割的字段
              field_split => "&"    #指定分隔符
              remove_field => "message"
          }
      -------------------------->
      a=1&b=2&c=3
      {
                  "host" => "localhost",
             "url_ags" => {
                "url_c" => "3",
                "url_a" => "1",
                "url_b" => "2"
          },
            "@version" => "1",
          "@timestamp" => 2018-06-26T07:07:24.557Z
      
    • useragent

      添加有关用户代理(如系列,操作系统,版本和设备)的信息

      if [agent] != "-" {
        useragent {
          source => "agent"
          target => "ua"
          remove_field => "agent"
        }
      }
      # if语句,只有在agent字段不为空时才会使用该插件
      #source 为必填设置,目标字段
      #target 将useragent信息配置到ua字段中。如果不指定将存储在根目录中
      

    logstash 比较运算符

    等于: ==, !=, <, >, <=, >=
      正则: =~, !~ (checks a pattern on the right against a string value on the left)
      包含关系: in, not in

    支持的布尔运算符:and, or, nand, xor

    支持的一元运算符: !

    output plugin 输出插件,将事件发送到特定目标。

    • stdout 标准输出。将事件输出到屏幕上

      output{
          stdout{
              codec => "rubydebug"
          }
      }
      
    • file 将事件写入文件

          file {
             path => "/data/logstash/%{host}/{application}
             codec => line { format => "%{message}"} }
          }
      
    • kafka 将事件发送到kafka

          kafka{
              bootstrap_servers => "localhost:9092"
              topic_id => "test_topic"  #必需的设置。生成消息的主题
          }
      
    • elasticseach 在es中存储日志

          elasticsearch {
              hosts => "localhost:9200"
              index => "nginx-access-log-%{+YYYY.MM.dd}"  
          }
      #index 事件写入的索引。可以按照日志来创建索引,以便于删旧数据和按时间来搜索日志
      

    补充一个codec plugin 编解码器插件

    codec 本质上是流过滤器,可以作为input 或output 插件的一部分运行。例如上面output的stdout插件里有用到。

    • multiline codec plugin 多行合并, 处理堆栈日志或者其他带有换行符日志需要用到

      input {
        stdin {
          codec => multiline {
            pattern => "pattern, a regexp"    #正则匹配规则,匹配到的内容按照下面两个参数处理
            negate => "true" or "false"     # 默认为false。处理匹配符合正则规则的行。如果为true,处理不匹配符合正则规则的行。
            what => "previous" or "next"    #指定上下文。将指定的行是合并到上一行或者下一行。
          }
        }
      }
      codec => multiline {
          pattern => "^\s"  
          what => "previous"  
      }
      # 以空格开头的行都合并到上一行
      
      codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^%{TIMESTAMP_ISO8601} "
          negate => true
          what => "previous"
      }
      # 任何不以这个时间戳格式开头的行都与上一行合并
      
      codec => multiline {
         pattern => "\\$"
         what => "next"
      }
      # 以反斜杠结尾的行都与下一行合并
      
    展开全文
  • logstash 配置文件详解

    千次阅读 2019-08-06 12:15:37
    1: bin / logstash -f second-pipeline.conf --config.test_and_exit 该--config.test_and_...当配置文件通过配置测试时,使用以下命令启动 Logstash: bin / logstash -f second-pipeline.conf 2: bin / lo...

    1:

    bin / logstash -f second-pipeline.conf --config.test_and_exit

    --config.test_and_exit选项会解析您的配置文件并报告任何错误。当配置文件通过配置测试时,使用以下命令启动      Logstash:

     bin / logstash -f second-pipeline.conf

    2:

    bin / logstash -f first-pipeline.conf --config.reload.automatic

    --config.reload.automatic选项启用自动配置重新加载,因此您不必在每次修改配置文件时停止并重新启动Logstash。

    3、file源的解析

    input{
        file{
            #path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
            path => [‘pathA’,‘pathB’]
            #表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
            discover_interval => 15
            #排除那些文件,也就是不去读取那些文件
            exclude => [‘fileName1’,‘fileNmae2’]
            #被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
            close_older => 3600
            #在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
            ignore_older => 86400
            #logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
            stat_interval => 1
            #sincedb记录数据上一次的读取位置的一个index
            sincedb_path => ’$HOME/. sincedb‘
            #logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
            start_position => ‘beginning’
            #注意:这里需要提醒大家的是,如果你需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
        }            
     
    }
     

    4:jdbc详解

    input{
        jdbc{
        #jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
        jdbc_driver_library => ""
        #jdbc class 不同数据库有不同的 class 配置
        jdbc_driver_class => ""
        #配置数据库连接 ip 和端口,以及数据库    
        jdbc_connection_string => ""
        #配置数据库用户名
        jdbc_user =>   
        #配置数据库密码
        jdbc_password =>
        #上面这些都不重要,要是这些都看不懂的话,你的老板估计要考虑换人了。重要的是接下来的内容。
        # 定时器 多久执行一次SQL,默认是一分钟
        # schedule => 分 时 天 月 年  
        # schedule => * 22  *  *  * 表示每天22点执行一次
        schedule => "* * * * *"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
        #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
        use_column_value => true
        #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
        tracking_column => create_time
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => true
        #们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
        last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
        #是否将字段名称转小写。
        #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
        #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
        lowercase_column_names => false
        #你的SQL的位置,当然,你的SQL也可以直接写在这里。
        #statement => SELECT * FROM tabeName t WHERE  t.creat_time > :last_sql_value
        statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
        #数据类型,标明你属于那一方势力。单了ES哪里好给你安排不同的山头。
        type => "my_info"
        }
        #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
        #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
    }
     

    5:output 详解

    output {
    //输入到es集群中,当hosts 参数列出多个IP地址时,Logstash会在地址列表中对请求进行负载平衡。另请注意,Elasticsearch的默认端口是,9200并且可以在上面的配置中省略。
        elasticsearch { 
            hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
        }
    //写入文件
        file {
            path => "/path/to/target/file"
        }
    //以json格式输出到控制台
        stdout { codec => json_lines } 
    }

     

    展开全文
  • logstash配置文件详解

    万次阅读 多人点赞 2017-03-10 14:17:12
    Logstash实际应用配置详解背景业务目的是能够分析nginx和apache每天产生的日志,对url、ip、rest接口等信息进行监控,并将数据发送到elasticsearch服务。

    Logstash实际应用配置详解

    背景

    业务目的是能够分析nginx和apache每天产生的日志,对url、ip、rest接口等信息进行监控,并将数据发送到elasticsearch服务。

    config

    输入

    从日志文件中获取信息

    file {
        path => "/home/keepgostudio/demo/logs/test.log"
        start_position => "beginning"
    }       
    

    过滤

    grok

    目前是logstash中把非标准化的日志数据转换成标准化并且可搜索数据最好的方式。Logstash默认提供了能分析包括java堆栈日志、apache日志在内的120种形式。点击查看

    如没有特别的需求的话,使用默认的apache日志形式就能达到想要的效果,如下。

    grok{       
        match => {"message" => ["%{COMBINEDAPACHELOG}"]}        
    }
    

    但如果想要监控更多的信息,比如url上的参数,那么默认的表达式将没办法满足我们的需求,这时我们就需要自己动手去编写一些符合我们业务需要的表达式,并告诉logstash以某种期望的方式进行数据转换。

    首先,在logstash的根目录下创建一个patterns文件夹,这个文件夹默认是没有的。

    其次,在patterns文件夹中创建文件test_pattern(这里为了方便所以没有按照pattern的功能对文件进行命名,在实际应用中最好按照功能来对文件命名,至于原因你懂的)。在test_pattern文件中可以按照“名称 正则表达式”这样的格式自定义一些正则表达式,以便在grok中进行使用。

    最后,在使用的时候一定要把pattern_dir这个参数带上,否则logstash无法识别你自定义的这些正则表达式。

    grok {
        patterns_dir => ["/home/keepgostudio/download/logstash-5.2.0/patterns"]
        match => {
            "message" => ["%{PARAMS_APACHELOG}", "%{NO_PARAMS_APACHELOG}"]
        }
        remove_field => ["host", "timestamp", "httpversion", "@version"]
    }
    

    kv

    将数据源转换成键值对,并创建相对的field。比如传入“a=111&b=2222&c=3333”,输出的时候,a,b,c会被创建成三个field,这样做的好处是,当需要查询某一个参数的时候可直接查询,而不是把一个字符串搜索出来再做解析。

    kv {
        source => "field_name"
        field_split => "&?"
    }
    

    geoip

    这个从字面上就能看出他的功能,根据ip查出相应的地理信息,比如城市,省份,国家,经纬度等。这个ip信息是在logstash中的一个数据源中进行搜索查找,而不是进行网络搜索。

    geoip {
        source => "field_name"
        fields => ["country_name", "region_name", "city_name", "latitude", "longitude"]
        target => "location"
    }
    

    drop

    drop可以跳过某些不想统计的日志信息,当某条日志信息符合if规则时,该条信息则不会在out中出现,logstash将直接进行下一条日志的解析。

    if [field_name] == "value" {
        drop {}
    }
    

    输出

    logstash输出到elasticsearch时,会随机生成一个id给每一条记录。建议在刚开始接触的时候,可以采用手动输入的方式。这样结合elasticsearch的服务,更容易理解整个实现的流程。

    elasticsearch {
        hosts => ["192.168.1.44:9200"]      
        index => "logstash-test-%{type}-%{host}"        
    }
    

    附录

    test.config

    input {
        stdin {}
    }
    
    filter {
        grok {
            patterns_dir => ["/home/keepgostudio/download/logstash-5.2.0/patterns"]
            match => {
                "message" => ["%{PARAMS_APACHELOG}", "%{NO_PARAMS_APACHELOG}"]
            }
            remove_field => ["host", "timestamp", "httpversion", "@version"]
        }
    
        kv {
            source => "params"
            field_split => "&?"
        }
    
        geoip {
            source => "ip"
            fields => ["country_name", "region_name", "city_name", "latitude", "longitude"]
            target => "location"
    }
    
    output {
        elasticsearch {
            hosts => ["192.168.1.44:9200"]      
            index => "logstash-test-%{type}-%{host}"        
        }
    
    }
    

    test_pattern

    HTTP_URL \S+(?=\?)
    HTTP_URL_WITH_PARAMS "(?:%{WORD:method} %{HTTP_URL:url}\?%{NOTSPACE:params}(?: HTTP/%{NUMBER:httpversion}))"
    HTTP_URL_WITHOUT_PARAMS "(?:%{WORD:method} %{NOTSPACE:url}(?: HTTP/%{NUMBER:httpversion}))"
    NO_PARAMS_APACHELOG %{IPV4:ip} %{USERNAME} %{USERNAME} \[%{HTTPDATE:timestamp}\] %{HTTP_URL_WITHOUT_PARAMS} %{NUMBER:response} (?:%{NUMBER:bytes}|-) "%{NOTSPACE:referrer}" %{QS:agent}
    PARAMS_APACHELOG %{IPV4:ip} %{USERNAME} %{USERNAME} \[%{HTTPDATE:timestamp}\] %{HTTP_URL_WITH_PARAMS} %{NUMBER:response} (?:%{NUMBER:bytes}|-) "%{NOTSPACE:referrer}" %{QS:agent}
    
    展开全文
  • 自定义文件的logstash配置文件
  • 配置文件 | logstash配置文件详解

    万次阅读 2018-09-28 15:25:17
    说明 /logstash/config/logstash.yml:主要用于控制...配置参数说明 logstash.yml 参数 用途 默认值 node.name 节点名称 主机名称 path.data /数据存储路径 LOGSTASH_HOME/da...

    说明

    /logstash/config/logstash.yml:主要用于控制logstash运行时的状态
    /logstash/config/startup.options:logstash 运行相关参数

    配置参数说明

    logstash.yml
    参数用途默认值

    node.name

    节点名称主机名称

    path.data

    /数据存储路径LOGSTASH_HOME/data/

    pipeline.workers

    输出通道的工作workers数据量(提升输出效率)cpu核数

    pipeline.output.workers

    每个输出插件的工作wokers数量1

    pipeline.batch.size

    每次input数量125

    path.config

    过滤配置文件目录

    config.reload.automatic

    自动重新加载被修改配置false or true

    config.reload.interval

    配置文件检查时间

    path.logs

    日志输出路径

    http.host

    绑定主机地址,用户指标收集“127.0.0.1”

    http.port

    绑定端口5000-9700

    log.level

    日志输出级别,如果config.debug开启,这里一定要是debug日志info

    log.format

    日志格式* plain*

    path.plugins

    自定义插件目录
    startup.options
    参数用途
    JAVACMD=/usr/bin/java本地jdk
    LS_HOME=/opt/logstashlogstash所在目录
    LS_SETTINGS_DIR="${LS_HOME}/config"默认logstash配置文件目录
    LS_OPTS="–path.settings ${LS_SETTINGS_DIR}"logstash启动命令参数 指定配置文件目录
    LS_JAVA_OPTS=""指定jdk目录
    LS_PIDFILE=/var/run/logstash.pidlogstash.pid所在目录
    LS_USER=logstashlogstash启动用户
    LS_GROUP=logstashlogstash启动组
    LS_GC_LOG_FILE=/var/log/logstash/gc.loglogstash jvm gc日志路径
    LS_OPEN_FILES=65534logstash最多打开监控文件数量
    展开全文
  • 1.logstash配置不多,主要编写数据处理解析规则,具体可以参考  http://udn.yyuap.com/doc/logstash-best-practice-cn/get_start/daemon.html  这里。 2.filebeat文件配置详解(新版本里面有两个新文件filebeat...
  • logstash配置文件

    2018-08-30 21:49:01
    配置文件logstash的stdin、file、tcp、udp、syslog、beats、grok、elasticsearch插件配置
  • 它以插件的形式来组织功能,通过配置文件来描述需要插件做什么,配置文件主要由input、filter和output三部分组成。 一、input  负责从数据源提取数据,由于我提取的是日志文件,所以使用的是file插件,该插件...
  • logstash 配置文件编写详解

    千次阅读 2018-09-28 17:50:47
    说明 它一个有jruby语言编写的运行在java...通过在配置文件编写输入(input),过滤(filter),输出(output)相关规则,对数据收集转发。 配置文件编写语法 大致格式如下 # 输入 input { ... } #...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,302
精华内容 920
关键字:

logstash配置文件详解