精华内容
下载资源
问答
  • Flume拦截器

    2019-11-01 14:22:08
    一、Flume拦截器 时间戳拦截器 Timestamp.conf #1.定义agent名, source、channel、sink的名称 a4.sources = r1 a4.channels = c1 a4.sinks = k1 #2.具体定义source a4.sources.r1.type = spooldir a4.sources....

    一、Flume拦截器

    时间戳拦截器

    Timestamp.conf

    #1.定义agent名, source、channel、sink的名称
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    #2.具体定义source
    a4.sources.r1.type = spooldir
    a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload
    
    #定义拦截器,为文件最后添加时间戳
    a4.sources.r1.interceptors = i1
    a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    #具体定义channel
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 10000
    a4.channels.c1.transactionCapacity = 100
    
    
    #具体定义sink
    a4.sinks.k1.type = hdfs
    a4.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume-interceptors/%H
    a4.sinks.k1.hdfs.filePrefix = events-
    a4.sinks.k1.hdfs.fileType = DataStream
    
    #不按照条数生成文件
    a4.sinks.k1.hdfs.rollCount = 0
    #HDFS上的文件达到128M时生成一个文件
    a4.sinks.k1.hdfs.rollSize = 134217728
    #HDFS上的文件达到60秒生成一个文件
    a4.sinks.k1.hdfs.rollInterval = 60
    
    #组装source、channel、sink
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1
    

    启动命令

    /opt/module/flume-1.8.0/bin/flume-ng agent -n a4 \
    -f /opt/module/flume-1.8.0/jobconf/flume-interceptors.conf \
    -c /opt/module/flume-1.8.0/conf \
    -Dflume.root.logger=INFO,console
    

    主机名拦截器

    Host.conf

    #1.定义agent
    a1.sources= r1
    a1.sinks = k1
    a1.channels = c1
    
    #2.定义source
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/Andy
    #拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = host
    
    #参数为true时用IP192.168.1.111,参数为false时用主机名,默认为true
    a1.sources.r1.interceptors.i1.useIP = false
    a1.sources.r1.interceptors.i1.hostHeader = agentHost
    
     #3.定义sinks
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flumehost/%H
    a1.sinks.k1.hdfs.filePrefix = Andy_%{agentHost}
    #往生成的文件加后缀名.log
    a1.sinks.k1.hdfs.fileSuffix = .log
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
     
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动命令:

    bin/flume-ng agent -c conf/ -f jobconf/host.conf -n a1 -Dflume.root.logger=INFO,console
    

    UUID拦截器

    uuid.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/Andy
    a1.sources.r1.interceptors = i1
    #type的参数不能写成uuid,得写具体,否则找不到类
    a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    #如果UUID头已经存在,它应该保存
    a1.sources.r1.interceptors.i1.preserveExisting = true
    a1.sources.r1.interceptors.i1.prefix = UUID_
    
    #如果sink类型改为HDFS,那么在HDFS的文本中没有headers的信息数据
    a1.sinks.k1.type = logger
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    # bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger==INFO,console
    

    查询替换拦截器

    search.conf

    #1 agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #2 source
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/Andy
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = search_replace
    
    #遇到数字改成AncientMing,A123会替换为AAncientMing
    a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
    a1.sources.r1.interceptors.i1.replaceString = AncientMing
    a1.sources.r1.interceptors.i1.charset = UTF-8
    
    #3 sink
    a1.sinks.k1.type = logger
    
    #4 Chanel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #5 bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    # bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console
    

    正则过滤拦截器

    filter.conf

    #1 agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #2 source
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/Andy
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex = ^A.*
    #如果excludeEvents设为false,表示过滤掉不是以A开头的events。如果excludeEvents设为true,则表示过滤掉以A开头的events。
    a1.sources.r1.interceptors.i1.excludeEvents = true
    
    a1.sinks.k1.type = logger
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    # bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console
    

    正则抽取拦截器

    extractor.conf

    #1 agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #2 source
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/Andy
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_extractor
    a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
    a1.sources.r1.interceptors.i1.serializers = s1 s2
    a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
    a1.sources.r1.interceptors.i1.serializers.s2.name = ip
    
    a1.sinks.k1.type = logger
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    # bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console
    

    注:正则抽取拦截器的headers不会出现在文件名和文件内容中

    二、Flume自定义拦截器

    字母小写变大写

    1.Pom.xml

       <dependencies>
            <!-- flume核心依赖 -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.8.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- 打包插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.4</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass></mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <!-- 编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>utf-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    2.自定义实现拦截器

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
     
    import java.util.ArrayList;
    import java.util.List;
     
    public class MyInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
     
        @Override
        public void close() {
        }
     
        /**
         * 拦截source发送到通道channel中的消息
         *
         * @param event 接收过滤的event
         * @return event    根据业务处理后的event
         */
        @Override
        public Event intercept(Event event) {
            // 获取事件对象中的字节数据
            byte[] arr = event.getBody();
            // 将获取的数据转换成大写
            event.setBody(new String(arr).toUpperCase().getBytes());
            // 返回到消息中
            return event;
        }
        // 接收被过滤事件集合
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> list = new ArrayList<>();
            for (Event event : events) {
                list.add(intercept(event));
            }
            return list;
        }
     
        public static class Builder implements Interceptor.Builder {
            // 获取配置文件的属性
            @Override
            public Interceptor build() {
                return new MyInterceptor();
            }
     
            @Override
            public void configure(Context context) {
     
            }
        }
    

    使用Maven做成Jar包,在flume的目录下mkdir jar,上传此jar到jar目录中

    Flume配置文件

    ToUpCase.conf

    #1.agent 
    a1.sources = r1
    a1.sinks =k1
    a1.channels = c1
     
     
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/Andy
    a1.sources.r1.interceptors = i1
    #全类名$Builder
    a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder
     
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /ToUpCase1
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
     
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    运行命令:

    bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console
    

    三、Flume对接kafka

    配置flume(flume-kafka.conf)

    # define
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
    a1.sources.r1.shell = /bin/bash -c
    
    # sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
    a1.sinks.k1.topic = calllog
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.requiredAcks = 1
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    进入flume根目录下,启动flume

    /opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf
    

    四、kafka对接Flume

    kafka2flume.conf

    agent.sources = kafkaSource
    agent.channels = memoryChannel
    agent.sinks = hdfsSink
    
    # The channel can be defined as follows.
    agent.sources.kafkaSource.channels = memoryChannel
    agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181
    agent.sources.kafkaSource.topic=calllog
    #agent.sources.kafkaSource.groupId=flume
    agent.sources.kafkaSource.kafka.consumer.timeout.ms=100
    
    
    agent.channels.memoryChannel.type=memory
    agent.channels.memoryChannel.capacity=10000
    agent.channels.memoryChannel.transactionCapacity=1000
    agent.channels.memoryChannel.type=memory
    agent.channels.memoryChannel.capacity=10000
    agent.channels.memoryChannel.transactionCapacity=1000
    
    
    # the sink of hdfs
    agent.sinks.hdfsSink.type=hdfs
    agent.sinks.hdfsSink.channel = memoryChannel
    agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
    agent.sinks.hdfsSink.hdfs.writeFormat=Text
    agent.sinks.hdfsSink.hdfs.fileType=DataStream
    #这两个不配置,会产生大量的小文件
    agent.sinks.hdfsSink.hdfs.rollSize=0
    agent.sinks.hdfsSink.hdfs.rollCount=0
    

    启动命令

    bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console
    

    注意:这个配置是从kafka过数据,但是需要重新向kafka的topic灌数据,他才会传到HDFS

    展开全文
  • flume拦截器

    2017-10-11 22:30:23
    对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。 官方上提供的已有的拦截器有: ...

    1.http://blog.csdn.net/xiao_jun_0820/article/details/38111305

    对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

    官方上提供的已有的拦截器有:

    Timestamp Interceptor

    Host Interceptor

    Static Interceptor

    Regex Filtering Interceptor

    Regex Extractor Interceptor


    像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。

    Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到

    Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。

    Static Interceptor:可以在event的header中添加自定义的key和value。

    Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。

    Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分


    下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:
    [java] view plain copy
    1. public class WriteLog {  
    2.     protected static final Log logger = LogFactory.getLog(WriteLog.class);  
    3.   
    4.     /** 
    5.      * @param args 
    6.      * @throws InterruptedException 
    7.      */  
    8.     public static void main(String[] args) throws InterruptedException {  
    9.         // TODO Auto-generated method stub  
    10.         while (true) {  
    11.             logger.info(new Date().getTime());  
    12.             logger.info("{\"requestTime\":"  
    13.                     + System.currentTimeMillis()  
    14.                     + ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"测试商家名称\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}");  
    15.             Thread.sleep(2000);  
    16.   
    17.         }  
    18.     }  
    19. }  
    又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

    接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
    1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
    2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

    修改后的flume.conf如下:
    [plain] view plain copy
    1. tier1.sources=source1  
    2. tier1.channels=channel1  
    3. tier1.sinks=sink1  
    4.   
    5. tier1.sources.source1.type=avro  
    6. tier1.sources.source1.bind=0.0.0.0  
    7. tier1.sources.source1.port=44444  
    8. tier1.sources.source1.channels=channel1  
    9.   
    10. tier1.sources.source1.interceptors=i1 i2  
    11. tier1.sources.source1.interceptors.i1.type=regex_filter  
    12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
    13. tier1.sources.source1.interceptors.i2.type=timestamp  
    14.   
    15. tier1.channels.channel1.type=memory  
    16. tier1.channels.channel1.capacity=10000  
    17. tier1.channels.channel1.transactionCapacity=1000  
    18. tier1.channels.channel1.keep-alive=30  
    19.   
    20. tier1.sinks.sink1.type=hdfs  
    21. tier1.sinks.sink1.channel=channel1  
    22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  
    23. tier1.sinks.sink1.hdfs.fileType=DataStream  
    24. tier1.sinks.sink1.hdfs.writeFormat=Text  
    25. tier1.sinks.sink1.hdfs.rollInterval=0  
    26. tier1.sinks.sink1.hdfs.rollSize=10240  
    27. tier1.sinks.sink1.hdfs.rollCount=0  
    28. tier1.sinks.sink1.hdfs.idleTimeout=60  

    我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
    i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

    然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。
    2.http://blog.csdn.net/ty_laurel/article/details/54585726

    概述

    Flume 除了主要的三大组件 Source、Channel和 Sink,还有一些其他灵活的组件,如拦截器、SourceRunner运行器、Channel选择器和Sink处理器等。

    组件框架图

    今天主要来看看拦截器,先看下组件框架流程图,熟悉了大致框架流程学习起来必然会更加轻松: 

    1. 接收事件
    2. 根据配置选择对应的Source运行器(EventDrivenSourceRunner 和 PollableSourceRunner)
    3. 处理器处理事件(Load-Balancing Sink 和 Failover Sink 处理器)
    4. 将事件传递给拦截器链
    5. 将每个事件传递给Channel选择器
    6. 返回写入事件的Channel列表
    7. 将所有事件写入每个必需的Channel,只有一个事务被打开
    8. 可选Channel(配置可选Channel后不管其是否写入成功)

    拦截器

    拦截器(Interceptor)是简单插件式组件,设置在Source和Channel之间,Source接收到event在写入到对应的Channel之前,可以通过调用的拦截器转换或者删除过滤掉一部分event。通过拦截器后返回的event数不能大于原本的数量。在一个Flume 事件流程中,可以添加任意数量的拦截器转换或者删除从单个Source中来的事件,Source将同一个事务的所有事件event传递给Channel处理器,进而依次可以传递给多个拦截器,直至从最后一个拦截器中返回的最终事件event写入到对应的Channel中。 
    flume-1.7版本支持的拦截器: 

    编写自定义拦截器

    自定义的拦截器编写,我们只需要实现一个Interceptor接口即可,该接口的定义如下:

    1. public interface Interceptor {
    2. /* 任何需要拦截器初始化或者启动的操作就可以定义在此,无则为空即可 */
    3. public void initialize();
    4. /* 每次只处理一个Event */
    5. public Event intercept(Event event);
    6. /* 量处理Event */
    7. public List<Event> intercept(List<Event> events);
    8. /*需要拦截器执行的任何closing/shutdown操作,一般为空 */
    9. public void close();
    10. /* 获取配置文件中的信息,必须要有一个无参的构造方法 */
    11. public interface Builder extends Configurable {
    12. public Interceptor build();
    13. }
    14. }

    接口中的几个方法或者内部接口含义代码中已经标注,需要留意的地方就是考虑到多线程运行Source时,需要保证编写的代码是线程安全的。这里就不展示自定义拦截器代码了,仿照已有的拦截器,可以很容易的编写一个简单功能的自定义拦截器的。

    实际使用及问题

    问题:

    目前环境中使用的都是tailSource、hdfsSink,在sink时根据时间对日志分割成不同的目录,但是实际过程中存在一些延迟,导致sink写入hdfs时的时间和日志文件中记录的时间存在一些差异;并且不能保留原有的日志文件名。

    需求:

    1. 根据日志中记录的时间对文件进行分目录存储
    2. 将source端读取的日志名字符串添加至hdfsSink写入hdfs的文件名中(在hdfs文件中可以根据文件名区分日志)

    日志格式如下:

    1. 2017/01/13 13:30:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    2. 2017/01/13 14:50:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    3. 2017/01/13 15:52:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    4. 2017/01/13 16:53:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    5. 2017/01/14 13:50:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    6. 2017/01/14 13:50:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    7. 2017/01/14 14:50:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
    8. 2017/01/14 14:56:00 ip:123.178.46.252 message:[{"s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":

    如何实现以上需求?

    1. 要了解TaildirSource如何读取日志文件,按行读取还是按数据量大小? 
      分析代码可知,无论单个事件操作还是批量操作均是按行读取
    2. hdfsSink如何对文件进行分目录? 
      若定义了hdfs.useLocalTimeStamp = true ,则是根据本地时间戳分目录,否则是从事件的header中获取时间戳。

    明白了这两个问题,就可以继续往前走了。

    实现需求1

    Source端: 
    经过调研查阅资料发现,有拦截器就可以直接实现该目标功能。使用RegexExtractorInterceptor正则抽取拦截器,匹配日志中的时间字符串,将其添加至Event的header中(header的key值为timestamp),写入header时序列化只能使用org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer(该序列化器内部根据配置传入的pattern将时间转换为时间戳格式):

    1. agent1.sources.r1.interceptors = inter
    2. agent1.sources.r1.interceptors.inter.type = regex_extractor
    3. agent1.sources.r1.interceptors.inter.regex = ^(\\d\\d\\d\\d/\\d\\d/\\d\\d\\s\\d\\d:\\d\\d:\\d\\d).*
    4. agent1.sources.r1.interceptors.inter.serializers = s1
    5. #agent1.sources.r1.interceptors.inter.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer //该序列化内部只是将传入的匹配项直接返回return
    6. agent1.sources.r1.interceptors.inter.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    7. agent1.sources.r1.interceptors.inter.serializers.s1.name = timestamp
    8. agent1.sources.r1.interceptors.inter.serializers.s1.pattern = yyyy/MM/dd HH:mm:ss

    Sink端: 
    Sink端只需要注意不要设置hdfs.useLocalTimeStamp 为 true,也就是不使用本地时间,默认为false即可。

    1. agent1.sinks.k1.type = hdfs
    2. agent1.sinks.k1.channel = c2
    3. agent1.sinks.k1.hdfs.path = /user/portal/tmp/syx/test2/%Y%m%d/%Y%m%d%H
    4. agent1.sinks.k1.hdfs.filePrefix = events-%[localhost]-%{timestamp} //%[localhost] 获取主机名,%{timestamp} 获取事件header中key为timestamp的值value
    5. #agent1.sinks.k1.hdfs.useLocalTimeStamp = true //注意此处直接使用Event header中的timestamp,不适用本地时间戳
    6. agent1.sinks.k1.hdfs.callTimeout = 100000

    实现需求2

    tailDirSource端使用参数:

    fileHeader false Whether to add a header storing the absolute path filename.
    fileHeaderKey file Header key to use when appending absolute path filename to event header.

    fileHeader 设置为 true ,可以将日志文件的绝对路径存储在事件的header中; 
    fileHeaderKey 目前来说不需要设置,它指定了存储在header中路径的key 名(header中是以key-value对存储),默认为 file。如下:

    1. Event: { headers:{timestamp=1452581700000, file=/home/hadoop_portal/tiany/test.log} body: 32 30 31 36 2F 30 31 2F 31 32 20 31 34 3A 35 35 2016/01/12 14:55 }

    因为hdfsSink可以直接从事件的header中读取字串作为hdfs文件名的一部分,可以通过将日志文件名添加至header中来实现。现在看起来实现上述需求就很简单了,只需要将绝对路径名修改为文件名就行了,这就可以修改tailDirSource中往 Event 中添加header时的代码了,如下:

    1. //ReliableTaildirEventReader.java中的readEvents方法
    2. Map<String, String> headers = currentFile.getHeaders();
    3. if (annotateFileName || (headers != null && !headers.isEmpty())) {
    4. for (Event event : events) {
    5. if (headers != null && !headers.isEmpty()) {
    6. event.getHeaders().putAll(headers);
    7. }
    8. if (annotateFileName) { //判断是否需要设置日志文件路径名至header中,为boolean类型
    9. int lastIndex = currentFile.getPath().lastIndexOf('/'); //获取绝对路径中最后一次出现'/'的索引,根据索引获取路径中的文件名字串即可
    10. event.getHeaders().put(fileNameHeader, currentFile.getPath().substring(lastIndex+1));
    11. }
    12. }
    13. }

    按以上方法操作,两个需求可以算就是完成了,flume测试跑了一天,很符合需求,以为这样任务就完成了吗? NO,隔了一晚上悲催的事就发生了,flume狂报错,日志显示无法从事件header中获取到时间戳timestamp,很纳闷,不是明明就将timestamp写入到header中了吗? 
    为了检测header中是否真的没有timestamp,将Sink修改为logger Sink(因为该方式可以将事件的header和body以日志形式打印出来,方便查看),修改之后测试跑了几个小时,接下来就是分析log,发现确实如报错,Source过来的日志有一些确实是没有时间字段的。 
    这种问题该如何解决呢?其实也是很简单的,研究RegexExtractorInterceptor拦截器的源代码,发现其中只是对匹配到指定格式时做了相应的处理,但是对于未匹配到的日志行时不做任何处理,因而修改源代码,在未匹配到指定字串时,添加默认的时间戳即可,但是不能为空,因为hdfsSink分目录时必须要从事件header中获取到timestamp的,否则就会报错,修改后代码如下:

    1. public Event intercept(Event event) {
    2. Matcher matcher = regex.matcher(
    3. new String(event.getBody(), Charsets.UTF_8));
    4. Map<String, String> headers = event.getHeaders();
    5. if (matcher.find()) { //匹配到执行if语句中代码
    6. for (int group = 0, count = matcher.groupCount(); group < count; group++) {
    7. int groupIndex = group + 1;
    8. if (groupIndex > serializers.size()) {
    9. if (logger.isDebugEnabled()) {
    10. logger.debug("Skipping group {} to {} due to missing serializer",
    11. group, count);
    12. }
    13. break;
    14. }
    15. NameAndSerializer serializer = serializers.get(group);
    16. if (logger.isDebugEnabled()) {
    17. logger.debug("Serializing {} using {}", serializer.headerName,
    18. serializer.serializer);
    19. }
    20. headers.put(serializer.headerName,
    21. serializer.serializer.serialize(matcher.group(groupIndex)));
    22. }
    23. //日志中没匹配到指定时间格式,添加当前时间为时间戳
    24. } else {
    25. long now = System.currentTimeMillis();
    26. headers.put("timestamp", Long.toString(now));
    27. }
    28. return event;
    29. }

    maven重新打包,替换掉原先的flume-ng-core.jar包即可,重新运行问题解决。

    注意:若使用了KafkaChannel,parseAsFlumeEvent 应该使用默认值true,因为在Sink时需要读取Event中的header内容。

    总结

    flume的拦截器还是很有用的,可以在写入Channel之前先对日志做一次清洗,根据实际的需求编写自定义拦截器或者使用已有的拦截器,可以很方便的完成一些需求。对于这次的问题,虽然解决了,但是还是感觉很尴尬(日志提供方给出的日志格式每条日志都有时间字段,怪他们?no),其实主要还是由于自己没有考虑全面,只需要几行代码的事。因此在今后的学习工作生活中,无论干神马事,都得方方面面考虑,对于开发人员来说特别是故障处理、应急处理,很重要的。

    3.http://blog.csdn.net/xiao_jun_0820/article/details/38333171

    还是针对学习八中的那个需求,我们现在换一种实现方式,采用拦截器来实现。

    先回想一下,spooldir source可以将文件名作为header中的key:basename写入到event的header当中去。试想一下,如果有一个拦截器可以拦截这个event,然后抽取header中这个key的值,将其拆分成3段,每一段都放入到header中,这样就可以实现那个需求了。

    遗憾的是,flume没有提供可以拦截header的拦截器。不过有一个抽取body内容的拦截器:RegexExtractorInterceptor,看起来也很强大,以下是一个官方文档的示例:

    If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used


    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three
    The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3

    大概意思就是,通过这样的配置,event body中如果有1:2:3.4foobar5 这样的内容,这会通过正则的规则抽取具体部分的内容,然后设置到header当中去。


    于是决定打这个拦截器的主义,觉得只要把代码稍微改改,从拦截body改为拦截header中的具体key,就OK了。翻开源码,哎呀,很工整,改起来没难度,以下是我新增的一个拦截器:RegexExtractorExtInterceptor:

    [java] view plain copy
    1. package com.besttone.flume;  
    2.   
    3. import java.util.List;  
    4. import java.util.Map;  
    5. import java.util.regex.Matcher;  
    6. import java.util.regex.Pattern;  
    7.   
    8. import org.apache.commons.lang.StringUtils;  
    9. import org.apache.flume.Context;  
    10. import org.apache.flume.Event;  
    11. import org.apache.flume.interceptor.Interceptor;  
    12. import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;  
    13. import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;  
    14. import org.slf4j.Logger;  
    15. import org.slf4j.LoggerFactory;  
    16.   
    17. import com.google.common.base.Charsets;  
    18. import com.google.common.base.Preconditions;  
    19. import com.google.common.base.Throwables;  
    20. import com.google.common.collect.Lists;  
    21.   
    22. /** 
    23.  * Interceptor that extracts matches using a specified regular expression and 
    24.  * appends the matches to the event headers using the specified serializers</p> 
    25.  * Note that all regular expression matching occurs through Java's built in 
    26.  * java.util.regex package</p>. Properties: 
    27.  * <p> 
    28.  * regex: The regex to use 
    29.  * <p> 
    30.  * serializers: Specifies the group the serializer will be applied to, and the 
    31.  * name of the header that will be added. If no serializer is specified for a 
    32.  * group the default {@link RegexExtractorInterceptorPassThroughSerializer} will 
    33.  * be used 
    34.  * <p> 
    35.  * Sample config: 
    36.  * <p> 
    37.  * agent.sources.r1.channels = c1 
    38.  * <p> 
    39.  * agent.sources.r1.type = SEQ 
    40.  * <p> 
    41.  * agent.sources.r1.interceptors = i1 
    42.  * <p> 
    43.  * agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR 
    44.  * <p> 
    45.  * agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL) 
    46.  * <p> 
    47.  * agent.sources.r1.interceptors.i1.serializers = s1 s2 
    48.  * agent.sources.r1.interceptors.i1.serializers.s1.type = 
    49.  * com.blah.SomeSerializer agent.sources.r1.interceptors.i1.serializers.s1.name 
    50.  * = warning agent.sources.r1.interceptors.i1.serializers.s2.type = 
    51.  * org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer 
    52.  * agent.sources.r1.interceptors.i1.serializers.s2.name = error 
    53.  * agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd 
    54.  * </code> 
    55.  * </p> 
    56.  *  
    57.  * <pre> 
    58.  * Example 1: 
    59.  * </p> 
    60.  * EventBody: 1:2:3.4foobar5</p> Configuration: 
    61.  * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) 
    62.  * </p> 
    63.  * agent.sources.r1.interceptors.i1.serializers = s1 s2 s3 
    64.  * agent.sources.r1.interceptors.i1.serializers.s1.name = one 
    65.  * agent.sources.r1.interceptors.i1.serializers.s2.name = two 
    66.  * agent.sources.r1.interceptors.i1.serializers.s3.name = three 
    67.  * </p> 
    68.  * results in an event with the the following 
    69.  *  
    70.  * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=3 
    71.  *  
    72.  * Example 2: 
    73.  *  
    74.  * EventBody: 1:2:3.4foobar5 
    75.  *  
    76.  * Configuration: agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) 
    77.  * <p> 
    78.  * agent.sources.r1.interceptors.i1.serializers = s1 s2 
    79.  * agent.sources.r1.interceptors.i1.serializers.s1.name = one 
    80.  * agent.sources.r1.interceptors.i1.serializers.s2.name = two 
    81.  * <p> 
    82.  *  
    83.  * results in an event with the the following 
    84.  *  
    85.  * body: 1:2:3.4foobar5 headers: one=>1, two=>2 
    86.  * </pre> 
    87.  */  
    88. public class RegexExtractorExtInterceptor implements Interceptor {  
    89.   
    90.     static final String REGEX = "regex";  
    91.     static final String SERIALIZERS = "serializers";  
    92.   
    93.     // 增加代码开始  
    94.   
    95.     static final String EXTRACTOR_HEADER = "extractorHeader";  
    96.     static final boolean DEFAULT_EXTRACTOR_HEADER = false;  
    97.     static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";  
    98.   
    99.     // 增加代码结束  
    100.   
    101.     private static final Logger logger = LoggerFactory  
    102.             .getLogger(RegexExtractorExtInterceptor.class);  
    103.   
    104.     private final Pattern regex;  
    105.     private final List<NameAndSerializer> serializers;  
    106.   
    107.     // 增加代码开始  
    108.   
    109.     private final boolean extractorHeader;  
    110.     private final String extractorHeaderKey;  
    111.   
    112.     // 增加代码结束  
    113.   
    114.     private RegexExtractorExtInterceptor(Pattern regex,  
    115.             List<NameAndSerializer> serializers, boolean extractorHeader,  
    116.             String extractorHeaderKey) {  
    117.         this.regex = regex;  
    118.         this.serializers = serializers;  
    119.         this.extractorHeader = extractorHeader;  
    120.         this.extractorHeaderKey = extractorHeaderKey;  
    121.     }  
    122.   
    123.     @Override  
    124.     public void initialize() {  
    125.         // NO-OP...  
    126.     }  
    127.   
    128.     @Override  
    129.     public void close() {  
    130.         // NO-OP...  
    131.     }  
    132.   
    133.     @Override  
    134.     public Event intercept(Event event) {  
    135.         String tmpStr;  
    136.         if(extractorHeader)  
    137.         {  
    138.             tmpStr = event.getHeaders().get(extractorHeaderKey);  
    139.         }  
    140.         else  
    141.         {  
    142.             tmpStr=new String(event.getBody(),  
    143.                     Charsets.UTF_8);  
    144.         }  
    145.           
    146.         Matcher matcher = regex.matcher(tmpStr);  
    147.         Map<String, String> headers = event.getHeaders();  
    148.         if (matcher.find()) {  
    149.             for (int group = 0, count = matcher.groupCount(); group < count; group++) {  
    150.                 int groupIndex = group + 1;  
    151.                 if (groupIndex > serializers.size()) {  
    152.                     if (logger.isDebugEnabled()) {  
    153.                         logger.debug(  
    154.                                 "Skipping group {} to {} due to missing serializer",  
    155.                                 group, count);  
    156.                     }  
    157.                     break;  
    158.                 }  
    159.                 NameAndSerializer serializer = serializers.get(group);  
    160.                 if (logger.isDebugEnabled()) {  
    161.                     logger.debug("Serializing {} using {}",  
    162.                             serializer.headerName, serializer.serializer);  
    163.                 }  
    164.                 headers.put(serializer.headerName, serializer.serializer  
    165.                         .serialize(matcher.group(groupIndex)));  
    166.             }  
    167.         }  
    168.         return event;  
    169.     }  
    170.   
    171.     @Override  
    172.     public List<Event> intercept(List<Event> events) {  
    173.         List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());  
    174.         for (Event event : events) {  
    175.             Event interceptedEvent = intercept(event);  
    176.             if (interceptedEvent != null) {  
    177.                 intercepted.add(interceptedEvent);  
    178.             }  
    179.         }  
    180.         return intercepted;  
    181.     }  
    182.   
    183.     public static class Builder implements Interceptor.Builder {  
    184.   
    185.         private Pattern regex;  
    186.         private List<NameAndSerializer> serializerList;  
    187.   
    188.         // 增加代码开始  
    189.   
    190.         private boolean extractorHeader;  
    191.         private String extractorHeaderKey;  
    192.   
    193.         // 增加代码结束  
    194.   
    195.         private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();  
    196.   
    197.         @Override  
    198.         public void configure(Context context) {  
    199.             String regexString = context.getString(REGEX);  
    200.             Preconditions.checkArgument(!StringUtils.isEmpty(regexString),  
    201.                     "Must supply a valid regex string");  
    202.   
    203.             regex = Pattern.compile(regexString);  
    204.             regex.pattern();  
    205.             regex.matcher("").groupCount();  
    206.             configureSerializers(context);  
    207.   
    208.             // 增加代码开始  
    209.             extractorHeader = context.getBoolean(EXTRACTOR_HEADER,  
    210.                     DEFAULT_EXTRACTOR_HEADER);  
    211.   
    212.             if (extractorHeader) {  
    213.                 extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);  
    214.                 Preconditions.checkArgument(  
    215.                         !StringUtils.isEmpty(extractorHeaderKey),  
    216.                         "必须指定要抽取内容的header key");  
    217.             }  
    218.             // 增加代码结束  
    219.         }  
    220.   
    221.         private void configureSerializers(Context context) {  
    222.             String serializerListStr = context.getString(SERIALIZERS);  
    223.             Preconditions.checkArgument(  
    224.                     !StringUtils.isEmpty(serializerListStr),  
    225.                     "Must supply at least one name and serializer");  
    226.   
    227.             String[] serializerNames = serializerListStr.split("\\s+");  
    228.   
    229.             Context serializerContexts = new Context(  
    230.                     context.getSubProperties(SERIALIZERS + "."));  
    231.   
    232.             serializerList = Lists  
    233.                     .newArrayListWithCapacity(serializerNames.length);  
    234.             for (String serializerName : serializerNames) {  
    235.                 Context serializerContext = new Context(  
    236.                         serializerContexts.getSubProperties(serializerName  
    237.                                 + "."));  
    238.                 String type = serializerContext.getString("type""DEFAULT");  
    239.                 String name = serializerContext.getString("name");  
    240.                 Preconditions.checkArgument(!StringUtils.isEmpty(name),  
    241.                         "Supplied name cannot be empty.");  
    242.   
    243.                 if ("DEFAULT".equals(type)) {  
    244.                     serializerList.add(new NameAndSerializer(name,  
    245.                             defaultSerializer));  
    246.                 } else {  
    247.                     serializerList.add(new NameAndSerializer(name,  
    248.                             getCustomSerializer(type, serializerContext)));  
    249.                 }  
    250.             }  
    251.         }  
    252.   
    253.         private RegexExtractorInterceptorSerializer getCustomSerializer(  
    254.                 String clazzName, Context context) {  
    255.             try {  
    256.                 RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class  
    257.                         .forName(clazzName).newInstance();  
    258.                 serializer.configure(context);  
    259.                 return serializer;  
    260.             } catch (Exception e) {  
    261.                 logger.error("Could not instantiate event serializer.", e);  
    262.                 Throwables.propagate(e);  
    263.             }  
    264.             return defaultSerializer;  
    265.         }  
    266.   
    267.         @Override  
    268.         public Interceptor build() {  
    269.             Preconditions.checkArgument(regex != null,  
    270.                     "Regex pattern was misconfigured");  
    271.             Preconditions.checkArgument(serializerList.size() > 0,  
    272.                     "Must supply a valid group match id list");  
    273.             return new RegexExtractorExtInterceptor(regex, serializerList,  
    274.                     extractorHeader, extractorHeaderKey);  
    275.         }  
    276.     }  
    277.   
    278.     static class NameAndSerializer {  
    279.         private final String headerName;  
    280.         private final RegexExtractorInterceptorSerializer serializer;  
    281.   
    282.         public NameAndSerializer(String headerName,  
    283.                 RegexExtractorInterceptorSerializer serializer) {  
    284.             this.headerName = headerName;  
    285.             this.serializer = serializer;  
    286.         }  
    287.     }  
    288. }  

    简单说明一下改动的内容:

    增加了两个配置参数:

    extractorHeader   是否抽取的是header部分,默认为false,即和原始的拦截器功能一致,抽取的是event body的内容

    extractorHeaderKey 抽取的header的指定的key的内容,当extractorHeader为true时,必须指定该参数。

    按照第八讲的方法,我们将该类打成jar包,作为flume的插件放到了/var/lib/flume-ng/plugins.d/RegexExtractorExtInterceptor/lib目录下,重新启动flume,将该拦截器加载到classpath中。

    最终的flume.conf如下:

    [plain] view plain copy
    1. tier1.sources=source1  
    2. tier1.channels=channel1  
    3. tier1.sinks=sink1  
    4. tier1.sources.source1.type=spooldir  
    5. tier1.sources.source1.spoolDir=/opt/logs  
    6. tier1.sources.source1.fileHeader=true  
    7. tier1.sources.source1.basenameHeader=true  
    8. tier1.sources.source1.interceptors=i1  
    9. tier1.sources.source1.interceptors.i1.type=com.besttone.flume.RegexExtractorExtInterceptor$Builder  
    10. tier1.sources.source1.interceptors.i1.regex=(.*)\\.(.*)\\.(.*)  
    11. tier1.sources.source1.interceptors.i1.extractorHeader=true  
    12. tier1.sources.source1.interceptors.i1.extractorHeaderKey=basename  
    13. tier1.sources.source1.interceptors.i1.serializers=s1 s2 s3  
    14. tier1.sources.source1.interceptors.i1.serializers.s1.name=one  
    15. tier1.sources.source1.interceptors.i1.serializers.s2.name=two  
    16. tier1.sources.source1.interceptors.i1.serializers.s3.name=three  
    17. tier1.sources.source1.channels=channel1  
    18. tier1.sinks.sink1.type=hdfs  
    19. tier1.sinks.sink1.channel=channel1  
    20. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}  
    21. tier1.sinks.sink1.hdfs.round=true  
    22. tier1.sinks.sink1.hdfs.roundValue=10  
    23. tier1.sinks.sink1.hdfs.roundUnit=minute  
    24. tier1.sinks.sink1.hdfs.fileType=DataStream  
    25. tier1.sinks.sink1.hdfs.writeFormat=Text  
    26. tier1.sinks.sink1.hdfs.rollInterval=0  
    27. tier1.sinks.sink1.hdfs.rollSize=10240  
    28. tier1.sinks.sink1.hdfs.rollCount=0  
    29. tier1.sinks.sink1.hdfs.idleTimeout=60  
    30. tier1.channels.channel1.type=memory  
    31. tier1.channels.channel1.capacity=10000  
    32. tier1.channels.channel1.transactionCapacity=1000  
    33. tier1.channels.channel1.keep-alive=30  

    我把source type改回了内置的spooldir,而不是上一讲自定义的source,然后添加了一个拦截器i1,type是自定义的拦截器:com.besttone.flume.RegexExtractorExtInterceptor$Builder,正则表达式按“.”分隔抽取三部分,分别放到header中的key:one,two,three当中去,即a.log.2014-07-31,通过拦截器后,在header当中就会增加三个key: one=a,two=log,three=2014-07-31。这时候我们在tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}。

    就实现了和前面第八讲一模一样的需求。


    也可以看到,自定义拦截器的改动成本非常小,比自定义source小多了,我们这就增加了一个类,就实现了该功能。

    4.http://lxw1234.com/archives/2015/11/547.htm

    我们目前的业务场景如下:前端的5台日志收集服务器产生网站日志,使用Flume实时收集日志,并将日志发送至Kafka,然后Kafka中的日志一方面可以导入到HDFS,另一方面供实时计算模块使用。

    前面的文章《Kafka分区机制介绍与示例》介绍过Kafka的分区机制。我们对Kafka中存储日志的Topic指定了多个分区,默认情况下,Kafka Sink在收到events之后,将会随机选择一个该Topic的分区来存储数据,但我们不想这么做,我们需要根据网站日志中的cookieid来决定events存储到哪个分区中,简单来说,就是对cookieid计算hashcode,取绝对值,然后和Topic的分区数做模运算,这样,即实现了多分区的负载均衡,又确保相同的cookieid会写入同一个分区中,这样的处理,对后续的实时计算模块大有好处(后续再介绍)。

    而这样的需求,利用Flume的拦截器即可实现。前面有两篇文章
    Flume中的拦截器(Interceptor)介绍与使用(一)》和
    Flume中的拦截器(Interceptor)介绍与使用(二)
    介绍了Flume的拦截器和使用示例,这里我们使用的拦截器是Regex Extractor Interceptor。
    即从原始events中抽取出cookieid,放入到header中,而Kafka Sink在写入Kafka的时候,会从header中获取指定的key,然后根据分区规则确定该条events写入哪个分区中。

    网站日志格式

    假设原始网站日志有三个字段,分别为 时间|cookieid|ip,中间以单竖线分隔,比如:

    1. 2015-10-30 16:00:00| 967837DE00026C55D8DB2E|127.0.0.1
    2. 2015-10-30 16:05:00| 967837DE00026C55D8DB2E|127.0.0.1
    3. 2015-10-30 17:10:00| AC19BBDC0002A955A4A48F|127.0.0.1
    4. 2015-10-30 17:15:00| AC19BBDC0002A955A4A48F|127.0.0.1

    Flume Source的配置

    1. agent_lxw1234.sources = sources1
    2. agent_lxw1234.channels = fileChannel
    3. agent_lxw1234.sinks = sink1
    4. ##source 配置
    5. agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource
    6. agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json
    7. agent_lxw1234.sources.sources1.filegroups = f1
    8. agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log
    9. agent_lxw1234.sources.sources1.batchSize = 100
    10. agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000
    11. agent_lxw1234.sources.sources1.maxBackoffSleep = 5000
    12. agent_lxw1234.sources.sources1.channels = fileChannel
    13.  

    该source用于监控/tmp/lxw1234_.*.log命名格式的文件。

    Flume Source拦截器配置

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = regex_extractor
    4. agent_lxw1234.sources.sources1.interceptors.i1.regex = .*?\\|(.*?)\\|.*
    5. agent_lxw1234.sources.sources1.interceptors.i1.serializers = s1
    6. agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.name = key

    该拦截器(Regex Extractor Interceptor)用于从原始日志中抽取cookieid,访问到events header中,header名字为key。

    Flume Kafka Sink配置

    1. # sink 1 配置
    2. agent_lxw1234.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    3. agent_lxw1234.sinks.sink1.brokerList = developnode1:9091,developnode1:9092,developnode2:9091,developnode2:9092
    4. agent_lxw1234.sinks.sink1.topic = lxw1234
    5. agent_lxw1234.sinks.sink1.channel = fileChannel
    6. agent_lxw1234.sinks.sink1.batch-size = 100
    7. agent_lxw1234.sinks.sink1.requiredAcks = -1
    8. agent_lxw1234.sinks.sink1.kafka.partitioner.class = com.lxw1234.flume17.SimplePartitioner

    该Sink配置为Kafka Sink,将接收到的events发送至kafka集群的topic:lxw1234中。
    其中topic:lxw1234创建时候指定了4个分区,Kafka Sink使用的分区规则为
    com.lxw1234.flume17.SimplePartitioner,它会读取events header中的key值(即cookieid),然后对cookieid应用于分区规则,以便确定该条events发送至哪个分区中。
    关于com.lxw1234.flume17.SimplePartitioner的介绍和代码,见:
    Kafka分区机制介绍与示例》。

    Kafka消费者

    使用下面的Java程序从Kafka中消费数据,打印出每条events所在的分区。

    并从events中抽取cookieid,然后根据com.lxw1234.flume17.SimplePartitioner中的分区规则(Math.abs(cookieid.hashCode()) % 4)测试分区,看是否和获取到的分区一致。

    1. package com.lxw1234.kafka;
    2.  
    3. import java.util.HashMap;
    4. import java.util.List;
    5. import java.util.Map;
    6. import java.util.Properties;
    7.  
    8. import kafka.consumer.Consumer;
    9. import kafka.consumer.ConsumerConfig;
    10. import kafka.consumer.ConsumerIterator;
    11. import kafka.consumer.KafkaStream;
    12. import kafka.javaapi.consumer.ConsumerConnector;
    13. import kafka.message.MessageAndMetadata;
    14.  
    15. public class MyConsumer {
    16. public static void main(String[] args) {
    17. String topic = "lxw1234";
    18. ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    19. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    20. topicCountMap.put(topic, new Integer(1));
    21. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    22. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    23. ConsumerIterator<byte[], byte[]> it = stream.iterator();
    24. while(it.hasNext()) {
    25. MessageAndMetadata<byte[], byte[]> mam = it.next();
    26. String msg = new String(mam.message());
    27. String cookieid = msg.split("\\|")[1];
    28. int testPartition = Math.abs(cookieid.hashCode()) % 4;
    29. System.out.println("consume: Partition [" + mam.partition() + "] testPartition [" + testPartition + "] Message: [" + new String(mam.message()) + "] ..");
    30. }
    31. }
    32. private static ConsumerConfig createConsumerConfig() {
    33. Properties props = new Properties();
    34. props.put("group.id","group_lxw_test");
    35. props.put("zookeeper.connect","127.0.0.133:2182");
    36. props.put("zookeeper.session.timeout.ms", "4000");
    37. props.put("zookeeper.sync.time.ms", "200");
    38. props.put("auto.commit.interval.ms", "1000");
    39. props.put("auto.offset.reset", "smallest");
    40. return new ConsumerConfig(props);
    41. }
    42. }

    运行结果

    kafka分区

    如图中红框所示,实际events所在的分区和期望分区(testPartition)的结果完全一致,由此可见,所有的events已经按照既定的规则写入Kafka分区中。

    相关阅读:

    Flume中的拦截器(Interceptor)介绍与使用(一)

    Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume-ng 1.6中目前提供了以下拦截器:

    Timestamp Interceptor;
    Host Interceptor;
    Static Interceptor;
    UUID Interceptor;
    Morphline Interceptor;
    Search and Replace Interceptor;
    Regex Filtering Interceptor;
    Regex Extractor Interceptor;

    本文对常用的几种拦截器进行学习和介绍,并附上使用示例。

    本文中使用的Source为TaildirSource,就是监控一个文件的变化,将内容发送给Sink,具体可参考《Flume中的TaildirSource》,Source配置如下:

    1. #-->设置sources名称
    2. agent_lxw1234.sources = sources1
    3. #--> 设置channel名称
    4. agent_lxw1234.channels = fileChannel
    5. #--> 设置sink 名称
    6. agent_lxw1234.sinks = sink1
    7.  
    8. # source 配置
    9. agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource
    10. agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json
    11. agent_lxw1234.sources.sources1.filegroups = f1
    12. agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log
    13. agent_lxw1234.sources.sources1.batchSize = 100
    14. agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000
    15. agent_lxw1234.sources.sources1.maxBackoffSleep = 5000
    16. agent_lxw1234.sources.sources1.channels = fileChannel

    Flume Source中使用拦截器的相关配置如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1 i2
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = host
    4. agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
    5. agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost
    6. agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp
    7.  

    对一个Source可以使用多个拦截器。

    Timestamp Interceptor

    时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d

    hdfs.filePrefix = log_%Y%m%d_%H

    会根据时间戳将数据写入相应的文件中。

    但可以用其他方式代替(设置useLocalTimeStamp = true)。

    Host Interceptor

    主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。

    根据上面的Source,拦截器的配置如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = host
    4. agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
    5. agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost
    6.  
    7. # sink 1 配置
    8. agent_lxw1234.sinks.sink1.type = hdfs
    9. agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d
    10. agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost}
    11. agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
    12. agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
    13. agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
    14. agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
    15. agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
    16. agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
    17. agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
    18. agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
    19. agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
    20. agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
    21. agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
    22. agent_lxw1234.sinks.sink1.channel = fileChannel
    23.  

    该配置用于将source的events保存到HDFS上hdfs://cdh5/tmp/lxw1234的目录下,文件名为lxw1234_<主机名>.log

    Static Interceptor

    静态拦截器,用于在events header中加入一组静态的key和value。

    根据上面的Source,拦截器的配置如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = static
    4. agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
    5. agent_lxw1234.sources.sources1.interceptors.i1.key = static_key
    6. agent_lxw1234.sources.sources1.interceptors.i1.value = static_value
    7.  
    8. # sink 1 配置
    9. agent_lxw1234.sinks.sink1.type = hdfs
    10. agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234
    11. agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key}
    12. agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
    13. agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
    14. agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
    15. agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
    16. agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
    17. agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
    18. agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
    19. agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
    20. agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
    21. agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
    22. agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
    23. agent_lxw1234.sinks.sink1.channel = fileChannel

    看看最终Sink在HDFS上生成的文件结构:

    flume interceptor

    UUID Interceptor

    UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    4. agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid
    5. agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
    6. agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_
    7.  
    8. # sink 1 配置
    9. agent_lxw1234.sinks.sink1.type = logger
    10. agent_lxw1234.sinks.sink1.channel = fileChannel

    运行后在日志中查看header信息:

    flume interceptor

    Morphline Interceptor

    Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考

    http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

    后续再研究这块。


    Flume中的拦截器(Interceptor)介绍与使用(二)

    Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume-ng 1.6中目前提供了以下拦截器:

    Timestamp Interceptor;
    Host Interceptor;
    Static Interceptor;
    UUID Interceptor;
    Morphline Interceptor;
    Search and Replace Interceptor;
    Regex Filtering Interceptor;
    Regex Extractor Interceptor;

    本文接上一篇《Flume中的拦截器(Interceptor)介绍与使用(一)》,继续对剩下几种拦截器进行学习和介绍,并附上使用示例。

    Search and Replace Interceptor

    该拦截器用于将events中的正则匹配到的内容做相应的替换。

    具体配置示例如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = search_replace
    4. agent_lxw1234.sources.sources1.interceptors.i1.searchPattern = [0-9]+
    5. agent_lxw1234.sources.sources1.interceptors.i1.replaceString = lxw1234
    6. agent_lxw1234.sources.sources1.interceptors.i1.charset = UTF-8
    7.  
    8. # sink 1 配置
    9. ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
    10. agent_lxw1234.sinks.sink1.type = logger
    11. agent_lxw1234.sinks.sink1.channel = fileChannel
    12.  

    该配置将events中的数字替换为lxw1234。

    原始的events内容为:

    flume interceptor

    实际的events内容为:

    flume interceptor

    Regex Filtering Interceptor

    该拦截器使用正则表达式过滤原始events中的内容。

    配置示例如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = regex_filter
    4. agent_lxw1234.sources.sources1.interceptors.i1.regex = ^lxw1234.*
    5. agent_lxw1234.sources.sources1.interceptors.i1.excludeEvents = false
    6.  
    7. # sink 1 配置
    8. ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
    9. agent_lxw1234.sinks.sink1.type = logger
    10. agent_lxw1234.sinks.sink1.channel = fileChannel
    11.  

    该配置表示过滤掉不是以lxw1234开头的events。

    如果excludeEvents设为true,则表示过滤掉以lxw1234开头的events。

    原始events内容为:

    flume interceptors

    拦截后的events内容为:

    flume interceptors

    Regex Extractor Interceptor

    该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events header中。

    配置示例如下:

    1. ## source 拦截器
    2. agent_lxw1234.sources.sources1.interceptors = i1
    3. agent_lxw1234.sources.sources1.interceptors.i1.type = regex_extractor
    4. agent_lxw1234.sources.sources1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*?)
    5. agent_lxw1234.sources.sources1.interceptors.i1.serializers = s1 s2
    6. agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.type = default
    7. agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.name = cookieid
    8. agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.type = default
    9. agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.name = ip
    10.  
    11. # sink 1 配置
    12. ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
    13. agent_lxw1234.sinks.sink1.type = logger
    14. agent_lxw1234.sinks.sink1.channel = fileChannel
    15.  

    该配置从原始events中抽取出cookieid和ip,加入到events header中。

    原始的events内容为:

    flume interceptors

    events header中的内容为:

    flume interceptors

     

    Flume的拦截器可以配合Sink完成许多业务场景需要的功能,

    比如:按照时间及主机生成目标文件目录及文件名;

    配合Kafka Sink完成多分区的写入等等。


    Flume中的TaildirSource

    关键字:Flume、TaildirSource、TailFile、Source

    在通过Flume收集日志的业务场景中,一般都会遇到下面的情况,在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。

    这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

    我将TaildirSource的相关源码下载下来(需要做简单修改),然后集成到Flume1.6中,满足了上面提到的需求,获得了良好的效果。

    源码下载地址: 点击下载

    将源码单独编译,打成jar包,上传到$FLUME_HOME/lib/目录下。

    下面的例子中,通过flume监控/tmp/lxw1234-flume/目录下,命名规则为log_.*.log的文件,并将文件内容实时的写入/tmp/flumefiles/目录下,即:source为TaildirSource,sink为file_roll;

    Agent配置

    agent_lxw1234的配置文件如下($FLUME_HOME/conf/agent_lxw1234_conf.properties):

    1. #-->设置sources名称
    2. agent_lxw1234.sources = sources1
    3. #--> 设置channel名称
    4. agent_lxw1234.channels = fileChannel
    5. #--> 设置sink 名称
    6. agent_lxw1234.sinks = sink1
    7.  
    8. # source 配置
    9. agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource
    10. agent_lxw1234.sources.sources1.positionFile = /tmp/flume/taildir_position.json
    11. agent_lxw1234.sources.sources1.filegroups = f1
    12. agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234-flume/log_.*.log
    13. agent_lxw1234.sources.sources1.batchSize = 100
    14. agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000
    15. agent_lxw1234.sources.sources1.maxBackoffSleep = 5000
    16. agent_lxw1234.sources.sources1.channels = fileChannel
    17.  
    18. # sink1 配置
    19. agent_lxw1234.sinks.sink1.type = file_roll
    20. agent_lxw1234.sinks.sink1.sink.directory = /tmp/flumefiles
    21. agent_lxw1234.sinks.sink1.sink.rollInterval = 0
    22. agent_lxw1234.sinks.sink1.channel = fileChannel
    23.  
    24. # fileChannel 配置
    25. agent_lxw1234.channels.fileChannel.type = file
    26. #-->检测点文件所存储的目录
    27. agent_lxw1234.channels.fileChannel.checkpointDir = /opt/flume/checkpoint/lxw1234/
    28. #-->数据存储所在的目录设置
    29. agent_lxw1234.channels.fileChannel.dataDirs = /opt/flume/data/lxw1234/
    30. #-->隧道的最大容量
    31. agent_lxw1234.channels.fileChannel.capacity = 10000
    32. #-->事务容量的最大值设置
    33. agent_lxw1234.channels.fileChannel.transactionCapacity = 200
    34.  

    TaildirSource的配置说明(带*的为必须配置)

    **channels**

    **type**

    **filegroups**     指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件)

    **filegroups.<filegroupName>**     配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

    positionFile    配置检查点文件的路径,检查点文件会以json格式保存已经tail文件的位置

    启动Agent

    cd $FLUME_HOME/conf/

    flume-ng agent -n agent_lxw1234 –conf . -f agent_lxw1234_conf.properties

    运行结果

    启动之后,在sink所指的/tmp/flumefiles目录下,生成了一个大小为0的目标文件,命令为时间戳-1,如:

    flume tail file

    接着往监控的目录中生成log_20151015_10.log的文件:

    flume tail file

     

    此时,在上面tail –f目标文件的控制台中,已经可以看到写入的内容了:

    flume tail file

    再模拟生成一个新的文件(log_20151015_11.log):

    flume tail file

     

    同样,目标文件中也正常写入:

    flume tail file

     

     

    如果在监控的目录/tmp/lxw1234-flume/中,产生和所配置的文件名正则表达式不匹配的文件,则不会被tail。

    另外,如果将所监控目录/tmp/lxw1234-flume/中已经过期的文件移除,也不会影响agent的运行。

    检查点文件positionFile

    看一下该文件的内容:flume tail file

    该文件中记录了所监控的每个文件的当前位置,如图中红圈圈出的pos的值,因为两个文件都已经读到了最后,因此每个pos的值就是该文件的大小。

    TailSource使用了RandomAccessFile来根据positionFile中保存的文件位置来读取文件的,在agent重启之后,亦会先从positionFile中找到上次读取的文件位置,保证内容不会重复发送。


    展开全文
  • Flume 拦截器

    2021-02-07 17:51:15
    Flume中允许使用拦截器对传输中的event进行拦截和处理!拦截器必须实现org.apache.flume.interceptor.Interceptor接口。拦截器可以根据开发者的设定修改甚至删除event!Flume同时支持拦截器链,Interceptors 可以...

    概念

    Interceptors 是拦截 event对象的.是在Source放到Channel的过程中拦截的,那么你就可以在放到Channel之前对Event对象进行处理,比如说加工一下Event或者删除一下Event都行.

    在Flume中允许使用拦截器对传输中的event进行拦截和处理!拦截器必须实现org.apache.flume.interceptor.Interceptor接口。拦截器可以根据开发者的设定修改甚至删除event!Flume同时支持拦截器链,Interceptors 可以设置多个,这样的话多个Interceptors 可以都是生效,你拦截完了处理完了, 下一个Interceptors 可以接着拦截处理event对象. 拦截器链的Interceptors 拦截是按照你配置的顺序依次进行拦截的.

    编写拦截器

    https://www.yuque.com/docs/share/62a96c95-e342-49ed-a8bc-6d359b0b275b?# 《Flume编写拦截器》

    展开全文
  • flume 拦截器

    2019-12-24 15:18:24
    https://blog.csdn.net/u012443641/article/details/80757229
    展开全文
  • Flume拦截器(含自定义拦截器)一、Flume拦截器1.1 时间戳拦截器1.2 主机名拦截器1.3 UUID拦截器1.4 查询替换拦截器1.5 正则过滤拦截器1.6 正则抽取拦截器二、Flume自定义拦截器2.1 添加Pom.xml依赖2.2 自定义实现...
  • Flume拦截器实战

    2019-09-22 10:21:39
    文章目录六、 Flume拦截器实战案例1. 日志的采集和汇总1.1. 案例场景1.2. 场景分析1.3. 数据流程处理分析1.4. 功能实现2. Flume自定义拦截器2.1. 案例背景介绍2.2. 自定义拦截器2.3. 功能实现2.4. 项目...
  • 相信大家对Flume都已经很了解,Flume是Cloudera提供的一个...Flume拦截器(Interceptor)是设置在source和channel之间,数据接收到Source后,当数据进入到Channel之前,拦截器都可以对这些数据做事件拦截,根据不同的...
  • 自定义flume拦截器

    2020-07-20 17:21:14
    flume拦截器LogTypeinterCeptor(数据类型分发)LogETLinterCeptor(数据清洗拦截器)LogETLinterCeptor(定义的方法类) LogTypeinterCeptor(数据类型分发) import org.apache.flume.Context; import org.apache....
  • Apache Flume 拦截器

    2020-12-03 16:52:27
    Apache Flume 拦截器拦截器Flume自定义拦截器 案例场景 A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log 现在要求: 把A、B 机器中的access.log、nginx.log、web.log 采集汇总到C机器上...
  • Flume拦截器一.使用正则拦截器(去掉首行)二.自定义拦截器1.创建maven工程2.在idea中自定义编写拦截器3.打成jar包传到$FLUME_HOME/lib 目录下4.编写agent文件5.执行结果 一.使用正则拦截器(去掉首行) 需求: 使用...
  • flume 拦截器(interceptor)1、flume拦截器介绍拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个...
  • 一、Flume拦截器(interceptor)介绍 二、Flume内置的拦截器 1、时间戳拦截器 2、主机拦截器 3、静态拦截器 4、正则过滤拦截器 三、静态拦截器综合案例实现 1. 案例场景 2. 场景分析 3. 数据流程处理分析 4....
  • 大数据-Flume拦截器

    2019-09-18 21:39:47
    Flume拦截器 当Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。 Flume自带有六种拦截器,分别为时间拦截器、主机拦截器、UUID拦截器、...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 998
精华内容 399
关键字:

flume拦截器