精华内容
下载资源
问答
  • kafkaProducer

    2017-01-19 02:29:43
    kafkaProducer概述kafkaProducer是由一池Buffer space(即缓冲池)组成的,buffer space会保持还没提交到server的record,同时background I/O线程把这些record转换成request并传输到cluster中。使用后如果没有关闭...


    kafkaProducer概述

    kafkaproducer是发送record到kafka集群的Kafka客户端。

    Kafkaproducer是线程安全的,线程之间共享单个producer实例比拥有多个实例更快。

    以下是使用producer发送包含连续号码的string字符串作为键/值对的record的示例。

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
     Producer<String, String> producer = new KafkaProducer<>(props);
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    
     producer.close();

    kafkaProducer是由一池Buffer space(即缓冲池)组成的,buffer space会保持还没提交到server的record,

    同时background I/O线程把这些record转换成request并传输到cluster中。使用后如果没有关闭producer会导致
    resource溢出。

    kafkaProducer的send()方法是异步的,当调用这个方法时会添加record到Buffer中,然后立即返回。这样可以
    批处理单条record,变得更加高效。


    ack配置控制request被认为完成的标准。 我们指定的“all”设置会导致record的完整提交被阻塞,这是最慢但最持久的设置

    如果request失败,producer可以自动重试,但由于我们将重试次数指定为0,它不会。 启用重试也会导致重复的可能性。


    producer维护每个partition中未发送record的Buffer。 这些buffer的大小由batch.size配置指定。 使其更大可以导致更多的批处理,但需要更多的内存(因为我们通常每个活动partition都有这些缓冲区中的一个)。


    默认情况下,即使Buffer中存在额外的未使用空间,Buffer也可立即发送。但是,如果你想减少request的数量,你可以将linger.ms设置为大于0的值。这将指示producer在发送请求之前等待这个毫秒数,以期望更多的record到达以填满同一批次。这与TCP中的Nagle算法类似。例如,在上面的代码片段中,由于我们将逗留时间设置为1毫秒,因此可能所有100条记录都将在单个请求中发送。然而,如果我们没有填充缓冲区,这个设置会给我们的请求增加1毫秒的延迟,等待更多的记录到达。请注意,即使在linger.ms = 0的情况下,到达时间接近的record通常也会一起批处理,因此在重负载下,无论linger配置如何,都会发生配料;然而,将此设置为大于0的值可能导致在不处于最大负载时,以较少延迟为代价,获得更少,更高效的请求。


    buffer.memory控制producer用于Buffer的总内存量。 如果record的发送速度比它们可以传送到server的速度快,那么这个buffer space将会很快被填满空间。 当Buffer space空间耗尽时,其它send call将被阻塞。 阻塞时间的阈值由max.block.ms决定,之后它会引发TimeoutException


    用户会给ProducerRecord提供键值对对象,key.serializer和value.serializer指示如何将这些键值对对象转换为字节。 您可以使用包含的ByteArraySerializer或StringSerializer作为简单的字符串或字节类型。


    从Kafka 0.11开始,KafkaProducer支持另外两种模式:idempotent producer和transactionial producer。 idempotent producer将kafka的delivery semantics(交付语义)从at least once加强到exactly once。 特别是producer重试将不再引入dupliactes。 transactional producer允许应用程序自动发送消息到多个partition(和topic)中去。


    要启用idempotence,必须将enable.idempotence配置设置为true。 如果设置,retries config(重试配置)将默认为Integer.MAX_VALUE,max.in.flight.requests.per.connection配置将默认为1,并且ack配置默认为all。 idempotent producer没有API更改,所以现有的应用程序将不需要修改以利用此功能。


    为了利用idempotent producer,必须避免应用程序级别的re-send,因为这些不能被重复删除。 因此,如果应用程序启用了idempotence,则建议不要设置retries config(重试配置),因为它将默认为Integer.MAX_VALUE。 此外,如果发送(ProducerRecord)返回错误,即使进行无限次重试也无济于事(例如,消息在发送前在缓冲区中到期),建议关闭producer并检查最后生成的消息的内容以确保 它不重复。 最后,producer只能保证在single Session中发送的message的idempotence(幂等性)。


    要使用transactionial producer和伴随的API,您必须设置transactional.id配置属性。如果设置了transactional.id,则idempotence会自动启用,同时producer配置哪个idempotence依赖于哪个。此外,transaction中包含的topic应配置为耐用性。特别地,replication.factor应该至少为3,并且这些topic的min.insync.replicas应该设置为2.最后,为了使事务保证从end-to-end实现,consumer必须是配置为只读取已提交的消息。


    transactional.id的目的是在单个producer instance的多个Session中启用事务恢复(transaction recovery)。它通常来自分区的有状态应用程序中的 shard identifier(碎片标识符)。因此,对于在partitioned application(分区应用程序)中运行的每个producer instance而言,transactional.id应该是唯一的。


    所有新的事务性API都会在失败时被阻塞,并抛出异常。下面的例子说明了如何使用新的API。它与上面的例子类似,只是所有的100条消息都是单个事务的一部分。
    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("transactional.id", "my-transactional-id");
     Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    
     producer.initTransactions();
    
     try {
         producer.beginTransaction();
         for (int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
         producer.commitTransaction();
     } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // We can't recover from these exceptions, so our only option is to close the producer and exit.
         producer.close();
     } catch (KafkaException e) {
         // For all other exceptions, just abort the transaction and try again.
         producer.abortTransaction();
     }
     producer.close();
    正如在示例中所暗示的那样,每个生产者只能有一个打开的事务。在beginTransaction()和commitTransaction()调用之间发送的所有消息都将成为单个事务的一部分。当指定transactional.id时,生产者发送的所有消息都必须是事务的一部分。

    Transactional producer使用异常来传达错误状态。特别是,不需要为producer.send()指定回调或在返回的Future上调用.get():如果任何producer.send()或事务性调用在执行期间遇到不可恢复的错误时,将抛出KafkaException。有关检测事务性发送错误的更多详细信息,请参阅send(ProducerRecord)文档。

    通过在收到KafkaException异常时调用producer.abortTransaction(),我们可以确保任何成功的写入被标记为中止,因此保持事务性保证。

    该客户端可以与0.10.0或更新版本的broker进行通信。较早或较新的broker可能不支持某些客户端功能。


    kafkaProducer对象的initTransaction()

    需要在任何其它方法之前被调用,当在configuration中有使用transaction.id时。这个方法会如下工作:
    1、保证已经被之前的producer实例初始化的拥有相同transaction.id的transaction能够完成。
       如果之前的实例在一个进程中事务失败了,它会被丢弃。如果最后一个事务已经开始完成了,但还没完成,
       这个方法会等待它的完成。

    2、获取内部的producer id和epoch。在以后producer发送的transaction message中都会使用到它们


    译自:kafkaProducer api文档






    展开全文
  • KafkaProducer剖析

    2017-04-08 22:22:50
    KafkaProducer剖析包含以下重要组成部分 1. accumulator 以[topic,partition]为维度收集应用调用send时的数据,每个维度是一个recordbatch链表,每个recordbatch保存着一批消息,内部采用bytebuffer进行存储,...

    KafkaProducer剖析

    包含以下重要组成部分
    1. accumulator
    以[topic,partition]为维度收集应用调用send时的数据,每个维度是一个recordbatch链表,每个recordbatch保存着一批消息,内部采用bytebuffer进行存储,如果而bytebuffer大小刚好等于batch.size,则会使用内存池进行存放
    2. sender
    一个守护线程,负责监视accumulator中是否有可发送的recordbatch,如果有则以broker为维度收集要发往该broker的[topic, partition]数据;如果没有则会估算有可发送recordbatch的时间,并等待再触发检查accumulator,期间可能有被唤醒的情况
    3. NetworkClient
    对多路复用IO包装,使用一个selector监视所有broker;sender线程会在每次循环中调用poll方法,处理连接,发送sender收集到的要发往每个broker的数据,读取响应数据,触发回调
    4. Mertrics
    负责监视消息的大小,发送速度,发送延迟等,监控数据默认支持JMX的方式导出

    展开全文
  • KafkaProducer源码分析

    2019-08-25 18:04:15
    KafkaProducer源码分析 Kafka常用术语 Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求 Topic:主题,Kafka承载消息的逻辑容器,每条发布到Kafka的消息都有...

    KafkaProducer源码分析

    Kafka常用术语

    Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求

    Topic:主题,Kafka承载消息的逻辑容器,每条发布到Kafka的消息都有对应的逻辑容器,工作中多用于区分业务

    Partition:分区,是物理概念,代表有序不变的消息序列,每个Topic由一个或多个Partion组成

    Replica:副本,Kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为Leader和Follower,角色不同作用不同,副本是对Partition而言的,每个分区可配置多个副本来实现高可用

    Record:消息,Kafka处理的对象

    Offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值

    Producer:生产者,向主题发送新消息的应用程序

    Consumer:消费者,从主题订阅新消息的应用程序

    Consumer Offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移

    Consumer Group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源

    Reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程

     

    下面用一张图展示上面提到的部分概念(用PPT画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)

     

    消息生产流程

    先来个KafkaProducer的小demo

    public static void main(String[] args) throws ExecutionException, InterruptedException {
            if (args.length != 2) {
                throw new IllegalArgumentException("usage: com.ding.KafkaProducerDemo bootstrap-servers topic-name");
            }
    
            Properties props = new Properties();
            // kafka服务器ip和端口,多个用逗号分割
            props.put("bootstrap.servers", args[0]);
            // 确认信号配置
            // ack=0 代表producer端不需要等待确认信号,可用性最低
            // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功
            // 消息丢失
            // ack=all leader需要等待所有follower成功备份,可用性最高
            props.put("ack", "all");
            // 重试次数
            props.put("retries", 0);
            // 批处理消息的大小,批处理可以增加吞吐量
            props.put("batch.size", 16384);
            // 延迟发送消息的时间
            props.put("linger.ms", 1);
            // 用来换出数据的内存大小
            props.put("buffer.memory", 33554432);
            // key 序列化方式
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // value 序列化方式
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // 创建KafkaProducer对象,创建时会启动Sender线程
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                // 往RecordAccumulator中写消息
                Future<RecordMetadata> result = producer.send(new ProducerRecord<>(args[1], Integer.toString(i), Integer.toString(i)));
                RecordMetadata rm = result.get();
                System.out.println("topic: " + rm.topic() + ", partition: " +  rm.partition() + ", offset: " + rm.offset());
            }
            producer.close();
        }
    
     

    实例化

    KafkaProducer构造方法主要是根据配置文件进行一些实例化操作

    1.解析clientId,若没有配置则由是producer-递增的数字

    2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)

     

    3.解析key、value的序列化方式并实例化

    4.解析并实例化拦截器

    5.解析并实例化RecordAccumulator,主要用于存放消息(KafkaProducer主线程往RecordAccumulator中写消息,Sender线程从RecordAccumulator中读消息并发送到Kafka中)

    6.解析Broker地址

    7.创建一个Sender线程并启动

    ...
    this.sender = newSender(logContext, kafkaClient, this.metadata);
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    ...
    
     

    消息发送流程

    消息的发送入口是KafkaProducer.send方法,主要过程如下

    KafkaProducer.send
    KafkaProducer.doSend
    // 获取集群信息
    KafkaProducer.waitOnMetadata 
    // key/value序列化
    key\value serialize
    // 分区
    KafkaProducer.partion
    // 创建TopciPartion对象,记录消息的topic和partion信息
    TopicPartition
    // 写入消息
    RecordAccumulator.applend
    // 唤醒Sender线程
    Sender.wakeup
    
     

    RecordAccumulator

    RecordAccumulator是消息队列用于缓存消息,根据TopicPartition对消息分组

    重点看下RecordAccumulator.applend追加消息的流程

    // 记录进行applend的线程数
    appendsInProgress.incrementAndGet();
    // 根据TopicPartition获取或新建Deque双端队列
    Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    ...
    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }
    // 尝试将消息加入到缓冲区中
    // 加锁保证同一个TopicPartition写入有序
    synchronized (dq) {
        if (closed)
        	throw new KafkaException("Producer closed while send in progress");
        // 尝试写入
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null)
        	return appendResult;
    }
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
        // 从双端队列的尾部取出ProducerBatch
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            // 取到了,尝试添加消息
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            // 空间不够,返回null
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        // 取不到返回null
        return null;
    }
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        // 空间不够,返回null
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            // 真正添加消息
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            ...
            FutureRecordMetadata future = ...
            // future和回调callback进行关联    
            thunks.add(new Thunk(callback, future));
            ...
            return future;
        }
    }
    // 尝试applend失败(返回null),会走到这里。如果tryApplend成功直接返回了
    // 从BufferPool中申请内存空间,用于创建新的ProducerBatch
    buffer = free.allocate(size, maxTimeToBlock);
    synchronized (dq) {
        // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加?
        // 因为可能已经有其他线程创建了ProducerBatch或者之前的ProducerBatch已经被Sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null) {
            return appendResult;
        }
    
        // 尝试添加失败了,新建ProducerBatch
        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
        ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
        FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
    
        dq.addLast(batch);
        incomplete.add(batch);
        // 将buffer置为null,避免在finally汇总释放空间
        buffer = null;
        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
    }
    finally {
        // 最后如果再次尝试添加成功,会释放之前申请的内存(为了新建ProducerBatch)
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
    // 将消息写入缓冲区
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        // 缓冲区满了或者新创建的ProducerBatch,唤起Sender线程
        this.sender.wakeup();
    }
    return result.future;
    
     

    Sender发送消息线程

    主要流程如下

    Sender.run
    Sender.runOnce
    Sender.sendProducerData
    // 获取集群信息
    Metadata.fetch
    // 获取可以发送消息的分区且已经获取到了leader分区的节点
    RecordAccumulator.ready
    // 根据准备好的节点信息从缓冲区中获取topicPartion对应的Deque队列中取出ProducerBatch信息
    RecordAccumulator.drain
    // 将消息转移到每个节点的生产请求队列中
    Sender.sendProduceRequests
    // 为消息创建生产请求队列
    Sender.sendProducerRequest
    KafkaClient.newClientRequest
    // 下面是发送消息
    KafkaClient.sent
    NetWorkClient.doSent
    Selector.send
    // 其实上面并不是真正执行I/O,只是写入到KafkaChannel中
    // poll 真正执行I/O
    KafkaClient.poll
    
     

    通过源码分析下Sender线程的主要流程

    KafkaProducer的构造方法在实例化时启动一个KafkaThread线程来执行Sender

    // KafkaProducer构造方法启动Sender
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    // Sender->run()->runOnce()
    long currentTimeMs = time.milliseconds();
    // 发送生产的消息
    long pollTimeout = sendProducerData(currentTimeMs);
    // 真正执行I/O操作
    client.poll(pollTimeout, currentTimeMs);
    // 获取集群信息
    Cluster cluster = metadata.fetch();
    
     
    // 获取准备好可以发送消息的分区且已经获取到leader分区的节点
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // ReadyCheckResult 包含可以发送消息且获取到leader分区的节点集合、未获取到leader分区节点的topic集合
    public final Set<Node> 的节点;
    public final long nextReadyCheckDelayMs;
    public final Set<String> unknownLeaderTopics;

    ready方法主要是遍历在上面介绍RecordAccumulator添加消息的容器,Map<TopicPartition, Deque<ProducerBatch>>,从集群信息中根据TopicPartition获取leader分区所在节点,找不到对应leader节点但有要发送的消息的topic添加到unknownLeaderTopics中。同时把那些根据TopicPartition可以获取leader分区且消息满足发送的条件的节点添加到的节点中

    // 遍历batches
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();
        // 根据TopicPartition从集群信息获取leader分区所在节点
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // 添加未找到对应leader分区所在节点但有要发送的消息的topic
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    ....
                    if (sendable && !backingOff) {
                        // 添加准备好的节点
                        readyNodes.add(leader);
                    } else {
                       ...
    }

    然后对返回的unknownLeaderTopics进行遍历,将topic加入到metadata信息中,调用metadata.requestUpdate方法请求更新metadata信息

    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic);
        result.unknownLeaderTopics);
        this.metadata.requestUpdate();

    对已经准备好的节点进行最后的检查,移除那些节点连接没有就绪的节点,主要根据KafkaClient.ready方法进行判断

    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // 调用KafkaClient.ready方法验证节点连接是否就绪
        if (!this.client.ready(node, now)) {
            // 移除没有就绪的节点
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    下面开始创建生产消息的请求

    // 从RecordAccumulator中取出TopicPartition对应的Deque双端队列,然后从双端队列头部取出ProducerBatch,作为要发送的信息
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

    把消息封装成ClientRequest

    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,requestTimeoutMs, callback);

    调用KafkaClient发送消息(并非真正执行I/O),涉及到KafkaChannel。Kafka的通信采用的是NIO方式

    // NetworkClient.doSent方法
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    ...
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(send);
    ​
    ...
    ​
    // Selector.send方法    
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId);
    } else {
        try {
            channel.setSend(send);
        ...

    到这里,发送消息的工作准备的差不多了,调用KafkaClient.poll方法,真正执行I/O操作

    client.poll(pollTimeout, currentTimeMs);

    用一张图总结Sender线程的流程

    通过上面的介绍,我们梳理出了Kafka生产消息的主要流程,涉及到主线程往RecordAccumulator中写入消息,同时后台的Sender线程从RecordAccumulator中获取消息,使用NIO的方式把消息发送给Kafka,用一张图总结

    后记

    这是本公众号第一次尝试写源码相关的文章,说实话真不知道该如何下笔,代码截图、贴整体代码等感觉都被我否定了,最后采用了这种方式,介绍主要流程,把无关代码省略,配合流程图。

    上周参加了华为云kafka实战课程,简单看了下kafka的生产和消费代码,想简单梳理下,然后在周日中午即8.17开始阅读源码,梳理流程,一直写到了晚上12点多,还剩一点没有完成,周一早晨早起完成了这篇文章。当然这篇文章忽略了很多更细节的东西,后面会继续深入,勇于尝试,不断精进,加油!

     

    参考资料

    华为云实战

    极客时间kafka专栏

    展开全文
  • KafkaProducer源码走读

    2019-08-14 19:15:39
    用户循环调用KafkaProducer.send(ProducerRecord<K,V>record,Callbackcallback) ->KafkaProducer.doSend –>阻塞等待topic的元数据信息->等待sender线程更新元数据 –>accumulator.append —>...

    用户线程

    用户循环调用KafkaProducer.send(ProducerRecord<K,V>record,Callbackcallback)
    ->KafkaProducer.doSend
    –>阻塞等待topic的元数据信息->等待sender线程更新元数据
    –>accumulator.append
    —>根据Topic-Partition信息从batches获取Deque,取双向队列的最后一个执行last.tryAppend(队列为空的时候要收集空间new一个RecordBatch入队)
    –>结束

    Sender线程

    KafkaProducer构造的时候启动一个Sender线程循环调用
    –>run
    —>accumulator.ready
    ---->batches迭代调用,查找里面每个<TopicPartition,Deque>,获取TopicPartition的leader节点,从Deque取第一个RecordBatch,判断这个RecordBatch已经准备好了,就把leader节点信息放到readyNodes里面,构造ReadyCheckResult(readyNodes,nextReadyCheckDelayMs,unknownLeadersExist)返回
    —>readyNodes迭代调用,判断NetworkClient里面该节点是否就绪:
    NetworkClient为每个节点维护转态的表connectionStates
    selector为每个节点维护channel的表channels
    inFlightRequests为每个节点维护Deque,放在requests里面
    ---->如果没有就绪则initiateConnect:
    connectionStates.connecting
    channels.put(id,channel)
    —>如果没有就绪则从readyNodes中移除该节点
    —>就绪了则执行this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now)
    ---->readyNodes的每个节点:
    从集群cluster中取得节点的分区信息Listparts;
    从中顺序取一个分区构造TopicPartition,根据这个TopicPartition从batches里面取队列deque,取队列的第一个RecordBatch元素,判断满足一些条件,把batch放到ready里面
    再取下一个分区构造,直到总大小maxSize。//这里是取了一个节点的多个分区队列,每个队列的第一个RecordBatch元素
    放到Map<Integer,List>里面,node->List
    —>从accumulator里面把上面收集的batch的TopicPartition执行mutePartition操作
    —>accumulator.abortExpiredBatches把batches里面的超时的RecordBatch移除
    —>上面收集的batches的每个元素执行createProduceRequests//每个节点执行一次
    ---->把Listbatches里面的batch放到produceRecordsByPartition里面,构造ProduceRequest
    ----->groupDataByTopic组成topic-partition-buffer,构造RequestSend,构造callback,构造ClientRequest
    —>每个ClientRequest调用client.send(request,now)
    ---->doSend
    ----->this.inFlightRequests.add(request);
    ----->selector.send(request.request());
    ------>channel.setSend(send);
    —>client.poll(pollTimeout,now)
    ---->metadataUpdater.maybeUpdate->构造更新元数据的请求,但是为什么发到最空闲的node?随便一个node都可以更新元数据吗??
    ---->selector.poll
    ---->处理response和回调
    –>结束

    展开全文
  • KafkaProducer-源码学习

    2019-10-23 08:59:25
    Kafka-Clients 源码学习:KafkaProducer 篇 zwangbo 阅读数:527 2019 年 10 月 22 日 13:55 前言 本文基于 Kafka-clients:1.1.0 版本。 KafkaProducer 的发送流程 调用流程图 此图描述的是用户启动一个线程...
  • KafkaProducer及其API

    2018-07-05 11:48:27
    KafkaProducer向Kafka集群发布记录的Kafka客户端。生产者是线程安全的,并且在线程之间共享一个生产者实例通常比拥有多个实例要快。生产者由缓冲空间池组成,缓冲空间池保存尚未传输到服务器的记录,以及后台I/O线程...
  • kafka笔记-KafkaProducer

    2021-04-03 20:13:29
    KafkaProducer 1. 主要的成员变量 KafkaThread:producer启动的时候会创建一个线程,这个线程里面有个重要的Sender Sender:sender也是在producer初始化化的时候创建的,两个重要的东西KafkaClient(即...
  • 3、深潜KafkaProducer基础架构 kafka 自定义了一套网络协议,我们可以使用任意语言来实现这套协议,实现向 kafka 集群 push message 以及从 kafka 集群 pull message 的效果。在 kafka 2.8.0 版本的源码中的 ...
  • 深入分析KafkaProducer消息发送流程

    万次阅读 多人点赞 2018-04-06 13:20:09
    一、Kafka生产者发送消息示例 注意:以下所用kafka版本为0.10.1.0 KafkaProducer是线程安全对象,建议可以将其封装成多线程共享一个实例,效率反而比多实例更高,在深入分析前,先简单看一个生产者生产消息的demo...
  • KafkaProducer详解

    千次阅读 2020-08-09 20:21:33
    多线程多KafkaProducer 关于这两种方案各有利弊,对比如下: 模式 说明 优势 劣势 使用场景 多线程单KafkaProducer 多个线程共享一个实例 简单,性能好 1.所有的线程共享一个缓冲区需要设置较大值。 2.一旦一个线程...
  • KafkaProducer介绍 KafkaProducer是kafka生成者生成数据写入kafka集群,通过实例化对象调取send方法发送数据到对应的Topic中存储。 实例化 相关参数的校验 分区器 序列化器 后台发送线程 本地缓存记录集合 生产者...
  • Kafka源码解读——生产者_KafkaProducer(间断持续更......) KafkaProducer类的注释里有以下几段: producer是线程安全的,因此多线程使用一个单例模式的对象比多个对象更快些。 producer有一个缓冲池用来保存还...
  • 我们先通过一张图来了解一下KafkaProducer发送消息的整个流程: 1、ProducerInterceptors对消息进行拦截。 2、Serializer对消息的key和value进行序列化 3、Partitioner为消息选择合适的Partition 4、...
  • KafkaProducer中一般会发生两种类型的异常: 可重试的异常和不可重试的异常。 常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、...
  • Flink KafkaProducer

    2020-12-24 23:12:27
    import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment....
  • KafkaProducer是一个Kafka客户端实现,可以发布记录records至Kafka集群。KafkaProducer是线程安全的,多线程之间共享单独一个producer实例通常会比多个producer实例要快。KafkaProducer包含一组缓存池空间,存储尚未...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,866
精华内容 3,946
关键字:

kafkaproducer