  • Connecting Flume to HDFS and Kafka

    2019-06-21 16:02:05

    Practice: connecting Flume to HDFS and Kafka

    1. Flume to HDFS

    The chosen components: a taildir source,
    a file channel,
    and an HDFS sink.

    The configuration file was shown as a screenshot in the original post; it is not reproduced here.
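    As a rough guide, a minimal taildir-to-file-channel-to-HDFS configuration could look like the sketch below. The agent name a1, the local paths and the HDFS target path are assumptions for illustration, not the author's actual values:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # taildir source (paths here are assumed)
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /home/hadoop/flume/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /home/hadoop/data/taildir_test.data
    a1.sources.r1.channels = c1

    # file channel (checkpoint and data directories are assumed)
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
    a1.channels.c1.dataDirs = /home/hadoop/flume/data

    # hdfs sink (target path and namenode address are assumed)
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node02:9000/flume/taildir/%Y%m%d
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.channel = c1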

    Run flume-ng; the console output shows the agent starting up (screenshot omitted).
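    The launch command in the screenshot would look roughly like this; the configuration file name taildir_fileC_hdfs.conf is an assumption, mirroring the Kafka example further down:

    ./flume-ng agent --name a1 --conf ../conf --conf-file ../conf/taildir_fileC_hdfs.conf -Dflume.root.logger=INFO,console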
    Write data into the file Flume is monitoring, taildir_test.data (screenshot omitted):
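    For example, appending a line to the monitored file:

    echo "hello hdfs sink" >> taildir_test.data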
    Wait for the output to appear on the Flume console (screenshot omitted).
    Check HDFS and confirm the data has been written (screenshot omitted).
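    A quick way to confirm this from the command line, assuming the hdfs.path sketched above:

    hdfs dfs -ls hdfs://node02:9000/flume/taildir/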
    Someone is bound to ask: the memory channel is so easy to use, why not use it? My answer: I specifically wanted the practice of using the file channel.

    2. Flume to Kafka

    Component selection: a taildir source,
    a file channel,
    and a Kafka sink.

    The configuration file (taildir_fileM_kafka.conf, shown as a screenshot in the original post):
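    A minimal sketch of what it might contain; the agent name a1 matches the launch command below, while the local paths are assumptions for illustration:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # taildir source (paths here are assumed)
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /home/hadoop/flume/taildir_kafka_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /home/hadoop/data/taildir_test.data
    a1.sources.r1.channels = c1

    # file channel (directories are assumed)
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint_kafka
    a1.channels.c1.dataDirs = /home/hadoop/flume/data_kafka

    # kafka sink, using the brokers and topic named in this post
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = node02:9092,node03:9092,node04:9092
    a1.sinks.k1.kafka.topic = flume_topic
    a1.sinks.k1.channel = c1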
    My cluster has three machines, node02, node03 and node04, all with ZooKeeper already deployed. Start the Kafka service on each of the three nodes:
    ./kafka-server-start.sh ../config/server.properties
    Then start a consumer on node03:
    ./kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --topic flume_topic --from-beginning
    Start flume-ng:
    ./flume-ng agent --name a1 --conf ../conf --conf-file ../conf/taildir_fileM_kafka.conf -Dflume.root.logger=INFO,console

    Next, write some data into the file Flume is monitoring (screenshots omitted):
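    For example, appending another line to taildir_test.data:

    echo "hello kafka sink" >> taildir_test.data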
    The data then shows up on the consumer console on node03 (screenshot omitted).

    Reflections

    When creating the topic, I ran into this error:
    Error while executing topic command : replication factor: 1 larger than available brokers: 0
    I searched around online and found all sorts of conflicting explanations; many copy-pasted answers claimed the Kafka service was not running, but mine had been up the whole time. Very strange. The troubleshooting went as follows.
    The command I used to create the topic was:
    ./kafka-topics.sh --create --zookeeper node02:2181/kafka,node03:2181/kafka, node04:2181/kafka --partitions 3 --replication-factor 3 --topic kafka_topic

    I went to the official documentation and had a look at its example (screenshot omitted).
    That left me puzzled, so I tried changing the command to this:
    ./kafka-topics.sh --create --zookeeper node02:2181/kafka node03:2181/kafka node04:2181/kafka --partitions 3 --replication-factor 3 --topic kafka_topic
    Spot the difference? With the commas removed, the topic was created successfully. (Most likely the real culprit in the original command was the stray space after the second comma, which truncated the --zookeeper value; with space-separated hosts only node02:2181/kafka is actually passed to --zookeeper, and creation succeeds through that one node.)
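    For reference, a comma-separated connect string with no embedded spaces should also be accepted:

    ./kafka-topics.sh --create --zookeeper node02:2181/kafka,node03:2181/kafka,node04:2181/kafka --partitions 3 --replication-factor 3 --topic kafka_topic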

    A closing thought: it still takes plenty of practice to really master this.

  • Integrating Flume NG with HDFS and Kafka

    2017-06-08 15:35:22

    Flume version: apache-flume-1.7.0-bin.tar
    Hadoop version: hadoop-2.7.3
    Kafka version: kafka_2.11-0.10.2.1
    ZooKeeper version: zookeeper-3.4.6
    I have recently been installing and setting up Flume and Kafka. There are plenty of introductions online; here I integrate Flume NG with both Kafka and HDFS. Flume acts as the message collection and transport layer: the data is landed in HDFS as a backup, while Kafka serves as the message middleware feeding Spark Streaming. When collecting the logs of a website, Flume can monitor a log file or a directory; whenever new log entries arrive, Flume persists them to HDFS and also sends them to Kafka, where the messages can then be distributed, processed and used for real-time computation.
    For this setup I prepared five servers. The original post includes a rough hand-drawn diagram of the topology; in short, node1 and node2 run the client agent, node2 and node3 run the HDFS-tier server agents, and node3, node4 and node5 run the Kafka-tier server agents.
    Configuration files:
    flume-kafka-hdfs-client.properties

    # set agent name
    agent.sources = r1  
    agent.channels = c_kafka c_hdfs  
    agent.sinks = s_kafka_k1 s_kafka_k2 s_kafka_k3 s_hdfs_k1 s_hdfs_k2
    
    # set group
    agent.sinkgroups = g_kafka g_hdfs
    
    # set sources
    agent.sources.r1.channels = c_kafka c_hdfs  
    agent.sources.r1.type = exec  
    agent.sources.r1.command = tail -F /root/logs/a.txt 
    agent.sources.r1.inputCharset = UTF-8
    
    # set kafka channels
    agent.channels.c_kafka.type = memory
    agent.channels.c_kafka.capacity = 1000
    agent.channels.c_kafka.transactionCapacity = 100
    
    # set hdfs channels
    agent.channels.c_hdfs.type = memory
    agent.channels.c_hdfs.capacity = 1000
    agent.channels.c_hdfs.transactionCapacity = 100
    
    # set kafka sink1
    agent.sinks.s_kafka_k1.channel = c_kafka
    agent.sinks.s_kafka_k1.type = avro
    agent.sinks.s_kafka_k1.hostname = 192.168.183.103
    agent.sinks.s_kafka_k1.port = 52021
    
    # set kafka sink2
    agent.sinks.s_kafka_k2.channel = c_kafka
    agent.sinks.s_kafka_k2.type = avro
    agent.sinks.s_kafka_k2.hostname = 192.168.183.104
    agent.sinks.s_kafka_k2.port = 52021
    
    # set kafka sink3
    agent.sinks.s_kafka_k3.channel = c_kafka
    agent.sinks.s_kafka_k3.type = avro
    agent.sinks.s_kafka_k3.hostname = 192.168.183.105
    agent.sinks.s_kafka_k3.port = 52021
    
    # set hdfs sink1
    agent.sinks.s_hdfs_k1.channel = c_hdfs
    agent.sinks.s_hdfs_k1.type = avro
    agent.sinks.s_hdfs_k1.hostname = 192.168.183.102
    agent.sinks.s_hdfs_k1.port = 52020
    
    # set hdfs sink2
    agent.sinks.s_hdfs_k2.channel = c_hdfs
    agent.sinks.s_hdfs_k2.type = avro
    agent.sinks.s_hdfs_k2.hostname = 192.168.183.103
    agent.sinks.s_hdfs_k2.port = 52020
    
    # set sink group
    agent.sinkgroups.g_kafka.sinks = s_kafka_k1 s_kafka_k2 s_kafka_k3
    agent.sinkgroups.g_hdfs.sinks = s_hdfs_k1 s_hdfs_k2
    
    # set failover_kafka
    agent.sinkgroups.g_kafka.processor.type = failover
    agent.sinkgroups.g_kafka.processor.priority.s_kafka_k1 = 1
    agent.sinkgroups.g_kafka.processor.priority.s_kafka_k2 = 10
    agent.sinkgroups.g_kafka.processor.priority.s_kafka_k3 = 100
    agent.sinkgroups.g_kafka.processor.maxpenalty = 10000
    
    # set failover_hdfs
    agent.sinkgroups.g_hdfs.processor.type = failover
    agent.sinkgroups.g_hdfs.processor.priority.s_hdfs_k1 = 1
    agent.sinkgroups.g_hdfs.processor.priority.s_hdfs_k2 = 10
    agent.sinkgroups.g_hdfs.processor.maxpenalty = 10000 

    flume-hdfs-server1.properties

    #set Agent name
    hdfs1.sources = r1
    hdfs1.channels = c1
    hdfs1.sinks = k1
    
    #set channel
    hdfs1.channels.c1.type = memory
    hdfs1.channels.c1.capacity = 1000
    hdfs1.channels.c1.transactionCapacity = 100
    
    # set sources
    hdfs1.sources.r1.type = avro
    hdfs1.sources.r1.bind = 192.168.183.102
    hdfs1.sources.r1.port = 52020
    hdfs1.sources.r1.channels = c1
    
    #set sink to hdfs
    hdfs1.sinks.k1.type=hdfs
    hdfs1.sinks.k1.hdfs.path=hdfs://192.168.183.101:9000/flume/logs/%Y/%m/%d
    hdfs1.sinks.k1.hdfs.fileType=DataStream
    hdfs1.sinks.k1.hdfs.writeFormat=TEXT
    hdfs1.sinks.k1.custom.encoding = UTF-8
    hdfs1.sinks.k1.channel=c1
    hdfs1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
    hdfs1.sinks.k1.hdfs.fileSuffix=.txt
    hdfs1.sinks.k1.hdfs.rollInterval=60
    hdfs1.sinks.k1.hdfs.rollSize=1024
    hdfs1.sinks.k1.hdfs.rollCount=0
    hdfs1.sinks.k1.hdfs.idleTimeout=60
    hdfs1.sinks.k1.hdfs.useLocalTimeStamp = true

    flume-hdfs-server2.properties

    #set Agent name
    hdfs2.sources = r1
    hdfs2.channels = c1
    hdfs2.sinks = k1
    
    #set channel
    hdfs2.channels.c1.type = memory
    hdfs2.channels.c1.capacity = 1000
    hdfs2.channels.c1.transactionCapacity = 100
    
    # set sources
    hdfs2.sources.r1.type = avro
    hdfs2.sources.r1.bind = 192.168.183.103
    hdfs2.sources.r1.port = 52020
    hdfs2.sources.r1.channels = c1
    
    #set sink to hdfs
    hdfs2.sinks.k1.type=hdfs
    hdfs2.sinks.k1.hdfs.path=hdfs://192.168.183.101:9000/flume/logs/%Y/%m/%d
    hdfs2.sinks.k1.hdfs.fileType=DataStream
    hdfs2.sinks.k1.hdfs.writeFormat=TEXT
    hdfs2.sinks.k1.custom.encoding = UTF-8
    hdfs2.sinks.k1.channel=c1
    hdfs2.sinks.k1.hdfs.filePrefix=%Y-%m-%d
    hdfs2.sinks.k1.hdfs.fileSuffix=.txt
    hdfs2.sinks.k1.hdfs.rollInterval=60
    hdfs2.sinks.k1.hdfs.rollSize=1024
    hdfs2.sinks.k1.hdfs.rollCount=0
    hdfs2.sinks.k1.hdfs.idleTimeout=60
    hdfs2.sinks.k1.hdfs.useLocalTimeStamp = true

    flume-kafka-server1.properties

    #set kafka1 name
    kafka1.sources = r1
    kafka1.channels = c1
    kafka1.sinks = k1
    
    #set channel
    kafka1.channels.c1.type = memory
    kafka1.channels.c1.capacity = 10000
    kafka1.channels.c1.transactionCapacity = 1000
    
    # set sources
    kafka1.sources.r1.type = avro
    kafka1.sources.r1.bind = 192.168.183.103
    kafka1.sources.r1.port = 52021
    kafka1.sources.r1.channels = c1
    
    # set sink to kafka
    kafka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
    kafka1.sinks.k1.metadata.broker.list= node3:9092,node4:9092,node5:9092
    kafka1.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
    kafka1.sinks.k1.partition.key=0  
    kafka1.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition  
    kafka1.sinks.k1.serializer.class=kafka.serializer.StringEncoder  
    kafka1.sinks.k1.request.required.acks=0  
    kafka1.sinks.k1.max.message.size=1000000  
    kafka1.sinks.k1.producer.type=sync
    kafka1.sinks.k1.custom.encoding=UTF-8  
    #kafka1.sinks.k1.custom.topic.name=test
    kafka1.sinks.k1.kafka.topic=test
    kafka1.sinks.k1.channel = c1
    kafka1.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181
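
    Note that many of the k1 properties above (metadata.broker.list, partition.key, partitioner.class, serializer.class, request.required.acks, max.message.size, producer.type, custom.encoding, zkconnect) follow the naming of older Kafka sink implementations and are not read by the KafkaSink bundled with Flume 1.7. A trimmed-down sketch limited to the properties that version documents would be:

    kafka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    kafka1.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
    kafka1.sinks.k1.kafka.topic = test
    kafka1.sinks.k1.kafka.flumeBatchSize = 100
    kafka1.sinks.k1.kafka.producer.acks = 1
    kafka1.sinks.k1.channel = c1

    The same remark applies to the kafka2 and kafka3 configurations below.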

    flume-kafka-server2.properties

    #set kafka2 name
    kafka2.sources = r1
    kafka2.channels = c1
    kafka2.sinks = k1
    
    #set channel
    kafka2.channels.c1.type = memory
    kafka2.channels.c1.capacity = 10000
    kafka2.channels.c1.transactionCapacity = 1000
    
    # set sources
    kafka2.sources.r1.type = avro
    kafka2.sources.r1.bind = 192.168.183.104
    kafka2.sources.r1.port = 52021
    kafka2.sources.r1.channels = c1
    
    # set sink to kafka
    kafka2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
    kafka2.sinks.k1.metadata.broker.list= node3:9092,node4:9092,node5:9092  
    kafka2.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
    kafka2.sinks.k1.partition.key=0  
    kafka2.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition  
    kafka2.sinks.k1.serializer.class=kafka.serializer.StringEncoder  
    kafka2.sinks.k1.request.required.acks=0  
    kafka2.sinks.k1.max.message.size=1000000  
    kafka2.sinks.k1.producer.type=sync 
    kafka2.sinks.k1.custom.encoding=UTF-8  
    #kafka2.sinks.k1.custom.topic.name=test
    kafka2.sinks.k1.kafka.topic=test
    kafka2.sinks.k1.channel = c1
    kafka2.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181

    flume-kafka-server3.properties

    #set kafka3 name
    kafka3.sources = r1
    kafka3.channels = c1
    kafka3.sinks = k1
    
    #set channel
    kafka3.channels.c1.type = memory
    kafka3.channels.c1.capacity = 10000
    kafka3.channels.c1.transactionCapacity = 1000
    
    # set sources
    kafka3.sources.r1.type = avro
    kafka3.sources.r1.bind = 192.168.183.105
    kafka3.sources.r1.port = 52021
    kafka3.sources.r1.channels = c1
    
    # set sink to kafka
    kafka3.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
    kafka3.sinks.k1.metadata.broker.list= node3:9092,node4:9092,node5:9092 
    kafka3.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092 
    kafka3.sinks.k1.partition.key=0  
    kafka3.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition  
    kafka3.sinks.k1.serializer.class=kafka.serializer.StringEncoder  
    kafka3.sinks.k1.request.required.acks=0  
    kafka3.sinks.k1.max.message.size=1000000  
    kafka3.sinks.k1.producer.type=sync 
    kafka3.sinks.k1.custom.encoding=UTF-8  
    #kafka3.sinks.k1.custom.topic.name=test
    kafka3.sinks.k1.kafka.topic=test
    kafka3.sinks.k1.channel = c1
    kafka3.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181

    Startup order:
    1. Start ZooKeeper
    2. Start HDFS
    3. Start Kafka
    4. Start the Flume server agents first

    Startup commands:
    1. On node2:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-hdfs-server1.properties --name hdfs1 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-hdfs-server1.log 2>&1 & 
    2. On node3:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-hdfs-server2.properties --name hdfs2 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-hdfs-server2.log 2>&1 & 
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server1.properties --name kafka1 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server1.log 2>&1 & 
    3. On node4:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server2.properties --name kafka2 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server2.log 2>&1 & 
    4. On node5:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server3.properties --name kafka3 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server3.log 2>&1 & 

    5. Then start the Flume client agents

    1. On node1:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-hdfs-client.properties --name agent -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-hdfs-client.log 2>&1 & 
    2. On node2:
    flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-hdfs-client.properties --name agent -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-hdfs-client.log 2>&1 & 
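
    With all the agents up, a quick end-to-end check is to append to the monitored log on a client node, consume the topic, and list the HDFS output. This is a sketch reusing the paths and topic from the configurations above; run the HDFS command from a node with an HDFS client:

    echo "hello flume" >> /root/logs/a.txt
    kafka-console-consumer.sh --bootstrap-server node3:9092 --topic test --from-beginning
    hdfs dfs -ls /flume/logs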

    And that's it: Flume is now integrated with both HDFS and Kafka!

  • flume-exec-total.conf: sending data to both Kafka and HDFS through two sinks (exec source, memory channels)

    cd /usr/local/flume/conf

    vim flume-exec-total.conf

    ## Explain
    #Send the data to both Kafka and HDFS through two sinks
    
    
    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1 k2
    agent.channels = c1 c2
    
    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /root/test.log
    agent.sources.r1.shell = /bin/bash -c 
    
    ## kafka
    #Describe the sink
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = kafkatest
    agent.sinks.k1.brokerList = master:9092
    agent.sinks.k1.requiredAcks = 1
    agent.sinks.k1.batchSize = 2
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1 c2
    agent.sinks.k1.channel = c1
    
    ## hdfs
    #Describe the sink
    agent.sinks.k2.type = hdfs
    agent.sinks.k2.hdfs.path = hdfs://master:9000/data/flume/tail
    agent.sinks.k2.hdfs.fileType=DataStream
    agent.sinks.k2.hdfs.writeFormat=Text
    #agent.sinks.k2.hdfs.rollInterval = 0
    #agent.sinks.k2.hdfs.rollSize = 134217728
    #agent.sinks.k2.hdfs.rollCount = 1000000
    agent.sinks.k2.hdfs.batchSize=10
    
    ## Use a channel which buffers events in memory 
    agent.channels.c2.type = memory
    #agent.channels.c2.capacity = 1000
    #agent.channels.c2.transactionCapacity = 100
    
    ## Bind the source and sink to the channel
    #agent.sources.r1.channels = c2
    agent.sinks.k2.channel = c2

     

    Verification:

    1. First start HDFS and Kafka

    2. Create the topic

    kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest

    Start Flume and run the test

    3. Start Flume

    Server side
    /usr/local/flume/bin/flume-ng agent -f flume-exec-total.conf -n agent -Dflume.root.logger=INFO,console
    
    Client side
    echo "wangzai doubi" >> /root/test.log

    4. Start the Kafka console consumer

    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning
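
    To confirm the HDFS side as well, a simple listing of the path configured in the k2 sink works:

    hdfs dfs -ls hdfs://master:9000/data/flume/tail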

     

    Results (screenshots omitted in this copy): the Flume server console, the HDFS output directory, and the Kafka consumer.

     

    Reposted from: https://www.cnblogs.com/654wangzai321/p/9693177.html

  • Flume writing to Kafka and HDFS on an Ambari + HDP cluster

    Go into the conf directory of the Flume installation:

    [root@qiyu01 apache-flume-1.6.0-bin]# cd /opt/modules/apache-flume-1.6.0-bin/conf
    

    Create the Flume configuration file and edit it:

    [root@qiyu01 conf]# vi flumeByHDFS_Kafka.conf

    File contents (mine is an Ambari + HDP cluster, so the Kafka port is 6667):

    #Send the data to both Kafka and HDFS through two sinks
    
    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1 k2
    agent.channels = c1 c2
    
    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /opt/flume_test/flumeByHDFS_Kafka.log
    agent.sources.r1.shell = /bin/bash -c 
    
    ## kafka
    #Describe the sink
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = flumeByKafka_HDFS
    agent.sinks.k1.brokerList = qiyu01.com:6667,qiyu02.com:6667,qiyu03.com:6667
    agent.sinks.k1.requiredAcks = 1
    agent.sinks.k1.batchSize = 2
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1 c2
    agent.sinks.k1.channel = c1
    
    ## hdfs
    #Describe the sink
    agent.sinks.k2.type = hdfs
    agent.sinks.k2.hdfs.path = hdfs://qiyu01.com:8020/weblog/%y/%m
    agent.sinks.k2.hdfs.fileType=DataStream
    agent.sinks.k2.hdfs.writeFormat=Text
    agent.sinks.k2.hdfs.rollInterval = 0
    agent.sinks.k2.hdfs.rollSize = 134217728
    agent.sinks.k2.hdfs.rollCount = 0
    agent.sinks.k2.hdfs.batchSize=10
    agent.sinks.k2.hdfs.useLocalTimeStamp = true
    
    ## Use a channel which buffers events in memory 
    agent.channels.c2.type = memory
    #agent.channels.c2.capacity = 1000
    #agent.channels.c2.transactionCapacity = 100
    
    ## Bind the source and sink to the channel
    #agent.sources.r1.channels = c2
    agent.sinks.k2.channel = c2

    Go back to the Flume installation directory and start Flume:

    [root@qiyu01 conf]# cd ..
    [root@qiyu01 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf-file  conf/flumeByHDFS_Kafka.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console
    

    Create the Kafka topic:

    [root@qiyu01 apache-flume-1.6.0-bin]# cd /usr/hdp/3.0.1.0-187/kafka/
    [root@qiyu01 kafka]# bin/kafka-topics.sh --create --zookeeper qiyu01.com:2181,qiyu02.com:2181,qiyu03.com:2181 --replication-factor 1 --partitions 1 --topic flumeByKafka_HDFS

    Write some data:
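
    For example, appending a line to the file watched by the exec source above:

    echo "hello flumeByKafka_HDFS" >> /opt/flume_test/flumeByHDFS_Kafka.log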

    Check the data:

    Open a new terminal and, from the Kafka installation directory, run the following command to start consuming the topic flumeByKafka_HDFS.
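
    The exact command is not shown in the original; a sketch, assuming the newer --bootstrap-server form and the broker port 6667 mentioned above:

    bin/kafka-console-consumer.sh --bootstrap-server qiyu01.com:6667,qiyu02.com:6667,qiyu03.com:6667 --topic flumeByKafka_HDFS --from-beginning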

    Kafka consumer output (screenshot omitted):

    HDFS storage (screenshot omitted):
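
    For example, a recursive listing of the sink path shows the date-partitioned directories and the in-progress temporary file:

    hdfs dfs -ls -R hdfs://qiyu01.com:8020/weblog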

    Success! The tmp file on HDFS is a temporary one; the file only rolls into a finished, usable file once it reaches the 128 MB configured in the Flume sink.

  • Put flume-ng-sql-source-1.3.7.jar and mysql-connector-java-5.1.32-bin.jar into /usr/local/apache-flume-1.6.0-bin/lib. 5. Start the Kafka consumer: bin/kafka-console-consumer.sh --zookeeper node2:2181,node3:2181...
  • Chained agents pass data between each other with an Avro sink feeding an Avro source. Example agent structure: source -> channel -> sink -> source -> channel -> sink. Source selection: exec->...
  • Kafka Connect HDFS Connector kafka-connect-hdfs is a Kafka Connector for copying data between Kafka and Hadoop HDFS. Documentation for this connector can be found here. Development To build a ...
  • kafka-connect-hdfs is a connector for copying data between Kafka and Hadoop HDFS. The connector's documentation can be found there. Development: to build a development version you need a recent Kafka release and a series of upstream Confluent projects, which must be built from their corresponding snapshot branches. ...
  • Configuring Flume with a socket source and HDFS + Kafka sinks

    2019-06-05 14:07:07
    The Flume version I am using is 1.6; the input source is netcat (socket communication) and the outputs go to both HDFS and Kafka. The overall structure and the detailed configuration are as follows: a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 #define the source #a1....
  • Flume collecting data into HDFS and Kafka: configuration file and launch command: nohup bin/flume-ng agent -n a10 -c conf/ -f ./conf/server/flume-taildir-kafka.conf -Dflume.root.logger=INFO,console >> ./logs/fflume-taildir-kafka....
  • On a production cluster built with Ambari, HDFS and Kafka suddenly failed to start on one machine. Part of the error log: stderr: 2019-10-31 14:39:05,949 - ***** WARNING ***** WARNING ***** WARNING ***** WARNING ***** WARNING ***** *...
  • Flume sends data along one path to HDFS for offline analysis and along another through the Kafka message middleware; a configuration example.
  • Building a Flume + HDFS + Kafka + Hive example

    2017-03-04 17:34:07
    Abstract: this article uses Flume to process data from Kafka, stores it into HDFS, and then queries the stored data through an associated Hive external table. A Maven project is set up, with the final directory layout of the whole project shown below, followed by a step-by-step walkthrough. 1. Define...
  • 1. Kafka data format, tab-separated: bucket_online_backends world 105102 1 540 0 2019-11-25 08:10:00.0 320 2019-11-25 07:37:15.0 bucket_online_logs version 1051189902 1 555 0 2019-11-25 08:10:0.0 46 661 ...
  • Preface: this article describes in detail how to configure a Flume agent with multiple flows writing data to both Kafka and HDFS; the Hadoop, Zookeeper and Kafka involved are all pseudo-distributed deployments. 1. Base environment 1.1 Hardware: one 4 GB, 2-core virtual machine 1.2 Component versions: component name, component...
  • Table of contents: 1. Overview 2. HDFS 3. Kafka 4. HBase 5.... Recommended reading: the Redis series on high availability (master/slave, sentinel, cluster). 1. Overview ... Kafka: uses a leader/follower model; the one leader and multiple followers need to keep data in sync, and if the leader goes down, ...
  • Incrementally collecting MySQL data into HDFS and Kafka with Flume

    2018-07-26 17:20:48
    Software versions: jdk1.8, apache-flume-1.6.0-bin, kafka_2.8.0-0.8.0, zookeeper-3.4.5. Two jar files need to be placed in apache-flume-1.6.0-bin/lib: flume-ng-sql-source-1.3.7.jar --> Flume's MySQL source, download link: ...
