2022-03-14 15:25:03
This article again uses Kafka 0.8.2.2 as its example.
I. How to delete a topic
Deleting a topic involves two key points:
1. Configure the deletion parameter
Set the broker parameter delete.topic.enable to true.
2. Run the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
If the deletion parameter is not set to true, the topic is not actually removed; it is only marked for deletion. At that point most people's instinct is to delete the topic's Zookeeper nodes and its log files by hand, but that does not clear the topic data held in the broker's memory. The best course is therefore to set the deletion parameter to true and then restart Kafka.
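For reference, a minimal sketch of the broker configuration this requires (only the property name and value matter; the comment is illustrative):
  # config/server.properties on every broker, followed by a broker restart
  delete.topic.enable=true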
II. The key classes
1. PartitionStateMachine
This class is the partition state machine: it tracks each partition's current state and drives transitions between states. There are four states:
NonExistentPartition
NewPartition
OnlinePartition
OfflinePartition
2. ReplicaManager
Manages all the replicas on the current broker and carries out the concrete actions: reads, writes, deletes, and so on.
Reads and writes: obtain the Partition object, from it the Replica object, from that the Log object, and let the Segment objects the Log manages write the data out or read it back.
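As a rough sketch of that lookup chain (names follow the 0.8.x source; treat this as an illustration rather than a verbatim excerpt, and topic, partitionId, localBrokerId, messages as hypothetical inputs):
  // hypothetical sketch of the read/write path through ReplicaManager (0.8.x-style names)
  val partition: Option[Partition] = replicaManager.getPartition(topic, partitionId)
  val replica: Option[Replica] = partition.flatMap(_.getReplica(localBrokerId))
  val log: Option[Log] = replica.flatMap(_.log) // each local replica may own a Log
  log.foreach(_.append(messages))               // the Log appends into its active Segment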
3. ReplicaStateMachine
The replica state machine: it tracks each replica's current state and the transitions between states. A replica is always in exactly one of the following states:
NewReplica: the controller creates replicas in this state during partition reassignment; such a replica can only accept a become-follower request. Valid previous state: NonExistentReplica.
OnlineReplica: a started replica of a partition; it can accept become-leader or become-follower requests. Valid previous states: NewReplica, OnlineReplica or OfflineReplica.
OfflineReplica: a dead replica is in this state. Valid previous states: NewReplica, OnlineReplica.
ReplicaDeletionStarted: the replica enters this state when its deletion starts. Valid previous state: OfflineReplica.
ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted.
ReplicaDeletionIneligible: the replica enters this state when deletion fails. Valid previous state: ReplicaDeletionStarted.
NonExistentReplica: the replica enters this state after it has been deleted successfully. Valid previous state: ReplicaDeletionSuccessful.
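Those rules can be summarized as a map of valid previous states, in the spirit of the transition checks the state machine performs (a sketch built from the list above, not a verbatim excerpt):
  // valid previous states per target state, exactly as listed above (sketch)
  val validPreviousStates: Map[String, Set[String]] = Map(
    "NewReplica"                -> Set("NonExistentReplica"),
    "OnlineReplica"             -> Set("NewReplica", "OnlineReplica", "OfflineReplica"),
    "OfflineReplica"            -> Set("NewReplica", "OnlineReplica"),
    "ReplicaDeletionStarted"    -> Set("OfflineReplica"),
    "ReplicaDeletionSuccessful" -> Set("ReplicaDeletionStarted"),
    "ReplicaDeletionIneligible" -> Set("ReplicaDeletionStarted"),
    "NonExistentReplica"        -> Set("ReplicaDeletionSuccessful")
  )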
4. TopicDeletionManager
This class manages the topic-deletion state machine:
1) TopicCommand publishes the delete command by creating a child node under /admin/delete_topics/.
2) The controller listens for child-node changes under /admin/delete_topics and starts deleting the listed topics.
3) The controller runs a background thread that performs the topic deletion.
III. The deletion process, traced through the source
This splits into four parts:
A) What the client-side delete command does
B) The full flow when delete.topic.enable is not configured
C) The full flow when delete.topic.enable is configured
D) Manually deleting the topic's zk information and disk data
1. The client executes the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
Opening kafka-topics.sh we see:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@
Inside TopicCommand, in the main method:
else if(opts.options.has(opts.deleteOpt))
  deleteTopic(zkClient, opts)
whose body is:
val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
  try {
    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
在"/admin/delete_topics"目录下创建了一个topicName的节点。2,假如不配置delete.topic.enable整个流水是
Two listeners can respond to that node:
A) TopicChangeListener
B) DeleteTopicsListener
Deleting a topic via the delete command only triggers the DeleteTopicsListener:
var topicsToBeDeleted = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
  info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  // mark topic ineligible for deletion if other state changes are in progress
  topicsToBeDeleted.foreach { topic =>
    val preferredReplicaElectionInProgress =
      controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
    val partitionReassignmentInProgress =
      controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
    if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  }
  // add topic to deletion list
  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
Both of those calls first check whether delete.topic.enable is true. If it is not, they simply do nothing and the flow ends here; if it is, execution proceeds into
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
3. The flow when delete.topic.enable is set to true
The difference from step 2 lies exactly in those two handler functions:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion does the following:
if(isDeleteTopicEnabled) {
  val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if(newTopicsToHaltDeletion.size > 0)
    info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
Its purpose is to halt topic deletion when any of the following holds (quoting the source comment, "Halt delete topic if"):
- replicas being down
- partition reassignment in progress for some partitions of the topic
- preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion updates the set of topics to be deleted and wakes the DeleteTopicsThread:
def enqueueTopicsForDeletion(topics: Set[String]) {
  if(isDeleteTopicEnabled) {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    resumeTopicDeletionThread()
  }
}
In the deletion thread DeleteTopicsThread's doWork method:
topicsQueuedForDeletion.foreach { topic =>
  // if all replicas are marked as deleted successfully, then topic deletion is done
  if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
    // clear up all state for this topic from controller cache and zookeeper
    completeDeleteTopic(topic)
    info("Deletion of topic %s successfully completed".format(topic))
  }
Inside completeDeleteTopic:
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
Its job is to deregister the listener watching for partition changes, delete the topic's Zookeeper nodes, delete the disk data, and update the in-memory data structures, for example removing the partitions' details from the replica state machine. The most important part, though, is how the replica data on disk actually gets deleted; that is what we focus on next.
For the first cleanup pass, in the deletion thread DeleteTopicsThread's doWork method:
{
  // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
  if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    // mark topic for deletion retry
    markTopicForDeletionRetry(topic)
  }
}
Inside markTopicForDeletionRetry:
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
ReplicaStateMachine's handleStateChanges calls handleStateChange for each replica; handling OfflineReplica it does:
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition,deletePartition = false)
and then, back in handleStateChanges:
brokerRequestBatch.sendRequestsToBrokers(controller.epoch,controllerContext.correlationId.getAndIncrement)
This sends the StopReplica instruction (RequestKeys.StopReplicaKey) to the brokers storing the replica data, and the data deletion begins:
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
  val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
  val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
  debug("The stop replica request (delete = true) sent to broker %d is %s"
    .format(broker, stopReplicaWithDelete.mkString(",")))
  debug("The stop replica request (delete = false) sent to broker %d is %s"
    .format(broker, stopReplicaWithoutDelete.mkString(",")))
  replicaInfoList.foreach { r =>
    val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
      Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
    controller.sendRequest(broker, stopReplicaRequest, r.callback)
  }
}
stopReplicaRequestMap.clear()
On the broker, KafkaApis' handle method receives the instruction:
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
which in turn calls
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
Then, inside stopReplicas:
{
  controllerEpoch = stopReplicaRequest.controllerEpoch
  // First stop fetchers for all partitions, then stop the corresponding replicas
  replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  for(topicAndPartition <- stopReplicaRequest.partitions){
    val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition, errorCode)
  }
  (responseMap, ErrorMapping.NoError)
}
Drilling into stopReplica, we finally reach the actual log deletion:
getPartition(topic, partitionId) match {
  case Some(partition) =>
    if(deletePartition) {
      val removedPartition = allPartitions.remove((topic, partitionId))
      if (removedPartition != null)
        removedPartition.delete() // this will delete the local log
    }
That is Kafka's complete log-deletion flow.
4. Manually deleting the topic's zk information and disk data
The TopicChangeListener notices the change, but its handling is trivial; it merely updates some state:
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
IV. Summary
Kafka's topic deletion is, in essence, a publish/subscribe system built on Zookeeper. A Zookeeper client creates a node under /admin/delete_topics/; the Kafka controller sees the event and then actually carries out the deletion: deregister the partition-change listener, clear the in-memory data structures, delete the replica data, and delete the topic's related Zookeeper nodes.
With delete.topic.enable set to false, running the topic delete command effectively does nothing. To truly delete a topic, change the parameter to true and restart Kafka; the topic information is then removed completely. This has been tested.
A common shortcut is to manually delete the topic's Zookeeper information and its disk data, but that leaves part of the in-memory state uncleared. Whether that causes real problems has not been tested.
More related content
Interview question: a consumer subscribes to two topics; if one topic's messages pile up, does that affect consumption of the other topic?
2021-07-15 11:14:14
Question: a consumer subscribes to two topics, and one of them accumulates a large backlog; does that affect consumption of the other topic?
Answer: no, it does not.
Why not?
Because RocketMQ first load-balances (rebalances) the messages: the topics' queues are assigned across consumers, and a PullRequest (holding topic, brokerName, queueId) for each assigned queue is put into a LinkedBlockingQueue, which fixes the order of subsequent consumption. For example, out of 100 requests roughly 50 are for TopicTest1 and the other 50 for TopicTest2.
When the pulls actually happen, multiple threads take requests from the LinkedBlockingQueue and process them in insertion order. Even though TopicTest1 has a backlog, TopicTest2 is still consumed as usual. At worst, if TopicTest1 is consumed so slowly that it momentarily occupies all the threads, TopicTest2's consumption speed dips for that moment, but its consumption is never blocked.
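A toy model of that arrangement (entirely hypothetical names; it only illustrates why one topic's backlog does not block the other):
  import java.util.concurrent.{Executors, LinkedBlockingQueue}

  // toy model: pull requests for both topics share one queue; workers take them in order
  case class PullRequest(topic: String, brokerName: String, queueId: Int)

  val queue = new LinkedBlockingQueue[PullRequest]()
  for (q <- 0 until 4; t <- Seq("TopicTest1", "TopicTest2"))
    queue.put(PullRequest(t, "broker-a", q))

  val pool = Executors.newFixedThreadPool(4)
  for (_ <- 1 to 4) pool.execute { () =>
    while (true) {          // runs forever; illustration only
      val req = queue.take() // a slow TopicTest1 request ties up only this one worker
      // process(req): TopicTest2 requests keep being taken by the other workers
    }
  }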
As the log below shows, the PullMessageService thread puts requests into the queue while NettyClientPublicExecutor_1 through _4 pull messages concurrently; each worker takes a request, processes it, and goes back for the next one.
-------放入消息-----:MessageQueue [topic=%RETRY%testConsumer, brokerName=broker-a, queueId=0] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1] PullMessageService
-------放入消息-----:MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0] PullMessageService
拉取到消息:TopicTest2,NettyClientPublicExecutor_4 处理TopicTest2...NettyClientPublicExecutor_4
拉取到消息:TopicTest1,NettyClientPublicExecutor_3
拉取到消息:TopicTest2,NettyClientPublicExecutor_1 处理TopicTest2...NettyClientPublicExecutor_1
拉取到消息:TopicTest2,NettyClientPublicExecutor_2 处理TopicTest2...NettyClientPublicExecutor_2
... (the log continues, repeating the same interleaved pattern; in the log, 放入消息 = enqueue pull request, 拉取到消息 = message pulled, 处理 = processing)
Kafka 3.0: the topic-creation flow (source code)
2022-03-13 02:34:57
1. From the create command to assembling the data needed for topic creation (the Scala part)
The command that creates a Kafka topic is:
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
  --partitions 20 --replication-factor 3 --config x=y
--bootstrap-server
the address and port of one of the Kafka servers
--create
marks the command as a create operation
--topic
followed by the name of the topic to create
--partitions
explicitly sets the number of partitions
--replication-factor
explicitly sets how many replicas each partition gets
--config x=y
configurations given on the command line override the server defaults, for example how long data is retained; the complete set of per-topic configs is documented elsewhere. This one is optional; we will come back to it.
kafka-topics.sh itself just runs
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
so what actually executes is the code in
core/src/main/scala/kafka/admin/TopicCommand.scala
Note that since Kafka 2.8 there is the option of dropping ZooKeeper and letting KRaft manage the cluster itself, so the source below no longer contains the ZookeeperTopicService path for creating topics:
object TopicCommand extends Logging {
  def main(args: Array[String]): Unit = {
    val opts = new TopicCommandOptions(args)
    opts.checkArgs()
    // instantiate the TopicService
    val topicService = TopicService(opts.commandConfig, opts.bootstrapServer)
    var exitCode = 0
    try {
      if (opts.hasCreateOption)
        // the --create keyword in the command selects createTopic
        topicService.createTopic(opts)
      else if (opts.hasAlterOption)
        topicService.alterTopic(opts)
      else if (opts.hasListOption)
        topicService.listTopics(opts)
      else if (opts.hasDescribeOption)
        topicService.describeTopic(opts)
      else if (opts.hasDeleteOption)
        topicService.deleteTopic(opts)
    } catch {
      case e: ExecutionException =>
        if (e.getCause != null)
          printException(e.getCause)
        else
          printException(e)
        exitCode = 1
      case e: Throwable =>
        printException(e)
        exitCode = 1
    } finally {
      topicService.close()
      Exit.exit(exitCode)
    }
  }
TopicService(opts.commandConfig, opts.bootstrapServer)
executes the apply in the object below:
object TopicService {
  def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
    bootstrapServer match {
      case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
      case None =>
    }
    Admin.create(commandConfig)
  }

  def apply(commandConfig: Properties, bootstrapServer: Option[String]): TopicService =
    new TopicService(createAdminClient(commandConfig, bootstrapServer))
}
which in turn calls createAdminClient to build a client for creating the topic. Then come the parameter checks (were the options specified, are they valid, and so on), after which the freshly created client creates the topic:
case class TopicService private (adminClient: Admin) extends AutoCloseable {
  def createTopic(opts: TopicCommandOptions): Unit = {
    // build a CommandTopicPartition carrying the input options (partition count, replica count, ...)
    val topic = new CommandTopicPartition(opts)
    if (Topic.hasCollisionChars(topic.name))
      // check for special characters in the topic name
      println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " +
        "collide. To avoid issues it is best to use either, but not both.")
    createTopic(topic)
  }

  def createTopic(topic: CommandTopicPartition): Unit = {
    // if --replication-factor was given, it must be within range
    if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
      throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
    // if --partitions was given, it must be greater than 0
    if (topic.partitions.exists(partitions => partitions < 1))
      throw new IllegalArgumentException(s"The partitions must be greater than 0")
    try {
      val newTopic = if (topic.hasReplicaAssignment)
        // if --replica-assignment was given, assign replicas exactly as specified
        new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
      else {
        new NewTopic(
          topic.name,
          topic.partitions.asJava,
          topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
      }
      // parse --config into a map
      val configsMap = topic.configsToAdd.stringPropertyNames()
        .asScala
        .map(name => name -> topic.configsToAdd.getProperty(name))
        .toMap.asJava
      newTopic.configs(configsMap)
      // call adminClient to create the topic
      val createResult = adminClient.createTopics(Collections.singleton(newTopic),
        new CreateTopicsOptions().retryOnQuotaViolation(false))
      createResult.all().get()
      println(s"Created topic ${topic.name}.")
    } catch {
      case e: ExecutionException =>
        if (e.getCause == null)
          throw e
        if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
          throw e.getCause
    }
  }
2. The client sends the create-topic request asynchronously through a queue and threads
In KafkaAdminClient.java, the createTopics method:
@Override
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                       final CreateTopicsOptions options) {
    final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
    final CreatableTopicCollection topics = new CreatableTopicCollection();
    // iterate over the set of topics to create
    for (NewTopic newTopic : newTopics) {
        if (topicNameIsUnrepresentable(newTopic.name())) {
            // the topic name cannot be represented in a request
            KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
            future.completeExceptionally(new InvalidTopicException("The given topic name '" +
                newTopic.name() + "' cannot be represented in a request."));
            topicFutures.put(newTopic.name(), future);
        } else if (!topicFutures.containsKey(newTopic.name())) { // guard against duplicates when creating several topics at once
            topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
            topics.add(newTopic.convertToCreatableTopic());
        }
    }
    // if topics is non-empty, go create them
    if (!topics.isEmpty()) {
        final long now = time.milliseconds();
        final long deadline = calcDeadlineMs(now, options.timeoutMs());
        // initialize the create-topics call
        final Call call = getCreateTopicsCall(options, topicFutures, topics,
            Collections.emptyMap(), now, deadline);
        // the actual invocation happens here; the line above only initializes the call
        runnable.call(call, now);
    }
    return new CreateTopicsResult(new HashMap<>(topicFutures));
}
(1) runnable.call (the queue and the background thread)
Why explain this before getCreateTopicsCall? Because I find it easier to follow this way: execution is not a single straight-line step, and starting from getCreateTopicsCall is confusing.
/**
 * Initiate a new call.
 *
 * This will fail if the AdminClient is scheduled to shut down.
 *
 * @param call The new call object.
 * @param now  The current time in milliseconds.
 */
void call(Call call, long now) {
    if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
        log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
        call.handleTimeoutFailure(time.milliseconds(),
            new TimeoutException("The AdminClient thread is not accepting new calls."));
    } else {
        enqueue(call, now);
    }
}

/**
 * Queue a call for sending.
 *
 * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even
 * if the AdminClient is shutting down). This function should be called when retrying an
 * existing call.
 *
 * @param call The new call object.
 * @param now  The current time in milliseconds.
 */
void enqueue(Call call, long now) {
    if (call.tries > maxRetries) {
        log.debug("Max retries {} for {} reached", maxRetries, call);
        call.handleTimeoutFailure(time.milliseconds(), new TimeoutException(
            "Exceeded maxRetries after " + call.tries + " tries."));
        return;
    }
    if (log.isDebugEnabled()) {
        log.debug("Queueing {} with a timeout {} ms from now.", call,
            Math.min(requestTimeoutMs, call.deadlineMs - now));
    }
    boolean accepted = false;
    // put the call into the newCalls queue
    synchronized (this) {
        if (!closing) {
            newCalls.add(call);
            accepted = true;
        }
    }
    // wake the thread up to execute it
    if (accepted) {
        client.wakeup(); // wake the thread if it is in poll()
    } else {
        log.debug("The AdminClient thread has exited. Timing out {}.", call);
        call.handleTimeoutFailure(time.milliseconds(),
            new TimeoutException("The AdminClient thread has exited."));
    }
}
The thread woken by client.wakeup() then runs the following:
@Override
public void run() {
    log.debug("Thread starting");
    try {
        // requests are processed here
        processRequests();
    } finally {
        closing = true;
        // omitted
        log.debug("Exiting AdminClientRunnable thread.");
    }
}

private void processRequests() {
    long now = time.milliseconds();
    while (true) {
        // Copy newCalls into pendingCalls.
        drainNewCalls();
        // Check if the AdminClient thread should shut down.
        long curHardShutdownTimeMs = hardShutdownTimeMs.get();
        if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
            break;
        // Handle timeouts.
        TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
        timeoutPendingCalls(timeoutProcessor);
        timeoutCallsToSend(timeoutProcessor);
        timeoutCallsInFlight(timeoutProcessor);
        long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
        if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
            pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
        }
        // Choose nodes for our pending calls.
        pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
        long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
        if (metadataFetchDelayMs == 0) {
            metadataManager.transitionToUpdatePending(now);
            Call metadataCall = makeMetadataCall(now);
            // Create a new metadata fetch call and add it to the end of pendingCalls.
            // Assign a node for just the new call (we handled the other pending nodes above).
            if (!maybeDrainPendingCall(metadataCall, now))
                pendingCalls.add(metadataCall);
        }
        pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
        if (metadataFetchDelayMs > 0) {
            pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
        }
        // Ensure that we use a small poll timeout if there are pending calls which need to be sent
        if (!pendingCalls.isEmpty())
            pollTimeout = Math.min(pollTimeout, retryBackoffMs);
        // Wait for network responses.
        log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
        List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);
        log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
        // unassign calls to disconnected nodes
        unassignUnsentCalls(client::connectionFailed);
        // Update the current time and handle the latest responses.
        now = time.milliseconds();
        handleResponses(now, responses);
    }
}
sendEligibleCalls is the method that actually sends the calls:
/**
 * Send the calls which are ready.
 *
 * @param now The current time in milliseconds.
 * @return The minimum timeout we need for poll().
 */
private long sendEligibleCalls(long now) {
    long pollTimeout = Long.MAX_VALUE;
    for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
        Map.Entry<Node, List<Call>> entry = iter.next();
        List<Call> calls = entry.getValue();
        if (calls.isEmpty()) {
            iter.remove();
            continue;
        }
        // omitted ...
        while (!calls.isEmpty()) {
            Call call = calls.remove(0);
            int timeoutMs = Math.min(remainingRequestTime,
                calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
            AbstractRequest.Builder<?> requestBuilder;
            try {
                // obtain the requestBuilder from the call
                requestBuilder = call.createRequest(timeoutMs);
            } catch (Throwable t) {
                call.fail(now, new KafkaException(String.format(
                    "Internal error sending %s to %s.", call.callName, node), t));
                continue;
            }
            ClientRequest clientRequest = client.newClientRequest(node.idString(),
                requestBuilder, now, true, timeoutMs, null);
            log.debug("Sending {} to {}. correlationId={}, timeoutMs={}",
                requestBuilder, node, clientRequest.correlationId(), timeoutMs);
            // the actual send
            client.send(clientRequest, now);
            callsInFlight.put(node.idString(), call);
            correlationIdToCalls.put(clientRequest.correlationId(), call);
            break;
        }
    }
    return pollTimeout;
}
Pay particular attention to the line requestBuilder = call.createRequest(timeoutMs); the requestBuilder is initialized in getCreateTopicsCall, covered next.
(2) getCreateTopicsCall (builds the requestBuilder that sends the create-topic request)
Having seen runnable.call above, let's look at how getCreateTopicsCall produces the Call:
private Call getCreateTopicsCall(final CreateTopicsOptions options,
                                 final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures,
                                 final CreatableTopicCollection topics,
                                 final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
                                 final long now,
                                 final long deadline) {
    return new Call("createTopics", deadline, new ControllerNodeProvider()) {
        @Override
        public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
            return new CreateTopicsRequest.Builder(
                new CreateTopicsRequestData()
                    .setTopics(topics)
                    .setTimeoutMs(timeoutMs)
                    .setValidateOnly(options.shouldValidateOnly()));
        }

        @Override
        public void handleResponse(AbstractResponse abstractResponse) {
            // omitted ...
        }

        private ConfigEntry configEntry(CreatableTopicConfigs config) {
            return new ConfigEntry(
                config.name(),
                config.value(),
                configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())),
                config.isSensitive(),
                config.readOnly(),
                Collections.emptyList(),
                null,
                null);
        }

        @Override
        void handleFailure(Throwable throwable) {
            // If there were any topics retries due to a quota exceeded exception, we propagate
            // the initial error back to the caller if the request timed out.
            maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
                throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
            // Fail all the other remaining futures
            completeAllExceptionally(futures.values(), throwable);
        }
    };
}
Here new ControllerNodeProvider() returns the controller node, which effectively means the server side receiving this request is the controller:
/**
 * Provides the controller node.
 */
private class ControllerNodeProvider implements NodeProvider {
    @Override
    public Node provide() {
        if (metadataManager.isReady() && (metadataManager.controller() != null)) {
            return metadataManager.controller();
        }
        metadataManager.requestUpdate();
        return null;
    }
}
3. The server side of the create-topic request (handleCreateTopicsRequest)
(1) First, a look at what happens when the Kafka cluster starts
Why add this step?
Mainly because, starting with Kafka 2.8, there is a new choice besides zk: KRaft can take over zk's job, and it is billed as revolutionary. The old zk mode is not discarded; it is simply no longer the only option. See my other article: how Kafka 2.8 chooses between enabling KRaft and ZooKeeper (the selection logic in the source, not the KRaft implementation).
(2) How the zk vs. raft variant is chosen when KafkaApis is initialized
When Kafka starts, startup performs the initialization; below only the KafkaRaftServer variant is shown:
def startup(): Unit = {
  // omitted
  // Create the request processor objects.
  // Note raftSupport here, and its position among the new KafkaApis arguments.
  val raftSupport = RaftSupport(forwardingManager, metadataCache)
  dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
    replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
    config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
    fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
  dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
    socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads,
    s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
  // omitted
}
Here is the KafkaApis constructor; you can think of the server side as the controller. In KafkaApis.scala:
/**
 * Logic to handle the various Kafka requests
 */
class KafkaApis(val requestChannel: RequestChannel,
                val metadataSupport: MetadataSupport,
                val replicaManager: ReplicaManager,
                val groupCoordinator: GroupCoordinator,
                val txnCoordinator: TransactionCoordinator,
                val autoTopicCreationManager: AutoTopicCreationManager,
                val brokerId: Int,
                val config: KafkaConfig,
                val configRepository: ConfigRepository,
                val metadataCache: MetadataCache,
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers,
                val fetchManager: FetchManager,
                brokerTopicStats: BrokerTopicStats,
                val clusterId: String,
                time: Time,
                val tokenManager: DelegationTokenManager,
                val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging
The second parameter is MetadataSupport, and in startup it was raftSupport. So when later code obtains something from MetadataSupport whose name contains "zk", do not assume it is ZooKeeper-related; here it is actually the raft support. The create-topic request sent by the client is received and handled by handleCreateTopicsRequest:
/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    // omitted ...
    request.header.apiKey match {
      // omitted ...
      case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
      // omitted ...
    }
  } catch {
    // omitted
  } finally {
    // omitted
  }
}
maybeForwardToController needs no elaboration; let's go straight to handleCreateTopicsRequest:
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
  // the field is named zkSupport but it is actually raftSupport; see 3(1) for why
  val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
  // omitted
  val createTopicsRequest = request.body[CreateTopicsRequest]
  val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
  // if the current broker is not the controller, report an error
  if (!zkSupport.controller.isActive) {
    createTopicsRequest.data.topics.forEach { topic =>
      results.add(new CreatableTopicResult().setName(topic.name)
        .setErrorCode(Errors.NOT_CONTROLLER.code))
    }
    sendResponseCallback(results)
  } else {
    // omitted
    zkSupport.adminManager.createTopics(
      createTopicsRequest.data.timeoutMs,
      createTopicsRequest.data.validateOnly,
      toCreate,
      authorizedForDescribeConfigs,
      controllerMutationQuota,
      handleCreateTopicsResults)
  }
zkSupport.adminManager.createTopics is where the actual work happens.
(3) This is where I got stuck.
1. Why does zkSupport.adminManager.createTopics end up in ZkAdminManager's createTopics method? Shouldn't there be a RaftAdminManager?
2. The implementation behind val zkSupport = metadataSupport.requireZkOrThrow:
case class ZkSupport(adminManager: ZkAdminManager,
                     controller: KafkaController,
                     zkClient: KafkaZkClient,
                     forwardingManager: Option[ForwardingManager],
                     metadataCache: ZkMetadataCache) extends MetadataSupport {
  val adminZkClient = new AdminZkClient(zkClient)

  override def requireZkOrThrow(createException: => Exception): ZkSupport = this
  override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException

  override def ensureConsistentWith(config: KafkaConfig): Unit = {
    if (!config.requiresZookeeper) {
      throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
    }
  }

  override def maybeForward(request: RequestChannel.Request,
                            handler: RequestChannel.Request => Unit,
                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
    forwardingManager match {
      case Some(mgr) if !request.isForwarded && !controller.isActive => mgr.forwardRequest(request, responseCallback)
      case _ => handler(request)
    }
  }

  override def controllerId: Option[Int] = metadataCache.getControllerId
}

case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCache)
    extends MetadataSupport {
  override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)

  override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this

  override def ensureConsistentWith(config: KafkaConfig): Unit = {
    if (config.requiresZookeeper) {
      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
    }
  }

  override def maybeForward(request: RequestChannel.Request,
                            handler: RequestChannel.Request => Unit,
                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
    if (!request.isForwarded) {
      fwdMgr.forwardRequest(request, responseCallback)
    } else {
      handler(request) // will reject
    }
  }
In zk mode, ZkSupport's requireZkOrThrow returning this is easy to follow; but in raft mode it is RaftSupport's requireZkOrThrow that runs, and
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
just throws. So does the zkSupport returned to handleCreateTopicsRequest ever continue into the zkSupport.adminManager.createTopics call below? If anyone knows, please explain; otherwise I will come back to this later.
How Kafka creates a topic
2021-05-29 17:01:20
Creating a Kafka topic is simple; a single command suffices:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
This command creates a topic named test with 3 partitions and 3 replicas per partition. But what does Kafka actually do after this command runs? This article walks through it and lays out, end to end, how a Kafka topic is created.
Topic creation has two parts: the command-line part plus the backend (controller) logic. The main idea is that the backend logic watches the corresponding Zookeeper directory node; once the create command is issued, it creates a new data node, which triggers the backend creation logic.
In short, the command line does two things: 1. determine the partition replica assignment (which brokers each partition's replicas land on); 2. create Zookeeper nodes, writing this plan under the /brokers/topics/<topic> node.
The Kafka controller part mainly does the following: 1. create the partitions; 2. create the replicas; 3. elect a leader and ISR for each partition; 4. update the various caches.
Now the details. For the rest of this walkthrough, assume the topic to create is named test, with 3 partitions and a replication factor of 3. Note: only the main logic is covered; non-default behaviors are out of scope here.
The command-line part
After we issue the create command, Kafka performs some basic validation, e.g. whether partition count and replication factor were both specified, or whether the topic name contains illegal characters. Notably, version 0.10 supports specifying broker rack information, similar to Hadoop, to better exploit locality and reduce network overhead in the cluster. If rack info (broker.rack) is specified, Kafka takes it into account when assigning replicas, trying to pick brokers in different racks for a partition's replicas. In this example we ignore the effect of rack information on topic creation.
After the basic validation, Kafka fetches the list of currently live brokers from Zookeeper's /brokers/ids and starts the replica assignment. The assignment has three goals:
- spread replicas as evenly as possible across brokers
- if one replica of a partition lands on a broker, spread that partition's other replicas as evenly as possible across the other brokers
- if all brokers have rack information, place each partition's replicas in as many different racks as possible
The third goal does not concern us here; how are the first two achieved? Reading the source directly can be opaque, but it boils down to one sentence: pick a random broker, assign every partition's first replica round-robin from there, then assign the remaining replicas with an incrementally increasing rightward shift. Still hard to picture? An example: say you have 10 partitions p0, p1, p2, ..., p9, each with replication factor 3, i.e. 30 replicas total, to place on 5 brokers (b0, b1, b2, b3, b4). The strategy works like this:
1. Kafka picks one of the 5 brokers at random; assume it picks b0.
2. It assigns the first replica of every partition round-robin: starting from b0, it assigns brokers in order to the 10 partitions' first replicas.
3. That places 10 replicas; the remaining 20 are placed with an incremental right shift. For instance, if the first two rows are 1,2,3,4,5 and 6,7,8,9,10, then the 3rd row shifts right by 1 to become 5,1,2,3,4 and the 4th row shifts right by 2 to become 9,10,6,7,8, and so on. (See the sketch below.)
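That strategy can be sketched roughly as follows (a simplified illustration of "round-robin first replica, shifted remaining replicas", not the exact AdminUtils code; it assumes more than one broker and replicationFactor <= brokers.size):
  import scala.util.Random

  // simplified sketch: round-robin the first replica, shift the rest (illustrative only)
  def assignReplicas(brokers: IndexedSeq[Int], numPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    val n = brokers.size
    val startIndex = Random.nextInt(n)        // randomly chosen first broker
    (0 until numPartitions).map { p =>
      val shift = 1 + (p / n) % (n - 1)       // grow the shift after each full round
      val first = (startIndex + p) % n        // round-robin for the first replica
      val others = (1 until replicationFactor).map(r => (first + r * shift) % n)
      p -> (first +: others).map(brokers)
    }.toMap
  }
  // e.g. assignReplicas(Vector(0, 1, 2, 3, 4), numPartitions = 10, replicationFactor = 3)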
Of course, if rack information is considered the algorithm is adjusted somewhat, but it still essentially satisfies the three goals above.
For this article's example, assume the resulting plan is (format: partition number -> set of broker ids holding its replicas):
0 -> [0,1,2]
1 -> [1,0,2]
2 -> [2,0,1]
Once the assignment is determined, Kafka persists it to Zookeeper under the /brokers/topics/<topic> node, as something like:
{"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}
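Persisting the plan is essentially a single Zookeeper write; roughly, using the ZkUtils naming of that era (a sketch, with assignmentJson standing for the JSON shown above):
  ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPath("test"), assignmentJson)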
okay, with that the command-line part is done, and you should see Kafka return Created topic "test", indicating success. But do not assume Kafka's topic creation is finished; there is still plenty to do, and now the controller takes the stage.
The backend logic part
The so-called backend logic is provided by Kafka's controller. The controller keeps a lot of internal state, including a partition state machine that records the state of every topic's partitions. This state machine registers several Zookeeper listeners, created when the controller starts. One of them, TopicChangeListener, watches for child-node changes under Zookeeper's /brokers/topics directory; whenever the number of children changes, the listener's handler is invoked. In our example, the command line has already persisted the assignment to /brokers/topics/test, so the handler fires.
The TopicChangeListener does two things: it updates the controller's caches (e.g. the cluster-wide topic list and the cached replica assignments for newly added topics), and it creates the corresponding partition and replica objects, determining a leader replica and ISR for every partition.
With that, the topic creation is complete!
====================================================================================================================
Clearly, the controller-side logic above was only skimmed; walking straight through the code would be dry, and for most purposes knowing that much is enough. Below, though, we analyze in detail how the controller creates a topic, following the code.
As mentioned, the controller defines many data structures recording the cluster's current state. Among them are a partition state machine (PartitionStateMachine) and a replica state machine (ReplicaStateMachine), recording the states and state transitions of partitions and replicas respectively.
At first glance the two state machines look almost alike, but one governs partition state transitions and the other replica state transitions. Whether partition or replica, only the Online state can serve traffic normally, and some work must be completed before that state can be set. In detail:
1 First, the partition state machine's registerPartitionChangeListener method registers a Zookeeper listener; when it sees the new test node under /brokers/topics it immediately runs TopicChangeListener's handleChildChange method.
2 handleChildChange's concrete logic:
2.1 Comparing the controller's cached topic list with the list under /brokers/topics, find the newly added topic test. If the controller's topic list is A and the list under /brokers/topics is B, the new topics are B - A.
2.2 By the same method, determine the set of deleted topics, A - B.
2.3 Update the controller's cached topic list (add test, and evict the topics that were deleted).
2.4 Read the replica assignment of all of this topic's partitions from the /brokers/topics/test node, then update the controller's corresponding cache (add test's plan to the cache, and evict the plans of deleted topics).
2.5 Call onNewTopicCreation to begin creating the topic.
3 onNewTopicCreation: the callback implementing the real topic-creation logic:
3.1 Register the partition-change listener. As noted, the partition state machine registers several Zookeeper listeners; the TopicChangeListener discussed above is only one of them, and the listener here instead watches a topic's partition changes. It is the PartitionModificationListener class, which, as the name implies, watches the data of the /brokers/topics/<topic> node and fires once that data changes. For topic creation this step is registration only and will not fire, because Kafka wrote the node's data before registering the listener; it exists for later topic modifications. Since the listener will not fire this time, the code manually calls onNewPartitionCreation to create the partitions.
3.2 Call onNewPartitionCreation to create the partitions.
4 onNewPartitionCreation: this method creates all of the topic's partition objects, in 4 steps:
4.1 Create the partition objects and set them to the NewPartition state. Being a partition state machine, it must keep the state of every partition of every topic in the cluster somewhere; whenever a new topic is created, all its partitions are added to this cache to keep it in sync, uniformly in the NewPartition state.
4.2 Create the replica objects for each partition. Kafka first looks up the partition's assignment in the controller cache (remember, the controller keeps the replica assignment of every topic), then sets all of the partition's replicas to NewReplica. Concretely: it first tries to read the /brokers/topics/test/partitions/<partitionId>/state node in Zookeeper, which stores each partition's leader replica and ISR. For a topic being created, no partition has a leader or ISR yet, so the node should not exist and reads back empty; that is expected, because the election happens later! So here Kafka only needs to update the replica state machine's state cache. (Forgot to mention: just as the partition state machine caches the states of all partitions in the cluster, the replica state machine naturally keeps a similar cache for the states of all replicas of all partitions of all topics, and that cache must be updated here too.)
4.3 The previous two steps created the partition and replica objects in the NewPartition and NewReplica states. This step moves the partitions to OnlinePartition, the only state in which they can be used normally, and that is precisely what must happen here: leader election! The code is long-winded, but it simply picks the first replica in the replica list as the leader and uses the entire replica list as the ISR. For example, for test's partition 0 the replica list is 0,1,2, so partition 0's leader is replica 0 and the ISR is [0,1,2] (condensed into the sketch after this list). Kafka then writes this, together with the controller epoch and the leader epoch (aside: the controller epoch counts how many times the controller has changed hands; the leader epoch is analogous), into Zookeeper's /brokers/topics/test/partitions/0/state node, and updates the controller's leader cache (another aside: the controller records the leader and ISR of all topic partitions). okay, now every partition of the new topic has a leader and an ISR, and the rest of the cluster must be told: an UpdateMetadataRequest is sent to all current brokers. Concretely, the leader and ISR info is packed into a map, and for each entry an UpdateMetadataRequest object is built and sent via the controller's sendRequest method to every live broker. (Why to all brokers? Because the LeaderAndIsr request is the one request every broker can answer immediately without consulting a leader broker!) The actual sending involves Kafka's low-level network protocol and the KafkaApis machinery; a topic for another time.
4.4 Set the replica objects to OnlineReplica: by now all partitions have elected their leaders and ISRs, persisted them to Zookeeper, and propagated them to the other brokers. The last step flips the replica states cached in the replica state machine from NewReplica to OnlineReplica.
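The leader/ISR rule referenced in step 4.3 condenses to something like this (a sketch with hypothetical values, assuming the assigned replicas are alive):
  // initial leader election for a new partition (sketch of the rule described above)
  val liveBrokerIds    = Set(0, 1, 2, 3, 4)      // hypothetical live brokers
  val assignedReplicas = Seq(0, 1, 2)            // e.g. partition 0 of topic test
  val liveAssigned     = assignedReplicas.filter(liveBrokerIds.contains)
  val leader           = liveAssigned.head       // first (live) replica becomes the leader
  val isr              = liveAssigned            // the replica set becomes the initial ISR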
okay, and with that a topic has been created in full~~