  • topic

    This article again uses Kafka 0.8.2.2 as the example.

    I. How to delete a topic

    Deleting a topic involves two key points:

    1. Configure the delete switch

    Set the broker parameter delete.topic.enable to true.

    2. Run the delete command

    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

    If this parameter is not set to true, the topic is not actually removed; it is only marked for deletion. At that point most people would manually delete the topic's ZooKeeper nodes and log files, but that does not clear the topic data held in the brokers' memory. The best approach is therefore to set the parameter to true and then restart Kafka.
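    For reference, a minimal server.properties fragment (set it on every broker, then restart):

    # server.properties -- allow real topic deletion (the default is false)
    delete.topic.enable=true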

    II. Key classes

    1. PartitionStateMachine

    This class implements the partition state machine: it tracks each partition's current state and drives the transitions between states. There are four states:

    NonExistentPartition

    NewPartition

    OnlinePartition

    OfflinePartition

    2. ReplicaManager

    Manages all replicas on the current broker and carries out the concrete actions: reads, writes, deletions, and so on.

    Read/write path: a write first obtains the Partition object, from it the Replica object, from that the Log object, and the Log uses the Segment objects it manages to write data out and read it back.
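    As a rough picture of that containment chain (an illustrative comment sketch, not Kafka's actual types):

    // ReplicaManager          -- all replicas hosted on this broker
    //   -> Partition          -- one per topic-partition
    //     -> Replica          -- the local replica of that partition
    //       -> Log            -- the partition's log
    //         -> LogSegment   -- append/read against the active segment file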

    3. ReplicaStateMachine

    The replica state machine: it tracks each replica's current state and the transitions between states. A replica is always in exactly one of the following states:

    NewReplica: created by the controller during partition reassignment. In this state a replica can only handle a request to become a follower. Valid previous state: NonExistentReplica.

    OnlineReplica: a replica of a started partition; it can handle requests to become leader or follower. Valid previous states: NewReplica, OnlineReplica, or OfflineReplica.

    OfflineReplica: a dead replica is in this state. Valid previous states: NewReplica, OnlineReplica.

    ReplicaDeletionStarted: replica deletion has started. Valid previous state: OfflineReplica.

    ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted.

    ReplicaDeletionIneligible: deletion failed. Valid previous state: ReplicaDeletionStarted.

    NonExistentReplica: the replica has been fully removed. Valid previous state: ReplicaDeletionSuccessful.
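    To summarize, here is a minimal sketch (hypothetical Scala, not Kafka's code) that encodes the valid previous states listed above as data:

    sealed trait ReplicaState
    case object NewReplica extends ReplicaState
    case object OnlineReplica extends ReplicaState
    case object OfflineReplica extends ReplicaState
    case object ReplicaDeletionStarted extends ReplicaState
    case object ReplicaDeletionSuccessful extends ReplicaState
    case object ReplicaDeletionIneligible extends ReplicaState
    case object NonExistentReplica extends ReplicaState

    // each state mapped to the set of states it may be entered from
    val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
      NewReplica                -> Set(NonExistentReplica),
      OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica),
      OfflineReplica            -> Set(NewReplica, OnlineReplica),
      ReplicaDeletionStarted    -> Set(OfflineReplica),
      ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
      ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
      NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
    )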

    4. TopicDeletionManager

    This class drives the state machine for topic deletion:

    1) TopicCommand publishes the topic-deletion command by creating a child node under /admin/delete_topics/.

    2) The controller watches for child-node changes under /admin/delete_topics and starts deleting the corresponding topics.

    3) The controller runs a background thread that performs the actual topic deletion.

    III. The topic deletion process, step by step through the source

    This walkthrough has four parts:

    A) What the client-side delete command does

    B) The full flow when delete.topic.enable is not set

    C) The full flow when delete.topic.enable is set

    D) Manually deleting the topic's ZooKeeper data and on-disk data

    1. The client-side delete command

    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

    Opening kafka-topics.sh we see:

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

    Inside TopicCommand's main method:

    else if (opts.options.has(opts.deleteOpt))
      deleteTopic(zkClient, opts)

    The actual implementation is:

    val topics = getTopics(zkClient, opts)
    if (topics.length == 0) {
      println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
    }
    topics.foreach { topic =>
      try {
        ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))

    This creates a node named after the topic under "/admin/delete_topics".
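    getDeleteTopicPath simply joins the admin path and the topic name; assuming the standard 0.8.x layout, it behaves like this sketch:

    // Sketch of the 0.8.x helper (the real one lives in kafka.utils.ZkUtils):
    val DeleteTopicsPath = "/admin/delete_topics"
    def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic
    // getDeleteTopicPath("my_topic_name") == "/admin/delete_topics/my_topic_name"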

    2. The flow when delete.topic.enable is not configured

    Two listeners respond in total:

    A) TopicChangeListener

    B) DeleteTopicsListener

    When a topic is deleted with the delete command, only the DeleteTopicsListener is triggered.

    var topicsToBeDeleted = {
      import JavaConversions._
      (children: Buffer[String]).toSet
    }
    val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
    topicsToBeDeleted --= nonExistentTopics
    if (topicsToBeDeleted.size > 0) {
      info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
      // mark topic ineligible for deletion if other state changes are in progress
      topicsToBeDeleted.foreach { topic =>
        val preferredReplicaElectionInProgress =
          controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
        if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
          controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
      }
      // add topic to deletion list
      controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
    Both handlers check whether delete.topic.enable is true: if it is not, nothing is executed; if it is, execution proceeds into:

    controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))

    controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

    3. The flow when delete.topic.enable is set to true

    The only difference from step 2 is those two handler functions.

    controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
    controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

    markTopicIneligibleForDeletion is handled as follows:

    if (isDeleteTopicEnabled) {
      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
      if (newTopicsToHaltDeletion.size > 0)
        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
    }
    Its main job is to halt the deletion of a topic when any of the following three situations exists:

    • Halt delete topic if -
      1. replicas are down
      2. partition reassignment is in progress for some partitions of the topic
      3. preferred replica election is in progress for some partitions of the topic

    enqueueTopicsForDeletion updates the sets of topics and partitions to delete and wakes up the deletion thread (DeleteTopicsThread):

    def enqueueTopicsForDeletion(topics: Set[String]) {
      if (isDeleteTopicEnabled) {
        topicsToBeDeleted ++= topics
        partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
        resumeTopicDeletionThread()
      }
    }
    In the DeleteTopicsThread's doWork method:

    topicsQueuedForDeletion.foreach { topic =>
      // if all replicas are marked as deleted successfully, then topic deletion is done
      if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
        // clear up all state for this topic from controller cache and zookeeper
        completeDeleteTopic(topic)
        info("Deletion of topic %s successfully completed".format(topic))
      }
    Inside completeDeleteTopic:

    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
    controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
    controllerContext.removeTopic(topic)
    Its job is to deregister the listener that watches for partition changes, delete the topic's ZooKeeper nodes, delete the on-disk data, and update the in-memory structures, for example removing the partitions' details from the replica state machine.
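    For reference, assuming the standard 0.8.x ZooKeeper layout, the three paths cleaned up above are:

    // Sketch of the paths behind the deleteRecursive/delete calls above:
    // getTopicPath(t)       == "/brokers/topics/" + t       (partition/replica assignment)
    // getTopicConfigPath(t) == "/config/topics/" + t        (per-topic config overrides)
    // getDeleteTopicPath(t) == "/admin/delete_topics/" + t  (the deletion marker)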

    The most important question, though, is how the replica data on disk is deleted. That is the focus of this part.

    On the first pass, in the DeleteTopicsThread's doWork method:

    {
      // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
      // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
      // or there is at least one failed replica (which means topic deletion should be retried).
      if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
        // mark topic for deletion retry
        markTopicForDeletionRetry(topic)
      }

    Inside markTopicForDeletionRetry:

    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
      .format(topic, failedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)

    In ReplicaStateMachine's handleStateChanges, handleStateChange is invoked to process OfflineReplica:

    // send stop replica command to the replica so that it stops fetching from the leader
    brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)

    Then, still in handleStateChanges:

    brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)

    This sends the StopReplicaKey instruction to the brokers that store the replica data, which starts the deletion:
    stopReplicaRequestMap foreach { case (broker, replicaInfoList) =>
      val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
      val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
      debug("The stop replica request (delete = true) sent to broker %d is %s"
        .format(broker, stopReplicaWithDelete.mkString(",")))
      debug("The stop replica request (delete = false) sent to broker %d is %s"
        .format(broker, stopReplicaWithoutDelete.mkString(",")))
      replicaInfoList.foreach { r =>
        val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
          Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
        controller.sendRequest(broker, stopReplicaRequest, r.callback)
      }
    }
    stopReplicaRequestMap.clear()
    After a broker's KafkaApis handle method receives the instruction:
    case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)

    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
    Then, in the stopReplicas method:
    {
      controllerEpoch = stopReplicaRequest.controllerEpoch
      // First stop fetchers for all partitions, then stop the corresponding replicas
      replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
      for (topicAndPartition <- stopReplicaRequest.partitions) {
        val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
        responseMap.put(topicAndPartition, errorCode)
      }
      (responseMap, ErrorMapping.NoError)
    }
    Going one level deeper into stopReplica, we reach the actual log deletion:
    getPartition(topic, partitionId) match {
      case Some(partition) =>
        if (deletePartition) {
          val removedPartition = allPartitions.remove((topic, partitionId))
          if (removedPartition != null)
            removedPartition.delete() // this will delete the local log
        }
    That is Kafka's complete log-deletion pipeline.
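    In effect, delete() on the removed partition boils down to removing every segment file and then the partition directory itself; a simplified sketch (not the exact 0.8.2 code):

    import java.io.File

    // Simplified sketch: deleting a local log means deleting the segment files
    // (.log and .index) and then the topic-partition directory.
    def deleteLocalLog(dir: File): Unit = {
      Option(dir.listFiles).getOrElse(Array.empty[File]).foreach(_.delete())
      dir.delete()
    }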

    4. Manually deleting the topic's ZooKeeper info and disk data

    TopicChangeListener will pick this up, but its handling is simple; it only updates:

    val deletedTopics = controllerContext.allTopics -- currentChildren
    controllerContext.allTopics = currentChildren

    val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
    controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
      !deletedTopics.contains(p._1.topic)) // predicate reconstructed from the 0.8.2 source
    IV. Summary

    Kafka's topic deletion is essentially a publish-subscribe system built on ZooKeeper. A ZooKeeper client creates a node under /admin/delete_topics/; the Kafka controller sees the event and formally starts the deletion: it deregisters the partition-change listener, clears the in-memory structures, deletes the replica data, and removes the topic's ZooKeeper nodes.

    If the delete command is run while delete.topic.enable is false, effectively nothing happens. To really delete the topic in that situation, set the parameter to true and restart Kafka; the topic information is then removed completely. This has been tested.

    A popular alternative is to manually delete the topic's ZooKeeper data and disk data, but that leaves some in-memory state uncleared. Whether that causes problems has not been tested.

  • Question: if a consumer subscribes to two topics and one of them builds up a large message backlog, does that affect consumption of the other topic? Answer: no. Why? Because RocketMQ first rebalances, assigning the topic's queues across consumers...

    Question: if a consumer subscribes to two topics and one of them builds up a large message backlog, does that affect consumption of the other topic?
    Answer: no.
    Why not?

    • RocketMQ first load-balances (rebalance): the topics' queues are assigned to the consumer, and a PullRequest (carrying topic, brokerName, and queueId) for each assigned queue is placed into a LinkedBlockingQueue. This already fixes the order in which later pulls will happen. For example, out of 100 requests roughly 50 are for TopicTest1 and the other 50 for TopicTest2.

    • When the actual pulling happens, multiple threads take requests from the LinkedBlockingQueue and process them in insertion order. Even though TopicTest1 is backlogged, TopicTest2 is still consumed as usual. Only if TopicTest1 were consumed so slowly that it tied up all the threads would TopicTest2's consumption speed suffer, and even then only temporarily; TopicTest2's consumption is never blocked outright.

    • As the log below shows, the PullMessageService thread puts requests into the queue while NettyClientPublicExecutor_1 through _4 handle the pulls; they take requests from the queue concurrently, and after finishing one they go back to take the next. A minimal sketch of this queue-and-workers structure follows after the log.

    -------enqueue-----:MessageQueue [topic=%RETRY%testConsumer, brokerName=broker-a, queueId=0]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0]PullMessageService
    pulled:TopicTest2,NettyClientPublicExecutor_4
    processing TopicTest2...NettyClientPublicExecutor_4
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0]PullMessageService
    pulled:TopicTest1,NettyClientPublicExecutor_3
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2]PullMessageService
    -------enqueue-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1]PullMessageService
    pulled:TopicTest2,NettyClientPublicExecutor_1
    processing TopicTest2...NettyClientPublicExecutor_1
    pulled:TopicTest2,NettyClientPublicExecutor_2
    processing TopicTest2...NettyClientPublicExecutor_2
    ... (the remaining output repeats the same pattern: PullMessageService keeps re-enqueueing requests for both topics and the retry queue, while all four NettyClientPublicExecutor threads keep alternating between TopicTest1 and TopicTest2 pulls)
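    To make the mechanism concrete, here is a minimal self-contained sketch of the same queue-plus-workers structure (illustrative Scala with made-up names, not RocketMQ's code):

    import java.util.concurrent.{Executors, LinkedBlockingQueue}

    // Pull requests for both topics share one queue; four workers take them in
    // order, so a backlogged topic cannot block the other unless it ties up
    // every worker.
    case class PullRequest(topic: String, brokerName: String, queueId: Int)

    object QueueInterleavingDemo extends App {
      val queue = new LinkedBlockingQueue[PullRequest]()
      val pool  = Executors.newFixedThreadPool(4) // like NettyClientPublicExecutor_1..4

      for (_ <- 1 to 4) pool.execute(() => {
        try {
          while (true) {
            val req = queue.take() // blocks until a request is available
            println(s"pulled ${req.topic} queueId=${req.queueId} on ${Thread.currentThread.getName}")
          }
        } catch { case _: InterruptedException => () } // shutdownNow() interrupts us
      })

      for (q <- 0 to 3; t <- Seq("TopicTest1", "TopicTest2"))
        queue.put(PullRequest(t, "broker-a", q)) // enqueue requests for both topics

      Thread.sleep(500)
      pool.shutdownNow() // stop the demo workers
    }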
    
  • Kafka 3.0 topic creation flow (source code)


    1. From the create command to assembling the data needed to create a topic (the Scala part)

    The command to create a Kafka topic is:

    bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
        --partitions 20 --replication-factor 3 --config x=y
    

    --bootstrap-server: address and port of one of the Kafka brokers
    --create: marks the command as a create operation
    --topic: the name of the topic to create
    --partitions: explicitly sets the number of partitions
    --replication-factor: explicitly sets how many replicas each partition has
    --config x=y: optional; configs given on the command line override the broker-side defaults, for example how long data is retained. The full set of per-topic configs is documented separately.

    kafka-topics.sh in turn runs:

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
    

    So it actually executes the method in core/src/main/scala/kafka/admin/TopicCommand.scala.
    Note that since Kafka 2.8 the cluster can run without ZooKeeper, managing itself through KRaft, so the source below no longer contains the ZookeeperTopicService way of creating a topic.

    
    object TopicCommand extends Logging {

      def main(args: Array[String]): Unit = {
        val opts = new TopicCommandOptions(args)
        opts.checkArgs()
        // instantiate and obtain the TopicService
        val topicService = TopicService(opts.commandConfig, opts.bootstrapServer)

        var exitCode = 0
        try {
          if (opts.hasCreateOption)
            // the --create flag decides whether createTopic is executed
            topicService.createTopic(opts)
          else if (opts.hasAlterOption)
            topicService.alterTopic(opts)
          else if (opts.hasListOption)
            topicService.listTopics(opts)
          else if (opts.hasDescribeOption)
            topicService.describeTopic(opts)
          else if (opts.hasDeleteOption)
            topicService.deleteTopic(opts)
        } catch {
          case e: ExecutionException =>
            if (e.getCause != null)
              printException(e.getCause)
            else
              printException(e)
            exitCode = 1
          case e: Throwable =>
            printException(e)
            exitCode = 1
        } finally {
          topicService.close()
          Exit.exit(exitCode)
        }
      }

    TopicService(opts.commandConfig, opts.bootstrapServer) calls the apply method below:

     object TopicService {
        def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
          bootstrapServer match {
            case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
            case None =>
          }
          Admin.create(commandConfig)
        }
    
        def apply(commandConfig: Properties, bootstrapServer: Option[String]): TopicService =
          new TopicService(createAdminClient(commandConfig, bootstrapServer))
      }
    

    apply in turn calls createAdminClient to build an admin client, which is then used to create the topic.
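    As a side note, the same public Admin API the command-line tool uses can be driven directly; a minimal sketch (the broker address and topic settings are placeholders):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.admin.{Admin, NewTopic}

    object CreateTopicExample extends App {
      val props = new Properties()
      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker_host:port") // placeholder
      val admin = Admin.create(props)
      try {
        // name, partitions, replication factor -- mirrors the CLI flags above
        val topic = new NewTopic("my_topic_name", 20, 3.toShort)
        admin.createTopics(Collections.singleton(topic)).all().get() // block until the brokers confirm
        println(s"Created topic ${topic.name}.")
      } finally admin.close()
    }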

    What follows validates the options, applies any explicit settings, and then uses the newly created client to create the topic:

    case class TopicService private (adminClient: Admin) extends AutoCloseable {

      def createTopic(opts: TopicCommandOptions): Unit = {
        // build a CommandTopicPartition carrying the input options (partition count, replication factor, ...)
        val topic = new CommandTopicPartition(opts)
        if (Topic.hasCollisionChars(topic.name)) // check the topic name for characters that can collide in metric names
          println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " +
            "collide. To avoid issues it is best to use either, but not both.")
        createTopic(topic)
      }

      def createTopic(topic: CommandTopicPartition): Unit = {
        // if --replication-factor is given, it must be between 1 and Short.MaxValue
        if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
          throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
        // if --partitions is given, it must be greater than 0
        if (topic.partitions.exists(partitions => partitions < 1))
          throw new IllegalArgumentException(s"The partitions must be greater than 0")

        try {
          val newTopic = if (topic.hasReplicaAssignment)
            // if --replica-assignment is given, place replicas exactly as specified
            new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
          else {
            new NewTopic(
              topic.name,
              topic.partitions.asJava,
              topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
          }
          // parse the --config settings into a map
          val configsMap = topic.configsToAdd.stringPropertyNames()
            .asScala
            .map(name => name -> topic.configsToAdd.getProperty(name))
            .toMap.asJava

          newTopic.configs(configsMap)
          // create the topic via the adminClient
          val createResult = adminClient.createTopics(Collections.singleton(newTopic),
            new CreateTopicsOptions().retryOnQuotaViolation(false))
          createResult.all().get()
          println(s"Created topic ${topic.name}.")
        } catch {
          case e: ExecutionException =>
            if (e.getCause == null)
              throw e
            if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
              throw e.getCause
        }
      }
    

    2. The admin client sends the create-topic request asynchronously through a queue and a worker thread

    The createTopics method in KafkaAdminClient.java:

    @Override
    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                           final CreateTopicsOptions options) {
        final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
        final CreatableTopicCollection topics = new CreatableTopicCollection();
        // iterate over the topics to create
        for (NewTopic newTopic : newTopics) {
            if (topicNameIsUnrepresentable(newTopic.name())) {
                // the topic name cannot be represented in a request
                KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
                    newTopic.name() + "' cannot be represented in a request."));
                topicFutures.put(newTopic.name(), future);
            } else if (!topicFutures.containsKey(newTopic.name())) { // skip duplicates when creating several topics at once
                topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
                topics.add(newTopic.convertToCreatableTopic());
            }
        }
        // if there is anything to create, proceed
        if (!topics.isEmpty()) {
            final long now = time.milliseconds();
            final long deadline = calcDeadlineMs(now, options.timeoutMs());
            // build the Call for the create-topics request
            final Call call = getCreateTopicsCall(options, topicFutures, topics,
                Collections.emptyMap(), now, deadline);
            // this is where the call is actually submitted; the line above only constructed it
            runnable.call(call, now);
        }
        return new CreateTopicsResult(new HashMap<>(topicFutures));
    }
    

    (1) runnable.call (the queue and the worker thread)

    Why explain this before getCreateTopicsCall? Because it is easier to follow this way: the flow is not a single straight-line execution, and reading getCreateTopicsCall first can be confusing.

    /**
     * Initiate a new call.
     *
     * This will fail if the AdminClient is scheduled to shut down.
     *
     * @param call      The new call object.
     * @param now       The current time in milliseconds.
     */
    void call(Call call, long now) {
        if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
            log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
            call.handleTimeoutFailure(time.milliseconds(),
                new TimeoutException("The AdminClient thread is not accepting new calls."));
        } else {
            enqueue(call, now);
        }
    }

    /**
     * Queue a call for sending.
     *
     * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even
     * if the AdminClient is shutting down). This function should be called when retrying an
     * existing call.
     *
     * @param call      The new call object.
     * @param now       The current time in milliseconds.
     */
    void enqueue(Call call, long now) {
        if (call.tries > maxRetries) {
            log.debug("Max retries {} for {} reached", maxRetries, call);
            call.handleTimeoutFailure(time.milliseconds(), new TimeoutException(
                "Exceeded maxRetries after " + call.tries + " tries."));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Queueing {} with a timeout {} ms from now.", call,
                Math.min(requestTimeoutMs, call.deadlineMs - now));
        }
        boolean accepted = false;
        // put the call onto the newCalls queue
        synchronized (this) {
            if (!closing) {
                newCalls.add(call);
                accepted = true;
            }
        }
        // wake the client thread so it picks the call up
        if (accepted) {
            client.wakeup(); // wake the thread if it is in poll()
        } else {
            log.debug("The AdminClient thread has exited. Timing out {}.", call);
            call.handleTimeoutFailure(time.milliseconds(),
                new TimeoutException("The AdminClient thread has exited."));
        }
    }
    

    The thread woken by client.wakeup() runs the following:

    @Override
    public void run() {
        log.debug("Thread starting");
        try {
            // process requests here
            processRequests();
        } finally {
            closing = true;
            // ... omitted ...
            log.debug("Exiting AdminClientRunnable thread.");
        }
    }

    private void processRequests() {
        long now = time.milliseconds();
        while (true) {
            // Copy newCalls into pendingCalls.
            drainNewCalls();

            // Check if the AdminClient thread should shut down.
            long curHardShutdownTimeMs = hardShutdownTimeMs.get();
            if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
                break;

            // Handle timeouts.
            TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
            timeoutPendingCalls(timeoutProcessor);
            timeoutCallsToSend(timeoutProcessor);
            timeoutCallsInFlight(timeoutProcessor);

            long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
            if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
                pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
            }

            // Choose nodes for our pending calls.
            pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
            long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
            if (metadataFetchDelayMs == 0) {
                metadataManager.transitionToUpdatePending(now);
                Call metadataCall = makeMetadataCall(now);
                // Create a new metadata fetch call and add it to the end of pendingCalls.
                // Assign a node for just the new call (we handled the other pending nodes above).
                if (!maybeDrainPendingCall(metadataCall, now))
                    pendingCalls.add(metadataCall);
            }
            pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));

            if (metadataFetchDelayMs > 0) {
                pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
            }

            // Ensure that we use a small poll timeout if there are pending calls which need to be sent
            if (!pendingCalls.isEmpty())
                pollTimeout = Math.min(pollTimeout, retryBackoffMs);

            // Wait for network responses.
            log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
            List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);
            log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());

            // Unassign calls to disconnected nodes.
            unassignUnsentCalls(client::connectionFailed);

            // Update the current time and handle the latest responses.
            now = time.milliseconds();
            handleResponses(now, responses);
        }
    }
            }
    

    sendEligibleCalls is the method that actually sends the calls:

    /**
     * Send the calls which are ready.
     *
     * @param now                   The current time in milliseconds.
     * @return                      The minimum timeout we need for poll().
     */
    private long sendEligibleCalls(long now) {
        long pollTimeout = Long.MAX_VALUE;
        for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
            Map.Entry<Node, List<Call>> entry = iter.next();
            List<Call> calls = entry.getValue();
            if (calls.isEmpty()) {
                iter.remove();
                continue;
            }
            // ... omitted ...
            while (!calls.isEmpty()) {
                Call call = calls.remove(0);
                int timeoutMs = Math.min(remainingRequestTime,
                    calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
                AbstractRequest.Builder<?> requestBuilder;
                try {
                    // obtain the requestBuilder from the call
                    requestBuilder = call.createRequest(timeoutMs);
                } catch (Throwable t) {
                    call.fail(now, new KafkaException(String.format(
                        "Internal error sending %s to %s.", call.callName, node), t));
                    continue;
                }
                ClientRequest clientRequest = client.newClientRequest(node.idString(),
                    requestBuilder, now, true, timeoutMs, null);
                log.debug("Sending {} to {}. correlationId={}, timeoutMs={}",
                    requestBuilder, node, clientRequest.correlationId(), timeoutMs);
                // actually send the request
                client.send(clientRequest, now);
                callsInFlight.put(node.idString(), call);
                correlationIdToCalls.put(clientRequest.correlationId(), call);
                break;
            }
        }
        return pollTimeout;
    }
    

    Pay particular attention to the line requestBuilder = call.createRequest(timeoutMs); the requestBuilder is initialized in getCreateTopicsCall, shown next.

    (2) getCreateTopicsCall (building the requestBuilder that sends the create-topic request)

    Having read runnable.call above, now look at how getCreateTopicsCall produces the Call.

     private Call getCreateTopicsCall(final CreateTopicsOptions options,
                                         final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures,
                                         final CreatableTopicCollection topics,
                                         final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
                                         final long now,
                                         final long deadline) {
            return new Call("createTopics", deadline, new ControllerNodeProvider()) {
                @Override
                public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
                    return new CreateTopicsRequest.Builder(
                        new CreateTopicsRequestData()
                            .setTopics(topics)
                            .setTimeoutMs(timeoutMs)
                            .setValidateOnly(options.shouldValidateOnly()));
                }
    
                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                  // ... omitted ...
                }
    
                private ConfigEntry configEntry(CreatableTopicConfigs config) {
                    return new ConfigEntry(
                        config.name(),
                        config.value(),
                        configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())),
                        config.isSensitive(),
                        config.readOnly(),
                        Collections.emptyList(),
                        null,
                        null);
                }
    
                @Override
                void handleFailure(Throwable throwable) {
                    // If there were any topics retries due to a quota exceeded exception, we propagate
                    // the initial error back to the caller if the request timed out.
                    maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
                        throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
                    // Fail all the other remaining futures
                    completeAllExceptionally(futures.values(), throwable);
                }
            };
        }
    

    Here new ControllerNodeProvider() supplies the controller node, so effectively the request is received by the controller on the server side.

    /**
     * Provides the controller node.
     */
        private class ControllerNodeProvider implements NodeProvider {
            @Override
            public Node provide() {
                if (metadataManager.isReady() &&
                        (metadataManager.controller() != null)) {
                    return metadataManager.controller();
                }
                metadataManager.requestUpdate();
                return null;
            }
        }
    

    3. Handling the create-topic request on the server (handleCreateTopicsRequest)

    (1) First, a look at what happens when the Kafka cluster starts

    Why include this step? Because starting with Kafka 2.8 there is an alternative to ZooKeeper: KRaft can take over ZooKeeper's role (it was billed as revolutionary). ZooKeeper is not abandoned; KRaft is simply a new option.

    See my other article: how Kafka 2.8 chooses between enabling KRaft or ZooKeeper (the selection logic in the source; it does not cover the KRaft implementation itself).

    (2) How KafkaApis is initialized with either the ZK or the Raft support object

    When Kafka starts, startup is called to do the initialization. Only the KafkaRaftServer path is shown below.

    def startup(): Unit = {
      // ... omitted ...
      // Create the request processor objects.
      // Note where raftSupport sits in the new KafkaApis argument list: it is the second parameter.
      val raftSupport = RaftSupport(forwardingManager, metadataCache)
      dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
        replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
        config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
        fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
        socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
        SocketServer.DataPlaneThreadPrefix)
      // ... omitted ...
    }
    

    Here is the KafkaApis constructor, in KafkaApis.scala; on the server side you can think of the request as being handled by the controller. Note the second parameter:

    
    /**
     * Logic to handle the various Kafka requests
     */
    class KafkaApis(val requestChannel: RequestChannel,
                    val metadataSupport: MetadataSupport,
                    val replicaManager: ReplicaManager,
                    val groupCoordinator: GroupCoordinator,
                    val txnCoordinator: TransactionCoordinator,
                    val autoTopicCreationManager: AutoTopicCreationManager,
                    val brokerId: Int,
                    val config: KafkaConfig,
                    val configRepository: ConfigRepository,
                    val metadataCache: MetadataCache,
                    val metrics: Metrics,
                    val authorizer: Option[Authorizer],
                    val quotas: QuotaManagers,
                    val fetchManager: FetchManager,
                    brokerTopicStats: BrokerTopicStats,
                    val clusterId: String,
                    time: Time,
                    val tokenManager: DelegationTokenManager,
                    val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging 
    

    The second parameter is the MetadataSupport; in startup() it is raftSupport. So when later source code calls through MetadataSupport and the resulting field names contain "zk", do not assume they are ZooKeeper-related: on this path they are actually raft.

    The create-topic request sent by the client we built is received and handled by handleCreateTopicsRequest.

     /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
        try {
        	// omitted...
          request.header.apiKey match {
          // omitted...
           case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
           // omitted...
          }
        } catch {
         // omitted
        } finally {
          // omitted
        }
      }
    

    maybeForwardToController needs no extra explanation here; go straight to handleCreateTopicsRequest.

    def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
        // the field is named zkSupport, but on the raft path it is actually raftSupport; see 3 (1)
        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
    	  // omitted
        val createTopicsRequest = request.body[CreateTopicsRequest]
        val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
        // if the current broker is not the active controller, answer every topic with NOT_CONTROLLER
        if (!zkSupport.controller.isActive) {
          createTopicsRequest.data.topics.forEach { topic =>
            results.add(new CreatableTopicResult().setName(topic.name)
              .setErrorCode(Errors.NOT_CONTROLLER.code))
          }
          sendResponseCallback(results)
        } else {
        // omitted
          zkSupport.adminManager.createTopics(
            createTopicsRequest.data.timeoutMs,
            createTopicsRequest.data.validateOnly,
            toCreate,
            authorizedForDescribeConfigs,
            controllerMutationQuota,
            handleCreateTopicsResults)
        }
      }
    

    zkSupport.adminManager.createTopics is where the actual work happens.

    (3) This is where I got stuck:

    1. Why does zkSupport.adminManager.createTopics end up in ZkAdminManager's createTopics method? Shouldn't there be a RaftAdminManager?
    2. The implementation of val zkSupport = metadataSupport.requireZkOrThrow

    
    case class ZkSupport(adminManager: ZkAdminManager,
                         controller: KafkaController,
                         zkClient: KafkaZkClient,
                         forwardingManager: Option[ForwardingManager],
                         metadataCache: ZkMetadataCache) extends MetadataSupport {
      val adminZkClient = new AdminZkClient(zkClient)
    
      override def requireZkOrThrow(createException: => Exception): ZkSupport = this
      override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
    
      override def ensureConsistentWith(config: KafkaConfig): Unit = {
        if (!config.requiresZookeeper) {
          throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
        }
      }
    
      override def maybeForward(request: RequestChannel.Request,
                                handler: RequestChannel.Request => Unit,
                                responseCallback: Option[AbstractResponse] => Unit): Unit = {
        forwardingManager match {
          case Some(mgr) if !request.isForwarded && !controller.isActive => mgr.forwardRequest(request, responseCallback)
          case _ => handler(request)
        }
      }
    
      override def controllerId: Option[Int] =  metadataCache.getControllerId
    }
    
    case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCache)
        extends MetadataSupport {
      override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
      override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
      override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
    
      override def ensureConsistentWith(config: KafkaConfig): Unit = {
        if (config.requiresZookeeper) {
          throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
        }
      }
    
      override def maybeForward(request: RequestChannel.Request,
                                handler: RequestChannel.Request => Unit,
                                responseCallback: Option[AbstractResponse] => Unit): Unit = {
        if (!request.isForwarded) {
          fwdMgr.forwardRequest(request, responseCallback)
        } else {
          handler(request) // will reject
        }
      }
      // remainder of RaftSupport omitted
    }

    In zk mode it is ZkSupport's requireZkOrThrow, which is easy to understand. In raft mode, though, it is RaftSupport's requireZkOrThrow, i.e. override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException. If that throws, is a zkSupport ever returned to handleCreateTopicsRequest, and does execution still continue into zkSupport.adminManager.createTopics below?

    If anyone knows, please leave an explanation; otherwise I'll revisit this later.
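    As far as I can tell (a hedged reading of the 2.8/3.x sources, not an authoritative answer): the throw is the answer. On a KRaft broker, maybeForwardToController goes through RaftSupport.maybeForward, which forwards any not-yet-forwarded request to the controller quorum, where CREATE_TOPICS is handled by ControllerApis rather than by KafkaApis; handleCreateTopicsRequest therefore only reaches zkSupport.adminManager.createTopics on a ZooKeeper broker, and on the raft path requireZkOrThrow throwing merely rejects an already-forwarded request. A toy Java model of that dispatch (all names here are mine, purely illustrative, not Kafka's API):

    // Toy model of the maybeForward / requireZkOrThrow dispatch (illustrative only).
    interface MetadataSupportModel {
        void maybeForward(Request request, java.util.function.Consumer<Request> handler);
    }

    class Request {
        final boolean isForwarded;
        Request(boolean isForwarded) { this.isForwarded = isForwarded; }
    }

    class RaftSupportModel implements MetadataSupportModel {
        @Override
        public void maybeForward(Request request, java.util.function.Consumer<Request> handler) {
            if (!request.isForwarded) {
                // KRaft broker: hand CREATE_TOPICS to the controller quorum;
                // the local handler (and so ZkAdminManager.createTopics) never runs.
                System.out.println("forwarding to controller quorum (handled there, e.g. by ControllerApis)");
            } else {
                handler.accept(request); // handler calls requireZkOrThrow, which throws => error response
            }
        }
    }

    public class DispatchDemo {
        public static void main(String[] args) {
            MetadataSupportModel support = new RaftSupportModel();
            support.maybeForward(new Request(false),
                r -> { throw new IllegalStateException("requireZkOrThrow: should always forward"); });
        }
    }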

  • How Kafka creates a topic

    1k+ views 2021-05-29 17:01:20

     

    Creating a topic in Kafka is simple; one command is enough: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

      This command creates a topic named test with 3 partitions, each of which is assigned 3 replicas. So what does Kafka actually do after this command runs? This article walks through the process and lays out, end to end, how a Kafka topic is created.
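    For comparison with the shell command, the same topic can also be created programmatically. A minimal sketch with the Java AdminClient (the bootstrap address below is an assumption; adjust it to your cluster):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            try (Admin admin = Admin.create(props)) {
                // topic "test" with 3 partitions and replication factor 3, as in the command above
                NewTopic topic = new NewTopic("test", 3, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }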

      Topic creation splits into two parts: the command-line part plus the background (controller) logic (shown as a figure in the original post). The main idea is that the background logic watches the corresponding directory node in zookeeper; once the create command is issued, the command creates a new data node, which triggers the background creation logic.

      In short, the command line does two things: 1. determine the replica assignment plan for the partitions (that is, which brokers each partition's replicas are placed on); 2. create the zookeeper node and write that plan under /brokers/topics/<topic>

    The Kafka controller part mainly does the following: 1. create the partitions; 2. create the replicas; 3. elect a leader and ISR for each partition; 4. update the various caches

    The detailed logic follows. Before starting, assume the topic to be created in this example is named test, with 3 partitions and a replication factor of 3 as well. Note: this article covers only the main logic; non-default behaviors are not discussed here.

    The command-line part

      After the topic-creation command is issued, Kafka performs some basic validation, such as whether the partition count and replication factor were both specified, or whether the topic name contains illegal characters. Worth mentioning: version 0.10 supports specifying broker rack information, similar to Hadoop, which exploits locality to reduce network overhead inside the cluster. If rack information (broker.rack) is specified, Kafka takes it into account when assigning replicas and tries to pick brokers on different racks for a partition's replicas. For this example we ignore the effect of rack information on topic creation.

      After the basic validation, Kafka fetches the list of currently live brokers from /brokers/ids in zookeeper and starts the replica assignment work. The assignment of partition replicas has the following 3 goals:

    • Distribute replicas as evenly as possible across the brokers
    • If a replica of a partition has been assigned to one broker, spread that partition's other replicas as evenly as possible across the other brokers
    • If all brokers have rack information, place each partition's replicas on as many different racks as possible

    The third goal is of no use to us for now, so how are the first two achieved? Reading the source directly can be obscure, but it boils down to one sentence: randomly pick a broker and assign the first replica of every partition round-robin from there, then assign the remaining replicas with an incremental right shift. Still hard to picture, right? An example: suppose you have 10 partitions p0, p1, p2, ..., p9, each with a replication factor of 3 (30 replicas in total), to be placed on 5 brokers (b0, b1, b2, b3, b4). The strategy above plays out like this:

    1 Kafka randomly picks one of the 5 brokers; suppose it picks b0

    2 It assigns the first replica of every partition round-robin: starting from b0, brokers are handed out in order to the first replicas of the 10 partitions.

    3 Kafka has now placed 10 replicas. For the remaining 20 it uses an incremental right shift: for instance, if the first two rows are 1,2,3,4,5 (row 1) and 6,7,8,9,10 (row 2), then row 3 is shifted right by 1 and becomes 5,1,2,3,4, row 4 is shifted right by 2 and becomes 9,10,6,7,8, and so on. The sketch below reproduces the assignment table that results from this scheme.

    Of course, if rack information is considered the algorithm is adjusted somewhat, but it still basically satisfies the 3 goals above.
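    To make "round-robin for the first replica, incremental right shift for the rest" concrete, here is a Java port of the rack-unaware assignment (modeled on kafka.admin.AdminUtils#assignReplicasToBrokersRackUnaware; a simplified sketch, with the normally random start index and shift pinned to 0 so the output is reproducible, and assuming at least replication-factor many brokers):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ReplicaAssignment {

        static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor,
                                                  int[] brokers, int startIndex) {
            Map<Integer, List<Integer>> result = new LinkedHashMap<>();
            int n = brokers.length;
            int nextReplicaShift = startIndex; // Kafka randomizes this as well
            for (int partition = 0; partition < nPartitions; partition++) {
                if (partition > 0 && partition % n == 0)
                    nextReplicaShift++; // the shift grows by 1 for every "row" of n partitions
                int firstReplicaIndex = (partition + startIndex) % n;
                List<Integer> replicas = new ArrayList<>();
                replicas.add(brokers[firstReplicaIndex]); // first replica: plain round-robin
                for (int j = 0; j < replicationFactor - 1; j++) {
                    int shift = 1 + (nextReplicaShift + j) % (n - 1);
                    replicas.add(brokers[(firstReplicaIndex + shift) % n]); // incremental right shift
                }
                result.put(partition, replicas);
            }
            return result;
        }

        public static void main(String[] args) {
            // 10 partitions, replication factor 3, brokers b0..b4, starting at b0 as in the example
            assign(10, 3, new int[]{0, 1, 2, 3, 4}, 0)
                .forEach((p, r) -> System.out.println("p" + p + " -> " + r));
        }
    }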

    For the example used in this article, assume the assignment plan comes out as follows (format: partition number -> set of broker ids holding that partition's replicas):

    0 -> [0,1,2]

    1 -> [1,0,2]

    2 -> [2,0,1]

      Once the replica assignment plan is determined, Kafka persists it under the /brokers/topics/<topic> node in zookeeper, as something like: {"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}
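    The znode payload is plain JSON, so it is easy to reproduce. A sketch that emits exactly the string above, using Jackson (an assumed dependency; any JSON library would do):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class TopicZNodePayload {
        public static void main(String[] args) throws Exception {
            Map<String, Object> payload = new LinkedHashMap<>();
            payload.put("version", 1);
            Map<String, List<Integer>> partitions = new LinkedHashMap<>();
            partitions.put("0", List.of(0, 1, 2)); // partition id -> replica broker ids
            partitions.put("1", List.of(1, 0, 2));
            partitions.put("2", List.of(2, 0, 1));
            payload.put("partitions", partitions);
            // prints {"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}
            System.out.println(new ObjectMapper().writeValueAsString(payload));
        }
    }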

    okay, the command-line part is now done, and you should see Kafka return Created topic "test", indicating the topic was created successfully. But do not assume Kafka's topic-creation work is finished; there is still a lot to do, namely the controller now takes the stage.

     

    The background logic part

      The so-called background logic is provided by Kafka's controller. The controller keeps a lot of information internally, including a partition state machine that records the state of each topic partition. This state machine registers some zookeeper listeners, which the controller creates at startup. One of them, TopicChangeListener, watches for child-node changes under zookeeper's /brokers/topics directory; whenever the number of children changes, its handler method is invoked. In our example, the command line has already persisted the assignment plan under /brokers/topics/test, so this listener's handler fires.

      The TopicChangeListener does two things: it updates the controller's cached information (for example the cluster-wide topic list and the cached assignment plans of newly added topics), and it creates the corresponding partition and replica objects and determines each partition's leader replica and ISR.

      With that, the topic creation is complete!

    ====================================================================================================================

      Clearly, the controller-side background logic above was only skimmed, not expanded in detail; after all, walking straight through the code gets dry, and for most purposes that level of understanding is enough. What follows is a detailed, code-level analysis of how the controller creates a topic.

      As mentioned above, the controller defines many internal data structures recording the current state of the cluster. It also defines a partition state machine (PartitionStateMachine) and a replica state machine (ReplicaStateMachine), which record the states and state transitions of partitions and replicas respectively (the original post showed these as two state-transition diagrams, not reproduced here).

    At first glance the two state machines look much the same, but one covers partition transitions and the other replica transitions. Whether partition or replica, only the Online state can serve normally, and some work must of course be done before that state can be set. In detail:

    1 First, the partition state machine's registerPartitionChangeListener method registers a zookeeper listener; once it sees the new test node under /brokers/topics, it immediately runs TopicChangeListener's handleChildChange method

    2 The concrete logic of handleChildChange:

      2.1 Combine the controller's cached topic list with the topic list under /brokers/topics to find the newly added topic: test. If the controller's cached topic list is A and the list under /brokers/topics is B, the new topics are given by B - A (a sketch follows after this list)

      2.2 In the same way, determine the set of topics that have been deleted, i.e. A - B

      2.3 Update the controller's cached topic list (add test, and kick the already-deleted topics out of the cache)

      2.4 Read the replica assignment plan of all this topic's partitions from the /brokers/topics/test node and update the controller's copy of this information (in effect, add test's plan to the cache, and also kick out the plans of deleted topics)

      2.5 Call onNewTopicCreation to start creating the topic
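    Steps 2.1 and 2.2 are plain set differences. A toy Java sketch (the topic names are illustrative):

    import java.util.HashSet;
    import java.util.Set;

    public class TopicDiff {
        public static void main(String[] args) {
            Set<String> cachedTopics = Set.of("old-topic");        // A: the controller's cached topics
            Set<String> zkTopics = Set.of("old-topic", "test");    // B: children of /brokers/topics

            Set<String> newTopics = new HashSet<>(zkTopics);
            newTopics.removeAll(cachedTopics);                     // B - A => newly created: [test]

            Set<String> deletedTopics = new HashSet<>(cachedTopics);
            deletedTopics.removeAll(zkTopics);                     // A - B => deleted: []

            System.out.println("new: " + newTopics + ", deleted: " + deletedTopics);
        }
    }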

    3 onNewTopicCreation: the topic-creation callback, implementing the real creation logic:

      3.1 Register the partition-change listener. As said before, the partition state machine registers several zookeeper listeners, and TopicChangeListener is just one of them; the listener here watches a topic's partition changes. It is the PartitionModificationListener class which, as the name suggests, watches the partitions under a topic, concretely the data of the /brokers/topics/topic node, and fires whenever that data changes. For topic creation this step is registration only and will not trigger, because Kafka wrote the data into the node before registering the listener; it exists for later topic modifications. Since the listener will not fire this time, the code manually calls onNewPartitionCreation to create the partitions

      3.2 Call the onNewPartitionCreation method to create the partitions

    4 onNewPartitionCreation: the goal of this method is to create all the topic's partition objects, in 4 main steps:

      4.1 Create the partition objects and set them to the NewPartition state. Since it is called a partition state machine, there must be somewhere that the states of all partitions of all topics in the cluster are kept; whenever a new topic is created, all its partitions must be added to this cache to keep it in sync. The newly added partitions are uniformly set to NewPartition

      4.2 Create the corresponding replica objects for each partition. Kafka first looks up the partition's assignment plan in the controller cache (remember, the controller keeps the replica assignment plans of all topics, and this is where it looks) and then sets all replicas of the partition to the NewReplica state. Concretely, how does Kafka do that? First, it tries to read the data of the /brokers/topics/test/partitions/<partitionId>/state node in zookeeper, which holds each partition's leader replica and ISR. For a topic being created, none of its partitions have leader or ISR information yet, so the node should not exist and should be empty, which is expected, because the election happens later! So here Kafka only needs to update the replica state machine's state cache (forgot to say: just as the partition state machine has a cache of the states of all partitions in the cluster, the replica state machine naturally has a similar cache for the states of the replicas of all partitions of all topics, and that cache must be updated here too)

      4.3 The previous 2 steps created the partition and replica objects and set them to NewPartition and NewReplica. This step transitions the partitions to OnlinePartition; only in that state can they be used normally. And that is what this step does: leader election! The code is long-winded, but in short it picks the first replica of the replica set as the leader replica and takes the entire replica set as the ISR (see the sketch after this list). For example, partition 0 of test has replica set 0,1,2, so its leader is 0 and its ISR is [0,1,2]. Kafka then writes this information, together with the controller epoch and the leader epoch (an aside: the controller epoch counts how many times the controller has changed hands, and the leader epoch works the same way), into zookeeper's /brokers/topics/test/partitions/0/state node, and then updates the controller's leader cache (another aside: the controller records the leader and ISR of every topic partition). okay, now all partitions of the new topic have their leaders and ISRs, and the rest of the cluster must be told, so an UpdateMetadataRequest is sent to all current brokers. Concretely, the leader and ISR information is packed into a map, an UpdateMetadataRequest object is built for each entry, and it is sent via the controller's sendRequest method to every live broker (why to all brokers? because the LeaderAndIsr request is the only request that every broker can answer immediately without turning to a leader broker!). The actual send path involves Kafka's low-level network protocol and the KafkaApi machinery, so let's save that for another time...

      4.4 Set the replica objects to OnlineReplica. By now every partition has a leader and ISR, persisted to zookeeper and propagated to the other brokers. The final step flips the replica states cached in the replica state machine from NewReplica to OnlineReplica
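    The initial leader/ISR rule from step 4.3 can be stated in a few lines of Java (an offline sketch of the rule, not Kafka's code):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class InitialLeaderAndIsr {
        public static void main(String[] args) {
            // the assignment plan from the running example: partition -> replica broker ids
            Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
            assignment.put(0, List.of(0, 1, 2));
            assignment.put(1, List.of(1, 0, 2));
            assignment.put(2, List.of(2, 0, 1));
            assignment.forEach((partition, replicas) -> {
                int leader = replicas.get(0);  // rule: the first assigned replica becomes the leader
                List<Integer> isr = replicas;  // rule: the whole replica set is the initial ISR
                System.out.printf("partition %d: leader=%d, isr=%s%n", partition, leader, isr);
            });
        }
    }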

      okay, and with that a topic has been created from start to finish~~

  • If you want multiple consumer threads consuming these topics, just set the concurrency parameter of the @KafkaListener annotation to the number of consumers you want (note: the consumer count must be no greater than the total partition count of all the topics you subscribe to). Run the program and the console prints the following: ...
  • RabbitMQ has three main exchange types: fanout, direct, and topic; going by their names, the fan-out exchange, the direct exchange, and the topic exchange. There is also a headers kind of exchange, which we will not dig into here. The fanout exchange is also called the no-routing exchange, meaning it deals directly with the exchange...
  • A detailed look at topic creation

    1k+ views 2020-03-24 12:38:56
    1. Automatic creation: if the kafka broker's config/server.... then when a producer sends a message to a topic that has not been created yet, Kafka automatically creates, with num.partitions (default 1) partitions and default.replication.factor (default 1) replicas, the corresponding to...
  • Creating and deleting RocketMQ topics

    1k+ views 2020-12-24 13:25:09
    Series opener: this series analyzes a few of the core mqadmin commands, mainly the creation and deletion of subscription groups and topics and topic permission changes. This article analyzes topic creation and deletion. Creating a topic: the core steps are as follows. 1. mqadmin sends to...
  • The KafkaController topic-creation flow explained

    1k+ views 2022-03-18 23:40:35
    1. kafka-topic.sh: to make operating a Kafka cluster easier, the Kafka source package ships several shell scripts; kafka-topic.sh provides topic creation, modification, listing, description, and deletion, implemented internally by TopicCommand. Its script content is: //kafka-run...
  • This article introduces the concepts, design intent, and usage of Topic, Tag, and GroupName in RocketMQ. 1. Topic. First, the official definition: a Topic is the category under which producers send messages and consumers pull messages. The relationship between a Topic and its producers and consumers is very loose. Concretely...
  • Kafka: increasing a topic's partition count

    1k+ views 2022-02-12 14:40:46
    // Create a Topic named new-topic-user with 1 partition and replication factor 1 admin.createTopic("new-topic-user", 1, (short) 1); // Get the description of the given Topic admin.describeTopic("new-topic-user"); // Increase the given Topic's partition...
  • Command-line operations: 2.1 list all topics 2.2 create a topic 2.3 delete a topic 2.4 describe a topic 2.5 change the partition count 2.6 send messages 2.7 consume messages 2.8 view consumer groups 2.9 update a consumer's offset position. Preface: the kafka website. 1. Basic concepts: Broker ...
  • When sending messages with RocketMQ we generally must specify the topic, and the topic must be created in advance; but topic creation (automatic or manual) is governed by a switch, autoCreateTopicEnable, which is set in the broker node's configuration file...
  • We know RocketMQ has a config autoCreateTopicEnable; in development it is usually set to true, allowing topics to be created, while in production, to manage topics centrally, it is set to false and an administrator creates them in the management console. So how does rocketMQ, based on this config...
  • Kafka topic configuration in detail

    1k+ views 2021-01-12 17:40:22
    1. Topic-level configuration in Kafka: when topic-level parameters are configured, the topic-level value of a property overrides the global one; otherwise the global value is the default. Topic creation can set one or more --config "Property" entries; below a topic named "my-topic..." is created
  • kafka topic management

    1k+ views 2022-03-16 21:38:35
    Quick notes on the common commands for kafka topic management
  • kafka: listing topics and viewing topic messages

    1k+ views 2021-09-28 10:19:15
    Querying the topic list (you must first cd into the kafka directory). Linux, directory \kafka_2.12-2.8.0\bin\: sh kafka-topics.sh --list --zookeeper localhost:2181. Windows, directory \kafka_2.12-2.8.0\bin\windows: kafka-topics.bat --...
  • Deleting a kafka topic, thoroughly

    1k+ views 2021-07-13 19:01:09
    When starting the kafka service hits Map failed, the topic with the dead loop must be deleted. 1. Delete the relevant topic directories under the kafka storage directory (the log.dirs setting in server.properties, default "/tmp/kafka-logs"). 2. Kafka's topic-deletion command is: bin/kafka-...
  • 1. The business team's wording made me read this as a problem with the production cluster: their topic was created successfully in the console, but sending messages failed. To make sure the cluster was fine, I first created a test topic and wrote a Java client to send messages, and everything worked...
  • A friend asked me this recently: when consuming multiple kafka topics with Spark Streaming, how do you get each record's topic so that data from different topics can be handled with different logic? The question is actually very simple...
  • 2. Sending and receiving messages with a topic exchange. 2.1 Write the message-receiving classes (there are three). 2.2 Write the message-sending class. 1. Preamble: from the abstract model all MQ products work the same way: consumers subscribe to a queue, producers...
  • Kafka "Topic xxx not present in metadata" bug hunt. Exception stack: 17343C60D7EF4C23A2EE4E973F22AAAD] ERROR [] o.s.k.s.LoggingProducerListener error:254 - Exception thrown when sending a message with key='...
  • ROS basics (1): ROS communication via topics

    1k+ views, liked by many 2021-01-08 21:42:58
    Contents. Chapter 1: ROS topic communication. 1. Topic communication, the basics: 1. the Node Master; 2. Nodes; 3. messages and topics; 4. recap; 5. an example. 2. Topic communication, advanced: 1. create the learn_topic package; 2. custom messages; 3. create...
  • /data/service/kafka/bin/kafka-topics.sh --delete --topic history-debezium-uds-dd_sdi_ods_uds_uds_stg_1h --zookeeper p-qcbj6-dd-hadoop-common-zk-001:2181/kafka Topic history-debezium-uds-dd_sdi_ods_uds...
  • Creating kafka topics, adding configuration, and related operations

    1k+ views 2022-06-28 09:56:15
    Creating a topic, the kafka command-line consumer, the kafka command-line producer, deleting a topic, adding configuration to a single kafka topic; usage of kafka-console-consumer, kafka-console-producer, kafka-topics, kafka-configs
  • This approach creates the topic through a zk connection and works with all versions that use zk. Required dependency: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</...
  • This blog series introduces the two communication mechanisms in ROS, topic and service, split into two posts. It starts with the classic turtlesim example and then hand-writes a matching implementation, mainly following 古月居's video tutorials. The figure above is a topic publisher implementation...
  • Messages (topics) and logs in kafka

    1k+ views 2022-03-26 18:18:58
    A Topic can be understood as the name of a category; messages of the same kind are sent to the same Topic. Under each Topic there can be multiple partition (Partition) log files: a Partition is an ordered message sequence (a message queue), with messages appended in order to a so-called...
  • RocketMQ: Topic creation

    1k+ views 2021-12-18 12:36:12
    Topic creation here splits into cluster mode and Broker mode, which differ as follows. Cluster mode: a topic created in this mode has the same Queue count on all Brokers in the cluster. Broker mode: for a topic created in this mode, the Queue count on each Broker in the cluster...
  • A deep dive into RocketMQ's topic-creation mechanism

    10k+ views 2019-03-31 09:05:00
    I still remember the first time I used rocketmq: I had to pre-create the topic in the console, and I wondered why it was designed that way, so I decided to comb through the source and get to the root of rocketmq topic...
