FLUME
是一个海量日志收集系统。
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统),支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。
Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase
Flume的结构
Agent主要由:source(源),channel(渠道),sink(洗涤槽;)三个组件组成.
sink
淹没; 下落;
Source:
从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等
Channel:
channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等.
sink:
sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.
==
今天开会讨论日志处理为什么要同时使用Flume和Kafka,是否可以只用Kafka 不使用Flume?当时想到的就只用Flume的接口多,不管是输入接口(socket 和 文件)以及输出接口(Kafka/HDFS/HBase等)。
考虑单一应用场景,从简化系统的角度考虑,在满足应用需求的情况下可能只使用一个比较好。但是考虑到现有系统业务发展,为了后面的灵活扩展,在先用系统设计时留有一定的扩展性感觉更重要,可能使用Flume+kafka架构相对只使用Kafka会多占用1-2台机器做Flume日志采集,但是为了方便以后日志数据处理方式的扩展,可以采用Flume+kafka架构。
Flume :管道 ----个人认为比较适合有多个生产者场景,或者有写入Hbase、HDFS和kafka需求的场景。
Kafka :消息队列-----由于Kafka是Pull模式,因此适合有多个消费者的场景。
目前应用场景,一台日志转发机负责产生日志。后端需要通过Strom消费日志信息,建议可以设置成log-->Kafka->Strom.如果以后有写入Hbase或者HDFS的需求可以,在Kafka后面再接上Strom,或者在日志转发机上直接日志落地,由Flume去读取日志消息。
==
关于Flume 的 一些核心概念:
组件名称 | 功能介绍 |
Agent代理 | 使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。 |
Client客户端 | 生产数据,运行在一个独立的线程。 |
Source源 | 从Client收集数据,传递给Channel。 |
Sink接收器 | 从Channel收集数据,进行相关操作,运行在一个独立线程。 |
Channel通道 | 连接 sources 和 sinks ,这个有点像一个队列。 |
Events事件 | 传输的基本数据负载。 |
--
flume翻译
水槽 [flu:m]
--
flume安装目录conf下新建kafka.properties文件,启动时也应当用此配置文件作为参数启动。下面看具体内容:

1. kafka.properties:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /tmp/logs/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#设置Kafka接收器
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
agent.sinks.k1.brokerList=master:9092
#设置Kafka的Topic
agent.sinks.k1.topic=kafkatest
#设置序列化方式
agent.sinks.k1.serializer. class =kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
|
关于配置文件中注意3点:
a. agent.sources.s1.command=tail -F /tmp/logs/kafka.log
b. agent.sinks.k1.brokerList=master:9092
c . agent.sinks.k1.topic=kafkatest
2.很明显,由配置文件可以了解到:
a.我们需要在/tmp/logs下建一个kafka.log的文件,且向文件中输出内容(下面会说到);
b.flume连接到kafka的地址是 master:9092,注意不要配置出错了;
c.flume会将采集后的内容输出到Kafka topic 为kafkatest上,所以我们启动zk(zookeeper),kafka后需要打开一个终端消费topic kafkatest的内容。这样就可以看到flume与kafka之间玩起来了~~
具体操作:
a.在/tmp/logs下建立空文件kafka.log。在mfz 用户目录下新建脚本kafkaoutput.sh(一定要给予可执行权限),用来向kafka.log输入内容: kafka_test***
1 2 3 | for ((i=0;i<=1000;i++));
do echo "kafka_test-" +$i>>/tmp/logs/kafka.log;
done
|
b. 在kafka安装目录下执行如下命令,启动zk,kafka 。
1 | bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
|
1 | bin/kafka-server-start.sh -daemon config/server.properties &
|
c.新增Topic kafkatest
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
d.打开新终端,在kafka安装目录下执行如下命令,生成对topic kafkatest 的消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning --zookeeper master
e.启动flume
1 | bin/flume-ng agent --conf-file conf/kafka.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console
|
d.执行kafkaoutput.sh脚本(注意观察kafka.log内容及消费终端接收到的内容)


e.查看新终端消费信息

整体流程如图:

==
使用Flume+Kafka+SparkStreaming进行实时日志分析
每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步的,今天就讲一下如何实时地(准实时,每分钟分析一次)收集日志,处理日志,把处理后的记录存入Hive中,并附上完整实战代码
1. 整体架构
思考一下,正常情况下我们会如何收集并分析日志呢?
首先,业务日志会通过Nginx(或者其他方式,我们是使用Nginx写入日志)每分钟写入到磁盘中,现在我们想要使用Spark分析日志,就需要先将磁盘中的文件上传到HDFS上,然后Spark处理,最后存入Hive表中,如图所示:

