flume 订阅
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。 展开全文
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
信息
外文名
flume
特    点
聚合和传输的系统
中文名
水槽
实    质
孵化项目
flume日志收集
Flume最早是Cloudera提供的日志收集系统,是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
收起全文
精华内容
下载资源
问答
  • flume

    2021-03-25 17:43:12
    Apache Flume是一个分布式,可靠且可用的系统,用于有效地收集,聚合大量日志数据并将其从许多不同的源移动到集中式数据存储中https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.4.0/
  • Flume

    千次阅读 多人点赞 2019-10-07 10:59:02
    Flume 总结参数配置详解概述Flume数据的传输过程SourcesNetCat SourceAvro SourceExec SourceSpooling Directory SourceTaildir SourceChannelsFile ChannelSinksLogger SinkHDFS SinkAvro SinkKafka Sink案例汇聚者...

    参数配置详解

    概述

    本篇文章是根据Flume官网对Flume组件(Source,Channel,Sink)的常用配置参数做一个主要介绍,如有表达意思错误希望不吝指出。

    在这里插入图片描述

    1. source
      采集数据
      采集日志数据,将采集到的日志数据传输给channel

    2. channel
      一个队列,存储source传递过来的数据

    3. sink
      从channel中获取数据,将数据输出到目标位置(HDFS、HBase、Source)

    4. Event
      传输数据的单元,Flume中采集数据并传输的最小单位

    Flume数据的传输过程

    在这里插入图片描述

    Sources

    Flume中常用的Source有NetCat,Avro,Exec,Spooling Directory,Taildir,也可以根据业务场景的需要自定义Source,具体介绍如下。

    NetCat Source

    NetCat Source可以使用TCP和UDP两种协议方式,使用方法基本相同,通过监听指定的IP和端口来传输数据,它会将监听到的每一行数据转化成一个Event写入到Channel中。(必须参数以@标示,下类同)
    channels@ –
    type@ – 类型指定为:netcat
    bind@ – 绑定机器名或IP地址
    port@ – 端口号
    max-line-length

    Property NameDefaultDescription
    channels@
    type@类型指定为:netcat
    bind@绑定机器名或IP地址
    port@端口号
    max-line-length512一行的最大字节数
    ack-every-eventtrue对成功接受的Event返回OK
    selector.typereplicating选择器类型replicating or multiplexing
    selector.*选择器相关参数
    interceptors拦截器列表,多个以空格分隔
    interceptors.*拦截器相关参数

    Avro Source

    不同主机上的Agent通过网络传输数据可使用的Source,一般是接受Avro client的数据,或和是上一级Agent的Avro Sink成对存在。

    Property NameDefaultDescription
    channels@
    type@类型指定为:avro
    bind@监听的主机名或IP地址
    port@端口号
    threads传输可使用的最大线程数
    selector.type
    selector.*
    interceptors拦截器列表
    interceptors.*
    compression-typenone可设置为“none” 或 “deflate”. 压缩类型需要和AvroSource匹配

    Exec Source

    Exec source通过执行给定的Unix命令的传输结果数据,如cat,tail -F等,实时性比较高,但是一旦Agent进程出现问题,可能会导致数据的丢失。

    Property NameDefaultDescription
    channels@
    type@类型指定为:exec
    command@需要去执行的命令
    shell运行命令的shell脚本文件
    restartThrottle10000尝试重启的超时时间
    restartfalse如果命令执行失败,是否重启
    logStdErrfalse是否记录错误日志
    batchSize20批次写入channel的最大日志数量
    batchTimeout3000批次写入数据的最大等待时间(毫秒)
    selector.typereplicating选择器类型replicating or multiplexing
    selector.*选择器其他参数
    interceptors拦截器列表,多个空格分隔
    interceptors.*

    Spooling Directory Source

    通过监控一个文件夹将新增文件内容转换成Event传输数据,特点是不会丢失数据,使用Spooling Directory Source需要注意的两点是,1)不能对被监控的文件夹下的新增的文件做出任何更改,2)新增到监控文件夹的文件名称必须是唯一的。由于是对整个新增文件的监控,Spooling Directory Source的实时性相对较低,不过可以采用对文件高粒度分割达到近似实时。

    Property NameDefaultDescription
    channels@
    type@类型指定:spooldir.
    spoolDir@被监控的文件夹目录
    fileSuffix.COMPLETED完成数据传输的文件后缀标志
    deletePolicynever删除已经完成数据传输的文件时间:never or immediate
    fileHeaderfalse是否在header中添加文件的完整路径信息
    fileHeaderKeyfile如果header中添加文件的完整路径信息时key的名称
    basenameHeaderfalse是否在header中添加文件的基本名称信息
    basenameHeaderKeybasename如果header中添加文件的基本名称信息时key的名称
    includePattern^.*$使用正则来匹配新增文件需要被传输数据的文件
    ignorePattern^$使用正则来忽略新增的文件
    trackerDir.flumespool存储元数据信息目录
    consumeOrderoldest文件消费顺序:oldest, youngest and random.
    maxBackoff4000如果channel容量不足,尝试写入的超时时间,如果仍然不能写入,则会抛出ChannelException
    batchSize100批次处理粒度
    inputCharsetUTF-8输入码表格式
    decodeErrorPolicyFAIL遇到不可解码字符后的处理方式:FAIL,REPLACE,IGNORE
    selector.typereplicating选择器类型:replicating or multiplexing
    selector.*选择器其他参数
    interceptors拦截器列表,空格分隔
    interceptors.*

    Taildir Source

    可以实时的监控指定一个或多个文件中的新增内容,由于该方式将数据的偏移量保存在一个指定的json文件中,即使在Agent挂掉或被kill也不会有数据的丢失,需要注意的是,该Source不能在Windows上使用。

    Property NameDefaultDescription
    channels@
    type@指定类型:TAILDIR.
    filegroups@文件组的名称,多个空格分隔
    filegroups.@被监控文件的绝对路径
    positionFile~/.flume/taildir_position.json存储数据偏移量路径
    headers..Header key的名称
    byteOffsetHeaderfalse是否添加字节偏移量到key为‘byteoffset’值中
    skipToEndfalse当偏移量不能写入到文件时是否跳到文件结尾
    idleTimeout120000关闭没有新增内容的文件超时时间(毫秒)
    writePosInterval3000在positionfile 写入每一个文件lastposition的时间间隔
    batchSize100批次处理行数
    fileHeaderfalse是否添加header存储文件绝对路径
    fileHeaderKeyfilefileHeader启用时,使用的key

    Channels

    官网提供的Channel有多种类型可供选择,这里介绍Memory Channel和File Channel。
    Memory Channel
    Memory Channel是使用内存来存储Event,使用内存的意味着数据传输速率会很快,但是当Agent挂掉后,存储在Channel中的数据将会丢失。

    Property NameDefaultDescription
    type@类型指定为:memory
    capacity100存储在channel中的最大容量
    transactionCapacity100从一个source中去或者给一个sink,每个事务中最大的事件数
    keep-alive3对于添加或者删除一个事件的超时的秒钟
    byteCapacityBufferPercentage20定义缓存百分比
    byteCapacitysee descriptionChannel中允许存储的最大字节总数

    File Channel

    File Channel使用磁盘来存储Event,速率相对于Memory Channel较慢,但数据不会丢失。

    Property NameDefaultDescription
    type@类型指定:file.
    checkpointDir~/.flume/file-channel/checkpointcheckpoint目录
    useDualCheckpointsfalse备份checkpoint,为True,backupCheckpointDir必须设置
    backupCheckpointDir备份checkpoint目录
    dataDirs~/.flume/file-channel/data数据存储所在的目录设置
    transactionCapacity10000Event存储最大值
    checkpointInterval30000checkpoint间隔时间
    maxFileSize2146435071单一日志最大设置字节数
    minimumRequiredSpace524288000最小的请求闲置空间(以字节为单位)
    capacity1000000Channel最大容量
    keep-alive3一个存放操作的等待时间值(秒)
    use-log-replay-v1falseExpert: 使用老的回复逻辑
    use-fast-replayfalseExpert: 回复不需要队列
    checkpointOnClosetrue

    Sinks

    Flume常用Sinks有Log Sink,HDFS Sink,Avro Sink,Kafka Sink,当然也可以自定义Sink。

    Logger Sink

    Logger Sink以INFO 级别的日志记录到log日志中,这种方式通常用于测试。

    Property NameDefaultDescription
    channel@
    type@类型指定:logger
    maxBytesToLog16能够记录的最大Event Body字节数

    HDFS Sink

    Sink数据到HDFS,目前支持text 和 sequence files两种文件格式,支持压缩,并可以对数据进行分区,分桶存储。

    NameDefaultDescription
    channel@
    type@指定类型:hdfs
    hdfs.path@HDFS的路径 hdfs://namenode/flume/webdata/
    hdfs.filePrefixFlumeData保存数据文件的前缀名
    hdfs.fileSuffix保存数据文件的后缀名
    hdfs.inUsePrefix临时写入的文件前缀名
    hdfs.inUseSuffix.tmp临时写入的文件后缀名
    hdfs.rollInterval30间隔多长将临时文件滚动成最终目标文件,单位:秒, 如果设置成0,则表示不根据时间来滚动文件
    hdfs.rollSize1024当临时文件达到多少(单位:bytes)时,滚动成目标文件, 如果设置成0,则表示不根据临时文件大小来滚动文件
    hdfs.rollCount10当 events 数据达到该数量时候,将临时文件滚动成目标文件,如果设置成0,则表示不根据events数据来滚动文件
    hdfs.idleTimeout0当目前被打开的临时文件在该参数指定的时间(秒)内, 没有任何数据写入,则将该临时文件关闭并重命名成目标文件
    hdfs.batchSize100每个批次刷新到 HDFS 上的 events 数量
    hdfs.codeC文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
    hdfs.fileTypeSequenceFile文件格式,包括:SequenceFile, DataStream,CompressedStre, 当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC; 当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
    hdfs.maxOpenFiles5000最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭
    hdfs.minBlockReplicasHDFS副本数,写入 HDFS 文件块的最小副本数。 该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件
    hdfs.writeFormatWritable写 sequence 文件的格式。包含:Text, Writable(默认)
    hdfs.callTimeout10000执行HDFS操作的超时时间(单位:毫秒)
    hdfs.threadsPoolSize10hdfs sink 启动的操作HDFS的线程数
    hdfs.rollTimerPoolSize1hdfs sink 启动的根据时间滚动文件的线程数
    hdfs.kerberosPrincipalHDFS安全认证kerberos配置
    hdfs.kerberosKeytabHDFS安全认证kerberos配置
    hdfs.proxyUser代理用户
    hdfs.roundfalse是否启用时间上的”舍弃”
    hdfs.roundValue1时间上进行“舍弃”的值
    hdfs.roundUnitsecond时间上进行”舍弃”的单位,包含:second,minute,hour
    hdfs.timeZoneLocal Time时区。
    hdfs.useLocalTimeStampfalse是否使用当地时间
    hdfs.closeTries 0Numberhdfs sink 关闭文件的尝试次数;如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件, 这个未关闭的文件将会一直留在那,并且是打开状态; 设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功
    hdfs.retryInterval180hdfs sink 尝试关闭文件的时间间隔, 如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1
    serializerTEXT序列化类型
    serializer.*

    Avro Sink

    Property NameDefaultDescription
    channel@
    type@指定类型:avro.
    hostname@主机名或IP
    port@端口号
    batch-size100批次处理Event数
    connect-timeout 20000连接超时时间
    request-timeout20000请求超时时间
    compression-typenone压缩类型,“none” or “deflate”.
    compression-level6压缩级别,0表示不压缩,1-9数字越大,压缩比越高
    sslfalse使用ssl加密

    Kafka Sink

    传输数据到Kafka中,需要注意的是Flume版本和Kafka版本的兼容性

    Property NameDefaultDescription
    type指定类型:org.apache.flume.sink.kafka.KafkaSink
    kafka.bootstrap.serverskafka服务地址
    kafka.topicdefault-flume-topickafka Topic
    flumeBatchSize100批次写入kafka Event数

    案例

    汇聚者 (collect.conf)

    collect.sources=cs1 //来源
    collect.channels=cc1 //管道
    collect.sinks=csi //传输

    #sources

    collect.sources.cs1.type=avro //类型
    collect.sources.cs1.port=5140 //端口号
    collect.sources.cs1.bind=master //节点名称
    collect.sources.cs1.thread=15 //线程
    collect.sources.cs1.channels=cc1

    #channels
    collect.channels.cc1.type=memory //存储地方
    collect.channels.cc1.capacity=100000 //容量
    collect.channels.cc1.transactionCapacity=200 // Event存储最大值
    collect.channels.cc1.keep-alive = 30 //存放等待时间

    #sinks
    collect.sinks.csi.channel=cc1
    collect.sinks.csi.type=hdfs
    collect.sinks.csi.hdfs.path=hdfs://master:9000/flume/school/getData/
    collect.sinks.csi.hdfs.filePrefix=infomation
    collect.sinks.csi.hdfs.minBlockReplicas=1
    collect.sinks.csi.hdfs.fileSuffix=.text
    collect.sinks.csi.hdfs.writeFormat=Text
    collect.sinks.csi.hdfs.rollInterval=300
    collect.sinks.csi.hdfs.fileType=DataStream
    collect.sinks.csi.hdfs.rollSize = 0
    collect.sinks.csi.hdfs.rollCount = 0
    collect.sinks.csi.hdfs.batchSize = 10
    collect.sinks.csi.txnEventMax = 1000
    collect.sinks.csi.hdfs.callTimeout = 60000
    collect.sinks.csi.hdfs.appendTimeout=60000

    执行
    vim collect.sh
    flume-ng agent -n collect
    (agent名称)
    -c conf
    -f 路径/collect.conf
    (执行文件路径)
    -Dflume.root.logger=DEBUG,console > /flume/logs/school_collect.log 2>&1 &
    (当有错误时日志存放路径
    最后一个& :后台运行)

    flume-ng agent -n collect -c conf -f /opt/programfile/flume/conf/conf/collect.conf -Dflume.root.logger=DEBUG,console > /flume/logs/school_collect.log 2>&1 &
    

    bash collect.sh

    收集者(product1.conf)

    product1.sources=ps1
    product1.channels=pc1
    product1.sinks=pk1

    #sources

    product1.sources.ps1.type=syslogtcp //类型
    product1.sources.ps1.port=5140 //绑定端口号
    product1.sources.ps1.bind=slave1 //绑定节点名称
    product1.sources.ps1.channels=pc1

    #channels
    product1.channels.pc1.type=memory
    product1.channels.pc1.capacity=100000
    product1.channels.pc1.transactionCapacity=100
    product1.channels.pc1.keep-alive = 30

    #sinks
    product1.sinks.pk1.channel=pc1
    product1.sinks.pk1.type=avro
    product1.sinks.pk1.hostname=master
    product1.sinks.pk1.port=5140

    执行
    vim product1.sh

    flume-ng agent -n product1 -c conf -f /opt/programfile/flume/conf/conf/product1.conf -Dflume.root.logger=DEBUG,console > /flume/logs/school_slave1.log 2>&1 &
    

    bash product1.sh

    注意:
    启动节点的顺序:先启动数据的汇聚节点,在启动数据的采集节点

    spark 与 flume 整合

    push

    #flume-to-spark-push.conf: A single-node Flume configuration
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #Describe/configure the source
    #把Flume Source类别设置为netcat,绑定到node3的33333端口
    #可以通过“telnet node3 33333”命令向Flume Source发送消息
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node3
    a1.sources.r1.port = 33333
    
    #Describe the sink
    #Flume Sink类别设置为avro,绑定到node2的44444端口
    #Flume Source把采集到的消息汇集到Flume Sink以后,Sink会把消息推送给node2的44444端口
    #Spark Streaming程序一直在监听node2的44444端口,一旦有消息到达,就会被Spark Streaming应用程序取走进行处理
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = slave
    a1.sinks.k1.port = 44444
    #Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 1000000
    
    a1.sources.r1.channels = c1 
    a1.sinks.k1.channel = c1
    

    scala代码

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    
    object FlumeEventCount {
      def main(args: Array[String]) {
    	val host = “slave"
      val port = 44444
    // Create the context and set the batch size
    val conf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    // 减少终端的输出信息。设置为ERROR时,由于flume没有启动,仍有大量的输出信息
    ssc.sparkContext.setLogLevel("ERROR")
    
    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    
    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    ssc.start()
    ssc.awaitTermination()
      }
    }// 备注:host (node1),必须是Spark集群中的一台节点,Spark会在这台机器上启动NettyServer
    

    pull

    注意
    将spark-streaming-flume-sink_2.11-2.3.0.jar、scala-library-2.11.8.jar拷贝到$FLUME_HOME/lib中
    备注 scala-library-2.10.5.jar 删除
    启动flume:
    flume-ng agent --conf-file $FLUME_HOME/conf/flume-to-spark-pull.conf --name a1 -Dflume.root.logger=INFO,console

    定义配置文件 flume-to-spark-pull.conf 	
    
    # agent名称,source、channel、sink的名称
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # 定义具体的source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node3
    a1.sources.r1.port = 22222
    a1.sources.r1.channels = c1
    # 定义具体的channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # 定义具体的sink
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = node3
    a1.sinks.k1.port = 11111
    a1.sinks.k1.channel = c1
    # 备注:node3是安装了flume的节点
    

    scala
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._

    object FlumePullingEventCount {
      def main(args: Array[String]) {
        val host = "node3"
        val port = 11111
    
        val conf = new SparkConf().setAppName("FlumePullingEventCount").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.sparkContext.setLogLevel("ERROR")
    
        val stream = FlumeUtils.createPollingStream(ssc, host, port)
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    kafka与 flume 整合

    1. 将kafkaflume-plugin.jar导入到flume/lib下

    2. 编写配置文件conf
      vi /opt/programfile/flume/conf/kflume.conf

       # config component
       producer.sources=s
       producer.channels=c
       producer.sinks=r
      
       # config sources
       producer.sources.s.type=exec
       producer.sources.s.command=tail -F /home/zhangsan/userEventlogs.log
       producer.sources.s.channels=c
      
       # config channel
       producer.channels.c.type=memory
       producer.channels.c.capacity=15000
       producer.channels.c.transactionCapacity=150
      
      
       # config sink4 (kafka中的producer : broker-list,topic)
       
       producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
       producer.sinks.r.metadata.broker.list=master:9092,slave1:9092,slave2:9092
       						//9092是broker server的服务器端口,允许只写一个主机地址
       producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
       producer.sinks.r.request.required.acks= -1
       producer.sinks.r.max.message.size=1000000
       producer.sinks.r.producer.type=sync 
       producer.sinks.r.custom.encoding=UTF-8
       producer.sinks.r.custom.topic.name=flumetokafka2
       producer.sinks.r.channel = c
      
    3. 编写执行脚本
      vi /opt/programfile/flume/kflume.bash
      #!/bin/bash
      bin/flume-ng agent -n producer -c conf -f conf/kflume.conf
      -Dflume.root.logger=DEBUG,console>./kflume.log 2>&1 &

    4. 启动脚本
      bash kflume.sh

    5. 向/home/zhangsan/userEventlogs.log 追加数据
      echo “hello ,flume to kafka” >> userEventlogs.log

    6. 启动kafka
      创建主题:flumetokafka

      启动kafka消费者:
      bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flumetokafka --from-beginning

    7. 实时的测试
      不断向userEventlogs.log中写入新的信息

    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

    展开全文
  • Flume用法

    2021-01-07 14:17:16
    Flume包含三部分 Source:从哪收集,一般使用:avro(序列化),exec(命令行),spooling(目录),taildir(目录和文件,包含offset,不会数据丢失),kafka Channel:数据存哪里:(memory,kafka,file) Sink:数据输出到...
  • flume安装包

    2019-02-13 17:53:27
    flume安装包 flume安装包 flume安装包 看好版本再下载,以免下错
  • Flume安装部署

    万次阅读 2019-12-16 21:17:29
    Flume介绍 Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。 Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、...

    Flume介绍
    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
    Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中。

    安装部署
    1、 Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境
    2、上传安装文件并解压
    tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz -C /export/servers/
    3、进入解压后的目录中的conf里
    cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
    4、 拷贝一份flume-env.sh
    cp flume-env.sh.template flume-env.sh
    5、 进入flume-env.sh修改
    vim flume-env.sh
    export JAVA_HOME=/export/servers/jdk1.8.0_141

    展开全文
  • flume安装配置

    千次阅读 2016-11-20 18:43:49
    flume

    flume

    参考:
    Flume NG 简介及配置实战
    Flume介绍与安装

    1、flume 下载

        flume下载的地址

    2、安装

      将安装包放到安装路径, 解压:

    //解压
    tar -zxf apache-flume-1.7.0-bin.tar.gz
    //移到指定路径 
    sudo mv apache-flume-1.7.0-bin /usr/local/flume-1.7.0

    3、配置参数

    3.1、 配置/etc/profile 参数

    编辑/etc/profile文件,声明flume的home路径和在path加入bin的路径:

    export FLUME_HOME=/usr/local/flume-1.7.0
    export FLUME_CONF_DIR=$FLUME_HOME/conf
    export PATH=$PATH:$FLUME_HOME/bin

    编译配置文件/etc/profile,并确认生效

    source /etc/profile
    echo $PATH

    3.2、设置flume-env.sh配置文件

    在$FLUME_HOME/conf 下复制改名flume-env.sh.template为flume-env.sh,修改conf/ flume-env.sh配置文件:

    JAVA_HOME= /app/lib/jdk1.7.0_79
    JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

    这里写图片描述

    3.3 验证安装

    3.3.1、修改flume-conf配置文件 在$FLUME_HOME/conf目录下修改flume-conf.properties.template文件,复制并改名为 flume-conf.properties, 编辑flume-conf.properties:

    /usr/local/flume-1.7.0/conf
    cp flume-conf.properties.template flume-conf2.properties

    3.3.2、修改flume-conf.properties:

    # The configuration file needs to define the sources, the channels and the sinks.
    # Sources, channels and sinks are defined per agent, in this case called 'a1'
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # For each one of the sources, the type is defined
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    #The channel can be defined as follows.
    a1.sources.r1.channels = c1
    # Each sink's type must be defined
    a1.sinks.k1.type = logger
    
    #Specify the channel the sink should use
    a1.sinks.k1.channel = c1
    
    # Each channel's type is defined.
    a1.channels.c1.type = memory
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    3.3.3、在flume的安装目录/flume-1.7.0下运行

    cd /usr/local/flume-1.7.0
    ./bin/flume-ng agent --conf ./conf/ --conf-file - ./conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
    

    PS:-Dflume.root.logger=INFO,console 仅为 debug 使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端。。。

    -c/–conf 后跟配置目录,-f/–conf-file 后跟具体的配置文件,-n/–name 指定agent的名称

    出错:

    2016-11-20 08:54:34,702 (main) [ERROR - org.apache.flume.node.Application.main(Application.java:348)] A fatal error occurred while running. Exception follows.
    org.apache.commons.cli.ParseException: The specified configuration file does not exist: /usr/local/flume-1.7.0/conf/flume-conf2.properties
            at org.apache.flume.node.Application.main(Application.java:316)

    解决:

    由于上面的 flume-conf2.properties修改成了 flume-conf.properties, 改一下文件名
    

    3.4、 测试收集日志到HDFS

    修改 flume-conf2.properties:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    //监视的日志文件
    a1.sources.r1.command = tail -F /usr/local/hadoop/logs/hadoop-chb-namenode-TEST.log
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    //输出到hdfs
    a1.sinks.k1.hdfs.path = hdfs://192.168.1.124:9000/output/out_flume
    //输出文件前缀
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollSize = 4000000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.batchSize = 10
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    运行flume

    cd   /usr/local/flume-1.7.0
    //运行flume
    ./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf2.properties --name a1 -Dflume.root.logger=INFO,console
    

    查看hdfs中 /output/out_flume中的文件:

    hadoop fs -cat  /output/out_flume/
    hadoop fs -cat  /output/out_flume/events-.1479642835192
    展开全文
  • 文章目录一、Flume 事务二、Flume Agent 内部原理三、Flume 拓扑结构1、简单串联2、复制和多路复用3、负载均衡和故障转移4、聚合 一、Flume 事务 二、Flume Agent 内部原理 重要组件: 1)ChannelSelector Channel...
  • Apache Flume

    万次阅读 2019-12-04 21:49:14
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。 Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,...

    1丶概述
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。
    Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。
    Flume支持定制各类数据发送方,用于收集各类型数据;同时,Flume支持定制各种数据接受方,用于最终存储数据。一般的采集需求,通过对flume的简单配置即可实现。针对特殊场景也具备良好的自定义扩展能力。因此,flume可以适用于大部分的日常数据采集场景。
    当前Flume有两个版本。Flume 0.9X版本的统称Flume OG(original generation),Flume1.X版本的统称Flume NG(next generation)。由于Flume NG经过核心组件、核心配置以及代码架构重构,与Flume OG有很大不同,使用时请注意区分。改动的另一原因是将Flume纳入 apache 旗下,Cloudera Flume 改名为 Apache Flume。

    2丶运行机制
    Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。

    在这里插入图片描述每一个agent相当于一个数据传递员,内部有三个组件:
    Source:采集源,用于跟数据源对接,以获取数据;
    Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往
    最终存储系统传递数据;
    Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
    在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
    一个完整的event包括:event headers、event body、event信息,其中event信息就是flume收集到的日记记录。

    3丶Flume采集系统结构图
    3.1简单结构
    单个agent采集数据
    在这里插入图片描述
    3.2复杂结构
    多级agent之间串联
    在这里插入图片描述

    Flume安装部署

    上传安装包到数据源所在节点上
    然后解压 tar -zxvf apache-flume-1.8.0-bin.tar.gz
    然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME
    根据数据采集需求配置采集方案,描述在配置文件中(文件名可任意自定义)
    指定采集方案配置文件,在相应的节点上启动flume agent

    先用一个最简单的例子来测试一下程序环境是否正常
    1、先在flume的conf目录下新建一个文件
    vi netcat-logger.conf

    # 定义这个agent中各组件的名字
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 描述和配置source组件:r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 描述和配置sink组件:k1
    a1.sinks.k1.type = logger
    
    # 描述和配置channel组件,此处使用是内存缓存的方式
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 描述和配置source  channel   sink之间的连接关系
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    2、启动agent去采集数据
    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -		Dflume.root.logger=INFO,console
    -c conf   指定flume自身的配置文件所在目录
    -f conf/netcat-logger.con  指定我们所描述的采集方案
    -n a1  指定我们这个agent的名字
    3、测试
    先要往agent采集监听的端口上发送数据,让agent有数据可采。
    随便在一个能跟agent节点联网的机器上:
    telnet anget-hostname  port   (telnet localhost 44444)
    
    测试会出现
    	-bash: telnet: command not found
    	原因 :
    	没有安装telnet服务
    	解决方案
    	yum install telnet-server -y
    	yum install telnet.*  -y
    
    展开全文
  • 欢迎使用Apache Flume! Apache Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。 它具有基于流数据流的简单灵活的体系结构。 它具有可调整的可靠性机制以及许多故障转移和恢复机制...
  • Flume学习文档(2){Flume安装部署、Flume配置文件}。 记录我的学习之旅,每份文档倾心倾力,带我成我大牛,回头观望满脸笑意,望大家多多给予意见,有问题或错误,请联系 我将及时改正;借鉴文章标明出处,谢谢
  • Using Flume

    2018-12-21 16:01:01
    Using Flume provides an overview of various components within Flume, diving into details where necessary. Operators will find this book immensely valuable for understanding how to easily set up and ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 61,183
精华内容 24,473
关键字:

flume