精华内容
下载资源
问答
  • dubbo服务引用

    2017-12-28 08:26:09
    dubbo服务引用
    1.dubbo的xml配置文件dubbo:reference 对应的就是ReferenceBean,它实现了FactoryBean,所以在初始化 实例时就会调用他的实现方法,从而开启服务引用
    public Object getObject() throws Exception {
            return get();
        }

    检查并且保存各种配置到Map<String, String> map,然后根据这些生成具体的代理 ref = createProxy(map)

    2.加载注册URL后开始暴露具体的refer, invoker = refprotocol.refer(interfaceClass, urls.get(0));

    // 通过注册中心配置拼装URL
                	List<URL> us = loadRegistries(false);
                	if (us != null && us.size() > 0) {
                    	for (URL u : us) {
                    	    URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                    	    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                	}
    同样经过ProtocolFilterWrapper和ProtocolListenerWrapper包装后,开始获取注册中心,这块和服务暴露一样,
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
            Registry registry = registryFactory.getRegistry(url);
            return doRefer(cluster, registry, type, url);
        }

    3.创建注册目录保存注册的信息,在zk中创建某个接口的短暂的consumer节点,重置当前注册目录的consumerUrl,在注册中心订阅并创建针对该接口的三个节点providers,configurators,routers并且设置监听器ChildListener监听他们以及子节点的变化,最后通过调用NotifyListener来触发相应的事件,因为注册目录也实现了该接口,所以最开始传进来的NotifyListener就是最开始创建的注册目录类RegistryDirectory

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
            if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                    Constants.PROVIDERS_CATEGORY 
                    + "," + Constants.CONFIGURATORS_CATEGORY 
                    + "," + Constants.ROUTERS_CATEGORY));
            return cluster.join(directory);
        }
     public synchronized void notify(List<URL> urls) {
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category) 
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators 
            if (configuratorUrls != null && configuratorUrls.size() >0 ){
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            if (routerUrls != null && routerUrls.size() >0 ){
                List<Router> routers = toRouters(routerUrls);
                if(routers != null){ // null - do nothing
                    setRouters(routers);
                }
            }
            List<Configurator> localConfigurators = this.configurators; // local reference
            // 合并override参数
            this.overrideDirectoryUrl = directoryUrl;
            if (localConfigurators != null && localConfigurators.size() > 0) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // providers
            refreshInvoker(invokerUrls);
        }

    4.把URL信息转化成对应的键值对缓存,为了以后直接调用代理执行接口方法打下基础,将URL列表转成Invoker列表,换方法名映射Invoker列表

    /**
         * 根据invokerURL列表转换为invoker列表。转换规则如下:
         * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
         * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
         * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
         * @param invokerUrls 传入的参数不能为null
         */
        private void refreshInvoker(List<URL> invokerUrls){
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // 禁止访问
                this.methodInvokerMap = null; // 置空列表
                destroyAllInvokers(); // 关闭所有Invoker
            } else {
                this.forbidden = false; // 允许访问
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
                }
                if (invokerUrls.size() ==0 ){
                	return;
                }
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
                // state change
                //如果计算错误,则不进行处理.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                    return ;
                }
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try{
                    destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
                }catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    这里会根据dubbo协议进行refer生成invoker返回保存起来invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);过滤器和监听器包装

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    5.准备创建客户端连接,默认是共享连接 ,第一次没有的话会初始化ExchangeClient exchagneclient = initClient(url);获取网络通讯段netty,设置版本心跳编解码器,

    最后ExchangeClient client = Exchangers.connect(url ,requestHandler);这块的逻辑和服务暴露是类似的,只不过服务引用是connect,而服务暴露是bind,请求处理器进行

    包装,增加编解码,请求头处理

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    通过NettyTransporter实例化NettyClient,然后就是和NettyServer类似的过程,doOpen();打开ClientBootstrap,连接暴露服务的服务端 doConnect();HeaderExchangeChannel,加心跳功能HeaderExchangeClient,最后用ReferenceCountExchangeClient包裹后缓存起来,连接获取完成之后,和接口类型,URL等,对应起来,保存成DubboInvoker返回并添加到invokers里保存.

    6.组建集群功能,用MockClusterWrapper包裹 FailoverCluster,UI后返回一个FailoverClusterInvoker,在方法调用是这块会实现负载均衡以及重试,创建服务代理proxyFactory.getProxy(invoker);一样的用JavassistProxyFactory生成具体的代理类

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }

    展开全文
  • Dubbo服务引用

    2021-03-27 14:41:07
    引言 前面的文章中,我们已经详细介绍了服务暴露的相关细节,本文中,我们主要深入介绍服务引用的实现细节。 引用服务方式 在 Dubbo 中,我们可以通过两种方式引用远程服务。...服务直连的方式仅适合在...Dubbo 服务引用

    引言

    前面的文章中,我们已经详细介绍了服务暴露的相关细节,本文中,我们主要深入介绍服务引用的实现细节。更多相关文章和其他文章均收录于贝贝猫的文章目录

    引用服务方式

    在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。因此,接下来我将重点分析通过注册中心引用服务的过程。从注册中心中获取服务配置只是服务引用过程中的一环,除此之外,服务消费者还需要经历 Invoker 创建、代理类创建等步骤。

    Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。下面我们按照 Dubbo 默认配置进行分析,整个分析过程从 ReferenceBean 的 getObject 方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。

    引用服务要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            // url 配置被指定,则不做本地引用
            if (url != null && url.length() > 0) {
                isJvmRefer = false;
            // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
            // 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            // 获取 injvm 配置值
            isJvmRefer = isInjvm().booleanValue();
        }
    
        // 本地引用
        if (isJvmRefer) {
            // 生成本地引用 URL,协议为 injvm
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 调用 refer 方法构建 InjvmInvoker 实例
            invoker = refprotocol.refer(interfaceClass, url);
    
        // 远程引用
        } else {
            // url 不为空,表明用户可能想进行点对点调用
            if (url != null && url.length() > 0) {
                // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            // 设置接口全限定名为 url 路径
                            url = url.setPath(interfaceName);
                        }
    
                        // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                            // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                            // 最后将合并后的配置设置为 url 查询字符串中。
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                // 加载注册中心 url
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 添加 refer 参数到 url 中,并将 url 添加到 urls 中
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
    
                // 未配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference...");
                }
            }
    
            // 单个注册中心或服务提供者(服务直连,下同)
            if (urls.size() == 1) {
                // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
    
            // 多个注册中心或多个服务提供者,或者两者混合
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
    
                // 获取所有的 Invoker
                for (URL url : urls) {
                    // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
                    // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url;
                    }
                }
                if (registryURL != null) {
                    // 如果注册中心链接不为空,则将使用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else {
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
    
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true;
        }
    
        // invoker 可用性检查
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("No provider available for the service...");
        }
    
        // 生成代理类
        return (T) proxyFactory.getProxy(invoker);
    }
    

    上面代码很多,不过逻辑比较清晰。首先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。

    Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // 创建 DubboInvoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
    

    上面方法看起来比较简单,不过这里有一个调用需要我们注意一下,即 getClients。这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。

    private ExchangeClient[] getClients(URL url) {
        // 是否共享连接
        boolean service_share_connect = false;
        // 获取连接数,默认为0,表示未配置
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // 如果未配置 connections,则共享连接
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
    
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                // 获取共享客户端
                clients[i] = getSharedClient(url);
            } else {
                // 初始化新的客户端
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
    

    这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。

    private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        // 获取带有“引用计数”功能的 ExchangeClient
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                // 增加引用计数
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }
    
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }
    
            // 创建 ExchangeClient 客户端
            ExchangeClient exchangeClient = initClient(url);
            // 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }
    

    上面方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。ReferenceCountExchangeClient 内部实现比较简单,就不分析了。下面我们再来看一下 initClient 方法的代码。

    private ExchangeClient initClient(URL url) {
    
        // 获取客户端类型,默认为 netty
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    
        // 添加编解码和心跳包参数到 url 中
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
        // 检测客户端类型是否存在,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: ...");
        }
    
        ExchangeClient client;
        try {
            // 获取 lazy 配置,并根据配置值决定创建的客户端类型
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                // 创建懒加载 ExchangeClient 实例
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // 创建普通 ExchangeClient 实例
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service...");
        }
        return client;
    }
    

    initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端,该类的代码本节就不分析了。下面我们分析一下 Exchangers 的 connect 方法。

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 获取 Exchanger 实例,默认为 HeaderExchangeClient
        return getExchanger(url).connect(url, handler);
    }
    

    如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 这里包含了多个调用,分别如下:
        // 1. 创建 HeaderExchangeHandler 对象
        // 2. 创建 DecodeHandler 对象
        // 3. 通过 Transporters 构建 Client 实例
        // 4. 创建 HeaderExchangeClient 对象
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    

    这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            // 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
            handler = new ChannelHandlerDispatcher(handlers);
        }
    
        // 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
        return getTransporter().connect(url, handler);
    }
    

    如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。

    到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了,到这里,关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 取 registry 参数值,并将其设置为协议头
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
    
        // 将 url 查询字符串转为 Map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // 获取 group 配置
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
    
        // 调用 doRefer 继续执行服务引用逻辑
        return doRefer(cluster, registry, type, url);
    }
    

    上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心和协议
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // 生成服务消费者链接
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    
        // 注册服务消费者,在 consumers 目录下新节点
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
    
        // 订阅 providers、configurators、routers 等节点数据
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
    
        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    

    如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。

    Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        // 调用重载方法
        return getProxy(invoker, false);
    }
    
    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        Class<?>[] interfaces = null;
        // 获取接口列表
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            // 切分接口列表
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                // 设置服务接口类和 EchoService.class 到 interfaces 中
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // 加载接口类
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
    
        // 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827
        if (!invoker.getInterface().equals(GenericService.class) && generic) {
            int len = interfaces.length;
            Class<?>[] temp = interfaces;
            // 创建新的 interfaces 数组
            interfaces = new Class<?>[len + 1];
            System.arraycopy(temp, 0, interfaces, 0, len);
            // 设置 GenericService.class 到数组中
            interfaces[len] = GenericService.class;
        }
    
        // 调用重载方法
        return getProxy(invoker, interfaces);
    }
    
    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
    

    如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    

    上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现自 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。该类逻辑比较简单,这里就不分析了。下面我们重点关注一下 Proxy 的 getProxy 方法,如下。

    public static Proxy getProxy(Class<?>... ics) {
        // 调用重载方法
        return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
    }
    
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        if (ics.length > 65535)
            throw new IllegalArgumentException("interface limit exceeded");
    
        StringBuilder sb = new StringBuilder();
        // 遍历接口列表
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            // 检测类型是否为接口
            if (!ics[i].isInterface())
                throw new RuntimeException(itf + " is not a interface.");
    
            Class<?> tmp = null;
            try {
                // 重新加载接口类
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
            }
    
            // 检测接口是否相同,这里 tmp 有可能为空
            if (tmp != ics[i])
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
    
            // 拼接接口全限定名,分隔符为 ;
            sb.append(itf).append(';');
        }
    
        // 使用拼接后的接口名作为 key
        String key = sb.toString();
    
        Map<String, Object> cache;
        synchronized (ProxyCacheMap) {
            cache = ProxyCacheMap.get(cl);
            if (cache == null) {
                cache = new HashMap<String, Object>();
                ProxyCacheMap.put(cl, cache);
            }
        }
    
        Proxy proxy = null;
        synchronized (cache) {
            do {
                // 从缓存中获取 Reference<Proxy> 实例
                Object value = cache.get(key);
                if (value instanceof Reference<?>) {
                    proxy = (Proxy) ((Reference<?>) value).get();
                    if (proxy != null) {
                        return proxy;
                    }
                }
    
                // 并发控制,保证只有一个线程可以进行后续操作
                if (value == PendingGenerationMarker) {
                    try {
                        // 其他线程在此处进行等待
                        cache.wait();
                    } catch (InterruptedException e) {
                    }
                } else {
                    // 放置标志位到缓存中,并跳出 while 循环进行后续操作
                    cache.put(key, PendingGenerationMarker);
                    break;
                }
            }
            while (true);
        }
    
        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            // 创建 ClassGenerator 对象
            ccp = ClassGenerator.newInstance(cl);
    
            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();
    
            for (int i = 0; i < ics.length; i++) {
                // 检测接口访问级别是否为 protected 或 privete
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    // 获取接口包名
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg))
                            // 非 public 级别的接口必须在同一个包下,否者抛出异常
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
    
                // 添加接口到 ClassGenerator 中
                ccp.addInterface(ics[i]);
    
                // 遍历接口方法
                for (Method method : ics[i].getMethods()) {
                    // 获取方法描述,可理解为方法签名
                    String desc = ReflectUtils.getDesc(method);
                    // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
                    // A 接口和 B 接口中包含一个完全相同的方法
                    if (worked.contains(desc))
                        continue;
                    worked.add(desc);
    
                    int ix = methods.size();
                    // 获取方法返回值类型
                    Class<?> rt = method.getReturnType();
                    // 获取参数列表
                    Class<?>[] pts = method.getParameterTypes();
    
                    // 生成 Object[] args = new Object[1...N]
                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++)
                        // 生成 args[1...N] = ($w)$1...N;
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
                    // Object ret = handler.invoke(this, methods[1...N], args);
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
    
                    // 返回值不为 void
                    if (!Void.TYPE.equals(rt))
                        // 生成返回语句,形如 return (java.lang.String) ret;
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");
    
                    methods.add(method);
                    // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }
    
            if (pkg == null)
                pkg = PACKAGE_NAME;
    
            // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            // 生成 private java.lang.reflect.InvocationHandler handler;
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
    
            // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
            // porxy0(java.lang.reflect.InvocationHandler arg0) {
            //     handler=$1;
            // }
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            // 为接口代理类添加默认构造方法
            ccp.addDefaultConstructor();
    
            // 生成接口代理类
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));
    
            // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
            // public Object newInstance(java.lang.reflect.InvocationHandler h) {
            //     return new org.apache.dubbo.proxy0($1);
            // }
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            // 生成 Proxy 实现类
            Class<?> pc = ccm.toClass();
            // 通过反射创建 Proxy 实例
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            if (ccp != null)
                // 释放资源
                ccp.release();
            if (ccm != null)
                ccm.release();
            synchronized (cache) {
                if (proxy == null)
                    cache.remove(key);
                else
                    // 写缓存
                    cache.put(key, new WeakReference<Proxy>(proxy));
                // 唤醒其他等待线程
                cache.notifyAll();
            }
        }
        return proxy;
    }
    

    上面代码比较复杂,我们写了大量的注释。大家在阅读这段代码时,要搞清楚 ccp 和 ccm 的用途,不然会被搞晕。ccp 用于为服务接口生成代理类,比如我们有一个 DemoService 接口,这个接口代理类就是由 ccp 生成的。ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。下面以 org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。

    package org.apache.dubbo.common.bytecode;
    
    public class proxy0 implements org.apache.dubbo.demo.DemoService {
    
        public static java.lang.reflect.Method[] methods;
    
        private java.lang.reflect.InvocationHandler handler;
    
        public proxy0() {
        }
    
        public proxy0(java.lang.reflect.InvocationHandler arg0) {
            handler = $1;
        }
    
        public java.lang.String sayHello(java.lang.String arg0) {
            Object[] args = new Object[1];
            args[0] = ($w) $1;
            Object ret = handler.invoke(this, methods[0], args);
            return (java.lang.String) ret;
        }
    }
    

    参考内容

    [1]《深入理解Apache Dubbo与实战》
    [2] dubbo 官方文档
    [3] 【Dubbo源码分析】服务导入

    stun

    展开全文
  • dubbo 服务引用

    2021-04-27 10:54:01
    服务引用原理 引用时机 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务(默认饿汉式) ReferenceBean 对应的服务被注入到其他类中时引用(懒汉式) 源码分析 服务引用的入口方法为 Reference...

    https://dubbo.apache.org/zh/docs/v2.7/dev/source/refer-service

    服务引用原理

    引用时机

    •  Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务(默认饿汉式)
    • ReferenceBean 对应的服务被注入到其他类中时引用(懒汉式)

    源码分析

    服务引用的入口方法为 ReferenceBean 的 getObject 方法

    public synchronized T get() {
       
        // 检测 ref 是否为空,为空则通过 init 方法创建
        if (ref == null) {
            // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
            init();
        }
        return ref;
    }

    处理配置

    配置解析逻辑封装在 ReferenceConfig 的 init 方法中

    private void init() {
    
    // 创建代理类
        ref = createProxy(map);
    
        // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
        // 并将 ConsumerModel 存入到 ApplicationModel 中
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    
    
    }

    引用服务

    1. 生成远程服务的代理 2. 获得目标服务的url地址 3. 实现远程网络通信 4. 实现负载均衡 5. 实现集群容错

    createProxy 开始看起

    先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类

    创建 Invoker

    Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,本节会分析最常用的两个,分别是 RegistryProtocol 和 DubboProtocol

    RegistryProtocol 的 refer 方法逻辑

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 取 registry 参数值,并将其设置为协议头
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
    
        // 将 url 查询字符串转为 Map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // 获取 group 配置
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        
        // 调用 doRefer 继续执行服务引用逻辑
        return doRefer(cluster, registry, type, url);
    }

    doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker 

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心和协议
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // 生成服务消费者链接
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    
        // 注册服务消费者,在 consumers 目录下新节点
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
    
        // 订阅 providers、configurators、routers 等节点数据
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
    
        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

     

    创建代理

    入口方法为 ProxyFactory 的 getProxy

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        // 调用重载方法
        return getProxy(invoker, false);
    }
    
    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        Class<?>[] interfaces = null;
        // 获取接口列表
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            // 切分接口列表
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                // 设置服务接口类和 EchoService.class 到 interfaces 中
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // 加载接口类
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
    
        // 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827
        if (!invoker.getInterface().equals(GenericService.class) && generic) {
            int len = interfaces.length;
            Class<?>[] temp = interfaces;
            // 创建新的 interfaces 数组
            interfaces = new Class<?>[len + 1];
            System.arraycopy(temp, 0, interfaces, 0, len);
            // 设置 GenericService.class 到数组中
            interfaces[len] = GenericService.class;
        }
    
        // 调用重载方法
        return getProxy(invoker, interfaces);
    }
    
    //JavassistProxyFactory
    //ccp 用于为服务接口生成代理类ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法
    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

     

     

    展开全文
  • Dubbo 服务引用原理 服务引用原理,先看代码调用逻辑,然后一步步debug看看 消费者引用: ReferenceBean.getObject() -->ReferenceConfig.get() -->init() -->...

                                                                Dubbo 服务引用原理

     

    服务引用原理,先看代码调用逻辑,然后一步步debug看看

    消费者引用:
    ReferenceBean.getObject()
      -->ReferenceConfig.get()
        -->init()
          -->createProxy(map)
            -->refprotocol.refer(interfaceClass, urls.get(0))
              -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
              -->extension.refer(arg0, arg1);
                -->ProtocolFilterWrapper.refer
                  -->RegistryProtocol.refer
                    -->registryFactory.getRegistry(url)//建立zk的连接,和服务端发布一样(省略代码)
                    -->doRefer(cluster, registry, type, url)
                      -->registry.register//创建zk的节点,和服务端发布一样(省略代码)。节点名为:dubbo/com.alibaba.dubbo.demo.DemoService/consumers
                      -->registry.subscribe//订阅zk的节点,和服务端发布一样(省略代码)。   /dubbo/com.alibaba.dubbo.demo.DemoService/providers, 
                                                                            /dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
                                                                             /dubbo/com.alibaba.dubbo.demo.DemoService/routers]
                        -->notify(url, listener, urls);
                          -->FailbackRegistry.notify
                            -->doNotify(url, listener, urls);
                              -->AbstractRegistry.notify
                                -->saveProperties(url);//把服务端的注册url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
                                 -->registryCacheExecutor.execute(new SaveProperties(version));//采用线程池来处理
                               -->listener.notify(categoryList)
                                 -->RegistryDirectory.notify
                                   -->refreshInvoker(invokerUrls)//刷新缓存中的invoker列表
                                     -->destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
                                     -->最终目的:刷新Map<String, Invoker<T>> urlInvokerMap 对象
                                                                                                                              刷新Map<String, List<Invoker<T>>> methodInvokerMap对象
                      -->cluster.join(directory)//加入集群路由
                        -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension("failover");
                          -->MockClusterWrapper.join
                            -->this.cluster.join(directory)
                              -->FailoverCluster.join
                                -->return new FailoverClusterInvoker<T>(directory)
                                -->new MockClusterInvoker
            -->proxyFactory.getProxy(invoker)//创建服务代理
              -->ProxyFactory$Adpative.getProxy
                -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
                  -->StubProxyFactoryWrapper.getProxy
                    -->proxyFactory.getProxy(invoker)
                      -->AbstractProxyFactory.getProxy
                        -->getProxy(invoker, interfaces)
                          -->Proxy.getProxy(interfaces)//目前代理对象interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService
                          -->InvokerInvocationHandler// 采用jdk自带的InvocationHandler,创建InvokerInvocationHandler对象。

     

    首先ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例(如上图中的红色部分),这是服务消费的关键。

    接下来把Invoker转换为客户端需要的接口(如:HelloWorld)。

     

     

     

    1.

     

     

     

     

     

     

     

     

    接下来是关键的

     

     

     

     

     

     

     

     

     

     

     

    订阅消费者

     

     

     

     

     

     

    上面的几行是因为  subscribe 之后会触发notify

     

     

     

    采用的是failfast 策略

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    展开全文
  • dubbo服务引用-原理

    2018-07-19 20:20:24
    描述一下dubbo服务引用的过程,原理   既然你提到了dubbo的服务引用中封装通信细节是用到了动态代理,那请问创建动态代理常用的方式有哪些,他们又有什么区别?dubbo中用的是哪一种?(高频题)   除了JDK动态代理和...
  •  spring在解析配置文件的时候将dubbo:reference标签解析成了ReferenceBean类,该类实现了InitializingBean与FactoryBean接口,并且实现了其中的方法,用户服务引用。 FactoryBean是spring中的工厂bean,通过实现...
  • dubbo服务引用流程大致如下: (1)首先在dubbo容器启动的时候,会扫描所有的reference配置(也就是dubbo客户端配置的远程引用),生成对应ReferenceBean,例如:客户端的DemoClient类依赖了远程服务DemoService,...
  • dubbo服务引用是通过spring的schema实现的。消费端的配置如下: &lt;dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/&gt;...
  • 文章目录1....同样的,为了聚焦在服务引用的过程,编写如下的测试代码,把关注点放在服务引用的过程上。 代码如下: // 客户端 @Test public void invokeRemote() { ReferenceConfig<UserService&g
  • #服务引用流程图 大致步骤可以拆解为: -1. 配置加载 -2. 创建invoker对象 -3. 创建服务接口代理类 ReferenceBean dubbo服务引用dubbo:reference 标签触发,对应的为ReferenceBean。该类实现了InitializingBean...
  • Dubbo 服务引用流程

    千次阅读 2020-02-22 18:11:24
    观看本篇博客,需要先看上篇: ...服务引用流程 先看 DubboNamespaceHandler类的 init方法,在定义属性的时候有这样一行代码: this.registerBeanDefinitionParser("reference", new DubboBeanDe...
  • 理解 Dubbo 服务引用

    2018-05-23 23:30:00
    3 月,跳不动了?>>> dubbo 服务...
  • ​ 上一章分析了服务暴露的源码,这一章继续分析服务引用的源码。在Dubbo中有两种引用方式:第一种是服务直连,第二种是基于注册中心进行引用。服务直连一般用在测试的场景下,线上更多的是基于注册中心的方式。 ​ ...
  • 上一篇文章详细分析了服务导出的过程,本篇文章我们趁热打铁,继续分析服务引用过程。在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。...
  • Dubbo服务引用流程

    2020-11-08 12:16:34
    if (this.isInjvm() == null) { // 判断是否为本地服务引用 if (this.url != null && this.url.length() > 0) { isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { ...
  • Dubbo服务引用原理

    2018-09-11 21:37:24
    服务引用原理 配置文件 通过Spring容器加载 每一个标签,对应一个解析类 Reference 对应ReferenceBean 实现了FactoryBean FactoryBean 工厂Bean 引用标签,通过往容器中,注入Bean 使用时,从...
  • 哈哈哈和服务暴露一样,也有两种: 本地引用,jvm本地调用: // 推荐 <dubbo:service scope="local" /> // 不推荐使用,准备废弃 <dubbo:service injvm="true" /> 远程暴露,网络远程通信: <...
  • 点赞再看,养成习惯,...上篇文章我们已经了解了 Dubbo 服务暴露全过程,这篇文章我就带着大家再来看看 Dubbo 服务引入全流程,这篇服务引入写完下一篇就要来个全链路打通了,看看大家看完会不会有种任督二脉都被打...
  • Dubbo服务引用(二)远程引用

    千次阅读 2020-04-17 22:07:38
    * 服务引用 URL 数组 */ private final List<URL> urls = new ArrayList<URL>(); /** * 直连服务地址 * * 1. 可以是注册中心,也可以是服务提供者 * 2. 可配置多个,使用 ; 分隔 */ // ur...
  • dubbo服务引用调用原理

    千次阅读 2018-08-31 10:18:19
    所有的dubbo自定义标签都会由DubboNamespaceHandler处理 registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); ReferenceBean public class ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,631
精华内容 652
关键字:

dubbo服务引用