精华内容
下载资源
问答
  • 自定义文件的logstash配置文件
  • logstash配置文件

    2018-05-25 13:41:57
    logstach的exe.conf配置文件............................................................................................................
  • Logstash 配置工具 该项目面向希望开始使用。 它提供了一个基于 Web 的 UI 来创建 Logstash 配置文件。 创建的配置文件可以下载并与 Logstash 一起使用。 未来,用户界面还将允许用户在线测试他们的配置文件。 开发...
  • Logstash配置格式的语法分析基础是原始的,仅需进行很小的更改即可使用。 logstash-config使用从PEG生成解析器(解析器表达语法)。 特别感谢Martin Angers( )。 该软件包目前正在开发中,没有API保证。 安装 ...
  • Logstash配置对IntelliJ的支持 IntelliJ IDE的插件以支持弹性配置文件 特征 语法高亮 关键字补全 大括号匹配 自动评论 安装 使用IDE内置插件系统: 文件>设置>插件>浏览存储库... >搜索“ logstash” >安装插件 手动...
  • 一、logstash跟es有版本对照关系 了解对照关系,决定要安装的logstash版本 二、Logstash工作原理 Logstash事件处理管道有三个阶段:输入→过滤器→输出,输入生成事件,过滤器修改它们,然后输出将它们发送到其他...

    一、logstash跟es有版本对照关系

    了解对照关系,决定要安装的logstash版本

    二、ELK出现的原因

    在这里插入图片描述
    在这里插入图片描述

    三、Logstash工作原理

    Logstash事件处理管道有三个阶段:输入→过滤器→输出,输入生成事件,过滤器修改它们,然后输出将它们发送到其他地方。输入和输出支持编解码器,使你能够在数据进入或离开管道时对其进行编码或解码,而无需使用单独的过滤器
    在这里插入图片描述

    • 输入

    你使用输入将数据获取到Logstash中,一些比较常用的输入是:

    属性解析
    file从文件系统上的文件进行读取,非常类似于UNIX命令tail -0F。
    syslog在众所周知的端口514上监听syslog消息并根据RFC3164格式进行解析。
    redis从redis服务器读取数据,同时使用Redis通道和Redis列表,Redis通常被用作集中式Logstash安装中的“broker”,它将从远程Logstash “shipper”中的Logstash事件排队。
    beats处理Beats发送的事件。

    有关可用输入的更多信息,请参见输入插件

    • 过滤器

    过滤器是Logstash管道中的中间处理设备,如果事件符合一定的条件,你可以将过滤器与条件语句组合在一起,对其执行操作,一些有用的过滤器包括:

    属性解析
    grok解析和构造任意文本,Grok是目前Logstash中解析非结构化日志数据到结构化和可查询数据的最佳方式,使用内置的120种模式,你很可能会找到一个满足你的需要!
    mutate对事件字段执行一般的转换,你可以重命名、删除、替换和修改事件中的字段。
    drop完全删除事件,例如debug事件。
    clone复制事件,可能添加或删除字段。
    geoip添加关于IP地址地理位置的信息(在Kibana中还显示了令人惊叹的图表!)

    有关可用过滤器的更多信息,请参见过滤器插件

    • 输出

    输出是Logstash管道的最后阶段,事件可以通过多个输出,但是一旦所有的输出处理完成,事件就完成了它的执行,一些常用的输出包括:

    属性解析
    elasticsearch发送事件数据到Elasticsearch,如果你打算以一种高效、方便、易于查询的格式保存数据,那么使用Elasticsearch是可行的。
    file将事件数据写入磁盘上的文件。
    graphite将事件数据发送到graphite,这是一种流行的用于存储和绘制指标的开源工具。http://graphite.readthedocs.io/en/latest/
    statsd发送事件到statsd,“监听统计信息(如计数器和计时器)、通过UDP发送聚合并将聚合发送到一个或多个可插拔后端服务”的服务,如果你已经在使用statsd,这可能对你很有用!

    有关可用输出的更多信息,请参见输出插件。

    • 编解码器

    Codecs是基本的流过滤器,可以作为输入或输出的一部分进行操作,Codecs使你能够轻松地将消息的传输与序列化过程分开,流行的codecs包括json、msgpack和plain(text)。

    属性解析
    json以JSON格式对数据进行编码或解码。
    multiline将多行文本事件(如java异常和stacktrace消息)合并到单个事件中。

    有关可用编解码器的更多信息,请参见编解码器插件

    • 执行模型

    Logstash事件处理管道协调输入、过滤器和输出的执行。Logstash管道中的每个输入阶段都在自己的线程中运行,输入将事件写入位于内存(默认)或磁盘上的中央队列,每个管道工作线程从这个队列中取出一批事件,通过配置的过滤器运行事件批处理,然后通过任何输出运行过滤的事件,可以配置批处理的大小和管道工作线程的数量参见调优和分析Logstash性能)。默认情况下,Logstash使用内存有限队列在管道阶段之间(输入→过滤器和过滤器→输出)来缓冲事件,如果Logstash不安全的终止,则存储在内存中的任何事件都将丢失。为了防止数据丢失,你可以启用Logstash将运行中的事件持久化到磁盘上,有关更多信息,请参见持久队列

    四、logstash的配置文件

    1. logstash.yml
      你可以在Logstash设置文件logstash.yml中设置选项来控制Logstash执行,例如,你可以指定管道设置、配置文件的位置、日志记录选项和其他设置。当你运行Logstash时,logstash.yml文件中的大多数设置都可以作为命令行标志使用,在命令行中设置的任何标志都会覆盖logstash.yml文件中的相应设置。logstash.yml文件是用YAML编写的,它的位置因平台而异(参见Logstash目录布局),你可以以层次结构形式指定设置或使用平面键,例如,要使用分层表单设置管道批处理大小和批延迟,你需要指定
    pipeline:
      batch:
        size: 125
        delay: 50
    

    要表示与平面键相同的值,需要指定:

    pipeline.batch.size: 125
    pipeline.batch.delay: 50
    

    logstash.yml文件还支持bash风格的环境变量插值设置值

    pipeline:
      batch:
        size: ${BATCH_SIZE}
        delay: ${BATCH_DELAY:50}
    node:
      name: "node_${LS_NODE_NAME}"
    path:
       queue: "/tmp/${QUEUE_DIR:queue}"
    

    注意,${VAR_NAME:default_value}表示法是受支持的,在上面的示例中,它设置了一个默认的批延迟50和一个默认的path.queue为/tmp/queue的。

    模块也可以在logstash.yml文件中指定,模块定义将具有这种格式:

    modules:
      - name: MODULE_NAME1
        var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
        var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE
        var.PLUGIN_TYPE2.PLUGIN_NAME2.KEY1: VALUE
        var.PLUGIN_TYPE3.PLUGIN_NAME3.KEY1: VALUE
      - name: MODULE_NAME2
        var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
        var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE
    

    如果使用命令行标志--modules,则忽略在logstash.yml文件中定义的任何模块
    logstash.yml文件包括以下设置,如果你使用的是X-Pack,请参阅Logstash中的X-Pack设置。

    设置描述默认值
    node.name节点的描述性名称机器的主机名
    path.dataLogstash及其插件用于任何持久需求的目录LOGSTASH_HOME/data
    pipeline.id管道的IDmain
    pipeline.workers将并行执行管道的过滤和输出阶段的工人数量,如果你发现事件正在备份,或者CPU没有饱和,请考虑增加这个数字,以更好地利用机器处理能力主机CPU核心的数量
    pipeline.batch.size在尝试执行过滤器和输出之前,单个工作线程将从输入中收集的最大事件数,更大的批处理大小通常更高效,但代价是增加内存开销,你可能需要增加jvm.options配置文件中的JVM堆空间,有关更多信息,请参阅Logstash配置文件125
    pipeline.batch.delay当创建管道事件批处理时,在向管道工作人员发送一个较小的批处理之前,等待每个事件的时间为多少毫秒50
    pipeline.unsafe_shutdown当设置为true时,即使内存中仍然存在游离事件,也会在关闭期间强制Logstash退出,默认情况下,Logstash将拒绝退出,直到所有接收到的事件都被推送到输出,启用此选项可能导致关闭期间的数据丢失false
    path.config主管道的Logstash配置路径,如果指定目录或通配符,配置文件将按字母顺序从目录中读取None
    config.test_and_exit当设置为true时,检查配置是否有效,然后退出,注意,在此设置中没有检查grok模式的正确性,Logstash可以从一个目录中读取多个配置文件,如果你把这个设置和log.level: debug结合起来,Logstash将对合并后的配置文件进行日志记录,并用它来自的源文件注解每个配置块false
    config.reload.automatic当设置为true时,定期检查配置是否已更改,并在更改配置时重新加载配置,这也可以通过SIGHUP信号手动触发false
    config.reload.intervalLogstash多久检查一次配置文件以查看更改3s
    config.debug当设置为true时,将完整编译的配置显示为debug日志消息,你还必须设置log.level: debug,警告:日志消息将包含传递给插件配置的任意密码选项,可能会导致明文密码出现在日志中!false
    config.support_escapes当设置为true时,引号中的字符串将处理以下转义序列:\n变成文字换行符(ASCII 10),\r变成文字回车(ASCII 13),\t变成文字制表符(ASCII 9),\变成字面反斜杠\,"变成一个文字双引号,'变成文字引号false
    modules当配置时,modules必须位于上表中描述的嵌套YAML结构中None
    queue.type用于事件缓冲的内部队列模型,为基于内存中的遗留队列指定memory,或者persisted基于磁盘的ACKed队列(持久队列)memory
    path.queue启用持久队列时存储数据文件的目录路径(queue.type: persisted)path.data/queue
    queue.page_capacity启用持久队列时使用的页面数据文件的大小(queue.type: persisted),队列数据由分隔成页面的仅追加的数据文件组成64mb
    queue.max_events启用持久队列时队列中未读事件的最大数量(queue.type: persisted)0(无限)
    queue.max_bytes队列的总容量(字节数),确保磁盘驱动器的容量大于这里指定的值,如果queue.max_events和queue.max_bytes都指定,Logstash使用最先达到的任何标准1024mb(1g)
    queue.checkpoint.acks当启用持久队列时,在强制执行检查点之前的最大ACKed事件数(queue.type: persisted),指定queue.checkpoint.acks: 0设置此值为无限制1024
    queue.checkpoint.writes启用持久队列时强制执行检查点之前的最大写入事件数(queue.type: persisted),指定queue.checkpoint.writes: 0设置此值为无限制1024
    queue.drain启用时,Logstash会一直等到持久队列耗尽后才关闭false
    dead_letter_queue.enable标记指示Logstash以插件支持的DLQ特性false
    dead_letter_queue.max_bytes每个dead letter队列的最大大小,如果条目将增加dead letter队列的大小,超过此设置,则删除条目1024mb
    path.dead_letter_queue存储dead letter队列数据文件的目录路径path.data/dead_letter_queue
    http.host指标REST端点的绑定地址“127.0.0.1”
    http.port指标REST端点的绑定端口9600
    log.level日志级别,有效的选项是:fatal、error、warn、info、debug、traceinfo
    log.format日志格式,设置为json日志以JSON格式,或plain使用Object#.inspectplain
    path.logsLogstash将其日志写到的目录LOGSTASH_HOME/logs
    path.plugins哪里可以找到自定义插件,你可以多次指定此设置以包含多个路径,插件应该在特定的目录层次结构中:PATH/logstash/TYPE/NAME.rb,TYPE是inputs、filters、outputs或codecs,NAME是插件的名称特定于平台的

    五、logstash配置

    下面的示例演示如何配置Logstash来过滤事件,处理Apache日志和syslog消息,并使用条件来控制哪些事件由过滤器或输出处理。如果你需要帮助构建grok模式,请尝试grok调试器,Grok调试器是基本许可证下的X-Pack特性,因此可以免费使用

    • 配置过滤器

    过滤器是一种在线处理机制,它提供了根据需要对数据进行切片和切割的灵活性,让我们看一下活动中的一些过滤器,下面的配置文件设置了grok和date过滤器。

    input { stdin { } }
    
    filter {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
    
    output {
      elasticsearch { hosts => ["localhost:9200"] }
      stdout { codec => rubydebug }
    }
    

    使用此配置运行Logstash:

    bin/logstash -f logstash-filter.conf
    现在,将下面的行粘贴到你的终端并按Enter键,这样它就会被stdin输入处理:

    127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] “GET /xampp/status.php HTTP/1.1” 200 3891 “http://cadenza/xampp/navi.php” “Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0”
    你应该会看到返回到stdout的是这样的:

    {
            "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
         "@timestamp" => "2013-12-11T08:01:45.000Z",
           "@version" => "1",
               "host" => "cadenza",
           "clientip" => "127.0.0.1",
              "ident" => "-",
               "auth" => "-",
          "timestamp" => "11/Dec/2013:00:01:45 -0800",
               "verb" => "GET",
            "request" => "/xampp/status.php",
        "httpversion" => "1.1",
           "response" => "200",
              "bytes" => "3891",
           "referrer" => "\"http://cadenza/xampp/navi.php\"",
              "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
    }
    

    如你所见,Logstash(在grok过滤器的帮助下)能够解析日志行(碰巧是Apache的“组合日志”格式),并将其分解为许多不同的离散信息,一旦开始查询和分析日志数据,这就非常有用。例如,你将能够轻松地在HTTP响应码、IP地址、referrers等上运行报表。Logstash有很多现成的grok模式,因此如果你需要解析通用的日志格式,很可能已经有人为你完成了这项工作,有关更多信息,请参阅GitHub上Logstash grok模式的列表。本例中使用的另一个过滤器是date过滤器,这个过滤器会解析一个时间戳,并将其用作事件的时间戳(不管你什么时候使用日志数据)。你将注意到,本例中的@timestamp字段设置为2013年12月11日,尽管Logstash在随后的某个时间摄取了该事件,这在备份日志时非常方便,它使你能够告诉Logstash“使用此值作为此事件的时间戳”。

    • 处理Apache日志

    让我们做一些有用的事情:处理apache访问日志文件!我们将从本地主机上的文件中读取输入,并根据需要使用条件处理事件。首先,创建一个名为logstash-apache.conf的文件包含以下内容(你可以根据需要更改日志文件路径):

    input {
      file {
        path => "/tmp/access_log"
        start_position => "beginning"
      }
    }
    
    filter {
      if [path] =~ "access" {
        mutate { replace => { "type" => "apache_access" } }
        grok {
          match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
    
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
      }
      stdout { codec => rubydebug }
    }
    

    然后,使用以下日志条目(或使用你自己的webserver中的一些日志条目)创建上面配置的输入文件(在本例中为“/tmp/access_log”):

    71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
    134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
    98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
    

    现在,使用-f标志运行Logstash以将其传递到配置文件:

    bin/logstash -f logstash-apache.conf
    现在你应该在Elasticsearch中看到你的apache日志数据了,Logstash打开并读取指定的输入文件,处理遇到的每个事件。记录到此文件的任何追加行也将被捕获,由Logstash作为事件处理,并存储在Elasticsearch中。还有一个额外的好处,他们储藏的字段“type”设置为“apache_access”(这是由输入配置中的type ⇒ "apache_access"行)。

    在这个配置中,Logstash只查看apache access_log,但是通过更改上面配置中的一行就可以同时查看access_log和error_log(实际上是任何文件匹配*log)

    input {
      file {
        path => "/tmp/*_log"
    ...
    

    当你重新启动Logstash时,它将同时处理error和access日志,但是,如果你检查数据(可能使用elasticsearch-kopf),你会看到access_log被分解为离散字段,而error_log则不是。这是因为我们使用了grok过滤器来匹配标准的组合apache日志格式,并自动将数据分割为单独的字段。如果我们能根据它的格式来控制行是如何被解析的,不是很好吗?嗯,我们可以…
    注意,Logstash不会重新处理access_log文件中已经查看的事件,从文件中读取数据时,Logstash保存其位置,并只在添加新行时处理它们。

    • 使用条件

    你可以使用条件来控制哪些事件由过滤器或输出处理,例如,你可以根据出现在哪个文件(access_log、error_log以及以“log”结尾的其他随机文件)中来标记每个事件。

    input {
      file {
         #标签
        type => "systemlog-localhost"
        #采集点
        path => "/tmp/*_log"
        #开始收集点
        start_position => "beginning"
        #扫描间隔时间,默认是1s,建议5s
        stat_interval => "5"
      }
    }
    filter {
      if [path] =~ "access" {
        mutate { replace => { type => "apache_access" } }
        grok {
          match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
        date {
          match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
      } else if [path] =~ "error" {
        mutate { replace => { type => "apache_error" } }
      } else {
        mutate { replace => { type => "random_logs" } }
      }
    }
    
    output {
      elasticsearch { hosts => ["localhost:9200"] }
      stdout { codec => rubydebug }
    }
    

    这个示例使用type字段标记所有事件,但实际上不解析error或random文件,有很多类型的错误日志,它们应该如何标记取决于你使用的日志

    类似地,你可以使用条件将事件定向到特定的输出,例如,你可以:

    • 警告nagios任何状态为5xx的apache事件
    • 将任何4xx状态记录到Elasticsearch
    • 通过statsd记录所有的状态代码

    要告诉nagios任何具有5xx状态码的http事件,首先需要检查type字段的值,如果是apache,那么你可以检查status字段是否包含5xx错误,如果是,发送到nagios。如果不是5xx错误,检查status字段是否包含4xx错误,如果是,发送到Elasticsearch。最后,将所有apache状态码发送到statsd,无论状态字段包含什么:

    output {
      if [type] == "apache" {
        if [status] =~ /^5\d\d/ {
          nagios { ...  }
        } else if [status] =~ /^4\d\d/ {
          elasticsearch { ... }
        }
        statsd { increment => "apache.%{status}" }
      }
    }
    
    • 处理Syslog消息

    Syslog是Logstash最常见的用例之一,而且它处理得非常好(只要日志行大致符合RFC3164),Syslog实际上是UNIX网络日志记录标准,它将消息从客户端发送到本地文件,或通过rsyslog发送到集中式日志服务器。对于本例,你不需要一个功能正常的syslog实例;我们将从命令行中伪造它,这样你就可以了解发生了什么。

    首先,让我们为Logstash + syslog创建一个简单的配置文件,名为logstash-syslog.conf。

    input {
      tcp {
        port => 5000
        type => syslog
      }
      udp {
        port => 5000
        type => syslog
      }
    }
    
    filter {
      if [type] == "syslog" {
        grok {
          match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
          add_field => [ "received_at", "%{@timestamp}" ]
          add_field => [ "received_from", "%{host}" ]
        }
        date {
          match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }
      }
    }
    
    output {
      elasticsearch { hosts => ["localhost:9200"] }
      stdout { codec => rubydebug }
    }
    

    使用这个新配置运行Logstash:

    bin/logstash -f logstash-syslog.conf
    

    通常,客户端将连接到端口5000上的Logstash实例并发送消息,对于本例,我们将telnet到Logstash并输入一条日志行(类似于前面在STDIN中输入日志行),打开另一个shell窗口与Logstash syslog输入进行交互并输入以下命令:

    telnet localhost 5000
    

    复制粘贴以下行作为示例(你可以自己尝试一些,但是要记住,如果grok过滤器对你的数据不正确,它们可能无法解析)。

    Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
    Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
    Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
    Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
    

    现在,当你的原始shell处理和解析消息时,你应该会看到Logstash的输出!

    {
                     "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
                  "@timestamp" => "2013-12-23T22:30:01.000Z",
                    "@version" => "1",
                        "type" => "syslog",
                        "host" => "0:0:0:0:0:0:0:1:52617",
            "syslog_timestamp" => "Dec 23 14:30:01",
             "syslog_hostname" => "louis",
              "syslog_program" => "CRON",
                  "syslog_pid" => "619",
              "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
                 "received_at" => "2013-12-23 22:49:22 UTC",
               "received_from" => "0:0:0:0:0:0:0:1:52617",
        "syslog_severity_code" => 5,
        "syslog_facility_code" => 1,
             "syslog_facility" => "user-level",
             "syslog_severity" => "notice"
    }
    

    六、安装及例子

    docker pull logstash:7.14.0
    
    1. 启动
     docker run -d --name=logstash logstash:7.14.0
    
    1. 复制文件,以便下次启动挂载
    docker cp logstash:/usr/share/logstash /mydata/logstash
    
    1. 授权挂载文件夹
    chmod 777 -R /mydata/logstash
    
    1. 在es正常运行的前提下
    • 编辑/mydata/logstash/config/logstash.yml如下
    http.host: "0.0.0.0"
    #根据实际修改es的ip:port
    xpack.monitoring.elasticsearch.hosts: [ "http://192.168.31.196:9200" ]
    # 主管道的Logstash配置路径,如果指定目录或通配符,配置文件将按字母顺序从目录中读取
    path.config: /usr/share/logstash/config/conf.d/*.conf
    #Logstash将其日志写到的目录
    path.logs: /usr/share/logstash/logs
    
    1. 然后根据下面的内容编写logstash.conf文件,就可以启动logstash的收集日志功能
    input {
      file {
        #标签
        type => "systemlog-localhost"
        #采集点(这里一定要注意,由于我是docker启动的logstash,这里是容器内的文件,如果要生成外面日志索引,那么文件的路径一定要挂载正确)
        path => "/usr/share/logstash/logs/access_log.2021-03-19.log"
        #开始收集点
        start_position => "beginning"
        #扫描间隔时间,默认是1s,建议5s
        stat_interval => "5"
      }
    }
    
    output {
      elasticsearch {
      #集群的话,直接添加多个url
        hosts => ["172.17.0.3:9200"]
        #es的用户名和密码
    	user =>"elastic"
    	password =>"elastic"
    	#建立的索引以日期区分
        index => "logstash-system-localhost-%{+YYYY.MM.dd}"
     }
     #在控制台输出logstash的日志
     stdout { codec=> rubydebug }
    }
    

    控制台的输出,说明在es中已经生成索引
    在这里插入图片描述

    1. 在kibana中查看生成的索引
      在这里插入图片描述

    关于如何docker安装es,docker安装kibana

    参考文章

    展开全文
  • logstash 配置

    2019-02-13 15:42:03
    1.原配置 logstash.conf input { tcp { host => "localhost" port => 9601 mode => "server" tags => ["tags"] codec => json_lines } } output { elas...

    1.原配置

    logstash.conf 

    input {

       tcp {

       host => "localhost"

        port => 9601

        mode => "server"

        tags => ["tags"]

        codec => json_lines

        }

    }

    output {

            elasticsearch {

            hosts => "127.0.0.1:9200"

           index => "%{[appname]}-%{+YYYY.MM.dd}"

            }

            stdout { codec => rubydebug}

    }

    logback.xml配置

    <appender name="LOGSTASH"
              class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>localhost:9601</destination>
        <!-- encoder必须配置,有多种可选 -->
        <encoder charset="UTF-8"
                 class="net.logstash.logback.encoder.LogstashEncoder">
            <!-- "appname":"yang_test" 的作用是指定创建索引的名字时用,并且在生成的文档中会多了这个字段  -->
            <customFields>{"appname":"tillo_approvice_service"}</customFields>
        </encoder>
    </appender>
    
    2.rabbitmq方式

    input {

     rabbitmq {

     type =>"all"

     durable => true

     exchange => "ex_logstash"

     exchange_type => "direct"

     key => "logstash"

     host => "localhost:5672"

     user => "guest"

     password => "guest"

     queue => "faceJob-logstash"

     auto_delete => false

    }}

    output {

            elasticsearch {

            hosts => "127.0.0.1:9200"

            }

            stdout { codec => rubydebug}

    }

     

    logback.xml配置

    <appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
        <layout>
            <pattern>
                <![CDATA[%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}.%method %line - %msg%n ]]>
            </pattern>
        </layout>
        <host>127.0.0.1</host>
        <port>5672</port>
        <username>guest</username>
        <password>guest</password>
        <routingKeyPattern>logstash</routingKeyPattern>
        <declareExchange>false</declareExchange>
        <exchangeType>direct</exchangeType>
        <exchangeName>ex_logstash</exchangeName>
        <generateId>true</generateId>
        <charset>UTF-8</charset>
        <durable>true</durable>
        <deliveryMode>PERSISTENT</deliveryMode>
    </appender>
    

     

     

    多项目配置

     

    input {
       tcp {
         host => "localhost"  
            port => 9601
            mode => "server"  
            tags => ["tags"]
            type =>"aillo_service"
         codec => json_lines         
        }
       file {
           path => "/var/log/*.log"
           start_position => "beginning"
           type => "coturn_service"
        }
    }

    #input {
    # rabbitmq {
    # type =>"all"
    # durable => true
    # exchange => "ex_logstash"
    # exchange_type => "direct"
    # key => "logstash"
    # host => "localhost:5672"
    # user => "guest"
    # password => "guest"
    # queue => "faceJob-logstash"
    # auto_delete => false
    #}
    #}

      

    output {
           if[type]=="aillo_service"{
            elasticsearch {
            hosts => "127.0.0.1:9200"
            index => "%{[appname]}-%{+YYYY.MM.dd}"
            }
            stdout { codec => rubydebug}
            }
            if[type]=="coturn_service"{
            elasticsearch {
            hosts => "127.0.0.1:9200"
            index => "coturn_service-%{+YYYY.MM.dd}"
            }
            stdout { codec => rubydebug}
            }

    }
          }

    }
     

    展开全文
  • Logstash配置文件语法

    2019-09-23 10:57:03
    logstash配置文件语法 数据类型 数组 array:数组可以是单个或者多个字符串值。 path => ["/var/log/messages","/var/log/*.log"] path => "/data/mysql/mysql.log" 如果指定了多次,追加数组,此...

    logstash配置文件语法

    数据类型

    1. 数组
      array:数组可以是单个或者多个字符串值。
    path => ["/var/log/messages","/var/log/*.log"]
    path => "/data/mysql/mysql.log"
    

    如果指定了多次,追加数组,此实例path数组包含三个字符串元素
    2. boolean
    布尔值必须是TRUE或者false
    3. bytes
    指定字节单位
    4. Codec
    logstash编码名称用来表示数据编码。用于input和output段。
    便于数据的处理。codec => "json”
    5. hash
    键值对,注意多个键值对用空格分隔,而不是逗号。

    match => 
    展开全文
  • logstash配置详解

    2020-12-12 14:44:34
    logstash配置详解 #logstash pipeline 包含两个必须的元素:input和output,和一个可选元素:filter。从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他) ###命令 ...

    logstash配置详解

    #logstash pipeline 包含两个必须的元素:input和output,和一个可选元素:filter。从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他)
    ###命令
         bin/logstash [OPTIONS]
            -n	#logstash实例的名称,如果不设置默认为当前的主机名(比如我的主机名为sqczm)。
            -f	#配置文件,我们可以指定一个特定的文件,也可以指定一个特定的目录,如果指定的是特定的目录,则logstash会读取该目录下的所有文本文件,将其在内存中拼接成完整的大配置文件,再去执行。
            -e	#给定的可直接执行的配置内容,也就是说我们可以不指定-f参数,直接把配置文件中的内容作为字符串放到-e参数后面。
            -w	#指定工作线程的个数
            -p	#logstash用来加载插件的目录
            -l	#日志输出文件,如果不设置,logstash将会把日志发送至标准的output
            -t	#检查配置的语法是否正确并退出
            -r	#监视配置文件的变化,并且自动重新加载修改后的配置文件
            -config.reload.interval RELOAD_INTERVAL	#为了检查配置文件是否改变,而去拉取配置文件的频率
            -http.host HTTP_HOST	#Web API绑定的主机,默认为“127.0.0.1”
            -http.port HTTP_PORT	#Web API绑定的端口,默认为9600-9700之间
            -log.format FORMAT	#logstash写它自身日志的时候使用json还是文本格式,默认是“plain”
            -path.settings SETTINGS_DIR	#设置包含logstash.yml配置文件的目录,比如log4j日志配置。也可以设置LS_SETTINGS_DIR环境变量
    
    ### stdin 标准输入
    ### file 从文件读取数据 
        file{
            path => ['/var/log/nginx/access.log']  #要输入的文件路径
            type => 'nginx_access_log'
            start_position => "beginning"
        }
        # path  可以用/var/log/*.log,/var/log/**/*.log,如果是/var/log则是/var/log/*.log
        # type 通用选项. 用于激活过滤器
        # start_position 选择logstash开始读取文件的位置,beginning或者end。
        #还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网
    ### syslog 通过网络将系统日志消息读取为事件
        syslog{
            port =>"514" 
            type => "syslog"
        }
        # port 指定监听端口(同时建立TCP/UDP的514端口的监听)
        #从syslogs读取需要实现配置rsyslog:
        # cat /etc/rsyslog.conf   加入一行
        *.* @172.17.128.200:514   #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取
        # service rsyslog restart   #重启日志服务
    ### beats 从Elastic beats接收事件 
        beats {
            port => 5044   #要监听的端口
        }
        # 还有host等选项
        # 从beat读取需要先配置beat端,从beat输出到logstash。
        # vim /etc/filebeat/filebeat.yml 
            ..........
            output.logstash:
            hosts: ["localhost:5044"]
    ### kafka  将 kafka topic 中的数据读取为事件
        kafka{
            bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
            topics => ["access_log"]
            group_id => "logstash-file"
            codec => "json"
        }
        kafka{
            bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
            topics => ["weixin_log","user_log"]  
            codec => "json"
        }
        # bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。# topics  要订阅的主题列表,kafka topics# group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id
        # codec 通用选项,用于输入数据的编解码器。
    
    #filter plugin 过滤器插件,对事件执行中间处理
    ### grok   解析文本并构造 。把非结构化日志数据通过正则解析成结构化和可查询化 .
        #grok 语法:%{SYNTAX:SEMANTIC}   即 %{正则:自定义字段名}
        #多个match匹配规则,如果前面的匹配失败可以使用后面的继续匹配
        grok {
            match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
            match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]       
        }
    ### date   日期解析  解析字段中的日期,然后转存
        date{
            match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
            remove_field =>["raw_datetime"]
        }
    ### mutate  对字段做处理 重命名、删除、替换和修改字段。
        #covert 类型转换。类型包括:integer,float,integer_eu,float_eu,string和boolean 
        filter{
            mutate{
                covert => ["response","integer","bytes","float"]  #数组的类型转换
                convert => {"message"=>"integer"}
            }
        }
        #split   使用分隔符把字符串分割成数组
        #merge  合并字段  。数组和字符串 ,字符串和字符串
        #rename   对字段重命名
        #remove_field    移除字段
        #join  用分隔符连接数组,如果不是数组则不做处理 
    
        #gsub  用正则或者字符串替换字段值。仅对字符串有效 
        #update  更新字段。如果字段不存在,则不做处理
    
        #replace 更新字段。如果字段不存在,则创建
    ### geoip  根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息 
    ### ruby    ruby插件可以执行任意Ruby代码
    ### urldecode    用于解码被编码的字段,可以解决URL中 中文乱码的问题 
    ### kv   通过指定分隔符将字符串分割成key/value 
    ### useragent 添加有关用户代理(如系列,操作系统,版本和设备)的信息
    ### logstash 比较运算符
      等于:   ==, !=, <, >, <=, >=
      正则:   =~, !~ 
      包含关系:  in, not in
      支持的布尔运算符:and, or, nand, xor
      支持的一元运算符: !
    ### output plugin  输出插件,将事件发送到特定目标。
        #stdout标准输出。将事件输出到屏幕上 
            output{
                stdout{
                    codec => "rubydebug"
                }
            }
        #file将事件写入文件
            file {
               path => "/data/logstash/%{host}/{application}
               codec => line { format => "%{message}"} 
            }
        #kafka  将事件发送到kafka 
            kafka{
                bootstrap_servers => "localhost:9092"
                topic_id => "test_topic"  #必需的设置。生成消息的主题
            }
        #elasticseach  在es中存储日志 
            elasticsearch {
                hosts => "localhost:9200"
                index => "nginx-access-log-%{+YYYY.MM.dd}"  
            }
            #index 事件写入的索引。可以按照日志来创建索引,以便于删旧数据和按时间来搜索日志
    
    展开全文
  • Logstash配置 这些是我的私有logstash配置文件。 它们是为Arch Linux和我使用的软件创建的。 说明在文件中。 由于这是我的第一个logstash设置,因此某些配置可能很奇怪。 如果您比我还了解任何其他信息,请提出问题...
  • logstash配置文件的写法解析

    千次阅读 2019-12-19 14:53:47
    /opt/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' 然后你会发现终端在等待你的输入。没问题,敲入 test_str,回车,返回下面的结果就是OK的! 从test.conf文件中写入 ...
  • windows logstash配置

    2020-06-30 14:39:43
    1.多路径配置 2.解决log显示成多行问题 3.解决中文乱码问题 Sample Logstash configuration for creating a simple Beats -> Logstash -> Elasticsearch pipeline. input { file { path=>“C:/website/...
  • Meraki MX系列Logstash配置 可与Logstash v1.5一起使用并经过测试。* 这将监视您的所有事件,IDS,流和URL Syslog信息。 我开始这个项目是因为我找不到他们最新的东西并且没有我想要的东西。 我想做一些易于配置,...
  • logstash配置定时

    2021-05-13 16:53:44
    jdbc { # 数据库 jdbc_connection_string => "jdbc:mysql://59.110.240.77:3306/infomation" jdbc_user => "fetter" jdbc_password => "fetter" # mysql驱动解压的位置 jdbc_driver_library =>...
  • 一、测试Logstash是否运行正常 解压移动 将解压出来的移动至soft下的logstash622文件夹 在bin下面启动 输入输出 ./logstash -e 'input { stdin {} } output { stdout {} }' 进入等待输入界面 输入hello 查看...
  • 文章目录1. logstash安装1.1 下载1.2 添加配置1.3 在ES中创建映射模板1.4 启动logstash2. 异常2.1 could not find java;...Logstash配置es search-guard插件 logstash文档https://elkguide.elasticsear...
  • logstash配置文件.rar

    2019-12-18 20:55:40
    logstash6.2.2多管道输出es聚合配置案例,共3个管道,文章,论坛,产品管道。其中:文章和论坛是简单配置,产品使用聚合配置,用ruby语法写了聚合逻辑。
  • Logstash配置详解

    2020-04-08 10:54:43
    目录摘要图文并茂搞定Logstash一、版本说明二、下载三、安装四、测试五、配置文件1 输入插件1.1 标准输入(Stdin)1.2 读取文件(File)1.3 读取 Syslog 数据1.4 读取 Collectd 数据过滤插件输出插件3.1 标准输出(Stdout...
  • Logstash配置以服务方式运行

    千次阅读 2021-03-16 09:59:46
    参考: https://blog.51cto.com/12217124/2424209 CentOS7设置systemd管理Logstash服务遇到的坑 (因为端口小于1024需要用root...Logstash最常见的运行方式即命令行运行 ./bin/logstash -f logstash....
  • logstash 配置文件详解

    千次阅读 2019-08-06 12:15:37
    1: bin / logstash -f second-pipeline.conf --config.test_and_exit 该--config.test_and_...当配置文件通过配置测试时,使用以下命令启动 Logstash: bin / logstash -f second-pipeline.conf 2: bin / lo...
  • springboot 整合logstash配置文件

    千次阅读 2020-01-20 11:00:22
    配置参考1: <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> <springProperty scop...
  • logstash配置文件

    2018-08-30 21:49:01
    配置文件有logstash的stdin、file、tcp、udp、syslog、beats、grok、elasticsearch插件配置
  • 用于解析 vsftpd 日志记录的 Logstash 配置和 grok 模式 用法 安装logstash 将50-filter-vsftpd.conf添加到/etc/logstash/conf.d 将vsftpd.grok添加到/etc/logstash/patterns.d 重启logstash 包含的 Logstash ...
  • filebeat+logstash配置

    2020-10-29 09:27:16
    Logstash依赖于JVM,在启动的时候大家也很容易就能发现它的启动速度很慢很慢,但logstash的好处是支持很多类型的插件,支持对数据做预处理。而filebeat很轻量,前身叫logstash-forward,是使用Golang开发的,所以不...
  • 要添加测试,只需添加具有给定输入input.log ,ETL配置logstash.conf和预期输出output.log的新目录。 先决条件 NodeJS>第8版 码头工人 重击> v4 建立 克隆存储库 运行npm install 如下设置测试目录 __tests__ ...
  • 关于logstash安装:https://www.cnblogs.com/toov5/p/10301727.html Logstash是一个开源数据收集引擎,具有实时管道功能。...下面进一步详细说配置: jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已...
  • ClearPassAndELK ... 10-logstash-syslog.conf一个示例Logstash配置文件,它将处理ClearPass生成的SYSLOG事件并将其存储在Elasticsearch集群中。 我的计划是随着我更多地了解如何使用Kibanna分析Elasti
  • logstash 配置文件编写详解

    千次阅读 2018-09-28 17:50:47
    说明 它一个有jruby语言编写的运行在java...通过在配置文件编写输入(input),过滤(filter),输出(output)相关规则,对数据收集转发。 配置文件编写语法 大致格式如下 # 输入 input { ... } #...
  • Logstash配置详解(不断更新)

    万次阅读 2018-11-27 19:39:16
    即重启Logstash时,从上次读取的位置继续 sincedb 2 如何即时读取到文件的新内容 定时检查文件是否有更新 3 如何发现新文件并进行读取 定时检查是否有新文件生成 4 如何文件发生了归档(r...
  • 配置文件 | logstash配置文件详解

    万次阅读 2018-09-28 15:25:17
    说明 /logstash/config/logstash.yml:主要用于控制...配置参数说明 logstash.yml 参数 用途 默认值 node.name 节点名称 主机名称 path.data /数据存储路径 LOGSTASH_HOME/da...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,248
精华内容 12,499
关键字:

logstash配置