精华内容
下载资源
问答
  • flume采集数据

    2020-12-10 11:49:09
    今天写了一个flume的配置文件采集日志文件(json格式),将日志文件采集到kafka,结果在kafkatools工具中发现采集的每一条数据前面都多出了两个字节的乱码,显然后面的操作都是以json格式进行处理,数据被阻塞在...

    今天写了一个flume的配置文件采集日志文件(json格式),将日志文件采集到kafka,结果在kafkatools工具中发现采集的每一条数据前面都多出了两个字节的乱码,显然后面的操作都是以json格式进行处理,数据被阻塞在kafka中。

    后来进行了反复检查,发现问题还是出现在flume配置文件中。。。。。。。。。。。。
    在这里插入图片描述
    在这里配置是否以event格式写入kafka时,多加了kafka,flume读取数据时还是以默认设置以event格式读入数据,结果event中的header信息保存到kafka后变成了乱码,后续操作以json格式为基础都会报错。

    展开全文
  • flume 采集数据到hdfs

    2015-10-09 21:57:52
    前言:在两台机器上做flume 采集数据实验:hadoop05上安装flume 1.5.0版本,hadoop07上安装hadoop2.2.0版本 一、安装  前提:flume是依赖jdk,所以需要安装jdk,这里就不多说,jdk 安装目录/usr/local/...



    前言:在两台机器上做flume 采集数据实验:hadoop05上安装flume 1.5.0版本,hadoop07上安装hadoop2.2.0版本






    一、安装
        前提:flume是依赖jdk,所以需要安装jdk,这里就不多说,jdk 安装目录/usr/local/jdk1.6.0_45
        下载安装文件:apache-flume-1.5.0-bin.tar.gz,上传到linux 目录:/usr/local/
    解压文件tar -zvxf apache-flume-1.5.0-bin.tar.gz 得到apache-flume-1.5.0-bin ,以下称为FLUME_HOME
    二、修改配置文件
        cd apache-flume-1.5.0-bin/conf 下 
        mv flume-env.sh.template flume-env.sh
        vim flume-env.sh 添加jdk 配置:JAVA_HOME=/usr/local/jdk1.6.0_45 保存退出
    三、建立配置文件,这个配置文件是flume :source、channel、sink三大组件的配置,取名为example.conf ,内容如下:
    #定义agent名, source、channel、sink的名称
    example.sources = r1
    example.channels = c1
    example.sinks = k1




    #具体定义source
    example.sources.r1.type = spooldir
    example.sources.r1.spoolDir = /home/hadoop/logs




    #具体定义channel
    example.channels.c1.type = memory
    example.channels.c1.capacity = 10000
    example.channels.c1.transactionCapacity = 100




    #定义拦截器,为消息添加时间戳
    example.sources.r1.interceptors = i1
    example.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder








    #具体定义sink
    example.sinks.k1.type = hdfs
    example.sinks.k1.hdfs.path = hdfs://hadoop07:9000/flume/%Y%m%d
    example.sinks.k1.hdfs.filePrefix = events-
    example.sinks.k1.hdfs.fileType = DataStream
    #不按照条数生成文件
    example.sinks.k1.hdfs.rollCount = 0
    #HDFS上的文件达到128M时生成一个文件
    example.sinks.k1.hdfs.rollSize = 134217728
    #HDFS上的文件达到60秒生成一个文件
    example.sinks.k1.hdfs.rollInterval = 60




    #组装source、channel、sink
    example.sources.r1.channels = c1
    example.sinks.k1.channel = c1
      
    jiang
    四、拷贝需要hadoop 的jar包,因为是收集数据到hdfs,需要调用到hdfs 客户端接口,所以需要以下几个jar
          在hadoop 解压之后的相对路径:(这里是使用hadoop2.2.0版本)
         --hadoop-2.2.0/share/hadoop
                                                        --common/hadoop-common-2.2.0.jar
                                                        --common/lib/common-configuration-1.6.jar
                                                        --common/lib/hadoop-auth-2.2.0.jar
                                                       --hdfs/hadoop-hdfs-2.2.0.jar
      到${FLUME_HOME}/lib下
    五、通知flume 知道hadoop的信息
         要让flume 知道hadoop的配置,所以需要拷贝hadoop 的两个核心配置文件:core-site.xml 和hdfs-site.xml到
       ${FLUME_HOME}/conf下


    六、启动hadoop
    所有的配置都已经准备好,  启动hadoop07 机器上的hadoop ,根据example.conf 中的配置而定,我这里用的是hadoop07,为了防止错误,建立我们本机的监听目录:
        /home/hadoop/logs,
    七、启动flume
         所用的前提已经准备好,此时最后一步,启动flume:
        ${FLUME_HOME}/bin/flume-ng agent -n example -c conf -f conf/exmple.conf -Dflume.root.logger=INFO,console


      擦数说明:agent -n example     代理对象
                           -c conf        配置文件的地址目录      
                          -f  conf/exmple.conf        指制定配置文件
                          -Dflume.root.logger=INFO,console     日志级别为INFO ,打印在控制台上


    八、校验
       将拷贝文件     cp /data/flume_test.txt  /home/hadoop/logs 下
       flume 会将其采取,到hdfs 中查看是否有多了一个flume 文件


    展开全文
  • flume采集数据到hdfs、kafka配置文件 执行命令 nohup bin/flume-ng agent -n a10 -c conf/ -f ./conf/server/flume-taildir-kafka.conf -Dflume.root.logger=INFO,console >> ./logs/fflume-taildir-kafka....

    flume采集数据到hdfs、kafka配置文件

    • 执行命令
    nohup bin/flume-ng agent -n a10 -c conf/ -f ./conf/server/flume-taildir-kafka.conf -Dflume.root.logger=INFO,console >> ./logs/fflume-taildir-kafka.log 2>&1 &
    
    • 采集日志文件到kafka
      flume-taildir-kafka.conf
    #agent
    a10.sources = r1
    a10.channels = c1
    
    #source
    a10.sources.r1.type = TAILDIR
    a10.sources.r1.filegroups = f1
    #读取的文件路径
    a10.sources.r1.filegroups.f1 = /data1/onepiece-recommender/logs/statistics.log
    #通过 json 格式存下每个文件消费的偏移量,避免从头消费
    a10.sources.r1.positionFile = /usr/local/flume/logs/taildir_position_event_log.json
    a10.sources.r1.fileHeader = false
    #batchSize
    a10.sources.r1.batchSize = 1000
    
    #channel
    a10.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a10.channels.c1.kafka.bootstrap.servers = 10.70.110.70:9092,10.70.110.71:9092,10.70.110.82:9092
    a10.channels.c1.kafka.topic = es-service-cost-monitor
    a10.channels.c1.kafka.consumer.group.id = flume-consumer-event-001
    a10.channels.c1.parseAsFlumeEvent = false
    a10.channels.c1.kafka.producer.compression.type = lz4
    a10.channels.c1.kafka.flumeBatchSize = 5000
    a10.channels.c1.kafka.producer.acks = all
    a10.channels.c1.kafka.producer.linger.ms = 30
    
    #关联关系
    a10.sources.r1.channels = c1
    

    采集日志文件到hdfs

    #agent
    a10.sources = r1
    a10.channels = c1
    a10.sinks = k1
    
    #source
    a10.sources.r1.type = TAILDIR
    a10.sources.r1.filegroups = f1
    #读取的文件路径
    a10.sources.r1.filegroups.f1 = /usr/local/nginx/logs/action_data/access.log
    #通过 json 格式存下每个文件消费的偏移量,避免从头消费
    a10.sources.r1.positionFile = /home/user/apache-flume-1.9.0-bin/logs/taildir_position_event_log.json
    a10.sources.r1.fileHeader = false
    #batchSize
    a10.sources.r1.batchSize = 1000
    
    #channel
    a10.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a10.channels.c1.kafka.bootstrap.servers = 10.10.100.124:9092
    a10.channels.c1.kafka.topic = inference_raw_log
    a10.channels.c1.kafka.consumer.group.id = flume-consumer-eventlog-001
    a10.channels.c1.parseAsFlumeEvent = false
    a10.channels.c1.kafka.producer.compression.type = lz4
    a10.channels.c1.kafka.flumeBatchSize = 5000
    a10.channels.c1.kafka.producer.acks = all
    a10.channels.c1.kafka.producer.linger.ms = 30
    
    
    
    a10.sinks.k1.type = hdfs
    a10.sinks.k1.hdfs.path =hdfs://10.10.100.124:9000/data/action_log/%y%m%d/
    a10.sinks.k1.hdfs.filePrefix = access_log
    a10.sinks.k1.hdfs.maxOpenFiles = 5000
    a10.sinks.k1.hdfs.batchSize= 100
    a10.sinks.k1.hdfs.fileType = DataStream
    a10.sinks.k1.hdfs.writeFormat =Text
    a10.sinks.k1.hdfs.rollSize = 102400
    a10.sinks.k1.hdfs.rollCount = 1000000
    a10.sinks.k1.hdfs.rollInterval = 60
    a10.sinks.k1.hdfs.round = true
    a10.sinks.k1.hdfs.roundValue = 10
    a10.sinks.k1.hdfs.roundUnit = minute
    a10.sinks.k1.hdfs.useLocalTimeStamp = true
    
    #关联关系
    a10.sources.r1.channels = c1
    a10.sinks.k1.channel = c1
    
    
    展开全文
  • 方案一本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时实际上是一种准实时。假设hadoop集群已经正常启动,hive也已经正常启动,并且hive的文件...

    说明:本文不仅提供两种方案,还详细的记录了一些相关信息。

    方案一

    本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时实际上是一种准实时。

    假设hadoop集群已经正常启动,hive也已经正常启动,并且hive的文件地址是/hive/warehouse,然后hive里存在一张由以下建表语句创建的表

    create table flume_test(uuid string);

    可推断,表flume_test地址在/hive/warehouse/flume_test,下面介绍flume:

    flume安装步骤

    #下载

    cd /opt

    mkdir flume

    wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

    tar xvzf apache-flume-1.6.0-bin.tar.gz

    cd apache-flume-1.6.0-bin/conf

    cp flume-env.sh.template flume-env.sh

    打开flume-env文件,添加java变量

    export JAVA_HOME=/usr/java/jdk1.8.0_111

    然后添加环境变量,为了一次过,分别在profile和bashrc末尾添加

    export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin

    export FLUME_CONF_DIR=$FLUME_HOME/conf

    export PATH=$PATH:$FLUME_HOME/bin

    然后

    source /etc/profile

    到此flume安装完毕,下面进行配置,切换到conf文件夹复制flume-conf.properties.template为agent.conf,然后编辑

    #定义活跃列表

    agent.sources=avroSrc

    agent.channels=memChannel

    agent.sinks=hdfsSink

    #定义source

    agent.sources.avroSrc.type=avro

    agent.sources.avroSrc.channels=memChannel

    agent.sources.avroSrc.bind=0.0.0.0

    agent.sources.avroSrc.port=4353

    agent.sources.avroSrc.interceptors=timestampinterceptor

    agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp

    agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

    #定义channel

    agent.channels.memChannel.type=memory

    agent.channels.memChannel.capacity = 1000

    agent.channels.memChannel.transactionCapacity = 100

    #定义sink

    agent.sinks.hdfsSink.type=hdfs

    agent.sinks.hdfsSink.channel=memChannel

    #agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H

    agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test

    agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume

    agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume

    agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp

    agent.sinks.hdfsSink.hdfs.rollInterval=0

    agent.sinks.hdfsSink.hdfs.rollSize=10240000

    agent.sinks.hdfsSink.hdfs.rollCount=0

    agent.sinks.hdfsSink.hdfs.idleTimeout=0

    agent.sinks.hdfsSink.hdfs.batchSize=100

    agent.sinks.hdfsSink.hdfs.minBlockReplicas=1

    # agent.sinks.hdfsSink.hdfs.writeFormat = Text

    agent.sinks.hdfsSink.hdfs.fileType = DataStream

    具体的每一项配置可参照下面这篇博客http://lxw1234.com/archives/2015/10/527.htm,需要警惕的是rollInterval、rollSize、rollCount、idleTimeout这四个属性,如果进行了配置发现不起作用,就要检查一下minBlockReplicas这个属性是否配置,并且值是否是1,下面这个连接是原因http://doc.okbase.net/chiweitree/archive/126197.html

    配置完毕后可以启动,启动命令

    ./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console

    注意:-n 指的是agent的名称,需要对应到配置文件的第一个值,本启动命令还开启了监控,监控地址http://host:5653/metrics;-f 指的是配置文件的路径及名称。flume的conf修改后不用重启,默认30秒刷新一次,自动装载最新的配置。

    flume安装并启动完毕后,编写测试程序。打开eclipse,创建maven项目

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    4.0.0

    scc

    stu-flume

    0.0.1-SNAPSHOT

    war

    stu-flume

    log4j

    log4j

    1.2.9

    org.apache.flume.flume-ng-clients

    flume-ng-log4jappender

    1.6.0

    测试servlet

    public class GenerLogServlet extends HttpServlet {

    private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class);

    private static final long serialVersionUID = 1L;

    @Override

    protected void doGet(HttpServletRequest request, HttpServletResponse response)

    throws ServletException, IOException {

    for (;;) {

    LOGGER.info(UUID.randomUUID().toString());

    try {

    Thread.sleep(100);

    } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    }

    @Override

    protected void doPost(HttpServletRequest request, HttpServletResponse response)

    throws ServletException, IOException {

    this.doGet(request, response);

    }

    }

    log4j.properties

    #log4j settings

    #log4j.rootLogger=debug, CONSOLE

    log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet

    #log4j.rootLogger=INFO

    log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender

    log4j.appender.GenerLogServlet.Hostname=10.5.3.100

    log4j.appender.GenerLogServlet.Port=4353

    log4j.appender.GenerLogServletUnsafeMode=false

    启动项目,访问http://localhost:8080/log开始生产数据。需要注意的是,如果flume配置基于时间戳做文件分组(此种情况可以匹配hive根据时间进行分区),那么需要agent.conf中的source一定要配置

    agent.sources.avroSrc.interceptors=timestampinterceptor

    agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp

    agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

    否则flume的sink会报找不到timestamp错误,因为源码org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定义timestamp的key是flume.client.log4j.timestamp而不是timestamp,所以需要手动添加一个timestamp,如果对这个timestamp要求必须是数据生产的时间,可以修改源码或者为source添加拦截器手动配置。

    flume具有非常灵活的使用方式,可以自定义source、sink、拦截器、channel选择器等等,适应绝大部分采集、数据缓冲等场景。

    观察hadoop目录,发现flume已经按配置将数据移动到相应的hive表目录中,如下图:

    962aa00e87e6192f482745f2471bfbdb.png

    打开hive客户端,数据查询命令,发现数据已可被查询!并且针对hive的分区表和桶表flume都可以实现按照hive表数据规则写入,进而达到数据实时插入,至此,方案一结束。

    本方案缺点:

    由于flume在写入文件的时候,独占正在写入的文件资源,导致hive不能读取正在被写入的文件的内容,也就是说假如每5分钟生成一个文件,那么正在写的文件不会被hive读取到内容,也就意味了hive存在最大5分钟的延迟。而如果把时间变小,那么延迟就会降低,但是哪怕是设置30分钟或1个小时,flume流量不大的情况下,也会生成许多零散的小文件,这点与hive的特长相悖,hive擅长处理大文件,对于零散小文件hive性能会降低很多。

    方案二

    对比方案一,测试程序、source不变,sink改成hbase-sink,数据实时插入到hbase中,然后在hive建立一张hbase映射表,hive从hbase中读取数据,这样可达到实时插入的效果。由于字数限制,方案二记录在如下博客连接中:

    展开全文
  • Flume采集数据会丢失吗? 不会,Channel存储可以存储在File中,数据传输自身有事务。
  • tail使用方法 tail -Ftest.log 你会看到屏幕不断有内容被打印出来. 这时候中断第一个进程Ctrl-C ...flume采集数据不丢 当我们用flume采集一个实时文件的时候,我们的flume的source通常是这么...
  • 解决Flume采集数据时在HDFS上产生大量小文件的问题 flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 解决方案: 去掉round时间系列参数,并将rollSize和rollCount置0, 表示不根据临时文件...
  • Flume采集数据利器

    2020-03-22 19:18:15
    Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。Flume 可以做离线也可以做实时分析。 二、Flume架构 如图所示: Agent就是Flume的一个部署实例,一...
  • 今天遇到这样的一个问题,flume拉取kafka数据,下沉到hdfs中,然后存取到hive中。...后来查询资料说,flume采集数据下沉到hdfs,有默认的文件格式, hdfs.fileType默认为SequenceFile,将其改为DataSt
  • Flume采集数据,在生成的HDFS文件中,总是有“SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable??H謺NSA???y”信息,在Flume文档中介绍,hdfs.fileType默认为SequenceFile,将其改为...
  • 1. 搭建flume 用来监控日志生成目录,将日志数据sink到kafka 2. kafka 存储数据,方便后续flume消费。另外也可以供spark streaming 消费。 3. 消费flume,消费kafka的数据,然后sink到hdfs 二、步骤 1.启动...
  • 概述  Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量...一般的采集需求,通过对 flume 的简单配置即可实现。针对特殊场景也具备 良好的自定义扩展能力。因此,flume 可以适用于大部分的日常数...
  • flume采集文件数据报错: flume版本1.7 kafka版本0.1.0 求大佬们帮忙解决一下## 标题
  • flume采集数据易出现的bug

    千次阅读 2018-08-08 10:30:40
    1.内存不足 修改文件内容 <文件根目录>/bin/flume-ng JAVA_OPTS="-Xms100m ...2.采集kafka数据或者生产kafka数据的的时候默认数据大小是1M,所以使用flume工具导数据如果大于1M,需要添加配置参数 配置文件...
  • Flume采集数据到kafka

    2019-07-08 10:26:51
    a1.sources = r1 a1.sinks = k1 a1.channels = c1 #对于source的配置描述 监听..../flume-ng agent -c /usr/local/flume/conf -f /usr/local/flume/conf/dir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
  • flume采集数据到kafka和hive

    千次阅读 2017-06-20 17:33:08
    flume加载数据hive sink;kafka sink
  • flume采集数据的时候报异常

    万次阅读 2019-03-10 21:28:47
    WARN - org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:735)] Unexpected Exception null java.lang.InterruptedException at java.util.concurrent.FutureTask.awaitDone(FutureTas....
  • 在一次实验过程中,使用flume 1.7采集本地的数据到hdfs文件系统时,由于配置文件不合理,导致出错。错误如下: [WARN - org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:611...
  • 用tail命令获取数据,下沉到hdfs 启动命令: bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 ######## # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #...
  • Flume采集日志数据到Kafka过程中Flume配置文件写法,Kafka topic创建、消费者查看结果
  • @flume从Windows采集数据传给服务器flume再上传给HDFS 从该博主学习到具体方法配置 Windows flume conf 文件: a1.sources = r1 a1.sinks = k1 a1.channels = c1 Describe/configure the source a1.sources.r1.type =...
  • flume采集数据到hdfs

    万次阅读 2015-03-03 23:39:33
    说明:flume1.5,hadoop2.2 1、配置JAVA_HOME和HADOOP_HOME 说明:HADOOP_HOME用于获取flume操作hdfs所需的jar和配置文件,如果不配置,也可以手动拷贝jar包和配置文件 2、解压flume,执行bin目录下的flume-...
  • 本人目前遇到flume采集写入hdfs性能等各种问题,大致如下。在10上的xx/xx目录下的数据进行读取 sink到08上的flume 由08上的flume写到07的hdfs上 30多m的文件写了好久。有时候会内存溢出等问题![图片说明]...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,268
精华内容 1,307
关键字:

flume采集数据