精华内容
下载资源
问答
  • RocketMQ源码系列(一) NameServer 核心源码解析

    万次阅读 热门讨论 2021-06-15 17:52:51
    一、NameServer 介绍 二、NameServer 功能列表 三、NameServer 架构分析 四、NameServer 架构

    目录

    一、NameServer 介绍

    二、NameServer 功能列表

    三、NameServer 架构分析

    四、NameServer 工程目录解析

    五、NameServer 启动流程分析

    1)  创建NameSrvController

    2)  执行initialize()加载需要的配置

    3)  启动server

    六、NameServer核心源码解析

    1. 路由注册

    1)  broker向NameServer 发送心跳包

     2)  NameServer 处理心跳包

    2. 路由删除

    3. 路由发现

    小结


    rocketmq版本: 4.8.0

    一、NameServer 介绍

            NameServer 是rocketmq核心组件之一,与zookeeper一样天生具有分布式的特性,在rocketmq中担当着路由注册、发现、动态地维护broker相关信息的角色, NameServer 不提供Master-slave同步机制,但是能够保证数据的最终一致性。

    二、NameServer 功能列表

    1.  动态路由发现和注册功能。broker 启动时,会将brokerAddr 注册到NameServer里, 路由发现是指客户端会定时的向NameServer根据topic拉取路由的最新信息。
    2.  动态剔除功能。每隔10 s NameServer 会自动扫描所有的broker, 如果有broker失效,那么会从地址列表里将其剔除掉。

    三、NameServer 架构分析

    下面是 rocketmq 的部署图

    核心原理解析                                

             Broker消息服务器启动时会自动向NameServer 注册信息,消息生产者在发送消息时,会在NameServer的地址列表里通过负载均衡选择一个Broker进行消息发送。 NameServer 与每台broker保持长连接,broker会每隔30s向NameServer发送一个心跳包,NameServer每间隔10s查看broker是否存活,如果broker挂掉了,判断挂掉的逻辑是brokerLiveTable检测上次的心跳包与当前系统时间的时间差,如果时间戳大于120s, 那么就将broker从服务地址列表里剔除。

            这样设计的目的是降低NameServer 的复杂性, 在消息发送端提供容错机制来保证消息发送的高可用性。

            NameServer  可以通过集群来保证高可用性,但在同一时刻有可能获取到数据是不一致的,因为不提供同步机制,但能够保证多个节点的最终一致性。NameServer 这样设计是为了简单高效。

    四、NameServer 工程目录解析

    工程目录结构以及解析如下: 

    namesrv
    ├─ NamesrvController.java // 执行初始化逻辑,加载配置、注册Processor等
    ├─ NamesrvStartup.java // NameServer的启动类, 启动netty server
    ├─ kvconfig
    │    ├─ KVConfigManager.java // namespace和config配置管理
    │    └─ KVConfigSerializeWrapper.java // 将获取到的配置json序列化
    ├─ processor
    │    ├─ ClusterTestRequestProcessor.java //处理请求类型。
    │    └─ DefaultRequestProcessor.java  // 默认地请求处理器, 处理数据包
    └─ routeinfo
           ├─ BrokerHousekeepingService.java // netty 的channel共用方法抽象
           └─ RouteInfoManager.java   // 路由管理器,维护topic, broker, 
    //clusterName, brokerAddr等信息

            分析发现netty 是rocketmq 网络通信的核心,掌握netty 的常见用法是非常有必要的。

    五、NameServer 启动流程分析

    1)  创建NameSrvController

            加载 namesrvConfig 和 nettyServerConfig, 如果有手动配置也可以生效, 使用option类封装参数,在程序运行前添加配置Program arguments, 添加的格式: 例如 -c , -p 等。

        public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
       ....
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
    
            return controller;
    }

    NameSrv的配置存放在 user.home\namesrv\ 目录下:

    2) 执行initialize()加载需要的配置

            NamesrvController 在执行start()方法前需要做一些准备工作,比如加载配置、创建Netty Server实例、注册请求处理器、扫描所有的失联的broker等

        具体的解释如下注释:

        public boolean initialize() {
           // 加载k,v 相关配置,含自定义配置。
            this.kvConfigManager.load();
            // 启动netty server, 管理channel
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            //  初始化netty 线程池
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            //  注册netty 请求Handler, 可以通过NettyRequestProcessor接口找到其实现类
            this.registerProcessor();
            // 与broker建立长连接,扫描所有的broker
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
           // 打印所有的config
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
             // 监听文件里的配置是否修改
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;
                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }
                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
    
            return true;
        }

           如果initialize()方法返回false, 那么需要检查一些相关配置是否正确, 返回true后,就可以执行最后一步controller.start()方法, 该方法表示NameServer正式启动。

    3) 启动server

          接下来看下源代码分析start()方法做了哪些事

      public void start() throws Exception {
         // 1. 启动netty server
            this.remotingServer.start();
          // 2. 启动文件扫描线程,监听核心配置是否修改。
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    

            可以通过debug发现,首先会进入到NettyRemotingServer类里的start()方法, 该方法实现了nettyServer, 初始化netty的线程组和实例化 ServerBootStrap。

     然后开启一个线程执行FileWatchService 的run()方法:

    通过此线程扫描配置文件是否被修改。

    NameServer启动成功后,会在控制台打印 boot success的字样。

    六、NameServer核心源码解析

    1. 路由注册

    1)  broker向NameServer 发送心跳包

            找到brokerController的start()方法里,broker 通过 BrokerController.this.registerBrokerAll(true,false) 方法来向NameServer 发送心跳包,其中使用定时任务 sheduledExecutorService 线程池定时发送,每隔30s 发送一次, brokerConfig.getRegisterNameServerPeriod() 的默认值为30s。

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    

     然后进入到doRegisterBrokerAll()方法,找到BrokerOuterApi里的registerBrokerAll()方法, 用RegiterBrokerRequestHeader类封装broker相关的信息, RegiterBrokerRequestHeader 主要属性如下:

    • brokerName:  broker名称。
    • brokerAddr:  broker的地址。
    • cluterName: broker所在集群的名称。
    • haServerAddr: 集群master的地址。
    • brokerId:   brokerId为0的时候表示该broker为master, 如果大于0,表示该broker为slave。

    brokerId=0为master节点的配置在MixALL配置中:

    然后会调用到registerBrokerAll() 方法, 最终会将该broker信息注册到所有的NameServer上。

      public List<RegisterBrokerResult> registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills,
            final boolean compressed) {
     
            final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
            List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
            if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
             // 封装broker信息
                final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
                requestHeader.setBrokerAddr(brokerAddr);
                requestHeader.setBrokerId(brokerId);
                requestHeader.setBrokerName(brokerName);
                requestHeader.setClusterName(clusterName);
                requestHeader.setHaServerAddr(haServerAddr);
                requestHeader.setCompressed(compressed);
    
                RegisterBrokerBody requestBody = new RegisterBrokerBody();
                requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
                requestBody.setFilterServerList(filterServerList);
                final byte[] body = requestBody.encode(compressed);
                final int bodyCrc32 = UtilAll.crc32(body);
                requestHeader.setBodyCrc32(bodyCrc32);
        // 等待所有的NameServer都含有broker信息后,才表示执行完毕。
                final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
           // 把该broker的信息注册到所有的NameServer上。
                for (final String namesrvAddr : nameServerAddressList) {
                    brokerOuterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                                if (result != null) {
                                    registerBrokerResultList.add(result);
                                }
    
                                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                            } catch (Exception e) {
                                log.warn("registerBroker Exception, {}", namesrvAddr, e);
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                }
    
                try { 
               // 默认超时时间为6s, 在BrokerConfig里配有registerBrokerTimeoutMills=6000
                    countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
            }
    
            return registerBrokerResultList;
        }

            而broker相关信息是通过netty 发送给NameServer, broker信息的请求注册方式有oneway 和同步和异步,默认发送注册请求的方式是同步的。

            可以在BrokerOuterAPI类里的registerBroker(final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body) 里找到通过同步的方式发送注册请求,同步的注册方式如下:

     2)  NameServer 处理心跳包

             NameServer接收到broker发送过来的请求后,首先会在DefaultRequestProcessor 网络处理器解析请求类型,请求类型如果为RequestCode.REGISTER_BROKER, 则最终的请求会到RouteInfoManager里的registerBroker()方法。

     public RemotingCommand registerBroker(ChannelHandlerContext ctx,
                                              RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
            final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
            final RegisterBrokerRequestHeader requestHeader =
                    (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
            // 解析数据包
            TopicConfigSerializeWrapper topicConfigWrapper;
            if (request.getBody() != null) {
                topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
            } else {
                topicConfigWrapper = new TopicConfigSerializeWrapper();
                topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
                topicConfigWrapper.getDataVersion().setTimestamp(0);
            }
            // 用RouteInfoManager 注册broker
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                    requestHeader.getClusterName(),
                    requestHeader.getBrokerAddr(),
                    requestHeader.getBrokerName(),
                    requestHeader.getBrokerId(),
                    requestHeader.getHaServerAddr(),
                    topicConfigWrapper,
                    null,
                    ctx.channel()
            );
            // 响应broker
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
    
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    

            RouteInfoManager 里的registerBroker方法将broker的信息最终添加到 clusterAddrTable、brokerAddrTable、brokerLiveTable、filterServerTable里。

            画了一下broker的在NameSrv中的注册流程图

    2. 路由删除

            RouteInfoManager 的scanNotActiveBroker ()方法, 采用了单线程定时线程池每隔10s扫描所有broker的策略, 该方法在NamesrvController里的initialize()方法里, newSingleThreadScheduledExecutor线程池里只有一个线程实例,利用此线程池能极大地减少系统资源地开销,因为扫描broker本身不需要过多的资源,开启一个线程足以。

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);

            NameServer是如何判定broker失效的呢?

            继续跟着源码进入到scanNotActiveBroker()方法, 判定失效的逻辑是: 如果当前时间戳- 上一次更新的时间戳> 120s。那么判断该broker是失效的。 BROKER_CHANNEL_EXPIRED_TIME默认值为120s。因为broker每隔30s会给NameServer发送一次心跳信息,因此此方式可以判定broker是否失效。

       public void scanNotActiveBroker() {
            Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, BrokerLiveInfo> next = it.next();
                long last = next.getValue().getLastUpdateTimestamp();
                if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                    RemotingUtil.closeChannel(next.getValue().getChannel());
                    it.remove();
                    log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                    this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                }
            }
        }

            接着会将broker相关的信息从brokerLiveTable中移除掉,同时销毁掉netty对应的channel。brokerLiveTable是一个hashmap,归RouteInfoManager类持有。

    3. 路由发现

            RocketMQ的路由发现是非实时的,当Topic路由发生变化时,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题拉取最新路由的编码为: GET_ROUTEINFO_BY_TOPIC。

            我们可以找到DefaultRequestProcessor处理器里的processRequest()方法,该方法用来处理Netty请求, 该方法有个判断,当request里的code为GET_ROUTEINFO_BY_TOPIC时,执行this.getRouteInfoByTopic(ctx, request)方法。

       @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                              RemotingCommand request) throws RemotingCommandException {
    
            if (ctx != null) {
                log.debug("receive request, {} {} {}",
                        request.getCode(),
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        request);
            }
           // 根据请求代码code来判断业务逻辑
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                case RequestCode.QUERY_DATA_VERSION:
                    return queryBrokerTopicConfig(ctx, request);
                // 注册broker
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                    // 移除broker
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                // 根据topic获取路由
                case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }

     然后DefaultRequestProceesor类里的getRouteInfoByTopic(ctx,request)方法里主要做了两件事:

    1) 根据topic从RouteInfoManager的topicQueueTable里获取到所有的QueueData和BrokerData, 然后将他们设置到topicRouteData里返回出去。

    2) 判断指定的topic是否是顺序消息的topic,如果是那么给返回配置顺序消息的路由, 即给setOrderTopicConf赋值。

      public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
                                                   RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final GetRouteInfoRequestHeader requestHeader =
                    (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
            // 1. 获取到topicRouteData,包含topic所有的QueueData和BrokerData
            TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    
            if (topicRouteData != null) {
                if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                    // 2. 判断是否是顺序消息的topic, 如果是顺序消息,那么给该请求返回顺序消息的路由
                    String orderTopicConf =
                            this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                    requestHeader.getTopic());
                    topicRouteData.setOrderTopicConf(orderTopicConf);
                }
    
                byte[] content = topicRouteData.encode();
                response.setBody(content);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            }
    
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }

    小结

    1. NameServer相当于rocketmq的注册中心,能够维护并实时监听broker的地址信息和队列信息等。

    2. NameServer和broker之间是基于netty通信的。

    3. DefaultRequestProcessor的 getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request)方法是根据topc获取到路由信息(包含topic对应的所有queue和所有的broker)、registBroker() 方法将broker信息注册到NameServer上、unregisterBroker()方法移除NameServer上的broker信息。

    4. RouteInfoManager 类管理了所有的broker、cluster集群、topicQueue主题队列以及broker存活的信息。

    展开全文
  • RocketMQ NameServer

    2021-03-27 15:08:33
    前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中 NameServer 部分的实现细节。 NameServer 本节主要介绍RocketMQ路由管理、服务注册及服务发现的机制,NameServer是整个RocketMQ的“大脑”。相信...

    引言

    前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中 NameServer 部分的实现细节。更多相关文章和其他文章均收录于贝贝猫的文章目录

    NameServer

    本节主要介绍RocketMQ路由管理、服务注册及服务发现的机制,NameServer是整个RocketMQ的“大脑”。相信大家对“服务发现”这个词语并不陌生,分布式服务SOA架构体系中会有服务注册中心,分布式服务SOA的注册中心主要提供服务调用的解析服务,指引服务调用方(消费者)找到“远方”的服务提供者,完成网络通信,那么RocketMQ的路由中心存储的是什么数据呢?作为一款高性能的消息中间件,如何避免NameServer的单点故障,提供高可用性呢?

    Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

    NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点,RocketMQ NameServer设计追求简单高效。

    存储内容

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
    • topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
    • brokerAddrTable:Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
    • clusterAddrTable:Broker集群信息,存储集群中所有Broker名称。
    • brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。
    • filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。

    QueueData、BrokerData、BrokerLiveInfo类图如下图所示。
    meta-data
    RocketMQ 2主2从部署图如下所示。
    2m2s-deploy
    对应运行时数据结构如下图所示。
    memory-1
    memory-2

    路由注册

    RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。

    心跳包

    • brokerAddr:broker地址。
    • brokerId:brokerId,O:Master;大于0:Slave。
    • brokerName:broker名称。
    • clusterName:集群名称。
    • haServerAddr:master地址,初次请求时该值为空,slave向NameServer注册后返回其MasterAddr。
    • requestBody:
      • filterServerList:消息过滤服务器列表。
      • topicConfigWrapper:主题配置。

    从心跳包内容我们会发现,每次心跳包中都会包含所有的topic信息,如果一个broker上topic非常多的话,心跳包就会比较大,如果正好赶上网络不好的时候,可能就会导致broker下线。

    NameServer与Broker保持长连接,Broker状态存储在brokerLiveTable中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable)。

    路由删除

    Broker每隔30s向NameServer发送一个心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢? NameServer会每隔1Os扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker, 关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

    RocketMQ有两个触发点来触发路由删除:

    1. NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s,则需要移除该Broker信息。
    2. Broker在正常被关闭的情况下,会执行unRegisterBroker指令,主动删除NameServer中关于自己的信息。

    路由发现

    RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。

    工作示意图

    name-server-work

    参考内容

    [1]《RocketMQ技术内幕》
    [2]《RocketMQ实战与原理解析》
    [3] 老生常谈——利用消息队列处理分布式事务
    [4] RocketMQ架构解析
    [5] MappedByteBuffer VS FileChannel 孰强孰弱?
    [6] 文件 IO 操作的一些最佳实践
    [7] 海量数据处理之Bloom Filter详解
    [8] rocketmq GitHub Wiki

    stun

    展开全文
  • NameServer架构

    2019-06-12 22:28:25
    1.架构设计 Broker 消息服务器在启动时向所有Name Server 注册,消息生产者(Producer)...NameServer 与每台Broker服务器保持长连接,并间隔30s 检 测Broker 是否存活,如果检测到Broker 宕机, 则从路由注册表中...

    1.架构设计

    Broker 消息服务器在启动时向所有Name Server 注册,消息生产者(Producer)在发送消 息之前先从Name Server获取Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送。NameServer 与每台Broker服务器保持长连接,并间隔30s 检 测Broker 是否存活,如果检测到Broker 宕机, 则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,这样降低NameServer 实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

    NameServer 本身的高可用可通过部署多台Namesrver 服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer 设计的一个亮点,RocketMQNameServer 设计追求简单高效。

    2.NameServer的路由注册和故障剔除

    RocketMQ 基于订阅发布机制, 一个Topic 拥有多个消息队列,一个Broker 为每一主题默
    认创建4 个读队列4 个写队列。多个Broker 组成一个集群, BrokerName 由相同的多台Broker
    组成Master-Slave 架构, brokerId 为0 代表Master , 大于0 表示Slave 。BrokerLivelnfo 中的
    lastUpdateTimestamp 存储上次收到Broker 心跳包的时间。

    2.1 路由注册

    RocketMQ 路由注册是通过Broker 与Name Server 的心跳功能实现的。Broker 启动时
    向集群中所有的NameServer 发送心跳语句,每隔3 0s 向集群中所有NameServer 发送心
    跳包, NameServer 收到Broker 心跳包时会更新brokerLiveTable 缓存中BrokerLivelnfo 的
    lastUpdateTimestamp ,然后NameServer 每隔10s 扫描brokerLiveTable ,如果连续120s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭Socket 连接。

    设计亮点

    NameServe 与Broker 保持长连接,Broker状态存储在brokerLiveTable 中,NameS erver 每收到一个心跳包,将更新brokerLiveTable 中关于Broker 的状态信息以及路由表(topicQueueTable 、brokerAddrTable 、brokerLiveTable 、filterServerTable )。更新上述路由表( HashTable )使用了锁粒度较少的读写锁,允许多个消息发送者(Producer )并发读,保证消息发送时的高并发。但同一时刻NameServer 只处理一个Broker 心跳包,多个心跳包请求串行执行。

    2.2路由删除

    (1)NameServer 每隔10s 扫描brokerLiveTable ,如果连续120s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭Socket 连接。
    (2)Broker 在正常被关闭的情况下,会执行unregisterBroker 指令。

    2.3路由发现

    RocketMQ 路由发现是非实时的,当Topic 路由出现变化后, NameServer 不主动推送给客户端, 而是由客户端定时拉取主题最新的路由。根据主题名称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC.客户端默认会每30秒更新路由信息,每30秒向NameServer发送心跳包。

    总结

    展开全文
  • RocketMQ 中还有一个比较关键但是我们平时很容易忽略的组件——NameServer。 在日常的使用中,我们接触的最多的还是 Producer 和 Consumer,而 NameServer 没有直接跟我们有交互。就像 Kafka 集群背后用于其集群元...

    RocketMQ 中还有一个比较关键但是我们平时很容易忽略的组件——NameServer

    在日常的使用中,我们接触的最多的还是 Producer 和 Consumer,而 NameServer 没有直接跟我们有交互。就像 Kafka 集群背后用于其集群元数据管理的 Zookeeper 集群一样,NameServer 也在背后支撑着 RocketMQ 正常工作。

    你给翻译翻译,什么叫 NameServer

    NameServer 你可以简单的把它理解成注册中心

    Broker 启动的时候会将自己注册到 NameServer 中,注册的同时还会将 Broker 的 IP 地址、端口相关的数据,以及保存在 Broker 中的 RocketMQ 集群路由的数据一并跟随心跳发送到 NameServer。这里的路由信息是指 Topic 下的 MessageQueue 分别都在哪台 Broker 上。

    而 Producer 则会从 NameServer 中获取元数据,从而将 Message 发到对应的 Broker 中去。

    相应的,Consumer 也需要从 NameServer 中获取数据。平常我们配置消费者,里面重要的信息主要就两个,分别是你要消费的 Topic 和当前的 Consumer Group。根据配置,Consumer 会去 NameServer 获取对应的 Topic 都有哪些 Broker,其真实的 IP 地址和端口是多少,拿到了这个之后就可以开始进行消息消费了。

    注册 Broker 都做了什么

    这里我们先通过注册 Broker 的源码来预热一下,为后面阅读整个部分的源码做准备,直接上代码。

    首先这里做了一个对 Broker 版本的区分,不同的版本采用不同的处理方式,鉴于官网现在最新的版本都已经到了 4.9.0 了,就暂时先不考虑低版本的情况了,后面有时间再讨论。

    只有向上面那种几行的代码会给大家贴出来,其余的代码我会尽量用流程图代替

    校验 Body 的完整性

    首先是校验 Broker 传过来的数据的完整性。很简单的一个判断,将 Broker 传过来的 Body 用 CRC32算法 加密之后,和请求中 Header 中所带的由 Broker 加密的值进行对比,不同的话就说明数据的完整性出了问题,接下来需要中断注册流程。

    解析Body

    这里分成两种情况:

    • Body为空
    • Body不为空

    如果 Body 为空,则会将当前要注册的 Broker 的 DataVersion 给重置;

    而 **Body 不为空 **则会进行对 Body 进行解析,主要是从中解析出 DataVersion ,代表 Broker 中的数据版本。其次解析出这个 Broker 中存储的所有 Topic 及其相关的配置。

    执行注册逻辑

    这里就是注册的核心逻辑了,这里为了更加容易理解,我们来分情况讨论,就不把两种情况揉在一起了。

    • 首次注册
    • 非首次注册

    维护集群中 Broker 的 Name

    在整个操作开始之前,会先给 RouteInfoManager 加一把锁,这个 RouteInfoManager 里面就是 NameServer 存储数据的地方。这个锁是个读写锁,使用的是 Java 中的 ReentrantReadWriteLock

    这里的 BrokerName 是在 RocketMQ 配置文件中配置的变量。就是用于标识一个 Broker 的名字,但我们知道 Broker 是有主从架构的,并且 RocketMQ 4.5 之后推出的 Dleger 可以实现一主多从,换句话说,一个 Broker Name 可能会对应多个 Broker 实例。

    在 MQ 看来,Broker 是多实例部署的;而在 Producer 或者 Consumer 来看,Broker就只有一个。所以,这个步骤内所维护的就是在当前集群中,有多少个这样的 Broker Name。

    维护 Broker 的数据

    然后,RocketMQ 会在 brokerAddrTable 中维护每个 Broker 的核心数据,包含:

    • Broker 所处的集群
    • Broker 的名字(上面刚刚讨论过)
    • 所有 Broker 的 BrokerID 和 Address 的对应关系,是个 Map,Address 为 IP+端口

    同一个 Broker Name 下,为什么会有多个地址信息已经在上个步骤解答过,不在此赘述。

    Broker 的数据维护主要有两个方面:

    • 该 Broker 数据在 brokerAddrTable 中是否存在
    • brokerAddrTable 中维护的数据不能有重复的地址信息

    第一个过于基础简单,就不再赘述。我们重点看第二个点,我们知道会有多个 Broker 地址,存在一个 Map 中,因为 Broker 是基于主从架构。那不知道你有没有想过,NameServer 如何区分  和  的呢?

    答案是通过 Map 的 Key,如果是 0 则代表是 Master 节点,1 则代表 Slave 节点,因为 RocketMQ 自己实现的 Broker 主从架构是一主一从,而一主多从则是由 RocketMQ 4.5 之后加入的 Dleger 实现的,暂时先不讨论。区分的逻辑如下图:

    那什么时候会出现重复呢?

    答案是主从切换

    举个例子,假设某个 Slave Broker 的 Address 为 192.168.1.101:8081 ,且已经注册。此时brokerAddrs 中已经有一个key: 1 value: 192.168.1.101:8081 记录了。

    当集群中的 Master 宕机之后,会进行故障恢复,假设选中了上面这个 Broker 为新的 Master,在进行注册的时候会发现,brokerAddrs 中已经有一个同样的 Address 了,只是 Key 不同。但是由于它们从本质上来说就是同一台机器,如果不将 key 为1,也就是角色为 Slave 的记录去掉,就会造成数据一致性的问题。

    简单总结一下来说,同一个 Adreess,在 brokerAddrs 中只能存在一个。感兴趣的可以看一下源码,其实跟上面文字描述的逻辑是一样的。

    去除了重复的 Address 数据之后,就会将本次注册的 Broker 的数据注册进 brokerAddrs 中。

    维护 MessageQueue 的数据

    这里主要是根据 Broker 的数据更新其 MessageQueue 相关的数据。接下来,我们详细解析一下 Message Queue 的维护流程,同样会给出源码和流程图,两部分等价,可选择性观看。

    当 Master 节点来注册时,如果是首次注册或者数据有更新,便会调用一个方法createAndUpdateQueueData去维护 MessageQueue 相关的数据。这里对数据是否更新的判断,是基于 DataVersion 的,代表 Broker 数据的版本。

    此后通过 Topic 的 Name 拿到对应的 MessageQueue 的列表,这里可能会有点疑问,一个 Topic 难道不应该只有一个对 MessageQueue 相关的配置吗,为什么这里拿到的是个列表?

    小了,格局小了

    Topic 是个逻辑上的概念,一个 Topic 的 MessageQueue 会分布在不同的 Broker 上,所有这里是个列表。

    更新的流程如上图,拿到了 MessageQueue 的列表之后,会和本次注册的 Broker 中的 MessageQueue 数据做一个对比,如果发现不同就进行全量的替换,没什么其他的复杂对比逻辑。源码等同上图,感兴趣的可以自行查看。

    维护 Broker 的存活信息

    到这里,MessageQueue 相关的逻辑就处理完了,接下来 NameServer 会再去更新 brokerLiveTable 中的数据,这里存放了当前正在活跃的所有 Broker。这块的作用后续会讲。

    NameServer 启动流程

    上面通过了解注册 Broker的整个流程,对整个 NameServer 的架构有了个大概的了解,接下来再从整体视角来看一下 NameServer。

    NameServer的主要流程

    整体的流程上面这张图已经给出来了,就不放源码了,意义不大。

    这里说一下扫描不再活跃的Broker,这个后台线程会每 10秒 钟执行一次,这里会对上文提到的 brokerLiveTable 进行遍历处理,因为这里面维护了所有的正在活跃的 Broker。

    如果某个 Broker 超过了 120秒 没有发送心跳给 NameServer,就会将其从 brokerLiveTable 中移除。

    NameServer 可处理的操作

    上面简单了解了 注册 Broker 的流程,实际上 NameServer 还支持很多其他的操作,这里就不再这里列出来了,看了没有意义,感兴趣的可以自己去网上找,一大堆的资料。而且 Register Broker 这个操作中所涉及到源码中的数据结构,其他的操作都会用到,所以了解了 Register Broker 之后,再去阅读其他操作的源码会非常的顺。

    作者:SH的全栈笔记

    链接:https://my.oschina.net/leonsh/blog/5127540

    展开全文
  • RocketMQ之NameServer

    2020-04-10 21:24:09
    引言 消息生产者怎么得知消息会发往哪台服务器?...每个NameServer都会和所有的Broker连接,NameServer之间互不相连,同一时刻NameServer之间的数据可能并不相同。 当Broker启动时,先向 所有NameServer注册,二...
  • Nameserver + Broker

    2020-01-11 13:52:07
    对于一个消息队列来说,系统由很多机器组成,每个机器...Nameserver 的存在就是为了解决这些问题,由Nameserver维护这些配置信息、状态信息、其它角色都通过Nameserver协同执行。 Nameserver的功能 Nameserver...
  • RocketMQ源码分析之NameServer

    万次阅读 2017-07-17 23:06:40
    1、RocketMQ组件概述 NameServer ...NameServer彼此之间不通信,每个Broker与集群内所有的Nameserver保持长连接。 2、源码分析NameServer 本文不对 NameServer 与 Broker、Producer 集群、Cons...
  • RocketMQ NameServer协调者

    2020-01-31 22:51:54
    从3.x之前使用Zookeeper,之后改成NameServer NameServer是整个集群的状态服务器 NameServer部署,相互独立(相当于热备份) 为什么不用Zookeeper,因为根据业务需要不需要主从选举(相对来说比较重量级),...
  • NameServer 源码分析

    2020-09-26 02:12:39
    NameServer 源码分析 流程图 1.NameServer 架构设计 Broker 消息服务器在启动时向所有 Name Server 注册,消息生产者(Producer)在发送消息之前先从 Name Server 获取 Broker 服务器地址列表,然后根据负载算法从...
  • nameserver 127.0.1.1 & 如何设置nameserver

    千次阅读 2020-05-04 23:23:50
    一、 resolv.conf 详解 ...二、nameserver 127.0.1.1是什么 ubuntu下有一个本地默认的dns服务叫做dnsmasq,它是由NetworkManager控制的 ps -ef | grep dnsmasq 结果为: nobody 2104 1017 0 22:05 ? 0...
  • centos5 caching-nameserver

    2018-07-07 09:11:41
    DNS搭建所需的caching-nameserver-9.3.6-20.P1.el5_8.6.x86_64.rpm
  • RocketMQ之NameServer详解

    2021-03-21 18:54:03
    文章目录前言一、NameServer的作用二、NameServer启动过程1.启动类NamesrvStartup2.读入数据总结 前言 RocketMQ是阿里巴巴开源的一个顶级项目,高性能加上几乎能做到零丢失率让它在越来越多的企业项目中运用,这篇...
  • 本章主要介绍 RocketMQ 路由管理 、 服务注册及服务发现的机制, NameServer 是整个RocketMQ 的“大脑” 。 本章重点内容如下 。 • NameServer 整体架构设计 • NameServer 动态路由发现与剔除机制 2.1 NameServer ...
  • 某些情况下我们希望程序通过自定义Nameserver去查询域名,而不希望通过操作系统给定的Nameserver,本文介绍如何在Golang中实现自定义Nameserver。 DNS解析过程 Golang中一般通过net.Resolver的LookupHost(ctx ...
  • Nameserver 在 RocketMQ 整体架构中所处的位置就相当于 ZooKeeper、Dubbo 服务化架构体系中的位置,即充当“注册中心”,在 RocketMQ 中路由信息主要是指主题(Topic)的队列信息,即一个 Topic 的队列分布在哪些 ...
  • nameserver主要是为消息生产者和消费者提供关于主题topic的路由信息。 路由元信息:RouteInfoManager 路由注册:通过Broker与NameServer的心跳功能实现。Broker启动向集群中所有的NameServer发送心跳,每隔30s向...
  • RocketMQ——NameServer和Broker

    千次阅读 2019-08-16 21:21:50
    RocketMQ——NameServer和Broker 文章目录RocketMQ——NameServer和BrokerNameServerNameServer功能为什么不用zookeeper?BrokerBroker消息存储Broker的HA NameServer NameServer功能 NameServer负责维护Producer和...
  • RocketMQ之NameServer源码浅析

    千次阅读 2020-08-14 10:44:47
    NameServer简介 NameServer是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。 同时,各个角色的机器都要定期向 NameServer上报自己的状态,超时不上报的话, NameServer 会认为某个机器出故障...
  • Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker...
  • RocketMQ-NameServer分析

    2019-03-13 21:38:01
    Rocketmq中的NameServer主要负责两个工作,broker管理和路由管理。这篇文章主要是分析NameServer如何完成这两个工作的。 一、broker管理 broker会定时上报broker的基本信息以及主题信息给NameServerNameServer会将...
  • RocketMQ--NameServer启动

    2020-12-03 16:59:12
    概述 NameServer 可以说式 Broker 的注册中心,Broker 在启动的...当生产者需要向 Broker 发送消息的时候,就会先从 NameServer 里面获取 Broker 的地址列表,然后负载均衡,选择一台消息服务器进行发送 Name Server
  • RocketMQ中的路由中心NameServer

    千次阅读 2019-05-07 18:30:07
    NameServer集群间互不通信。 先简单了解一下这三种能力的实现方式,后面会有源码分析。 路由管理:通过配置文件灵活加载配置。 服务注册:Broker在启动时向所有的NameServer心跳语句,每隔30S向所有NameServer发起...
  • MQ NameServer模块划分

    2017-08-07 16:18:05
    NameServer做Broker的服务发现,即客户端可以通过NameServer拿到Broker的信息 Broker汇报数据到NameServer NameServer的模块划分 在进行NameServer的模块划分讨论前,先整理一下NameServer的功能: 做...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 52,809
精华内容 21,123
关键字:

nameserver