flume_flume 大数据 - CSDN
flume 订阅
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。 展开全文
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
信息
外文名
flume
特    点
聚合和传输的系统
中文名
水槽
实    质
孵化项目
flume日志收集
Flume最早是Cloudera提供的日志收集系统,是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
收起全文
  • Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。是大数据大数据开发工程师必须会的框架之一。 在本课程中,你将学习到,Flume架构原理、...
  • Flume日志采集

    千次阅读 2020-09-04 20:21:57
    2.1 什么是Flume 2.2 Flume特性 三 Flume原理 3.1 Flume组件详解 3.2 Flume采集结构图 3.2.1 简单结构 3.2.2 复杂结构 4 Flume实战案例 4.1 Flume的安装部署 4.2 采集案例 4.2.1 采集目录到HDFS 4.2.2 ...
    1. 本博客已迁移至微信公众号!将不再更新
    2. 关注公众号即可获得免费学习资源,获得免费指导!!!
    3. 公众号后续将会持续更新clickhouse,sparkstreaming,flink,数仓建模,用户画像,实时计算,推荐系统,实时数仓等内容,感兴趣的朋友可以关注
    4. 不定期会有朋友的面经分享

    目录

    一 前言

    二 概述

    2.1 什么是Flume

    2.2 Flume特性

    三 Flume原理

    3.1 Flume组件详解

    3.2 Flume采集结构图

    3.2.1 简单结构

    3.2.2 复杂结构

    4 Flume实战案例

    4.1 Flume的安装部署

    4.2 采集案例

    4.2.1 采集目录到HDFS

    4.2.2 采集文件到HDFS

    4.3 更多source和sink组件

    4.4 HA Flume配置案例

    4.4.1 角色分配

    4.4.2 配置

    4.4.3 FAILOVER测试


    一 前言

    在一个完整的大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,如图所示:

    flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一.

    二 概述

    2.1 什么是Flume

      Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。其结构如下图所示:

    2.2 Flume特性

    1. Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
    2. Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中
    3. 一般的采集需求,通过对flume的简单配置即可实现
    4. Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景

    三 Flume原理

    3.1 Flume组件详解

    对于每一个Agent来说,它就是一共独立的守护进程(JVM),它从客户端接收数据,如下图所示flume的基本模型

    1. Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成
    2. 每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:
        1. Source:采集组件,用于跟数据源对接,以获取数据
        2. Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
        3. Channel:传输通道组件,用于从source将数据传递到sink

    首先来看一下flume官网中对Event的定义

     

    一行文本内容会被反序列化成一个event(序列化是将对象状态转换为可保持或传输的格式的过程。与序列化相对的是反序列化,它将流转换为对象。这两个过程结合起来,可以轻松地存储和传输数据),event的最大定义为2048字节,超过,则会切割,剩下的会被放到下一个event中,默认编码是UTF-8。

    3.2 Flume采集结构图

    3.2.1 简单结构

    单个agent采集数据

    3.2.2 复杂结构

    多级agent之间串联

    4 Flume实战案例

    4.1 Flume的安装部署

    1.Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境

    上传安装包到数据源所在节点上

    然后解压  tar -zxvf apache-flume-1.6.0-bin.tar.gz

    然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

    2.根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)

    3.指定采集方案配置文件,在相应的节点上启动flume agent

    先用一个最简单的例子来测试一下程序环境是否正常

    1.先在flume的conf目录下新建一个文件

    vi netcat-logger.conf

    # 定义这个agent中各组件的名字

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # 描述和配置source组件:r1

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

     

    # 描述和配置sink组件:k1

    a1.sinks.k1.type = logger

     

    # 描述和配置channel组件,此处使用是内存缓存的方式

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # 描述和配置source  channel   sink之间的连接关系

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    2.启动agent去采集数据

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

    -c conf   指定flume自身的配置文件所在目录

    -f conf/netcat-logger.conf  指定我们所描述的采集方案

    -n a1  指定我们这个agent的名字

    3.测试

    先要往agent的source所监听的端口上发送数据,让agent有数据可采

    随便在一个能跟agent节点联网的机器上

    telnet anget-hostname  port   (telnet localhost 44444)

     

    4.2 采集案例

    4.2.1 采集目录到HDFS

    结构示意图:

    采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

    根据需求,首先定义以下3大要素

    • 数据源组件,即source ——监控文件目录 :  spooldir

          spooldir特性:

    1.                监视一个目录,只要目录中出现新文件,就会采集文件中的内容
    2.                采集完成的文件,会被agent自动添加一个后缀:COMPLETED
    3.                 所监视的目录中不允许重复出现相同文件名的文件
    • 下沉组件,即sink——HDFS文件系统  :  hdfs sink
    • 通道组件,即channel——可用file channel 也可以用内存channel

    配置文件编写:

    #定义三大组件的名称

    agent1.sources = source1

    agent1.sinks = sink1

    agent1.channels = channel1

     

    # 配置source组件

    agent1.sources.source1.type = spooldir

    agent1.sources.source1.spoolDir = /home/hadoop/logs/

    agent1.sources.source1.fileHeader = false

     

    #配置拦截器

    agent1.sources.source1.interceptors = i1

    agent1.sources.source1.interceptors.i1.type = host

    agent1.sources.source1.interceptors.i1.hostHeader = hostname

     

    # 配置sink组件

    agent1.sinks.sink1.type = hdfs

    agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

    agent1.sinks.sink1.hdfs.filePrefix = access_log

    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

    agent1.sinks.sink1.hdfs.batchSize= 100

    agent1.sinks.sink1.hdfs.fileType = DataStream

    agent1.sinks.sink1.hdfs.writeFormat =Text

    agent1.sinks.sink1.hdfs.rollSize = 102400

    agent1.sinks.sink1.hdfs.rollCount = 1000000

    agent1.sinks.sink1.hdfs.rollInterval = 60

    #agent1.sinks.sink1.hdfs.round = true

    #agent1.sinks.sink1.hdfs.roundValue = 10

    #agent1.sinks.sink1.hdfs.roundUnit = minute

    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

    # Use a channel which buffers events in memory

    agent1.channels.channel1.type = memory

    agent1.channels.channel1.keep-alive = 120

    agent1.channels.channel1.capacity = 500000

    agent1.channels.channel1.transactionCapacity = 600

     

    # Bind the source and sink to the channel

    agent1.sources.source1.channels = channel1

    agent1.sinks.sink1.channel = channel1

    Channel参数解释:

    capacity:默认该通道中最大的可以存储的event数量

    trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

    keep-alive:event添加到通道中或者移出的允许时间

     

    4.2.2 采集文件到HDFS

    采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

    根据需求,首先定义以下3大要素

    1. 采集源,即source——监控文件内容更新 :  exec  ‘tail -F file’
    2. 下沉目标,即sink——HDFS文件系统  :  hdfs sink
    3. Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

    配置文件编写:

    agent1.sources = source1

    agent1.sinks = sink1

    agent1.channels = channel1

     

    # Describe/configure tail -F source1

    agent1.sources.source1.type = exec

    agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log

    agent1.sources.source1.channels = channel1

     

    #configure host for source

    agent1.sources.source1.interceptors = i1

    agent1.sources.source1.interceptors.i1.type = host

    agent1.sources.source1.interceptors.i1.hostHeader = hostname

     

    # Describe sink1

    agent1.sinks.sink1.type = hdfs

    #a1.sinks.k1.channel = c1

    agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

    agent1.sinks.sink1.hdfs.filePrefix = access_log

    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

    agent1.sinks.sink1.hdfs.batchSize= 100

    agent1.sinks.sink1.hdfs.fileType = DataStream

    agent1.sinks.sink1.hdfs.writeFormat =Text

    agent1.sinks.sink1.hdfs.rollSize = 102400

    agent1.sinks.sink1.hdfs.rollCount = 1000000

    agent1.sinks.sink1.hdfs.rollInterval = 60

    agent1.sinks.sink1.hdfs.round = true

    agent1.sinks.sink1.hdfs.roundValue = 10

    agent1.sinks.sink1.hdfs.roundUnit = minute

    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

     

    # Use a channel which buffers events in memory

    agent1.channels.channel1.type = memory

    agent1.channels.channel1.keep-alive = 120

    agent1.channels.channel1.capacity = 500000

    agent1.channels.channel1.transactionCapacity = 600

     

    # Bind the source and sink to the channel

    agent1.sources.source1.channels = channel1

    agent1.sinks.sink1.channel = channel1

    3.两个agent级联

    4.3 更多source和sink组件

    Flume支持众多的source和sink类型,详细手册可参考官方文档

    http://flume.apache.org/FlumeUserGuide.html

    4.4 HA Flume配置案例

    在完成单点的Flume NG搭建后,下面我们搭建一个高可用的Flume NG集群,架构图如下所示:

     

    图中,我们可以看出,Flume的存储可以支持多种,这里只列举了HDFS和Kafka(如:存储最新的一周日志,并给Spark Streaming系统提供实时日志流。

    4.4.1 角色分配

    Flume的Agent和Collector分布如下表所示:

    名称 

    HOST

    角色

    Agent1

    mini1

    Web Server

    Agent2

    mini2

    Web Server

    Agent3

    mini3

    Web Server

    Collector1

    mini4

    AgentMstr1

    Collector2

    mini5

    AgentMstr2

    图中所示,Agent1,Agent2,Agent3数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置Flume NG集群。

    4.4.2 配置

    在下面单点Flume中,基本配置都完成了,我们只需要新添加两个配置文件,它们是agent.properties和collector.properties,其配置内容如下所示:

    1、agent配置

    [root@mini1 apache-flume-1.6.0-bin]# vi conf/agent.properties

    #agent1 name

    agent1.channels = c1

    agent1.sources = r1

    agent1.sinks = k1 k2

     

    #set gruop

    agent1.sinkgroups = g1

     

    #set channel

    agent1.channels.c1.type = memory

    agent1.channels.c1.capacity = 1000

    agent1.channels.c1.transactionCapacity = 100

     

    agent1.sources.r1.channels = c1

    agent1.sources.r1.type = exec

    agent1.sources.r1.command = tail -F /root/log/test.log

     

    agent1.sources.r1.interceptors = i1 i2

    agent1.sources.r1.interceptors.i1.type = static

    agent1.sources.r1.interceptors.i1.key = Type

    agent1.sources.r1.interceptors.i1.value = LOGIN

    agent1.sources.r1.interceptors.i2.type = timestamp

     

    # set sink1

    agent1.sinks.k1.channel = c1

    agent1.sinks.k1.type = avro

    agent1.sinks.k1.hostname = mini2

    agent1.sinks.k1.port = 52020

     

    # set sink2

    agent1.sinks.k2.channel = c1

    agent1.sinks.k2.type = avro

    agent1.sinks.k2.hostname = mini3

    agent1.sinks.k2.port = 52020

     

    #set sink group

    agent1.sinkgroups.g1.sinks = k1 k2

     

    #set failover

    agent1.sinkgroups.g1.processor.type = failover

    agent1.sinkgroups.g1.processor.priority.k1 = 10

    agent1.sinkgroups.g1.processor.priority.k2 = 1

    agent1.sinkgroups.g1.processor.maxpenalty = 10000

    启动命令

    bin/flume-ng agent -n agent1 -c conf -f conf/agent.properties -Dflume.root.logger=DEBUG,console

    2.collector配置

    [root@mini2 conf]# vi collector.properties

    #set Agent name

    a1.sources = r1

    a1.channels = c1

    a1.sinks = k1

     

    #set channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # other node,nna to nns

    a1.sources.r1.type = avro

    a1.sources.r1.bind = mini2

    a1.sources.r1.port = 52020

    a1.sources.r1.interceptors = i1

    a1.sources.r1.interceptors.i1.type = static

    a1.sources.r1.interceptors.i1.key = Collector

    a1.sources.r1.interceptors.i1.value = mini2

    a1.sources.r1.channels = c1

     

    #set sink to hdfs

    a1.sinks.k1.type=hdfs

    a1.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs

    a1.sinks.k1.hdfs.fileType=DataStream

    a1.sinks.k1.hdfs.writeFormat=TEXT

    a1.sinks.k1.hdfs.rollInterval=10

    a1.sinks.k1.channel=c1

    a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

    在mini3上,需要修改上述配置中的红色字体主机名为mini3

    启动命令:

    bin/flume-ng agent -n a1 -c conf -f conf/collector.properties -Dflume.root.logger=DEBUG,console

    4.4.3 FAILOVER测试

    下面我们来测试下Flume NG集群的高可用(故障转移)。场景如下:我们在Agent1节点上传文件,由于我们配置Collector1的权重比Collector2大,所以 Collector1优先采集并上传到存储系统。然后我们kill掉Collector1,此时有Collector2负责日志的采集上传工作,之后,我 们手动恢复Collector1节点的Flume服务,再次在Agent1上次文件,发现Collector1恢复优先级别的采集工作。具体截图如下所 示:

    Collector1优先上传

    HDFS集群中上传的log内容预览

    Collector1宕机,Collector2获取优先上传权限

    重启Collector1服务,Collector1重新获得优先上传的权限

     

    展开全文
  • Flume_概述

    2017-12-06 13:45:22
    flume概述flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方,比如送到HDFS,简单来说flume就是收集日志的。 flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定...
    1. flume概述

      flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方,比如送到HDFS,简单来说flume就是收集日志的。
      flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除己缓存的数据。

      这里写图片描述

    2. flume架构

      agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。

      source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。 source是从一些其他产生数据的应用中接收数据的活跃组件。source可以监听一个或者多个网络端口,用于接收数据或者从本地文件系统读取数据。每个source必须至少连接一个channel。基于一些标准,一个source可以写入几个channel,复制事件到所有或者某些channel。source通过处理器-拦截器-选择器路由写入多个channel。channel选择器是决定每个事件必须写入到source附带的哪个channel组件,而拦截器可以用来插入或删除事件中的数据。

      channel:channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。一般来说channel是被动组件,channel像队列,source写入到它们,sink从它们中读取。多个source可以安全写入到相同的channel,并且多个sink可以从相同的channel读取。但是一个sink只能从一个channel读取。

      sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。sink连续轮询各自的channel来读取和删除事件。sink将事件推送到下一个阶段(RPC Sink的情况下),或到最终的目的地。一旦在下一个阶段或其目的地中数据是安全的,sink通过事务提交通知channel,可以从channel中删除这些事件。sink运行器运行一个sink组,sink组可含有一个或多个sink。如果组中只存在一个sink,那么没有组将更有效率。每个sink组有一个sink处理器,处理器选择组中的sink之一去处理下一个事件集合。

      这里写图片描述

    3. flume拓扑

      flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。这个flume agent链条可以用于将数据从一个位置移动到另一个位置——特别是从生产数据的应用程序到HDFS、HBase等。agent是一个Java应用程序,接收或者生产数据并缓冲数据,直到最终写入到agent或者存储或者索引系统。flume还支持扇入(fan-in)、扇出(fan-out)。扇入就是source可以接受多个输入,扇出就是sink可以将数据输出多个目的地destination中。

      基于有多少服务器生产多少数据,agent可以组织到一个、两个或者更多的层,每一层的agent使用RPC sink-RPC source的组合,从一层转发数据到另一层。当应用程序产生更多的数据或添加了更多服务器,很容易通过最外层添加更多的agent,并配置它们向第二层的机器写数据来扩展。

      这里写图片描述

    4. event

      在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

      事件event具有一个主体和一个报头集合。主体是一个字节数组,通常是flume传送的负载。报头被表示为一个map,其中有字符串key和字符串value。报头并不是用来传输数据的,只为了路由的意图和跟踪发送事件的优先级和严重性。报头也可以用于给事件增加ID或者UUID

    5. 动态路由

      flume的一个重要特性是动态路由。多路复用channel选择器是一种通过特定报头的值,检查每一个经过选择器传入事件的channel选择器,基于此值,它选择一组事件必须写入的channel。这是flume内置的,选择的报头、值和channel却是可配置的。

    6. flume的无数据丢失保证

      flume保证事件至少一次被送到它们的目的地,意味着通过flume到存储系统的发送事件至少将存储一次。如果用例是重复敏感型的,在事件中插入唯一标识符通常是一个好主意。后续的处理工作可以使用这些标识符删除重复的数据。

    7. 批量的重要性

      由于实际调用相关的元数据和所有额外的TCP开销,RPC调用有额外开销。当发送的数据量非常小时,这些开销是每个RPC调用成本的一大部分,导致不必要的网络利用率等。为避免这种开销,单个RPC调用中批量处理几个事件或从远程客户端写入,总是一个好主意。对于最大为几个字节的事件,批量大小在100和1000之间通常性能很好(具体而定)。
      批量太大也会有问题,如:网络上太多的碎片。同时批量太大会增加事件重复的风险,因为每一批失败最终可能会导致大量的事件再次写入,如果有事件已经成功写入HDFS,这些事件最终会重新写入

    8. Hadoop业务的整体开发流程

      这里写图片描述

    9. 设置Agent

      ① 配置单个组件
      Flume agent配置存储在一个本地配置文件中。这是一个跟Java 属性文件格式一样的文本文件。一个或者多个agent可以指定同一个配置文件来进行配置。配置文件包括每个source的属性,agent中的sink和channel以及它们是如何连接构成数据流。

      ② 碎片集合
      agent需要知道每个组件加载什么和它们是怎样连接构成流。这通过列出agent中每个source、sink和channel和指定每个sink和source连接的channel。例如,一个agent流事件从一个称为avroWeb的Avro sources通过一个称为file-channel的文件channel流向一个称为hdfs-cluster1的HDFS sink。配置文档将包含这些组件的名字和avroWeb source和hdfs-cluster1 sink中间共享的file-channel。

      ③ 开始一个agent
      agent通过一个称为flume-ng shell位于Flume项目中bin目录下的脚本来启动。须在命令行中指定一个agent名字,配置目录和配置文档
      bin/flumengagentnagent_name -c conf -f conf/flume-conf.properties.template
      现在agent将会开始运行给定的属性文档中的source和sink。

      ④ 记录原始数据
      通过摄取管道获取记录到Flume log文件的原始数据流大多不会描述生产环境的行为因为数据里面可能包含敏感数据或者安全相关的配置,例如安全密钥。默认情况,Flume不会记录这些信息。另一方面,如果数据管道损坏,Flume会试图提供一些线索来调试问题。
      一个调试事件管道的方法是设置一个额外的内存channel来连接Logger Sink,用来将所有事件数据都记录到Flume log。然而,在一些情况之下,这种方法还是不足以解决问题。
      为了能够记录配置相关的日志,设置-Dorg.apache.flume.log.printconfig=true。这个也可以通过命令行或者在flume-env.sh设置JAVA_OPTS属性。
      为了能够记录数据,通过跟上面相同的方式来设置-Dorg.apache.flume.log.rawdata=true 。对于大部分组件来说,log4j的打印级别必须设置为DEBUG或者TRACE来让指定事件的日志信息出现在Flume log中。
      下面是一个例子能够保证将配置信息和原始数据在log4j的打印级别设置在DEBUG的情况下输出到控制台:

      $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true     
      

      ⑤ zookeeper基础配置
      Flume支持通过ZooKeeper来配置Agent。这是个实验性的特性。配置文档必须上传到ZooKeeper中,(在一个可配置的前缀下)。这个配置文档存储在ZooKeeper节点数据下。下面是ZooKeeper下的节点树结构:

      - /flume
       |- /a1 [Agent config file]
       |- /a2 [Agent config file]
      

      一旦配置文档上传完成,通过下面选项来启动agent

      $ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
      

      ⑥ 安装第三方插件
      Flume拥有一个全备的插件架构。虽然Flume自带许多开箱即用的sources、channel,sinks和serializers等,同时也存在许多跟Flume之外的实现。
      Flume曾经支持在flume-env.sh中的FLUME_CLASSPATH中添加一些自定义的Flume组件,现在Flume支持一个特殊路径plugins.d自动地安装那些按照指定格式存储的插件。
      这个plugins.d目录位于FLUME_HOME/plugins.d。在启动的时候,flume-ng启动脚本扫描plugins.d目录下的遵循格式的插件并在启动java时将它们放在合适的路径。
      在plugins.d下的每个插件(子目录)包含三个子目录:
      1).lib – 插件的jar包
      2).libext – 插件的依赖jar包
      3).native – 任何需要的本地库,例如.so文档。

    10. 数据获取

      Flume支持从外部来源获取数据的一系列机制。
      ① RPC
      Flume 中的Avro client 可以用avro RPC 机制来发送一个给定文档给Flume Avro source。
      $ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
      上面的命令行发送/usr/logs/log.10的内容给监听在那个端口的Flume source。

      ② Executing commands(执行命令行)
      由一个执行source来执行给出的命令和消费输出。输出的一行文本带着(‘\r’)或者(‘\n’)或者两者皆有。
      说明:Flume不支持一个结尾符作为一个资源。所以可以用一个可执行的源码来包装结尾命令输出文件。

      ③ Network streams
      Flume支持下面的机制来读取受欢迎的日志类型,例如:
      1).Avro
      2).Thrift
      3).Syslog
      4).Netcat

    展开全文
  • flume分布式日志采集系统实战

    千人学习 2018-10-22 21:38:05
    随着公司业务的不断增长,划分了许多应用,不同应用的日志在不同服务器上面,很难进行统一管理,通过学习该课程,你可以自己搭建日志采集系统,可以进行数据分析,挖掘等工作
  • 第十九记·Flume详解

    千次阅读 2018-08-13 16:10:44
     Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合...

    XY个人记

        Apache Flume 用于移动大规模批量流数据到 HDFS 系统。从Web服务器收集当前日志文件数据到HDFS聚集用于分析,一个常见的用例是Flume。

        Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错能力。它使用简单的可扩展数据模型,允许在线分析应用程序。Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase。

    Flume 支持多种来源,如:

    • “tail”(从本地文件,该文件的管道数据和通过Flume写入 HDFS,类似于Unix命令“tail”)

    • 系统日志

    • Apache log4j (允许Java应用程序通过Flume事件写入到HDFS文件)。

     

    Flume的工作方式有两种Flume-og(Cloudera)和Flume-ng(Apache)。

        Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
        Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。而我们现在用的就是Flume-ng。

    Flume-ng

    (官网:http://flume.apache.org/

    Flume-ng只有一个角色的节点:agent的角色,agent由source、channel、sink组成。

        Source:Source用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel

        Channel:连接 sources 和 sinks ,这个有点像一个队列。

        Sink:从Channel收集数据,将数据写到目标源,可以是下一个Source也可以是HDFS或者HBase。

        Event是Flume数据传输的基本单元

        source监控某个文件,将数据拿到,封装在一个event当中,并put/commit到channel当中,channel是一个队列,队列的优点是先进先出,放好后尾部一个个event出来,sink主动去从channel当中去拉数据,sink再把数据写到某个地方,比如HDFS上面去。

    设置多代理流程

    为了跨多个代理或跳数据流,先前代理的接收器和当前跳的源需要是avro类型,接收器指向源的主机名(或IP地址)和端口。

    合并

        日志收集中非常常见的情况是大量日志生成客户端将数据发送到连接到存储子系统的少数消费者代理。例如,从数百个Web服务器收集的日志发送给写入HDFS集群的十几个代理。

        这可以通过使用avro接收器配置多个第一层代理在Flume中实现,所有这些都指向单个代理的avro源(同样,您可以在这种情况下使用简单的方式 sources/sinks/clients)。第二层代理上的此源将接收的事件合并到单个信道中,该信道由接收器消耗到其最终目的地。

    多路复用流程

    Flume支持将事件流多路复用到一个或多个目的地。这是通过定义可以复制或选择性地将事件路由到一个或多个通道的流复用器来实现的。

        上面的例子显示了来自代理“foo”的源代码将流程扇出到三个不同的通道。这个扇出可以复制或多路复用。在复制流的情况下,每个事件被发送到所有三个通道。对于多路复用情况,当事件的属性与预配置的值匹配时,事件将传递到可用通道的子集。例如,如果一个名为“txnType”的事件属性设置为“customer”,那么它应该转到channel1和channel3,如果它是“vendor”,那么它应该转到channel2,否则转到channel3。可以在代理的配置文件中设置映射。

    安装配置Flume

    解压、并修改配置文件flume-env.sh

    $ tar -zxf ../../software/apache-flume-1.7.0-bin.tar.gz -C ./
    $ mv apache-flume-1.7.0-bin flume-1.7.0
    $ cp flume-env.sh.template flume-env.sh
    
    配置flume-env.sh中的
    JAVA_HOME=/opt/modules/jdk1.7.0_67
    

    运行之前先添加一些flume依赖的hdfs的jar包,添加到flume的lib目录

    完成后配置是否成功

    bin/flume-ng version

    bin/flume-ng.sh 查看flume的flume-ng命令相关命令

    Usage: bin/flume-ng <command> [options]...
    
    commands:
      agent                     run a Flume agent
      avro-client               run an avro Flume client
    
    global options:
      --conf,-c <conf>          use configs in <conf> directory
    
    agent options:
      --name,-n <name>          the name of this agent (required)
      --conf-file,-f <file>     specify a config file (required if -z missing)
    
    avro-client options:
      --rpcProps,-P <file>   RPC client properties file with server connection params
      --host,-H <host>       hostname to which events will be sent
      --port,-p <port>       port of the avro source
      --dirname <dir>        directory to stream to avro source
      --filename,-F <file>   text file to stream to avro source (default: std input)
      --headerFile,-R <file> File containing event headers as key/value pairs on each new line

     

    根据官网上的配置运行官网上的一个配置案例:

    图片中的红框中相当于一个Agent
    下面这个flume-test.properties相当于一个Agent
    首先vi一个flume-test.properties
    复制官网上的案例进行更改

    # Name the components on this agent 三个组件的别名 a1.sources = r1 a1.sinks = k1 a1.channels = c1

    下面是三个组件的属性
    # Describe/configure the source a1.sources.r1.type = netcat     #类型
    a1.sources.r1.bind = localhost     #主机名
    a1.sources.r1.port = 44444    #端口号

     

    #capacity 肯定要比 transactionCapacity 参数大
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 memory最多能存储多少个events事件
    a1.channels.c1.transactionCapacity = 100 一次能提交多少个events数量

    一个source,可以绑定多个channel
    一个sink,只能绑定一个channel
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 #绑定到c1
    a1.sinks.k1.channel = c1 #

    更改log4j的日志级别,flume.root.logger=INFO,LOGFILE 更改为flume.root.logger=INFO,console(在控制台输出)


    查看系统有没有安装telnet,没有的就安装

    rpm -qa | grep telnet
    安装
    sudo yum -y install telnet
    或
    yum -y install nc

    运行flume-test.properties

    $ bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-test.properties 

    执行流程:先创建channel ---> 再创建sink ---> 再创建source

    完成之后使用telnet链接或者使用netcat链接,连接成功后输入信息

    telnet 192.168.142.167 44444 或 nc hadoop01.com 44444

    之后可以在44444端口看到监控的到的信息

    退出telnet:输入ctrl + ] 然后输入quit

     

    运行自定义案例

    需求:监控apache服务器的日志,利用flume监控某一个文件
    监控:/var/log/httpd/access_log日志文件

    首先

    安装httpd服务
    安装完成之后,会有个目录生成 /var/www/html
    启动服务
    浏览网页:输入主机名[hostname] 端口号默认是80。没有主页默认进到下面页面,在 /var/www/html中
    创建一个index.html文件
    会默认进入到index.html中

    $ sudo yum -y install httpd    #安装httpd服务
    $ sudo service httpd start    #启动服务
    $ sudo vi index.html    #穿件index.html文件

    访问主页的时候日志产生的路径:/var/log/httpd/access_log

    然后

    创建一个文件flume-apache-sink-hdfs.properties

    $ cp flume-test.properties flume-apache-sink-hdfs.properties    #copy后更改

    官网案例如图

    配置如下:

    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /var/log/httpd/access_log
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop01.com/flume/webdata

    运行:

    $ bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-apache-sink-hdfs.properties

    日志会实时记录,每次刷新都会自动加载数据并上传到HDFS

     

    优化 sink hdfs 类型

    在HDFS中我们看到产生的日志文件很多但是非常小,这样就违背了HDFS的原则(数据文件要大,数量要小)

    关于临时文件生成目标文件的条件

    rollInterval
    默认值:30
    hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
    如果设置成0,则表示不根据时间来滚动文件;
    注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;

    rollSize
    默认值:1024
    当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
    如果设置成0,则表示不根据临时文件大小来滚动文件;

    rollCount
    默认值:10
    当events数据达到该数量时候,将临时文件滚动成目标文件;
    如果设置成0,则表示不根据events数据来滚动文件;

    idleTimeout
    默认值:0
    当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;

    解决:文件数量多,文件大小太小
    hdfs.rollInterval = 600 (这个地方最好还是设置一个时间)
    hdfs.rollSize = 1048576 (1M,134217728-》128M)
    hdfs.rollCount = 0
    hdfs.minBlockReplicas = 1 (这个不设置的话,上面的参数有可能不会生效)

    多次刷新查看

    只创建了一个文件不会创建多个,等待达到相应条件以后再生成目标文件,说明我们的设置有效

     

    在hdfs文件上设置时间格式分层 年月日/时 每小时生成一个文件

    解决:

    hdfs.useLocalTimeStamp = true #开启
    hdfs.round = true
    hdfs.roundValue = 1
    hdfs.roundUnit = hour #根据小时进行分层

    因为我们设置的是按照小时分层,等到16点再次操作查看又会生成一个新的文件层

     

    利用flume监控某一个文件目录(/home/beifeng/logs)

    需求:将目录下滚动好的文件实时抽取到HDFS上

    拷贝一个文件flume-spooldir-sink-hdfs.properties,并拷贝hadoop的的logs用来进行测试

    $ cp flume-apache-sink-hdfs.properties flume-spooldir-sink-hdfs.properties
    $ cp logs/ -r /opt/datas/

    按照文档进行如下配置

    类型选择
    source:spooldir
    channel:file
    sink:hdfs

    • source:spooldir(已经生成好的最终的数据文件)
    • recursiveDirectorySearch 是否监视子目录以查找要读取的新文件
    • includePattern 正则表达式,指定要包含的文件 (只.csv数据文件,是正则匹配)
    • ignorePattern 正则表达式,指定要忽略的文件 (不抽取.csv数据文件,是正则匹配)
    • 缺点:不能对目录文件进行修改,如果有追加内容的文本文件,是不允许的(有可能不会被抽取,有可能会有错误)

     

    flume监控目录,支持文件修改,并记录文件状态

    • source:taildir (类似exec + spooldir的组合)
    • filegroups :设置source组 可设置多个 filegroups = f1
    • filegroups.<filegroupName>:设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件
    • positionFile:设置定位文件的位置,以JSON格式写入给定位置文件上每个文件的最后读取位置

    配置如图

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /opt/datas/logs
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop01.com/flume/webdata/spooldir/%y%m%d/%H
    a1.sinks.k1.hdfs.rollInterval = 600
    a1.sinks.k1.hdfs.rollSize = 1048576
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue	= 1
    a1.sinks.k1.hdfs.roundUnit = hour
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/modules/apache/flume-1.7.0/checkpointDir
    a1.channels.c1.dataDirs = /opt/modules/apache/flume-1.7.0/dataDirs
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    运行:flume-spooldir-sink-hdfs.properties

    $ bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir-sink-hdfs.properties

    抽取完的文件都被重命名(Spooling Directory Source会把新文件的内容读取并推送到Channel中,并且把已读取的文件重命名成指定格式或者把文件删除

    查看HDFS文件大小都是1G的(a1.sinks.k1.hdfs.rollSize = 1048576),大小会有一点的偏差,所以不能设置成128M(会造成分块区的浪费)

    a1.sources.r1.recursiveDirectorySearch = true  监听子级目录 默认为false
    
    hdfs上数据默认是二进制的文件类型:bin/hdfs dfs -text /
    可以修改hdfs.fileType = DataStream(数据流)
        hdfs.writeFormat = Text 改为文本格式
    当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
    当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
    hdfs.codeC压缩编码解码器 --》snappy压缩
    batchSize默认值:100 每个批次刷新到HDFS上的events数量;  

     

     

    sink-hive

    在flume1.6以上包括1.6支持sink hive

    首先,将hive的一些jar拷贝过来 flume的lib目录下,将mysql的驱动jar拷贝过来,除了有几个在lib目录下还有几个在hcatalog下

     

    配置 flume agent
        source:netcat
        channel:Memory
        sink:hive

    做一个基于netcat的配置

    在官网上可以查看Hive Sink的配置

    serializer: 负责解析事件中的字段并将它们映射到hive表中的列

    DELIMITED

    DELIMITED:普通文本json文件 (不需要配置,JSON中的对象名称直接映射到Hive表中具有相同名称的列,内部使用org.apache.hive.hcatalog.data.JsonSerDe)

    serializer.delimiter:传入数据中的字段分隔符,用双引号括起来,例如"\t",默认用‘,’隔开

    serializer.fieldnames:从输入字段到hive表中的列的映射,指定为hive表列名称的逗号分隔列表

    serializer.serdeSeparator:输出字段分隔符,单引号括起来,例如'\t'

    a1.sinks.k1.type = hive
    a1.sinks.k1.hive.metastore = thrift://hadoop01.com:9083 #需要在hive中配置metastore
    a1.sinks.k1.hive.database = flume_test
    a1.sinks.k1.hive.table = flume_user
    a1.sinks.k1.serializer = DELIMITED
    # hive.partition = 分区
    a1.sinks.k1.serializer.delimiter = "\t" #输入
    a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age 字段
    a1.sinks.k1.serializer.serdeSeparator = '\t' #输出

    hive-site.xml中hive参数设置:

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop01.com:9083</value>
    </property>

     

    配置完成后启动hive的元数据服务,开启hive

    $ bin/hive --service metastore &
    $ bin/hive

    建库建表

    创建库和表 (表必须是CLUSTERED BY ,INTO BUCKETS --->分桶的表)
    create database flume_test;
    
    create database flume_test;
    use flume_test;
    create table flume_user(
    user_id int,
    user_name string,
    user_age int
    )CLUSTERED BY (user_id) INTO 2 BUCKETS
    row format delimited fields terminated by '\t'
    stored as orc;

    运行

    $ bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

    使用nc链接并输入 1 jeffrey 18

    $ nc hadoop01.com 44444

    键入完成后发现报错

    Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
            at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
            at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:3834)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:3821)

    解决办法:配置hive-site.xml

     

    •        hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -》打开一部分事务支持

    协同配置

    •        hive.compactor.initiator.on=true; -》运行启动程序和清除线程,用于打开所需参数的完整列表事务
    •        hive.compactor.worker.threads=1; -》增加工作线程的数量将减少花费的时间
    •        hive.support.concurrency=true;  -》是否支持并发,默认是false
    •        hive.enforce.bucketing=true;   -》是否启用bucketing,写入table数据
    <property>
    	<name>hive.txn.manager</name>
    	<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
    	<name>hive.compactor.initiator.on</name>
    	<value>true</value>
    </property>
    <property>
    	<name>hive.compactor.worker.threads</name>
    	<value>1</value>
    </property>
    <property>
    	<name>hive.support.concurrency</name>
    	<value>true</value>
    </property>
    <property>
    	<name>hive.enforce.bucketing</name>
    	<value>true</value>
    </property>

    启动metastore时报错:

    Table 'metastore.COMPACTION_QUEUE' doesn't exist

    配置以下属性再启动,又报错,然后在去掉此属性

    Error rolling back: Can't call rollback when autocommit=true
    <property>
        <name>hive.in.test</name>
        <value>true</value>
    </property>
    
    这个是用来创建COMPACTION_QUEUE这张表的,所以开始启动时因为没有此表报错,加上此表后生成此表删除即可

    在开启metastore正常,然后运行,使用nc链接并输入 :1 jeffrey 18

    $ bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties 
    $ nc hadoop01.com 44444

     

    接下来做一个基于exec的配置

    首先copy一个文件,并运行

    $ cp conf/flume-sink-hive.properties conf/flume-sink-hive2.properties 
    $ bin/flume-ng agent --conf conf  --name a1 --conf-file conf/flume-sink-hive2.properties 

    在HDFS中也可以查看到我们分桶的数据

     

    扇入、扇出的案例

        扇入:多个对一个
        开启三个flume agent:

    第一个:监控httpd日志
      source:exec
      channel:memory
      sink:avro
    第二个:监控hive的日志
      source:exec
      channel:memory
      sink:avro
    第三个:agent:把前两个的数据进行汇合--》hdfs
      source:avro
      channel:memory
      sink:hdfs

    拷贝复制三个文件

    $ cp conf/flume-apache-sink-hdfs.properties conf/flume-avro-sink.properties
    $ cp conf/flume-apache-sink-hdfs.properties conf/flume-avro-sink2.properties
    $ cp conf/flume-apache-sink-hdfs.properties conf/flume-avro-source.properties

    检查端口号是否被占用:$ sodu netstat -anp | grep 8888,如果没有被占用可以使用8888端口

    配置完成后运行
    首先运行第三个,“&“ 后台运行此任务,关闭需要找到任务进程pid ,然后kill

    $ bin/flume-ng agent --conf conf  --name a3 --conf-file conf/flume-avro-source.properties &
    $ bin/flume-ng agent --conf conf  --name a1 --conf-file conf/flume-avro-sink.properties & 
    $ bin/flume-ng agent --conf conf  --name a2 --conf-file conf/flume-avro-sink2.properties &

        扇出:一个对多个
        监控hive的日志:

        创建一个文件 flume-channels-sinks.properties

    hdfs.path = /hive_log/flume1: 需要这样写
    前提:把hadoop的core-site.xml和hdfs-site.xml文件拷贝到flume的conf目录下

    $ cp ../hadoop-2.7.3/etc/hadoop/core-site.xml conf/
    $ cp ../hadoop-2.7.3/etc/hadoop/hdfs-site.xml conf/
    运行:
    $ bin/flume-ng agent --conf conf  --name agent1 --conf-file conf/flume-channels-sinks.properties 

    一个channels对应一个sinks,channels里的数据是一样的

     

    展开全文
  • Flume简介

    2019-03-14 11:01:13
    1、什么是flume flume 是由 cloudera 软件公司产出的可分布式日志收集系统,后与 2009 年被捐赠了 apache 软件基金会, 为hadoop 相关组件之一。 Flume 是一种分布式 , 可靠且可用的服务 , 用于高效地收集 , 汇总...

    1、什么是flume

    flume 是由 cloudera 软件公司产出的可分布式日志收集系统,后与 2009 年被捐赠了 apache 软件基金会, 为hadoop 相关组件之一。
    Flume 是一种分布式 , 可靠且可用的服务 , 用于高效地收集 , 汇总和移动大量日志数据 。 它具有基于流式数据流的简单而灵活的架构 。 它具有可靠的可靠性机制以及许多故障转移和恢复机制 , 具有强大的容错性和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。

    2、为什么需要flume

    1、当大量的数据在同一个时间要写入HDFS时,每次一个文件被创建或者分配一个新的块,都会在namenode发生很复杂的操作,主节点压力很大,会造成很多问题,比如写入时间严重延迟、写入失败等。
    2、flume是一个灵活的分布式系统,易扩展,高度可定制化。
    3、flume中的核心组件Agent。一个Agent可以连接一个或者多个Agent,可以从一个或者多个Agent上收集数据。多个Agent相互连接,可以建立流作业,在Agent链上,就能将数据从一个位置移动到另一个地方(HDFS、HBase等)。

    3、flume特性

    Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
    Flume 可以采集文件,socket 数据包、文件、文件夹、kafka 等各种形式源数据,又可以将采集到的数据(下 下 沉
    sink) 输出到 HDFS 、hbase 、hive 、kafka 等众多外部存储系统中
    对 一般的采集需求,通过对 flume 的简单配置即可实现
    Flume 针对特殊场景也具备良好的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景
    Flume 的管道是基 于事务,保证了数据在传送和接收时的一致性.
    Flume 是可靠的,容错性高的,可升级的,易管理的, 并且可定制的。

    4、Flume Agent简单架构

    简单

    Agent结构

    Agent:Agent是Flume中的核心组件,用来收集数据。一个Agent就是一个JVM进程,它是Flume中最小的独立运行的单元。

    在Agent中有三个组件,分别为Source,Channel,Sink

    Flume Event
    Event:在flume中,event是最小的数据单元,由header和body组成 event 是 是 flume
    中处理消息的基本单元,个 由零个或者多个 header 和正文 body 组成 。 Header 是 是 key/value 形式的
    , 可以用来制造路由决策或携带其他结构化信息( 如事件的时间戳或 事件来源的服务器主机名)和 。你可以把它想象成和 HTTP
    头一样提供相同的功能—— 通过该方法 来传输正文之外的额外信息。 Body 是一 个字节数组 ,包含了实际的内容。 flume
    提供的同 不同 source 会给其生成的 event 添加不同的 header

    Source:数据源。负责将数据捕获后进行特殊的格式化,然后再封装在Event中,再将数据推入到Channel中
    常见类型: :avro 、exec、 jms、spooling directory、source 、kafka 、netcat 等

    Channel:连接source和sink的组件,可以理解为数据缓冲区(数据队列),可以将event暂存在内存上,也可以持久化到本地磁盘上,直到sink消费完。
    常见类型:Memory、JDBC、File等

    Sink:数据下沉。从channel中取出数据后,分发到别的地方,比如HDFS或者HBase等,也可以是其他Agent的source。当日志数据量小的时候,可以将数据存在文件系统中,并设定一定的时间间隔来存储数据。
    常见类型:HDFS、Logger、File Roll、Avro、Thrift、HBase等
    Source、Channel和Sink的类型具体说明参照官网用户指南

    5、多个Agent连接
    多个Agent
    如图,3个Agent连接一个Agent时,要注意的是,3个Agent中Sinks的类型需要统一,因为另一个Agent的Source类型需要统一的数据源类型来接收。

    一个或者多个Agent连接就形成了,流将数据推送到了另一个Agent,最终将数据推送到存储或者索引系统。

    展开全文
  • Flume的简单介绍

    千次阅读 2018-07-09 10:53:27
    要想使用Flume,就需要运行Flume代理。Flume代理是由持续运行的sorce(数据来源)、sink(数据目标)以及channei(用干连接sours。和sink)构成的Java进程。Flume的source产生事件,并将其传送给channel , channel存储这些...
  • Flume概念与原理、与Kafka优势对比

    万次阅读 多人点赞 2018-03-27 11:40:17
    尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一.2 .概述 1. 什么是f...
  • Flume简单介绍

    2020-09-11 16:10:13
    还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,在此,我们首先来介绍下数据采集部分所用的的开源框架——Flume。 一、FLUME概述 Flume是...
  • Flume

    千次阅读 2018-11-04 17:52:16
    离线数据的分析流程 1.案例分析 ...2.需求 web点击流日志,包含网站运行的重要信息,通过日志的分析我们可以知道网站的访问量,哪个网站访问的人数最多,哪个网页有价值,广告转换率,访客的来源信息,访客的终端信息 ...
  • Flume教程(一) Flume入门教程

    万次阅读 2016-04-13 15:22:09
    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
  • Flume架构以及应用介绍

    万次阅读 多人点赞 2016-12-24 10:00:35
    在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: ...本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。 (一)Flume架构介绍 1、Flume的概念 flume是分布式的日志
  • 大数据Flume系列之Flume集群搭建

    万次阅读 2018-08-30 00:50:17
    1. 概念 ...接下来就要实现Flume集群搭建。集群如下图所示。 2. Flume搭建 2.1 部署准备 部署主机 192.168.9.139 host14 192.168.9.128 host15 host14主机下载flume软件包 # cd /opt/t...
  • 1.为什么要有flume? flume的设计宗旨是向hadoop集群批量导入基于事件的海量数据。一个典型的例子就是利用flume从一组web服务器中收集日志文件,然后把这些文件中的日志事件转移到一个新的HDFS汇总文件中以做进一步...
  • Flume学习(一)Flume初始

    千次阅读 2016-09-22 14:36:35
    一、FLUME介绍 Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 设计目标...
  • 1、Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中。轻量,配置简单,适用于各种日志收集,并支持 Failover和负载均衡。并且它拥有非常丰富的...
  • flume篇1:flume把json数据写入kudu(flume-kudu-sink) 对应非json数据同样适用,可以把非json数据通过拦截器拼接成一个json send出去,这样也是ok的 废话不多说,直接上干货 一、 自定义拦截器: 1 拦截器要求:...
  • Flume介绍与安装

    万次阅读 2019-08-02 20:19:02
    Flume 1.8.0用户指南 1.介绍 概述 系统要求 2.架构 数据流模型 复杂流动 可靠性 可恢复性 安装 多个Agent 整合 多路复用 配置多Agent流 扇出流 对人工智能感兴趣的同学,可以点击以下链接: 现在人工...
  • Flume整体架构总结

    千次阅读 2018-12-11 10:56:18
    Flume简介: Flume 是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种...
  • Flume初始篇之flume安装及简单测试

    千次阅读 2018-07-08 16:24:03
    Flume安装及简单测试一、Flume简介 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但...
1 2 3 4 5 ... 20
收藏数 37,655
精华内容 15,062
关键字:

flume