kafka的数据结构_kafka数据结构 - CSDN
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Messa...

    参考资料:https://blog.csdn.net/gongxinju/article/details/72672375 以后继续深入总结。

    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。

    Anatomy of a Topic

    partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

    接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

    3.1、Partition的数据文件

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

    • offset
    • MessageSize
    • data

    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
    FileMessageSet类图
    它的主要方法如下:

    • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

    我们来思考一下,如果一个partition只有一个数据文件会怎么样?

    1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

    那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

    3.2、数据文件的分段

    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

    3.3、为数据文件建索引

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

    • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
    • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

    在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
    OffsetIndex类图

    主要的方法有:

    • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

    小结

    我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

    Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
    topic_partition

    partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
    partition
    可以看到,这个partition有4个LogSegment。

    展示是如何查找Message的。
    search
    比如:要查找绝对offset为7的Message:

    1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
    2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
    3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

    这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

    一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

    展开全文
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方...

    引言

    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。
    Anatomy of a Topic

    partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

    接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

    3.1、Partition的数据文件

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

    • offset
    • MessageSize
    • data

    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
    FileMessageSet类图
    它的主要方法如下:

    • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

    我们来思考一下,如果一个partition只有一个数据文件会怎么样?

    1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

    那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

    3.2、数据文件的分段

    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

    3.3、为数据文件建索引

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

    • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
    • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

    在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
    OffsetIndex类图

    主要的方法有:

    • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

    小结

    我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

    Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
    topic_partition

    partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
    partition
    可以看到,这个partition有4个LogSegment。

    展示是如何查找Message的。
    search
    比如:要查找绝对offset为7的Message:

    1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
    2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
    3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

    这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

    一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

    展开全文
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message

    转载自:http://blog.csdn.net/gongxinju/article/details/72672375



    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。



    partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

    接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

    3.1、Partition的数据文件

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

    • offset
    • MessageSize
    • data

    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:


    它的主要方法如下:

    • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

    我们来思考一下,如果一个partition只有一个数据文件会怎么样?

    1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

    那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

    3.2、数据文件的分段

    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

    3.3、为数据文件建索引

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

    • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
    • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

    在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:


    主要的方法有:

    • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

    小结

    我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

    Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
    topic_partition

    partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
    partition
    可以看到,这个partition有4个LogSegment。

    展示是如何查找Message的。



    比如:要查找绝对offset为7的Message:

    1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
    2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
    3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

    这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

    一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。


    展开全文
  • flume 读取kafka 数据

    2014-07-18 12:55:11
    flume 读取kafka数据

    本文介绍flume读取kafka数据的方法

    代码:


    /*******************************************************************************
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *  
     * http://www.apache.org/licenses/LICENSE-2.0
     *  
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     *******************************************************************************/
    package org.apache.flume.source.kafka;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    import kafka.message.MessageAndMetadata;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.conf.ConfigurationException;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import org.apache.flume.source.SyslogParser;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;


    /**
     * A Source for Kafka which reads messages from kafka. I use this in company production environment
     * and its performance is good. Over 100k messages per second can be read from kafka in one source.<p>
     * <tt>zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
     * <tt>topic: </tt> the topic to read from kafka.<p>
     * <tt>group.id: </tt> the groupid of consumer group.<p>
     */
    public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
        private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
        private ConsumerConnector consumer;
        private ConsumerIterator<byte[], byte[]> it;
        private String topic;
        
        public Status process() throws EventDeliveryException {
            List<Event> eventList = new ArrayList<Event>();
            MessageAndMetadata<byte[],byte[]> message;
            Event event;
            Map<String, String> headers;
            String strMessage;
            try {
                if(it.hasNext()) {
                    message = it.next();
                    event = new SimpleEvent();
                    headers = new HashMap<String, String>();
                    headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

                    strMessage =  String.valueOf(System.currentTimeMillis()) + "|" + new String(message.message());
                    log.debug("Message: {}", strMessage);

                    event.setBody(strMessage.getBytes());
                    //event.setBody(message.message());
                    event.setHeaders(headers);
                    eventList.add(event);
                }
                getChannelProcessor().processEventBatch(eventList);
                return Status.READY;
            } catch (Exception e) {
                log.error("KafkaSource EXCEPTION, {}", e.getMessage());
                return Status.BACKOFF;
            }
        }

        public void configure(Context context) {
            topic = context.getString("topic");
            if(topic == null) {
                throw new ConfigurationException("Kafka topic must be specified.");
            }
            try {
                this.consumer = KafkaSourceUtil.getConsumer(context);
            } catch (IOException e) {
                log.error("IOException occur, {}", e.getMessage());
            } catch (InterruptedException e) {
                log.error("InterruptedException occur, {}", e.getMessage());
            }
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            if(consumerMap == null) {
                throw new ConfigurationException("topicCountMap is null");
            }
            List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
            if(topicList == null || topicList.isEmpty()) {
                throw new ConfigurationException("topicList is null or empty");
            }
            KafkaStream<byte[], byte[]> stream =  topicList.get(0);
            it = stream.iterator();
        }

        @Override
        public synchronized void stop() {
            consumer.shutdown();
            super.stop();
        }

    }

    /*******************************************************************************
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *  
     * http://www.apache.org/licenses/LICENSE-2.0
     *  
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     *******************************************************************************/
    package org.apache.flume.source.kafka;


    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;

    import com.google.common.collect.ImmutableMap;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    import org.apache.flume.Context;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;


    public class KafkaSourceUtil {
        private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

        public static Properties getKafkaConfigProperties(Context context) {
            log.info("context={}",context.toString());
            Properties props = new Properties();
            ImmutableMap<String, String> contextMap = context.getParameters();
            for (Map.Entry<String,String> entry : contextMap.entrySet()) {
                String key = entry.getKey();
                if (!key.equals("type") && !key.equals("channel")) {
                    props.setProperty(entry.getKey(), entry.getValue());
                    log.info("key={},value={}", entry.getKey(), entry.getValue());
                }
            }
            return props;
        }
        public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
            ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
            return consumer;
        }
    }



    配置文件:( /etc/flume/conf/flume-kafka-file.properties)

    agent_log.sources = kafka0
    agent_log.channels = ch0
    agent_log.sinks = sink0

    agent_log.sources.kafka0.channels = ch0
    agent_log.sinks.sink0.channel = ch0



    agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
    agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
    agent_log.sources.kafka0.topic = kkt-test-topic
    agent_log.sources.kafka0.group.id= test

    agent_log.channels.ch0.type = memory
    agent_log.channels.ch0.capacity = 2048
    agent_log.channels.ch0.transactionCapacity = 1000


    agent_log.sinks.sink0.type=file_roll
    agent_log.sinks.sink0.sink.directory=/data/flumeng/data/test
    agent_log.sinks.sink0.sink.rollInterval=300

    启动脚本:

    sudo su  -l -s /bin/bash  flume  -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console '


    注意: 红色字体的功能是对原来数据加入时间戳

                版本 flume-1.4.0.2.1.1.0 + kafka2.8.0-0.8.0

                参考资料:https://github.com/baniuyao/flume-kafka

                 编译用到的库:

                flume-ng-configuration-1.4.0.2.1.1.0-385

                flume-ng-core-1.4.0.2.1.1.0-385

                flume-ng-sdk-1.4.0.2.1.1.0-385

                flume-tools-1.4.0.2.1.1.0-385

                guava-11.0.2

                kafka_2.8.0-0.8.0

                log4j-1.2.15

                scala-compiler

                scala-library

                slf4j-api-1.6.1

                slf4j-log4j12-1.6.1

                zkclient-0.3

                zookeeper-3.3.4

                    





    展开全文
  • 一.Kafka的逻辑架构   ... 当一个Topic中消息... 是为了对大量的数据进行分而治之,把数据分区,不同的Consumer可以消费不同分区的数据,不同Consumer对数据的消费可以做成并行的,这样可以加快数据处理的速
  • kafka架构详解图

    2020-03-19 08:28:39
    生产到消费 备份副本1 备份副本2 leader follwer 第二个生产到消费 元数据信息,节点信息等记录到zk中
  • KAFKA目录结构

    2018-08-20 17:26:27
     找到kafka的安装目录 find / -name kafka* -type d &nbsp; [root@DockerHostconfig]# cd /wls/kafka_2.11-0.8.2.1/config [root@DockerHostconfig]# ll total28 -rw-...
  • Kafka存储结构示意图

    2020-04-13 22:21:05
  • 一,组成部分概要 Producer:消息生产者 Consumer:消息消费者 Topic:特指kafka处理的消息源的不同分类 Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。...
  • 但订阅数据库过于死板,当源表数据结构发生变化时管道就要重新写,不够灵活。这一篇我们来看下数据通过kafka同步到mysql是如何配置的。 kafka origin的安装就不介绍了,直接在Package Manager里点击安装就可以了。...
  • 总结一下用golang 写的服务中接入kafka消息队列的关键和有用链接: golang 有两个主流接入kafka的lib, sarama和confluent-kafka-go, 在编译运行时均需要用到gcc。构建镜像需要加入apk add build-base 关于consumer,...
  • Kafka数据存储

    2017-06-27 15:17:08
    kafka数据的存储方式;kafka如何通过offset查找message。 1.前言 写介绍kafka的几个重要概念(可以参考之前的博文Kafka的简单介绍): Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以...
  • kafka查找数据的过程

    2019-12-30 22:56:49
    一.kafka查找数据: kafka里面一个topic有多个partition组成,每一个partition里面有多个segment组成,每个segment都由两部分组成,分别是.log文件....index文件:索引文件,使用索引文件,加快kafka数据的查找速...
  • kafka的offset值一直不变,可以往里面写数据 会是什么原因呢 手动改变offset的值是可以消费数据
  • 通过本模块的学习,能够掌握Kafka的负载均衡、Producer生产数据Kafka文件存储机制、Kafka自定义partition 课程大纲: 1、 Kafka整体结构图 2、 Consumer与topic关系 3、 Kafka Producer消息分发
  • 本系列文章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到大家 应用从Kafka中读取数据需要使用KafkaConsumer订阅主题,然后接收这些主题的消息。在我们深入这些API之前,先来看下几个比较重要的...
  • kafka存储结构

    2018-04-23 18:32:10
    topic 主题,特指kafka处理的消息源的不同分类 partition topic物理上的分组,一个topic可以分为多个partition message 消息,是通信的基本单位 存储结构 topic结构 topic包含多个partition, topic只是...
  • 简介:目前项目中已有多个渠道到Kafka数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时间间隔可以自定义)数据刷新的效果。...
1 2 3 4 5 ... 20
收藏数 42,791
精华内容 17,116
关键字:

kafka的数据结构