精华内容
下载资源
问答
  • flume数据采集

    2017-10-21 13:15:20
    从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的...

    在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 
    这里写图片描述 
    从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。 
    (一)Flume架构介绍 
    1、Flume的概念 
    这里写图片描述 
    flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。 
    2、Event的概念 
    在这里有必要先介绍一下flume中event的相关概念:flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。 
    在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 
    为了方便大家理解,给出一张event的数据流向图: 
    这里写图片描述 
    一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以: 
    这里写图片描述 
    其中event信息就是flume收集到的日记记录。 
    3、flume架构介绍 
    flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 
    agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。 
    source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。 
    channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。 
    sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。 
    4、flume的运行机制 
    flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。 
    5、flume的广义用法 
    flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。 
    这里写图片描述 
    (二)flume应用—日志采集 
    对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。下面我将用具体的案例详述flume的具体用法。 
    其实flume的用法很简单—-书写一个配置文件,在配置文件当中描述source、channel与sink的具体实现,而后运行一个agent实例,在运行agent实例的过程中会读取配置文件的内容,这样flume就会采集到数据。 
    配置文件的编写原则: 
    1>从整体上描述代理agent中sources、sinks、channels所涉及到的组件

        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
    • 1
    • 2
    • 3
    • 4

    2>详细描述agent中每一个source、sink与channel的具体实现:即在描述source的时候,需要 
    指定source到底是什么类型的,即这个source是接受文件的、还是接受http的、还是接受thrift 
    的;对于sink也是同理,需要指定结果是输出到HDFS中,还是Hbase中啊等等;对于channel 
    需要指定是内存啊,还是数据库啊,还是文件啊等等。

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 44444
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3>通过channel将source与sink连接起来

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3

    启动agent的shell操作:

        flume-ng  agent -n a1  -c  ../conf   -f  ../conf/example.file  
        -Dflume.root.logger=DEBUG,console  
    • 1
    • 2

    参数说明: -n 指定agent名称(与配置文件中代理的名字相同) 
    -c 指定flume中配置文件的目录 
    -f 指定配置文件 
    -Dflume.root.logger=DEBUG,console 设置日志等级

    具体案例: 
    案例1: NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:logger Channel:memory 
    flume官网中NetCat Source描述:

    Property Name Default     Description
    channels       –     
    type           –     The component type name, needs to be netcat
    bind           –  日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听          
    port           –  日志需要发送到的端口号,该端口号要有netcat类型的source在监听      
    • 1
    • 2
    • 3
    • 4
    • 5

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console
    • 1

    c) 使用telnet发送数据

    telnet  192.168.80.80  44444  big data world!(windows中运行的)
    • 1

    d) 在控制台上查看flume收集到的日志数据: 
    这里写图片描述

    案例2:NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:hdfs Channel:file (相比于案例1的两个变化) 
    flume官网中HDFS Sink的描述: 
    这里写图片描述 
    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console
    • 1

    c) 使用telnet发送数据

    telnet  192.168.80.80  44444  big data world!(windows中运行的)
    • 1

    d) 在HDFS中查看flume收集到的日志数据: 
    这里写图片描述 
    案例3:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中 Sink:logger Channel:memory 
    flume官网中Spooling Directory Source描述:

    Property Name       Default      Description
    channels              –  
    type                  –          The component type name, needs to be spooldir.
    spoolDir              –          Spooling Directory Source监听的目录
    fileSuffix         .COMPLETED    文件内容写入到channel之后,标记该文件
    deletePolicy       never         文件内容写入到channel之后的删除策略: never or immediate
    fileHeader         false         Whether to add a header storing the absolute path filename.
    ignorePattern      ^$           Regular expression specifying which files to ignore (skip)
    interceptors          –          指定传输中event的head(头信息),常用timestamp
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Spooling Directory Source的两个注意事项:

    ①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
    即:拷贝到spool目录下的文件不可以再打开编辑
    ②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
    即:不能将具有相同文件名字的文件拷贝到这个目录下
    • 1
    • 2
    • 3
    • 4

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/datainput
    a1.sources.r1.fileHeader = true
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console
    • 1

    c) 使用cp命令向Spooling Directory 中发送数据

     cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)
    • 1

    d) 在控制台上查看flume收集到的日志数据: 
    这里写图片描述 
    从控制台显示的结果可以看出event的头信息中包含了时间戳信息。 
    同时我们查看一下Spooling Directory中的datafile信息—-文件内容写入到channel之后,该文件被标记了:

    [root@hadoop80 datainput]# ls
    datafile.COMPLETED
    • 1
    • 2

    案例4:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例3的两个变化)

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/datainput
    a1.sources.r1.fileHeader = true
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # Describe the sink
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console
    • 1

    c) 使用cp命令向Spooling Directory 中发送数据

     cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)
    • 1

    d) 在控制台上可以参看sink的运行进度日志: 
    这里写图片描述 
    d) 在HDFS中查看flume收集到的日志数据: 
    这里写图片描述 
    这里写图片描述 
    从案例1与案例2、案例3与案例4的对比中我们可以发现:flume的配置文件在编写的过程中是非常灵活的。

    案例5:Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 
    常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:hdfs Channel:file 
    这个案列为了方便显示Exec Source的运行效果,结合Hive中的external table进行来说明。

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /usr/local/log.file
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    b)在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目录,方便查看日志捕获内容

    hive> create external table t1(infor  string)
        > row format delimited
        > fields terminated by '\t'
        > location '/dataoutput/';
    OK
    Time taken: 0.284 seconds
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    c) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/exec.conf   -Dflume.root.logger=DEBUG,console
    • 1

    d) 使用echo命令向/usr/local/datainput 中发送数据

     echo  big data > log.file
    • 1

    d) 在HDFS和Hive分别中查看flume收集到的日志数据: 
    这里写图片描述

    hive> select * from t1;
    OK
    big data
    Time taken: 0.086 seconds
    • 1
    • 2
    • 3
    • 4

    e)使用echo命令向/usr/local/datainput 中在追加一条数据

    echo big data world! >> log.file
    • 1

    d) 在HDFS和Hive再次分别中查看flume收集到的日志数据: 
    这里写图片描述 
    这里写图片描述

    hive> select * from t1;
    OK
    big data
    big data world!
    Time taken: 0.511 seconds
    • 1
    • 2
    • 3
    • 4
    • 5

    总结Exec source:Exec source和Spooling Directory Source是两种常用的日志采集的方式,其中Exec source可以实现对日志的实时采集,Spooling Directory Source在对日志的实时采集上稍有欠缺,尽管Exec source可以实现对日志的实时采集,但是当Flume不运行或者指令执行出错时,Exec source将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。

    案例6:Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。 其中 Sink:hdfs Channel:file 
    (注:Avro和Thrift都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制) 
    Avro Source运行原理如下图: 
    这里写图片描述 
    flume官网中Avro Source的描述:

    Property     Name   Default Description
    channels      –  
    type          –     The component type name, needs to be avro
    bind          –     日志需要发送到的主机名或者ip,该主机运行着ARVO类型的source
    port          –     日志需要发送到的端口号,该端口要有ARVO类型的source在监听
    • 1
    • 2
    • 3
    • 4
    • 5

    1)编写配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 4141
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/avro.conf   -Dflume.root.logger=DEBUG,console
    • 1

    c)使用avro-client发送文件

    flume-ng avro-client -c  ../conf  -H 192.168.80.80  -p 4141 -F /usr/local/log.file
    • 1

    注:log.file文件中的内容为:

    [root@hadoop80 local]# more log.file
    big data
    big data world!
    • 1
    • 2
    • 3

    d) 在HDFS中查看flume收集到的日志数据: 
    这里写图片描述 
    这里写图片描述 
    这里写图片描述

    通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!

    最后对上面用的几个flume source进行适当总结: 
    ① NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件 
    就可以获取到信息。 
    ②Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文 
    件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记 
    该文件已完成或者删除该文件。 
    ③Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 
    常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 
    ④Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。

    如有问题,欢迎留言指正!

    展开全文
  • flume 数据采集

    2017-12-29 21:00:07
    在大数据的业务处理过程中,Flume主要负责数据采集。   2、Flume架构介绍    flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集...

    1、flume在大数据业务中的角色

    Hadoop业务的整体开发流程: 
     

    在大数据的业务处理过程中,Flume主要负责数据的采集。

     

    2、Flume架构介绍 


     
    flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。 
    2、Event 
    在这里有必要先介绍一下flume中event的相关概念:flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。 
    在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 
    为了方便大家理解,给出一张event的数据流向图: 


     
    一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以: 
     其中event信息就是flume收集到的日记记录。 

    3、flume组件介绍 

    flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 
    agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。 
    source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spoolingdirectory、netcat、sequencegenerator、syslog、http、legacy、自定义。 

    channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。 

    sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。 

    4、flume的运行机制 

    flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。 

    5、flume的用法 

    flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。 

    对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。
     


     

    案例一:监控端口数据

    目标:Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。

    分步实现:

    1) 创建Flume Agent配置文件flume-telnet.conf

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

     

    # Describe the sink

    a1.sinks.k1.type = logger

     

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    2) 判断44444端口是否被占用

    $ netstat -tunlp | grep 44444

    3) 先开启flume先听端口

    $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet.conf -Dflume.root.logger==INFO,console

    4) 使用telnet工具向本机的44444端口发送内容

    $ telnet localhost 44444

    案例二:实时读取本地文件到HDFS

    目标:实时监控hive日志,并上传到HDFS中

    分步实现:

    1) 拷贝Hadoop相关jar到Flume的lib目录下(要学会根据自己的目录和版本查找jar包)

    $ cp share/hadoop/common/lib/hadoop-auth-2.5.0-cdh5.3.6.jar ./lib/

    $ cp share/hadoop/common/lib/commons-configuration-1.6.jar ./lib/

    $ cp share/hadoop/mapreduce1/lib/hadoop-hdfs-2.5.0-cdh5.3.6.jar ./lib/

    $ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar ./lib/

    $ cp ./share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar ./lib/

    $ cp ./share/hadoop/hdfs/lib/commons-io-2.4.jar ./lib/

    尖叫提示:标红的jar为1.99版本flume必须引用的jar

    2) 创建flume-hdfs.conf文件

    # Name the components on this agent

    a2.sources = r2

    a2.sinks = k2

    a2.channels = c2

    # Describe/configure the source

    a2.sources.r2.type = exec

    a2.sources.r2.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log

    a2.sources.r2.shell = /bin/bash -c

     

    # Describe the sink

    a2.sinks.k2.type = hdfs

    a2.sinks.k2.hdfs.path = hdfs://linux01:8020/flume/%Y%m%d/%H

    #上传文件的前缀

    a2.sinks.k2.hdfs.filePrefix = logs-

    #是否按照时间滚动文件夹

    a2.sinks.k2.hdfs.round = true

    #多少时间单位创建一个新的文件夹

    a2.sinks.k2.hdfs.roundValue = 1

    #重新定义时间单位

    a2.sinks.k2.hdfs.roundUnit = hour

    #是否使用本地时间戳

    a2.sinks.k2.hdfs.useLocalTimeStamp = true

    #积攒多少个Event才flush到HDFS一次

    a2.sinks.k2.hdfs.batchSize = 1000

    #设置文件类型,可支持压缩

    a2.sinks.k2.hdfs.fileType = DataStream

    #多久生成一个新的文件

    a2.sinks.k2.hdfs.rollInterval = 600

    #设置每个文件的滚动大小

    a2.sinks.k2.hdfs.rollSize = 134217700

    #文件的滚动与Event数量无关

    a2.sinks.k2.hdfs.rollCount = 0

    #最小冗余数

    a2.sinks.k2.hdfs.minBlockReplicas = 1

     

    # Use a channel which buffers events in memory

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 1000

    a2.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2

     

    3) 执行监控配置

    $ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-hdfs.conf

    案例三:实时读取目录文件到HDFS

    目标:使用flume监听整个目录的文件

    分步实现

    1) 创建配置文件flume-dir.conf

    a3.sources = r3

    a3.sinks = k3

    a3.channels = c3

     

    # Describe/configure the source

    a3.sources.r3.type = spooldir

    a3.sources.r3.spoolDir = /home/admin/modules/apache-flume-1.7.0-bin/upload

    a3.sources.r3.fileSuffix = .COMPLETED

    a3.sources.r3.fileHeader = true

    #忽略所有以.tmp结尾的文件,不上传

    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

     

    # Describe the sink

    a3.sinks.k3.type = hdfs

    a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload/%Y%m%d/%H

    #上传文件的前缀

    a3.sinks.k3.hdfs.filePrefix = upload-

    #是否按照时间滚动文件夹

    a3.sinks.k3.hdfs.round = true

    #多少时间单位创建一个新的文件夹

    a3.sinks.k3.hdfs.roundValue = 1

    #重新定义时间单位

    a3.sinks.k3.hdfs.roundUnit = hour

    #是否使用本地时间戳

    a3.sinks.k3.hdfs.useLocalTimeStamp = true

    #积攒多少个Event才flush到HDFS一次

    a3.sinks.k3.hdfs.batchSize = 100

    #设置文件类型,可支持压缩

    a3.sinks.k3.hdfs.fileType = DataStream

    #多久生成一个新的文件

    a3.sinks.k3.hdfs.rollInterval = 600

    #设置每个文件的滚动大小大概是128M

    a3.sinks.k3.hdfs.rollSize = 134217700

    #文件的滚动与Event数量无关

    a3.sinks.k3.hdfs.rollCount = 0

    #最小冗余数

    a3.sinks.k3.hdfs.minBlockReplicas = 1

     

    # Use a channel which buffers events in memory

    a3.channels.c3.type = memory

    a3.channels.c3.capacity = 1000

    a3.channels.c3.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r3.channels = c3

    a3.sinks.k3.channel = c3

     

    2) 执行测试:执行如下脚本后,请向upload文件夹中添加文件试试

    $ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir.conf

    尖叫提示: 在使用SpoolingDirectory Source时

    1) 不要在监控目录中创建并持续修改文件

    2) 上传完成的文件会以.COMPLETED结尾

    3) 被监控文件夹每600毫秒扫描一次文件变动

     

     

    案例4:监听一个指定的Avro 端口

    1)编写配置文件

    # Namethe components on this agent

    a1.sources= r1

    a1.sinks =k1

    a1.channels= c1

     

    #Describe/configure the source

    a1.sources.r1.type= avro

    a1.sources.r1.bind= 192.168.80.80

    a1.sources.r1.port= 4141

     

    #Describe the sink

    a1.sinks.k1.type= hdfs

    a1.sinks.k1.hdfs.path= hdfs://hadoop80:9000/dataoutput

    a1.sinks.k1.hdfs.writeFormat= Text

    a1.sinks.k1.hdfs.fileType= DataStream

    a1.sinks.k1.hdfs.rollInterval= 10

    a1.sinks.k1.hdfs.rollSize= 0

    a1.sinks.k1.hdfs.rollCount= 0

    a1.sinks.k1.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S

    a1.sinks.k1.hdfs.useLocalTimeStamp= true

     

    # Use achannel which buffers events in file

    a1.channels.c1.type= file

    a1.channels.c1.checkpointDir= /usr/flume/checkpoint

    a1.channels.c1.dataDirs= /usr/flume/data

     

    # Bindthe source and sink to the channel

    a1.sources.r1.channels= c1

    a1.sinks.k1.channel= c1

    2) 启动flume agent a1服务端

    flume-ng agent -n a1  -c ../conf  -f ../conf/avro.conf   -Dflume.root.logger=DEBUG,console

    3)使用avro-client发送文件

    flume-ng avro-client -c  ../conf -H 192.168.80.80  -p 4141 -F/usr/local/log.file

     

    案例五:Flume与Flume之间数据传递:单Flume多Channel、Sink,


    目标:使用flume-1监控文件变动,flume-1将变动内容传递给flume-2,flume-2负责存储到HDFS。同时flume-1将变动内容传递给flume-3,flume-3负责输出到。

    local filesystem。

    分步实现:

    1) 创建flume-1.conf,用于监控hive.log文件的变动,同时产生两个channel和两个sink分别输送给flume-2和flume3:

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # 将数据流复制给多个channel

    a1.sources.r1.selector.type = replicating

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log

    a1.sources.r1.shell = /bin/bash -c

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = linux01

    a1.sinks.k1.port = 4141

     

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = linux01

    a1.sinks.k2.port = 4142

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2

     

    2) 创建flume-2.conf,用于接收flume-1的event,同时产生1个channel和1个sink,将数据输送给hdfs:

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = avro

    a2.sources.r1.bind = linux01

    a2.sources.r1.port = 4141

     

    # Describe the sink

    a2.sinks.k1.type = hdfs

    a2.sinks.k1.hdfs.path = hdfs://linux01:8020/flume2/%Y%m%d/%H

    #上传文件的前缀

    a2.sinks.k1.hdfs.filePrefix = flume2-

    #是否按照时间滚动文件夹

    a2.sinks.k1.hdfs.round = true

    #多少时间单位创建一个新的文件夹

    a2.sinks.k1.hdfs.roundValue = 1

    #重新定义时间单位

    a2.sinks.k1.hdfs.roundUnit = hour

    #是否使用本地时间戳

    a2.sinks.k1.hdfs.useLocalTimeStamp = true

    #积攒多少个Event才flush到HDFS一次

    a2.sinks.k1.hdfs.batchSize = 100

    #设置文件类型,可支持压缩

    a2.sinks.k1.hdfs.fileType = DataStream

    #多久生成一个新的文件

    a2.sinks.k1.hdfs.rollInterval = 600

    #设置每个文件的滚动大小大概是128M

    a2.sinks.k1.hdfs.rollSize = 134217700

    #文件的滚动与Event数量无关

    a2.sinks.k1.hdfs.rollCount = 0

    #最小冗余数

    a2.sinks.k1.hdfs.minBlockReplicas = 1

     

     

    # Describe the channel

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

     

    3) 创建flume-3.conf,用于接收flume-1的event,同时产生1个channel和1个sink,将数据输送给本地目录:

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c1

     

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = linux01

    a3.sources.r1.port = 4142

     

    # Describe the sink

    a3.sinks.k1.type = file_roll

    a3.sinks.k1.sink.directory = /home/admin/Desktop/flume3

     

    # Describe the channel

    a3.channels.c1.type = memory

    a3.channels.c1.capacity = 1000

    a3.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c1

    a3.sinks.k1.channel = c1

    尖叫提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

    4) 执行测试:分别开启对应flume-job(依次启动flume-3,flume-2,flume-1),同时产生文件变动并观察结果:

    $ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job1/flume-3.conf

    $ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job1/flume-2.conf

    $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job1/flume-1.conf

     

    案例六:Flume与Flume之间数据传递,多Flume汇总数据到单Flume


    目标:flume-1监控文件hive.log,flume-2监控某一个端口的数据流,flume-1与flume-2将数据发送给flume-3,flume3将最终数据写入到HDFS。

    分步实现:

    1) 创建flume-1.conf,用于监控hive.log文件,同时sink数据到flume-3:

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log

    a1.sources.r1.shell = /bin/bash -c

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = linux01

    a1.sinks.k1.port = 4141

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

     

    2) 创建flume-2.conf,用于监控端口44444数据流,同时sink数据到flume-3:

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = netcat

    a2.sources.r1.bind = linux01

    a2.sources.r1.port = 44444

     

    # Describe the sink

    a2.sinks.k1.type = avro

    a2.sinks.k1.hostname = linux01

    a2.sinks.k1.port = 4141

     

    # Use a channel which buffers events in memory

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

     

    3) 创建flume-3.conf,用于接收flume-1与flume-2发送过来的数据流,最终合并后sink到HDFS:

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c1

     

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = linux01

    a3.sources.r1.port = 4141

     

    # Describe the sink

    a3.sinks.k1.type = hdfs

    a3.sinks.k1.hdfs.path = hdfs://linux01:8020/flume3/%Y%m%d/%H

    #上传文件的前缀

    a3.sinks.k1.hdfs.filePrefix = flume3-

    #是否按照时间滚动文件夹

    a3.sinks.k1.hdfs.round = true

    #多少时间单位创建一个新的文件夹

    a3.sinks.k1.hdfs.roundValue = 1

    #重新定义时间单位

    a3.sinks.k1.hdfs.roundUnit = hour

    #是否使用本地时间戳

    a3.sinks.k1.hdfs.useLocalTimeStamp = true

    #积攒多少个Event才flush到HDFS一次

    a3.sinks.k1.hdfs.batchSize = 100

    #设置文件类型,可支持压缩

    a3.sinks.k1.hdfs.fileType = DataStream

    #多久生成一个新的文件

    a3.sinks.k1.hdfs.rollInterval = 600

    #设置每个文件的滚动大小大概是128M

    a3.sinks.k1.hdfs.rollSize = 134217700

    #文件的滚动与Event数量无关

    a3.sinks.k1.hdfs.rollCount = 0

    #最小冗余数

    a3.sinks.k1.hdfs.minBlockReplicas = 1

     

    # Describe the channel

    a3.channels.c1.type = memory

    a3.channels.c1.capacity = 1000

    a3.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c1

    a3.sinks.k1.channel = c1

     

    4) 执行测试:分别开启对应flume-job(依次启动flume-3,flume-2,flume-1),同时产生文件变动并观察结果:

    $ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job2/flume-3.conf

    $ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job2/flume-2.conf

    $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job2/flume-1.conf

    尖叫提示:测试时记得启动hive产生一些日志,同时使用telnet向44444端口发送内容,如:

    $ bin/hive

    $ telnet linux01 44444

     

    总结:

    通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!

    最后对上面用的几个flume source进行适当总结: 
    ① NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件 
    就可以获取到信息。 
    ②Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文 
    件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记 
    该文件已完成或者删除该文件。 
    ③Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 
    常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 
    ④Avro Source:监听一个指定的Avro 端口,通过Avro端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。

     

    展开全文
  • Flume数据采集

    千次阅读 2018-06-12 15:42:16
    flume自带很长多的source,如:exe、kafka......常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Fl...

    flume自带很长多的source,如:exe、kafka...其中有一个非常简单的source——httpsource,使用httpSource,flume启动后会拉起一个web服务来监听指定的ip和port。常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Flume中。

    1、httpsource 参数:

    配置参数默认值描述
    type
    http (org.apache.fluem.source.httpSource)
    bind
    绑定的IP地址或主机名
    port
    绑定的端口号
    enableSSLfalse
    keystore
    使用的keystore文件的路径
    keystorePassword
    能够进入keystore的密码
    handlerJSONHandlerHTTP SOURCE使用的处理程序类
    handler.*
    传给处理程序类的任何参数 可以 通过使用此参数(*)配置传入

    1)handler:

    Flume使用一个可插拔的“handler”程序来实现转换,如果不指定默认是:JSONHandler,它能处理JSON格式的事件,格式如下。此外用户可以自定义handler,必须实现HTTPSourceHandler接口。

    json数据格式:

    [html] view plain copy
    1. [ { "headers":{"":"","":""  
    2.                  },  
    3.      "body":"the first event"  
    4.    },  
    5.    { "headers":{"":"","":""  
    6.                  },  
    7.      "body":"the second event"  
    8.    }  
    9.      
    10. ]  

    2、简单介绍一下flume的logger sink:

    记录INFO级别的日志,一般用于调试。本文将使用这种类型的sink,配置的属性:

    • type  logger
    • maxBytesToLog    16    Maximum number of bytes of the Event body to log
    注意:要求必须在 --conf 参数指定的目录下有 log4j的配置文件,可以通过-Dflume.root.logger=INFO,console在命令启动时手动指定log4j参数。

    3、应用

    3.1 Flume配置如下:


    3.2 结构说明

    在该场景中,使用三个Flume Agent分别部署在三台Web服务器上,用来采集终端的数据,然后其数据的下沉方式都为发送到另外两个Flume Agent上,然后这两个Flume agent 将数据发送到kafka或者hdfs中

    3.3 配置

    3.3.1 第一层Flume配置

    #########################################################
    ##
    ##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro
    ##    注意:Flume agent的运行,主要就是配置source channel sink
    ##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
    #########################################################

    a1.sources  = r1
    a1.channels = c1
    a1.sinks    = k1 k2

    a1.sinkgroups= g1

    a1.sources.r1.type     = http
    a1.sources.r1.port     = 14703
    a1.sources.r1.channels = c1
    a1.sources.r1.charset = UTF-8
    #对于channel的配置描述 使用内存作为数据的临时缓存
    a1.channels.c1.type   = memory
    a1.channels.c1.capacity = 1000000000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 10240000000
    #对于sink的配置描述 使用avro日志做数据的消费
    a1.sinks.k1.type=avro
    a1.sinks.k1.channel=c1
    a1.sinks.k1.hostname=lzs01

    a1.sinks.k1.port=44444

    a1.sinks.k2.type=avro
    a1.sinks.k2.channel=c1
    a1.sinks.k2.hostname=lzs02

    a1.sinks.k2.port=44445

    a1.sinkgroups.g1.sinks= k1 k2  
    a1.sinkgroups.g1.processor.type= load_balance  
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin 


    3.3.2第二层Flume配置

    tier1.sources = r1 r2
    tier1.sinks = k1 hdfs1
    tier1.channels = c_k1 c_hdfs1


    #对于source的配置描述 监听avro
    tier1.sources.r1.type = avro
    tier1.sources.r1.bind = 0.0.0.0
    tier1.sources.r1.port = 44444
    tier1.sources.r1.charset = UTF-8


    #对于source的配置描述 监听avro
    tier1.sources.r2.type = avro
    tier1.sources.r2.bind = 0.0.0.0
    tier1.sources.r2.port = 44445
    tier1.sources.r2.charset = UTF-8


    #对于sink的配置描述 使用kafka做数据的消费
    tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.k1.topic = f-k-s
    tier1.sinks.k1.brokerList = lzs01:9092,lzs02:9092,lzs03:9092
    tier1.sinks.k1.requiredAcks = 1
    tier1.sinks.k1.batchSize = 20
    tier1.sinks.k1.channel = k1


    #对于sink的配置描述 使用日志做数据的消费
    tier1.sinks.hdfs1.type = hdfs
    tier1.sinks.hdfs1.hdfs.path = hdfs://nameservice1/user/flume/cloud/%Y%m%d/
    tier1.sinks.hdfs1.hdfs.filePrefix = stb-%Y-%m-%d-%H
    tier1.sinks.hdfs1.hdfs.useLocalTimeStamp = true 
    tier1.sinks.hdfs1.hdfs.fileSuffix = .log
    tier1.sinks.hdfs1.hdfs.inUseSuffix = .tmp
    tier1.sinks.hdfs1.hdfs.round = true
    tier1.sinks.hdfs1.hdfs.rollInterval = 3600
    tier1.sinks.hdfs1.hdfs.rollCount = 0
    tier1.sinks.hdfs1.hdfs.rollSize = 134217728
    tier1.sinks.hdfs1.hdfs.callTimeout = 60000
    tier1.sinks.hdfs1.hdfs.batchSize = 600
    tier1.sinks.hdfs1.hdfs.idleTimeout = 0
    # 如果希望上面配置的日志文件滚动策略生效,则必须要配置下面这一项
    tier1.sinks.hdfs1.hdfs.minBlockReplicas = 1
    #配置下面两项后,保存到HDFS中的数据才是文本
    #否则通过hdfs dfs -text查看时,显示的是经过压缩的16进制
    tier1.sinks.hdfs1.hdfs.serializer = TEXT
    tier1.sinks.hdfs1.hdfs.fileType = DataStream


    #对于channel的配置描述 使用内存缓冲区域做数据的临时缓存

    tier1.channels.c_k1.type   = memory
    tier1.channels.c_k1.capacity = 1000000000
    tier1.channels.c_k1.byteCapacityBufferPercentage = 20
    tier1.channels.c_k1.byteCapacity = 10240000000

    #对于channel的配置描述 使用文件作为数据的临时缓存
    tier1.channels.c_hdfs1.type= file
    tier1.channels.c_hdfs1.checkpointDir = /data/flume/cloud/filechannel/checkpoint
    tier1.channels.c_hdfs1.dataDirs = /data/flume/cloud/filechannel/data
    tier1.channels.c_hdfs1.useDualCheckpoints = true
    tier1.channels.c_hdfs1.backupCheckpointDir = /data/flume/cloud/filechannel/backupCheckpoint
    tier1.channels.c_hdfs1.checkpointInterval = 300000
    tier1.channels.c_hdfs1.transactionCapacity=600
    tier1.channels.c_hdfs1.capacity = 200000000
    tier1.channels.c_hdfs1.maxFileSize = 2146435071

    #通过channel将source和sink 关联起来
    tier1.sources.r1.channels = c_k1 c_hdfs1
    tier1.sources.r2.channels=c_k1 c_hdfs1
    tier1.sinks.k1.channel = c_k1
    tier1.sinks.hdfs1.channel=c_hdfs1

    3.4 测试

    3.4.1 创建kafka对应的topic

    a.查看topic列表

    bin/kafka-topics --list --zookeeper lzs01:2181,lzs02:2181,lzs03:2181/kafka

    b.创建topic

     bin/kafka-topics --zookeeper lzs01:2181,lzs02:2181,lzs03:2181/kafka --create --topic f-k-s  --partitions 3 --replication-factor 3

    c.启动消费

    bin/kafka-console-consumer --topic f-k-s  --bootstrap-server lzs01:9092,lzs02:9092,lzs03:9092 

    3.4.2 启动第二层的flume

    3.4.3 启动第一层的flume

     bin/flume-ng agent -c conf/ -f conf/flume-httparvo.conf -n a1  -Dflume.root.logger=INFO,console

    3.4.4 发送数据

    curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://192.168.1.102:14703

    3.4.5查看kafka和hdfs对应的目录是否有数据


    展开全文
  • 一,整体流程1,首先来一份流程图2,照着这个流程图我们来进入今天讨论的内容,'数据同步模块'二,数据同步流程1,使用Flume完成数据采集的后半部分,即Kafka数据到Hadoop平台的落地三,同步配置1,版本信息基础构建...

    一,整体流程

    1,首先来一份流程图

    b5f022aabf9221dd89fc3db7771f92d6.png

    2,照着这个流程图我们来进入今天讨论的内容,'数据同步模块'

    二,数据同步流程

    29b638b77fd12102ee1ff4c7311228dd.png

    1,使用Flume完成数据采集的后半部分,即Kafka数据到Hadoop平台的落地

    三,同步配置

    1,版本信息

    基础构建CDH5.16.1,(推荐使用Flume之前升级到1.7及之后版本,5.16默认版本为1.6,本次使用1.6)

    2,Flume配置信息

    a1.sources=source_from_kafka

    a1.channels=mem_channel

    a1.sinks=hdfs_sink

    #kafka为source的配置

    a1.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource

    a1.sources.source_from_kafka.batchSize=10

    a1.sources.source_from_kafka.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9093

    a1.sources.source_from_kafka.topic=AdRealTimeLog

    a1.sources.source_from_kafka.channels=mem_channel

    a1.sources.source_from_kafka.consumer.timeout.ms=1000

    #hdfs为sink的配置

    # The channel can be defined as follows.

    a1.sinks.hdfs_sink.type=hdfs

    # 指定sink需要使用的channel的名字,注意这里是channel

    #Specify the channel the sink should use

    a1.sinks.hdfs_sink.channel=mem_channel

    #a1.sinks.hdfs_sink.filePrefix=%{host}

    a1.sinks.hdfs_sink.hdfs.path=hdfs://node03:8020/flume_sink/ds=%Y%m%d

    #File size to trigger roll, in bytes (0: never roll based on file size)

    a1.sinks.hdfs_sink.hdfs.rollSize=0

    #Number of events written to file before it rolled (0 = never roll based on number of events)

    a1.sinks.hdfs_sink.hdfs.rollCount=0

    a1.sinks.hdfs_sink.hdfs.rollInterval=3600

    a1.sinks.hdfs_sink.hdfs.threadsPoolSize=30

    #a1.sinks.hdfs_sink.hdfs.codeC=gzip

    #a1.sinks.hdfs_sink.hdfs.fileType=CompressedStream

    a1.sinks.hdfs_sink.hdfs.fileType=DataStream

    a1.sinks.hdfs_sink.hdfs.writeFormat=Text

    a1.sinks.hdfs_sink.hdfs.minBlockReplicas=1

    #channel的配置

    a1.channels.mem_channel.type=memory

    a1.channels.mem_channel.capacity=1000

    a1.channels.mem_channel.transactionCapacity=100

    #配置对应关系

    a1.sources.source_from_kafka.channels=mem_channel

    a1.sinks.hive_sink.channel=mem_channel

    3,启动测试

    flume-ng agent -n a1 -c /opt/flumeCon -f kafka2hdfs.conf -Dflume.root.logger=INFO,console

    4,查看存储到hdfs上面的文件

    c46beeedf69c1d8245b8c43922f93679.png

    ,后续操作

    1,数据按天导入hive分区表,后续数据抽取等等,后补
    展开全文
  • flume数据采集流程

    2020-12-11 17:25:36
    flume 流式日志采集工具 重点:流式数据 Flume功能 flume架构 基础框架
  • 无论数据来自什么企业,或是多大量级,通过部署Flume,可以确保数据都安全、 及时地到达大数据平台,用户可以将精力集中在如何洞悉数据上。...简单来说:Flume是实时采集日志的数据采集引擎。Flume架构.png三个重要...
  • flume进行数据采集,在采集端增加过滤
  • Flume 性能优化 和关键问题汇总 Flume 数据采集系统 性能优化和关键问题汇总 Flume 数据采集系统 性能优化和关键问题汇总
  • Flume 数据采集组件

    千次阅读 2019-01-04 05:44:10
    目录 1、数据收集工具/系统产生背景 2、专业的数据收集工具 2.1、Chukwa 2.2、Scribe 2.3、Fluentd ...3.3、Flume数据源和输出方式 4、Flume体系结构/核心组件 4.1、概述 4.2、Flume三大核心...
  • Flume数据采集准备

    2019-06-24 10:43:38
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
  • flume 数据采集简介

    2019-07-08 23:00:32
    在大数据的业务处理过程中,Flume主要负责数据采集。 2、Flume架构介绍 flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的...
  • 五大组件Flume数据采集组件安装和部署 1.Flume的安装 Flume的安装非常简单,只需要上传解压即可 tar -zxvf apache-flume-1.8.0-bin.tar.gz 进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_...
  • flume数据采集架构

    2019-05-14 11:14:48
    在日常生产环境中,如果想要做数据采集基本上都要用到flume,现在就记录一下flume在整个项目中的架构。 先简单说一下这个项目,从微信小程序中记录用户数据,项目后台程序使用springBoot编写,部署在服务器上,使用...
  • Flume数据采集组件

    2018-12-19 15:28:55
    其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也 变的尤为突出。这其中包括: 数据源多种多样; 数据量大,变化块; 如何保证数据采集的可靠性的性能; 如何避免重复数据...
  • flume数据采集工具

    2021-01-10 20:40:12
    flime运行起来的进程叫agentflume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道每一个agent都有3个组件Source,channel,sink Source就相当于read(读数据) Channel就相当于缓存数据(为...
  • 今天为大家介绍几款数据采集平台:Apache Flume、Fluentd、Logstash、Chukwa、Scribe、Splunk Forwarder。大数据平台与数据采集任何完整的大数据平台,一般包括以下的几个过程:数据采集数据存储数据处理数据展现...
  • Flume数据采集各种配置详解

    万次阅读 2016-06-07 16:01:37
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
  • Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。 Flume最主要的作用就是,实时读取服务器本地磁盘数据,将数据写入到HDFS,也可以将数据...
  • 是新朋友吗?记得先点蓝字关注我哦~ 今日课程菜单Java全栈开发 |Web前端+H5大数据开发 | 数据分析人工智能+Python | 人工智能+物联网来源:...今天为大家介绍几款数据采集平台:Apache Flume、Fluentd、Logstash、...
  • by图灵导言 笔者耗时将近3周的时间将flume相关知识点进行了总结,并带有详细的案例demo,希望能够帮助读者快速上手学习。另考虑到该文章篇幅较长,不适合读者短期阅读,建议读者收藏或者关注公众号后台回复"flume"可...
  • Sqoop Flume 准备实验的环境:准备Oracle数据库 1、实验:前面的实验:Oracle闪回(回收站) 2、自带用户:sh/sh ---> 表:sales订单表(大概92万条数据) ...数据采集的引擎:Sqoop: 采集关系型数据库 Fl...
  • flume数据采集扇入

    2019-07-05 16:02:07
    .conf文件的定义 #1 Agent a3.sources = r3 r4 a3.sinks = k3 a3.channels = c3 #2 source ...2个数据采集到的数据合并上传到bigdata112 hdfs目录下 查看hdfs里面sink进来的具体数据
  • FLume数据采集到kafka

    2018-09-20 22:51:50
    3.2.1 数据采集 思路: a) 配置kafka,启动zookeeper和kafka集群; b) 创建kafka主题; c) 启动kafka控制台消费者(此消费者只用于测试使用); d) 配置flume,监控日志文件; e) 启动flume监控任务; f) ...
  • # 功能:flume采集数据启动、停止 # 参数:Agent (flume的配置文件名) # flume是根据配置文件配置项,启动,采集数据的 # 说明:配置文件写在 $FLUME_HOME/jobs 目录下 Agent=$2 case $1 in "start"){ for i in ...
  • 名词解释埋点其实就是...开发背景我司之前在处理埋点数据采集时,模式很简单,当用户操作页面控件时,前端监听到操作事件,并根据上下文环境,将事件相关的数据通过接口调用发送至埋点数据采集服务(简称ets服务),...
  • 13:flume 数据采集

    2019-05-08 14:14:56
    一、Flume下载 从官网下载,或者在linux中使用下面的命令下载,然后解压 方式1:官网下载地址:http://archive.apache.org/dist/flume/ 二、Flume解压 tar -zxvf/usr/local/cdh-5.13.2-tar/flume-ng-1.6.0-cdh...
  • 下面该数据采集神器Flume出场了。 source 可以接收外部源发送过来的数据。不同的 source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件...
  • by图灵导言 笔者耗时将近3周的时间将flume相关知识点进行了总结,并带有详细的案例demo,希望能够帮助读者快速上手学习。另考虑到该文章篇幅较长,不适合读者短期阅读,建议读者收藏或者关注公众号后台回复"flume"可...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,267
精华内容 1,306
关键字:

flume数据采集