精华内容
下载资源
问答
  • kafka 暂停消费

    千次阅读 2018-10-16 15:48:00
    kafkaListener 需要指定id,例如这里是:full-part-id。 @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group") public void ...

    1、代码实现

    kafkaListener

    需要指定id,例如这里是:full-part-id。

    @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group")
    public void listenFullPart(ConsumerRecord<String, String> record) {
        Optional<String> recordOptional = Optional.fromNullable(record.value());
        if (recordOptional.isPresent()) {
            List<PartStockInfoVo> partStockInfoVos = JSONObject.parseArray(recordOptional.get(), PartStockInfoVo.class);
            esPartInfoClient.updateFullIndex(partStockInfoVos);
        }
    }

    消费开关

    @RestController
    public class KafkaManageController {
    
        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @RequestMapping("/stop")
        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
            listenerContainer.stop();
        }
    
        @RequestMapping("/start")
        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
            listenerContainer.start();
        }
    }

     

     

    参考:

    1、How can i stop consumers from consuming?

     

    展开全文
  • 在有些场景可以需要暂停某些分区消费,达到一定条件再恢复对这些分区的消费,可以使用pause()方法暂停消费,resume()方法恢复消费,示例代码如下: package com.bonc.rdpe.kafka110.consumer; import java.text....

    在有些场景可以需要暂停某些分区消费,达到一定条件再恢复对这些分区的消费,可以使用pause()方法暂停消费,resume()方法恢复消费,示例代码如下:

    package com.bonc.rdpe.kafka110.consumer;
     
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
     
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
     
    /**
     * @author YangYunhe
     * @date 2018-07-16 15:13:11
     * @description: 消费速度控制
     */
    public class PauseAndResumeConsumer {
        
        private static final DateFormat df = new SimpleDateFormat("HH");
        
        public static String getTimeRange() {
            long now = System.currentTimeMillis();
            String hourStr = df.format(now);
            int hour;
            if(hourStr.charAt(0) == '0') {
                hour = Integer.parseInt(hourStr.substring(1, 1));
            }else {
                hour = Integer.parseInt(hourStr);
            }
            if(hour >= 0 && hour < 8) {
                return "00:00-08:00";
            }else if(hour >= 8 && hour < 16) {
                return "08:00-16:00";
            }else {
                return "16:00-00:00";
            }
        }
        
        public static void main(String[] args) throws Exception {
     
            Properties props = new Properties();
            props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
            props.put("group.id", "dev3-yangyunhe-topic001-group003");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            
            TopicPartition partition0 = new TopicPartition("dev3-yangyunhe-topic001", 0);
            TopicPartition partition1 = new TopicPartition("dev3-yangyunhe-topic001", 1);
            TopicPartition partition2 = new TopicPartition("dev3-yangyunhe-topic001", 2);
            
            consumer.assign(Arrays.asList(new TopicPartition[]{partition0, partition1, partition2}));
            
            try {
                while (true) {
                    // 00:00-08:00从partition0读取数据
                    if(getTimeRange() == "00:00-08:00") {
                        consumer.pause(Arrays.asList(new TopicPartition[]{partition1, partition2}));
                        consumer.resume(Collections.singletonList(partition0));
                    // 08:00-16:00从partition1读取数据
                    }else if(getTimeRange() == "08:00-16:00") {
                        consumer.pause(Arrays.asList(new TopicPartition[]{partition0, partition2}));
                        consumer.resume(Collections.singletonList(partition1));
                    // 16:00-00:00从partition2读取数据
                    }else {
                        consumer.pause(Arrays.asList(new TopicPartition[]{partition0, partition1}));
                        consumer.resume(Collections.singletonList(partition2));
                    }
                    
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("topic = " + record.topic() + ", partition = " + record.partition());
                        System.out.println("offset = " + record.offset());
                    }
                }
            } finally {
                consumer.close();
            }
        }
     
    }
     
    结果:(我运行程序的时间是18:27,所以只会消费partition2中的消息)
    topic = dev3-yangyunhe-topic001, partition = 2
    offset = 0
    topic = dev3-yangyunhe-topic001, partition = 2
    offset = 1
    topic = dev3-yangyunhe-topic001, partition = 2
    offset = 2
    ......
    • 说明:如果需要暂停或者恢复某分区的消费,consumer 订阅 topic 的方式必须是 Assign
    展开全文
  • 原生Kafka可以很容易的控制消费者线程暂停与恢复,但是Spring-Kafka中我一直不太清楚如何暂停恢复,直到我在工作中遇到了适当的业务场景,于是我在百度上搜索了一下,再结合看源码,总结出了如下3个步骤: ...

    原生Kafka可以很容易的控制消费者线程暂停与恢复,但是Spring-Kafka中我一直不太清楚如何暂停恢复,直到我在工作中遇到了适当的业务场景,于是我在百度上搜索了一下,再结合看源码,总结出了如下3个步骤:

    1: 自动注入KafkaListenerEndpointRegistry

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    

    2: 在KafkaListener上指定id和groupId。其中groupId是为了统一管理,如果不指定的话,该Listener会将id当作groupId

    @KafkaListener(id = KAFKA_LISTENER_ID, groupId = "${consumer.group_id}", topics = {"${consumer.topic}"})
    

    3: 根据id获取Listener的实例,然后控制暂停和恢复

    MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(KAFKA_LISTENER_ID);
    listenerContainer.pause(); 	// 暂停消费
    listenerContainer.resume(); // 恢复消费
    
    展开全文
  • kafka

    2019-04-19 08:45:42
    kafka学习Kafka简介模型概念理解消费者组Kafka 的使用场景:Kafka 集群部署集群规划:Zookeeper 集群准备安装 Kafka启动 Kafka 集群测试Kafka 数据一致性之 ISR 机制简介关于消息同步API生产者消费者Kafka 数据丢失...

    Kafka简介

    Kafka 是一个高吞吐量、低延迟分布式的消息队列系统。

    模型

    kafka 的提供了一个生产者、缓冲区、消费者的模型。

    • Broker: kafka 集群有多个 Broker 服务器组成,用于存储数据(消息)
    • topic: 不同的数据(消息)被分为不同的 topic(主题)。
    • producer:消息生产者,往 broker 中某个 topic 里面生产数据
    • consumer:消息的消费者,从 broker 中某个 topic 获取数据

    概念理解

    • kafka 将所有消息组织成多个 topic 的形式存储,而每个 topic又可以拆分成多个 partition(分区),每个 partition 又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在 partition 中。如下图:
      在这里插入图片描述
    • Producer 选择一个 topic,生产消息, 消息会通过分配策略将消息追加到该 topic 下的某个 partition 分区末尾(队列)。
    • Consumer 选择一个 topic,通过 id 指定从哪个位置开始消费消息。消费完成之后保留 id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。
      这个 ID,在 Kafka 里我们称之为 offset(偏移量), 它能够唯一地标识该分区中的每个记录。Kafka 集群保留所有 prodeucer 生产的消息记录,不管这个消息记录有没有被消费过,即这条消息即使被消费过,它依然会保存在 Kafka 里。
    • Kafka 什么时候会对这些消息进行删除呢?默认消息的生命周期是 7*24 小时,时间一到,Kafka 就会从磁盘层面讲数据删除。我们可以通过配置,将数据保存时间进行调整。
      在这里插入图片描述
      实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即消费者可以通过修改偏移量读取任何位置的数据。
    • 每个 consumer 都保留自己的 offset,互相之间不干扰,不存在线程安全问题,为并发消费提供了线程安全的保证。
    • 每个 topic 中的消息被组织成多个 partition,partition 均匀分配到集群 server 中。生产、消费消息的时候,会被路由到指定 partition,减少单台服务器的压力,增加了程序的并行能力。
    • 每个 topic 中保留的消息可能非常庞大,通过 partition 将消息切分成多个子消息,并通过负责均衡策略将 partition 分配到不同 server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配
    • 消息消费完成之后不会删除,可以通过重置 offset 重新消费。
    • 灵活的持久化策略。可以通过指定时间段(默认 7 天)来保存消息,节省 broker 存储空间。
    • 消息以 partition 分区为单位分配到多个 server,并以 partition 为单位进行备份。备份策略为:1 个 leader 和 N 个 followers,leader接受读写请求,followers 被动复制 leader。leader 和 followers 会在集群中打散,保证 partition 高可用

    消费者组

    在这里插入图片描述
    每个 consumer 将自己标记 consumer group 名称,之后系统会将 consumer group 按名称分组,将消息复制并分发给所有分组,每个分组只有一个 consumer 能消费这条消息。
    两个极端情况:

    • 当所有 consumer 的 consumer group 相同时,即消费者都在一个组里面,此时系统变成队列模式
    • 当每个 consumer 的 consumer group 都不相同时,即一个组里只有一个消费者时,系统变成发布订阅

    Kafka 的使用场景:

    • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr 等。
    • 消息系统:解耦和生产者和消费者、缓存消息等。
    • 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
    • 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
    • 流式处理:比如 spark streaming 和 storm

    Kafka 集群部署

    集群规划:

    • Zookeeper 集群共三台服务器,分别为:node01、node02、node03。
    • Kafka 集群共三台服务器,分别为:node01、node02、node03。

    Zookeeper 集群准备

    kafka 是一个分布式消息队列,需要依赖 ZooKeeper,请先安装好 zk集群。

    安装 Kafka

    下载压缩包(官网地址:http://kafka.apache.org/downloads.html)

    1. 解压:
    2. 修改配置文件 config/server.properties
    • broker.id: broker 集群中唯一标识 id,0、1、2、3 依次增长(broker即 Kafka 集群中的一台服务器)
    • zookeeper.connect: zookeeper 集群地址列表
    1. 同步到其他机器上
      将当前node1服务器上的Kafka目录复制到到其他node02、node03服务器上:

    启动 Kafka 集群

    1. 启动 Zookeeper 集群。
    2. 启动 Kafka 集群。
      分别在三台服务器上执行以下命令启动:
      bin/kafka-server-start.sh config/server.properties

    测试

    创建话题(kafka-topics.sh --help 查看帮助手册)

    1. 创建 topic:
      bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181–create --replication-factor 2 --partitions 3 --topic test
      参数说明:
    • –replication-factor:指定每个分区的复制因子个数,默认 1 个
    • –partitions:指定当前创建的 kafka 分区数量,默认为 1 个
    • –topic:指定新建 topic 的名称
    1. 查看 topic 列表:
      bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181–list
    2. 创建生产者:
    • bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
    • 创建生产者后往”test” topic 中 生产数据了.
    • 输入 hello 后,按回车键,就会发送数据到”test” topic 中了.
    • 依次操作,就可以将 xixi , huhu 也发送出去了.
    1. 创建消费者:
    • bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic test
    • 再打开个控制台界面,创建一个消费者去消费”test” topic 里的数据,就能够将上面生产者生产的 hello,xixi,huhu,等数据进行读取消费。
    1. 查看“test”topic 描述:
      bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181
      –describe --topic test
      查看”test” topic 的相关信息,如图所示:
      在这里插入图片描述
    • Topic:test
      Topic 的名称 test
    • PartitionCount:3
      该 topic 的分区数有 3 个
    • ReplicationFactor:2
      每个分区的副本数位为 2,这里指的是每个分区总共有 2 份数据。
    • Partition: 0
      代表 0 号分区
    • Learder: 0
      代表 0 号分区的 2 份数据中,在 brokerId=0 的机器上的那份数据是主(leader)
    • Replicas: 0,1
      0 号分区的两份副本数据在 brokerID=0 和 brokerID=1 的这两台机器上
    • Isr:0 1
      BrokerId 为 0 和 1 的那两份数据(一个主,一个从),基本保持同步,ISR(in-sync Replica)

    Kafka 数据一致性之 ISR 机制

    简介

    Kafka中topic里的每个Partition分区有一个leader与多个follower,producer 往某个 Partition 中写入数据是,只会往 leader 中写入数据,然后数据才会被复制进其他的 Replica 中。
    那么数据的同步是由 leader push(推送)过去还是有 flower pull(拉取)过来?Kafka 是由 follower 到 leader 上拉取数据的方式进行同步的。
    所以 Kafka 上的副本机制是,写都往 leader 上写,读也只在 leader上读,flower 只是数据的一个备份,保证 leader 被挂掉后顶上来,并不往外提供服务。

    关于消息同步

    1. 关于复制,在分布式架构中分为两种:
    • 同步复制: 只有所有的 follower 把数据拿过去后才 commit,一致性好,可用性不高。
    • 异步复制: 只要 leader 拿到数据立即 commit,等 follower 慢慢去复制,可用性高,立即返回,一致性差一些。
    1. kafka 不是完全同步,也不是完全异步,是一种 ISR 机制:
    • leader 会维护一个与其基本保持同步的 Replica 列表,该列表称为ISR(in-sync Replica),每个 Partition 都会有一个 ISR,而且是由 leader动态维护
    • 如果一个 flower 比一个 leader 落后太多,或者超过一定时间未发起数据复制请求,则 leader 将其重 ISR 中移除
      两个相关参数:
    replica.lag.time.max.ms=10000
    如果 leader 发现 follower 超过 10 秒没有向它发起 fech 请求,那么 leader 就把它从 ISR 中移除。
    rerplica.lag.max.messages=4000
    #follower 与 leader 相差 4000 条数据,就将副本从 ISR 中 移除
    

    注意:当 follower 同时满足这两个条件后,leader 又会将它加入 ISR中,所以 ISR 是处于一个动态调整的情况
    ISR 里的 replicas 有什么用?
    当partion的leader挂掉,则会优先从ISR列表里的挑选一个follower选举成新leader,同时将旧 leader 移除出 ISR 列表。

    API

    生产者

    public class MyProducer extends Thread {
     private String topic; //发送给 Kafka 的数据,topic
     private Producer<String, String> producerForKafka;
    public MyProducer(String topic) {
     this.topic = topic;
     Properties conf = new Properties();
    conf.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
     conf.put("serializer.class", StringEncoder.class.getName());
     /**
     * ack=0 生产者不会等待来自任何服务器的响应,直接发送新数据
     * * ack=1 leader 收到数据后,给生产者返回响应消息,生产者再继续发送新的
    数据
     * ack=all 生产者发送一条数据后,leader 会等待所有 isr 列表里的服务器同
    步好数据后,才返回响应。
     *
     * ack=0.吞吐量高,但是消息存在丢失风险。
     * ack=1.数据的安全性和性能 都有一定保障
     * ack=all 安全性最高,但性能最差
     */
     conf.put("acks",1);
     //缓存数据,批量发送,当需要发送到同一个 partition 中的数据大小达到 15KB
    时,将数据发送出去
     conf.put("batch.size", 16384);
     producerForKafka = new Producer<>(new ProducerConfig(conf));
     }
     @Override
     public void run() {
     int counter = 0;
     while (true) {
     counter++;
     String value = "shsxt" + counter;
     String key = counter + "";
     /**
     *hash partitioner 当有 key 时,则默认通过 key 取 hash 后 ,对
    partition_number 取余数
     */
    
     KeyedMessage<String, String> message = new KeyedMessage<>(topic,
    key,value);
     producerForKafka.send(message);
     System.out.println(value + " - -- -- --- -- - -- - -");
     //每 2 条数据暂停 1 秒
     if (0 == counter % 2) {
     try {
     Thread.sleep(1000);
     } catch (InterruptedException e) {
     e.printStackTrace();
     }
     }
     }
     }
     public static void main(String[] args) {
     new MyProducer("test").start();
     }
    }
    

    消费者

    Kafka 消费者 API 分为两种

    public class MyConsumer extends Thread {
     private final ConsumerConnector consumer;
     private final String topic;
     public MyConsumer(String topic) {
     consumer = Consumer
     .createJavaConsumerConnector(createConsumerConfig());
     this.topic = topic;
     }
     private static ConsumerConfig createConsumerConfig() {
     Properties props = new Properties();
     //ZK 地址
     props.put("zookeeper.connect", "node01:2181,node02:2181,node03:2181");
     //消费者所在组的名称
     props.put("group.id", "shsxt4");
     //ZK 超时时间
     props.put("zookeeper.session.timeout.ms", "400");
     //消费者自动提交偏移量的时间间隔
     props.put("auto.commit.interval.ms", "1000");
     //当消费者第一次消费时,从最低的偏移量开始消费
     props.put("auto.offset.reset","smallest");
     //自动提交偏移量
     props.put("auto.commit.enable","true");
     return new ConsumerConfig(props);
     }
     public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, 1); // 描述读取哪个 topic,需要几个线程读
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
     .createMessageStreams(topicCountMap);
     // 每个线程对应于一个 KafkaStream
     List<KafkaStream<byte[], byte[]>> list = consumerMap.get(topic);
     // 获取 kafkastream 流
     KafkaStream stream = list.get(0);
     ConsumerIterator<byte[], byte[]> it = stream.iterator();
     System.out.println("start................");
     while (it.hasNext()){
     // 获取一条消息
     String data = new String(it.next().message());
    
     System.err.println(data);
     }
    
     }
     public static void main(String[] args) {
     MyConsumer consumerThread = new MyConsumer("test");
     consumerThread.start();
     }
    }
    

    注意:

    • topic 下的一个 partition 分区,只能被同一组下的一个消费者消费。
    • 要想保证消费者从 topic 中消费的数据是有序的,则应当将 topic的分区设置为 1 个 partition

    Kafka 数据丢失和重复消费问题

    数据丢失

    1. producer 端导致数据丢失
      丢失原因:
    • 原因 1:producer 在发送数据给 kafka 时,kafka 一开始的数据是存储在服务器的 PageCache(内存)上的,定期 flush 到磁盘上的,如果忽然断电则数据会造成丢失。
    • 原因 2:在使用 kafka 的备份机制,producer 发数据给 topic 的分区时,可以对 partition 分区做备份。但是这种也得注意,因为当 producer 的 ack 设置为 0 或 1,最多只能保证 leader 有数据。若有一条 producer 发送的数据 leader 刚接收完毕,此时leader 挂掉,那么 partition 的 replicas 副本还未来得及同步,就会造成数据丢失。
      解决方案:
    • 解决方案 1:我们可以提高 flush 的频率来减少数据丢失量。但是这种并不会保证数据一定不丢失,官方也不建议我们这样弄,官方建议通过备份机制来解决数据丢失问题。
      相关参数:
    log.flush.interval.messages
    当缓存中有多少条数据时,触发溢写
    log.flush.interval.ms
    每隔多久时间,触发溢写
    
    • 解决方案 2:针对于备份机制而导致的数据丢失,要想数据不丢失,就要将 ack 设置为 all ,即所有的备份分区也同步到这条数据了,才发第二条数据,但是这样就降低了我们的性能。所以在实际工作中,往往得结合业务来平衡数据的一致性和系统的性能。
    1. consumer 端导致数据丢失
    • 丢失原因:在使用 kafka 的高级 API 时,消费者会自动每隔一段时间将 offset 保存到 zookeeper 上,此时如果刚好将偏移量提交到zookeeper 上后,但这条数据还没消费完,机器发生宕机,此时数据就丢失
    • 解决方案:关闭偏移量自动提交,改成手动提交,每次数据处理完后,再提交。

    数据重复消费

    • 产生原因:在消费者自动提交 offset 到 zookeeper 后,程序又消费了几条数据,但是还没有到下次提交 offset 到 zookeeper 之时,如果机器宕机了。再下一次机器重启的时候,消费者会先去读zookeeper 上的偏移量进行消费,这就会导致数据重复消费。
    • 解决方案:关闭自动提交,改成手动提交。

    Kafka 高吞吐的本质

    Kafka 是高吞吐低延迟的高并发、高性能的消息中间件,配置良好的 Kafka 集群甚至可以做到每秒几十万、上百万的超高并发写入。那么 Kafka 到底是如何做到这么高的吞吐量和性能的呢?

    页缓存技术 + 磁盘顺序写

    首先 Kafka 每次接收到数据都会往磁盘上去写,如下图所示。
    在这里插入图片描述
    如果把数据基于磁盘来存储,频繁的往磁盘文件里写数据,这个性能会不会很差?答案是肯定的。
    实际上 Kafka 在这里有极为优秀和出色的设计,就是为了保证数据写入性能,首先 Kafka 是基于操作系统的页缓存来实现文件写入的。

    • 操作系统本身有一层缓存,叫做 page cache,是在内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。
    • 在写入磁盘文件的时候,可以直接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache 里的数据真的刷入磁盘文件中。
      仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,如下图:
      在这里插入图片描述
      接着另外一个就是 kafka 写数据的时候,非常关键的一点,他是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
      普通的机械磁盘如果要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。但是如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能会比随机写快上几百倍。
      Kafka 在写数据的时候,一方面基于了 os 层面的 page cache 来写数据,所以性能很高,本质就是在写内存罢了。
      另外一个,他是采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的,也跟写内存是差不多的。
      基于顺序写和 page cache 两点,kafka 就实现了写入数据的超高性能。

    零拷贝技术

    从 Kafka 里我们经常要消费数据,那么消费的时候实际上就是要从 kafka 的磁盘文件里读取某条数据然后发送给下游的消费者
    那么这里如果频繁的从磁盘读数据然后发给消费者,性能瓶颈在哪里呢?
    假设要是 kafka 什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程如下所示:
    先看看要读的数据在不在 os cache 里,如果不在的话就从磁盘文件里读取数据。在从磁盘读取数据,并且返回给客户端消费者,经历以下四个阶段:

    • OS 从硬盘把数据读到内核区的 PageCache。
    • 用户进程把数据从内核区 Copy 到用户区的内存里。
    • 然后用户进程再把数据写入到 Socket,数据流入内核区的Socket Buffer 上。
    • OS 再把数据从 Socket Buffer 中 Copy 到网卡的 Buffer 上,最后发送给客户端消费者。
      整个过程,如下图:
      在这里插入图片描述
      从上图里很明显可以看到第 5和第 6步骤的两次拷贝是没必要的!一次是从操作系统的 cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。
      而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。
      Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。也就是说,直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。
      零拷贝技术的过程,如下图:
      在这里插入图片描述
      通过零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。
      对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
      在从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。
      如果 kafka 集群经过良好的调优,我们会发现大量的数据都是直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。
      相当于是 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。

    Kafka 消息的持久化

    Kafka topic 的数据存储在磁盘的时候,默认存储在/tmp/kafka-logs目录下,这个目录可以自己设置。同时在该目录下,又会按 topic 的每个 partition 分区来存储,一个分区一个目录,一个 partition 目录下面又会有多个 segment 文件。
    在这里插入图片描述
    在这里插入图片描述
    如上图可以看到,test7-0 目录下(”test7” topic 的 0号分区)有.index文件和.log 文件。

    • index 文件为索引文件,命名规则为从 0 开始到,后续的由上一个文件的最大的 offset 偏移量来开头
    • log 文件为数据文件,存放具体消息数据
      kafka 从磁盘上查找数据时,会先根据 offset 偏移量,对 index文件名字进行扫描,通过用二分法的查找方式,可以快速定位到此 offset 所在的索引文件,然后通过索引文件里的索引,去对应的 log 文件种查找数据。
      比如:我要查找 offset=30 的数据,从上图中可以知道有 0,29,58开头的 index 文件,说明 offset=30 的索引数据落在 000029.index文件中。
      相关参数:
    1. Broker 全局参数:
    • message.max.bytes (默认:1000000) – broker 能接收消息的最大字节数,这个值应该比消费端的 fetch.message.max.bytes 更小才对,否则 broker 就会因为消费端无法使用这个消息而挂起。
    • log.segment.bytes (默认: 1GB) – segment 数据文件的大小,当 segment 文件大于此值时,会创建新文件,要确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于 1G,因为这是一个消息系统,而不是文件系统)。
    • log.roll.hours (默认:7 天) - 当 segment 文件 7 天时间里都没达到log.segment.bytes 大小,也会产生一个新文件
    • replica.fetch.max.bytes (默认: 1MB) – broker 可复制的消息的最大字节数。这个值应该比 message.max.bytes 大,否则 broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
    1. Consumer 端参数:
    • fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于 message.max.bytes。
    bin/kafka-topics.sh --zookeeper node01:2181 
    --create --replication-factor 1 --partitions 1 --topic test7 --config segment.bytes=1024
    

    由于 segment 默认 1GB 才能产生新文件,老师这里创建一个新 topic,将 segment 的大小改成 1KB,为了演示效果,企业里面这个参数可以不去动。
    注意:log.segment.bytes 这是一个全局参数,即所有的 topic都是这个配置值,老师这里只是要改变一个 topic 的参数值,所以用 segment.bytes 参数,这个参数是 topic 级别的参数。

    Flume & Kafka

    Flume 安装

    之前文章已有,这里省略

    Flume + Kafka

    1. 启动 Kafka 集群。
      bin/kafka-server-start.sh config/server.properties
    2. 配置 Flume 集群,并启动 Flume 集群。
      bin/flume-ng agent -n a1 -c conf -f conf/fk.conf
      -Dflume.root.logger=DEBUG,console
      其中,Flume 配置文件 fk.conf 内容如下:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 41414
    
    #Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = testflume
    a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
    
    #Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 10000
    
    #Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    测试

    • 分别启动 Zookeeper、Kafka、Flume 集群。
    • 创建 topic:
      bin/kafka-topics.sh --zookeeper
      node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic testflume
    • 启动消费者:
      bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181 --from-beginning --topic testflume
    • 运行“RpcClientDemo”代码,通过 rpc 请求发送数据到 Flume 集群。
      Flume 中 source 类型为 AVRO 类型,此时通过 Java 发送 rpc 请求,测试数据是否传入 Kafka。
      其中,Java 发送 Rpc 请求 Flume 代码示例如下:(参考 Flume 官方文档:http://flume.apache.org/FlumeDeveloperGuide.html)
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    /**
    * Flume官网案例
    * http://flume.apache.org/FlumeDeveloperGuide.html
    * @author root
    */
    public class RpcClientDemo {
    public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("node1", 41414);
    // Send 10 events to the remote Flume agent. That agent should
    be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
    client.sendDataToFlume(sampleData);
    System.out.println("发送数据:" + sampleData);
    }
    client.cleanUp();
    }
    }
    class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;
    public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (insteadof the
    // above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname,port);
    }
    public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));
    // Send the event
    try {
    client.append(event);
    } catch (EventDeliveryException e) {
    // clean up and recreate the client
    client.close();
    client = null;
    client = RpcClientFactory.getDefaultInstance(hostname,port);
    // Use the following method to create a thrift client (insteadof
    // the above line):
    // this.client =
    RpcClientFactory.getThriftInstance(hostname, port);
    }
    }
    public void cleanUp() {
    // Close the RPC connection
    client.close();
    }
    }
    
    展开全文
  • Kafka

    2019-06-02 21:34:07
    Kafka 是一个分布式,支持分区的(partition),多副本的(replica),基于zookeeper协调的分布式消息系统 是一个分布式消息队列,生产者,消费者的功能 对消息保存时根据Topic进行分类,发送消息的为Producer,消息接受者为...
  • kafka.bootstrap-servers") private String bootstrapServers; @Value("kafka.consumerser.group-id") private String consumerGroup; @Bean public Map<String,Object> ...
  • 2.1.3之前的版本,可以在ConsumerAwareMessageListener中暂停一个消费者,通过监听ListenerContainerIdleEvent来恢复该消费者。这种方式存在安全问题,不能保证事件监听器由消费者线程调用。为了安全的暂停、恢复...
  • kafka原理

    2021-06-13 10:59:26
    本文主要内容一、kafka架构二、kafka 特性实现原理2.1、kafka 高可靠性2.2 kafka 高吞吐2.3 kafka 水平扩展三、kafka数据处理原理3.1、消息生产者3.2、消息处理及持久化3.3、消息消费者 一、kafka架构 二、kafka ...
  • (1) Kafka 新版消费者基于时间戳索引消费消息 kafka 在 0.10.1.1 版本增加了时间索引文件,因此我们可以根据时间戳来访问消息。 如以下需求:从半个小时之前的offset处开始消费消息,代码示例如下: package ...
  • Kafka 应用

    2021-01-31 21:25:37
    Kafka 中提供了许多命令行工具(位于$KAFKA_HOME/bin 目录下)用于管理集群的变更。 脚本名称 释义 kafka-configs.sh 用于配置管理,在4.1.5节和4.3.3节中有所提及 kafka-console-consumer.sh 用于消费...
  • kafka学习

    千次阅读 2020-09-20 15:34:01
    第1章 初识Kafka Kafka最初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本并且基于ZooKeeper协调的分布 式消息系统,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以 高...
  • kafka使用

    2020-06-14 19:56:01
    Kafka简介 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。 Kafka 的基本术语...
  • Kafka学习

    千次阅读 2018-11-07 17:03:55
    Kafka学习 0x01 摘要 本文是一篇Kafka学习的综述。 0x02 Kafka是什么 2.1 基本概念 2.2 可用性和持久性保证 Kafka生产者中有一个很重要的配置项: request.required.acks 不同的值决定生产者发送消息后不同...
  • Kafka使用

    2021-06-11 09:44:33
    1、kafka基本操作命令,包括linux和windows下 2、在spring项目中创建DefaultKafkaConsumerFactory
  • kafka运维

    2017-12-15 11:04:07
    Kafka 运维手册                 编制 架构设计部 伍超 审核   批准     1.编写说明 以下是LinkedIn在生产中使用kafka集群的一些经验和信息,基于kafka0.9.0版本官方文档翻译。...
  • Kafka深度解析

    2019-07-24 16:56:07
    Kafka简介  Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率。即使在非常廉价的商用...
  • kafka杂记

    2020-11-16 16:36:24
    1、一个topic 可以拆分为多个partition...2、kafka broker集群内broker之间replica机制(消息同步机制,实现HA)是基于partition,而不是topic;kafka将每个partition数据复制到多个server上(也就是在多个broker上做一个该
  • 接上一章【RabbitMQ VS Apache Kafka (一)】,本章我们讨论KafkaKafka Kafka is a distributed, replicated commit log.Kafka本身没有队列的概念,作为一个消息中间件,乍看起来,这略显奇怪,这可能与...
  • kafka 入门

    千次阅读 2020-10-28 11:35:44
    初识 Kafka 什么是 Kafka Kafka 是由Linkedin公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。 Kafka 的基本术语 ...
  • Kafka Consumer

    千次阅读 2019-02-14 22:49:19
    客户端从kafka集群中消费数据,同时对于kafka&amp;nbsp;broker的失败客户端可以自动进行处理,也可以自动的适应topic partition在集群间的迁移。允许使用consumer&amp;nbsp;group来与broker进行交互以实现...
  • Kafka原理

    2020-03-01 22:35:13
    kafka是LinkedIn公司开发的一种分布式的基于“发布/订阅”的消息中间件。 1、主要特点: 1、发布/订阅高吞吐量。其中它的消息持久化的访问时间复杂度为O(1),即使对TB级别的数据也能维持常数时间的访问。 2、消息...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,420
精华内容 1,768
关键字:

kafka暂停