-
flume数据采集
2017-10-21 13:15:20从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的...在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程:
从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。
(一)Flume架构介绍
1、Flume的概念
flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。
2、Event的概念
在这里有必要先介绍一下flume中event的相关概念:flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
为了方便大家理解,给出一张event的数据流向图:
一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以:
其中event信息就是flume收集到的日记记录。
3、flume架构介绍
flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
4、flume的运行机制
flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
5、flume的广义用法
flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。
(二)flume应用—日志采集
对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。下面我将用具体的案例详述flume的具体用法。
其实flume的用法很简单—-书写一个配置文件,在配置文件当中描述source、channel与sink的具体实现,而后运行一个agent实例,在运行agent实例的过程中会读取配置文件的内容,这样flume就会采集到数据。
配置文件的编写原则:
1>从整体上描述代理agent中sources、sinks、channels所涉及到的组件# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1
- 1
- 2
- 3
- 4
2>详细描述agent中每一个source、sink与channel的具体实现:即在描述source的时候,需要
指定source到底是什么类型的,即这个source是接受文件的、还是接受http的、还是接受thrift
的;对于sink也是同理,需要指定结果是输出到HDFS中,还是Hbase中啊等等;对于channel
需要指定是内存啊,还是数据库啊,还是文件啊等等。# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
3>通过channel将source与sink连接起来
# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
启动agent的shell操作:
flume-ng agent -n a1 -c ../conf -f ../conf/example.file -Dflume.root.logger=DEBUG,console
- 1
- 2
参数说明: -n 指定agent名称(与配置文件中代理的名字相同)
-c 指定flume中配置文件的目录
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 设置日志等级具体案例:
案例1: NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:logger Channel:memory
flume官网中NetCat Source描述:Property Name Default Description channels – type – The component type name, needs to be netcat bind – 日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听 port – 日志需要发送到的端口号,该端口号要有netcat类型的source在监听
- 1
- 2
- 3
- 4
- 5
a) 编写配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
b) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
- 1
c) 使用telnet发送数据
telnet 192.168.80.80 44444 big data world!(windows中运行的)
- 1
d) 在控制台上查看flume收集到的日志数据:
案例2:NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:hdfs Channel:file (相比于案例1的两个变化)
flume官网中HDFS Sink的描述:
a) 编写配置文件:# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
b) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
- 1
c) 使用telnet发送数据
telnet 192.168.80.80 44444 big data world!(windows中运行的)
- 1
d) 在HDFS中查看flume收集到的日志数据:
案例3:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中 Sink:logger Channel:memory
flume官网中Spooling Directory Source描述:Property Name Default Description channels – type – The component type name, needs to be spooldir. spoolDir – Spooling Directory Source监听的目录 fileSuffix .COMPLETED 文件内容写入到channel之后,标记该文件 deletePolicy never 文件内容写入到channel之后的删除策略: never or immediate fileHeader false Whether to add a header storing the absolute path filename. ignorePattern ^$ Regular expression specifying which files to ignore (skip) interceptors – 指定传输中event的head(头信息),常用timestamp
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Spooling Directory Source的两个注意事项:
①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. 即:拷贝到spool目录下的文件不可以再打开编辑 ②If a file name is reused at a later time, Flume will print an error to its log file and stop processing. 即:不能将具有相同文件名字的文件拷贝到这个目录下
- 1
- 2
- 3
- 4
a) 编写配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
b) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
- 1
c) 使用cp命令向Spooling Directory 中发送数据
cp datafile /usr/local/datainput (注:datafile中的内容为:big data world!)
- 1
d) 在控制台上查看flume收集到的日志数据:
从控制台显示的结果可以看出event的头信息中包含了时间戳信息。
同时我们查看一下Spooling Directory中的datafile信息—-文件内容写入到channel之后,该文件被标记了:[root@hadoop80 datainput]# ls datafile.COMPLETED
- 1
- 2
案例4:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例3的两个变化)
a) 编写配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp # Describe the sink # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
b) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
- 1
c) 使用cp命令向Spooling Directory 中发送数据
cp datafile /usr/local/datainput (注:datafile中的内容为:big data world!)
- 1
d) 在控制台上可以参看sink的运行进度日志:
d) 在HDFS中查看flume收集到的日志数据:
从案例1与案例2、案例3与案例4的对比中我们可以发现:flume的配置文件在编写的过程中是非常灵活的。案例5:Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:hdfs Channel:file
这个案列为了方便显示Exec Source的运行效果,结合Hive中的external table进行来说明。a) 编写配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /usr/local/log.file # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
b)在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目录,方便查看日志捕获内容
hive> create external table t1(infor string) > row format delimited > fields terminated by '\t' > location '/dataoutput/'; OK Time taken: 0.284 seconds
- 1
- 2
- 3
- 4
- 5
- 6
c) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console
- 1
d) 使用echo命令向/usr/local/datainput 中发送数据
echo big data > log.file
- 1
d) 在HDFS和Hive分别中查看flume收集到的日志数据:
hive> select * from t1; OK big data Time taken: 0.086 seconds
- 1
- 2
- 3
- 4
e)使用echo命令向/usr/local/datainput 中在追加一条数据
echo big data world! >> log.file
- 1
d) 在HDFS和Hive再次分别中查看flume收集到的日志数据:
hive> select * from t1; OK big data big data world! Time taken: 0.511 seconds
- 1
- 2
- 3
- 4
- 5
总结Exec source:Exec source和Spooling Directory Source是两种常用的日志采集的方式,其中Exec source可以实现对日志的实时采集,Spooling Directory Source在对日志的实时采集上稍有欠缺,尽管Exec source可以实现对日志的实时采集,但是当Flume不运行或者指令执行出错时,Exec source将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。
案例6:Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。 其中 Sink:hdfs Channel:file
(注:Avro和Thrift都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制)
Avro Source运行原理如下图:
flume官网中Avro Source的描述:Property Name Default Description channels – type – The component type name, needs to be avro bind – 日志需要发送到的主机名或者ip,该主机运行着ARVO类型的source port – 日志需要发送到的端口号,该端口要有ARVO类型的source在监听
- 1
- 2
- 3
- 4
- 5
1)编写配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
b) 启动flume agent a1 服务端
flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console
- 1
c)使用avro-client发送文件
flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file
- 1
注:log.file文件中的内容为:
[root@hadoop80 local]# more log.file big data big data world!
- 1
- 2
- 3
d) 在HDFS中查看flume收集到的日志数据:
通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!
最后对上面用的几个flume source进行适当总结:
① NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件
就可以获取到信息。
②Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文
件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记
该文件已完成或者删除该文件。
③Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。
④Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。如有问题,欢迎留言指正!
-
flume 数据采集
2017-12-29 21:00:07在大数据的业务处理过程中,Flume主要负责数据的采集。 2、Flume架构介绍 flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集...1、flume在大数据业务中的角色
Hadoop业务的整体开发流程:
在大数据的业务处理过程中,Flume主要负责数据的采集。
2、Flume架构介绍
flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。
2、Event
在这里有必要先介绍一下flume中event的相关概念:flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
为了方便大家理解,给出一张event的数据流向图:
一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以:
其中event信息就是flume收集到的日记记录。3、flume组件介绍
flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spoolingdirectory、netcat、sequencegenerator、syslog、http、legacy、自定义。channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
4、flume的运行机制
flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
5、flume的用法
flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。
案例一:监控端口数据
目标:Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。
分步实现:
1) 创建Flume Agent配置文件flume-telnet.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 判断44444端口是否被占用
$ netstat -tunlp | grep 44444
3) 先开启flume先听端口
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet.conf -Dflume.root.logger==INFO,console
4) 使用telnet工具向本机的44444端口发送内容
$ telnet localhost 44444
案例二:实时读取本地文件到HDFS
目标:实时监控hive日志,并上传到HDFS中
分步实现:
1) 拷贝Hadoop相关jar到Flume的lib目录下(要学会根据自己的目录和版本查找jar包)
$ cp share/hadoop/common/lib/hadoop-auth-2.5.0-cdh5.3.6.jar ./lib/
$ cp share/hadoop/common/lib/commons-configuration-1.6.jar ./lib/
$ cp share/hadoop/mapreduce1/lib/hadoop-hdfs-2.5.0-cdh5.3.6.jar ./lib/
$ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar ./lib/
$ cp ./share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar ./lib/
$ cp ./share/hadoop/hdfs/lib/commons-io-2.4.jar ./lib/
尖叫提示:标红的jar为1.99版本flume必须引用的jar
2) 创建flume-hdfs.conf文件
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://linux01:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3) 执行监控配置
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-hdfs.conf
案例三:实时读取目录文件到HDFS
目标:使用flume监听整个目录的文件
分步实现:
1) 创建配置文件flume-dir.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/admin/modules/apache-flume-1.7.0-bin/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2) 执行测试:执行如下脚本后,请向upload文件夹中添加文件试试
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir.conf
尖叫提示: 在使用SpoolingDirectory Source时
1) 不要在监控目录中创建并持续修改文件
2) 上传完成的文件会以.COMPLETED结尾
3) 被监控文件夹每600毫秒扫描一次文件变动
案例4:监听一个指定的Avro 端口
1)编写配置文件
# Namethe components on this agent
a1.sources= r1
a1.sinks =k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.bind= 192.168.80.80
a1.sources.r1.port= 4141
#Describe the sink
a1.sinks.k1.type= hdfs
a1.sinks.k1.hdfs.path= hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat= Text
a1.sinks.k1.hdfs.fileType= DataStream
a1.sinks.k1.hdfs.rollInterval= 10
a1.sinks.k1.hdfs.rollSize= 0
a1.sinks.k1.hdfs.rollCount= 0
a1.sinks.k1.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp= true
# Use achannel which buffers events in file
a1.channels.c1.type= file
a1.channels.c1.checkpointDir= /usr/flume/checkpoint
a1.channels.c1.dataDirs= /usr/flume/data
# Bindthe source and sink to the channel
a1.sources.r1.channels= c1
a1.sinks.k1.channel= c1
2) 启动flume agent a1服务端
flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console
3)使用avro-client发送文件
flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F/usr/local/log.file
案例五:Flume与Flume之间数据传递:单Flume多Channel、Sink,
目标:使用flume-1监控文件变动,flume-1将变动内容传递给flume-2,flume-2负责存储到HDFS。同时flume-1将变动内容传递给flume-3,flume-3负责输出到。
local filesystem。
分步实现:
1) 创建flume-1.conf,用于监控hive.log文件的变动,同时产生两个channel和两个sink分别输送给flume-2和flume3:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给多个channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux01
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux01
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2) 创建flume-2.conf,用于接收flume-1的event,同时产生1个channel和1个sink,将数据输送给hdfs:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux01
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux01:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3) 创建flume-3.conf,用于接收flume-1的event,同时产生1个channel和1个sink,将数据输送给本地目录:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux01
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/admin/Desktop/flume3
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
尖叫提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
4) 执行测试:分别开启对应flume-job(依次启动flume-3,flume-2,flume-1),同时产生文件变动并观察结果:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job1/flume-3.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job1/flume-2.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job1/flume-1.conf
案例六:Flume与Flume之间数据传递,多Flume汇总数据到单Flume
目标:flume-1监控文件hive.log,flume-2监控某一个端口的数据流,flume-1与flume-2将数据发送给flume-3,flume3将最终数据写入到HDFS。
分步实现:
1) 创建flume-1.conf,用于监控hive.log文件,同时sink数据到flume-3:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux01
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 创建flume-2.conf,用于监控端口44444数据流,同时sink数据到flume-3:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = linux01
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = linux01
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3) 创建flume-3.conf,用于接收flume-1与flume-2发送过来的数据流,最终合并后sink到HDFS:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux01
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://linux01:8020/flume3/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4) 执行测试:分别开启对应flume-job(依次启动flume-3,flume-2,flume-1),同时产生文件变动并观察结果:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job2/flume-3.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job2/flume-2.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job2/flume-1.conf
尖叫提示:测试时记得启动hive产生一些日志,同时使用telnet向44444端口发送内容,如:
$ bin/hive
$ telnet linux01 44444
总结:
通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!
最后对上面用的几个flume source进行适当总结:
① NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件
就可以获取到信息。
②Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文
件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记
该文件已完成或者删除该文件。
③Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。
④Avro Source:监听一个指定的Avro 端口,通过Avro端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。 -
Flume数据采集
2018-06-12 15:42:16flume自带很长多的source,如:exe、kafka......常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Fl...flume自带很长多的source,如:exe、kafka...其中有一个非常简单的source——httpsource,使用httpSource,flume启动后会拉起一个web服务来监听指定的ip和port。常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Flume中。
1、httpsource 参数:
配置参数 默认值 描述 type http (org.apache.fluem.source.httpSource) bind 绑定的IP地址或主机名 port 绑定的端口号 enableSSL false keystore 使用的keystore文件的路径 keystorePassword 能够进入keystore的密码 handler JSONHandler HTTP SOURCE使用的处理程序类 handler.* 传给处理程序类的任何参数 可以 通过使用此参数(*)配置传入 1)handler:
Flume使用一个可插拔的“handler”程序来实现转换,如果不指定默认是:JSONHandler,它能处理JSON格式的事件,格式如下。此外用户可以自定义handler,必须实现HTTPSourceHandler接口。
json数据格式:- [ { "headers":{"":"","":""
- },
- "body":"the first event"
- },
- { "headers":{"":"","":""
- },
- "body":"the second event"
- }
- ]
2、简单介绍一下flume的logger sink:记录INFO级别的日志,一般用于调试。本文将使用这种类型的sink,配置的属性:
- type logger
- maxBytesToLog 16 Maximum number of bytes of the Event body to log
3、应用
3.1 Flume配置如下:
3.2 结构说明
在该场景中,使用三个Flume Agent分别部署在三台Web服务器上,用来采集终端的数据,然后其数据的下沉方式都为发送到另外两个Flume Agent上,然后这两个Flume agent 将数据发送到kafka或者hdfs中
3.3 配置
3.3.1 第一层Flume配置
######################################################### ## ##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro ## 注意:Flume agent的运行,主要就是配置source channel sink ## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1 #########################################################
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2a1.sinkgroups= g1
a1.sources.r1.type = http
a1.sources.r1.port = 14703
a1.sources.r1.channels = c1
a1.sources.r1.charset = UTF-8
a1.channels.c1.type = memory#对于channel的配置描述 使用内存作为数据的临时缓存
a1.channels.c1.capacity = 1000000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 10240000000
a1.sinks.k1.type=avro#对于sink的配置描述 使用avro日志做数据的消费
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=lzs01a1.sinks.k1.port=44444
a1.sinks.k2.type=avro
a1.sinks.k2.channel=c1
a1.sinks.k2.hostname=lzs02a1.sinks.k2.port=44445
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin3.3.2第二层Flume配置
tier1.sources = r1 r2
tier1.sinks = k1 hdfs1
tier1.channels = c_k1 c_hdfs1
#对于source的配置描述 监听avro
tier1.sources.r1.type = avro
tier1.sources.r1.bind = 0.0.0.0
tier1.sources.r1.port = 44444
tier1.sources.r1.charset = UTF-8
#对于source的配置描述 监听avro
tier1.sources.r2.type = avro
tier1.sources.r2.bind = 0.0.0.0
tier1.sources.r2.port = 44445
tier1.sources.r2.charset = UTF-8
#对于sink的配置描述 使用kafka做数据的消费
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.topic = f-k-s
tier1.sinks.k1.brokerList = lzs01:9092,lzs02:9092,lzs03:9092
tier1.sinks.k1.requiredAcks = 1
tier1.sinks.k1.batchSize = 20
tier1.sinks.k1.channel = k1
#对于sink的配置描述 使用日志做数据的消费
tier1.sinks.hdfs1.type = hdfs
tier1.sinks.hdfs1.hdfs.path = hdfs://nameservice1/user/flume/cloud/%Y%m%d/
tier1.sinks.hdfs1.hdfs.filePrefix = stb-%Y-%m-%d-%H
tier1.sinks.hdfs1.hdfs.useLocalTimeStamp = true
tier1.sinks.hdfs1.hdfs.fileSuffix = .log
tier1.sinks.hdfs1.hdfs.inUseSuffix = .tmp
tier1.sinks.hdfs1.hdfs.round = true
tier1.sinks.hdfs1.hdfs.rollInterval = 3600
tier1.sinks.hdfs1.hdfs.rollCount = 0
tier1.sinks.hdfs1.hdfs.rollSize = 134217728
tier1.sinks.hdfs1.hdfs.callTimeout = 60000
tier1.sinks.hdfs1.hdfs.batchSize = 600
tier1.sinks.hdfs1.hdfs.idleTimeout = 0
# 如果希望上面配置的日志文件滚动策略生效,则必须要配置下面这一项
tier1.sinks.hdfs1.hdfs.minBlockReplicas = 1
#配置下面两项后,保存到HDFS中的数据才是文本
#否则通过hdfs dfs -text查看时,显示的是经过压缩的16进制
tier1.sinks.hdfs1.hdfs.serializer = TEXT
tier1.sinks.hdfs1.hdfs.fileType = DataStream
#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
tier1.channels.c_k1.type = memory
tier1.channels.c_k1.capacity = 1000000000
tier1.channels.c_k1.byteCapacityBufferPercentage = 20
tier1.channels.c_k1.byteCapacity = 10240000000
#对于channel的配置描述 使用文件作为数据的临时缓存
tier1.channels.c_hdfs1.type= file
tier1.channels.c_hdfs1.checkpointDir = /data/flume/cloud/filechannel/checkpoint
tier1.channels.c_hdfs1.dataDirs = /data/flume/cloud/filechannel/data
tier1.channels.c_hdfs1.useDualCheckpoints = true
tier1.channels.c_hdfs1.backupCheckpointDir = /data/flume/cloud/filechannel/backupCheckpoint
tier1.channels.c_hdfs1.checkpointInterval = 300000
tier1.channels.c_hdfs1.transactionCapacity=600
tier1.channels.c_hdfs1.capacity = 200000000
tier1.channels.c_hdfs1.maxFileSize = 2146435071
#通过channel将source和sink 关联起来
tier1.sources.r1.channels = c_k1 c_hdfs1
tier1.sources.r2.channels=c_k1 c_hdfs1
tier1.sinks.k1.channel = c_k1
tier1.sinks.hdfs1.channel=c_hdfs13.4 测试
3.4.1 创建kafka对应的topic
a.查看topic列表
bin/kafka-topics --list --zookeeper lzs01:2181,lzs02:2181,lzs03:2181/kafka
b.创建topic
bin/kafka-topics --zookeeper lzs01:2181,lzs02:2181,lzs03:2181/kafka --create --topic f-k-s --partitions 3 --replication-factor 3
c.启动消费
bin/kafka-console-consumer --topic f-k-s --bootstrap-server lzs01:9092,lzs02:9092,lzs03:9092
3.4.2 启动第二层的flume
3.4.3 启动第一层的flume
bin/flume-ng agent -c conf/ -f conf/flume-httparvo.conf -n a1 -Dflume.root.logger=INFO,console
3.4.4 发送数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://192.168.1.102:14703
3.4.5查看kafka和hdfs对应的目录是否有数据
-
flume数据采集_flume采集Kafka数据到hdfshive
2020-11-30 11:10:04一,整体流程1,首先来一份流程图2,照着这个流程图我们来进入今天讨论的内容,'数据同步模块'二,数据同步流程1,使用Flume完成数据采集的后半部分,即Kafka数据到Hadoop平台的落地三,同步配置1,版本信息基础构建...一,整体流程
1,首先来一份流程图
2,照着这个流程图我们来进入今天讨论的内容,'数据同步模块'
二,数据同步流程
1,使用Flume完成数据采集的后半部分,即Kafka数据到Hadoop平台的落地
三,同步配置
1,版本信息
基础构建CDH5.16.1,(推荐使用Flume之前升级到1.7及之后版本,5.16默认版本为1.6,本次使用1.6)
2,Flume配置信息
a1.sources=source_from_kafka
a1.channels=mem_channel
a1.sinks=hdfs_sink
#kafka为source的配置
a1.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.source_from_kafka.batchSize=10
a1.sources.source_from_kafka.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9093
a1.sources.source_from_kafka.topic=AdRealTimeLog
a1.sources.source_from_kafka.channels=mem_channel
a1.sources.source_from_kafka.consumer.timeout.ms=1000
#hdfs为sink的配置
# The channel can be defined as follows.
a1.sinks.hdfs_sink.type=hdfs
# 指定sink需要使用的channel的名字,注意这里是channel
#Specify the channel the sink should use
a1.sinks.hdfs_sink.channel=mem_channel
#a1.sinks.hdfs_sink.filePrefix=%{host}
a1.sinks.hdfs_sink.hdfs.path=hdfs://node03:8020/flume_sink/ds=%Y%m%d
#File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.hdfs_sink.hdfs.rollSize=0
#Number of events written to file before it rolled (0 = never roll based on number of events)
a1.sinks.hdfs_sink.hdfs.rollCount=0
a1.sinks.hdfs_sink.hdfs.rollInterval=3600
a1.sinks.hdfs_sink.hdfs.threadsPoolSize=30
#a1.sinks.hdfs_sink.hdfs.codeC=gzip
#a1.sinks.hdfs_sink.hdfs.fileType=CompressedStream
a1.sinks.hdfs_sink.hdfs.fileType=DataStream
a1.sinks.hdfs_sink.hdfs.writeFormat=Text
a1.sinks.hdfs_sink.hdfs.minBlockReplicas=1
#channel的配置
a1.channels.mem_channel.type=memory
a1.channels.mem_channel.capacity=1000
a1.channels.mem_channel.transactionCapacity=100
#配置对应关系
a1.sources.source_from_kafka.channels=mem_channel
a1.sinks.hive_sink.channel=mem_channel
3,启动测试
flume-ng agent -n a1 -c /opt/flumeCon -f kafka2hdfs.conf -Dflume.root.logger=INFO,console
4,查看存储到hdfs上面的文件
四,后续操作
1,数据按天导入hive分区表,后续数据抽取等等,后补
-
flume数据采集流程
2020-12-11 17:25:36flume 流式日志采集工具 重点:流式数据 Flume功能 flume架构 基础框架 -
flume数据采集_Flume概述
2020-11-30 11:10:05无论数据来自什么企业,或是多大量级,通过部署Flume,可以确保数据都安全、 及时地到达大数据平台,用户可以将精力集中在如何洞悉数据上。...简单来说:Flume是实时采集日志的数据采集引擎。Flume架构.png三个重要... -
flume数据采集端过滤工程
2017-01-04 11:33:07flume进行数据采集,在采集端增加过滤 -
Flume 数据采集系统 性能优化和关键问题汇总
2016-06-01 20:02:56Flume 性能优化 和关键问题汇总 Flume 数据采集系统 性能优化和关键问题汇总 Flume 数据采集系统 性能优化和关键问题汇总 -
Flume 数据采集组件
2019-01-04 05:44:10目录 1、数据收集工具/系统产生背景 2、专业的数据收集工具 2.1、Chukwa 2.2、Scribe 2.3、Fluentd ...3.3、Flume数据源和输出方式 4、Flume体系结构/核心组件 4.1、概述 4.2、Flume三大核心... -
Flume数据采集准备
2019-06-24 10:43:38Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可... -
flume 数据采集简介
2019-07-08 23:00:32在大数据的业务处理过程中,Flume主要负责数据的采集。 2、Flume架构介绍 flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的... -
五大工具组件Flume数据采集组件安装和部署
2018-08-14 09:51:13五大组件Flume数据采集组件安装和部署 1.Flume的安装 Flume的安装非常简单,只需要上传解压即可 tar -zxvf apache-flume-1.8.0-bin.tar.gz 进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_... -
flume数据采集架构
2019-05-14 11:14:48在日常生产环境中,如果想要做数据采集基本上都要用到flume,现在就记录一下flume在整个项目中的架构。 先简单说一下这个项目,从微信小程序中记录用户数据,项目后台程序使用springBoot编写,部署在服务器上,使用... -
Flume数据采集组件
2018-12-19 15:28:55其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也 变的尤为突出。这其中包括: 数据源多种多样; 数据量大,变化块; 如何保证数据采集的可靠性的性能; 如何避免重复数据... -
flume数据采集工具
2021-01-10 20:40:12flime运行起来的进程叫agentflume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道每一个agent都有3个组件Source,channel,sink Source就相当于read(读数据) Channel就相当于缓存数据(为... -
flume数据采集_六大数据采集平台的架构对比分析
2020-11-21 00:04:56今天为大家介绍几款数据采集平台:Apache Flume、Fluentd、Logstash、Chukwa、Scribe、Splunk Forwarder。大数据平台与数据采集任何完整的大数据平台,一般包括以下的几个过程:数据采集数据存储数据处理数据展现... -
Flume数据采集各种配置详解
2016-06-07 16:01:37Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可... -
大数据之Flume数据采集框架
2020-12-01 18:18:11Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。 Flume最主要的作用就是,实时读取服务器本地磁盘数据,将数据写入到HDFS,也可以将数据... -
flume数据采集_六个大数据采集工具架构分析
2020-11-26 21:47:38是新朋友吗?记得先点蓝字关注我哦~ 今日课程菜单Java全栈开发 |Web前端+H5大数据开发 | 数据分析人工智能+Python | 人工智能+物联网来源:...今天为大家介绍几款数据采集平台:Apache Flume、Fluentd、Logstash、... -
flume数据采集_17000字总结Flume
2020-11-27 03:01:15by图灵导言 笔者耗时将近3周的时间将flume相关知识点进行了总结,并带有详细的案例demo,希望能够帮助读者快速上手学习。另考虑到该文章篇幅较长,不适合读者短期阅读,建议读者收藏或者关注公众号后台回复"flume"可... -
Sqoop Flume 数据采集引擎
2019-04-14 23:02:12Sqoop Flume 准备实验的环境:准备Oracle数据库 1、实验:前面的实验:Oracle闪回(回收站) 2、自带用户:sh/sh ---> 表:sales订单表(大概92万条数据) ...数据采集的引擎:Sqoop: 采集关系型数据库 Fl... -
flume数据采集扇入
2019-07-05 16:02:07.conf文件的定义 #1 Agent a3.sources = r3 r4 a3.sinks = k3 a3.channels = c3 #2 source ...2个数据源采集到的数据合并上传到bigdata112 hdfs目录下 查看hdfs里面sink进来的具体数据 -
FLume数据采集到kafka
2018-09-20 22:51:503.2.1 数据采集 思路: a) 配置kafka,启动zookeeper和kafka集群; b) 创建kafka主题; c) 启动kafka控制台消费者(此消费者只用于测试使用); d) 配置flume,监控日志文件; e) 启动flume监控任务; f) ... -
Flume数据采集 启动、停止脚本
2020-06-29 14:53:16# 功能:flume采集数据启动、停止 # 参数:Agent (flume的配置文件名) # flume是根据配置文件配置项,启动,采集数据的 # 说明:配置文件写在 $FLUME_HOME/jobs 目录下 Agent=$2 case $1 in "start"){ for i in ... -
flume数据采集_基于nginx+flume+kafka+mongodb实现埋点数据采集
2020-11-30 11:10:12名词解释埋点其实就是...开发背景我司之前在处理埋点数据采集时,模式很简单,当用户操作页面控件时,前端监听到操作事件,并根据上下文环境,将事件相关的数据通过接口调用发送至埋点数据采集服务(简称ets服务),... -
13:flume 数据采集
2019-05-08 14:14:56一、Flume下载 从官网下载,或者在linux中使用下面的命令下载,然后解压 方式1:官网下载地址:http://archive.apache.org/dist/flume/ 二、Flume解压 tar -zxvf/usr/local/cdh-5.13.2-tar/flume-ng-1.6.0-cdh... -
flume数据采集_用户行为分析之数据采集
2020-11-30 11:10:05下面该数据采集神器Flume出场了。 source 可以接收外部源发送过来的数据。不同的 source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件... -
flume数据采集_一万字完整总结Flume
2020-11-27 03:01:12by图灵导言 笔者耗时将近3周的时间将flume相关知识点进行了总结,并带有详细的案例demo,希望能够帮助读者快速上手学习。另考虑到该文章篇幅较长,不适合读者短期阅读,建议读者收藏或者关注公众号后台回复"flume"可...