精华内容
下载资源
问答
  • Flume实战

    2021-06-24 09:56:08
    一、Flume 简介及基本使用 二、Linux 环境下 Flume 的安装部署 三、Flume 整合 Kafka

     官网地址:http://flume.apache.org/

    文档查看地址:http://flume.apache.org/FlumeUserGuide.html

    下载地址:http://archive.apache.org/dist/flume/

    目录

    一、Flume 简介及基本使用

    1.1.Flume简介

    1.2.Flume架构和基本概念   

    1.2.1.基本架构

    1.2.2 基本概念

    1.2.3.组件种类

    1.3.Flume拓扑结构

    1.3.1.multi-agent flow

    1.3.2.Consolidation

    1.3.3.Multiplexing the flow

    1.4.Flume Agent内部原理

    二、Flume 的安装部署

    2.1.下载并解压

    2.2.配置环境变量

    2.3.验证

    三、Flume的配置格式及使用案例

    3.1.Flume配置格式

    3.2.使用案例

    3.2.1.案例一

    3.2.2.案例二

    3.2.3.案例三

    3.2.4.案例四

    四、Flume 整合 Kafka

    4.1 背景

    为什么要使用 Flume + Kafka?

    4.2 整合流程

    4.2.1 启动Zookeeper和Kafka

    4.2.2 创建主题

    4.2.3 启动kafka消费者

    4.2.4 配置Flume

    4.2.5 启动Flume

    4.2.6 测试


    一、Flume 简介及基本使用

    1.1.Flume简介

    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

    1.2.Flume架构和基本概念   

     Flume 的基本架构图:

    1.2.1.基本架构

    外部数据源以特定格式向 Flume 发送 events (事件),当 source 接收到 events 时,它将其存储到一个或多个 channelchannel 会一直保存 events 直到它被 sink 所消费。sink 的主要功能从 channel 中读取 events,并将其存入外部存储系统或转发到下一个 source,成功后再从 channel 中移除 events

    1.2.2 基本概念

    1.2.2.1.Agent

    Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,Flume数据传输的基本单元

    Agent主要有3个部分组成,Source、Channel、Sink。

    1.2.2.2.Source

    Source是负责接收外部数据到Flume Agent组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

    1.2.2.3.Channel

    Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

    Flume自带两种Channel:Memory Channel和File Channel。

    • Memory Channel : 使用内存,优点是速度快,但数据可能会丢失 (程序死亡、机器宕机或者重启);
    • File Channel : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

    1.2.2.4.Sink

    Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

    Sink是完全事务的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。

    Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

    1.2.2.5.Event

    传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。一个 Event 由标题和正文组成:前者是键/值映射,后者是任意字节数组。

    1.2.3.组件种类

    Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

    • Source 类型 :内置了几十种类型,如 Avro SourceThrift SourceKafka SourceJMS Source

    • Sink 类型 :HDFS SinkHive SinkHBaseSinksAvro Sink 等;

    • Channel 类型 :Memory ChannelJDBC ChannelKafka ChannelFile Channel 等。

    对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。在 Flume 官网 上对所有类型组件的配置参数均以表格的方式做了详尽的介绍,并附有配置样例;同时不同版本的参数可能略有所不同,所以使用时建议选取官网对应版本的 User Guide 作为主要参考资料。

    1.3.Flume拓扑结构

    Flume 支持多种架构模式

    1.3.1.multi-agent flow

    Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和后一个 Agent 的 Source 都必须是 Avro 类型,前一个Sink 指向后一个 Source 所在主机名 (或 IP 地址) 和端口

    1.3.2.Consolidation

    负载均衡:

    日志收集中常常存在大量的客户端(比如分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,然后通过一个或者多个 Agent 聚合后再存储到文件系统中。

    1.3.3.Multiplexing the flow

    Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操作称之为 Fan Out(扇出)。默认情况下 Fan Out 是向所有的 Channel 复制 Event,即所有 Channel 收到的数据都是相同的。同时 Flume 也支持在 Source 上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。

    1.4.Flume Agent内部原理

    二、Flume 的安装部署

    2.1.下载并解压

    1)将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下

    2)解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下

    # tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/

    3)修改apache-flume-1.7.0-bin的名称为flume

    # mv apache-flume-1.7.0-bin flume
    
    #或创建软链接
    # ln -s apache-flume-1.7.0-bin flume

    2.2.配置环境变量

    # vim /etc/profile
    

    添加环境变量:

    export FLUME_HOME=/opt/module/flume
    export PATH=$FLUME_HOME/bin:$PATH

    使环境变量生效:

    source /etc/profile

    将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

    # cp flume-env.sh.template flume-env.sh
    
    # vi flume-env.sh
    
    修改 flume-env.sh,指定 JDK 的安装路径:
    
    # Enviroment variables can be set here.
    export JAVA_HOME=/opt/module/jdk1.8.0_144

    2.3.验证

    由于已经将 Flume 的 bin 目录配置到环境变量,直接使用以下命令验证是否配置成功:

    # flume-ng version
    

    出现对应的版本信息则代表配置成功。

    三、Flume的配置格式及使用案例

    3.1.Flume配置格式

    Flume 配置通常需要以下两个步骤:

    1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:
      <Agent>.sources = <Source>
      <Agent>.sinks = <Sink>
      <Agent>.channels = <Channel1> <Channel2>
      
      # set channel for source
      <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
      
      # set channel for sink
      <Agent>.sinks.<Sink>.channel = <Channel1>
      

    3.2.使用案例

    介绍几个 Flume 的使用案例:

    • 案例一:使用 Flume 监控端口数据,将监听的数据实时显示在控制台。
    • 案例二:使用 Flume 监听文件内容变动,将新增加的内容输出到控制台和HDFS。
    • 案例三:使用 Flume 监听指定目录,将目录下新增加的文件存储到 HDFS。
    • 案例四:使用 Avro 将本服务器收集到的日志数据发送到另外一台服务器。

    3.2.1.案例一

    需求: Flume监控本机6666端口,然后通过telnet/netcat工具向本机6666端口发送消息,最后Flume将监听的数据实时显示在控制台。

    实现: 主要使用 netcat Source 配合 logger Sink 命令实现。

    1. 配置

    注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html

    创建Flume Agent配置文件flume-netcat-log.conf

    在flume目录下创建job文件夹并进入job文件夹。

    [hadoop@hadoop01 flume]$ mkdir job
    
    [hadoop@hadoop01 flume]$ cd job/

    在job文件夹下创建Flume Agent配置文件flume-netcat-log.conf。

    [hadoop@hadoop01 job]$ touch flume-netcat-log.conf

    在flume-netcat-log.conf文件中添加如下内容。

    [hadoop@hadoop01 job]$ vim flume-netcat-log.conf

    添加内容如下:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 6666
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    2. 启动

    先开启flume监听端口

    [hadoop@hadoop01 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-log.conf -Dflume.root.logger=INFO,console

    参数说明:

    --conf conf/  :表示配置文件存储在conf/目录

    --name a1 :表示给agent起名为a1

    --conf-file job/flume-telnet.conf :flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。

    -Dflume.root.logger==INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

    使用netcat工具向本机的44444端口发送内容

    nc localhost 6666

    3. 测试

    在Flume监听页面观察接收数据情况

    3.2.2.案例二

    需求: 监听文件内容变动,将新增加的内容输出到控制台和HDFS。

    实现: 主要使用 Exec Source 配合 tail 命令实现。

    输出到控制台:

    1. 配置

    新建配置文件 flume-file-log.conf,其内容如下:

    #指定agent的sources,sinks,channels
    a1.sources = s1  
    a1.sinks = k1  
    a1.channels = c1  
       
    #配置sources属性
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /tmp/log.txt
    a1.sources.s1.shell = /bin/bash -c
    
    #将sources与channels进行绑定
    a1.sources.s1.channels = c1
       
    #配置sink 
    a1.sinks.k1.type = logger
    
    #将sinks与channels进行绑定  
    a1.sinks.k1.channel = c1  
       
    #配置channel类型
    a1.channels.c1.type = memory
    

    2. 启动

    flume-ng agent \
    --conf conf \
    --conf-file /usr/app/flume/job/flume-file-log.conf \
    --name a1 \
    -Dflume.root.logger=INFO,console
    

    3. 测试

    向文件中追加数据:

    控制台的显示:

    输出到HDFS:

    Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包

    将commons-configuration-1.6.jar、

    hadoop-auth-2.7.2.jar、

    hadoop-common-2.7.2.jar、

    hadoop-hdfs-2.7.2.jar、

    commons-io-2.4.jar、

    htrace-core-3.1.0-incubating.jar

    拷贝到/opt/module/flume/lib文件夹下。

    创建flume-file-hdfs.conf文件

    创建文件

    [hadoop@hadoop01 job]$ touch flume-file-hdfs.conf

    注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在Linux系统中所以读取文件的类型选择:exec即execute执行的意思。表示执行Linux命令来读取文件。

    1. 配置

    [hadoop@hadoop01 job]$ vim flume-file-hdfs.conf

    添加如下内容:

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
    a2.sources.r2.shell = /bin/bash -c
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop01:9000/flume/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a2.sinks.k2.hdfs.batchSize = 1000
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k2.hdfs.rollInterval = 600
    #设置每个文件的滚动大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a2.sinks.k2.hdfs.rollCount = 0
    #最小冗余数
    a2.sinks.k2.hdfs.minBlockReplicas = 1
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2

    2.启动

    [hadoop@hadoop01 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

    3. 测试

    在HDFS上查看文件是否存在。

    3.2.3.案例三

    需求: 监听指定目录,将目录下新增加的文件存储到 HDFS。

    实现:使用 Spooling Directory Source 和 HDFS Sink

    1. 配置

    [hadoop@hadoop01 job]$ vim flume-spooldir-hdfs.conf
    #指定agent的sources,sinks,channels
    a1.sources = s1  
    a1.sinks = k1  
    a1.channels = c1  
       
    #配置sources属性
    a1.sources.s1.type =spooldir  
    a1.sources.s1.spoolDir =/tmp/logs
    a1.sources.s1.basenameHeader = true
    a1.sources.s1.basenameHeaderKey = fileName 
    
    #配置channel类型
    a1.channels.c1.type = memory
       
    #配置sink 
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
    a1.sinks.k1.hdfs.filePrefix = %{fileName}
    #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream  
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    #将sources与channels进行绑定  
    a1.sources.s1.channels =c1 
    #将sinks与channels进行绑定  
    a1.sinks.k1.channel = c1
       
    

    2. 启动

    flume-ng agent \
    --conf conf \
    --conf-file /usr/app/flume/job/flume-spooldir-hdfs.conf \
    --name a1 -Dflume.root.logger=INFO,console
    

    3. 测试

    拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

    # cp log.txt logs/

    查看HDFS上的内容与日志文件是否一致

    3.2.4.案例四

    需求: 将本服务器收集到的数据发送到另外一台服务器。

    实现:使用 avro sources 和 avro Sink 实现。

    1. 配置日志收集Flume

    新建配置 flume-netcat-memory-avro.conf,监听文件内容变化,然后将新的文件内容通过 avro sink 发送到 hadoop02 这台服务器的 8888 端口:

    #指定agent的sources,sinks,channels
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    #配置sources属性
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /tmp/log.txt
    a1.sources.s1.shell = /bin/bash -c
    a1.sources.s1.channels = c1
    
    #配置sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop02
    a1.sinks.k1.port = 8888
    a1.sinks.k1.batch-size = 1
    a1.sinks.k1.channel = c1
    
    #配置channel类型
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    

     2.配置日志聚合Flume

    使用 avro source 监听 hadoop02 服务器的 8888 端口,将获取到内容输出到控制台:

    #指定agent的sources,sinks,channels
    a2.sources = s2
    a2.sinks = k2
    a2.channels = c2
    
    #配置sources属性
    a2.sources.s2.type = avro
    a2.sources.s2.bind = hadoop02
    a2.sources.s2.port = 8888
    
    #将sources与channels进行绑定
    a2.sources.s2.channels = c2
    
    #配置sink
    a2.sinks.k2.type = logger
    
    #将sinks与channels进行绑定
    a2.sinks.k2.channel = c2
    
    #配置channel类型
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    

    3. 启动

    在hadoop02上启动日志聚集 Flume:

    flume-ng agent \
    --conf conf \
    --conf-file /usr/app/flume/job/flume-netcat-memory-avro.conf \
    --name a2 -Dflume.root.logger=INFO,console

    在hadoop01上启动日志收集 Flume:

    flume-ng agent \
    --conf conf \
    --conf-file /usr/app/flume/job/flume-netcat-memory-avro.conf \
    --name a1 -Dflume.root.logger=INFO,console

    这里建议按以上顺序启动,原因是 avro.source 会先与端口进行绑定,这样 avro sink 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,sink 会一直重试,直至建立好连接。

    4.测试

    向文件 tmp/log.txt 中追加内容:

    可以看到已经从 8888 端口监听到内容,并成功输出到控制台:

    四、Flume 整合 Kafka

    4.1 背景

    为什么要使用 Flume + Kafka?

    以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击。

    4.2 整合流程

    Flume 发送数据到 Kafka 上主要是通过 KafkaSink 来实现的,主要步骤如下:

    4.2.1 启动Zookeeper和Kafka

    这里启动一个单节点的 Kafka 作为测试:

    # 启动Zookeeper
    zkServer.sh start
    # 启动kafka
    bin/kafka-server-start.sh config/server.properties

    4.2.2 创建主题

    创建一个主题 flume-kafka,之后 Flume 收集到的数据都会发到这个主题上:

    # 创建主题
    bin/kafka-topics.sh --create \
    --zookeeper hadoop001:2181 \
    --replication-factor 1   \
    --partitions 1 --topic flume-kafka
    # 查看创建的主题
    bin/kafka-topics.sh --zookeeper hadoop001:2181 --list

    4.2.3 启动kafka消费者

    启动一个消费者,监听我们刚才创建的 flume-kafka 主题:

    bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka

    4.2.4 配置Flume

    新建配置文件 exec-memory-kafka.properties,文件内容如下。这里我们监听一个名为 kafka.log 的文件,当文件内容有变化时,将新增加的内容发送到 Kafka 的 flume-kafka 主题上。

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1                                                                                         
    a1.sources.s1.type=exec
    a1.sources.s1.command=tail -F /tmp/kafka.log
    a1.sources.s1.channels=c1 
    #设置Kafka接收器
    a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
    #设置Kafka地址
    a1.sinks.k1.brokerList=hadoop001:9092
    #设置发送到Kafka上的主题
    a1.sinks.k1.topic=flume-kafka
    #设置序列化方式
    a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    a1.sinks.k1.channel=c1     
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100

    4.2.5 启动Flume

    flume-ng agent \
    --conf conf \
    --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
    --name a1 -Dflume.root.logger=INFO,console

    4.2.6 测试

    向监听的 /tmp/kafka.log 文件中追加内容,查看 Kafka 消费者的输出:

    可以看到 flume-kafka 主题的消费端已经收到了对应的消息:

    展开全文
  • flume实战

    2019-09-14 22:11:40
    [root@cloudera1 flume-1.7.0]# bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 3.2 在hadoop02上启动一个agent服务 [root@cloudera2 conf]# vim tail-avro.conf...

    1.采集目录到HDFS

    采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

    根据需求,首先定义以下3大要素

    l  采集源,即source——监控文件目录 :  spooldir

    l  下沉目标,即sink——HDFS文件系统  :  hdfs sink

    l  source和sink之间的传递通道——channel,可用file channel 也可以用内存channel

     

    配置文件编写:

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /logs
    agent1.sources.source1.fileHeader = false
    
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    agent1.sources.source1.deserializer.outputCharset=UTF-8
    agent1.sources.source1.deserializer.outputCharset=ISO-8859-1 #读取文件编码方式
    agent1.sources.source1.ignorePattern = ^(.)*\\.tmp$  #文件没上传完成不采集
    
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path =hdfs://10.1.20.174:8020/web_log
    agent1.sinks.sink1.hdfs.filePrefix = log_
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    agent1.sinks.sink1.hdfs.callTimeout=30000
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
    

    启动命令(在flume根目录启动)

    bin/flume-ng agent -c ./conf -f ./conf/spooldir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

    2.采集文件到HDFS

    采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

     

    根据需求,首先定义以下3大要素

    l  采集源,即source——监控文件内容更新 :  exec  ‘tail -F file’

    l  下沉目标,即sink——HDFS文件系统  :  hdfs sink

    l  Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

     

    配置文件编写:

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /logs/server.log
    agent1.sources.source1.channels = channel1
    
    #configure host for source
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    #a1.sinks.k1.channel = c1
    agent1.sinks.sink1.hdfs.path =hdfs://10.1.20.174:8020/web_log
    agent1.sinks.sink1.hdfs.filePrefix = server.log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
    

    启动命令(在flume根目录)

    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

    3.综合实例

    需求:将hadoop02采集到的文件发送给hadoop01,并由hadoop01保存到hdfs

    步骤:

    1.编写javaee程序,配置log4j 和对应的日志格式

    2.将应用程序打包发布到hadoop02服务器(JAVAEE服务器)

    3. 启动flume

    3.1在hadoop01(hdfs)上启动一个flume的agent(avro-hdfs.conf)

    [root@cloudera1 conf]# vim tail-hdfs.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 10.1.20.174
    a1.sources.r1.port = 4141
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /web_log
    a1.sinks.k1.hdfs.filePrefix = server.log
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.fileType = DataStream
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动:

    [root@cloudera1 flume-1.7.0]# bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

    3.2 在hadoop02上启动一个agent服务

    [root@cloudera2 conf]# vim tail-avro.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /logs/server.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    a1.sinks = k1
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 10.1.20.174
    a1.sinks.k1.port = 4141
    a1.sinks.k1.batch-size = 2
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动:

    root@cloudera2 flume-1.7.0]# bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 -Dflume.root.logger=INFO,console
    
    

    4.打包web程序,发布

    package com.bocai;
    
    import java.io.IOException;
    
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.apache.log4j.Logger;
    
    @WebServlet("/item")
    public class ItemServlet extends HttpServlet{
    
    	private static final long serialVersionUID = -586155548844563441L;
    	
    	public static Logger logger = Logger.getLogger(ItemServlet.class);
    
    	@Override
    	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    		
    //		Date date = new Date();
    //		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    		logger.info(req.getRemoteAddr() + "\t" +req.getRequestURL()+"\t"+req.getParameter("id") + "\t" +"item");
    		resp.getWriter().print("您访问了####系统 ");
    	}
    
    	@Override
    	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    		
    	}
    }
    

    log4j.properties

    log4j.rootLogger=INFO,Console,DailyRollingFile
    #Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss}	%m	%n
    
    log4j.appender.DailyRollingFile=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.DailyRollingFile.layout=org.apache.log4j.PatternLayout
    log4j.appender.DailyRollingFile.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss}	%m	%n
    log4j.appender.DailyRollingFile.Append=true
    log4j.appender.DailyRollingFile.DatePattern ='_'yyyy-MM-dd'.log'
    log4j.appender.DailyRollingFile.File=/logs/server.log
    
    
    

     

    转载于:https://my.oschina.net/u/2954291/blog/876891

    展开全文
  • Flume实战记录-目录

    2020-02-25 21:54:25
    目录 Flume实战记录(1)-安装部署:https://blog.csdn.net/weixin_39565597/article/details/104506443
    展开全文
  • 大数据-Flume实战案例

    2019-12-24 16:11:06
    2. Flume 实战案例 案例:使用网络telent命令向一台机器发送一些网络数据,然后通过flume采集网络端口数据 2.1. Flume 的安装部署 Step 1: 下载解压修改配置文件 下载地址: ...

    2. Flume 实战案例

    案例:使用网络telent命令向一台机器发送一些网络数据,然后通过flume采集网络端口数据
    在这里插入图片描述

    2.1. Flume 的安装部署

    Step 1: 下载解压修改配置文件

    下载地址:
    http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

    Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境
    上传安装包到数据源所在节点上

    这里我们采用在第三台机器来进行安装

    cd /export/softwares/ 
    tar -zxvf apache-flume-1.8.0-bin.tar.gz -C ../servers/ 
    cd /export/servers/apache-flume-1.8.0-bin/conf 
    cp flume-env.sh.template flume-env.sh 
    vim flume-env.sh 
    export JAVA_HOME=/export/servers/jdk1.8.0_141
    

    Step 2: 开发配置文件

    根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)

    配置我们的网络收集的配置文件
    在flume的conf目录下新建一个配置文件(采集方案)

    vim /export/servers/apache-flume-1.8.0-bin/conf/netcat-logger.conf
    
    # 定义这个agent中各组件的名字 
    a1.sources = r1 a1.sinks = k1 a1.channels = c1 
    # 描述和配置source组件:
    r1 a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.174. a1.sources.r1.port = 44444 
    # 描述和配置sink组件:
    k1 a1.sinks.k1.type = logger 
    # 描述和配置channel组件,此处使用是内存缓存的方式 
    a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 
    # 描述和配置source channel sink之间的连接关系 
    a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
    

    Step 3: 启动配置文件

    指定采集方案配置文件,在相应的节点上启动flume agent

    先用一个最简单的例子来测试一下程序环境是否正常 启动agent去采集数据

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO **
    
    • -c conf 指定flume自身的配置文件所在目录
    • -f conf/netcat-logger.con 指定我们所描述的采集方案
    • -n a1 指定我们这个agent的名字

    Step 4: 安装 Telnet 准备测试

    在node02机器上面安装telnet客户端,用于模拟数据的发送

    # 使用telnet模拟数据发送
    yum -y install 
    telnet telnet node03 44444 
    
    展开全文
  • SparkStreaming整合flume实战
  • Flume实战案例 一、监控端口数据官方案例 1. 案例需求: 首先,Flume 监控本机 44444 端口,然后通过 telnet 工具向本机 44444 端口发 送消息,最后 Flume 将监听的数据实时显示在控制台。 2. 需求分析: 3. 实现...
  • 集群flume实战

    2016-11-02 18:33:12
    Flume集群测试 假设:现有两台机子,命名为:agent,collect。 agent IP地址为:192.168.26.65 collect为192.168.26.61 要求:实现agent到collect的连接,并能向collect发送日志。 步骤: 两台电脑上分别装上 jdk,...
  • flume实战案例

    2020-06-28 10:44:40
    目录实战1实战2实战3实战4 单数据源多出口案例(选择器)实战5 单数据源多出口案例(Sink组)实战6 多数据源汇总案例 实战1 首先启动Flume任务,监控本机44444端口,服务端; 然后通过netcat工具向本机44444端口发送消息...
  • Spark Streaming整合Flume实战二基于拉方式:Pull-based Approach using a Custom Sink配置步骤 基于拉方式:Pull-based Approach using a Custom Sink Spark Streaming 基于拉方式处理 Flume-ng 数据源(Pull-based ...
  • Flume实战案例

    千次阅读 2018-10-10 09:35:46
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
  • Flume实战案例 02

    2019-03-26 09:20:09
    1. Flume的安装部署 1.1 需求 案例:使用网络telent命令向一台机器发送一些网络数据,然后通过flume采集网络端口数据。 1.2 分析 1.3 实现 下载安装包,解压,修改配置文件 Flume的安装非常简单,只需要解压即可...
  • Flume 实战案例

    2019-03-31 18:47:32
    bin/flume-ng agent -c conf -f agentconf/spooldir-hdfs.properties -n agent1 测试: 1、 如果 HDFS 集群是高可用集群,那么必须要放入 core-site.xml 和 hdfs-site.xml 文件到 $FLUME_HOME/conf 目录中 2、 ...
  • Flume实战开发配置

    2019-09-06 11:17:20
    实际开发中我们常用的是把Flume采集的日志按照指定的格式传到HDFS上,为我们的离线分析提供数据支撑 我们使用二个主机进行数据的生产与采集,简单的了写了一个脚本, 服务器 hadoop1 #!/bin/bash #打印100次文本到...
  • : Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战 flume 安装在集群的worker4上,地址192.168.189.5 1.下载 flume  http://flume.apache.org/download.html 2.上传worker4 192.168.189.5

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,559
精华内容 2,223
关键字:

flume实战