精华内容
下载资源
问答
  • kafka源码分析

    2018-08-15 14:57:30
    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala
  • Kafka源码分析

    2017-11-12 20:30:59
    历经了大概一个多月的时间,参考了徐郡明老师的Apache Kafka源码剖析,把源码梳理了一遍,放到了github上,后续的会整理成系列博客,希望各位能多多指教。 ...

    历经了大概一个多月的时间,参考了徐郡明老师的Apache Kafka源码剖析,把源码梳理了一遍,放到了github上,后续的会整理成系列博客,希望各位能多多指教。

    展开全文
  • kafka源码分析之producer

    万次阅读 2016-07-27 10:39:52
    kafka源码分析之kafka producer发送数据源码分析

    Producerclient

    示例代码

    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("client.id""DemoProducer");
    props.put("key.serializer""org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer""org.apache.kafka.common.serialization.ByteArraySerializer");
    producer new KafkaProducer<IntegerString>(props);
    this.topic = topic;
    this.isAsync = isAsync;

    String messageStr = "Message_";
    long startTime = System.currentTimeMillis();
    if (isAsync) {

    异步处理,这个过程需要定义一个回调函数来监听发送的消息的响应结果

     // Send asynchronously
      
    producer.send(new ProducerRecord<byte[]byte[]>(topic,
          messageNo.getBytes()/*key*/,
          messageNo.getBytes()/*value*/),

          /*异步处理,回调函数*/

          new DemoCallBack(startTimemessageNomessageStr));
    else 

    同步处理,发送完成后,等待发送的响应结果。

    // Send synchronously
      
    try {
        producer.send(new ProducerRecord<IntegerString>(topic,
            messageNo.getBytes()/*key*/,
            messageNo.getBytes()/*value*/)).get();
        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
      catch (InterruptedException e) {
        e.printStackTrace();
      catch (ExecutionException e) {
        e.printStackTrace();
      }
    }

     

    关于异步处理的回调函数定义:

    这个回调函数实现需要实现org.apache.kafka.clients.producer.Callback接口。

    class DemoCallBack implements Callback 

     

    并实现接口中的函数:

    public void onCompletion(RecordMetadata metadataException exception) {

    这里的startTime是发送这条消息时,生成回调函数时传入的消息发送的开始时间,

    计算出来了这次发送这条消息共花的时间
      long elapsedTime = System.currentTimeMillis() - startTime;
      if (metadata != null) {

    如果metadata信息不为空,表示消息添加成功,可以得到当前添加成功的消息的offset.
        System.out.println(
            "message(" key ", " message ") sent to partition(" 

               + metadata.partition() +
                "), " +
                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
      else {

    这种情况下,表示exception有值,也就是添加消息失败了,可以直接打印这个失败的消息的内容。
        exception.printStackTrace();
      }
    }

     

    Client端的生成与处理流程

    生成KafkaProducer实例

    1,首先看看KafkaProducer实例生成:

    根据传入的properties配置信息,生成用于Producer的config实例。

    this(new ProducerConfig(properties)nullnull);

     

    2,解析必要的配置项:

    2,1,配置项client.id,用于标记client端的一个编码值,默认值为producer-1。在同一个进程内,多个client端时,如果没有指定,默认根据1这个值向后增加。

    2,2,配置项partitioner.class,配置用于producer写入数据时用于计算这条数据对应的partition的分配算子实例,这个实例必须是的Partitioner实现。实例初始化时会调用configure函数把配置文件传入进去,用于实例生成时使用,默认情况下分区算子是DefaultPartitioner。这个默认算子根据当前的key值进行murmur2 hash并与对应的topic的个数于模,如果key为null时,根据一个自增的integer的值与partition的个数取模.

    2,3,配置项retry.backoff.ms,用于在向broker发送数据失败后的重试间隔时间,默认值为100ms

    2,4,配置项metadata.max.age.ms,用于配置每个producer端缓存topic的metadata的过期时间,默认值为5分钟。配置上面的2,3,与2,4的配置,生成一个Metadata实例。

    2,5,配置项max.request.size,用于配置每次producer请求的最大的字节数,默认值为1MB。

    2,6,配置项buffer.memory,用于配置producer端等待向server发送的数据的缓冲区的大小,默认值为32MB。

    2,7,配置项compression.type,默认值none,用于配置数据的压缩算法,默认为不压缩,可配置的值为none,gzip,snappy,lz4

    2,8,配置项max.block.ms,用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒。

    2,9,配置项request.timeout.ms,用于配置socket请求的最大超时时间,默认值为30秒。

     

    3,生成record的累加器,这是一个用于对producer要发送的数据进行缓冲的实例:

    this.accumulator new RecordAccumulator(

            config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
            this.totalMemorySize,
            this.compressionType,
            config.getLong(ProducerConfig.LINGER_MS_CONFIG),
            retryBackoffMs,
            metrics,
            time,
            metricTags);

     

    3,1,RecordAccumulator实例需要的配置:

    3,1,1配置项batch.size,用于批量提交的batch字节大小,默认值为16384。

    3,1,2配置项linger.ms,这个配置与3,1,1配合使用,用于配置数据缓存的最大延迟时间,默认值0.

    3,1,3依赖的其它配置项:2,6  2,7 2,3。

     

    4,根据配置项bootstrap.servers,多个配置使用逗号分开,

    生成用于socket请求的InetSocketAddress实例集合。

    4,1并根据配置的broker的连接地址集合,生成Cluster的实例。把cluster实例更新到metadata的实例中。

     

    5,生成NetworkClient实例,这个实例用于与各个broker进行socket通信,生成用于进行数据发送的Sender实例,并生成用于数据发送的KafkaThread线程并启动。

     

    6,根据配置项key.serializer/value.serializer,生成key与value的序列化实例,这实例必须是Serializer的实现。

     

    KafkaThread线程初始化

    生成NetworkClient实例需要的配置项:

    1,配置项connections.max.idle.ms,默认值为9分钟,用于设置连接最大的空闲时间,

    2,配置项max.in.flight.requests.per.connection,默认值5,用于设置每个连接最大的请求个数

    3,配置项reconnect.backoff.ms,默认值50ms,用于设置重新尝试连接的等待时间。

    4,配置项send.buffer.bytes,默认值128kb,用于设置socket的发送缓冲区SO_SNDBUF的大小。

    5,配置项receive.buffer.bytes,默认值32kb,用于设置socket的接收响应的缓冲区SO_RCVBUF的大小。

    6,配置项request.timeout.ms,用于配置socket请求的最大超时时间,默认值为30秒。

    NetworkClient client = new NetworkClient(
            new Selector(

                config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)

                this.metricstime"producer"metricTagschannelBuilder),
            this.metadata,
            clientId,
            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
            config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            this.requestTimeoutMstime);

     

    Sender是一个用于发送数据的线程。

    需要的配置项:

    1,配置项max.request.size,用于配置每次producer请求的最大的字节数,默认值为1MB。

    2,配置项acks,默认值1,用于配置请求的ack的类型,-1,0,1三种。

    3,配置项retries,默认值0,用于配置发送失败的重试次数。
    this.sender new Sender(client,
            this.metadata,
            this.accumulator,
            config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            (shortparseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
            config.getInt(ProducerConfig.RETRIES_CONFIG),
            this.metrics,
            new SystemTime(),
            clientId,
            this.requestTimeoutMs);
    String ioThreadName = "kafka-producer-network-thread" 

            + (clientId.length() > " | " clientId "");

     

    这里用于启动用于对producer中的数据进行发送的线程Sender实例。
    this.ioThread new KafkaThread(ioThreadNamethis.sendertrue);
    this.ioThread.start();

     

    通过producer发送数据

    Producersend函数

    public Future<RecordMetadata> send(ProducerRecord<KV> record) {
        return send(recordnull);
    }

    如果需要考虑数据发送成功的回调处理时,需要实现Callback。
    public Future<RecordMetadata> send(ProducerRecord<KV> record

    Callback callback) {
        try {

    这里根据请求的记录的topic的名称,得到这个topic对应的metadata信息,这里通过Metadata实例来得到。函数返回值是读取topic的metadata信息的读取时间。

    1,从metadata实例中的topics集合中检查这个topic是否存在,如果不存在,把这个topic添加到集合中,

    2,从metadata对应的Cluster实例(这里存储有每个broker的连接信息)中的partitionsByTopic集合中根据topic得到topic对应的partition信息的集合,如果partitionsByTopic中已经存在有对应的partitions的记录,说明这个topic的metadata信息已经被加载出来,函数直接返回0。

    3,如果当前的topic在metadata中没有对应的partitions的信息,根据max.block.ms配置的最大等待时间,通过每个broker的连接,随机取出一个broker的连接,如果broker的连接不存在时,会创建这个连接并向broker发起一个TopicMetadataRequest请求得到这个topic对应的metadata信息。
            // first make sure the metadata for the topic is available
            
    long waitedOnMetadataMs = waitOnMetadata(record.topic()

                 this.maxBlockTimeMs);

     

    这里得到总的等待时间除去得到metadata信息用去的时间后还可以用于等待添加数据到发送队列处理的等待时间。
            long remainingWaitMs = Math.max(0this.maxBlockTimeMs 

                   waitedOnMetadataMs);

     

    对传入的key与value进行序列化操作,并得到序列化后的byte array的key与value.
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic()record.key());
            catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " 

                        + record.key().getClass().getName() +
                        " to class " producerConfig.getClass(

                            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic()

                    record.value());
            catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " 

                        record.value().getClass().getName() +
                        " to class " producerConfig.getClass(

                            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

     

    得到这条记录对应的partition,并根据这个partition生成TopicPartition,

    在得到对应的partition时,如果传入参数中包含有partition的id时,判断这个partition的值是否在指定的范围内,必须在指定的范围内,如果partition没有传入时,通过指定的partitioner的实例,根据record的kv信息,生成一个partition的id值。
            int partition = partition(recordserializedKeyserializedValue

                     metadata.fetch());

    得到一条记录的长度,这个记录的长度为size(4),offset(8),crc(4),magic(1),attr(1),

                Keysize(4),key,valuesize(4),value
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey

                       serializedValue);
            ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic()partition);


            log.trace("Sending record {} with callback {} to topic {} partition {}",

                   recordcallbackrecord.topic()partition);

     

    向client端的消息缓冲区内写入这条消息。
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp

                  serializedKeyserializedValuecallbackremainingWaitMs);


            if (result.batchIsFull || result.newBatchCreated) {

    如果当前的缓冲区的batch的大小已经满了,或者说这个缓冲区中重新生成了一个batch时,唤醒sender的线程,让sender的run函数继续执行,完成对数据的发送操作。
                log.trace("Waking up the sender since topic {} partition {} is either full 

                      or getting a new batch"record.topic()partition);


                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        
    catch (ApiException e) {
            log.debug("Exception occurred during message send:"e);
            if (callback != null)
                callback.onCompletion(nulle);
            this.errors.record();
            return new FutureFailure(e);
        catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            throw e;
        catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }

     

    Producer的缓冲区的append

    在执行producer的send函数时,并不是直接就向socket发起网络请求,而是先把数据存储到发送的缓冲区中,这个缓冲区的实现是一个RecordAccumulator实例。

    这个实例生成时,需要的配置项:

    配置项batch.size,用于批量提交的batch字节大小,默认值为16384。

    配置项linger.ms,这个配置与3,1,1配合使用,用于配置数据缓存的最大延迟时间,默认值0.

    配置项buffer.memory,用于配置producer端等待向server发送的数据的缓冲区的大小,默认值为32MB。

    配置项compression.type,默认值none,用于配置数据的压缩算法,默认为不压缩,可配置的值为none,gzip,snappy,lz4

    配置项retry.backoff.ms,用于在向broker发送数据失败后的重试间隔时间,默认值为100ms

     

    接下来看看用于添加数据到缓冲区的append函数
    public RecordAppendResult append(TopicPartition tpbyte[] keybyte[] valueCallback callbacklong maxTimeToBlock) throws InterruptedException {


        
    appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException(

                      "Cannot send after the producer is closed.");

     

    首先从当前的batchs集合中得到对应这个partition的RecordBatch的双端队列。

    如果batchs集合中还不存在partition对应的双端队列时,生成一个ArrayDeque的队列实例,并放入到batchs的集合中。这个函数返回batchs集合中对应这个partition的双端队列实例。
            // check if we have an in-progress batch
            
    Deque<RecordBatch> dq = dequeFor(tp);
            synchronized (dq) {

    得到这个队列中,最后一个RecordBatch的实例值,在一个ArrayDeque的双端队列中,实例初始化时默认生成16个元素的数组(2的倍数),如果是addLast是从0开始向后添加,如果是addFirst是从数组尾部向前添加。
                RecordBatch last = dq.peekLast();
                if (last != null) {

    如果这个双端队列中,得到了一个RecordBatch的用于存储batch的实例时,表示这个队列中是存在待提交的batch的信息。向这个recordBatch中添加这个kvy-value进去。

    第一次进行partition的消息添加时,这个流程不会被执行。

    这里向队列中最后一个RecordBatch添加这个kv消息,这个流程被执行时,表示这个RecordBatch一定存在一个大于0的record的记录数,

    向这个RecordBatch中添加消息的流程:

    1,检查这个batch是否处于可写的状态,Sender线程未对此batch进行提交时,这个值为true,同时这个buffer中没有写入任何内容时,这个buffer的大小能够放下当前的kv,

    2,检查这个batch中当前的内存位置加上当前要send进去的kv的大小是否超过了batchSize的大小,

    3,如果流程执行到这里,表示这个kv能被添加到这个RecordBatch中,向RecordBatch中添加这条消息,并返回这个RecordBatch的FutureRecordMetadata的信息,

    4,如果当前的RecordBatch没有足够的空间来存储这个kv时,这里返回的future是一个null值。
                    FutureRecordMetadata future = last.tryAppend(keyvaluecallback

                           time.milliseconds());

     

    如果说future的值不为空,表示这条消息成功添加到这个buffer中,检查这个队列中的RecordBatch的个数是否大于1或者当前的RecordBatch的大小是否已经达到了不能写的情况,如果满足这两种情况中的一种那么生成这个RecordAppendResult的第二个参数为true,否则为false,第三个参数,由于这个情况下是直接拿到的队列中的一个buffe进行的添加,并不是新创建的RecordBatch,这里的值为false.
                    if (future != null)
                        return new RecordAppendResult(futuredq.size() > || 

                             last.records.isFull()false);
                }
            }

    如果对应的partition的双端队列中是第一次添加消息到缓冲区,或者说当前双端队列中最后一个RecordBatch内存储的消息已经达到了batch的大小,需要重新生成一个RecordBatch,计算出这个batch存储的大小。
            // we don't have an in-progress record batch try to allocate a new batch
            
    int size = Math.max(this.batchSizeRecords.LOG_OVERHEAD 

                    Record.recordSize(keyvalue));


            log.trace("Allocating a new {} byte message buffer for topic {} partition {}"

                        sizetp.topic()tp.partition());

     

    从内存池中分配指定大小的一个缓冲区出来,分配流程:

    1,如果申请的size大小与每个batchSize的大小(可以理解为内存页)相同,同时内存池中刚好有缓存起来的已经分配好的buffer,直接取出这个双端队列中的第一个buffer(采用移出的方式).

    2,如果当前内存池中可用的内存加上所有分配的内存页的大小(每个内存页是一个batchSize的大小)相加大于或者等于当前要申请的size大小,如果当前可用的内存小于申请的size时,释放掉内存池的双端队列中最后一个缓存起来的已经分配的buffer的容量(这个过程是一个迭代过程,直到释放的内存达到可以存储这个size的大小为结束,每次迭代移出最后一个buffer),把当前的可用内存减去分配的size的大小,并根据这个size生成一个ByteBuffer实例。返回这个ByteBuffer.

    3,这种情况表示当前的缓冲区已经没有足够的大小用来分配buffer,通过while进行迭代,直到可用的内存达到size的大小,每次迭代当前的send线程就wait住,等待Sender线程对buffer进行提交释放后唤醒线程,

    3,1,在线程被唤醒后,检查如果当前的while的迭代是第一次迭代,同时要分配的size刚好就是内存页的大小,同时内存池中空闲的内存页的buffer刚好还有多于的的,取出内存池中双端队列的第一个buffer.停止迭代,返回这个buffer.

    3,2这种情况下,send函数的线程已经被唤醒,但是要分配的size是一个比batchSize(内存页)大的size时,如果当前的内存池中还有缓存的内存页可用,释放缓存的内存页buffer,直到释放到可分配size的空间(如果无法释放更多的空间时,while重新迭代,线程重新进入到wait的状态,等待提交后的唤醒),这种情况下返回的buffer会根据size大小重新申请一个ByteBuffer并返回,在能够分配可用大小的内存空间后,同时当前内存池中可用的内存大于0,或者说内存池还有被缓存起来的内存页buffer,唤醒下一个等待线程。
            ByteBuffer buffer = free.allocate(sizemaxTimeToBlock);
            synchronized (dq) {
                
    if (closed)
                    throw new IllegalStateException(

                            "Cannot send after the producer is closed.");

     

    如果这个双端队列中,得到了一个RecordBatch的用于存储batch的实例时,表示这个队列中是存在待提交的batch的信息。向这个recordBatch中添加这个kvy-value进去。

    第一次进行partition的消息添加时,这个流程不会被执行。

    这里向队列中最后一个RecordBatch添加这个kv消息,这个流程被执行时,表示这个RecordBatch一定存在一个大于0的record的记录数,

    向这个RecordBatch中添加消息的流程:

    1,检查这个batch是否处于可写的状态,Sender线程未对此batch进行提交时,这个值为true,同时这个buffer中没有写入任何内容时,这个buffer的大小能够放下当前的kv,

    2,检查这个batch中当前的内存位置加上当前要send进去的kv的大小是否超过了batchSize的大小,

    3,如果流程执行到这里,表示这个kv能被添加到这个RecordBatch中,向RecordBatch中添加这条消息,并返回这个RecordBatch的FutureRecordMetadata的信息,

    4,如果当前的RecordBatch没有足够的空间来存储这个kv时,这里返回的future是一个nul值。
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(keyvaluecallback

                            time.milliseconds());

     

    下面的future不为空的情况,通常情况下应该不会被发生,如果发生了,把当前申请的buffer释放掉。并把这个buffer放到这个内存池的缓存队列中,用于下一次使用时,不需要重复申请内存。

    如果说future的值不为空,表示这条消息成功添加到这个buffer中,检查这个队列中的RecordBatch的个数是否大于1或者当前的RecordBatch的大小是否已经达到了不能写的情况,如果满足这两种情况中的一种那么生成这个RecordAppendResult的第二个参数为true,否则为false,第三个参数,由于这个情况下是直接拿到的队列中的一个buffe进行的添加,并不是新创建的RecordBatch,这里的值为false.
                    if (future != null) {
                        
    free.deallocate(buffer);
                        return new RecordAppendResult(futuredq.size() > || 

                            last.records.isFull()false);
                    }
                }

     

    流程执行到这里,表示当前的partition中对应的双端队列的最后一个RecordBatch不能够存储这个kv的大小,或者说当前的队列中不存在RecordBatch,新生成一个Records用于存储要send的消息集合,并生成一个RecordBatch来归类这个records,把要send的kv添加到这个records中,

    并生成一个FutureRecordMetadata实例返回,

    这个实例中引用一个对应此RecordBatch中的ProduceRequestResult实例,这个实例用于在Sender线程中用于控制batch是否被提交成功的处理,第二个参数是表示当前的RecordBatch中已经存储的消息条数,在这里通过RecordBatch添加消息时(tryAppend函数),在这个流程的处理中一定会返回一个FutureRecordMetadata的实例,因为这是第一次添加,RecordBatch中的buffer被定义成刚好能够存储这个kv或者说这个buffer不光能够存储这一个kv.
                MemoryRecords records = MemoryRecords.emptyRecords(buffercompression

                      this.batchSize);
                RecordBatch batch = new RecordBatch(tprecordstime.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(keyvalue

                      callbacktime.milliseconds()));

    最后,把这个新生成的batch添加到队列中。
                dq.addLast(batch);
                incomplete.add(batch);

    生成要返回的信息,这个信息中包含有future用于控制提交是否成功,第二个参数1:如果说队列中已经有超过一个的batch时,表示第一个batch已经满了,或者说第一个的batch已经不能存储新send进来的kv,又新创建了一个batch,2:新生成的batch的records中已经存储了大于或等于batchSize的大小的数据,

    ==>如果说这个值是true时,表示这个参数上面的两种情况最少满足一种。

    第三个参数由于这里是新创建的一个RecordBatch,因为为true.
                return new RecordAppendResult(futuredq.size() > || 

                      batch.records.isFull()true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

     

    Sender线程处理数据的发送

     

    线程的run函数:
    /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

    KafkaProducer实例生成时,KafkaThread线程启动后,会执行Sender实例中的run函数,
        // main loop, runs until close is called
        
    while (running) {

    如果producer没有执行shutdown操作前,run函数会一直在这个地方进行执行,不断的执行run函数传入当前的执行时的系统时间。
            try {
                run(time.milliseconds());
            catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: "e);
            }
        }

    如果流程执行到这里,说明produce已经执行了shutdown操作,准备执行停止producer的操作。
        log.debug("Beginning shutdown of Kafka producer I/O thread,

                  sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        
    while (!forceClose && (this.accumulator.hasUnsent() || 

                    this.client.inFlightRequestCount() > 0)) {

    如果当前的accumulator的缓冲区还有数据没有被处理,同时networkClient中还有正在进行的请求,迭代执行run函数,直到数据被全部发送完成。
            try {
                run(time.milliseconds());
            catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: "e);
            }
        }
        if (forceClose) {
            
    this.accumulator.abortIncompleteBatches();
        }

    关闭网络连接。
        try {
            this.client.close();
        catch (Exception e) {
            log.error("Failed to close network client"e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

     

    执行缓冲区数据的发送操作的函数:


    /**
     * Run a single iteration of sending
     * 
     * 
    @param now
     
    *            The current POSIX time in milliseconds
     */
    public void run(long now) {

    得到当前cluster中所有的broker节点的连接信息。
        Cluster cluster = metadata.fetch();

     

    这里从缓冲区中的所有的partition的batch中进行计算,取出已经准备好的需要进行发送的broker的节点集合,具体流程:

    1,对缓冲区中batchs集合进行迭代,取出每个partition对应的双端队列(存储数据缓存的batch),

    2,如果partition在cluster对应的partitionsByTopicPartition集合中存在,表示这个topic的metadata已经被加载过来,得到这个partition的leader,

    3,如果partition的leader不存在,

           设置这个函数返回ReadyCheckResult类型的unknownLeadersExist值为true.

    4,如果迭代的partition的leader存在,取出这个partition的队列中的第一个batch,如果这个batch存在,表示有缓存的数据,

    4,1检查这个batch是否已经被提交过,重试次数大于0,

          同时上一次重试的时间已经大于了retry.backoff.ms(默认100ms)配置的等待时间,

         把这个partition的leader添加到返回的ReadyCheckResult实例中的readyNodes集合中。

             (readyNodes是一个set集合)

    4,2如果这个partition对应的队列中已经缓存有超过一个以上的batch,

    或者说有batch的缓存大小已经达到了batchSize的配置大小时,

    把这个leader添加到readyNodes中。

    4,3如果这个partition的队列中有batch已经达到了linger.ms(默认值0)配置的等待时间,

        把这个leader添加到readyNodes中。

    5,这个返回的ReadyCheckResult实例中,属性nextReadyCheckDelayMs的值,表示要delay到的下一次时间,也就是下一次执行的wait时间,如果当前的所有的batch中没有超过等待时间时(retry.backoff.ms/linger.ms),也就是当前执行时间与等待时间的差值。
        // get the list of partitions with data ready to send
        
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster

                   now);

     如果上面的执行返回的结果中unknownLeadersExist属性值为true,表示topic的metadata还没有被加载过来(这个情况一般不会发生),标记metadata需要被更新。
        
    if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

    对返回的结果集中readyNodes集合中准备好的节点进行迭代,这个while的迭代中主要执行如下的流程:

    1,通过NetworkClient检查这个node是否已经被连接,同时metadata还没有达到需要更新的时间,同时连接队列中个数数小于max.in.flight.requests.per.connection配置的连接个数。那么这个node会被保留,

    2,如果当前迭代的node的连接已经超时,或者metadata需要被更新,或者node对应的broker还没有被创建连接,移出这个node.
        // remove any nodes we aren't ready to send to
        
    Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(nodenow)) {
                iter.remove();

    如果connection是已经被关闭掉的连接,this.client.connectionDelay(nodenow)返回的timeout是reconnect.backoff.ms(50ms)配置的值。
                notReadyTimeout = Math.min(notReadyTimeout

                   this.client.connectionDelay(nodenow));
            }
        }

     

    流程执行到这里时,result中的readyNodes集合中包含的是已经与broker创建有连接的node的信息。

    这里根据可以发起连接的broker的nodes集合,迭代每个node中的所有的partition的队列,

    取出这个队列中的第一个recordBatch(如果这个batch已经发送失败过一次,同时还没到重试的时间间隔,跳过这个batch),关闭这个batch(表示这个batch不能在写入)同时把这个batch添加到要返回的map集合中,这个迭代的过程直到找完所有的node中对应的partition中队列的第一个元素,

    或者达到max.request.size配置的最大的请求的消息字节数的大小为结束。
        // create produce requests
        
    Map<IntegerList<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);

    这里根据request.timeout.ms(默认30秒)配置的请求超时时间,得到缓冲区中所有请求超时的batch的集合(通过batch的最后一次写入消息的时间来判断是否达到了超时时间),如果发现batch已经起时,从缓冲区中移出这个batch,并回收这个batch对应的buffer.
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(

                 this.requestTimeoutclusternow);
        // update sensors
        
    for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic()

                    expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

     

    根据每个broker对应的partition的batch的消息集合,生成对应的ProduceRequest请求,

    这个请求每一个broker生成一个请求,这个请求中包含了这个broker中所有的partition的buffer的集合。
        List<ClientRequest> requests = createProduceRequests(batchesnow);

     

    这里计算出下一次执行需要的等待间隔,根据retry.backoff.ms/linger.ms配置的时间,如果说这次需要进行提交数据到指定的broker的readyNodes的集合大于0,设置这个间隔时间为0.
        
    long pollTimeout = Math.min(result.nextReadyCheckDelayMsnotReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}"result.readyNodes);
            log.trace("Created {} produce requests: {}"requests.size()requests);
            pollTimeout = 0;
        }

     

    迭代每一个broker的Produce的请求,通过NetworkClient向每一个broker发送对应的请求。
        for (ClientRequest request : requests)
            client.send(requestnow);

    检查是否需要更新metadata,如果需要,重新向broker发送metadata的请求,并更新metadata.

    接收请求的响应信息,并调用对应的callback函数。

    检查连接中是否有超过指定的时间connections.max.idle.ms(默认9分钟)没有活动的broker连接,如果有,关闭这个连接。
        
    this.client.poll(pollTimeoutnow);
    }

    展开全文
  • apache kafka技术内幕 和 apacke kafka源码分析2本PDF 电子书 网盘下载
  • kafka源码分析之consumer的源码

    千次阅读 2016-07-27 10:44:33
    kafka源码分析之kafka-consumer接收log消息的源码

    Consumerclient

    示例代码

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG"localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG"DemoConsumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG"false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG"30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG

            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

     

     

    consumer = new KafkaConsumer<>(props);
    this.topic = topic;

     

    consumer.subscribe(Collections.singletonList(this.topic));

    //下面的传入一个listener这个部分的注释如果需要对partition在当前的consumer中分配或者取消分配时做一定的操作时(比如取消分配时提交offset),可以实现这个接口。

    //subscribe(List<String> topicsConsumerRebalanceListener listener)

    while(true) {
       ConsumerRecords<IntegerString> records = consumer.poll(1000);
       for (ConsumerRecord<IntegerString> record : records) {
         System.out.println("Received message: (" + record.key() + ", " + record.value() 

             + ") at offset " + record.offset());

          
       }

     

       consumer.commitSync()

    }

     

    生成KafkaConsumer实例


    @SuppressWarnings("unchecked")
    private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
            log.debug("Starting the Kafka consumer");

     

    根据配置信息,得到如下三个配置的配置值,并检查配置的合法:

    1,读取request.timeout.ms配置项的值,默认值为40秒。用于配置请求的超时时间。

    2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。

    3,读取fetch.max.wait.ms配置项的值,默认值为500ms。用于配置从server中读取数据最长的等待时间。
            this.requestTimeoutMs = config.getInt(

                    ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            int sessionTimeOutMs = config.getInt(

                    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
            int fetchMaxWaitMs = config.getInt(

                    ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);

     

    如果请求超时时间不是一个大于session的超时时间的值或者请求超时时间不是一个大于fetch的最大等待时间的值时,表示requestTimeoutMs的配置不合法,直接throw exception.
            if (this.requestTimeoutMs <= sessionTimeOutMs ||

                    this.requestTimeoutMs <= fetchMaxWaitMs)
                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG 

                   " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG 

                   " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);

     

            this.time new SystemTime();

            MetricConfig metricConfig = new MetricConfig().samples(

                        config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(

                            ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                            TimeUnit.MILLISECONDS);

     

    这里得到对应的consumer的client端id的client.id配置,如果这个值没有配置时,默认随机生成一个。
            clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId "consumer-" CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();


            List<MetricsReporter> reporters = config.getConfiguredInstances(

                    ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics new Metrics(metricConfigreporterstime);

     

    读取retry.backoff.ms配置的值,默认值为100ms,用于配置重试的间隔周期。

            this.retryBackoffMs = config.getLong(

                  ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

     

    根据重试的间隔周期加上metadata.max.age.ms配置项的值生成Metadata实例,

    配置metadata.max.age.ms项默认值为5分钟,用于设置metadata定期重新读取的生命周期。
            this.metadata new Metadata(retryBackoffMs

                  config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));

     

    读取bootstrap.servers配置的要读取的kafka brokers的配置列表,并根据broker的连接信息,生成Cluster实例,并把Cluster实例更新到metadata的实例。
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(

                  config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses)0);


            String metricGrpPrefix = "consumer";
            Map<StringString> metricsTags = new LinkedHashMap<StringString>();
            metricsTags.put("client-id"clientId);

     

    生成NetworkClient的实例,生成实例需要如下几个配置文件:

    1,配置项connections.max.idle.ms,默认值9分钟,用于配置连接最大的空闲时间(每个连接的最大连接队列为100)。

    2,配置项reconnect.backoff.ms,默认值50ms,用于配置连接断开后重新连接的间隔时间。

    3,配置项send.buffer.bytes,默认值128kb,用于配置SOCKET的SO_SNDBUF发送数据的缓冲区大小。

    4,配置项receive.buffer.bytes,默认值32kb,用于配置SOCKET的SO_RCVBUF接收数据的缓冲区大小。

    5,读取request.timeout.ms配置项的值,默认值为40秒。用于配置请求的超时时间。
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(

                   config.values());
            NetworkClient netClient = new NetworkClient(

                    new Selector(config.getLong(

                       ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),

                       metricstimemetricGrpPrefixmetricsTagschannelBuilder),
                    this.metadata,
                    clientId,
                    100// a fixed large enough value will suffice
                    
    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)time);

     

    根据retry.backoff.ms配置的值,生成一个ConsumerNetworkClient的实例。
            this.client new ConsumerNetworkClient(netClientmetadatatime

                     retryBackoffMs);

     

    读取auto.offset.reset配置项的值,默认值为latest。可配置("latest""earliest""none"),这个配置用于在读取partition的offset超出范围时,对offset进行重置的规则。
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(

                        config.getString(

                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());

     

    生成用于管理订阅的topic的partition的状态管理的组件,用于管理partition的状态与当前的offset的信息。
            this.subscriptions new SubscriptionState(offsetResetStrategy);

     

    生成用于管理相同的一个groupId下的多个client端的partition的分区控制,

    通过partition.assignment.strategy配置,默认实例为RangeAssignor
            List<PartitionAssignor> assignors = config.getConfiguredInstances(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    PartitionAssignor.class);

     

    生成用于对consumer进行协调的实例,这个实例依赖如下配置:

    1,配置项group.id,用于配置consumer对应的订阅的组名称,相同的组的多个client端进行协调消费处理。

    2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。

    3,配置项heartbeat.interval.ms,默认值3秒,用于定时向server发送心跳的时间间隔。

    4,根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。

    5,配置项enable.auto.commit,默认值true,设置是否自动提交消费过的offset的值的设置。

    5,配置项auto.commit.interval.ms,默认值5秒,如果设置有自动提交offset时,自动提交的间隔时间。
            this.coordinator new ConsumerCoordinator(this.client,
                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    metricsTags,
                    this.time,
                    retryBackoffMs,
                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));

     

    根据key.deserializer配置与value.deserializer配置的key,value的反序列化的配置,生成反序列化消息的实例。这个类必须是实现Deserializer接口的类。
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance(

                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.keyDeserializer.configure(config.originals()true);
            else {
                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = keyDeserializer;
            }


            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(

                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.valueDeserializer.configure(config.originals()false);
            else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }

     

    生成用于具体读取消息的读取实例,这个实例依赖如下几个配置信息:

    1,配置项fetch.min.bytes,默认值1,用于设置每次读取的最小byte数。

    2,配置项fetch.max.wait.ms,默认值500ms,用于设置每次读取的最大等待时间。

    3,配置项max.partition.fetch.bytes,默认值1MB,用于设置每个partition每次读取的最大的数据量。

    4,配置项check.crcs,默认值true,用于设置是否校验数据的完整性。

    根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。
            this.fetcher new Fetcher<>(this.client,
                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                    this.keyDeserializer,
                    this.valueDeserializer,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    metricsTags,
                    this.time,
                    this.retryBackoffMs);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIXclientId);

            log.debug("Kafka consumer created");
        catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            
    close(true);
            // now propagate the exception
            
    throw new KafkaException("Failed to construct kafka consumer"t);
        }
    }

     

     

    订阅topic

    要对一个topic进行订阅时,在consumer生成后,通过调用如下的函数来进行订阅,

    第二个参数是对一个partition的分配与取消分析时的监听操作,可用于监听对分配到当前的consumer或者取消时的其它操作。

    public void subscribe(List<String> topicsConsumerRebalanceListener listener) {
        acquire();
        try {
            if (topics.isEmpty()) {
                // treat subscribing to empty topic list as the same as unsubscribing
                
    this.unsubscribe();
            else {
                log.debug("Subscribed to topic(s): {}"Utils.join(topics", "));

     

    通过对订阅的状态管理的组件对需要订阅的topic进行处理。

    把订阅的topic集合添加到subscriptions实例中的subscription集合与groupSubscription集合中,设置needsPartitionAssignment属性的值为true,表示需要重新分配partition在当前的consumer中。
                this.subscriptions.subscribe(topicslistener);

     

    把订阅的topic集合添加到metadata的topics的集合中,如果传入的topic集合有不包含在当前的metadata中的topic时,表示metadata需要更新,设置metadata的needUpdate属性为true.
                metadata.setTopics(subscriptions.groupSubscription());
            }
        } finally {
            release();
        }
    }

     

    读取订阅的topic的消息

    外部调用入口

    读取订阅的topic的消息时通过KafkaConsumer中的poll函数来进行读取,这个函数需要传入一个timeout的时间值,必须是一个大于0的值。

    @Override
    public ConsumerRecords<KV> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            // poll for new data until the timeout expires
            
    long start = time.milliseconds();
            long remaining = timeout;

     

    这里根据设置的超时时间,读取订阅的topic中对应此consumer分配的partition的数据,每次读取根据读取的时长,来计算这个while可用于超时的剩余等待时间,如果读取到数据,就直接返回这个数据,while迭代停止,否则等到指定的超时时间到达。
            do {

    这里检查group是否被加入到kafka leader中,检查订阅的topic是否被分配对应的消费的partitions,

    从当前的cache中读取records的消息记录,如果读取不到,通过fetcher发起一个请求,并重新读取。

    这里如果cache中没有数据时,会先通过fetcher.initFetchers进行一次数据的fetch的消费请求,最长等待的超时时间就是传入的剩余的超时时间。如果没有读取到消费,这里返回的值是一个空的集合。
                Map<TopicPartitionList<ConsumerRecord<KV>>> records = 

                        pollOnce(remaining);
                if (!records.isEmpty()) {

    如果当前的cache中已经被到数据(或者cache中没有数据也会发起一次请求,并等待响应结果),

    这里重新向server进行fetch数据的请求,用于异步向server发起一个请求,并把请求的结果数据存储到cache中,下一次fetchRecords时,直接使用这个cache的数据。
                    
    fetcher.initFetches(metadata.fetch());
                    client.quickPoll();
                    return new ConsumerRecords<>(records);
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            while (remaining > 0);

     

    如果在指定的超时时间内没有读取到数据,直接返回一个空的集合。
            return ConsumerRecords.empty();
        finally {
            release();
        }
    }

     

    读取要消费的记录

    在外部调用poll函数时,会通过pollOnce函数来得到一组要消费的数据。
    private Map<TopicPartitionList<ConsumerRecord<KV>>> pollOnce(long timeout) {

     

    这里首先检查与kafka leader中的consumerCoordinator的实例进行交互的client是否初始化,如果没有或者连接被断开,这里需要重新对连接进行初始化操作。

    可参见consumer的group coordinator中的检查并初始化coordinator的连接部分。
        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
        
    coordinator
    .ensureCoordinatorKnown();

    如果consumer是直接针对一个topic进行的订阅时,partitionsAutoAssigned函数返回的值是true.

    如果这个函数返回的值是true时,通过coordinator执行partition的分配。

    这里通过ConsumerCoordinator中的consumer的group加入部分来进行处理,会执行如下操作:

    1,向coordinator对应的broker发起joinGroup的请求,

    2,根据joinGroup得到的响应如果是leader时,对当前的group所有的consumers进行订阅的topic的partition的分配,

    3,向coordinator进行syncGroup的操作,把分配的每一个consumer对应的partition同步到broker中。
        // ensure we have partitions assigned if we expect to
        
    if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

    这里检查是否有这个consumer需要消费的partition集合中,是否有没有初始化需要fetch的offset的partition,如果有,执行updateFetchPositions来更新这些个partition.

    得到当前消费的partition对应的offset的位置的函数处理流程:

    1,检查subscriptions(订阅的状态)中needsFetchCommittedOffsets属性是否为true,如果为true表示有partition需要更新fetch的offset的值。

    2,根据当前的groupId与这个consumer对应的partitions发起一个OffsetFetchRequest请求向coordinator对应的broker节点并得到每个partition响应的offset的metadata信息,如果这个partition对应的消费记录的offset已经过期或者不存在是,返回的是一个NoOffset的实例(offset为-1)。

    3,根据得到的partition的集合对应的offset的值,

        更新到partition的状态管理TopicPartitionState实例的committed属性中。这里需要注意的是如果对应的partition返回的offset是NoOffset的实例时,offset的值为-1,这个时候不会被设置到committed的属性中,这个时候默认情况下committed的值还是一个null值。

    4,,设置subscriptions对应的订阅状态管理中的needsFetchCommittedOffsets属性的值为false.表示fetchCommittedoffset的过程已经被执行在当前分配的partitions中。

    5,根据得到的committed的offset的值,迭代分配的partition的集合,设置每一个partition开始进行数据读取的position的值,

    5,1,如果partition对应的管理状态的TopicPartitionState实例中awaitingReset属性值为true,这种情况表示这个partition没有得到对应的partition的offset(过期或第一次消费,或者说这个offset超过了当前partition中log的offset的范围),这个时候向这个partition对应的leader节点发起一个ListOffsetRequest(其实就是OffsetRequest)请求,这个请求直接得到当前partition的log中最大或者最小的offset的值。得到具体最大或是最小的offset的值通过auto.offset.reset配置项的值,默认值为latest

    执行seek函数,更新这个partition对应的TopicPartitionState实例中position的值为请求返回的offset值,设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。seek作用于把partition的offset设置到一个开始读取的位置上。

    5,2,如果partition对应的TopicPartitionState状态实例中committed属性值为null,表示这个partition是第一次消费或者说这个partition的上次消费时间已经达到过期的时间,被删除掉了,设置awaitingReset属性值为true,表示这个partition需要重置offset.执行offset的重置操作,通过resetOffset函数。向这个partition对应的leader节点发起一个ListOffsetRequest(其实就是OffsetRequest)请求,这个请求直接得到当前partition的log中最大或者最小的offset的值。得到具体最大或是最小的offset的值通过auto.offset.reset配置项的值,默认值为latest

    执行seek函数,更新这个partition对应的TopicPartitionState实例中position的值为请求返回的offset值,设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。seek作用于把partition的offset设置到一个开始读取的位置上。

    5,3,这个情况表示committed属性的值是一个正常的offset的值,得到这个committed的offset的值,执行seek函数,把position属性的值设置为这个committed的offset的值。设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。
        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        
    if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // init any new fetches (won't resend pending fetches)
        
    Cluster cluster = this.metadata.fetch();
        Map<TopicPartitionList<ConsumerRecord<KV>>> records = 

                 fetcher.fetchedRecords();

        
    if (!records.isEmpty()) {
            return records;
        }

        fetcher.initFetches(cluster);
        client.poll(timeout);
        return fetcher.fetchedRecords();
    }

     

    Fetchercache中读取已经加载到的数据

    这个部分在consumer在执行poll函数->pollOnce函数时,会先从通过Fetcher实例中的fetchedRecords函数来从cache中读取已经加载过来的消息。下面来看看这个函数的处理流程:

    public Map<TopicPartitionList<ConsumerRecord<KV>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {

    如果订阅的状态中needsPartitionAssignment属性的值为true表示partition需要重新进行分配,直接返回一个空的集合。
            return Collections.emptyMap();
        else {
            Map<TopicPartitionList<ConsumerRecord<KV>>> drained = new HashMap<>();

    1,这里检查是否有position的值超出了对应的partition的offset的范围,

      迭代offsetOutOfRangePartitions集合,这个集合中存储了上一次超出范围的partition的offset的集合,取出当前这个partition对应的position的值,如果这个值与这个集合中对应的partition的offset的值相同时,throw一个OffsetOutOfRangeException exception.同时清空这个offsetOutOfRangePartitions集合。
            throwIfOffsetOutOfRange();

    2,检查是否有未认证通过的topics的集合(unauthorizedTopics不为空),

      如果有throw一个TopicAuthorizationException的exception,

      同时清空TopicAuthorizationException这个集合。
            throwIfUnauthorizedTopics();

    3,检查recordTooLargePartitions集合(存储有消息记录超过指定大小的partition的集合)是否有值,如果有值,表示有消息大小超出了限制,throw一个RecordTooLargeException的exception.同时清空这个集合。
            throwIfRecordTooLarge();

    流程执行到这里,表示上面的3个检查都已经完成,消息记录正常,读取cache中已经加载过来的消息,并迭代每一条消息进行包装并返回。
            for (PartitionRecords<KV> part : this.records) {

    这里先检查对应这条消息的partition是否在当前的consumer是一个分配的partition,如果不是,直接continue,这个情况有可能发生在cache完成后,partition进行了重新分配,原来在这个consumer进行消费的partition被rebalance到其它的consumer中。
                if (!subscriptions.isAssigned(part.partition)) {
                    
    log.debug("Not returning fetched records for partition {} since it is 

                       no longer assigned"part.partition);
                    continue;
                }

    得到当前这个partition消息到的position的值,也就是开始的offset的值。
                // note that the consumed position should always be available
                // as long as the partition is still assigned
                
    long position = subscriptions.position(part.partition);

    如果当前的partition的不是处理fetchable的状态(被暂停或者position没有准备好),打印一个提示日志。
                if (!subscriptions.isFetchable(part.partition)) {
                    
    log.debug("Not returning fetched records for assigned partition {} 

                         since it is no longer fetchable"part.partition);
                else if (part.fetchOffset == position) {

    这种情况下,表示当前这个partition中开始进行fetch的offset的值与当前consumer中存储的position的位置是一个相同的值,得到这个partition中加载到的所有的消息中的最后一条消息的offfset,并把这个offset加一,得到的nextOffset的值就是下一次要读取的开始位置。
                    long nextOffset = part.records.get(

                             part.records.size() - 1).offset() + 1;

                    log.trace("Returning fetched records at offset {} for assigned 

                             partition {} and update " +
                            "position to {}"positionpart.partitionnextOffset);


    把这个partition已经加载过来的消息的集合添加到要返回的map中这个partition对应的位置。
                    List<ConsumerRecord<KV>> records = drained.get(part.partition);
                    if (records == null) {
                        records = part.records;
                        drained.put(part.partitionrecords);
                    else {
                        records.addAll(part.records);
                    }

    更新这个对应的partition的下一次读取数据的开始位置为nextOffset的值,这个nextOffset其实就是当前partition已经加载到的数据的最后一条记录的offset的值加一。
                    subscriptions.position(part.partitionnextOffset);
                else {
                    
    log.debug("Ignoring fetched records for {} at offset {} since the 

                            current position is {}",
                            part.partitionpart.fetchOffsetposition);
                }
            }

    最后,清空cache,并返回对cache进行处理后的集合,这个集合中存储了每个partition对应的读取到的消息集合。
            this.records.clear();
            return drained;
        }
    }

    异步加载数据到cunsumer的缓存

    在每次执行poll操作来加载数据时,从cache中读取完成数据后,或者说当前的cache中没有数据时,会通过Fetcher中的initFetchers函数来进行数据的异步加载操作。

     

    下面先看看这个函数的发起fetch的请求部分

    public void initFetches(Cluster cluster) {

    首先根据cluster中所有的节点生成FetchRequest的请求,这里迭代当前的consumer中所有要消息的partition并得到对应的node节点,根据node节点中包含的partition(leader的副本对应的节点)为key,value是这个节点中所有的partition的节点的请求信息,每一个节点生成一个FetchRequest的信息(包含要消费的partition对应的node),

    如果partition对应的leader的node在consumer的metadata中不存在,表示这个元数据需要被更新,设置metadata实例中的needUpdate属性为true,并跳过这个节点的分配,

    这里的createFetchRequests函数中返回的只包含目前metadata中包含有元数据的节点的集合。
        for (Map.Entry<NodeFetchRequest> fetchEntry: 

                     createFetchRequests(cluster).entrySet()) {
            final FetchRequest fetch = fetchEntry.getValue();

     

    向所有的已经连接的node节点发起FETCH的请求。如果请求成功,执行handleFetchResponse的函数处理来对响应的结果进行解析。
            client.send(fetchEntry.getKey()ApiKeys.FETCHfetch)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse response) {
                            handleFetchResponse(responsefetch);
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            log.debug("Fetch failed"e);
                        }
                    });
        }
    }

     

    接下来 看看fetch请求响应成功的响应结果的处理函数
    private void handleFetchResponse(ClientResponse respFetchRequest request) {
        int totalBytes = 0;
        int totalCount = 0;
        FetchResponse response = new FetchResponse(resp.responseBody());

     

    这里迭代这个broker的响应信息,每一个partition的响应表示一次迭代。
        for (Map.Entry<TopicPartitionFetchResponse.PartitionData> entry : 

                     response.responseData().entrySet()) {
            TopicPartition tp = entry.getKey();
            FetchResponse.PartitionData partition = entry.getValue();


            if (!subscriptions.isFetchable(tp)) {
                
    log.debug("Ignoring fetched records for partition {} since it is no longer 

                         fetchable"tp);
            else if (partition.errorCode == Errors.NONE.code()) {

    这个情况,表示partition正常fetch到了数据,对fetch到的数据进行解析,并进行cache操作。

    得到这个请求开始的offset,也就是上一次fetch时记录下来的position的值。
                long fetchOffset = request.fetchData().get(tp).offset;

     

    得到当前订阅中对应这个partition的position的值,这个值必须与这一次请求的offset的值相同,因为fetch的请求是通过这个position当成的开始的offset的值,如果这个值不相同,表示这个线程中间可能产生了其它的并发问题。
                
    Long position = subscriptions.position(tp);
                if (position == null || position != fetchOffset) {
                    log.debug("Discarding fetch response for partition {} since its offset 

                            {} does not match " +
                            "the expected offset {}"tpfetchOffsetposition);
                    continue;
                }

    对这个partition响应的消息集合进行解析,并迭代这个消息集合,把每一条消息添加到一个临时的parsed的集合中,当对这个响应的消息集合迭代完成后,这个parsed的临时集合不为空时,把这个集合添加到consumer的cache的records集合中,记录这个partition开始fetch的offset,对应的partition与消息集合。

    如果迭代完成后,响应消息的buffer中还有未进行处理的数据,表示这次请求响应的数据大于了设置的最大的响应消息的大小,把这个partition添加到recordTooLargePartitions集合中,表示这个partition这次的消息响应结果太大。
                int bytes = 0;
                ByteBuffer buffer = partition.recordSet;
                MemoryRecords records = MemoryRecords.readableRecords(buffer);
                List<ConsumerRecord<KV>> parsed = new ArrayList<>();
                for (LogEntry logEntry : records) {
                    // Skip the messages earlier than current position.
                    
    if (logEntry.offset() >= position) {
                        parsed.add(parseRecord(tplogEntry));
                        bytes += logEntry.size();
                    }
                }

                if (!parsed.isEmpty()) {
                    log.trace("Adding fetched record for partition {} with offset {} to 

                            buffered record list"tpposition);


                    ConsumerRecord<KV> record = parsed.get(parsed.size() - 1);
                    this.records.add(new PartitionRecords<>(fetchOffsettpparsed));
                    this.sensors.recordsFetchLag.record(

                           partition.highWatermark - record.offset());
                else if (buffer.limit() > 0) {
                    // we did not read a single message from a non-empty buffer
                    // because that message's size is larger than fetch size, in this case
                    // record this exception
                    
    this.recordTooLargePartitions.put(tpfetchOffset);
                }

                this.sensors.recordTopicFetchMetrics(tp.topic()bytesparsed.size());
                totalBytes += bytes;
                totalCount += parsed.size();
            

     

    如果执行fetch操作,对应这个fetch的partition响应的代码是如下两个代码时,表示这个partition的leader发生了变化,设置metadata的needUpdate属性为true,表示需要重新更新这个topic对应的metadata的信息。

            else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                  || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                this.metadata.requestUpdate();
            

     

    如果执行fetch操作时,请求过去的position的offset超出了这个partition的offset的范围时,

    如果auto.offset.reset配置的值不是NONE时,设置这个partition的状态中的awaitingReset属性为true,设置position的值为Null,设置hasValidPosition属性为false,设置重置offset时,读取offset的值LATEST是或者是EARLIEST,通过得到auto.offset.reset配置的值。表示这个partition需要重置offset.否则把这个partition添加到offsetOutOfRangePartitions集合中,表示这个partition的offset请求超出了log中offset的范围。

            else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                long fetchOffset = request.fetchData().get(tp).offset;
                if (subscriptions.hasDefaultOffsetResetPolicy())
                    subscriptions.needOffsetReset(tp);
                else
                    this
    .offsetOutOfRangePartitions.put(tpfetchOffset);
                log.info("Fetch offset {} is out of range, resetting offset"fetchOffset);
            

     

    如果响应的代码是TOPIC_AUTHORIZATION_FAILED代码时,表示当前的consumer没有操作这个partition对应的topic,直接把这个partition对应的topic添加到unauthorizedTopics集合中。

            else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                log.warn("Not authorized to read from topic {}."tp.topic());
                unauthorizedTopics.add(tp.topic());
            else if (partition.errorCode == Errors.UNKNOWN.code()) {
                log.warn("Unknown error fetching data for topic-partition {}"tp);
            else {
                throw new IllegalStateException("Unexpected error code " 

                       partition.errorCode " while fetching data");
            }
        }


        this.sensors.bytesFetched.record(totalBytes);
        this.sensors.recordsFetched.record(totalCount);
        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
        this.sensors.fetchLatency.record(resp.requestLatencyMs());
    }

     

    ConsumerNetworkClient中的poll函数

    这个函数用于具体的请求的发送操作,传入参数中的第三个参数表示是否需要计算并执行consumer中定义的一些定时器任务,比如心跳与设置有自动提交offset

    private void poll(long timeoutlong nowboolean executeDelayedTasks) {

    通过trySend来执行请求队列中需要发送的所有的请求。在这个过程中,如果请求对应的broker的节点还没有建立连接或者连接失效,会重新创建这个broker的连接。
        // send all the requests we can send now
        
    trySend(now);

        // ensure we don't poll any longer than the deadline for
        // the next scheduled task
        
    timeout = Math.min(timeoutdelayedTasks.nextTimeout(now));

     

    这里的clientPoll函数处理几下几个操作:

    1,如果metadata被标记为需要更新时,执行metadata的更新操作重新读取这个consumer中订阅的所有的topic的metadata的信息。

    2,对发送成功,接收到broker响应,连接被关闭,连接被打开,请求超时的selector进行处理,如果处理的是连接被关闭或者请求超时时,会关闭对应的node的连接,同时设置metadata的needUpdate属性为true,这个时候需要重新更新metadata.
        clientPoll(timeoutnow);
        now = time.milliseconds();

    这里处理node连接失败的节点的请求,把这个节点的请求移出。
        
    checkDisconnects(now);

    检查是否有定时间隔达到需要进行执行的定时器任务,如果有执行这个任务。
        // execute scheduled tasks
        
    if (executeDelayedTasks)
            delayedTasks.poll(now);

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        
    trySend(now);

    对失败的请求进行处理,主要是调用对应请求的回调函数来进行处理。
        // fail all requests that couldn't be sent
        
    failUnsentRequests();
    }

    consumergroup coordinator

    ConsumerCoordinator实例生成

    在KafkaConsumer实例生成时,会生成一个ConsumerCoordinator的实例,这个实例用于管理group的加入,partition的分配,当前的client的心跳管理 。

    在KafkaConsumer实例生成时:

    这个实例依赖如下配置:

    1,配置项group.id,用于配置consumer对应的订阅的组名称,相同的组的多个client端进行协调消费处理。

    2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。

    3,配置项heartbeat.interval.ms,默认值3秒,用于定时向server发送心跳的时间间隔。

    4,根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。

    5,配置项enable.auto.commit,默认值true,设置是否自动提交消费过的offset的值的设置。

    5,配置项auto.commit.interval.ms,默认值5秒,如果设置有自动提交offset时,自动提交的间隔时间。

    this.coordinator new ConsumerCoordinator(this.client,
            config.getString(ConsumerConfig.GROUP_ID_CONFIG),
            config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
            config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
            assignors,
            this.metadata,
            this.subscriptions,
            metrics,
            metricGrpPrefix,
            metricsTags,
            this.time,
            retryBackoffMs,
            new ConsumerCoordinator.DefaultOffsetCommitCallback(),
            config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
            config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));

     

     

    实例初始化的处理流程:

    super(client,
            groupId,
            sessionTimeoutMs,
            heartbeatIntervalMs,
            metrics,
            metricGrpPrefix,
            metricTags,
            time,
            retryBackoffMs);


    this.metadata = metadata;

    this.metadata.requestUpdate();
    this.metadataSnapshot new MetadataSnapshot();
    this.subscriptions = subscriptions;
    this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
    this.autoCommitEnabled = autoCommitEnabled;
    this.assignors = assignors;

    这里生成一个用于当metadata的内容发生变化时,也就是当订阅发生变化时,设置订阅状态管理组件中的needsPartitionAssignment属性为true,表示partition需要重新进行分配。
    addMetadataListener();

    根据是否设置有自动提交offset来生成自动提交offset的线程。
    this.autoCommitTask = autoCommitEnabled ? 

           new AutoCommitTask(autoCommitIntervalMs) : null;
    this.sensors new ConsumerCoordinatorMetrics(metricsmetricGrpPrefix

           metricTags);

     

    这个实例的上层实例的初始化:

    public AbstractCoordinator(ConsumerNetworkClient client,
                               String groupId,
                               int sessionTimeoutMs,
                               int heartbeatIntervalMs,
                               Metrics metrics,
                               String metricGrpPrefix,
                               Map<StringString> metricTags,
                               Time time,
                               long retryBackoffMs) {
        this.client = client;
        this.time = time;
        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
        this.groupId = groupId;
        this.coordinator null;
        this.sessionTimeoutMs = sessionTimeoutMs;

     

    生成用于发送心跳的管理组件。
        this.heartbeat new Heartbeat(this.sessionTimeoutMsheartbeatIntervalMs

               time.milliseconds());

     

    用于发送心跳的task.
        this.heartbeatTask new HeartbeatTask();
        this.sensors new GroupCoordinatorMetrics(metricsmetricGrpPrefix

                   metricTags);
        this.retryBackoffMs = retryBackoffMs;
    }

     

    这个实例中,默认初始化完成时,rejoinNeeded的值为true,表示需要对group进行加入操作,这个操作用于分配对应的partitions到当前的consumer中。

    这里的rejoinNeeded的属性,如果join成功后,属性会被设置成false,如果失败(join or sync)这个属性会被重新设置为true.

    private boolean needsJoinPrepare true;
    private boolean rejoinNeeded true;

     

    检查并初始化coordinator的连接

    这个检查的过程在KafkaConsumerfetch消息时,通过pollOnce函数进行调用。

    由ConsumerCoordinator中的ensureCoordinatorKnown函数来实现:

    public void ensureCoordinatorKnown() {

    这里的coordinatorUnknown函数用于判断ConsumerCoordinator实例中的coordinator(对应这个groupId的partition的leader的node信息)属性是否为空或者是一个失败的连接,如果是,这里就一直迭代得到这个group的metadata的内容。
        while (coordinatorUnknown()) {

    这里,如果请求不成功,一直迭代。
            RequestFuture<Void> future = sendGroupMetadataRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw 
    future.exception();
            }
        }
    }

     

    接下来看看sendGroupMetadataRequest函数的流程:

    在这个函数中,通过生成GroupCoordinatorRequest的请求。这个请求的请求key为GroupCoordinator,传入的参数就是当前的consumer对应的消费者的groupId的值。

    private RequestFuture<Void> sendGroupMetadataRequest() {
        // initiate the group metadata request
        // find a node to ask about the coordinator
        
    Node node = this.client.leastLoadedNode();
        if (node == null) {
            
    // from configuration?
            
    return RequestFuture.noBrokersAvailable();
        else {
            // create a group  metadata request
            
    log.debug("Issuing group metadata request to broker {}"node.id());
            GroupCoordinatorRequest metadataRequest = 

                    new GroupCoordinatorRequest(this.groupId);


            return client.send(nodeApiKeys.GROUP_COORDINATORmetadataRequest)
                    .compose(new RequestFutureAdapter<ClientResponseVoid>() {
                        @Override
                        public void onSuccess(ClientResponse response

                                     RequestFuture<Void> future) {
                            handleGroupMetadataResponse(responsefuture);
                        }
                    });
        }
    }

     

    接收对Group的metadata的响应:

    这个操作通过handleGroupMetadataResponse函数来进行处理:

    private void handleGroupMetadataResponse(ClientResponse resp,

                   RequestFuture<Void> future) {
        log.debug("Group metadata response {}"resp);
    如果coordinatorUnknown的返回值是false,表示这个group对应的消费者offset记录的partition的leader已经存在,不处理。
        if (!coordinatorUnknown()) {
            // We already found the coordinator, so ignore the request
            
    future.complete(null);
        else {

     

    这里读取kafka server响应回来的数据,这主要包含两部分数据,响应代码与此groupId对应存储消费元数据的topic的partition对应的leader的kafka server节点。
            GroupCoordinatorResponse groupCoordinatorResponse = 

                     new GroupCoordinatorResponse(resp.responseBody());
            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
            // for the coordinator in the underlying network client layer
            // 
    TODO: this needs to be better handled in KAFKA-1935
            
    short 
    errorCode = groupCoordinatorResponse.errorCode();

     

    如果server返回来的代码是NONE,表示这是一个没有错误的请求处理,得到这个groupId对应要写入数据的partition的leader的连接信息,并生成这个broker的socket连接。

    把HeartbeatTask(向节点发送心跳的管理组件)加入到调度器中,用于定时向server发送心跳。
            if (errorCode == Errors.NONE.code()) {
                this.coordinator new Node(Integer.MAX_VALUE 

                             groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());

                client.tryConnect(coordinator);

                // start sending heartbeats only if we have a valid generation
                
    if (generation 0)
                    heartbeatTask.reset();
                future.complete(null);
            else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
                future.raise(new GroupAuthorizationException(groupId));
            else {
                future.raise(Errors.forCode(errorCode));
            }
        }
    }

    管理与group对应的partitionleader节点的心跳

    当group加入并成功完成对partitions的分配后,会启动用于心跳管理的HeartbeatTask定时器。

    当执行ConsumerNetworkClient的poll函数时,如果executeDelayedTasks参数传入是true时,会检查当前的执行时间是否达到了task的超时时间的设置,如果达到这个设置时,会执行这个task的run函数来处理对应的操作。

    public void run(final long now) {

    如果joinGroup没有成功,或者需要进行重新join或者coordinator对应的broker节点的连接失效时,直接返回,不处理。
        if (generation || needRejoin() || coordinatorUnknown()) {
            // no need to send the heartbeat we're not using auto-assignment or if we are
            // awaiting a rebalance
            
    return;
        }

    如果heartbeat实例中,最后一次发送消息的时间已经超过了超时的设置时间,直接停止掉与coordinator的连接,这种情况下表示coordinator对应的broker节点失效了。
        if (heartbeat.sessionTimeoutExpired(now)) {
            // we haven't received a successful heartbeat in one session interval
            // so mark the coordinator dead
            
    coordinatorDead();
            return;
        }

    这里根据上一次的心跳时间与当前时间的间隔检查是否达到了要发送心跳的时间,如果没有达到,根据下一次的执行时间为调度的时间,设置到调度器中等待下次的调度处理。
        if (!heartbeat.shouldHeartbeat(now)) {
            // we don't need to heartbeat now, so reschedule for when we do
            
    client.schedule(thisnow + heartbeat.timeToNextHeartbeat(now));
        else {

    流程执行到这里,表示需要向coordinator对应的broker节点发起心跳,设置当前的心跳发送时间为当前时间。
            heartbeat.sentHeartbeat(now);
            requestInFlight true;
    向coordinator对应的broker节点发送HeartbeatRequest请求。如果请求处理成功响应,设置最后一次接收心跳响应的时间为当前时间,表示这是最后一次成功向coordinator节点请求数据。

    如果处理失败(我们主要考虑是请求心跳过去后,发现group又新join了一个consumer进去),这个时候设置rejoinNeeded属性的值为true,表示需要重新进行joinGroup的操作,重新来进行consumer对应的partitions的分配操作。

    如果处理失败的错误是coordinator节点没有准备好,表示这个leader节点可能是刚切换完成,或者说group对应的partition的leader已经发生了变化,关闭掉coordinator的连接,这里表示需要重新得到group的metadata的信息,并重新创建与新的coordinator的连接。
            RequestFuture<Void> future = sendHeartbeatRequest();
            future.addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    requestInFlight false;
                    long now = time.milliseconds();
                    heartbeat.receiveHeartbeat(now);
                    long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                    client.schedule(HeartbeatTask.thisnextHeartbeatTime);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    requestInFlight false;
                    client.schedule(HeartbeatTask.this,

                         time.milliseconds() + retryBackoffMs);
                }
            });
        }
    }

    Consumergroup加入

    当执行KafkaConsumer中的poll函数时,会检查partition是否被执行分配,如果没有时,会通过ConsumerCoordinator实例中的ensureActiveGroup函数来把group加入到对应groupId的partition中,并根据group的id与consumer的个数,对当前的consumer进行partition的分配。

    public void ensureActiveGroup() {

    检查rejoinNeeded属性的值是否是false,如果是false表示这个订阅已经被执行过分配,或者说不需要重新执行group的Join操作。
        if (!needRejoin())
            return;


        if (needsJoinPrepare) {

    属性needsJoinPrepare在这个实例默认初始化时的值为true,也就是在初始化时,这个流程会被执行。

    memberId的值默认为UNKNOWN_MEMBER_ID。generation的默认值为DEFAULT_GENERATION_ID

    执行group加入前的准备工作:

    1,如果配置有自动提交offset时,先disable掉自动提交的线程,并对当前consumer中cache中的partition的offset进行提交操作。

    2,如果订阅topic时,设置有ConsumerRebalanceListener接口的实现,调用接口中的onPartitionsRevoked函数把当前consumer中所有已经分配的partition传入并执行下线前的操作(这个接口在partition下线时,让用户自定义下线前对partition的处理,比如手动指定offset时用于提交要下线的partition的offset)。

    3,设置groupSubscription集合的值为subscription集合的值,

    并设置needsPartitionAssignment属性的值为true,表示需要分配partition.

    最后设置needsJoinPrepare属性为false.
            onJoinPrepare(generationmemberId);
            needsJoinPrepare false;
        }

    这里根据属性的值,如果这个值一直没有被更新为false时,这里一直开始迭代,向broker发起加入group的请求。
        while (needRejoin()) {
            ensureCoordinatorKnown();

     

    如果coordinator这个节点现在已经有请求正在被执行,等待请求执行完成。
            
    if (client.pendingRequestCount(this.coordinator) > 0) {
                client.awaitPendingRequests(this.coordinator);
                continue;
            }

    这里生成一个JoinGroupRequest请求(JOIN_GROUP),并向coordinator的节点发起这个请求。

    接收这个请求通过JoinGroupResponseHandler来处理。这里的处理主要会执行如下几个步骤:

    1,向coordinator发起一个joinGroup的操作,

    2,根据joinGroup的响应结果,在leader的consumer中执行所有消费这个group的consumer的partition的分配操作,并向coorninator中发起一个syncGroup的操作(leader consumer与follower的consumer都会发起,并只有leader进行处理,follower的请求等待leader的请求接收到后统一处理完成后每个consumer才会接收到响应)

    3,每个consumer得到syncGroup后给自己分配的partition的分配信息,并执行onJoinComplete函数。
            RequestFuture<ByteBuffer> future = performGroupJoin();
            client.poll(future);

            if (future.succeeded()) {

    在执行group的加入操作完成后的处理,包含join->assignor partitions->sync操作完成后,

    这个onJoinComplete函数的处理流程:

    1,解析出响应过来对应此memberId对应的partitions的分配信息,并设置订阅的needsFetchCommittedOffsets属性为true,表示需要加载消费者已经提过过的offset的值。

    2,把接收到的partitions的分配信息设置到订阅的assignment集合中,每一个partition都对应一个TopicPartitionState实例的值,用来存储这个partition的消费状态,包含消费到的offset,commit的offset等,更新订阅的needsPartitionAssignment属性值为false,表示分配完成。

    3,如果启用有自动提交offset的配置时,启动自动提交offset的定时器,用于定时自动提交offset.

    4,如果设置有consumer的rebalance的listener时,执行listener的onPartitionsAssigned函数。
                onJoinComplete(generationmemberIdprotocolfuture.value());

    启动用于member与group leader broker的节点心跳管理 的定时执行器。
                needsJoinPrepare true;
                heartbeatTask.reset();
            else {

    流程执行到这里,说明这次请求得到了一个失败的返回,根据返回的错误代码,检查是否需要重新向broker发起一个joinGroup的操作。
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs);
            }
        }
    }

     

    接下来看看接收joinGroup请求的响应监听处理实例JoinGroupResponseHandler

    public void handle(JoinGroupResponse joinResponse,

     RequestFuture<ByteBuffer> future) {
            // process the response
            
    short errorCode = joinResponse.errorCode();

    根据coordinator的server端返回的响应代码,进行相应的处理流程。
            if (errorCode == Errors.NONE.code()) {

    如果流程执行到这里,表示server端处理joinGroup的请求正常结束,向当前的client端返回了消息,

    得到joinGroup后对当前的client端成员生成的memberId的值与generation的值,并设置当当前的ConsumerCoordinator的实例中。
                log.debug("Joined group: {}"joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                sensors.joinLatency.record(response.requestLatencyMs());

    这里根据当前的consumer的member是否是group的leader来进行相应的处理。
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                

    如果当前的consumer的member不是leader的member时,执行onJoinFollower来完成对join后的处理。

                else {
                    onJoinFollower().chain(future);
                }
            } 

    这里处理如果group在server端正在执行rebalance的操作时,这种情况下server端对group中的所有的member会执行清空处理,等待consumer重新发送joinGroup的操作过去。

            else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
                log.debug("Attempt to join group {} rejected since coordinator is loading 

                     the group."groupId);
                // backoff and retry
                
    future.raise(Errors.forCode(errorCode));
            

    这里的处理错误代码如果是UNKNOWN_MEMBER_ID值时,表示这是一个新加入的group,如果当前的consumer传入的memberId的值不是默认的值时,会得到这个响应代码。等待consumer重新发送joinGroup的操作过去。

            else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                // reset the member id and retry immediately
                
    AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                log.info("Attempt to join group {} failed due to unknown member id, 

                        resetting and retrying.",
                        groupId);
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            }

    如果请求的group对应的状态是DEAD的状态时,表示这个group已经被标记为死亡的状态,这种情况表示说当前的group对应的partition的broker leader发生了变化,设置当前的coordinator节点信息属性为null,表示需要在下一次读取时,重新得到这个group对应的leader的broker节点。

            else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
                    || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                // re-discover the coordinator and retry with backoff
                
    coordinatorDead();
                log.info("Attempt to join group {} failed due to obsolete coordinator 

                    information, retrying.",
                    groupId);
                future.raise(Errors.forCode(errorCode));
            else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()
                    || errorCode == Errors.INVALID_GROUP_ID.code()) {
                // log the error and re-throw the exception
                
    Errors error = Errors.forCode(errorCode);
                log.error("Attempt to join group {} failed due to: {}",
                        groupIderror.exception().getMessage());
                future.raise(error);
            else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
                future.raise(new GroupAuthorizationException(groupId));
            else {
                // unexpected error, throw the exception
                
    future.raise(new KafkaException(

                      "Unexpected error in join group response: " + Errors.forCode(

                      joinResponse.errorCode()).exception().getMessage()));
            }
        }
    }

     

    对joinGroup后,当前的consumer是leader member时的处理

    在得到broker对joinGroup的成功响应后,如果返回的member标记着当前的consumer是leader节点,

    执行onJoinLeader函数来进行操作,

    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {

    如果当前的consumer是这个group中的leader的member的节点时,通过leader来进行partition的分配工作,这里得到每一个member根据其消费的topic进行partition分配后,每个member的partition的分配信息。
            
    Map<StringByteBuffer> groupAssignment = performAssignment(

                    joinResponse.leaderId(),

                    joinResponse.groupProtocol(),
                    joinResponse.members());

    根据当前的leader的memberId,group的id与当前的member对应的generation的值,生成一个SyncGroupRequest的请求,并通过函数向coordinator对应的broker节点发送请求syncGroup的请求。
            SyncGroupRequest request = new SyncGroupRequest(groupIdgeneration

                   memberIdgroupAssignment);


            log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}"

                ApiKeys.SYNC_GROUPrequestthis.coordinator.id());
            return sendSyncGroupRequest(request);
        catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

     

    preformAssignment函数,用于执行对group的所有的member的partition进行分配:
    protected Map<StringByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                       Map<StringByteBuffer> allSubscriptions) {

     

    得到用于进行partition的分配的实例,默认是RangeAssignor实例。
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);


        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment 

                   protocol: " + assignmentStrategy);

    得到当前group中所有的consumer所有消费的topic的集合。
        Set<String> allSubscribedTopics = new HashSet<>();

     

    这里根据memberId存储每一个member对应的消费的topcis与assignor的实例名称信息。
        Map<StringSubscription> subscriptions = new HashMap<>();

    这里开始迭代此group中每一个member的实例,并取出每一个member订阅的topci的集合存储起来。
        for (Map.Entry<StringByteBuffer> subscriptionEntry : 

                                                allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(

                           subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey()subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

     

    更新groupSubscription集合的topic信息为当前的group中所有的member消费的topci的集合。
        
    this.subscriptions.groupSubscribe(allSubscribedTopics);

    把所有的topic集合添加到metadata的实例中,如果metadata中当前的topics的集合中不包含当前添加进行的部分topic时,设置needUpdate属性的值为true,表示metadata需要被更新。
        metadata.setTopics(this.subscriptions.groupSubscription());

    如果说metadata中有topic被更新,这里等待metadata更新完成,在ConsumerNetworkClient中通过clientPoll进行调用时,会执行NetworkClient中的poll函数(maybeUpdate函数),这个函数根据是否需要更新metadata来生成一个MetadataRequest请求,得到group中所有的consumer需要的topic的metadata的信息。这个过程会一直等待,直到metadata更新成功。
        client.ensureFreshMetadata();

        log.debug("Performing {} assignment for subscriptions {}"assignor.name()

              subscriptions);

    通过assignor的实现来完成分配工作,根据所有的consumer订阅的topic的集合与当前的cluster中的节点信息。assignor的实现通过partition.assignment.strategy配置,默认是RangeAssignor.

    分配执行流程:

    1,得到所有member订阅过的所有的topic的集合,并得到每一个topic对应的partition的个数。

    2,根据每个topic对应的partition的个数与member对应的topic的集合进行分配:

    2,1,先根据每一个member对应的topic的集合,得到每个topic对应的consumer的集合,

    2,2,根据生成出来的topic对consumer的集合进行迭代,得到每一个迭代的topic的partition的个数,并根据topic对应的consumer的集合进行排序,计算出每一个consumer平均要分配的partition的个数与剩余的partition的个数,

    2,2,1迭代每一个topic中的consumer,开始针对这一个consumer进行partition的分配,这里使用一个示例来说明分配情况:

    假如,一个topic有11个partition,三个consumer时,

    A,这时候,每个conumser平均分配3个partition还于下2个partition,

    B,对第一个consumer进行分配时,原则上应该是从0开始分配,长度是3,但是由于还于下2个partition,那么第一个consumer分配就是从0开始,分配长度为4个partition,这里迭代的consumer的下标为0,

    开始位置:平均个数×0 + (0=min(下标,于下个数)),长度:平均个数+(0+1>于下个数?0:1)=1

    C,对第二个consumer进行分配时,这里根据平均值计算出来应该是从3开始,但是由于于下了2个partition,上一次分配时长度加了1,因此,这里开始的位置从4开始,根据于下的partition时,分配长度也是4个partition,这时,还剩下3个partition,迭代的下标为1,

    开始位置:平均个数×1 + (1=min(下标,于下个数)),长度:平均个数+(1+1>于下个数?0:1)=1

    D,最后,对第三个consumer进行分配,这里迭代的下标是2,

    开始位置:平均个数×2 + (2=min(下标,于下个数)),长度:平均个数+(2+1>于下个数?0:1)=0

    3,最后,这个函数返回的值是一个map,key就是每个consumer的member的id,value是对应的TopicPartition的集合。


        Map<StringAssignment> assignment = assignor.assign(metadata.fetch()

             subscriptions);

        log.debug("Finished assignment: {}"assignment);

    把针对每一个member分配好的partition集合的Assignment实例进行序列化,并返回这个member的分配集合。
        Map<StringByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<StringAssignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(

                      assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey()buffer);
        }

        return groupAssignment;
    }

     

    对joinGroup后,当前的consumer是非leader member时的处理

    通过onJoinFollower函数来进行处理,这种情况下,当前的consumer的member是一个从节点,只对leader对其分配的partition的数据进行消费。

    在非leader的consumer中,向coonrdinator的broker发起syncGroup的操作时,在follower的member中发起syncGroup的请求时,传入的consumer的分配信息为一个空的集合。

    private RequestFuture<ByteBuffer> onJoinFollower() {
        // send follower's sync group with an empty assignment
        
    SyncGroupRequest request = new SyncGroupRequest(groupIdgeneration,
                memberIdCollections.<StringByteBuffer>emptyMap());
        log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}"

              ApiKeys.SYNC_GROUPrequestthis.coordinator.id());
        return sendSyncGroupRequest(request);
    }

     

    提交完成消费的offset

    通过执行KafkaConsumer中的commitSync函数来提交已经消费到的每个partition中最后一个offset的值。针对这个函数的处理时,如果是手动进行调用,需要自行进行try catch的操作,否则这个过程中有可能会导致程序无法正常被执行。

    public void commitSync() {
        acquire();
        try {
            commitSync(subscriptions.allConsumed());
        finally {
            release();
        }
    }

     

    首先看看SubscriptionState实例中的allConsumed函数:

    这里迭代这个consumer订阅的所有的topic分配的partition的集合,并取出这个partition中消费到的最后一个offset记录的position的值(只有在consumer执行poll后,这个值才会被更新,就是消费掉的最后一条记录对应的offset的值),把每一个partition对应的offset生成到一个map集合中,返回给上层的调用函数。

    public Map<TopicPartitionOffsetAndMetadata> allConsumed() {
        Map<TopicPartitionOffsetAndMetadata> allConsumed = new HashMap<>();
        for (Map.Entry<TopicPartitionTopicPartitionState> entry : 

                       assignment.entrySet()) {
            TopicPartitionState state = entry.getValue();
            if (state.hasValidPosition)
                allConsumed.put(entry.getKey()new OffsetAndMetadata(state.position));
        }
        return allConsumed;
    }

     

    接下来看看commitSync提交对应的partition记录到的最后一个消费的offset的流程:

    public void commitSync(final Map<TopicPartitionOffsetAndMetadata> offsets) {
        acquire();
        try {

    这个函数中直接通过coordinator对应的broker节点的连接信息提交offsets,这个由ConsumerCoordinator实现。
            coordinator.commitOffsetsSync(offsets);
        finally {
            release();
        }
    }

     

    ConsumerCoordinator中的commitOffsetsSync函数,生成提交当前consumer中已经消费的offset的请求:

    public void commitOffsetsSync(Map<TopicPartitionOffsetAndMetadata> offsets) {
        if (offsets.isEmpty())
            return;

        while (true) {

    首先检查coordinator的broker是否是连接的状态,如果不是先对coordinator进行连接。
            ensureCoordinatorKnown();
    生成一个OffsetCommitRequest请求,这个请求中针对每一个partition对应的offset生成请求的内容,并向coordinator对应的broker节点发起这个请求,

    处理这个请求的响应通过OffsetCommitResponseHandler实例来完成。
            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future);

            if (future.succeeded())
                return;

            if (!future.isRetriable())
                throw future.exception();

            time.sleep(retryBackoffMs);
        }
    }

     

    接收Offset提交的响应处理流程:

    public void handle(OffsetCommitResponse commitResponseRequestFuture<Void> future) {
        sensors.commitLatency.record(response.requestLatencyMs());
        Set<String> unauthorizedTopics = new HashSet<>();
    迭代对每一个partition的offset的提交的响应代码进行处理。
        for (Map.Entry<TopicPartitionShort> entry : 

                      commitResponse.responseData().entrySet()) {
            TopicPartition tp = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
            long offset = offsetAndMetadata.offset();

            Errors error = Errors.forCode(entry.getValue());
            if (error == Errors.NONE) {

    如果请求提交offsets正常被server端进行处理,更新这个partition对应的TopicPrtitionState中的committed属性的值为提交时对应的position属性的值。。
                log.debug("Committed offset {} for partition {}"offsettp);
                if (subscriptions.isAssigned(tp))
                    // update the local cache only if the partition is still assigned
                    
    subscriptions.committed(tpoffsetAndMetadata);
            else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                log.error("Unauthorized to commit for group {}"groupId);
                future.raise(new GroupAuthorizationException(groupId));
                return;
            else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                unauthorizedTopics.add(tp.topic());
            else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                    
    || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                // raise the error to the user
                
    log.info("Offset commit for group {} failed on partition {} due to {}, 

                           will retry"groupIdtperror);
                future.raise(error);
                return;
            else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {

    这种情况是group发生了leader的变化,group的cache信息正在被加载,需要进行重试。
                // just retry
                
    log.info("Offset commit for group {} failed due to {}, will retry"

                         groupIderror);
                future.raise(error);
                return;
            else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    
    || error == Errors.NOT_COORDINATOR_FOR_GROUP
                    
    || error == Errors.REQUEST_TIMED_OUT) {

    这种情况表示group已经不在原来的节点中,需要断开与原来的coordinator的broker的连接,并进行重试。
                log.info("Offset commit for group {} failed due to {}, will find new 

                       coordinator and retry"groupIderror);
                coordinatorDead();
                future.raise(error);
                return;
            else if (error == Errors.UNKNOWN_MEMBER_ID
                    
    || error == Errors.ILLEGAL_GENERATION
                    
    || error == Errors.REBALANCE_IN_PROGRESS) {

    这种情况表示当前的group中有consumer上线或者下线,需要对partition进行重新分配。
                // need to re-join group
                
    log.error("Error {} occurred while committing offsets for group {}"error

                          groupId);
                subscriptions.needReassignment();
                future.raise(new CommitFailedException("Commit cannot be completed due 

                        to group rebalance"));
                return;
            else {
                log.error("Error committing partition {} at offset {}: {}"tpoffset

                      error.exception().getMessage());
                future.raise(new KafkaException("Unexpected error in commit: " 

                       error.exception().getMessage()));
                return;
            }
        }

        if (!unauthorizedTopics.isEmpty()) {
            log.error("Unauthorized to commit to topics {}"unauthorizedTopics);
            future.raise(new TopicAuthorizationException(unauthorizedTopics));
        else {
            future.complete(null);
        }
    }

    展开全文
  • storm-kafka源码分析

    千次阅读 2017-08-13 20:09:22
    storm-kafka源码分析@(KAFKA)[kafka, 大数据, storm]storm-kafka源码分析 一概述 一代码结构 二orgapachestormkafka 三orgapachestormkafkatrident 1spout 2state 3metric 四其它说明 1线程与分区 二...

    storm-kafka源码分析

    @(KAFKA)[kafka, 大数据, storm]

    一、概述

    storm-kafka是storm用于读取kafka消息的连接器,本文主要对trident的实现部分作了解读。

    (一)代码结构

    storm-kafka中多7个package中,其中的org.apache.storm.kafka与org.apache.storm.kafka.trident中最核心的2个,分别用于处理storm-core与trident,其它package只是这2个的辅助。我们下面分别先简单看一下这2个package的内容。

    注:还有一个包org.apache.storm.kafka.bolt用于向kafka写入数据,用得较少,暂不分析。

    (二)org.apache.storm.kafka

    org.apache.storm.kafka这个package包括了一些公共模块,以及storm-core的spout处理。

    (三)org.apache.storm.kafka.trident

    trident这个package中的类按照其功能可大致分为3类:spout, state和metric。除此之外,trident还调用了一些org.apache.storm.kafka中的类用于处理相同的事务,如metric, exception, DynamicBrokerReader等

    1、spout

    spout指定了如何从kafka中读取消息,根据trident的构架,它涉及的主要类为:
    * OpaqueTridentKafkaSpout, TransactionalTridentKafkaSpout: 2种类型的spout
    * Coordinator, TridentKafkaEmitter: 即Coordinator与Emitter的具体实现。
    * GlobalPartitionInformation, ZkBrokerReader:2个重要的辅助类,分别记录了partition的信息以及如何从zk中读取kafka的状态(还有一个静态指定的,这里不分析)。

    2、state

    3、metric

    主要涉及一个类:MaxMetric,其实还有其它metric,但在org.apache.storm.kafka中定义了。

    (四)其它说明

    1、线程与分区

    注意,storm-kafka中的spout只是其中一个线程。
    严格来说是每个partition只能由一个task负责,当然,一个task可以处理多个partition。但task和partition之间是怎么对应的呢?如何决定一个task处理哪些partition?

    在trident拓扑中,多个batch会同时被处理(由MAX_SPOUT_PENDING决定),每个batch包含多个或者全部分区,每个batch读取的消息大小由fetchSizeBytes决定。

    二、org.apache.storm.kafka

    (一)基础类

    这些基础的功能类可以大致分为以下几类:
    * Bean类:表示某一种实体,包括Broker,BrokerHost, Partition 和trident.GlobalPartitionInformation
    * 配置类: 包括KafkaConfig 和 SpoutConfig。
    * zk读写类:包括获取state内容的ZkState,以及读取broker信息的DynamicBrokersReader和trident.ZkBrokerReader。
    * 数据处理类:ZkCoordinator用于确定自已这个spout要处理哪些分区,以及某个分区对于的PartitionManager对象,而PartitionManager则真正的对某个分区进行处理了,DynamicPartitionConnections用于被PartitionManager调用以获取分区对应的SimpleConsumer,
    * KafkaUtils: 一些功能方法。
    另外还有一些metric和错误处理的类等,暂不介绍。

    1、Broker

    Broker只有2个变量:

    public String host;
    public int port;
    

    表示一台kafka机器的地址与端口。

    2、BrokerHosts

    有2种实现:StaticHosts 与 ZkHost。
    以ZkHost为例:

    private static final String DEFAULT_ZK_PATH = "/brokers";
    public String brokerZkStr = null;
    public String brokerZkPath = null; // e.g., /kafka/brokers
    public int refreshFreqSecs = 60;
    

    可以看出,这是记录了kafka在zk中的位置(ip与路径),以及多久刷新一下这个信息。默认为/kafka/brokers,有2个子目录:

    topic   ids
    

    分别记录了topic信息及broker信息。

    3、Partition

    Partition记录了一个分区的具体信息,包括(所在的broker, 所属的topic,partition号)。

    Partition(Broker host, String topic, int partition)
    

    4、trident.GlobalPartitionInformation

    GlobalPartitionInformation记录的是某个topic的所有分区信息,其中分区信息以一个TreeMap的形式来保存。

    public String topic;
    private Map<Integer, Broker> partitionMap;
    

    它有一个getOrderedPartitions()方法,返回的就是这个topic的所有分区信息:

    public List<Partition> getOrderedPartitions() {
        List<Partition> partitions = new LinkedList<Partition>();
        for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
            partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
        }
        return partitions;
    }
    

    注意,因为使用了TreeMap的数据结构,因此返回的结果就是有序的。

    5、KafkaConfig

    就是关于kafkaSpout的一些配置项,完整列表为:

    public final BrokerHosts hosts;
    public final String topic;
    public final String clientId;
    
    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean ignoreZkOffsets = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;
    

    6、SpoutConfig

    SpoutConfig extends KafkaConfig
    

    加了几个配置项:

    public List<String> zkServers = null;
    public Integer zkPort = null;
    public String zkRoot = null;
    public String id = null;
    
    public String outputStreamId;
    
    // setting for how often to save the current kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;
    
    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
    // calls OutputCollector.fail().
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;
    public long retryDelayMaxMs = 60 * 1000;
    

    7、ZkState

    ZkState记录了每个partition的处理情况,它是通过读写zk来实现的,zk中的内容如下:

    {"topology":{"id":"2e3226e2-ef45-4c53-b03f-aacd94068bc9","name":"ljhtest"},"offset":8066973,"partition":0,"broker":{"host":"gdc-kafka08-log.i.nease.net","port":9092},"topic":"ma30"}
    

    上面的信息分别为topoId,拓扑名称,这个分区处理到的offset,分区号,这个分区在哪台kafka机器,哪个端口,以及topic名称。
    ZkState只要提供了对这个zk信息的读写操作,如readJSON, writeJSON。

    这些信息在zk中的位置通过构建KafkaConfig对象时的第3、4个参数指定,如下面的配置,则数据被写在/kafka2/ljhtest下面。因此第4个参数必须唯一,否则不同拓扑会有冲突。

     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/kafka2", "ljhtest");
    

    而trident的默认位置为/transactional/${topo}

    8、DynamicBrokersReader

    读取zk中关于kafka的信息,如topic的分区等。

    public List<GlobalPartitionInformation> getBrokerInfo() 
    

    获取所有topic的分区信息。

    private int getNumPartitions(String topic)
    

    获取某个topic的分区数量。

    9、trident.ZkBrokerReader

    trident.ZkBrokerReader大部分功能通过DynamicBrokersReader完成,关于与zk的连接,都是通过前者完成。同时增加了以下2个方法:

    • getBrokerForTopic():返回某个topic的分区信息,返回的是GlobalPartitionInformation对象。这是由于可能同时读取多个分区的情况。
    • getAllBrokers():读取所有的分区,不指定topic。因为支持正则topic,所以有可能有多个topic。
    • refresh(): 这是一个private方法,每隔一段时间去refresh分区信息,在上面2个方法中被调用。
      每次发送一个新的batch时,会通过DynamicPartitionConnections#register()方法调用上面的方法,当时间超过refreshFreqSecs时,即会刷新分区信息。

    10、ZkCoordinator

    ZkCoordinator implements PartitionCoordinator
    

    与之对应的还有个StaticCoordinator。
    主要功能是读取zk中的分区信息,然后计算自己这个task负责哪些分区。

    PartitionCoordinator只有3个方法:
    (1)主要方法为getMyManagedPartitions(),即计算自己这个spout应该处理哪些分区。
    还有refresh是去刷新分区信息的。

    List<PartitionManager> getMyManagedPartitions();
    

    (2)获取PartitionManager对象:

    PartitionManager getManager(Partition partition);
    

    (3)定期刷新分区信息

    void refresh();
    

    11、PartitionManager

    记录了某个分区的连接信息,如:

    Long _committedTo;
    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    

    即这个分区的分区号,consumer等信息,还有用于发送消息的next()方法等,反正对某个分区的处理都在这个类中。
    2个重点方法:
    * fill()用于从kafka中获取消息,写到_waitingToEmit这个列表中。
    * next()从上面准备的列表中读取数据,通过emit()发送出去。
    * 还有ack(),fail等方法。
    PartitionManager持有一个DynamicPartitionConnections对象,通过这个对象的regist方法可以获取到一个SimpleConsumer对象,从而对消息进行读取。

    12、DynamicPartitionConnections

    DynamicPartitionConnections用于记录broker—SimpleConsumber—-分区之间的关系。* 一个broker对应一个SimpleConsumber,但一个SimpleConsumer可以对应多个分区。尤其是spout的数量比分区数量少的时候*

    主要用于创建SimpleConsumer,通过Partition信息,返回一个SimpleConsumer对象:

    public SimpleConsumer register(Partition partition) {...}
    

    以及unRegister()方法,取消关联。

    Map<Broker, ConnectionInfo> _connections = new HashMap();
    

    这个变量记录了一个broker的连接信息,其中ConnectionInfo有2个成员变量:

    static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<String> partitions = new HashSet<String>();
    
        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }
    

    因此一个broker对应一个ConnectionInfo对象,而ConnectionInfo对象内有一个SimpleConsumber对象和其对应的多个分区。

    13、KafkaUtils

    很多公用方法,以后一个一个解释:

    (1)calculatePartitionsForTask

    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
    

    计算某个task负责哪些分区。
    注意,tridentSpout并未使用这个方法计算所负责的分区。TridentSpout的分区计算不在storm-kafka中实现,而是Trident机制自带的。详细的说是在OpaquePartitionedTridentSpoutExecutor的emitBatch()方法中计算。这就有个问题了,为什么在trident中,会自己计算负责的分区,而一般的storm需要自己来实现。

    (二)KafkaSpout

    在用户代码中,用户通过使用KafKaConfig对象创建一个KafkaSpout,这是整个拓扑的起点:

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/test2", "ljhtest");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
    

    KafkaSpout继承自BaseRichSpout,有open(), nextTuple(), ack(), fail()等方法。
    下面我们详细分析一下KafkaSpout这个类。

    1、open()

    KafkaSpout完成初始化的方法,当一个spout 被创建时,这个方法被调用。这个方法主要完成了以下几个对象的初始化:
    * _state : 获取state目录下的内容,详见ZkState中的介绍。
    * _connection:用于在每次发送消息(nextTuple方法法)时,获取某个分区的SimpleConsumer对象。
    * _coordinator:用于在每次必发送消息时获取这个spout要处理哪些分区。
    此外还有2个metric。

    2、nextTuple()

        //获取这个task要处理哪些分区,然后对每个分区数据开始处理
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {
    
            // in case the number of managers decreased
           _currPartitionIndex = _currPartitionIndex % managers.size();
            //发送消息,下面慢慢分析。
            mitState state = managers.get(_currPartitionIndex).next(_collector);
        }
    

    只要就2个步骤:
    * 获取到这个spout要处理哪些分区
    * 然后遍历分区,对消息进行处理,处理的过程在ParitionManage中,稍后再详细介绍。

    三、trident

    OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout

    (一)tridentspout的主要流程

    1、主要调用流程回顾

    先说明一下,一个spout的组成分成三个部分,简单的说就是消息是从MasterBatchCoordinator开始的,它是一个真正的spout,而TridentSpoutCoordinator与TridentSpoutExecutor都是bolt,MasterBatchCoordinator发起协调消息,最后的结果是TridentSpoutExecutor发送业务消息。而发送协调消息与业务消息的都是调用用户Spout中BatchCoordinator与Emitter中定义的代码。

    MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady
    |
    |
    v
    TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]
    |
    |
    v
    TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)
    对于分区是OpaquePartitionedTridentSpoutExecutor等

    如果需要详细了解这个过程,可参考:
    http://blog.csdn.net/lujinhong2/article/details/49785077

    我们先简单介绍一下所有的相关类及其位置,然后分别介绍Coordinator与Emitter的实现。尤其是着重分析一下Emitter部分,因为它是实际读取kafka消息,并向下游发送的过程。

    2、指定spout

    用户在代码中用以下语句指定使用哪个spout,如:

    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
    

    然后storm根据这个spout的代码,找到对应的Coordinator与Emitter。我们看一下OpaqueTridentKafkaSpout的代码。
    这代码很简单,主要完成了:
    (1)初始化一个Spout时,会要求传递一个TridentKafkaConfig的参数,指定一些配置参数。

    TridentKafkaConfig _config;
    
    public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
        _config = config;
    }
    

    (2)然后就分别指定了Coordinator与Emitter:

    @Override
    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
        return new TridentKafkaEmitter(conf, context, _config, context
                .getStormId()).asOpaqueEmitter();
    }
    
    @Override
    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
    }
    

    (二)Coordinator

    1、Coordinator的实例化

    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
        config = tridentKafkaConfig;
        reader = KafkaUtils.makeBrokerReader(conf, config);
    }
    

    2、close()与isReady()

    Coordinator通过TridentKafkaConfig传入一个DefaultCoordinator的对象,Coordinator的close()及isReady()均是通过调用DefaultCoordinator的实现来完成的。

    @Override
    public void close() {
        config.coordinator.close();
    }
    
    @Override
    public boolean isReady(long txid) {
        return config.coordinator.isReady(txid);
    }
    

    我们接着看一下DefaultCoordinator的实现:

    @Override
    public boolean isReady(long txid) {
        return true;
    }
    
    @Override
    public void close() {
    }
    

    很简单,isReady()直接返回true,close()则不做任何事情。

    3、getPartitionsForBatch()

    这个方法的功能是在初始化一个事务时,去zk读取最新的分区信息(当然是缓存超时后才读)。

    @Override
    public List<GlobalPartitionInformation> getPartitionsForBatch() {
        return reader.getAllBrokers();
    }
    

    注释为:
    Return the partitions currently in the source of data. The idea is is that if a new partition is added and a prior transaction is replayed, it doesn’t emit tuples for the new partition because it knows what partitions were in that transaction.

    由下面可以看出,getPartitionsForBatch()都是在初始化一个事务时被调用的。
    透明型:

        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            return _coordinator.getPartitionsForBatch();
        }
    

    事务型:

        @Override
        public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
            if(currMetadata!=null) {
                return currMetadata;
            } else {
                return _coordinator.getPartitionsForBatch();            
            }
        }
    

    那我们继续看看这个方法完成了什么功能:

    @Override
    public List<GlobalPartitionInformation> getAllBrokers() {
        refresh();
        return cachedBrokers;
    }
    

    除了这个,还有一个使用静态指定的,暂不管它。

    private void refresh() {
        long currTime = System.currentTimeMillis();
        if (currTime > lastRefreshTimeMs + refreshMillis) {
            try {
                LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
                cachedBrokers = reader.getBrokerInfo();
                lastRefreshTimeMs = currTime;
            } catch (java.net.SocketTimeoutException e) {
                LOG.warn("Failed to update brokers", e);
            }
        }
    }
    

    其它就是在超时的情况下去zk读取broker的信息,并返回partitions的信息。返回的信息为GlobalPartitionInformation列表,即topic与其具体分区信息的map。

    public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
      List<String> topics =  getTopics();
      List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();
    
      for (String topic : topics) {
          GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
          try {
              int numPartitionsForTopic = getNumPartitions(topic);
              String brokerInfoPath = brokerPath();
              for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                  int leader = getLeaderFor(topic,partition);
                  String path = brokerInfoPath + "/" + leader;
                  try {
                      byte[] brokerData = _curator.getData().forPath(path);
                      Broker hp = getBrokerHost(brokerData);
                      globalPartitionInformation.addPartition(partition, hp);
                  } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                      LOG.error("Node {} does not exist ", path);
                  }
              }
          } catch (SocketTimeoutException e) {
              throw e;
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
          LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
          partitions.add(globalPartitionInformation);
      }
        return partitions;
    }
    

    以下内容均是对emitter的介绍
    注意,在trident中,每个task负责哪些分区是在storm-core中计算好的,因此在emitter中只负责处理这个分区的消息就行了,具体来说是在OpaquePartitionedTridentSpoutExecutor.emitBatch()中计算分区的

    (三)Emitter : TridentKafkaEmitter结构

    TridentKafkaEmitter中有2个内部类,分别对应事务型与透明型的spout。事务型的spout重发batch时必须与上一批次相同,而透明型是没这个需要的,可以从其它可能的分区中取一批新的数据。

    1、offset与nextOffset

    消息处理的metaData中保存了offset与nextOffset2个数据,其中后者一般通过MessageAndOffset#nextOffset()来获取到。offset表示当前正在处理的消息的offset,nextOffset表示当前消息的下一个offset。举个例子:

    (offset)*这是一批消息**(nextOffset)
    因此正常情况下,应该offset

    1、事务型的spout

    有5个方法,我们这里先讨论其中2个核心方法。storm根据某个batch是否第一次发送来决定调用哪个方法。

    emitPartitionBatchNew()

    当某个batch是第一次发送时,调用此方法,这个方法的调用顺序为:

    emitPartitionBatchNew() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()

    emitPartitionBatch()

    当某个batch是重发时,调用此方法,这个方法的调用顺序为:
    emitPartitionBatch() —–> reEmitPartitionBatch()

    2、透明型的spout

    透明型的spout不需要保证重发的batch与上一批次是相同的,因此,对于每一次发送都是相同的逻辑即可,不需要管是否第一次发送,它只有一个发送方法。

    emitPartitionBatch()

    emitPartitionBatch() —–> emitNewPartitionBatch() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()

    2种类型发送数据时只终均是调用doEmitNewPartitionBatch(),而透明型的spout在调用之前会先使用emitNewPartitionBatch()来捕获FailedFetchException,重新获取一份新的元数据,以准备读取新的消息

    image

    3、公共方法

    除了以上的发送数据方法以外,它们均还有以下3个方法,下面再详细分析。

            @Override
            public void refreshPartitions(List<Partition> partitions) {
                refresh(partitions);
            }
    
            @Override
            public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
                return orderPartitions(partitionInformation);
            }
    
            @Override
            public void close() {
                clear();
            }
    

    (四)透明型spout

    1、emitPartitionBatch()

    /**
             * Emit a batch of tuples for a partition/transaction.
             *
             * Return the metadata describing this batch that will be used as lastPartitionMeta
             * for defining the parameters of the next batch.
             */
            @Override
            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }
    

    当需要发送一个新的batch时,storm会调用emitPartitionBatch方法,此方法直接调用emitNewPartitionBatch。

    参数说明:
    * transactionAttempt,只有2个成员变量,即long _txId和int _attemptId,即记录了当前的事务id及已经尝试的次数。
    * tridentCollector,就是用于发送消息的collector。
    * partition,表示一个分区,可以理解为kafka的一个分区,有2个成员变量,分别为Broker host和int partition,即kafka的机器与分区id。
    * map,用于记录这个事务的元数据,详细见后面分析。

    2、emitNewPartitionBatch()

    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
        try {
            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
        } catch (FailedFetchException e) {
            LOG.warn("Failed to fetch from partition " + partition);
            if (lastMeta == null) {
                return null;
            } else {
                Map ret = new HashMap();
                ret.put("offset", lastMeta.get("nextOffset"));
                ret.put("nextOffset", lastMeta.get("nextOffset"));
                ret.put("partition", partition.partition);
                ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
                ret.put("topic", _config.topic);
                ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
                return ret;
            }
        }
    }
    

    很明显,也只是简单调用failFastEmitNewPartitionBatch,但如果获取消息失败的话,则会创建一个新元数据。
    如果lastMeta为null的话,则会直接返回null,则会从其它地方(如zk)进行初始化(邮见下面的分析);如果不为空,则根据lastMeta的值,根据一个新的元数据。元数据包括以下几个字段:
    * offset:下一个需要处理的offset
    * nextOffset:由于未开始处理batch,所以offset与nextOffset都是同一个值。注意,如果正在处理一个batch,则offset是正在处理的batch的offset,而nextOffset则是下一个需要处理的offset。
    * partition:就是哪个分区了
    * broker:哪台kafka机器以及端口
    * topic:哪个kafka topic
    * topology:拓扑的名称与id。

    TODO:ImmutableMap.of()

    ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)
    

    TODO:如果获取失败,哪里更新了新的分区信息,是fetch里面作了处理吗?后面再看。

    3、failFastEmitNewPartitionBatch()

        private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
        SimpleConsumer consumer = _connections.register(partition);
        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
        return ret;
    }
    

    先根据partition信息注册一个consumer,注意这里的分区信息包括了机器、端口还有分区id等。然后就调用doEmitNewPartitionBatch执行实际事务,最后的是metric的使用。

    4、doEmitNewPartitionBatch()

    (1)确定offset

    简单的说,就是
    * 如果lastMeta为空,则从其它地方(如zk)获取offset;
    * 否则,如果当前topoid与之前的不同(表示拓扑重启过)而且ignoreZkOffsets为true,则从指定的offset开始;
    * 如果当前topoid与之前的相同(表示在持续处理消息中),或者ignoreZkOffsets为false,则从之前的位置继续处理

        long offset;
        //1、如果lastMeta不为空,则:
        if (lastMeta != null) {
            String lastInstanceId = null;
            Map lastTopoMeta = (Map) lastMeta.get("topology");
            if (lastTopoMeta != null) {
                lastInstanceId = (String) lastTopoMeta.get("id");
            }
            //1.1:如果ignoreZkOffsets为true,而且当前拓扑id与之前的id不同时,则从指定的时间点开始获取消息。
            if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
                offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
            } else {
                //1.2:如果ignoreZkOffsets为false,或者当前拓扑id与之前的id相同(表示拓扑没有重启过,一直在处理消息中),则继续之前的处理。
                offset = (Long) lastMeta.get("nextOffset");
            }
        } else {
            //2、如果lastMeta为空,则从其它地方(如zk)获取之前的offset
            offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
        }
    

    (2)读取消息

    ByteBufferMessageSet msgs = null;
        try {
            msgs = fetchMessages(consumer, partition, offset);
        } catch (TopicOffsetOutOfRangeException e) {
            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
            offset = newOffset;
            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
        }
    

    如果TopicOffsetOutOfRangeException,则从最旧的消息开始读。

    (3)发送消息并更新offset

    long endoffset = offset;
        for (MessageAndOffset msg : msgs) {
            emit(collector, msg.message());
            endoffset = msg.nextOffset();
        }
    

    每发送一条消息则将endoffset往后移一位,直到发送完时,endoffset就是下一个需要处理的offset。

    (4)构建下一个meta并返回

        Map newMeta = new HashMap();
        newMeta.put("offset", offset);
        newMeta.put("nextOffset", endoffset);
        newMeta.put("instanceId", _topologyInstanceId);
        newMeta.put("partition", partition.partition);
        newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
        newMeta.put("topic", _config.topic);
        newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
        return newMeta;
    

    关于metric的设置以及读取kafka消息的实现,下面单独介绍

    (五)事务型spout

    1、emitPartitionBatchNew()

    当某个batch第一次发送时调用此方法,返回是这个batch相关的元数据,可用于重构这个batch。如果这个batch出错需要重发,则调用emitPartitionBatch(),下面再介绍。

            /**
             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
             * Return the metadata that can be used to reconstruct this partition/batch in the future.
             */
            @Override
            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }
    

    与透明型不同的是,它没有捕获FailedFetchException这个异常,因此出现获取消息失败时,会一直等待某个分区恢复。其它处理逻辑与透明型相同,参考上面的介绍即可。

    2、emitPartitionBatch()

            /**
             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
             * the metadata created when it was first emitted.
             */
            @Override
            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }
    

    当一个batch之前已经发送过,但失败了,则调用此方法重试。

    3、reEmitPartitionBatch()

    重试发送消息的主要实现,逻辑也相对简单。
    直接去fetch消息。如果消息不为空的话,则判断offset:
    * 如果offset与nextoffset相等,则表示消息已经处理完了
    * 如果offset>nextOffset,则出错了,抛出以下运行时异常:

        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
    

    最后发送消息,并更新nextOffset。
    完整代码如下:

    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
        LOG.info("re-emitting batch, attempt " + attempt);
        String instanceId = (String) meta.get("instanceId");
        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
            SimpleConsumer consumer = _connections.register(partition);
            long offset = (Long) meta.get("offset");
            long nextOffset = (Long) meta.get("nextOffset");
            ByteBufferMessageSet msgs = null;
            msgs = fetchMessages(consumer, partition, offset);
    
            if(msgs != null) {
                for (MessageAndOffset msg : msgs) {
                    if (offset == nextOffset) {
                        break;
                    }
                    if (offset > nextOffset) {
                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                    }
                    emit(collector, msg.message());
                    offset = msg.nextOffset();
                }
            }
        }
    }
    

    (六)2种spout的公共方法

    1、refreshPartitions()

    根据注释可知,当处理一些新的分区时,管理到这些分区的连接信息。

      /**
             * This method is called when this task is responsible for a new set of partitions. Should be used
             * to manage things like connections to brokers.
             */
            @Override
            public void refreshPartitions(List<Partition> partitions) {
                refresh(partitions);
            }
    

    2、getOrderedPartitions()

    getOrderedPartitions()方法会在分区元数据发生变化(即Partitions发生变化)时被调用。该方法与refreshPartitions()方法调用时机相同,用来应对分区的变化。例如,建立并维护与新增加Partitions的连接时就可以使用这个方法。

    3、close()

    看下面的实现,其实refreshPartitions()和close()都只是简单的清空了连接,而getOrderedPartitions是获取分区信息。

    private void clear() {
        _connections.clear();
    }
    
    private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
        return partitions.getOrderedPartitions();
    }
    
    private void refresh(List<Partition> list) {
        _connections.clear();
        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
    }
    

    4、Partitions与Partition

    Partitions含义为分区的元数据,如一共存在多少个分区,分区所在的broker等,具体信息由用户定义,不过这些信息一般是比较稳定的。在kafka中,是通过以下代码指定的:

    new ZkHosts(brokerHosts)
    

    看如何将zk中的信息导入Partitions的:

    Partition则是某个具体的分区了。

    在coordinator的getPartitionsForBatch()中指定。

    (七)fetch消息的逻辑

    _connection包括了一些连接信息,如broker,端口,分区id等,通过它可以获取到一个simpleConsumer,下面重点分析这个获取消息的过程。

     msgs = fetchMessages(consumer, partition, offset);
    

    1、fetchMessages()

    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
        long start = System.nanoTime();
        ByteBufferMessageSet msgs = null;
        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _kafkaMeanFetchLatencyMetric.update(millis);
        _kafkaMaxFetchLatencyMetric.update(millis);
        return msgs;
    }
    

    主要调用 KafkaUtils.fetchMessages(_config, consumer, partition, offset);其余代码用于更新metric,统计获取消息的平均时长以及最大时长。

    2、KafkaUtil.fetchMessages()

    逻辑很简单,构建一个FetchRequest,然后得到FetchResponse。此外就是一些处理异常的代码了

    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
            throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        FetchRequestBuilder builder = new FetchRequestBuilder();
        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
        FetchResponse fetchResponse;
        try {
            fetchResponse = consumer.fetch(fetchRequest);
        } catch (Exception e) {
            if (e instanceof ConnectException ||
                    e instanceof SocketTimeoutException ||
                    e instanceof IOException ||
                    e instanceof UnresolvedAddressException
                    ) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            } else {
                throw new RuntimeException(e);
            }
        }
        if (fetchResponse.hasError()) {
            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
                String msg = "Got fetch request with offset out of range: [" + offset + "]";
                LOG.warn(msg);
                throw new TopicOffsetOutOfRangeException(msg);
            } else {
                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                LOG.error(message);
                throw new FailedFetchException(message);
            }
        } else {
            msgs = fetchResponse.messageSet(topic, partitionId);
        }
        return msgs;
    }
    

    (八)KafkaOffsetMetric

    TODO:还有其它metric吧

    storm-kafka中定义了一个metric用来计算目前正在处理的offset与最新的offset之间有多少差距,即落后了多少条数据。

    这个类定义在KafkaUtil中,主要有2个核心变量:
    _partitionToOffset是一个hashMap,内容为(分区,正在处理的offset)
    _partitions就是_partitionToOffset的key组成的一个集合。

    public static class KafkaOffsetMetric implements IMetric {
        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
        Set<Partition> _partitions;
        String _topic;
        DynamicPartitionConnections _connections;
    
        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
            _topic = topic;
            _connections = connections;
        }
    
        public void setLatestEmittedOffset(Partition partition, long offset) {
            _partitionToOffset.put(partition, offset);
        }
    
        @Override
        public Object getValueAndReset() {
            try {
                long totalSpoutLag = 0;
                long totalEarliestTimeOffset = 0;
                long totalLatestTimeOffset = 0;
                long totalLatestEmittedOffset = 0;
                HashMap ret = new HashMap();
                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                        Partition partition = e.getKey();
                        SimpleConsumer consumer = _connections.getConnection(partition);
                        if (consumer == null) {
                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                            return null;
                        }
                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
                            LOG.warn("No data found in Kafka Partition " + partition.getId());
                            return null;
                        }
                        long latestEmittedOffset = e.getValue();
                        long spoutLag = latestTimeOffset - latestEmittedOffset;
                        ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
                        ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                        ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                        ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                        totalSpoutLag += spoutLag;
                        totalEarliestTimeOffset += earliestTimeOffset;
                        totalLatestTimeOffset += latestTimeOffset;
                        totalLatestEmittedOffset += latestEmittedOffset;
                    }
                    ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
                    ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
                    ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
                    ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
                    return ret;
                } else {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                }
            } catch (Throwable t) {
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }
    
        public void refreshPartitions(Set<Partition> partitions) {
            _partitions = partitions;
            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
            while (it.hasNext()) {
                if (!partitions.contains(it.next())) {
                    it.remove();
                }
            }
        }
    }
    

    这个metric只在2个地方被调用:
    (1)第一次读取一个分区时

    _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
    

    (2)refresh时

    _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
    

    refreshPartitions()时会调用refresh方法。This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections to brokers.

    展开全文
  • kafka源码分析之副本管理-ReplicaManager

    千次阅读 2016-07-27 10:25:52
    kafka源码分析 kafka日志的副本管理 kafka副本同步
  • Kafka源码分析系列》的目录
  • kafka-cluster源码 kafka-leader选举源码 kafka 源码
  • kafka源码分析之kafka启动-SocketServer

    千次阅读 2016-07-27 10:14:39
    kafka socketserver的处理流程源码, kafka源码
  • Apache Kafka源码分析-模块简介 1 概述 一个开源分布式发布订阅消息系统,基于对磁盘文件的顺序存取实现在廉价硬件基础上提供高吞吐量、易扩展、随机消费等特点已被广泛使用。 2 目的  以源码入手对Kafka架构...
  • 1、前置安装包 需安装 JDK 1.8 、 Gradle 3.1 Scala 2.10 scala下载地址 ... 2、在windows上部署和启动zookeeper ... 修改下zoo.cfg文件 ...3、使用gradle来构建Kafka源码 使用命令 gradl
  • Kafka源码分析 v0.01

    2019-01-30 18:59:25
    Apache Kafka源码剖析&gt; https://blog.csdn.net/rigete/article/details/49663327 eclipse的scala插件在eclipse market搜下,需要eclipse 4.7及以上版本 scala的ide可以用...
  • Kafka需要Gradle 2.0或更高版本。 为了在运行时支持Java 7和Java 8,应该使用Java 7进行构建。 首先引导并下载包装器 cd kafka_source_dir gradle 现在,其他所有内容都将起作用。 建造一个罐子并运行它 ./gradlew...
  • kafka 源码分析02

    2020-05-02 23:12:42
    06 | 请求通道:如何实现Kafka请求队列
  • kafka作为常用的消息中间件,在各大企业得到广泛应用,笔者从今天开始将陆续对kafka 0.10.1进行源码分析。工欲善其事必先利其器,在阅读Kafka源码之前,首先第一步得搭建源码阅读环境,选择合适的工具可以使我们的...
  • kafka 源码分析01

    2020-05-01 23:25:47
    总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件...
  • 源码分析之网络层(一)》,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送。 PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦! 对...
  • kafka server的健康状态管理源码分析
  • kafka源码分析_服务端

    2020-05-27 16:44:04
    基础流程 - V2.5 ...启动日志分析 starting (kafka.server.KafkaServer) -- 启动kafkaServer Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) -- kafkaServer链接zk [Zoo...
  • kafka源码分析

    千次阅读 2017-12-23 14:33:59
    package kafka import java.util.Properties import joptsimple.OptionParser import kafka.server.{KafkaServer, KafkaServerStartable} import kafka.utils.{CommandLineUtils, Logging} import org.apache.kafk
  • 本文描述了kafka的controller的实现原理,并对其源代码的实现进行了讲解。 controller运行原理 在Kafka集群中,controller多个broker中的一个(也只有一个controller),它除了实现正常的broker的功能外,还负责...
  • 一、软件环境 JDK:1.8.0_112 Scala:2.10.6 Gradle:3.2.1 Zookeeper:3.4.5-cdh5.7.1 OS:Mac OS X 10.10.5 ...由于Kafka是依赖ZK的,需要先启动ZK服务。...2.2 下载Kafka源代码 ...可以从Apache官网下载Kafka源码包...
  • KAFKA源码分析及整理

    千次阅读 2018-05-28 14:42:28
    gitee:https://gitee.com/yuanyihan/yyh-idea-kafka一、Kafka生产者发送消息示例 注意:以下所用kafka版本为0.10.1.0 KafkaProducer是线程安全对象,建议可以将其封装成多线程共享一个实例,效率反而比多实例更高...
  • kafka中相同的group下多个consumer的负载均衡处理源码分析

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,773
精华内容 9,509
关键字:

kafka源码分析