flume 订阅
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。 展开全文
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
信息
外文名
flume
特    点
聚合和传输的系统
中文名
水槽
实    质
孵化项目
flume日志收集
Flume最早是Cloudera提供的日志收集系统,是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
收起全文
精华内容
下载资源
问答
  • Flume 千次阅读
    2022-03-30 17:57:29

    简介

    概述

    1. Flume是Cloudera公司开发的后来贡献给了Apache的一套用于进行日志的收集(collecting)、汇聚(aggregating)和传输(movlng)的机制
    2. 在大数据中,有超过70%的数据来源是日志 - 日志是大数据的基石
    3. Flume版本:
      1. Flume0.X:Flume-og,配置结构复杂,并且对分布式支持不好
      2. Flume1.X:Flume-ng,配置结构进行简化,并且很好的支持分布式

    基本概念

    1. Event:事件

      1. Flume会将收集到的日志封装成Event对象,一个Event对象就是一条日志

      2. Event本质上就说一个Json串,固定包含两个部分headers和body

        {
            "headers":{},
         	"body":""
        }
        
    2. Agent:是Flume流动结构的基本组成,固定的包含至少3个组件

      1. Source:从数据源采集数据 - conllecting
      2. Channel:临时存储数据 - aggregating
      3. Sink:将数据写到目的地 - moving

    流动模型

    1. 单级流动
      单级流动

    2. 多级流动
      多级流动

    3. 扇入流动
      扇入流动

    4. 扇出流动

    扇出流动

    1. 复杂流动:需要根据需求将上述流动进行组合,构成了复杂流动

    安装和入门

    1. 进入安装目录下,下载Flume的安装包

    2. 解压

      tar -vxf apache-flume-1.9.0-bin.tar.gz
      
    3. 重命名

      mv apache-flume-1.9.0-bin.tar.gz flume-1.9.0
      
    4. 添加环境变量

      vim /etc/profile.d/flumehome.sh
      #文件中添加
      export FLUME_HOME=/home/software/flume-1.9.0
      export PATH=$FLUME_HOME$/bin
      #保存退出,重新生效
      source /etc/profile.d/flumehome.sh
      
    5. 校验环境变量

      flume-ng version
      
    6. 进去Flume安装目录的lib目录中,解决连接冲突问题

      cd /home/software/flume-1.9.0/lib/
      rm -rf guava—11.0.2.jar 
      cp /home/software/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar  ./
      
    7. 构建数据目录

      cd /home/software/flume-1.9.0
      mkdir data
      
    8. 入门:编写格式文件

      vim basic.properties
      

      在文件中添加

      # 给Agent起名
      # 给Source起名
      a1.sources = s1
      # 给Channels起名
      a1.channels = c1
      # 给Sink起名
      a1.sinks = k1
      
      
      # 配置Source
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      
      # 配置Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100
      
      
      # 配置Sink
      a1.sinks.k1.type = logger
      
      
      # 将Source和Channel进行绑定
      a1.sources.s1.channels = c1
      # 将Sink和Channel进行绑定
      a1.sinks.k1.channel = c1
      

    Source

    AVRO Source

    1. AVRO Source:采集被AVRO序列化之后的数据,然后将数据反序列化之后进行存储和传输。可以结合AVRO Sink来实现多级、扇入和扇出流动

    2. 示例

      1. 格式文件

        a1.sources  = s1
        a1.channels = c1
        a1.sinks = k1
        
        # 配置AVRO Source
        # 类型只能是AVRO
        a1.sources.s1.type = avro
        # 监听的主机名/IP
        a1.sources.s1.bind = hadoop01
        # 绑定的端口号
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动Flume

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.properties -Dflume.root.logger=INFO,console
        
      3. 启动Flume的AVRO客户端来发送数据

        flume-ng avro-client -H hadoop01 -p 8090 -F a.txt -Dflume.root.logger=INFO,console
        

    Exec Source

    1. Exec Source:监听指定的Linux命令,然后将命令的执行结果作为日志进行收集

    2. 案例:监听指定的文件,获取这个文件中新添的数据

      1. 格式

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        # 配置Exec Source
        # 类型必须是exec
        a1.sources.s1.type = exec
        # 监听的命令
        a1.sources.s1.command = tail -F /home/software/flume-1.9.0/data/a.txt
        # 命令类型
        a1.sources.s1.shell = /bin/sh -c
        
        a1.channels.c1.type = memory
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.properties -Dflume.root.logger=INFO,console
        

    Spooling Diretory Source

    1. Spooling Directory Source:监听指定的目录,如果目录中产生了新的文件,那么会自动收集新文件中的内容

    2. 如果不指定,那么被收集完的文件会自动添加一个后缀.COMPLETED

    3. 示例

      1. 格式文件

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        # 配置Spooling Directory Source
        # 类型必须是spooldir
        a1.sources.s1.type = spooldir
        # 监听的目录
        a1.sources.s1.spoolDir = /home/flumedata
        
        a1.channels.c1.type = memory
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.properties -Dflume.root.logger=INFO,console
        

    Sequence Generator Source

    1. Sequence Generator Source:序列产生器。从0开始不断递增,递增到指定大小。如果不指定,那么会递增到Long.MAX_VALUE

    2. 示例

      1. 格式文件

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        # 配置Sequence Generator Source
        # 类型必须是seq
        a1.sources.s1.type = seq
        # 结束值
        a1.sources.s1.totalEvents = 200
        
        a1.channels.c1.type = memory
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.properties -Dflume.root.logger=INFO,console
        

    HTTP Source

    1. HTTP Source:监听HTTP请求,但是只能监听GET和POST请求。其中GET请求只能用于实验,因此只用这个Source来监听POST请求

    2. 示例

      1. 格式文件

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        # 配置HTTP Source
        # 类型必须是http
        a1.sources.s1.type = http
        # 监听的端口号
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.properties -Dflume.root.logger=INFO,console
        
      3. 发送POST请求

        curl -X POST -d '[{"headers":{"class":"bigdata"},"body":"testing"}]' http://hadoop01:8090
        

    Custom Source

    1. Custom Source:自定义Source,需要定义一个类实现Source接口的子接口之一:EventDrivenSource或者PollableSource

      1. EventDrivenSource:事件驱动Source - 被动型Source。在产生数据之后,才需要定义线程获取数据
      2. PollableSource:拉去Source - 主动型Source。无论是否有数据,都有自己提供线程去查询是否有数据
    2. 自定义Source

      1. 导入POM文件

        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>djh.lb</groupId>
            <artifactId>flume</artifactId>
            <version>1.0-SNAPSHOT</version>
        
            <properties>
                <maven.compiler.source>8</maven.compiler.source>
                <maven.compiler.target>8</maven.compiler.target>
            </properties>
        
            <dependencies>
                <dependency>
                    <groupId>org.apache.flume</groupId>
                    <artifactId>flume-ng-core</artifactId>
                    <version>1.9.0</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flume</groupId>
                    <artifactId>flume-ng-sdk</artifactId>
                    <version>1.9.0</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flume</groupId>
                    <artifactId>flume-ng-configuration</artifactId>
                    <version>1.9.0</version>
                </dependency>
            </dependencies>
        
        </project>
        
        1. 代码

          package djh.lb.flume.auth.source;
          
          import org.apache.flume.Context;
          import org.apache.flume.Event;
          import org.apache.flume.EventDrivenSource;
          import org.apache.flume.channel.ChannelProcessor;
          import org.apache.flume.conf.Configurable;
          import org.apache.flume.event.EventBuilder;
          import org.apache.flume.source.AbstractSource;
          
          import java.util.HashMap;
          import java.util.Map;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          
          // 模拟:Sequence Generator Source
          public class AuthSource extends AbstractSource implements EventDrivenSource, Configurable {
          
              private long step; // 步长
              private long end; // 结束值
              private ExecutorService es; // 线程池
          
              // 获取指定的属性值
              @Override
              public void configure(Context context) {
                  // 如果没有指定步长,那么默认步长为1
                  step = context.getLong("step", 1L);
                  // 如果没有指定结束值,那么默认结束值为Long.MAX_VALUE
                  end = context.getLong("end", Long.MAX_VALUE);
              }
          
              // 启动Source
              @Override
              public synchronized void start() {
                  // 初始化线程池
                  es = Executors.newFixedThreadPool(5);
                  // 获取ChannelProcessor
                  ChannelProcessor cp = this.getChannelProcessor();
                  // 提交任务
                  es.submit(new Add(step, end, cp));
              }
          
              // 结束Source
              @Override
              public synchronized void stop() {
                  if (es != null) es.shutdown();
              }
          
          }
          
          class Add implements Runnable {
          
              private final long step;
              private final long end;
              private final ChannelProcessor cp;
          
              public Add(long step, long end, ChannelProcessor cp) {
                  this.step = step;
                  this.end = end;
                  this.cp = cp;
              }
          
              @Override
              public void run() {
                  for (long i = 0; i < end; i += step) {
                      // 将每一次递增的数据作为日志进行收集
                      // 构建headers
                      Map<String, String> headers = new HashMap<>();
                      headers.put("time", System.currentTimeMillis() + "");
                      // 构建body部分
                      byte[] body = (i + "").getBytes();
                      // Flume会将收集到的数据封装成Event对象
                      Event e = EventBuilder.withBody(body, headers);
                      // 需要将数据传递给Channel
                      cp.processEvent(e);
                  }
              }
          }
          
        2. 生成jar包,放到Flume安装目录的子目录lib下

        3. 编写格式文件

          a1.sources = s1
          a1.channels = c1
          a1.sinks = k1
          
          # 配置Custom Source
          # 类型必须是类的全路径名
          a1.sources.s1.type = djh.lb.flume.auth.source.AuthSource
          # 递增步长
          a1.sources.s1.step = 5
          # 结束值
          a1.sources.s1.end = 100
          
          a1.channels.c1.type = memory
          
          a1.sinks.k1.type = logger
          
          a1.sources.s1.channels = c1
          a1.sinks.k1.channel = c1
          
        4. 启动命令

          flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsource.properties -Dflume.root.logger=INFO,console
          

    Channel

    Memory Channel

    1. Memory Channel:将收集到的数据临时存储到内存的队列中。在存储的时候,如果队列已满,那么后续的数据会被阻塞
    2. 如果不指定,那么队列的容量默认为100,即意味着在队列中可以存储100条数据。可以通过capacity这个属性来修改。实际过程中,一般会将这个属性的值改成10w-30w,也可以考虑到50w
    3. transactionCapacity:决定了Source每一批给Channel的数据量,也决定了Channel每一批给Sink的数据量。如果不指定,这个属性的默认值为100
    4. Memory Channel适合于要求速度而不要求可靠性的场景

    File Channel

    1. File Channel:将收集到的数据临时存储到磁盘的文件中。在存储的时候,如果不指定,会将数据临时存储到~/.flume/file-channel.data

    2. checkpoinDir:记录检查点的目录位置。检查点就说Sink获取的数据的位置,默认是~/.flume/file-channel/checkpoint

    3. File Channel适用于要求可靠性而不要求速度的场景

    4. 示例

      1. 格式文件
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      # 配置File Channel
      # 类型必须是file
      a1.channels.c1.type = file
      # 数据临时存储路径
      a1.channels.c1.dataDirs = /home/flumedata/data
      # 检查点临时存储路径
      a1.channels.c1.checkpointDir = /home/flumedata/checkpoint
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
      1. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f filechannel.properties -Dflume.root.logger=INFO,console
        

    其他Channel

    1. JDBC Channel:将收集到的数据临时存储到数据库中。目前为止,JDBC Channel只支持Derby数据库,由于Derby数据库的特性(微型、单连接),导致实际开发中不使用这个Channel
    2. Spillable Memory Channel:先考虑将收集到的数据临时存储到内存中,如果内存队列被塞满,那么不会阻塞而是会将数据临时存储到磁盘上。

    Sink

    HDFS Sink

    1. HDFS Sink:将数据最终写到HDFS上。在写出数据的时候,每隔30s滚动一次,在HDFS上生成一个文件,这样会在HDFS上生成大量的小文件。因此实际过程中需要将滚动时间来进行修改,可以通过属性hdfs.rollInterval来修改

    2. hdfs.rollCount:写入文件中的数据条数。默认数量为10,如果达到这个条数,那么这个文件也会产生滚动,也会导致产生大量的小文件。如果这个属性设置为0,那么不会根据数据条数进行滚动

    3. hdfs.rollSize:滚动时候的文件大小。默认是1024B,也就意味着在HDFS上,文件达到1KB大小就会产生滚动。如果这个属性设置为0,则表示不会根据文件大小来进行滚动。

    4. hdfs.fileType:HDFS上文件的存储类型,包含:SequenceFile(序列文件),DataStream(文本文件), CompressedStream(压缩文件)。默认值是SequenceFile。如果指定为CompressedStream,那么还需要指定hdfs.codeC属性的值

    5. 示例

      1. 格式文件

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = hadoop01
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        # 配置HDFS Sink
        # 类型必须是hdfs
        a1.sinks.k1.type = hdfs
        # 文件在HDFS上的存储路径
        a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata
        # 滚动间隔时间
        a1.sinks.k1.hdfs.rollInterval = 3600
        # 文件的滚动大小
        a1.sinks.k1.hdfs.rollSize = 134217728
        # 数据条数
        a1.sinks.k1.hdfs.rollCount = 1000000
        # 文件类型
        a1.sinks.k1.hdfs.fileType = DataStream
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 启动HDFS

        start-dfs.sh
        
      3. 启动Flume

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f hdfssink.properties -Dflume.root.logger=INFO,console
        

    Logger Sink

    1. Logger Sink:将收集到的数据打印到控制台上。在打印的时候,对中文支持不好。还需要注意的是,Logger Sink为了防止过多的数据占满屏幕,还进行了限制:如果body部分的字节超过16位,超过部分不打印,可以通过属性maxBytesToLog来修改

    File Roll Sink

    1. File Roll Sink:将收集到的数据最终写到本地磁盘上。在写出的时候,每隔30s滚动一次生成一个小文件

    2. 示例

      1. 格式文件:

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        
        # 配置Source
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = 0.0.0.0
        a1.sources.s1.port = 8090
        
        
        # 配置Channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        # 配置File Roll Sink
        # 类型必须是file_roll
        a1.sinks.k1.type = file_roll
        # 文件存储位置
        a1.sinks.k1.sink.directory = /home/flumedata
        # 文件滚动间隔时间
        a1.sinks.k1.sink.rollInterval = 3600
        
        # 将Source和Channel进行绑定
        a1.sources.s1.channels = c1
        # 将Sink和Channel进行绑定
        a1.sinks.k1.channel = c1
        
      2. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f filerollsink.properties -Dflume.root.logger=INFO,console
        

    AVRO Sink

    1. AVRO Sink:将数据序列化之后写出,结合AVRO Source来实现多级扇入、扇出流动模型

    2. 多级流动

      1. 第一个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = hadoop01
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop02
        a1.sinks.k1.port = 8090
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 第二个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = avro
        a1.sources.s1.bind = hadoop02
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop03
        a1.sinks.k1.port = 8090
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      3. 第三个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = avro
        a1.sources.s1.bind = hadoop03
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      4. 按顺序启动hadoop03、hadoop02、hadoop01

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f duoji.properties -Dflume.root.logger=INFO,console
        
    3. 扇入流动

      1. 第一个和第二个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = 0.0.0.0
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop03
        a1.sinks.k1.port = 8090
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      2. 第三个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = avro
        a1.sources.s1.bind = 0.0.0.0
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      3. 先启动hadoop01或者hadoop02,最后启动hadoop03

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f shanru.properties -Dflume.root.logger=INFO,console
        
    4. 多级扇出

      1. 第一个节点

        a1.sources = s1
        a1.channels = c1 c2
        a1.sinks = k1 k2
        
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = 0.0.0.0
        a1.sources.s1.port = 8090
        
        
        a1.channels.c1.type = memory
        
        a1.channels.c2.type = memory
        
        
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop02
        a1.sinks.k1.port = 8090
        
        a1.sinks.k2.type = avro
        a1.sinks.k2.hostname = hadoop03
        a1.sinks.k2.port = 8090
        
        a1.sources.s1.channels = c1 c2
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        
      2. 第二和第三个节点

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = avro
        a1.sources.s1.bind = 0.0.0.0
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        
        a1.sinks.k1.type = logger
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      3. 先启动hadoop02或者hadoop03,最后启动hadoop01

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f shanchu.properties -Dflume.root.logger=INFO,console
        

    Custom Sink

    1. Custom Sink:Flume允许用户进行自定义Sink,需要定义一个类实现Sink接口

    2. 示例

      1. 代码

        package cn.tedu.flume.auth.sink;
        
        import org.apache.flume.*;
        import org.apache.flume.conf.Configurable;
        import org.apache.flume.sink.AbstractSink;
        
        import java.io.FileNotFoundException;
        import java.io.PrintStream;
        import java.util.Map;
        
        // 模拟:File Roll Sink
        public class AuthSink extends AbstractSink implements Sink, Configurable {
        
            private String path;
            private PrintStream ps;
        
            // 获取属性
            @Override
            public void configure(Context context) {
                // 获取用户指定的路径
                path = context.getString("path");
                // 判断用户是否指定了路径
                if (path == null || path.equals(""))
                    throw new IllegalArgumentException("路径没有指定!!!");
            }
        
            // 启动Sink
            @Override
            public synchronized void start() {
                // 初始化打印流
                try {
                    ps = new PrintStream(path + "/" + System.currentTimeMillis());
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
            }
        
            @Override
            public Status process() {
                // 获取Channel
                Channel c = this.getChannel();
                // 获取事务
                Transaction t = c.getTransaction();
                // 开启事务
                t.begin();
                // 获取数据
                Event e;
                try {
                    while ((e = c.take()) != null) {
                        // 获取headers
                        Map<String, String> headers = e.getHeaders();
                        // 写出headers部分
                        ps.println("headers:");
                        for (Map.Entry<String, String> en : headers.entrySet()) {
                            ps.println("\t" + en.getKey() + "=" + en.getValue());
                        }
                        // 获取body部分
                        byte[] body = e.getBody();
                        // 写出body部分
                        ps.println("body:");
                        ps.println("\t" + new String(body));
                    }
                    // 提交事务
                    t.commit();
                    return Status.READY;
                } catch (Exception ex) {
                    // 回滚事务
                    t.rollback();
                    return Status.BACKOFF;
                } finally {
                    // 关闭事务
                    t.close();
                }
            }
        
            // 关闭Sink
            @Override
            public synchronized void stop() {
                if (ps != null) ps.close();
            }
        }
        
      2. 完成之后,打成jar包,放到Flume安装目录的子目录lib下

      3. 格式文件

        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        a1.sources.s1.type = http
        a1.sources.s1.port = 8090
        
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 100
        
        # 配置Custom Sink
        # 类型必须是类的全路径名
        a1.sinks.k1.type = cn.tedu.flume.auth.sink.AuthSink
        # 存储位置
        a1.sinks.k1.path = /home/flumedata
        
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      4. 启动命令

        flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsink.properties -Dflume.root.logger=INFO,console
        
      5. 发送数据

        curl -X POST -d '[{"headers":{"subject":"bigdata","class":"18计科G"},"body":"Welcome"}]' http://hadoop01:8090
        curl -X POST -d '[{"headers":{"date":"20220317","time":"11:44"},"body":"testing"}]' http://hadoop01:8090
        

    Sink Processor

    1. Sink Processor:Sink处理器,本质上是一个Sink Group(Sink组),在使用的时候,会将一个或者多个Sink绑定到同一个组中,同一个组中的Sink执行相同的功能
    2. Sink Processor提供了3种模式
      1. default:默认模式。在扇出结构中,如果不指定,那么默认使用的就是这个模式。在这个模式下,每一个Sink都对应了一个单独的Sink Group
      2. failover:奔溃恢复模式。在使用的时候,需要将多个Sink绑定到一个组中,需要给一个Sink来指定一个优先级,只要高优先级的Sink存货,数据就不会发送给低优先级的Sink
      3. load balancing:负载均衡模式。在使用的时候,也需要将多个Sink绑定到一个组中,需要给Sink指定均衡模式。Flume中,提供了两种均衡模式:round_robin(轮询),random(随机)

    Interceptor

    概述

    1. Interceptor:是Source的子组件,也就意味着是配置在Source上的
    2. 不同于Selecetor,Interceptor可以配置多个,构成拦截器链

    Timestamp Interceptor

    1. Timestamp Interceptor:会在事件的Event部分来添加一个timestamp字段用于标记数据被收集的时间
    2. Timesamp Interceptor结合HDFS Sink来实现数据的按天存放

    Host Interceptor

    1. Host Interceptor:在事件的headers中添加一个host字段用于标记数据的来源主机

    Static Interceptor

    Static Interceptor:在事件的headers部分添加指定字段,可以用于标记

    UUID Interceptor

    UUID Interceptor:在数据的headers部分来添加一个id字段,用于标记数据的唯一性

    Search and Replace Interceptor

    Search and Replace Interceptor:在使用的时候需要指定一个正则表达式,会将符合正则表达式的数据替换为指定形式

    Regex Filtering Interceptor

    Regex Filtering Interceptor:在使用的时候需要指定正则表达式,根据excludeEvents的属性的值来确定过滤规则。当excludeEvents属性的值为true的时候,符合正则表达式的值会被过滤掉;当excludeEvents属性值为fales的时候,不符合正则表达式的数据会被过滤掉

    Custom Interceptor

    Custom Interceptor:Flume支持用于自定义拦截器,在使用的时候需要定义一个类实现Interceptor接口

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8090
    a1.sources.s1.interceptors = i1
    # 自定义Interceptor
    a1.sources.s1.interceptors.i1.type = cn.tedu.flume.auth.source.AuthInterceptor$B
    uilder
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sinks.s1.channels = c1
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

    Flume执行流程

    Flume执行流程

    更多相关内容
  • flume

    2021-03-25 17:10:52
    Apache Flume是一个分布式,可靠且可用的系统,用于有效地收集,聚合大量日志数据并将其从许多不同的源移动到集中式数据存储中https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.5.1/
  • 欢迎使用Apache Flume! Apache Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。 它具有基于流数据流的简单灵活的体系结构。 它具有可调整的可靠性机制以及许多故障转移和恢复机制...
  • Flume用法

    2021-01-07 14:17:16
    Flume包含三部分 Source:从哪收集,一般使用:avro(序列化),exec(命令行),spooling(目录),taildir(目录和文件,包含offset,不会数据丢失),kafka Channel:数据存哪里:(memory,kafka,file) Sink:数据输出到...
  • rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译 在rocketmq-flume项目根目录执行mvn clean install...
  • Flume使用详解(一)

    2021-01-07 13:16:23
    概述: Flume最早是Cloudera提供的日志收集系统,后贡献给Apache。所以目前是Apache下的项目,...当前Flume有两个版本Flume 0.9X版本的统称Flume-og,老版本的flume需要引入zookeeper集群管理,性能 也较低(单线程工
  • Flume部署和使用

    2021-01-07 08:23:24
    Flume部署和使用 官方文档: http://flume.apache.org/ example: WebServer –> Agent[Source–>Channel–>Sink] –> HDFS 一.简介 Flume是一个分布式,可靠的的框架,它能从许多不同的数据源高效地收集、聚合和移动...
  • 基于新 Kafka Producer 的 Flume kafka sink,高性能且可配置。 它依赖于很少的项目/库,只有 Flume 1.5.2 kafka-clients-0.8.2.1 或更高版本,slf4j。 类似于 Flume 1.6 KafkaSink,但这里有一些不同: Flume 1.6 ...
  • 文章目录一、Flume 定义二、Flume 基础架构 一、Flume 定义 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。 为什么选用 Flume ? 二、...
  • 文章目录Flume优化一、内存参数优化(减少GC)1)-xmx和-xms设置相同值,避免在 GC 后调整堆大小带来的压力。2)JVM heap(堆内存)设置4G或更高二、channel优化Flume如何保证数据安全(高可用)事务机制Flume解决...
  • flume-ng-sql-source 该项目用于与sql数据库进行通信 当前支持SQL数据库引擎 在最后一次更新之后,该代码已与hibernate集成在一起,因此该技术支持的所有数据库均应正常工作。 编译与包装 $ mvn package 部署方式 ...
  • #FlumeConfig###A 可视化 Flume 编辑器## 版本:0.1.0 Flume 配置完全用 Javascript 编写并且是自包含的。 它允许您直观地布置 Flume 拓扑,输入源、通道和接收器的属性,并为您创建水槽配置文件。 它可以处理多个...
  • logback-flume-appender Logback追加程序将日志消息转发到Flume代理 配置条目 application :应用程序名称,如果未设置,则将从application.name系统属性中推断出来 hostname :主机名,如果未设置,则通过Box主机...
  • (2)flume的开源包flume-ng-sql-source-1.4.3.jar 最新的好像是1.5的 小版本记不住了 这个下载地址直接csdn上就有 这两个jar 都拷贝到flume的lib下 (3)flume配置文件 a1.sources = r1 a1.sinks = k1 a1....
  • flume_exporter 普罗米修斯水槽出口商。 要运行它: make build ./flume_exporter [flags] 标志帮助: ./flume_exporter --help 配置:config.yml agents: - name: "flume-agents" enabled: true # ...
  • 水槽时间戳过滤器 ...$ cp /path/to/flume-timestamp-filter/target/flume-timestamp-filtering-interceptor-0.0.jar /usr/lib/flume-ng/plugins.d/flume-timestamp-filter/lib/ 根据Configuration配置flum
  • flume+kafka+flink+mysql实现nginx数据统计与分析
  • flume配置文件案例

    2020-09-27 23:37:47
    flume配置文件,文件配了说明,可以拿下来改一改就用。 可以获取端口数据监听或者文件、文件夹内容监听,实时写入hdfs、mysql或者你需要的路径。
  • 由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。
  • hadoop集群配置之————flume安装配置(详细版)
  • flume的包。flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、...
  • # example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat ...
  • Flume+Kafka+HBase实例

    2021-01-11 12:27:11
    结合电信客服项目,将数据存储模块独立成项目,欢迎下载
  • 文章目录一、Flume 事务二、Flume Agent 内部原理三、Flume 拓扑结构1、简单串联2、复制和多路复用3、负载均衡和故障转移4、聚合 一、Flume 事务 二、Flume Agent 内部原理 重要组件: 1)ChannelSelector Channel...
  • apache旗下组件flume,用于数据的采集,主要包括数据的source、channel以及sink,提供1.7.0稳定版本
  • 要部署它,请在flume类路径中复制flume-influxdb-sink-0.0.2.jar及其依赖项。 一个胖罐子,包括maven在build中的所有依赖项,因此也可以将其复制。 配置 这是示例接收器配置: agent.sinks.influx.type = ...
  • Flume NG SQS 插件 该项目提供了一个源插件,用于从 Amazon 的简单队列服务 ( ) 中提取消息,这是一个快速、可靠、可扩展且完全托管的基于云的消息队列系统。 安装 首先,克隆存储库并构建包(需要 Maven) git ...
  • flume+hdfs所需jar.rar

    2021-03-25 17:39:44
    flume1.9.0+hdfs3.2.2相关jar
  • 注意:flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 68,379
精华内容 27,351
关键字:

flume