我们之前就是使用这种方式每天分析一次日志,但是这样有几个缺点:
首先我们的日志是通过Nginx每分钟存成一个文件,这样一天的文件数很多,不利于后续的分析任务,所以先要把一天的所有日志文件合并起来
合并起来以后需要把该文件从磁盘传到Hdfs上,但是我们的日志服务器并不在Hadoop集群内,所以没办法直接传到Hdfs上,需要首先把文件从日志服务器传输到Hadoop集群所在的服务器,然后再上传到Hdfs
最后也是最重要的,滞后一天分析数据已经不能满足我们新的业务需求了,最好能控制在一个小时的滞后时间
可以看出来我们以前收集分析日志的方式还是比较原始的,而且比较耗时,很多时间浪费在了网络传输上面,如果日志量大的话还有丢失数据的可能性,所以在此基础上改进了一下架构:

整个过程就是,Flume会实时监控写入日志的磁盘,只要有新的日志写入,Flume就会将日志以消息的形式传递给Kafka,然后Spark Streaming实时消费消息传入Hive
那么Flume是什么呢,它为什么可以监控一个磁盘文件呢?简而言之,Flume是用来收集、汇聚并且移动大量日志文件的开源框架,所以很适合这种实时收集日志并且传递日志的场景
Kafka是一个消息系统,Flume收集的日志可以移动到Kafka消息队列中,然后就可以被多处消费了,而且可以保证不丢失数据
通过这套架构,收集到的日志可以及时被Flume发现传到Kafka,通过Kafka我们可以把日志用到各个地方,同一份日志可以存入Hdfs中,也可以离线进行分析,还可以实时计算,而且可以保证安全性,基本可以达到实时的要求
整个流程已经清晰了,下面各个突破,我们开始动手实现整套系统
2. 实战演练
2.1 安装Kafka
下载安装Kafka以及一些基本命令请传送到这里: Kafka安装与简介
安装好以后新建名为launcher_click的topic:
-. -- -- ,,,, --- -- --
查看一下该topic:
-. -- -- ,,,, --

