
- 外文名
- flume
- 特 点
- 聚合和传输的系统
- 中文名
- 水槽
- 实 质
- 孵化项目
-
2022-03-30 17:57:29
简介
概述
- Flume是Cloudera公司开发的后来贡献给了Apache的一套用于进行日志的收集(collecting)、汇聚(aggregating)和传输(movlng)的机制
- 在大数据中,有超过70%的数据来源是日志 - 日志是大数据的基石
- Flume版本:
- Flume0.X:Flume-og,配置结构复杂,并且对分布式支持不好
- Flume1.X:Flume-ng,配置结构进行简化,并且很好的支持分布式
基本概念
-
Event:事件
-
Flume会将收集到的日志封装成Event对象,一个Event对象就是一条日志
-
Event本质上就说一个Json串,固定包含两个部分headers和body
{ "headers":{}, "body":"" }
-
-
Agent:是Flume流动结构的基本组成,固定的包含至少3个组件
- Source:从数据源采集数据 - conllecting
- Channel:临时存储数据 - aggregating
- Sink:将数据写到目的地 - moving
流动模型
-
单级流动
-
多级流动
-
扇入流动
-
扇出流动
- 复杂流动:需要根据需求将上述流动进行组合,构成了复杂流动
安装和入门
-
进入安装目录下,下载Flume的安装包
-
解压
tar -vxf apache-flume-1.9.0-bin.tar.gz
-
重命名
mv apache-flume-1.9.0-bin.tar.gz flume-1.9.0
-
添加环境变量
vim /etc/profile.d/flumehome.sh #文件中添加 export FLUME_HOME=/home/software/flume-1.9.0 export PATH=$FLUME_HOME$/bin #保存退出,重新生效 source /etc/profile.d/flumehome.sh
-
校验环境变量
flume-ng version
-
进去Flume安装目录的lib目录中,解决连接冲突问题
cd /home/software/flume-1.9.0/lib/ rm -rf guava—11.0.2.jar cp /home/software/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar ./
-
构建数据目录
cd /home/software/flume-1.9.0 mkdir data
-
入门:编写格式文件
vim basic.properties
在文件中添加
# 给Agent起名 # 给Source起名 a1.sources = s1 # 给Channels起名 a1.channels = c1 # 给Sink起名 a1.sinks = k1 # 配置Source a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置Sink a1.sinks.k1.type = logger # 将Source和Channel进行绑定 a1.sources.s1.channels = c1 # 将Sink和Channel进行绑定 a1.sinks.k1.channel = c1
Source
AVRO Source
-
AVRO Source:采集被AVRO序列化之后的数据,然后将数据反序列化之后进行存储和传输。可以结合AVRO Sink来实现多级、扇入和扇出流动
-
示例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置AVRO Source # 类型只能是AVRO a1.sources.s1.type = avro # 监听的主机名/IP a1.sources.s1.bind = hadoop01 # 绑定的端口号 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动Flume
flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.properties -Dflume.root.logger=INFO,console
-
启动Flume的AVRO客户端来发送数据
flume-ng avro-client -H hadoop01 -p 8090 -F a.txt -Dflume.root.logger=INFO,console
-
Exec Source
-
Exec Source:监听指定的Linux命令,然后将命令的执行结果作为日志进行收集
-
案例:监听指定的文件,获取这个文件中新添的数据
-
格式
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Exec Source # 类型必须是exec a1.sources.s1.type = exec # 监听的命令 a1.sources.s1.command = tail -F /home/software/flume-1.9.0/data/a.txt # 命令类型 a1.sources.s1.shell = /bin/sh -c a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.properties -Dflume.root.logger=INFO,console
-
Spooling Diretory Source
-
Spooling Directory Source:监听指定的目录,如果目录中产生了新的文件,那么会自动收集新文件中的内容
-
如果不指定,那么被收集完的文件会自动添加一个后缀
.COMPLETED
-
示例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Spooling Directory Source # 类型必须是spooldir a1.sources.s1.type = spooldir # 监听的目录 a1.sources.s1.spoolDir = /home/flumedata a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.properties -Dflume.root.logger=INFO,console
-
Sequence Generator Source
-
Sequence Generator Source:序列产生器。从0开始不断递增,递增到指定大小。如果不指定,那么会递增到
Long.MAX_VALUE
-
示例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Sequence Generator Source # 类型必须是seq a1.sources.s1.type = seq # 结束值 a1.sources.s1.totalEvents = 200 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.properties -Dflume.root.logger=INFO,console
-
HTTP Source
-
HTTP Source:监听HTTP请求,但是只能监听GET和POST请求。其中GET请求只能用于实验,因此只用这个Source来监听POST请求
-
示例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置HTTP Source # 类型必须是http a1.sources.s1.type = http # 监听的端口号 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.properties -Dflume.root.logger=INFO,console
-
发送POST请求
curl -X POST -d '[{"headers":{"class":"bigdata"},"body":"testing"}]' http://hadoop01:8090
-
Custom Source
-
Custom Source:自定义Source,需要定义一个类实现Source接口的子接口之一:
EventDrivenSource
或者PollableSource
- EventDrivenSource:事件驱动Source - 被动型Source。在产生数据之后,才需要定义线程获取数据
- PollableSource:拉去Source - 主动型Source。无论是否有数据,都有自己提供线程去查询是否有数据
-
自定义Source
-
导入POM文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>djh.lb</groupId> <artifactId>flume</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>1.9.0</version> </dependency> </dependencies> </project>
-
代码
package djh.lb.flume.auth.source; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; // 模拟:Sequence Generator Source public class AuthSource extends AbstractSource implements EventDrivenSource, Configurable { private long step; // 步长 private long end; // 结束值 private ExecutorService es; // 线程池 // 获取指定的属性值 @Override public void configure(Context context) { // 如果没有指定步长,那么默认步长为1 step = context.getLong("step", 1L); // 如果没有指定结束值,那么默认结束值为Long.MAX_VALUE end = context.getLong("end", Long.MAX_VALUE); } // 启动Source @Override public synchronized void start() { // 初始化线程池 es = Executors.newFixedThreadPool(5); // 获取ChannelProcessor ChannelProcessor cp = this.getChannelProcessor(); // 提交任务 es.submit(new Add(step, end, cp)); } // 结束Source @Override public synchronized void stop() { if (es != null) es.shutdown(); } } class Add implements Runnable { private final long step; private final long end; private final ChannelProcessor cp; public Add(long step, long end, ChannelProcessor cp) { this.step = step; this.end = end; this.cp = cp; } @Override public void run() { for (long i = 0; i < end; i += step) { // 将每一次递增的数据作为日志进行收集 // 构建headers Map<String, String> headers = new HashMap<>(); headers.put("time", System.currentTimeMillis() + ""); // 构建body部分 byte[] body = (i + "").getBytes(); // Flume会将收集到的数据封装成Event对象 Event e = EventBuilder.withBody(body, headers); // 需要将数据传递给Channel cp.processEvent(e); } } }
-
生成jar包,放到Flume安装目录的子目录lib下
-
编写格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Custom Source # 类型必须是类的全路径名 a1.sources.s1.type = djh.lb.flume.auth.source.AuthSource # 递增步长 a1.sources.s1.step = 5 # 结束值 a1.sources.s1.end = 100 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsource.properties -Dflume.root.logger=INFO,console
-
-
Channel
Memory Channel
- Memory Channel:将收集到的数据临时存储到内存的队列中。在存储的时候,如果队列已满,那么后续的数据会被阻塞
- 如果不指定,那么队列的容量默认为100,即意味着在队列中可以存储100条数据。可以通过capacity这个属性来修改。实际过程中,一般会将这个属性的值改成10w-30w,也可以考虑到50w
transactionCapacity
:决定了Source每一批给Channel的数据量,也决定了Channel每一批给Sink的数据量。如果不指定,这个属性的默认值为100- Memory Channel适合于要求速度而不要求可靠性的场景
File Channel
-
File Channel:将收集到的数据临时存储到磁盘的文件中。在存储的时候,如果不指定,会将数据临时存储到
~/.flume/file-channel.data
中 -
checkpoinDir:记录检查点的目录位置。检查点就说Sink获取的数据的位置,默认是
~/.flume/file-channel/checkpoint
-
File Channel适用于要求可靠性而不要求速度的场景
-
示例
- 格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 # 配置File Channel # 类型必须是file a1.channels.c1.type = file # 数据临时存储路径 a1.channels.c1.dataDirs = /home/flumedata/data # 检查点临时存储路径 a1.channels.c1.checkpointDir = /home/flumedata/checkpoint a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f filechannel.properties -Dflume.root.logger=INFO,console
其他Channel
- JDBC Channel:将收集到的数据临时存储到数据库中。目前为止,JDBC Channel只支持Derby数据库,由于Derby数据库的特性(微型、单连接),导致实际开发中不使用这个Channel
- Spillable Memory Channel:先考虑将收集到的数据临时存储到内存中,如果内存队列被塞满,那么不会阻塞而是会将数据临时存储到磁盘上。
Sink
HDFS Sink
-
HDFS Sink:将数据最终写到HDFS上。在写出数据的时候,每隔30s滚动一次,在HDFS上生成一个文件,这样会在HDFS上生成大量的小文件。因此实际过程中需要将滚动时间来进行修改,可以通过属性
hdfs.rollInterval
来修改 -
hdfs.rollCount
:写入文件中的数据条数。默认数量为10,如果达到这个条数,那么这个文件也会产生滚动,也会导致产生大量的小文件。如果这个属性设置为0,那么不会根据数据条数进行滚动 -
hdfs.rollSize
:滚动时候的文件大小。默认是1024B,也就意味着在HDFS上,文件达到1KB大小就会产生滚动。如果这个属性设置为0,则表示不会根据文件大小来进行滚动。 -
hdfs.fileType
:HDFS上文件的存储类型,包含:SequenceFile
(序列文件),DataStream
(文本文件),CompressedStream
(压缩文件)。默认值是SequenceFile。如果指定为CompressedStream,那么还需要指定hdfs.codeC
属性的值 -
示例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = hadoop01 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置HDFS Sink # 类型必须是hdfs a1.sinks.k1.type = hdfs # 文件在HDFS上的存储路径 a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata # 滚动间隔时间 a1.sinks.k1.hdfs.rollInterval = 3600 # 文件的滚动大小 a1.sinks.k1.hdfs.rollSize = 134217728 # 数据条数 a1.sinks.k1.hdfs.rollCount = 1000000 # 文件类型 a1.sinks.k1.hdfs.fileType = DataStream a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动HDFS
start-dfs.sh
-
启动Flume
flume-ng agent -n a1 -c $FLUME_HOME/conf -f hdfssink.properties -Dflume.root.logger=INFO,console
-
Logger Sink
- Logger Sink:将收集到的数据打印到控制台上。在打印的时候,对中文支持不好。还需要注意的是,Logger Sink为了防止过多的数据占满屏幕,还进行了限制:如果body部分的字节超过16位,超过部分不打印,可以通过属性
maxBytesToLog
来修改
File Roll Sink
-
File Roll Sink:将收集到的数据最终写到本地磁盘上。在写出的时候,每隔30s滚动一次生成一个小文件
-
示例
-
格式文件:
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Source a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置File Roll Sink # 类型必须是file_roll a1.sinks.k1.type = file_roll # 文件存储位置 a1.sinks.k1.sink.directory = /home/flumedata # 文件滚动间隔时间 a1.sinks.k1.sink.rollInterval = 3600 # 将Source和Channel进行绑定 a1.sources.s1.channels = c1 # 将Sink和Channel进行绑定 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f filerollsink.properties -Dflume.root.logger=INFO,console
-
AVRO Sink
-
AVRO Sink:将数据序列化之后写出,结合AVRO Source来实现多级扇入、扇出流动模型
-
多级流动
-
第一个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = hadoop01 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop02 a1.sinks.k1.port = 8090 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
第二个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = avro a1.sources.s1.bind = hadoop02 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop03 a1.sinks.k1.port = 8090 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
第三个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = avro a1.sources.s1.bind = hadoop03 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
按顺序启动hadoop03、hadoop02、hadoop01
flume-ng agent -n a1 -c $FLUME_HOME/conf -f duoji.properties -Dflume.root.logger=INFO,console
-
-
扇入流动
-
第一个和第二个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop03 a1.sinks.k1.port = 8090 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
第三个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = avro a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
先启动hadoop01或者hadoop02,最后启动hadoop03
flume-ng agent -n a1 -c $FLUME_HOME/conf -f shanru.properties -Dflume.root.logger=INFO,console
-
-
多级扇出
-
第一个节点
a1.sources = s1 a1.channels = c1 c2 a1.sinks = k1 k2 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c2.type = memory a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop02 a1.sinks.k1.port = 8090 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop03 a1.sinks.k2.port = 8090 a1.sources.s1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
-
第二和第三个节点
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = avro a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
先启动hadoop02或者hadoop03,最后启动hadoop01
flume-ng agent -n a1 -c $FLUME_HOME/conf -f shanchu.properties -Dflume.root.logger=INFO,console
-
Custom Sink
-
Custom Sink:Flume允许用户进行自定义Sink,需要定义一个类实现Sink接口
-
示例
-
代码
package cn.tedu.flume.auth.sink; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import java.io.FileNotFoundException; import java.io.PrintStream; import java.util.Map; // 模拟:File Roll Sink public class AuthSink extends AbstractSink implements Sink, Configurable { private String path; private PrintStream ps; // 获取属性 @Override public void configure(Context context) { // 获取用户指定的路径 path = context.getString("path"); // 判断用户是否指定了路径 if (path == null || path.equals("")) throw new IllegalArgumentException("路径没有指定!!!"); } // 启动Sink @Override public synchronized void start() { // 初始化打印流 try { ps = new PrintStream(path + "/" + System.currentTimeMillis()); } catch (FileNotFoundException e) { e.printStackTrace(); } } @Override public Status process() { // 获取Channel Channel c = this.getChannel(); // 获取事务 Transaction t = c.getTransaction(); // 开启事务 t.begin(); // 获取数据 Event e; try { while ((e = c.take()) != null) { // 获取headers Map<String, String> headers = e.getHeaders(); // 写出headers部分 ps.println("headers:"); for (Map.Entry<String, String> en : headers.entrySet()) { ps.println("\t" + en.getKey() + "=" + en.getValue()); } // 获取body部分 byte[] body = e.getBody(); // 写出body部分 ps.println("body:"); ps.println("\t" + new String(body)); } // 提交事务 t.commit(); return Status.READY; } catch (Exception ex) { // 回滚事务 t.rollback(); return Status.BACKOFF; } finally { // 关闭事务 t.close(); } } // 关闭Sink @Override public synchronized void stop() { if (ps != null) ps.close(); } }
-
完成之后,打成jar包,放到Flume安装目录的子目录lib下
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = http a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置Custom Sink # 类型必须是类的全路径名 a1.sinks.k1.type = cn.tedu.flume.auth.sink.AuthSink # 存储位置 a1.sinks.k1.path = /home/flumedata a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsink.properties -Dflume.root.logger=INFO,console
-
发送数据
curl -X POST -d '[{"headers":{"subject":"bigdata","class":"18计科G"},"body":"Welcome"}]' http://hadoop01:8090 curl -X POST -d '[{"headers":{"date":"20220317","time":"11:44"},"body":"testing"}]' http://hadoop01:8090
-
Sink Processor
- Sink Processor:Sink处理器,本质上是一个Sink Group(Sink组),在使用的时候,会将一个或者多个Sink绑定到同一个组中,同一个组中的Sink执行相同的功能
- Sink Processor提供了3种模式
- default:默认模式。在扇出结构中,如果不指定,那么默认使用的就是这个模式。在这个模式下,每一个Sink都对应了一个单独的Sink Group
- failover:奔溃恢复模式。在使用的时候,需要将多个Sink绑定到一个组中,需要给一个Sink来指定一个优先级,只要高优先级的Sink存货,数据就不会发送给低优先级的Sink
- load balancing:负载均衡模式。在使用的时候,也需要将多个Sink绑定到一个组中,需要给Sink指定均衡模式。Flume中,提供了两种均衡模式:round_robin(轮询),random(随机)
Interceptor
概述
- Interceptor:是Source的子组件,也就意味着是配置在Source上的
- 不同于Selecetor,Interceptor可以配置多个,构成拦截器链
Timestamp Interceptor
- Timestamp Interceptor:会在事件的Event部分来添加一个timestamp字段用于标记数据被收集的时间
- Timesamp Interceptor结合HDFS Sink来实现数据的按天存放
Host Interceptor
- Host Interceptor:在事件的headers中添加一个host字段用于标记数据的来源主机
Static Interceptor
Static Interceptor:在事件的headers部分添加指定字段,可以用于标记
UUID Interceptor
UUID Interceptor:在数据的headers部分来添加一个id字段,用于标记数据的唯一性
Search and Replace Interceptor
Search and Replace Interceptor:在使用的时候需要指定一个正则表达式,会将符合正则表达式的数据替换为指定形式
Regex Filtering Interceptor
Regex Filtering Interceptor:在使用的时候需要指定正则表达式,根据excludeEvents的属性的值来确定过滤规则。当excludeEvents属性的值为true的时候,符合正则表达式的值会被过滤掉;当excludeEvents属性值为fales的时候,不符合正则表达式的数据会被过滤掉
Custom Interceptor
Custom Interceptor:Flume支持用于自定义拦截器,在使用的时候需要定义一个类实现Interceptor接口
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.sources.s1.interceptors = i1 # 自定义Interceptor a1.sources.s1.interceptors.i1.type = cn.tedu.flume.auth.source.AuthInterceptor$B uilder a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.sinks.s1.channels = c1 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
Flume执行流程
更多相关内容 -
flume
2021-03-25 17:10:52Apache Flume是一个分布式,可靠且可用的系统,用于有效地收集,聚合大量日志数据并将其从许多不同的源移动到集中式数据存储中https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.5.1/ -
flume:Apache Flume的镜像
2021-02-03 20:07:11欢迎使用Apache Flume! Apache Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。 它具有基于流数据流的简单灵活的体系结构。 它具有可调整的可靠性机制以及许多故障转移和恢复机制... -
Flume用法
2021-01-07 14:17:16Flume包含三部分 Source:从哪收集,一般使用:avro(序列化),exec(命令行),spooling(目录),taildir(目录和文件,包含offset,不会数据丢失),kafka Channel:数据存哪里:(memory,kafka,file) Sink:数据输出到... -
rocketmq-flume-master:flume收集日志发送到rocketmq
2021-06-25 03:23:52rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译 在rocketmq-flume项目根目录执行mvn clean install... -
Flume使用详解(一)
2021-01-07 13:16:23概述: Flume最早是Cloudera提供的日志收集系统,后贡献给Apache。所以目前是Apache下的项目,...当前Flume有两个版本Flume 0.9X版本的统称Flume-og,老版本的flume需要引入zookeeper集群管理,性能 也较低(单线程工 -
Flume部署和使用
2021-01-07 08:23:24Flume部署和使用 官方文档: http://flume.apache.org/ example: WebServer –> Agent[Source–>Channel–>Sink] –> HDFS 一.简介 Flume是一个分布式,可靠的的框架,它能从许多不同的数据源高效地收集、聚合和移动... -
flume.kafka:基于新 Kafka Producer 的 Flume kafka sink,可配置
2021-06-05 19:56:19基于新 Kafka Producer 的 Flume kafka sink,高性能且可配置。 它依赖于很少的项目/库,只有 Flume 1.5.2 kafka-clients-0.8.2.1 或更高版本,slf4j。 类似于 Flume 1.6 KafkaSink,但这里有一些不同: Flume 1.6 ... -
【Flume】(二)Flume 定义和基础架构
2021-01-20 12:30:43文章目录一、Flume 定义二、Flume 基础架构 一、Flume 定义 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。 为什么选用 Flume ? 二、... -
关于Flume的优化和高可用
2021-01-20 12:23:47文章目录Flume优化一、内存参数优化(减少GC)1)-xmx和-xms设置相同值,避免在 GC 后调整堆大小带来的压力。2)JVM heap(堆内存)设置4G或更高二、channel优化Flume如何保证数据安全(高可用)事务机制Flume解决... -
flume-ng-sql-source:Flume Source从SQL数据库导入数据
2021-05-12 09:57:13flume-ng-sql-source 该项目用于与sql数据库进行通信 当前支持SQL数据库引擎 在最后一次更新之后,该代码已与hibernate集成在一起,因此该技术支持的所有数据库均应正常工作。 编译与包装 $ mvn package 部署方式 ... -
FlumeConfig:可视化 Flume 配置编辑器
2021-06-26 18:24:23#FlumeConfig###A 可视化 Flume 编辑器## 版本:0.1.0 Flume 配置完全用 Javascript 编写并且是自包含的。 它允许您直观地布置 Flume 拓扑,输入源、通道和接收器的属性,并为您创建水槽配置文件。 它可以处理多个... -
logback-flume-appender:Logback追加程序将日志消息转发到Flume代理
2021-05-11 19:03:47logback-flume-appender Logback追加程序将日志消息转发到Flume代理 配置条目 application :应用程序名称,如果未设置,则将从application.name系统属性中推断出来 hostname :主机名,如果未设置,则通过Box主机... -
Flume监听oracle表增量的步骤详解
2020-12-16 06:43:15(2)flume的开源包flume-ng-sql-source-1.4.3.jar 最新的好像是1.5的 小版本记不住了 这个下载地址直接csdn上就有 这两个jar 都拷贝到flume的lib下 (3)flume配置文件 a1.sources = r1 a1.sinks = k1 a1.... -
flume_exporter:普罗米修斯水槽出口商
2021-05-27 04:53:57flume_exporter 普罗米修斯水槽出口商。 要运行它: make build ./flume_exporter [flags] 标志帮助: ./flume_exporter --help 配置:config.yml agents: - name: "flume-agents" enabled: true # ... -
flume-timestamp-filter:水槽时间戳过滤器
2021-06-03 05:46:00水槽时间戳过滤器 ...$ cp /path/to/flume-timestamp-filter/target/flume-timestamp-filtering-interceptor-0.0.jar /usr/lib/flume-ng/plugins.d/flume-timestamp-filter/lib/ 根据Configuration配置flum -
flume+kafka+flink+mysql数据统计
2022-04-20 18:06:26flume+kafka+flink+mysql实现nginx数据统计与分析 -
flume配置文件案例
2020-09-27 23:37:47flume配置文件,文件配了说明,可以拿下来改一改就用。 可以获取端口数据监听或者文件、文件夹内容监听,实时写入hdfs、mysql或者你需要的路径。 -
flume-ftp-source 相关jar包
2020-09-08 16:08:19由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。 -
hadoop集群配置之————flume安装配置(详细版)
2021-07-23 11:42:36hadoop集群配置之————flume安装配置(详细版) -
flume包,用于数据的采集
2019-01-12 16:50:05flume的包。flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、... -
FlumeKafkaSink:Flume-ng Sink 插件生成到 Kafka
2021-06-21 18:41:30# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat ... -
Flume+Kafka+HBase实例
2021-01-11 12:27:11结合电信客服项目,将数据存储模块独立成项目,欢迎下载 -
【Flume】(三)Flume 事务、拓扑结构和Flume Agent 内部原理
2021-01-07 08:09:09文章目录一、Flume 事务二、Flume Agent 内部原理三、Flume 拓扑结构1、简单串联2、复制和多路复用3、负载均衡和故障转移4、聚合 一、Flume 事务 二、Flume Agent 内部原理 重要组件: 1)ChannelSelector Channel... -
Apache-flume-1.7.0-bin.tar.gz
2021-08-11 15:41:55apache旗下组件flume,用于数据的采集,主要包括数据的source、channel以及sink,提供1.7.0稳定版本 -
Flume-InfluxDB-Sink:Flume Sink与最新的InfluxDB版本兼容
2021-05-18 13:33:05要部署它,请在flume类路径中复制flume-influxdb-sink-0.0.2.jar及其依赖项。 一个胖罐子,包括maven在build中的所有依赖项,因此也可以将其复制。 配置 这是示例接收器配置: agent.sinks.influx.type = ... -
flume-sqs-source:Flume Amazon SQS 源插件
2021-05-31 05:56:02Flume NG SQS 插件 该项目提供了一个源插件,用于从 Amazon 的简单队列服务 ( ) 中提取消息,这是一个快速、可靠、可扩展且完全托管的基于云的消息队列系统。 安装 首先,克隆存储库并构建包(需要 Maven) git ... -
flume+hdfs所需jar.rar
2021-03-25 17:39:44flume1.9.0+hdfs3.2.2相关jar -
flume-interceptor-1.0-SNAPSHOT.jar
2020-08-16 00:31:29注意:flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-...