
    Flume: A Distributed Log Collection Framework

    1. Current Business Situation


    • WebServer/ApplicationServer instances are scattered across many machines
    • We want to run statistical analysis on the Hadoop big data platform
    • How do we get the logs onto the Hadoop platform?
    • Candidate solutions and their problems

    • How do we move our data from the other servers onto Hadoop?
      1. shell: cp the files to a machine in the Hadoop cluster, then hdfs dfs -put .... (many problems are hard to solve this way: fault tolerance, load balancing, timeliness, compression; see the sketch below)
      2. Flume: move the logs from A --> B
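
      A minimal sketch of the naive shell approach above (paths and hostnames are hypothetical), which runs into exactly the problems just listed:

      # copy today's log to a Hadoop gateway machine, then upload it to HDFS
      scp /var/log/app/access.log hadoop@gateway:/tmp/access.log
      ssh hadoop@gateway "hdfs dfs -put /tmp/access.log /logs/access.log"
      # no fault tolerance or load balancing, no compression, and only as timely as the copy interval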

    2. Flume Overview

    • Flume website: http://flume.apache.org/

      Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
      Flume is a distributed, highly reliable, and highly available service, provided by the Apache Foundation, for efficiently collecting, aggregating, and moving massive amounts of log data.

    • Flume design goals
      1. Reliability: highly reliable delivery
      2. Scalability: extensible, modular components
      3. Manageability: agents are easy to manage
    • Comparison with similar products
      1. Flume: Cloudera/Apache, written in Java.
      2. Logstash: part of the ELK stack (ElasticSearch, Logstash, Kibana).
      3. Scribe: Facebook, written in C/C++, weak load balancing, no longer maintained.
      4. Chukwa: Yahoo/Apache, written in Java, weak load balancing, no longer maintained.
      5. Fluentd: similar to Flume, written in Ruby.
    • Flume history
      1. Cloudera released 0.9.2, known as Flume-OG
      2. 2011: FLUME-728, a major milestone (Flume-NG), contributed to the Apache community
      3. July 2012: version 1.0
      4. May 2015: version 1.6
      5. ~ version 1.7

    3. Flume Architecture and Core Components


    Flume has three core components:

    • Source: collection; where the data comes from (Avro, Thrift, Spooling, Kafka, Exec)
    • Channel: aggregation; buffers the data before delivery (Memory, File, and Kafka are the most commonly used)
    • Sink: writes the data to its destination (HDFS, Hive, Logger, Avro, Thrift, File, ES, HBase, Kafka, etc.)

    4. Flume Environment Setup

    • Prerequisites
      • Java Runtime Environment - Java 1.8 or later (install Java)
      • Memory - Sufficient memory for configurations used by sources, channels or sinks
      • Disk Space - Sufficient disk space for configurations used by channels or sinks
      • Directory Permissions - Read/Write permissions for directories used by agent
    • 1. Install the JDK (download, extract, install, configure environment variables)
    • 2. Install Flume (download, extract, install, configure environment variables; verify with: flume-ng version)

    5. Flume in Practice

    • Requirement 1: collect data from a given network port and print it to the console
      • flume-conf.properties
        • A) Configure the Source
        • B) Configure the Channel
        • C) Configure the Sink
        • D) Wire the three components together
      # example.conf: A single-node Flume configuration
      
      # a1: agent name
      # r1: source name
      # k1: sink name
      # c1: channel name
      
      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # Describe/configure the source
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = localhost
      a1.sources.r1.port = 44444
      
      # Describe the sink
      a1.sinks.k1.type = logger
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      • Start the agent
      flume-ng agent \
      --name $agent_name \
      --conf conf \
      --conf-file conf/flume-conf.properties \
      -Dflume.root.logger=INFO,console
      
      flume-ng agent \
      --name a1 \
      --conf $FLUME_HOME/conf \
      --conf-file $FLUME_HOME/conf/example.conf \
      -Dflume.root.logger=INFO,console
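
      Once the agent is up, a quick sanity check (assuming it was started with the example.conf above): open another terminal, connect to the netcat source and type a line; the agent console should print it as a logger event.

      telnet localhost 44444
      # or: nc localhost 44444
      # type "hello" and press Enter; the agent console prints something like:
      # Event: { headers:{} body: 68 65 6C 6C 6F 0D  hello. }
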
    • Requirement 2: monitor a file and collect newly appended data in real time, printing it to the console
      • 1. Agent selection: exec source + memory channel + logger sink
      • 2. Configuration file
      # exec-memory-logger.conf: A single-node Flume configuration
      
      # a1: agent name
      # r1: source name
      # k1: sink name
      # c1: channel name
      
      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /home/k.o/data/data.log
      a1.sources.r1.shell = /bin/sh -c
      
      # Describe the sink
      a1.sinks.k1.type = logger
      
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      • Start the agent
      flume-ng agent \
      --name $agent_name \
      --conf conf \
      --conf-file conf/flume-conf.properties \
      -Dflume.root.logger=INFO,console
      
      flume-ng agent \
      --name a1 \
      --conf $FLUME_HOME/conf \
      --conf-file $FLUME_HOME/conf/exec-memory-logger.conf \
      -Dflume.root.logger=INFO,console
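
      To check that the exec source is picking up new lines (the path below is the one configured above), append to the monitored file from another terminal and watch the agent console:

      echo "hello flume" >> /home/k.o/data/data.log
      # the agent console should print a logger Event whose body is "hello flume"
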
    • Requirement 3: collect logs on server A and ship them to server B in real time


    • Technology selection:
      1. exec source + memory channel + avro sink
      2. avro source + memory channel + logger sink
    # exec-memory-avro.conf: A single-node Flume configuration
    
    # exec-memory-avro: agent name
    # exec-source: source name
    # avro-sink: sink name
    # memory-channel: channel name
    
    # Name the components on this agent
    exec-memory-avro.sources = exec-source
    exec-memory-avro.sinks = avro-sink
    exec-memory-avro.channels = memory-channel
    
    # Describe/configure the source
    exec-memory-avro.sources.exec-source.type = exec
    exec-memory-avro.sources.exec-source.command = tail -F /home/k.o/data/data.log
    exec-memory-avro.sources.exec-source.shell = /bin/sh -c
    
    # Describe the sink
    exec-memory-avro.sinks.avro-sink.type = avro
    exec-memory-avro.sinks.avro-sink.hostname = localhost
    exec-memory-avro.sinks.avro-sink.port = 44444
    
    # Use a channel which buffers events in memory
    exec-memory-avro.channels.memory-channel.type = memory
    exec-memory-avro.channels.memory-channel.capacity = 1000
    exec-memory-avro.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    exec-memory-avro.sources.exec-source.channels = memory-channel
    exec-memory-avro.sinks.avro-sink.channel = memory-channel
    # avro-memory-logger.conf: A single-node Flume configuration
    
    # avro-memory-logger: agent name
    # avro-source: source name
    # logger-sink: sink name
    # memory-channel: channel name
    
    # Name the components on this agent
    avro-memory-logger.sources = avro-source
    avro-memory-logger.sinks = logger-sink
    avro-memory-logger.channels = memory-channel
    
    # Describe/configure the source
    avro-memory-logger.sources.avro-source.type = avro
    avro-memory-logger.sources.avro-source.bind = localhost
    avro-memory-logger.sources.avro-source.port = 44444
    
    # Describe the sink
    avro-memory-logger.sinks.logger-sink.type = logger
    
    # Use a channel which buffers events in memory
    avro-memory-logger.channels.memory-channel.type = memory
    avro-memory-logger.channels.memory-channel.capacity = 1000
    avro-memory-logger.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    avro-memory-logger.sources.avro-source.channels = memory-channel
    avro-memory-logger.sinks.logger-sink.channel = memory-channel
    • Start the agents
    # Start avro-memory-logger first
    flume-ng agent \
    --name avro-memory-logger \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
    -Dflume.root.logger=INFO,console
    
    # Then start exec-memory-avro
    flume-ng agent \
    --name exec-memory-avro \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
    -Dflume.root.logger=INFO,console
    • Log collection process
      1. Machine A monitors a file; when users visit the main site, their behavior logs are appended to access.log
      2. The avro sink ships each newly produced log line to the hostname and port specified by the corresponding avro source
      3. The logger sink on the avro source's agent prints the collected logs to the console
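
    A quick end-to-end check (assuming both agents above are running, with the avro sink pointing at the avro source's host and port): append a line to the monitored file and watch the avro-memory-logger console print the event after a short delay.

    echo "test from A" >> /home/k.o/data/data.log
    # the avro-memory-logger agent should print an Event whose body is "test from A"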

    Using the Exceptionless Distributed Logging Framework in .NET Core

    1. What is Exceptionless

      Exceptionless is an open-source, real-time log collection framework. It can be used in applications built on ASP.NET, ASP.NET Core, Web API, Web Forms, WPF, Console, MVC and other stacks, and it exposes a REST interface that can be used from JavaScript and Node.js. It makes log collection simple and easy to use without requiring you to understand many technical details or configuration options.
    In the past we mostly used frameworks such as Log4net or NLog for logging. Once an application becomes complex and runs in a cluster, the traditional approach no longer works well, because collecting and analysing logs from each node becomes tedious and time-consuming.
    Now the Exceptionless team gives us a better framework for this job, which is a great and meaningful contribution; thanks to them.

    2. Usage

       There are two ways to run it: self-hosted (local) or the hosted (remote) service.

    • Self-hosting has environment requirements (.NET 4.6.1, Java JDK 1.8+, IIS Express 8+)
    • The hosted service is far more convenient, so this post walks through using Exceptionless remotely.

    3. Register an account and configure it

      Website: http://exceptionless.com

      GitHub: https://github.com/exceptionless/Exceptionless

      The official home page can be slow to load (it pulls in Google APIs, so a proxy may help); the management pages themselves are fast enough, around 36 ms.

      Since we are using the hosted service, register an account on the official site.

      Then create a project: enter the project information and create it.

    Here we can of course pick our beloved .NET Core. Once the project is created it can use Exceptionless to record logs, so let's create a .NET Core application.

    4. Remote logging from the application

      After adding the Exceptionless.AspNetCore NuGet package, a txt file appears in the project with a short usage guide; you can delete it if you like.
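
    If you prefer the command line over the NuGet UI, the same package can be added with the .NET CLI (run from the project directory):

    dotnet add package Exceptionless.AspNetCore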

    Configure the middleware

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        app.UseExceptionless("xxxxxxxxxxxxxx");
        app.UseMvc();
    }

    At this point logging works, so let's deliberately throw an error:

    public ActionResult<IEnumerable<string>> Get()
    {
        throw new Exception("my text info");
        return new string[] { "value1", "value2" };
    }

     Now open the app in a browser: the error is thrown as expected.

    Did the framework capture it? Refresh the Exceptionless dashboard: the error has been recorded.

    What exactly did it collect for us? Click into the event and take a look; it is quite impressive.

    Besides the basic HTTP information, it also records the OS version and architecture, machine details, the runtime, and more. Very handy.

    Of course, deliberately throwing unhandled errors is bad practice; wrap it in a try/catch and the code becomes:

    try
    {
        throw new ApplicationException(Guid.NewGuid().ToString());
    }
    catch (Exception ex)
    {
        ex.ToExceptionless().Submit();
    }

    Beyond exception reporting, Exceptionless also supports Log entries, Feature Usages, 404s, and Custom Events. The extension points live on the ExceptionlessClient.Default class; dig in if you are interested.

    Here is a simple wrapper class for logging (nothing fancy):

    public class ExceptionLessLog :ILog
        {
    
            /// <summary>
            /// Trace
            /// </summary>
            public void Trace(string message, params string[] tags)
            {
                ExceptionlessClient.Default.CreateLog(message, LogLevel.Trace).AddTags(tags).Submit();
            }
    
            /// <summary>
            /// Debug
            /// </summary>
            public void Debug(string message, params string[] tags)
            {
                ExceptionlessClient.Default.CreateLog(message, LogLevel.Debug).AddTags(tags).Submit();
            }
    
            /// <summary>
            /// Info
            /// </summary>
            public void Info(string message, params string[] tags)
            {
                ExceptionlessClient.Default.CreateLog(message, LogLevel.Info).AddTags(tags).Submit();
            }
    
            /// <summary>
            /// Warning
            /// </summary>
            public void Warn(string message, params string[] tags)
            {
                ExceptionlessClient.Default.CreateLog(message, LogLevel.Warn).AddTags(tags).Submit();
            }
    
            /// <summary>
            /// Error
            /// </summary>
            public void Error(string message, params string[] tags)
            {
                ExceptionlessClient.Default.CreateLog(message, LogLevel.Error).AddTags(tags).Submit();
            }
        }

    Summary:

      Exceptionless is very powerful: with hardly any code we got all of this functionality. If you have not tried it yet, give it a go.

    • Related article 1: http://www.cnblogs.com/savorboard/p/exceptionless.html (hosted)
    • Related article 2: https://www.cnblogs.com/yilezhu/p/9193723.html (self-hosted deployment)

    2. Flume: A Distributed Log Collection Framework

    2.1 Current Business Situation

    • We use a Hadoop cluster and various distributed frameworks to process large amounts of data
    • How do we move our log data from the other servers onto the Hadoop cluster?
      • shell: cp to a machine in the Hadoop cluster, then hadoop fs -put ... /
      • Problems with this approach:
      • How do we monitor it, e.g. when a machine goes down?
      • With cp we have to pick a copy interval, so we lose timeliness
      • Shipping text data straight over the network has a very high IO cost
      • Fault tolerance and load balancing
    • Introducing Flume solves these problems

    2.2 Flume Overview

    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

    Diagram from the Flume website

    • Collect data from the WebServer into the target HDFS
    • Three core components: Source, Channel, Sink
    • Design goals:
      • Reliability
      • Scalability
      • Manageability
    • Comparison with similar products
      • Flume: Cloudera/Apache, Java
      • Scribe: Facebook, C/C++, no longer maintained
      • Chukwa: Yahoo/Apache, Java, no longer maintained
      • Kafka:
      • Fluentd: Ruby
      • Logstash: ELK

    2.3 Flume History

    • Cloudera 0.9.2, Flume-OG
    • FLUME-728: Flume-NG ==> Apache

    Flume Architecture and Core Components

    • Source: where the data is collected from
    • Channel: aggregation
    • Sink:

    2.4 Flume Environment Setup

    • Java 1.7+ (this machine uses Java 1.8)
    • Sufficient memory
    • Read/write permissions
    • Installation steps
        # Install Java:
        # 1. Extract the JDK
        tar -zxvf jdk-8u144-linux-x64.tar.gz -C ~/app/

        # 2. Add the environment variables to ~/.bash_profile
        vim ~/.bash_profile
        # add the following lines:
        export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
        export PATH=$JAVA_HOME/bin:$PATH

        # 3. Reload the profile and verify
        source ~/.bash_profile
        java -version
    • Download flume-ng-1.6.0-cdh5.7.0.tar.gz

    • flume-ng-1.6.0-cdh5.7.0.tar.gz installation and configuration:
      
      # 1. Extract
      tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C ~/app
      
      # 2. Add the environment variables to ~/.bash_profile
      export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
      export PATH=$FLUME_HOME/bin:$PATH
      
      # 3. Reload the profile
      source ~/.bash_profile
      
      # 4. Configure Flume
      cd ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf
      cp flume-env.sh.template flume-env.sh
      vim flume-env.sh
      # add one line to set JAVA_HOME:
      export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144

    2.5 Flume in Practice

    • Requirement: collect data from a given network port and print it to the console (see the reference site)

    • The key to using Flume is writing the configuration file

      • 1. Configure the Source

      • 2. Configure the Channel

      • 3. Configure the Sink

      • Wire the components together

        a1: agent name
        r1: source name
        k1: sink name
        c1: channel name
        netcat TCP source: listens on a port and converts each line into an event (Flume's unit of data transfer); the relevant parameters are type, bind (the host to bind to) and port
        logger sink: outputs each event at INFO level; only k1.type needs to be set
        channel:
    
        # example.conf: A single-node Flume configuration
    
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = hadoop001
        a1.sources.r1.port = 44444
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    • After configuring, start the agent
     From the bin directory, run:
          flume-ng agent \
          -n a1 \
          -c $FLUME_HOME/conf \
          -f $FLUME_HOME/conf/example.conf \
          -Dflume.root.logger=INFO,console
    
          # The agent listens on port 44444; open a new window, send messages with `telnet hadoop001 44444`, and watch the data Flume receives
    
    
          Event: { headers:{} body: 68 65 6C 6C 6F 0D  hello. }
          Event is Flume's basic unit of data transfer
          Event = optional headers + byte array
    • Requirement 2: monitor a file and collect newly appended data in real time, printing it to the console
      • Agent selection: exec source + memory channel + logger sink
     # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    
        # Describe/configure the source
        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /home/hadoop/data/data.log
        a1.sources.r1.shell = /bin/sh -c
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    • Start the agent
        flume-ng agent \
        -n a1 \
        -c $FLUME_HOME/conf \
        -f $FLUME_HOME/conf/exec-memory-logger.conf \
        -Dflume.root.logger=INFO,console
    • Sinking Flume data to HDFS (see the sketch below)

    • Sinking Flume data to Kafka (also sketched below)
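
      Neither variant is written out here; as a rough sketch (property names taken from the Flume user guide, the HDFS path and Kafka topic are placeholders), swapping the logger sink for an HDFS or Kafka sink only changes the sink section of the config:

      # HDFS sink: write events to HDFS instead of the console
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = hdfs://hadoop001:8020/flume/events/%Y%m%d
      a1.sinks.k1.hdfs.fileType = DataStream
      a1.sinks.k1.hdfs.writeFormat = Text
      a1.sinks.k1.hdfs.useLocalTimeStamp = true

      # Kafka sink: publish events to a Kafka topic (Flume 1.7+ property names; Flume 1.6 uses brokerList/topic instead)
      # a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      # a1.sinks.k1.kafka.bootstrap.servers = hadoop001:9092
      # a1.sinks.k1.kafka.topic = flume-events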

    • Requirement 3: collect logs from server A to server B

      • Basic idea: server A's avro sink output becomes server B's avro source input.
          #exec-memory-avro.conf
          # Name the components on this agent
          exec-memory-avro.sources = exec-source
          exec-memory-avro.sinks = avro-sink
          exec-memory-avro.channels = memory-channel
    
          # Describe/configure the source
          exec-memory-avro.sources.exec-source.type = exec
          exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
          exec-memory-avro.sources.exec-source.shell = /bin/sh -c
    
          # Describe the sink
          exec-memory-avro.sinks.avro-sink.type = avro
          exec-memory-avro.sinks.avro-sink.hostname = hadoop001
          exec-memory-avro.sinks.avro-sink.port = 44445
    
          # Use a channel which buffers events in memory
          exec-memory-avro.channels.memory-channel.type = memory
          exec-memory-avro.channels.memory-channel.capacity = 1000
          exec-memory-avro.channels.memory-channel.transactionCapacity = 100
    
          # Bind the source and sink to the channel
          exec-memory-avro.sources.exec-source.channels = memory-channel
          exec-memory-avro.sinks.avro-sink.channel = memory-channel
    
          ----------------------------------------------------------------------------------
          #avro-memory-logger.conf
    
          # Name the components on this agent
          avro-memory-logger.sources = avro-source
          avro-memory-logger.sinks = logger-sink
          avro-memory-logger.channels = memory-channel
    
          # Describe/configure the source
          avro-memory-logger.sources.avro-source.type = avro
          avro-memory-logger.sources.avro-source.bind = hadoop001
          avro-memory-logger.sources.avro-source.port = 44445
    
          # Describe the sink
          avro-memory-logger.sinks.logger-sink.type = logger
    
          # Use a channel which buffers events in memory
          avro-memory-logger.channels.memory-channel.type = memory
          avro-memory-logger.channels.memory-channel.capacity = 1000
          avro-memory-logger.channels.memory-channel.transactionCapacity = 100
    
          # Bind the source and sink to the channel
          avro-memory-logger.sources.avro-source.channels = memory-channel
          avro-memory-logger.sinks.logger-sink.channel = memory-channel
    
    * Run
    
          # Start avro-memory-logger first
          flume-ng agent \
          -n avro-memory-logger \
          -c $FLUME_HOME/conf \
          -f $FLUME_HOME/conf/avro-memory-logger.conf \
          -Dflume.root.logger=INFO,console
    
    
          # Then start exec-memory-avro
          flume-ng agent \
          -n exec-memory-avro \
          -c $FLUME_HOME/conf \
          -f $FLUME_HOME/conf/exec-memory-avro.conf \
          -Dflume.root.logger=INFO,console
    
    
          # Write data to data.log and watch the output
          echo heheheehehe4 >>  data.log 
    
          # Output:
          2017-12-29 12:33:31,925 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 68 65 68 65 65 68 65 68 65 34             heheheehehe4 }
    

    Log collection process:

    • Machine A monitors a file; when users visit the main site, their behavior logs are appended to access.log
    • The avro sink ships each newly produced log line to the corresponding avro source; the second machine takes that avro stream as the input to its avro source

    2. Flume: A Distributed Log Collection Framework


    2.1 Current Business Situation

    Large volumes of log data are continuously generated by many systems and services. The business has good ideas for making full use of these logs, e.g. user behavior analysis and session tracking.
    How do we get the logs onto the Hadoop cluster?
    What problems does each candidate approach have, and what are its advantages?

    • Option 1: how do we eliminate problems such as poor fault tolerance, lack of load balancing, and high latency?
    • Option 2: the Flume framework

    2.2 Flume Overview

    Flume website: http://flume.apache.org
    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

    Flume is a distributed, highly reliable, highly available service from Cloudera for efficiently collecting, aggregating, and moving massive amounts of log data.
    Flume's design goals:

    • Reliability
    • Scalability
    • Manageability (effective management of the agents)

    Comparison with similar products

    • Flume (*): Cloudera/Apache, Java
    • Scribe: Facebook, C/C++, no longer maintained
    • Chukwa: Yahoo/Apache, Java, no longer maintained
    • Fluentd: Ruby
    • Logstash (*): ELK (ElasticSearch, Logstash, Kibana)

    Flume History

    • Cloudera 0.9.2, Flume-OG
    • FLUME-728: Flume-NG => Apache
    • July 2012: 1.0
    • May 2015: 1.6 (* +)
    • ~ 1.8

    2.3 Flume Architecture and Core Components


    1. Source (collection)
    2. Channel (aggregation)
    3. Sink (output)

    multi-agent flow

    In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.
    A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.

    This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

    Multiplexing the flow

    Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.
    The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.
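
    The routing described above maps directly onto the agent configuration. A minimal sketch (source and channel names are placeholders) of the multiplexing channel selector for agent "foo":

    # route each event by its "txnType" header, as in the example above
    foo.sources.r1.channels = channel1 channel2 channel3
    foo.sources.r1.selector.type = multiplexing
    foo.sources.r1.selector.header = txnType
    foo.sources.r1.selector.mapping.customer = channel1 channel3
    foo.sources.r1.selector.mapping.vendor = channel2
    foo.sources.r1.selector.default = channel3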

    2.4 Flume Environment Setup

    Prerequisites

    • Java Runtime Environment - Java 1.8 or later
    • Memory - Sufficient memory for configurations used by sources, channels or sinks
    • Disk Space - Sufficient disk space for configurations used by channels or sinks
    • Directory Permissions - Read/Write permissions for directories used by agent

    Install the JDK

    • Download the JDK package
    • Extract the JDK package
    tar -zxvf jdk-8u162-linux-x64.tar.gz -C [install dir]
    • Configure the JAVA environment variables:
    # edit /etc/profile or ~/.bash_profile
    export JAVA_HOME=[jdk install dir]
    export PATH=$JAVA_HOME/bin:$PATH
    # reload the profile to apply the configuration
    source /etc/profile    # or
    source ~/.bash_profile
    # verify the configuration
    java -version

    Install Flume

    • Download the Flume package
    wget http://www.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
    • Extract the Flume package
    tar -zxvf apache-flume-1.7.0-bin.tar.gz -C [install dir]
    • Configure the Flume environment variables
    # edit /etc/profile or ~/.bash_profile
    export FLUME_HOME=[flume install dir]
    export PATH=$FLUME_HOME/bin:$PATH
    # reload the profile to apply the configuration
    source /etc/profile    # or
    source ~/.bash_profile
    • Modify the flume-env.sh script
    export JAVA_HOME=[jdk install dir]
    # verify the installation
    flume-ng version

    2.5 Flume in Practice

    • Requirement 1: collect data from a given network port and print it to the console

    The key to using Flume is writing the configuration file

    1. Configure the Source
    2. Configure the Channel
    3. Configure the Sink
    4. Wire the three components together

    a1: agent name
    r1: source name
    k1: sink name
    c1: channel name

    Single-node Flume configuration

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    Start the Flume agent

    flume-ng agent \
    --name a1 \
    --conf  $FLUME_HOME/conf    \
    --conf-file  $FLUME_HOME/conf/example.conf \
    -Dflume.root.logger=INFO,console
    

    Test with telnet or nc

    telnet [hostname]  [port]     或者
    nc [hostname]  [port]

    Event = optional headers + byte array

    Event: { headers:{} body: 74 68 69 73 20 69 73 20 61 20 74 65 73 74 20 70 this is a test p }
    • Requirement 2: monitor a file and collect newly appended data in real time, printing it to the console
      Technology (agent) selection: exec source + memory channel + logger sink
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f  /root/data/data.log
    a1.sources.r1.shell = /bin/bash -c
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    Start the Flume agent

    flume-ng agent \
    --name a1 \
    --conf  $FLUME_HOME/conf    \
    --conf-file  $FLUME_HOME/conf/example.conf \
    -Dflume.root.logger=INFO,console
    

    Append to data.log and check whether the data is printed to the console

    echo hello >> data.log
    echo world >> data.log
    echo welcome >> data.log

    Console output

    2018-09-02 03:55:00,672 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    2018-09-02 03:55:06,748 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 72 6C 64                                  world }
    2018-09-02 03:55:22,280 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 65 6C 63 6F 6D 65                            welcome }

    Requirement 2 now works as expected.

    • Requirement 3 (*): collect logs from server A to server B in real time (the key exercise)
      Technology (agent) selection:

    exec source + memory channel + avro sink
    avro source + memory channel + logger sink

    # exec-memory-avro.conf: A single-node Flume configuration
    
    # Name the components on this agent
    exec-memory-avro.sources = exec-source
    exec-memory-avro.sinks = avro-sink
    exec-memory-avro.channels = memory-channel
    
    # Describe/configure the source
    exec-memory-avro.sources.exec-source.type = exec
    exec-memory-avro.sources.exec-source.command = tail -f  /root/data/data.log
    exec-memory-avro.sources.exec-source.shell = /bin/bash -c
    
    # Describe the sink
    exec-memory-avro.sinks.avro-sink.type = avro
    exec-memory-avro.sinks.avro-sink.hostname = c7-master
    exec-memory-avro.sinks.avro-sink.port = 44444
    
    # Use a channel which buffers events in memory
    exec-memory-avro.channels.memory-channel.type = memory
    exec-memory-avro.channels.memory-channel.capacity = 1000
    exec-memory-avro.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    exec-memory-avro.sources.exec-source.channels = memory-channel
    exec-memory-avro.sinks.avro-sink.channel = memory-channel
    # avro-memory-logger.conf: A single-node Flume configuration
    
    # Name the components on this agent
    avro-memory-logger.sources = avro-source
    avro-memory-logger.sinks = logger-sink
    avro-memory-logger.channels = memory-channel
    
    # Describe/configure the source
    avro-memory-logger.sources.avro-source.type = avro
    avro-memory-logger.sources.avro-source.bind = c7-master
    avro-memory-logger.sources.avro-source.port = 44444
    
    # Describe the sink
    avro-memory-logger.sinks.logger-sink.type = logger
    
    # Use a channel which buffers events in memory
    avro-memory-logger.channels.memory-channel.type = memory
    avro-memory-logger.channels.memory-channel.capacity = 1000
    avro-memory-logger.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    avro-memory-logger.sources.avro-source.channels = memory-channel
    avro-memory-logger.sinks.logger-sink.channel = memory-channel

    Start the avro-memory-logger agent first

    flume-ng agent \
    --name avro-memory-logger \
    --conf  $FLUME_HOME/conf    \
    --conf-file  $FLUME_HOME/conf/avro-memory-logger.conf \
    -Dflume.root.logger=INFO,console
    

    Then start the exec-memory-avro agent

    flume-ng agent \
    --name exec-memory-avro \
    --conf  $FLUME_HOME/conf    \
    --conf-file  $FLUME_HOME/conf/exec-memory-avro.conf \
    -Dflume.root.logger=INFO,console
    

    Log collection process:
    1) Machine A monitors a file; when users visit the main site, their behavior logs are appended to access.log.
    2) The avro sink ships each newly produced log line to the hostname:port specified by the corresponding avro source.
    3) The agent hosting the avro source prints our logs to the console.
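
    As a final check (the file path and hostname are the ones used in the configs above), append to the monitored file and watch the avro-memory-logger agent's console:

    echo "hello from A" >> /root/data/data.log
    # shortly afterwards the avro-memory-logger agent on c7-master prints an Event whose body is "hello from A"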

