精华内容
下载资源
问答
  • flume采集日志到kafka配置
    千次阅读
    2019-04-03 16:10:40

    配置文件: vim dir-hdfs.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    #对于source的配置描述 监听文件中的新增数据 exec
    a1.sources.r1.type = exec
    a1.sources.r1.command  = tail -F /opt/log/access.log            //采集数据的路径
    a1.sources.ri.shell = /bin/sh -c
    #对于sink的配置描述 使用kafka做数据的消费
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = kk8                                                          //采集到哪里
    a1.sinks.k1.brokerList = 192.168.88.11:9092,192.168.88.12:9092,192.168.88.13:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 5
    #对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
    a1.channels.c1.type = memory
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动flume

    ./flume-ng agent -c /usr/local/flume/conf  -f /usr/local/flume/conf/dir-hdfs.conf  -n a1 -Dflume.root.logger=INFO,console 
    
    更多相关内容
  • 涉及的技术有flumeKafka,zookeeper。 操作步骤: 1、构建agent train.sources=trainSource train.channels=trainChannel train.sinks=trainSink train.sources.trainSource.type=spooldir train.sources....

    环境准备:

    涉及到的技术有flume,Kafka,zookeeper。

    操作步骤:

    1、构建agent

    train.sources=trainSource
    train.channels=trainChannel
    train.sinks=trainSink
    
    train.sources.trainSource.type=spooldir
    train.sources.trainSource.spoolDir=/opt/kb15tmp/flumelogfile/train
    train.sources.trainSource.deserializer=LINE
    train.sources.trainSource.deserializer.maxLineLength=320000
    train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
    train.sources.trainSource.interceptors=head_filter
    train.sources.trainSource.interceptors.head_filter.type=regex_filter
    train.sources.trainSource.interceptors.head_filter.regex=^user*
    train.sources.trainSource.interceptors.head_filter.excludeEvents=true
    
    train.channels.trainChannel.type=file
    train.channels.trainChannel.checkpointDir=/opt/kb15tmp/checkpoint/train
    train.channels.trainChannel.dataDirs=/opt/kb15tmp/checkpoint/data/train
    
    train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
    train.sinks.trainSink.batchSize=640
    train.sinks.trainSink.brokerList=192.168.91.180:9092
    train.sinks.trainSink.topic=train
    
    train.sources.trainSource.channels=trainChannel
    train.sinks.trainSink.channel=trainChannel
    

    2、启动Kafka和zookeeper
    启动zookeeperzkServer.sh start
    启动Kafkanohup kafka-server-start.sh /opt/soft/kafka211/config/server.properties &

    3、启动消费者进行消费
    首先先创建主题,kafka-topics.sh --create --zookeeper 192.168.91.180:2181 --topic train --partitions 1 --replication-factor 1
    消费:
    kafka-console-consumer.sh --bootstrap-server 192.168.91.180:9092 --topic train --from-beginning

    4、启动flume
    ./bin/flume-ng agent --name train --conf conf/ --conf-file conf/KB15conf/train.conf -Dflume.root.logger=INFO,console

    5、将需要消费的日志文件拷贝到指定的文件夹下
    cp train.csv /opt/kb15tmp/flumelogfile/train/train_2021-12-27.csv

    展开全文
  • flume采集日志到kafka

    千次阅读 2020-01-05 10:25:23
    这样就开启了日志采集 日志采集完毕之后 flume会提示 如下图: 文件会写入到kafka中 具体路径是kafka配置文件中server.properties里面Log Basics的配置 如下图: 查看文件 数据就写入上图文件中   ...

    一、为flume构建agent

    先进去flume下的配文件夹里面  (此处我的配置文件夹名字为:myconf)  编写构建agent的配置文件(命名为:flume2kafka.conf)

    flume2kafka.conf

    # 定义这个agent中各组件的名字
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # 描述和配置source组件:r1
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /opt/datas
    a1.sources.r1.fileHeader = true
     
    # 描述和配置sink组件:k1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = jsonTopic
    a1.sinks.k1.kafka.bootstrap.servers = 127.0.0.1:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.ki.kafka.producer.compression.type = snappy
     
    # 描述和配置channel组件,此处使用是内存缓存的方式
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # 描述和配置source  channel   sink之间的连接关系
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    二、启动zookeeper

    sh zkServer.sh start
    

    三、启动kafka的producer

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    四、启动flume的agent

    bin/flume-ng agent -c conf -f 配置文件夹名/配置文件名 -n a1 -Dflume.root.logger=INFO,console
    

    五、启动kafka的消费者

    bin/kafka-server-start.sh config/server.properties
    

    topic 和 配置文件flume2kafka.conf里的sink组件中的topic一致

    这样就开启了日志采集  日志采集完毕之后 flume会提示 如下图:

    在这里插入图片描述

    文件会写入到kafka中 具体路径是kafka配置文件中server.properties里面Log Basics的配置  如下图:

    在这里插入图片描述

    查看文件

    在这里插入图片描述

    数据就写入上图文件中

     

    六、遇到的问题

    java.lang.IllegalStateException:File name has been re-used with different files. Spooling assumptions violated for /opt/data/hello.txt.COMPLETED

    跟踪抛出异常的源码,SpoolDirectorySource会启动一个线程轮询监控目录下的目标文件,当读取完该文件(readEvents)之后会对该文件进行重名(rollCurrentFile),当重命名失败时会抛出IllegalStateException,被SpoolDirectoryRunnable catch重新抛出RuntimeException,导致当前线程退出,从源码看SpoolDirectoryRunnable是单线程执行的,因此线程结束后,监控目录下其他文件不再被处理。所以,再新建个 word.txt 文件,flume没有监听动作了。

    七、正确做法

    不要在flume_test文件夹下直接新建文件,写内容。在其他文件下新建,写好内容,mv 到flume_test文件夹下。

    [hadoop@nbdo3 ~]$ cd testdata/
    [hadoop@nbdo3 testdata]$ ll
    total 4
    -rw-rw-r–. 1 hadoop hadoop 71 Mar 10 20:19 hello.txt
    [hadoop@nbdo3 testdata]$ cp hello.txt …/data/
    [hadoop@nbdo3 testdata]$ echo “123456778” >> world.txt
    [hadoop@nbdo3 testdata]$ cp world.txt …/data/
    [hadoop@nbdo3 testdata]$

    ------------- 本文结束 感谢您的阅读😜-------------
    展开全文
  • 自己研究大数据多年,写的一个日志数据采集方案笔记,可快速熟悉FlumeKafka,Hdfs的操作使用,以及相互的操作接口。
  • 之前我们讲过怎么flume日志采集组件,使用非常简单,在这里我们只需要把flume的sink模块换成kafka的模块就可以了。我们去flume的官方网站中找到这段sink配置 我们进入flume官网后点击Documentation–》Flume User ...

    之前我们讲过怎么flume日志采集组件,使用非常简单,在这里我们只需要把flume的sink模块换成kafka的模块就可以了。我们去flume的官方网站中找到这段sink配置
    在这里插入图片描述
    我们进入flume官网后点击Documentation–》Flume User Guide
    在这里插入图片描述
    在这里插入图片描述
    我们只需把这段sink代码写到原来的agent的文件当中即可。

    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
    

    a1.conf

    #bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
    #定义agent名, source、channel、sink的名称
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #具体定义source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/training/logs
    
    #定义拦截器,为消息添加时间戳
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    #具体定义channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic1
    a1.sinks.k1.kafka.bootstrap.servers = bigdata111:9092,bigdata111:9093
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
    
    #组装source、channel、sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    我们把上边的这个a1.conf放到之前我们的减压后的flume/myagent下,并使用配置文件的第一行启动flume即可,然后我们就以用source监控文件目录了。我们再打开一个consumer客户端去消费采集到的数据:

    ./kafka-console-consumer.sh --bootstrap-server bigdata111:9092,bigdata111:9093 --topic mytopic1
    

    然后我们在/root/training/logs目录下添加一个data.txt的文件

    I love Beijing
    I love china
    Beijing is the capital of china
    

    然后我们就看到消费者可以消费到这个采集到的消息了
    在这里插入图片描述

    展开全文
  • 2.在flumekafka文件夹下面建立flume1kafka.conf配置文件: # 定义这个agent中各组件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source组件:r1 a1.sources.r1.type = exec a1.sources.r1...
  • 2020-08-29 Flume采集日志数据到kafka

    千次阅读 2020-08-29 22:26:34
    一、Flume安装 Flume的安装使用可以说非常简单,直接进官网:http://flume.apache.org/ 最新是1.9.0版本,我们选择1.8.0版本下载。 然后在Linux下解压: 配置用户环境变量:(如果有root权限可以配置在/etc/...
  • 使用Flume1.7采集日志到Kafka配置File-Flume-Kafka flume1.7 支持断点续传 flume自定义拦截器使用(日志清洗、日志分类) 配置文件 a1.sources=r1 a1.channels=c1 c2 a1.sinks=k1 k2 configure source a1.sources....
  • Flume实时采集日志到Kafka(极简版)

    万次阅读 2019-04-01 22:04:34
    由于项目采用微服务架构,业务日志文件数量较多,我做了个简单的日志监控服务,先在此分享下日志采集的简单步骤,没有任何花里胡哨 ~(ps:一切数据kafka就非常好解决了!) 一、Flume安装 Flume的安装使用...
  • Flume + kafka + log4j构建日志采集系统,附实例及文档。
  • 学习电商数仓的日志采集时,flume定义了filesource-KafkaChannel-无sink来生产日志放在kafka中,消费日志使用kafkaSource-fileChannel-hdfsSink来将日志hdfs中,好让日后进行相关计算业务。 问题:...
  • 写一个循环脚本制造假数据,通过flume将这些数据下沉到kafka中,然后通过java代码,把这些数据暂存本地临时路径下,然后设置一个定时器,定时将这些数据上传hdfs中,代码如下: 1、消费者:负责将数据放本地...
  • 一、Flume的安装配置 去Flume官网下载Flume安装包,我下载的版本为apache-flume-1.9.0-bin.tar.gz,解压。 (1)修改flume-env.sh。Flume运行在JVM之上,因此安装之前要确保系统安装了JDK,编辑环境配置文件,执行...
  • 讲述如何采用最简单的kafka+flume的方式,实时的去读取oracle中的重做日志+归档日志的信息,从而达到日志文件数据实时写入hdfs中,然后将hdfs中的数据结构化hive中。
  • flumekafka整合——采集实时日志落地hdfs一、采用架构二、 前期准备2.1 虚拟机配置2.2 启动hadoop集群2.3 启动zookeeper集群,kafka集群三、编写配置文件3.1 slave1创建flume-kafka.conf3.2 slave3 创建kafka-...
  • kafkaflume.conf agent.sources = s1 agent.channels = c1 agent.sinks = k1 agent.sources.s1.type=...agent.sources.s1.command=tail -F /usr/local/src/flume/testflume2.log agent.sources.s1.channels=c1 ag...
  • flume采集日志数据到kafka缓存数据

    千次阅读 2020-02-07 17:14:23
    =============== 一 前言 在一个完整的大数据处理系统中,除了hdfs+mapreduce+hive组成分析...flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一...
  • Welcome to Apache Flume! Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible ...
  • package com.zpark.kafka; import java.io.*; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; import java.util.Properties; import org.apach...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 8,419
精华内容 3,367
关键字:

flume采集日志到kafka

友情链接: myusb.rar