2.2 安装Flume
1、下载解压
下载地址: https://flume.apache.org/download.html
注意进入下载地址页面,使用清华大学的那个地址,否则会很慢
wget http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -xvf apache-flume-1.7.0-bin.tar.gz
2、修改配置文件
进入flume目录,修改conf/flume-env.sh
export JAVA_HOME=/data/install/jdk
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"
添加配置文件:conf/flume_launcherclick.conf
# logser可以看做是flume服务的名称,每个flume都由sources、channels和sinks三部分组成
# sources可以看做是数据源头、channels是中间转存的渠道、sinks是数据后面的去向
logser.sources = src_launcherclick
logser.sinks = kfk_launcherclick
logser.channels = ch_launcherclick
# source
# 源头类型是TAILDIR,就可以实时监控以追加形式写入文件的日志
logser.sources.src_launcherclick.type = TAILDIR
# positionFile记录所有监控的文件信息
logser.sources.src_launcherclick.positionFile = /data/install/flume/position/launcherclick/taildir_position.json
# 监控的文件组
logser.sources.src_launcherclick.filegroups = f1
# 文件组包含的具体文件,也就是我们监控的文件
logser.sources.src_launcherclick.filegroups.f1 = /data/launcher/stat_app/.*
# interceptor
# 写kafka的topic即可
logser.sources.src_launcherclick.interceptors = i1 i2
logser.sources.src_launcherclick.interceptors.i1.type=static
logser.sources.src_launcherclick.interceptors.i1.key = type
logser.sources.src_launcherclick.interceptors.i1.value = launcher_click
logser.sources.src_launcherclick.interceptors.i2.type=static
logser.sources.src_launcherclick.interceptors.i2.key = topic
logser.sources.src_launcherclick.interceptors.i2.value = launcher_click
# channel
logser.channels.ch_launcherclick.type = memory
logser.channels.ch_launcherclick.capacity = 10000
logser.channels.ch_launcherclick.transactionCapacity = 1000
# kfk sink
# 指定sink类型是Kafka,说明日志最后要发送到Kafka
logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker
logser.sinks.kfk_launcherclick.brokerList = 10.0.0.80:9092,10.0.0.140:9092
# Bind the source and sink to the channel
logser.sources.src_launcherclick.channels = ch_launcherclick
logser.sinks.kfk_launcherclick.channel = ch_launcherclick
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
3、启动
nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume_launcherclick.conf --name logser -Dflume.root.logger=INFO,console >> logs/flume_launcherclick.log &
此时Kafka和Flume都已经启动了,从配置可以看到Flume的监控文件是/data/launcher/stat_app/.*,所以只要该目录下文件内容有增加就会发送到Kafka,大家可以自己追加一些测试日志到这个目录的文件下,然后开一个Kafka Consumer看一下Kafka是否接收到消息,这里我们完成SparkStreaming以后再看测试结果
2.3 SparkStreaming编程
SparkStreaming是Spark用来处理实时流的,能够实时到秒级,我们这里不需要这么实时,是每分钟执行一次日志分析程序,主要代码如下:
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sparkConf = new SparkConf().setAppName("LauncherStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaStream = KafkaUtils.createStream(
ssc,
"hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181",
"launcher-streaming",
Map[String, Int]("launcher_click" -> 0, "launcher_click" -> 1),
StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {
val result = rdd.map(log => parseLog(log))
.filter(t => StringUtils.isNotBlank(t._1) && StringUtils.isNotBlank(t._2))
result.saveAsHadoopFile(HDFS_DIR, classOf[String], classOf[String], classOf[LauncherMultipleTextOutputFormat[String, String]])
})
ssc.start()
ssc.awaitTermination()
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
篇目有限,完整代码访问我的github:https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/streaming/LauncherStreaming.scala
然后打包上传到master运行:
nohup /data/install/spark-2.0.0-bin-hadoop2.7/bin/spark-submit --master spark://hxf:7077 --executor-memory 1G --total-executor-cores 4 --class com.analysis.main.LauncherStreaming --jars /home/hadoop/jar/kafka-clients-0.10.0.0.jar,/home/hadoop/jar/metrics-core-2.2.0.jar,/home/hadoop/jar/zkclient-0.3.jar,/home/hadoop/jar/spark-streaming-kafka-0-8_2.11-2.0.0.jar,/home/hadoop/jar/kafka_2.11-0.8.2.1.jar /home/hadoop/jar/SparkLearning.jar >> /home/hadoop/logs/LauncherDM.log &
然后开始测试,往Flume监控目录/data/launcher/stat_app/.*写日志,原始日志内容类似下面这样:
118.120.102.3|1495608541.238|UEsDBBQACAgIACB2uEoAAAAAAAAAAAAAAAABAAAAMGWUbW7bMAyGb6NfnUFRFEWhJ+gBdgBZVjpjjp04brMAO*yY2DKa9Y+B1+DnQ1LCztoITgK4wPGHfNUhmKGUPOn3DyP*zdOxSWM3T33XXMqy9OP7xXTZiTC1xlL0HgMEi+BfHoooBEGKr3fPpYy5jMse4Xzupus4TKkrs4kZOhI51CgWWKxsUQBRPMDr1*w5Hcuc0LiUEFBwdXQxAARXHb3+QXlOfzya0uZWOGwlEwBDwLD5oJBVFHsEEPF2U0EUToyr8k4tg9v8AkRrIcKmxGsU2eqQIM45dKuKFICo5oveEqOjh2JAIITImyIJqBk3JS4qh7Wby*TroxnL9ZKHXrsyWeBQoMXaEgXUKh6mOQ1l7NLc*Hwz8aDpAtndLFJEetkVc6S9V*bg+RFiKMvnTv6ahuGUTmWexqEfi3Elezx0botJrCCQn5jfCzWaqaUOqNpFYO23ckYl5GOlx4rLQuUllh27SsjZyLQTUn4K+3uVczlOi+7uuMzTYLoibeIspk71DtKuJC+7T5qXPg9lLddaZs6+Lolnj7ANW0dBGKOn72m3cbQJI2Kq4*C6Xhz9E5Pzeeg*i2l1IAJtpReILNq6DY4peFjHeO5vffPZd2UyejEJ28Puo0sI*2*5ojvhfNcquWomFMVp02Pz++M6Nach3e6XR5wOlrdSg4T7RkgtQAuC6HYl2sc62i6dUq*om+HWjvdHAPSk8hYkegHraxC8PwPons73XZeozDfXmaRzzzaD2XI4fX0QX*8BUEsHCKeftc48AgAAmQQAAA==
查看HDFS的对应目录是否有内容:

HDFS存储的分析后的日志内容如下:
99000945863664
SparkStreaming任务状态如下:

可以看到的确是每分钟执行一次
==
相关文章:
大数据系列之Kafka安装
大数据系列之Flume--几种不同的Sources
大数据系列之Flume+HDFS
==
1 | bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
|
1 | bin/kafka-server-start.sh -daemon config/server.properties &
|