2019-07-02 22:40:09 CHANGGUOLONG

Integrating Kafka and Flume

Kafka and Flume are very widely used together.
Flume is a data-collection workhorse: once a source is configured, it keeps pulling data in. Flume does not persist the data itself; it only buffers it temporarily, and a sink is still needed to land it in an external store such as HDFS or Kafka.
In practice HDFS and Kafka are two separate paths: Flume plus HDFS is generally used for offline batch processing, while Flume plus Kafka is the real-time stream-processing route.

Example

Requirement: a shell script appends a timestamped line to a file every second to simulate real-time log generation. Flume monitors this file and collects every line that is appended, delivering it to Kafka, so Flume acts as the Kafka producer. A consumer written with the Java API then reads the messages out in real time.

1. Write the shell script

#!/bin/bash
while true
do
    echo $(date) >> /home/hadoop/apps/kafka_2.11-1.1.1/time.txt
    sleep 1
done

2. Write the Flume configuration file

# Name the core components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/apps/kafka_2.11-1.1.1/time.txt

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=hadoop
a1.sinks.k1.kafka.bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
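
With this configuration Flume acts as the Kafka producer: the exec source tails time.txt and the KafkaSink publishes every new line to the hadoop topic. Conceptually this is equivalent to the minimal producer sketch below (illustrative only, not how the KafkaSink is actually implemented; the FileLineProducerSketch class name is hypothetical, and the broker list and topic come from the config above):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

public class FileLineProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list and topic taken from the KafkaSink configuration above
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Each new line Flume reads from time.txt becomes one record on the "hadoop" topic
            String line = new Date().toString();
            producer.send(new ProducerRecord<>("hadoop", line));
        }
    }
}

In the walkthrough itself Flume does this work for us, so only the consumer side needs to be written by hand.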

3. Consumer implementation

package com.chang;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Myconsumer {
    public static void main(String[] args) {
        Consumer<Integer, String> consumer = null;
        Properties properties = new Properties();
        try {
            // Load the configuration file from the classpath
            properties.load(Myconsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
            consumer = new KafkaConsumer<>(properties);
            /* Subscribe to the topic. public void subscribe(Collection<String> topics)
             * takes a collection, so the topic name has to be wrapped in a list.
             */
            List<String> list = new ArrayList<>();
            list.add("hadoop");
            consumer.subscribe(list);
            // Receive in a loop
            while (true) {
                // Poll for new records, waiting at most 1000 ms
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                for (ConsumerRecord<Integer, String> record : records) {
                    // Read the record's metadata and payload, then print them
                    String topic = record.topic();
                    int partition = record.partition();
                    Integer key = record.key();
                    String value = record.value();
                    long offset = record.offset();
                    System.out.println(String.format("topic:%s\tpartition:%d\tkey:%d\tvalue:%s\toffset:%d",
                            topic, partition, key, value, offset));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

Configuration file (consumer.properties)

# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092

# consumer group id
group.id=myconsumer

# where to start consuming: latest (start from the newest offset) or earliest (start from the oldest offset)
# the default is latest
auto.offset.reset=earliest

# deserializers: the consumer fails to start if these two properties are not set
# key deserializer
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

# value deserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
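
The same settings can also be built programmatically instead of being loaded from consumer.properties; here is a minimal sketch using the ConsumerConfig constants (the ConsumerProps class name is hypothetical; the broker addresses are the ones used throughout this article):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        // Same values as the consumer.properties file above
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

Passing the returned Properties to new KafkaConsumer<>(...) behaves the same as loading the file shown above.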

4. Start the consumer

5. Run the script

sh time_seconds.sh

6. Run Flume

bin/flume-ng agent -n a1 -c conf -f agentconf/exec_source_kafka_sink.properties

7. Observe the messages read by the consumer


2018-11-07 12:11:35 qq_15508167

I. Overall steps:

1. Install Kafka and configure Flume. Create the Kafka topic (Kafka is managed through ZooKeeper, so ZooKeeper must be installed first).

2. Place the files in Flume's spool directory and start Flume, which reads them into the specified Kafka topic.

3. Start the Kafka consumer.

 

II. Detailed integration steps:

1. Prerequisite: Kafka and Flume are already installed; this post only covers the integration.

2. Create the Kafka topic

[root@hadoop11 ~]# kafka-topics.sh --create --topic mytopic --replication-factor 1 --partitions 10 --zookeeper localhost:2181

Check that the topic was created:

[root@hadoop11 ~]# kafka-topics.sh --list --zookeeper localhost:2181

3. Flume configuration for reading files into Kafka: create flume-dirToKafka.properties under Flume's conf directory with the following content

[root@hadoop11 conf]# cat flume-dirToKafka.properties
#agent1 name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

#set source1
agent1.sources.source1.type=spooldir
#watch out for directory permissions: chmod 777 -R on the flumePath and spool dir
agent1.sources.source1.spoolDir=/yangxiaohai/flumePath/dir/logdfs
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp

#set sink1
#where the collected data goes, Kafka here; for HDFS, configure an HDFS sink instead
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#the Kafka topic created above
agent1.sinks.sink1.topic = mytopic
agent1.sinks.sink1.brokerList = hadoop11:9092,hadoop12:9092,hadoop13:9092
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 100
agent1.sinks.sink1.channel = channel1

#set channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/yangxiaohai/flumePath/dir/logdfstmp/point
agent1.channels.channel1.dataDirs=/yangxiaohai/flumePath/dir/logdfstmp

4. Start Flume:

Note: agent1 is the agent name defined in the configuration file, and the --name argument must match it; otherwise startup hangs, with a message roughly like: no configuration found for this host: <wrong name>.

[root@hadoop11 bin]# ./flume-ng agent --conf conf --conf-file ../conf/flume-dirToKafka.properties --name agent1 -Dflume.root.logger=INFO,console 

Note: you may hit an error here, as follows:

18/11/07 00:39:29 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
18/11/07 00:39:29 ERROR kafka.KafkaSink: Failed to publish events
java.lang.IllegalStateException: Empty value [channel=[channel=channel1]]
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.file.FlumeEventQueue.removeHead(FlumeEventQueue.java:160)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:512)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
18/11/07 00:39:29 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events

Solution:

Delete all files under checkpointDir (/flumePath/dir/logdfstmp/point) and start again; that fixes it.

After Flume starts successfully it prints output like the following. The files under spoolDir (/flumePath/dir/logdfs) have been read into the topic and Flume then sits there monitoring; when new data is dropped into the watched directory, it shows up below this point:

18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop13:9092 for producing
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop13:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop12:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop12:9092 for producing
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Start checkpoint for /yangxiaohai/flumePath/dir/logdfstmp/point/checkpoint, elements to sync = 26806
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1541522865614, queueSize: 0, queueHead: 43508
18/11/07 00:47:43 INFO file.Log: Updated checkpoint for file:/yangxiaohai/flumePath/dir/logdfstmp/log-12 position: 1108730 logWriteOrderID: 1541522865614
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-10

5. At this point Flume has read the files under the spool directory into the Kafka topic mytopic. Now start the Kafka consumer and records are read out continuously; the result looks like this:

Start the consumer:

[root@hadoop11 ~]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning

Sample of the records read:

{"timestamp":"2017-02-11T10:49:43.043Z","url":"/original/index_6.html","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中国","area":"华北","province":"北京市","city":"北京市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"startup","os":"Windows XP","os_type":"pc","browser":"Firefox","browser_version":"Firefox 9.0.1","suuid":"47ab648cb5c15bc8e1952efc16a037cb","short_cookie":"null","ip":"118.192.66.41","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"}
{"timestamp":"2017-02-11T10:22:01.001Z","url":"/column/get_adv_bottom","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中国","area":"华东","province":"福建省","city":"福州市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"jump","os":"Windows 7","os_type":"pc","browser":"Internet Explorer","browser_version":"Internet Explorer 7.0","suuid":"4f41eff515e7be6774749383270794e7","short_cookie":"null","ip":"112.5.236.153","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"http://www.donews.com/ent/index","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"}
{"timestamp":"2017-02-11T10:22:14.014Z","url":"/editor/person/34","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword^C
[root@hadoop11 ~]# ^C

6. If we now drop another file into Flume's spoolDir (/flumePath/dir/logdfs), Flume detects it and ingests it. Copy in a test.txt:

[root@hadoop11 dir]# scp -r test.txt ./logdfs
[root@hadoop11 dir]# cd logdfs
[root@hadoop11 logdfs]# ll
-rw-r--r--. 1 root root 14506039 8月  19 2017 sdkJson.log.COMPLETED
-rw-r--r--. 1 root root       34 11月  7 00:50 test.txt.COMPLETED

Flume's monitoring output:

18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-10

(The lines above are where the log stopped at startup; the lines below were appended after the file was added.)


18/11/07 00:50:58 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
18/11/07 00:50:58 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /flumePath/dir/logdfs/test.txt to /yangxiaohai/flumePath/dir/logdfs/test.txt.COMPLETED
18/11/07 00:51:13 INFO file.EventQueueBackingStoreFile: Start checkpoint for /yangxiaohai/flumePath/dir/logdfstmp/point/checkpoint, elements to sync = 2
18/11/07 00:51:13 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1541522865621, queueSize: 0, queueHead: 43508
18/11/07 00:51:13 INFO file.Log: Updated checkpoint for file:/yangxiaohai/flumePath/dir/logdfstmp/log-12 position: 1109060 logWriteOrderID: 1541522865621

18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:51:13 INFO file.Log: Removing old file:/yangxiaohai/flumePath/dir/logdfstmp/log-8.meta
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-9.meta
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-10
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-10.meta

 

2016-08-21 17:54:40 xiaoyu_BD

This post covers data exchange involving Kafka.

Flume+kafka

1. Install Flume

Package: apache-flume-1.6.0-bin

The installation process is not covered here; see 《Flume-ng安装.pdf》.

2. Install Kafka

Package: kafka_2.11-0.9.0.0

The installation process is not covered here; see 《kafka集群安装.pdf》.

3. Download the plugin

Download: https://github.com/beyondj2ee/flumeng-kafka-plugin

Package name: flumeng-kafka-plugin-master.zip

4. Modify the configuration file

Extract the flume-conf.properties file from the plugin and edit it:

#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /app/data/flume/test.log
producer.sources.s.channels = c

producer.sinks.r.custom.topic.name=test2

consumer.sources.s.zookeeper.connect=127.0.0.1:42182
consumer.sources.s.custom.topic.name=test2

The highlighted values (log path, topic name, ZooKeeper address) can be changed to suit your environment.

Put the modified configuration file into Flume's conf directory.

5. Add the JARs

Copy the JARs under the plugin's libs directory and under its package directory. If you are using Flume 1.6.0 you also need kafka-clients-0.9.0.0.jar from Kafka's libs directory.

Add all of these JARs to Flume's lib directory.

6. Start the services

Start ZooKeeper first:

zkServer.sh start        (note that a .out file is generated in the directory you start from)

 

Then start Kafka:

bin/kafka-server-start.sh -daemon config/server.properties &

 

Finally start Flume:

nohup bin/flume-ng agent --conf conf --conf-file conf/flume.conf.properties --name producer -Dflume.root.logger=INFO,console &

7. Test

echo "aaaaaaaaaaaaaaaa" >> /app/data/flume/test.log

Check the consumed messages in Kafka:

bin/kafka-console-consumer.sh --zookeeper localhost:42182 --topic test2 --from-beginning

2019-05-14 18:27:41 ZJX103RLF

Integrating Kafka and Flume

Read messages from a Kafka topic and write them to HDFS. Flume configuration:

Note:

I am not sure whether this must point at the ZooKeeper leader. When I pointed it at the local ZooKeeper I got an error saying, roughly, that ZooKeeper could not be found; switching to the leader's hostname, slave3, fixed it.

Hence agent.sources.kafkaSource.zookeeperConnect=slave3:2181; you may need to try this yourself.

Beyond that, only the topic and the HDFS path need to be changed, and it should work.

agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink


# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=slave3:2181
agent.sources.kafkaSource.topic=zjx
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100

agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=1000
agent.channels.memoryChannel.transactionCapacity=100


# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://slave3:9000/zjx/rizhi_kafka/%y-%m-%d/%H-%M
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream

First start HDFS (covered earlier), then start Kafka:

kafka-server-start.sh config/server.properties

kafka-server-start.sh config/server.properties &     # background version: Kafka keeps running after Ctrl+C

Create the topic:
./kafka-topics.sh --zookeeper b12node1:2181 --create  --partitions 2 --replication-factor 1  --topic zjx

Check whether it registered in ZooKeeper:
./zkCli.sh -server b12node1:2181

You can also list the topics directly with the Kafka command:
./kafka-topics.sh --list --zookeeper b12node1:2181

Write messages with the following command (producer):
./kafka-console-producer.sh --broker-list b12node1:9092 --topic zjx

Read messages with the following command (consumer):
./kafka-console-consumer.sh --zookeeper b12node1:2181 --topic zjx --from-beginning

 

Start Flume (the agent name passed with -n must match the prefix used in the configuration file, agent here):
bin/flume-ng agent -c conf/ -f dir-hdfs.properties -n agent -Dflume.root.logger=INFO,console

And the background version of the command:
nohup bin/flume-ng agent -c conf/ -f dir-hdfs.properties -n agent 1>/dev/null 2>&1 &

Then check the files on HDFS. END.

2018-11-10 20:30:48 gnak4321

How Kafka and Flume differ as data-collection channels:
To move data from a source into HDFS or HBase, Kafka is only half the solution, because you have to write the consumer program yourself; Flume only needs configuration changes to load data into HDFS or HBase, effectively shipping with the consumer built in, so no development is required. In addition, Flume's built-in interceptors can be used to process the data, whereas processing data from Kafka requires plugging in an external stream-processing system such as Storm or Spark.

In production, therefore, the usual pattern is that external data sources write into Kafka, and Kafka then feeds HDFS through Flume. Why not connect the external sources to Flume directly? Because Kafka supports multiple producers and multiple consumers, it can take in all kinds of external sources and can simultaneously feed loading systems other than Flume, which makes the pipeline much easier to extend later.
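
To make the contrast concrete, the sketch below shows the kind of consumer you would have to write yourself to land Kafka records in HDFS. All class, host, topic, and path names are placeholders, it assumes the kafka-clients and hadoop-client libraries on the classpath, and it omits the batching, file rolling, and failure handling that Flume's HDFS sink provides out of the box:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

public class KafkaToHdfsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");          // assumed broker address
        props.put("group.id", "hdfs-writer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");          // assumed NameNode address
        FileSystem fs = FileSystem.get(conf);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             FSDataOutputStream out = fs.create(new Path("/data/kafka-dump.txt"))) {
            consumer.subscribe(Collections.singletonList("hadoop"));
            // Runs forever: poll Kafka and append each record as one line to the HDFS file
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // One line per Kafka record; no file rotation, no retries
                    out.write((record.value() + "\n").getBytes(StandardCharsets.UTF_8));
                }
                out.hflush();  // make the written data visible to HDFS readers
            }
        }
    }
}

Flume replaces all of this with a few lines of sink configuration, which is exactly the trade-off described above.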
