精华内容
下载资源
问答
  • 它可以处理消费者规模的网站中的所有动作流数据   3.具有高性能、持久化、多副本备份、横向扩展能力 生产者:  消息写入leader后,follower是主动的去leader进行同步的!  producer采用push模式将数据发布到...

    kafka:

    消息队列:用于暂存数据,这就具有了解耦和削峰的作用,以应对临时高峰期达到缓冲的目的
     1.Kafka是一种高吞吐量的分布式发布/订阅式消息系统
      2.它可以处理消费者规模的网站中的所有动作流数据
      3.具有高性能、持久化、多副本备份、横向扩展能力
    生产者:
     消息写入leader后,follower是主动的去leader进行同步的!
     producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!
    分区的主要目的:
      1、 方便扩展。提高负载能力,,,因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
      2、 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
    分区的原则,数据发往哪个partition呢?kafka中有几个原则:
      1、 partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
      2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
      3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

    ACK应答机制
      在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。
      0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
      1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
      all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

    如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

    存储
      生产者生产的消息不断追加到log文件末尾,防止文件过大,查找效率变慢,kafka采用分片和索引机制
      分片:对log文件分片,1G为一片
      索引:即.index文件,存储索引和对应log文件中每条消息的起始位置
    查找
    根据二分查找找到对应的.index文件,从而找到对应消息的初始位置
    消费者
    多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!
    同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!

    消费者组内的消费者小于partition数量:会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!
    如果是消费者组的消费者多于partition的数量:多出来的消费者不消费任何partition的数据。
    所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!

    展开全文
  • Kafka生产者消费者的工作原理

    万次阅读 2020-08-06 19:52:15
    探究Kafka生产者的工作原理 主题和日志 对于每个主题,Kafka群集都会维护一个分区日志,如下所示: 每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到...

    探究Kafka生产者的工作原理

    主题和日志

    对于每个主题,Kafka群集都会维护一个分区日志,如下所示:
    在这里插入图片描述

    每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

    每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

    分区的设计结构

    • 提供了负载均衡的能力,实现了系统的高伸缩性。
    • 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。
    • 可以通过添加新的节点机器来增加整体系统的吞吐量。
    • Kafka分区的设计逻辑和ES分片的设计逻辑是相同的。

    生产者分区策略

    生产者分区策略是 决定生产者将消息发送到哪个分区的算法,
    主要有以下几种:

    • 轮询策略:Round-robin 策略,即顺序分配,
      轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用)
    • 随机策略: Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。
    • 消息键保序策略:key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个
      Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的

    kafka消息的有序性,是采用消息键保序策略来实现的。
    一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

    • 通过指定key的方式,具有相同key的消息会分发到同一个partition
    • partition会内部对其进行排序,保证其有序性。
      在这里插入图片描述

    Kafka的消息压缩机制

    kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

    • 一般情况下压缩机制:在生产者端解压、Broker端保持、消费者端解压
    • Kafka 支持 4 种压缩算法:GZIP、Snappy 、LZ4,从 2.1.0 开始,Kafka 正式支持 Zstandard
      算法(简写为 zstd)。
    • 压缩机制本质上以消费者端CPU性能换取节省网络传输带宽以及Kafka Broker端的磁盘占用。

    生产者端压缩
    生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
    配置参数:
    具体在springboot中使用kafka的案例可参考:我的博客

    <!-- 定义producer的参数 -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                    <!-- acks表示所有需同步返回确认的节点数,all或者‑1表示分区全部备份节点均需响应,可靠性最
                    高,但吞吐量会相对降低;
                    1表示只需分区leader节点响应;
                    0表示无需等待服务端响应;
                    大部分业务建议配置1,风控或安全建议配置0 -->
                    <entry key="acks" value="1"/>
                    <!-- retries表示重试次数,如果配置重试请保证消费端具有业务上幂等,根据业务需求配置  -->
                    <entry key="retries" value="1"/>
                    <!-- 发送消息请求的超时时间,规范2000 -->
                    <entry key="request.timeout.ms" value="2000"/>
                    <!-- 如果发送方buffer满或者获取不到元数据时最大阻塞时间,规范2000 -->
                    <entry key="max.block.ms" value="2000"/>
                    <!--开启GZIP压缩 -->
                    <entry key="compression.type" value="gzip"/>
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                </map>
            </constructor-arg>
        </bean>
    

    Broker压缩
    大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

    • Broker端和Producer端采用了不同的压缩算法
    • Broker端发生了消息格式转换(如过集群中同时保存多种版本的消息格式。为了兼容老版本,Broker会将消息转换为老版本格式,这对性能影响很大,而且会丧失Zero
      Copy的特性)

    消费者端解压
    Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压操作。

    消息可靠性

    kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

    生产者确认机制
    当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
    设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

    生产者失败回调机制
    生产者不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producer.send(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

    失败重试机制
    设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

    消费者确认机制
    确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

    副本机制
    设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
    设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
    确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

    限定Broker选取Leader机制
    设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

    消息幂等性和事务

    由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。
    所以我们要考虑消息幂等性的设计。
    kafka提供了幂等性Producer的方式来保证消息幂等性。使用 ****的方式开启幂等性。

    幂等性 Producer 的作用范围:

    • 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
    • 只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,可以理解为 Producer 进程的一次运行。当你重启了 Producer
      进程之后,这种幂等性保证就丧失了。

    Kafka事务
    事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
    同样使用 的方式开启事务。

    探究Kafka消费者的工作原理

    消费者组

    consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID.
    组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

    consummer group有以下的特性:

    • consumer group下可以有一个或多个consumer instance,consumer
      instance可以是一个进程,也可以是一个线程(所以消费者可以采用多线程的方式去消费消息)
    • group.id是一个字符串,唯一标识一个consumer group
    • consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

    消费者位置
    消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。
    位移提交有自动、手动两种方式进行位移提交。

    • 自动提交:在kafka拉取到数据之后就直接提交,这样很容易丢失数据
    • 手动提交:成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 (存在数据处理失败的可能性),
      所以这时我们就需要进行手动提交kafka的offset下标。
    <!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <!--关闭自动提交,使用spring实现的提交方案-->
                <entry key="enable.auto.commit" value="false" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>
    

    Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

    Rebalance机制

    rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

    Kafka提供了一个角色:coordinator来执行对于consumer group的管理。
    Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

    Rebalance 过程分为两步:Join 和 Sync。
    Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。
    在这里插入图片描述

    Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。
    在这里插入图片描述

    在这里插入图片描述

    展开全文
  • Kafka生产者原理剖析

    千次阅读 2021-06-28 09:24:22
    Kafka 生产者剖析 ”生存还是毁灭,这是一个问题。“ 是的对Kafka来说这个曾经受万人追捧的分布式消息引擎,现在倒还真有点跌入神坛的趋势。因为Pulsar(消息系统的新贵)仿佛正在全面替代KafkaKafka真的不行了吗...

    Kafka 生产者剖析

    ”生存还是毁灭,这是一个问题。“ 是的对Kafka来说这个曾经受万人追捧的分布式消息引擎,现在倒还真有点跌入神坛的趋势。因为Pulsar(消息系统的新贵)仿佛正在全面替代Kafka。

    Kafka真的不行了吗?

    答案个人觉得是否定的 固然Pulsar有着Kafka没有的存储和计算分离的设计,Pulsar在大数据大集群的租户管理上确实也要比Kafka更好。

    但是Kafka2.8版本推出了社区呼吁已久的操作移除了Zookeeper,使用Kraft来进行代替,虽然只是测试版本,但是官方实测的数据对比上:

    • 支持的分区数由20万个分区,变成了可以支持到200万个分区左右,是之前的数十倍之多。
    • 性能相同分区的情况下也是得到了数倍的提升
    • 最重要的是Kafka现在仅仅是一个进程,而不再需要一个Zookeeper集群了,更加轻量化。

    现在看来对于性能Kafka还是有所期待的。

    俗话说”万变不离其宗“,Pulsar肯定也有很多好的优秀的设计值得我们学习。但是现在的技术更新换代真的是太快了,也许,你今天正在学习的一个技术,明天就湮灭在历史的尘埃之中。我们要做的就是抓住事情的本质。弄明白它的原理。

    无论是 Kafka、Pulsar、rabbitmq 它们不变的都是作为一个消息系统的构成 生产者、消费者、服务端。只有弄明白其中的原理,才能在技术快速更新还贷的时代里不被淘汰。

    接下来详细的剖析一下KafkaProducer 的原理。

    1.Kafka如何发送消息

    1.1Producer发送消息代码示例
    public class Producer extends Thread {
        private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;
    
        public void run() {
            int messageNo = 1;
            // todo: 一直会往kafka发送数据
            while (true) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();    
                if (isAsync) { // Send asynchronously
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr), new DemoCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously
                    try {
                        producer.send(new ProducerRecord<>(topic,
                                messageNo,
                                messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        }
    }
    

    上边代码所示为KafkaProducer发送消息的一个简单的实例。主要有两个步骤:

    • 1.初始化KafkaProducer对象(代码中省略了)

    • 2.发送消息

      但是Kafka发送一条消息的过程就这么简单呢,实则不然,一条消息要发送并存储到Server端的路还很漫长。

    1.2 Kafka 发送消息具体流程

    如下图所示为KafkaProducer发送消息的具体流程:

    在这里插入图片描述

    总体来说分为四个步骤:

    • 主线程
      • 拦截器对消息做一些封装
      • 序列化消息以便进行网络传输
      • 消息分区 (默认轮询的分区策略)
      • 将消息 添加到 RecordAccumulator中。
    • Sender线程
      • 更新元数据
      • RecordAccumulator拉取消息
        • Ready
        • Drain
      • 封装ClientRequest
      • 调用NetworkClient进行发送(使用的是NIO)

    其实消息的发送的步骤 不止这些,比如元数据的更新、消息失败的重试、响应信息的各种处理方式等等 这里就不再做详细的叙述了,主需要了解消息发送的一个整体流程就可以了。

    1.3Kafka为什么选择双线程来进行消息发送?

    优点:

    • 客户端使用者仅仅需要调用KafkaProducer 的send 方法,具体的消息发送、重试、与Server端的网络连接等都交给Sender线程来进行处理。分工更明确,逻辑更清晰。

    • Sender来与Server端交互,主线程不比去做网络连接处理请求等操作。

    • 主线程仅仅将一条一条的消息放入消息累加器中,Sender线程根据触发发送消息的条件将消息一批一批的发送,效率更高。

    缺点

    这个缺点其实不是双线程发送的缺点,而是Kafka创建Sender线程的方式,Kafka创建Sender的方式是在调用KafkaProducer的构造方法的时候创建的,并且启动了Sender线程。Kafka并发编程的坐着曾经指出在对象的构造方法中创建并且启动一个线程会造成this指针的逃逸。

    afkaProducer(ProducerConfig config,
                      Serializer<K> keySerializer,
                      Serializer<V> valueSerializer,
                      Metadata metadata,
                      KafkaClient kafkaClient,
                      ProducerInterceptors interceptors,
                      Time time) {
      	...
          this.sender = newSender(logContext, kafkaClient, this.metadata);
          String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
          this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
          this.ioThread.start();
        ...
    }
    

    2.Sender线程

    Sender线程发送流程如下所示:

    在这里插入图片描述

    • 1.获取元数据的最新信息
    • 2.获取RecordAccumulator有哪些消息准备好了
    • 3.如果有topic的元数据不存在降该topic的更新元数据的标记设置为true意味着可以进行元数据更新了
    • 4.检查与要发送数据的主机网络是否建立好,去掉那些不能发送信息的节点
    • 5.drain这个方法很重要一会会做详细的分析。
    • 6.放弃超时的Batchs。
    • 7.创建ProducerRequest
    • 8.调用NetWorkClient的send方法,降请求添加到请求队列中
    • 9.触发发送操作。

    drain操作

    将ProducerBatch与Broker节点做映射

    核心逻辑是将RecordAccumulator记录的Map<TopicPartition,Deque> 转换Map<String,Deque> 类型。

    • 在网络层面更关心的是数据和对应节点的映射而不是TopicPartition的映射。而上层逻辑与之相反所以需要做这一次的转换。

    drain的操作其实和MapReduce和Spark的 shuffle有着异曲同工的作用,而且都是处于非常重要的位置。这样看来大数据领域的好多理念都是想通的,最重要的就是去弄通它们的原理,就可以达到一知百解的效果。

     public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
            if (nodes.isEmpty())
                return Collections.emptyMap();
    
            Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
            for (Node node : nodes) {
                List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
                batches.put(node.id(), ready);
            }
            return batches;
        }
    

    drainIndex 防止饥饿提高系统的可用性

    如果strat在每次发送消息的时候,都是从0开始遍历,就会出现每次只发送相对Topic的前几个分区的数据,后边分区的数据一直得不到发送。利用drainIndex记录了上次发送分区的位置,可以防止饥饿提高系统的可用性。

     private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<ProducerBatch> ready = new ArrayList<>();
            /* to make starvation less likely this loop doesn't start at 0
            *  防止饥饿
            * */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                this.drainIndex = (this.drainIndex + 1) % parts.size();
              ...
                }
              ...  
          }
    

    3.RecordAccumulator

    如下图所示,为RecordAccumulator,它会将Producer发送的消息按照TopicPartition进行分类。然后将消息存入BufferPoll中。每个TopicPartition的消息放入一个队列中。TopicPartition 的唯一性由两个字段确定 topicName和partition.
    在这里插入图片描述

    BufferPoll

    如上图所示BufferPoll主要由两部分构成:

    • free 缓存数据,有效的数据的频繁的创建和销毁。
    • nonPooledAvailableMemory 防止传入的消息size太大。free 的batchSize不够分配的情况。
    /**
     * 缓存了指定大小的 byteBuffer 对象 batchSize 缓冲了大量的 ByteBuffer防止频繁的创建和销毁。每个批次的
     * 文件中配置制定的
     */
    private final Deque<ByteBuffer> free;
    
     /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
        // 非缓冲池的可用内存大小,非缓冲池分配内存,其实就是调用ByteBuffer.allocate分配真实的JVM内存。
    //但是这部分的数据是不走内存池的用完就销毁,用了再重新申请
    private long nonPooledAvailableMemory;
    
    

    4.消息交付可靠性保障

    4.1 可靠性保障种类
    • At most once:最多发送一次 消息可能会丢失,但是不会重复
    • At Least once:最少发送一次 消息可能会重复 不会丢失
    • Exactly once:恰好一次 每条消息只被传递一次

    日常的开发场景 At most once 很少用到,我们最需要的就是 Exactly once 恰好一次。

    但是如果 某个topic的所有消息都是幂等的,存储多条,重复消费也不会影响结果,那么At least once 是一个好的选择。因为所有的事物都是平衡的,在保证Exactly only的同时,一定会损失点其他的东西,就是性能。其实并不是说那种语义最好,脱离了场景一切都是白谈,假如我对消息的丢失无所谓,你却非要去保证消息的Exactly once 那不就是做了很多无用功还损失了性能,所以一切脱离了具体的场景去谈问题,都是耍流氓。

    Kafka默认选择的是 At Least once的方式消息发送失败会选择重试,这样就可能会造成消息重复。

    如果关掉了重试的机制就是 At most once

    4.2Kafka如何实现Exactly once?

    分区维度

    幂等性 Producer 是Kakfa 0.11版本引入的新功能,添加如下配置即可:

    trueprops.put(ProducerfConfig.ENABLE_IDEMPOTENCE_CONFIG, true分区)
    

    服务端会根据一个唯一标识给我们做去重,但是仅仅是对单分区保证恰好一次的,不同的分区并不能保证恰好一次的语义,还是会有消息重复。而且 单个分区也是单次会话起作用的,假如生产者端重启了,不好意思,他就不能消息不会和上次会话的消息不重复了。

    全局维度

    事务型 Producer 能够保证将消息原子性地写入到多个分区中,不会有重复消息。

    事务型 Producer即使在多次回话中 ,Kafka 依然保证它们发送消息的精确一次处理。

    如何开启事务:

    #开启事务
    enable.idempotence = true。
    #设置事务id 最好是和业务相关是一个有意义的id
    transactional.id=MYTRAN
    

    代码也会做一些修改:

    //开启事务
    producer.initTransactions();
    try {
                producer.beginTransaction();
                producer.send(record1);
                producer.send(record2);
      					producer.send(record3);
      					//提交事务
                producer.commitTransaction();
    } catch (KafkaException e) {
      					//回滚
                producer.abortTransaction();
    }
    

    需要注意的是Producer开启了事务后,Consumer对这些API也要有着相同的事务试图:

    • read_uncommitted:默认值, Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。
    • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。

    这里需要将Conumer的isolation.level参数设置为read_committed才可以。

    4.3 自己如何实现Exactly once
    • 禁止重试,消息只发送到一个分区,当消息发送失败后,具体的重试逻辑由生产者主线程做处理
    • 生产者不做处理,并开启重试机制,对每条消息创建一个唯一表示,具体的去重操作,由消费者来做。

    KakfaProducer端还有很多优秀的设计,提供的API也比较丰富,比如分段锁的使用,在多线程下使线程更少的去竞争锁的资源,ConcurrentMap的使用针对都多写少的场景,网络请求使用TCP方式,使用了NIO实现了自己的网络框架等。都是值得我们去学习的地方。

    展开全文
  • Kafka生产者客户端原理 1.整体架构 整个生产者客户端由两个线程协调运行,一个是主线程,一个是Sender线程;在主线程中消息会先经过拦截器,序列化器,分区器,消息累加器;之后Sender线程从消息累加器中取消息并将...

    Kafka生产者客户端原理

    1.整体架构

    在这里插入图片描述

    整个生产者客户端由两个线程协调运行,一个是主线程,一个是Sender线程;在主线程中消息会先经过拦截器,序列化器,分区器,消息累加器;之后Sender线程从消息累加器中取消息并将其发送到kafka中。

    • 拦截器:主要是对kafka的消息进行拦截,可以进行过滤,加上前缀,后缀等等操作
    • 序列化器:对发送的消息进行序列化操作,将对象转换成字节数组才能通过网络发送到kafka
    • 分区器:我们可以对发送消息的key计算partition的值,作用就是为消息分配分区
    • 消息累加器:主要是用来缓存消息以便Sender线程可以批量发送,从而减少网络传输的资源消耗以便提升性能;消息发送过来会存储在RecordAccumulator中的某个分区的队列中,每个分区队列中的内容是ProducerBatch;消息写入缓存时会追加到某个分区队列的尾部,Sender就会在这个分区队列的头部读取;ProducerBatch是指一个消息批次,多个ProducerRecord会包含在ProducerBatch中,这样能够减少网络请求的次数以提升整体的吞吐量
    • Request:是指Kafka的各种协议请求,对于消息发送而言就是具体的ProducerRequest
    • node:表示Kafka集群的broker节点;当Sender从RecordAccumulator中获取缓存消息时,会将<分区,Deque>转化为<Node,List>形式,最终还会进一步封装成<Node,Request>形式,这样就可以将Request请求发送到各个Node节点

    2.AR,ISR,OSR概念

    • ISR(In-Sync Replicas):与leader保持同步的副本集合,ISR集合是AR集合的一个子集
    • AR(Assigned Replicas):分区内所有的副本
    • OSR(Out-of-Sync Replicas):与leader副本同步滞后过多的副本组成

    3.HW,LEO概念

    • HW(High Watermark):高水位,指的是消费者所能见的最大偏移量(offset)
    • LEO(Log End Offset):每一个副本最后一条消息的offset

    下图中第一条消息的offset为0,最后一条消息的offset为8,offset为9的消息用虚线框表示,代表下一条带写入的消息。下图中的HW为6,表示消费者只能拉取到offset在0~5之间的消息;LEO为9,LEO的大小相当于当前日志分区中最后一条消息的offset值加1
    在这里插入图片描述

    4.ACKS机制

    这个机制在生产者中可以通过参数进行配置,用来指定分区内必须要有多少个副本收到这条消息才能认定这条消息写成功,这个参数总共有三种类型值

    • acks=1:默认值就为1,当生产者发送完消息后,只要分区内的leader副本写入成功,那么就会收到服务器的成功响应。但是当消息成功写入到leader副本并响应给生产者这段时间内,其它的follower副本同步leader副本数据时leader副本崩溃,那么会造成消息的丢失
    • acks=0:生产者发送消息不需要等待任何服务器的响应,这种方式的效率最高,但是在消息写入kafka的过程中发生了某些异常,就会导致kafka丢失数据;这种情况就是kafka保证了最大吞吐量,可靠性最低
    • acks=-1或acks=all:生产者在消息发送后,会等ISR中所有的副本都成功写入消息后才能收到来自服务器端的成功响应;这样保证了数据最强可靠性,但是如果follower同步完成后,服务器端发送确认消息之前leader发生故障,那么会造成数据的重复

    参考书籍:《深入理解Kafka:核心设计与实践原理》

    展开全文
  • 根据源码分析kafka java客户端的生产者消费者的流程。 基于zookeeper的旧消费者 kafka消费者从消费数据到关闭经历的流程。 由于3个核心线程 基于zookeeper的连接器监听该消费者是否触发重平衡,并获取该...
  • Kafka生产者消费者详解

    千次阅读 2019-12-25 15:11:52
    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。 Kafka 的基本术语 消息:Kafka ...
  • Kafka消费者生产者实例

    万次阅读 2017-07-30 18:22:56
    为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。...由于主要介绍如何使用Kafka快速构建生产者消费者实例,所以不会涉及Kafka内部的原理。一个基于Kafk
  • Kafka发送消费称为生产者,在生产者将消息发送出去之前,需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,随后才真正进入发送消息发送流程。本文梳理了Kafka生产者...
  • kafka 生产/消费API、offset管理/原理kafka命令 kafka 命令、API Kafka安装、原理、使用 mapreduce 实时消费 kafka 数据 创建topic kafka-topics --create --zookeeper node1:2181 --replication-factor 3...
  • 首先来看一下Kafka生产者组件图 第一步,Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者...
  • Kafka 中,发布订阅的对象是主题(Topic),向主题发布消息的客户端应用程序称为生产者(Producer),而订阅这些主题消息的客户端应用程序就被称为消费者(Consumer)。生产者消费者统称...
  • Kafka消费者原理解析

    千次阅读 2019-01-28 17:55:52
    文章目录消费者和消费组创建Kafka消费者分区再均衡触发时机rebalance generatianrebalance协议rebalance流程消费者配置fetch.min.bytesfetch.max.wait.msmax.partition.fetch.bytessession.timeout.msauto.offset....
  • 目录一、 KafkaProducer发送模型一、数据入池二、NIO发送数据 二、KafkaTemplate模板三、...kafka原理和实践(一)原理:10分钟入门 kafka原理和实践(二)spring-kafka简单实践 kafka原理和实践(三)spring-kaf...
  • 在 0.11 版 本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局 去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。 0.11 版本的 Kafka,引入...
  • Kafka生产者

    2019-09-24 23:56:00
    Kafka的历史变迁中,一共有两个大版本的生产者客户端:第一个是于 Kafka开源之初使用Scaa语言编写的客户端,我们可以称之为旧生产者客户端(Old Producer)或 Scala版生产者客户端;第二个是从 Kafka 0.9.x版本开始推出...
  • 1. Kafka和ZK的关系 元数据存放到zk(节点中),应用了zk统一命名的功能 集群节点信息:brokerId 每台机器的id topic信息:在哪台机器上的哪个位置 parition主从信息:每个partition多个副本中,谁是老大 ...
  • 目录一、kafkaConsumer消费者模型一、kafkaConsumer构造二、消费者容器启动...kafka原理和实践(三)spring-kafka生产者源码 kafka原理和实践(四)spring-kafka消费者源码 kafka原理和实践(五)spring-kafka配置...
  • 本文说明了Kafka生产者消费者的设计原则,通过本文可以了解生产者消费者的设计原理和思想,为更好的理解Kafka的运行机制打下基础。 生产者(Producer)的设计 生产则的设计考虑了两个方面: 如何实现负载...
  • 图解Kafka的Java API 1、前言 2、生产者 1.1、复习一下命令 1.2、写入分区的策略有哪些 1.3、Java代码原理图 1.4、Java代码 1.5、试运行 2、消费者 2.1、复习一下命令 2.2、消费者偏移量 2.3、Java代码 2.4、Spark...
  • pykafka是Samsa的升级版本,使用samsa连接zookeeper,生产者直接连接kafka服务器列表,消费者才用zookeeper。 使用kafka Cluster。 二、pykafka (1) pykafka安装 根据机器环境从以下三种方式中选择进行一种...
  • kafka消息队列主要由生产者(producer)、消费者(consumer)以及消息代理(broker)构成,生产者会源源不断地将...本文主要涉及kafka的生产者模块,从功能和底层原理两个方面对kafka生产的部分进行分析。 1.kakfa生产...
  • 在上一篇,我们谈到了从生产者一端,kafka是基于何种策略,将消息推送到集群下topic的不同分区中,可以使用kafka自带的分区策略,也可以根据自身的业务定制消息推送的分区策略 而从消费者一端来看,consumer连接到...
  • EFK 收集messages日志环境准备1. 关闭防火墙 selinux2. 修改主机名称3. 修改vim /etc/hosts文件4.... 创建topic 【一台创建模拟生产,一台模拟消费】13. 安装服务 【192.168.112.153端】14. 查看列表15. 安装elastics
  • 2、kafka生产者

    2019-05-07 21:53:52
    本文主要用来整理kafka生产者相关的一些知识点,主要参考自《深入理解kafka核心设计与实践原理》--朱忠华 kafka先后有两个大版本的生产者客户端:第一个是kafka开源之初使用Scala语言编写的客户端,我们称Scala...
  • push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 ...
  • 二、pykafka (1) pykafka安装 (2) pykafka的api (3) pykafka生产者api (4) pykafka消费者api 三、kafka-python (1) kafka-python安装 (2) kafka-python的api (3) kafka-python生产者 (4) kafka-python消费者 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,060
精华内容 7,224
关键字:

kafka生产者消费者原理