-
flume采集数据
2020-12-10 11:49:09今天写了一个flume的配置文件采集日志文件(json格式),将日志文件采集到kafka,结果在kafkatools工具中发现采集的每一条数据前面都多出了两个字节的乱码,显然后面的操作都是以json格式进行处理,数据被阻塞在...今天写了一个flume的配置文件采集日志文件(json格式),将日志文件采集到kafka,结果在kafkatools工具中发现采集的每一条数据前面都多出了两个字节的乱码,显然后面的操作都是以json格式进行处理,数据被阻塞在kafka中。
后来进行了反复检查,发现问题还是出现在flume配置文件中。。。。。。。。。。。。
在这里配置是否以event格式写入kafka时,多加了kafka,flume读取数据时还是以默认设置以event格式读入数据,结果event中的header信息保存到kafka后变成了乱码,后续操作以json格式为基础都会报错。 -
flume 采集数据到hdfs
2015-10-09 21:57:52前言:在两台机器上做flume 采集数据实验:hadoop05上安装flume 1.5.0版本,hadoop07上安装hadoop2.2.0版本 一、安装 前提:flume是依赖jdk,所以需要安装jdk,这里就不多说,jdk 安装目录/usr/local/...
前言:在两台机器上做flume 采集数据实验:hadoop05上安装flume 1.5.0版本,hadoop07上安装hadoop2.2.0版本
一、安装
前提:flume是依赖jdk,所以需要安装jdk,这里就不多说,jdk 安装目录/usr/local/jdk1.6.0_45
下载安装文件:apache-flume-1.5.0-bin.tar.gz,上传到linux 目录:/usr/local/
解压文件tar -zvxf apache-flume-1.5.0-bin.tar.gz 得到apache-flume-1.5.0-bin ,以下称为FLUME_HOME
二、修改配置文件
cd apache-flume-1.5.0-bin/conf 下
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 添加jdk 配置:JAVA_HOME=/usr/local/jdk1.6.0_45 保存退出
三、建立配置文件,这个配置文件是flume :source、channel、sink三大组件的配置,取名为example.conf ,内容如下:
#定义agent名, source、channel、sink的名称
example.sources = r1
example.channels = c1
example.sinks = k1
#具体定义source
example.sources.r1.type = spooldir
example.sources.r1.spoolDir = /home/hadoop/logs
#具体定义channel
example.channels.c1.type = memory
example.channels.c1.capacity = 10000
example.channels.c1.transactionCapacity = 100
#定义拦截器,为消息添加时间戳
example.sources.r1.interceptors = i1
example.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义sink
example.sinks.k1.type = hdfs
example.sinks.k1.hdfs.path = hdfs://hadoop07:9000/flume/%Y%m%d
example.sinks.k1.hdfs.filePrefix = events-
example.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
example.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
example.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
example.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
example.sources.r1.channels = c1
example.sinks.k1.channel = c1
jiang
四、拷贝需要hadoop 的jar包,因为是收集数据到hdfs,需要调用到hdfs 客户端接口,所以需要以下几个jar
在hadoop 解压之后的相对路径:(这里是使用hadoop2.2.0版本)
--hadoop-2.2.0/share/hadoop
--common/hadoop-common-2.2.0.jar
--common/lib/common-configuration-1.6.jar
--common/lib/hadoop-auth-2.2.0.jar
--hdfs/hadoop-hdfs-2.2.0.jar
到${FLUME_HOME}/lib下
五、通知flume 知道hadoop的信息
要让flume 知道hadoop的配置,所以需要拷贝hadoop 的两个核心配置文件:core-site.xml 和hdfs-site.xml到
${FLUME_HOME}/conf下
六、启动hadoop
所有的配置都已经准备好, 启动hadoop07 机器上的hadoop ,根据example.conf 中的配置而定,我这里用的是hadoop07,为了防止错误,建立我们本机的监听目录:
/home/hadoop/logs,
七、启动flume
所用的前提已经准备好,此时最后一步,启动flume:
${FLUME_HOME}/bin/flume-ng agent -n example -c conf -f conf/exmple.conf -Dflume.root.logger=INFO,console
擦数说明:agent -n example 代理对象
-c conf 配置文件的地址目录
-f conf/exmple.conf 指制定配置文件
-Dflume.root.logger=INFO,console 日志级别为INFO ,打印在控制台上
八、校验
将拷贝文件 cp /data/flume_test.txt /home/hadoop/logs 下
flume 会将其采取,到hdfs 中查看是否有多了一个flume 文件 -
flume采集数据到hdfs、kafka配置文件
2021-01-24 14:32:46flume采集数据到hdfs、kafka配置文件 执行命令 nohup bin/flume-ng agent -n a10 -c conf/ -f ./conf/server/flume-taildir-kafka.conf -Dflume.root.logger=INFO,console >> ./logs/fflume-taildir-kafka....flume采集数据到hdfs、kafka配置文件
- 执行命令
nohup bin/flume-ng agent -n a10 -c conf/ -f ./conf/server/flume-taildir-kafka.conf -Dflume.root.logger=INFO,console >> ./logs/fflume-taildir-kafka.log 2>&1 &
- 采集日志文件到kafka
flume-taildir-kafka.conf
#agent a10.sources = r1 a10.channels = c1 #source a10.sources.r1.type = TAILDIR a10.sources.r1.filegroups = f1 #读取的文件路径 a10.sources.r1.filegroups.f1 = /data1/onepiece-recommender/logs/statistics.log #通过 json 格式存下每个文件消费的偏移量,避免从头消费 a10.sources.r1.positionFile = /usr/local/flume/logs/taildir_position_event_log.json a10.sources.r1.fileHeader = false #batchSize a10.sources.r1.batchSize = 1000 #channel a10.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a10.channels.c1.kafka.bootstrap.servers = 10.70.110.70:9092,10.70.110.71:9092,10.70.110.82:9092 a10.channels.c1.kafka.topic = es-service-cost-monitor a10.channels.c1.kafka.consumer.group.id = flume-consumer-event-001 a10.channels.c1.parseAsFlumeEvent = false a10.channels.c1.kafka.producer.compression.type = lz4 a10.channels.c1.kafka.flumeBatchSize = 5000 a10.channels.c1.kafka.producer.acks = all a10.channels.c1.kafka.producer.linger.ms = 30 #关联关系 a10.sources.r1.channels = c1
采集日志文件到hdfs
#agent a10.sources = r1 a10.channels = c1 a10.sinks = k1 #source a10.sources.r1.type = TAILDIR a10.sources.r1.filegroups = f1 #读取的文件路径 a10.sources.r1.filegroups.f1 = /usr/local/nginx/logs/action_data/access.log #通过 json 格式存下每个文件消费的偏移量,避免从头消费 a10.sources.r1.positionFile = /home/user/apache-flume-1.9.0-bin/logs/taildir_position_event_log.json a10.sources.r1.fileHeader = false #batchSize a10.sources.r1.batchSize = 1000 #channel a10.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a10.channels.c1.kafka.bootstrap.servers = 10.10.100.124:9092 a10.channels.c1.kafka.topic = inference_raw_log a10.channels.c1.kafka.consumer.group.id = flume-consumer-eventlog-001 a10.channels.c1.parseAsFlumeEvent = false a10.channels.c1.kafka.producer.compression.type = lz4 a10.channels.c1.kafka.flumeBatchSize = 5000 a10.channels.c1.kafka.producer.acks = all a10.channels.c1.kafka.producer.linger.ms = 30 a10.sinks.k1.type = hdfs a10.sinks.k1.hdfs.path =hdfs://10.10.100.124:9000/data/action_log/%y%m%d/ a10.sinks.k1.hdfs.filePrefix = access_log a10.sinks.k1.hdfs.maxOpenFiles = 5000 a10.sinks.k1.hdfs.batchSize= 100 a10.sinks.k1.hdfs.fileType = DataStream a10.sinks.k1.hdfs.writeFormat =Text a10.sinks.k1.hdfs.rollSize = 102400 a10.sinks.k1.hdfs.rollCount = 1000000 a10.sinks.k1.hdfs.rollInterval = 60 a10.sinks.k1.hdfs.round = true a10.sinks.k1.hdfs.roundValue = 10 a10.sinks.k1.hdfs.roundUnit = minute a10.sinks.k1.hdfs.useLocalTimeStamp = true #关联关系 a10.sources.r1.channels = c1 a10.sinks.k1.channel = c1
-
flume mysql hive_flume采集数据实时存储hive两种解决方案
2021-02-01 03:40:05方案一本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时实际上是一种准实时。假设hadoop集群已经正常启动,hive也已经正常启动,并且hive的文件...说明:本文不仅提供两种方案,还详细的记录了一些相关信息。
方案一
本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时实际上是一种准实时。
假设hadoop集群已经正常启动,hive也已经正常启动,并且hive的文件地址是/hive/warehouse,然后hive里存在一张由以下建表语句创建的表
create table flume_test(uuid string);
可推断,表flume_test地址在/hive/warehouse/flume_test,下面介绍flume:
flume安装步骤
#下载
cd /opt
mkdir flume
wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar xvzf apache-flume-1.6.0-bin.tar.gz
cd apache-flume-1.6.0-bin/conf
cp flume-env.sh.template flume-env.sh
打开flume-env文件,添加java变量
export JAVA_HOME=/usr/java/jdk1.8.0_111
然后添加环境变量,为了一次过,分别在profile和bashrc末尾添加
export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
然后
source /etc/profile
到此flume安装完毕,下面进行配置,切换到conf文件夹复制flume-conf.properties.template为agent.conf,然后编辑
#定义活跃列表
agent.sources=avroSrc
agent.channels=memChannel
agent.sinks=hdfsSink
#定义source
agent.sources.avroSrc.type=avro
agent.sources.avroSrc.channels=memChannel
agent.sources.avroSrc.bind=0.0.0.0
agent.sources.avroSrc.port=4353
agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false
#定义channel
agent.channels.memChannel.type=memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100
#定义sink
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memChannel
#agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test
agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume
agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume
agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp
agent.sinks.hdfsSink.hdfs.rollInterval=0
agent.sinks.hdfsSink.hdfs.rollSize=10240000
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.idleTimeout=0
agent.sinks.hdfsSink.hdfs.batchSize=100
agent.sinks.hdfsSink.hdfs.minBlockReplicas=1
# agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
具体的每一项配置可参照下面这篇博客http://lxw1234.com/archives/2015/10/527.htm,需要警惕的是rollInterval、rollSize、rollCount、idleTimeout这四个属性,如果进行了配置发现不起作用,就要检查一下minBlockReplicas这个属性是否配置,并且值是否是1,下面这个连接是原因http://doc.okbase.net/chiweitree/archive/126197.html
配置完毕后可以启动,启动命令
./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console
注意:-n 指的是agent的名称,需要对应到配置文件的第一个值,本启动命令还开启了监控,监控地址http://host:5653/metrics;-f 指的是配置文件的路径及名称。flume的conf修改后不用重启,默认30秒刷新一次,自动装载最新的配置。
flume安装并启动完毕后,编写测试程序。打开eclipse,创建maven项目
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
scc
stu-flume
0.0.1-SNAPSHOT
war
stu-flume
log4j
log4j
1.2.9
org.apache.flume.flume-ng-clients
flume-ng-log4jappender
1.6.0
测试servlet
public class GenerLogServlet extends HttpServlet {
private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class);
private static final long serialVersionUID = 1L;
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
for (;;) {
LOGGER.info(UUID.randomUUID().toString());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
this.doGet(request, response);
}
}
log4j.properties
#log4j settings
#log4j.rootLogger=debug, CONSOLE
log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet
#log4j.rootLogger=INFO
log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.GenerLogServlet.Hostname=10.5.3.100
log4j.appender.GenerLogServlet.Port=4353
log4j.appender.GenerLogServletUnsafeMode=false
启动项目,访问http://localhost:8080/log开始生产数据。需要注意的是,如果flume配置基于时间戳做文件分组(此种情况可以匹配hive根据时间进行分区),那么需要agent.conf中的source一定要配置
agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false
否则flume的sink会报找不到timestamp错误,因为源码org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定义timestamp的key是flume.client.log4j.timestamp而不是timestamp,所以需要手动添加一个timestamp,如果对这个timestamp要求必须是数据生产的时间,可以修改源码或者为source添加拦截器手动配置。
flume具有非常灵活的使用方式,可以自定义source、sink、拦截器、channel选择器等等,适应绝大部分采集、数据缓冲等场景。
观察hadoop目录,发现flume已经按配置将数据移动到相应的hive表目录中,如下图:
打开hive客户端,数据查询命令,发现数据已可被查询!并且针对hive的分区表和桶表flume都可以实现按照hive表数据规则写入,进而达到数据实时插入,至此,方案一结束。
本方案缺点:
由于flume在写入文件的时候,独占正在写入的文件资源,导致hive不能读取正在被写入的文件的内容,也就是说假如每5分钟生成一个文件,那么正在写的文件不会被hive读取到内容,也就意味了hive存在最大5分钟的延迟。而如果把时间变小,那么延迟就会降低,但是哪怕是设置30分钟或1个小时,flume流量不大的情况下,也会生成许多零散的小文件,这点与hive的特长相悖,hive擅长处理大文件,对于零散小文件hive性能会降低很多。
方案二
对比方案一,测试程序、source不变,sink改成hbase-sink,数据实时插入到hbase中,然后在hive建立一张hbase映射表,hive从hbase中读取数据,这样可达到实时插入的效果。由于字数限制,方案二记录在如下博客连接中:
-
经典面试题-Flume采集数据会丢失吗?
2020-03-23 14:37:10Flume采集数据会丢失吗? 不会,Channel存储可以存储在File中,数据传输自身有事务。 -
linux中tail使用方法及flume采集数据不丢失
2020-02-17 15:09:17tail使用方法 tail -Ftest.log 你会看到屏幕不断有内容被打印出来. 这时候中断第一个进程Ctrl-C ...flume采集数据不丢 当我们用flume采集一个实时文件的时候,我们的flume的source通常是这么... -
flume采集数据到hdfs上产生的问题解决
2019-06-26 20:28:23解决Flume采集数据时在HDFS上产生大量小文件的问题 flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 解决方案: 去掉round时间系列参数,并将rollSize和rollCount置0, 表示不根据临时文件... -
Flume采集数据利器
2020-03-22 19:18:15Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。Flume 可以做离线也可以做实时分析。 二、Flume架构 如图所示: Agent就是Flume的一个部署实例,一... -
Flume采集数据到hdfs,文件开头有乱码
2020-05-28 16:00:58今天遇到这样的一个问题,flume拉取kafka数据,下沉到hdfs中,然后存取到hive中。...后来查询资料说,flume采集数据下沉到hdfs,有默认的文件格式, hdfs.fileType默认为SequenceFile,将其改为DataSt -
Flume采集数据到HDFS中,开头信息有乱码
2018-03-15 10:45:25Flume采集数据,在生成的HDFS文件中,总是有“SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable??H謺NSA???y”信息,在Flume文档中介绍,hdfs.fileType默认为SequenceFile,将其改为... -
日志数据产生 ——flume采集数据——kafka存储数据——flume消费kafka数据——hdfs
2020-01-11 15:35:021. 搭建flume 用来监控日志生成目录,将日志数据sink到kafka 2. kafka 存储数据,方便后续flume消费。另外也可以供spark streaming 消费。 3. 消费flume,消费kafka的数据,然后sink到hdfs 二、步骤 1.启动... -
Apache Flume采集数据简单案例
2018-06-21 15:32:52概述 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量...一般的采集需求,通过对 flume 的简单配置即可实现。针对特殊场景也具备 良好的自定义扩展能力。因此,flume 可以适用于大部分的日常数... -
flume采集数据报错:EventDeliveryException,导致原因 Batch Expired
2020-05-09 15:48:47flume采集文件数据报错: flume版本1.7 kafka版本0.1.0 求大佬们帮忙解决一下## 标题 -
flume采集数据易出现的bug
2018-08-08 10:30:401.内存不足 修改文件内容 <文件根目录>/bin/flume-ng JAVA_OPTS="-Xms100m ...2.采集kafka数据或者生产kafka数据的的时候默认数据大小是1M,所以使用flume工具导数据如果大于1M,需要添加配置参数 配置文件... -
Flume采集数据到kafka
2019-07-08 10:26:51a1.sources = r1 a1.sinks = k1 a1.channels = c1 #对于source的配置描述 监听..../flume-ng agent -c /usr/local/flume/conf -f /usr/local/flume/conf/dir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console -
flume采集数据到kafka和hive
2017-06-20 17:33:08flume加载数据hive sink;kafka sink -
flume采集数据的时候报异常
2019-03-10 21:28:47WARN - org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:735)] Unexpected Exception null java.lang.InterruptedException at java.util.concurrent.FutureTask.awaitDone(FutureTas.... -
flume采集数据报错问题解决
2017-09-26 11:28:00在一次实验过程中,使用flume 1.7采集本地的数据到hdfs文件系统时,由于配置文件不合理,导致出错。错误如下: [WARN - org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:611... -
flume采集数据到hdfs配置
2019-03-15 17:24:17用tail命令获取数据,下沉到hdfs 启动命令: bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 ######## # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #... -
Flume+Kafka+SparkStreaming实时统计网站日志--3、Flume采集数据到Kafka
2019-11-12 17:28:16Flume采集日志数据到Kafka过程中Flume配置文件写法,Kafka topic创建、消费者查看结果 -
Windows上的flume采集数据传给服务器flume再上传给HDFS
2019-04-29 11:51:16@flume从Windows采集数据传给服务器flume再上传给HDFS 从该博主学习到具体方法配置 Windows flume conf 文件: a1.sources = r1 a1.sinks = k1 a1.channels = c1 Describe/configure the source a1.sources.r1.type =... -
flume采集数据到hdfs
2015-03-03 23:39:33说明:flume1.5,hadoop2.2 1、配置JAVA_HOME和HADOOP_HOME 说明:HADOOP_HOME用于获取flume操作hdfs所需的jar和配置文件,如果不配置,也可以手动拷贝jar包和配置文件 2、解压flume,执行bin目录下的flume-... -
flume采集数据到hdfs性能问题
2015-03-12 12:27:44本人目前遇到flume采集写入hdfs性能等各种问题,大致如下。在10上的xx/xx目录下的数据进行读取 sink到08上的flume 由08上的flume写到07的hdfs上 30多m的文件写了好久。有时候会内存溢出等问题![图片说明]...