精华内容
下载资源
问答
  • dubbo服务暴露

    2017-12-27 13:54:32
    dubbo服务暴露

    1.dubbo启动依赖于spring容器的启动,而spring容器的启动主要的方法都在AbstractApplicationContext类里

    public void refresh() throws BeansException, IllegalStateException {
    		synchronized (this.startupShutdownMonitor) {
    			// Prepare this context for refreshing.
    			prepareRefresh();
    
    			// Tell the subclass to refresh the internal bean factory.
    			ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
    
    			// Prepare the bean factory for use in this context.
    			prepareBeanFactory(beanFactory);
    
    			try {
    				// Allows post-processing of the bean factory in context subclasses.
    				postProcessBeanFactory(beanFactory);
    
    				// Invoke factory processors registered as beans in the context.
    				invokeBeanFactoryPostProcessors(beanFactory);
    
    				// Register bean processors that intercept bean creation.
    				registerBeanPostProcessors(beanFactory);
    
    				// Initialize message source for this context.
    				initMessageSource();
    
    				// Initialize event multicaster for this context.
    				initApplicationEventMulticaster();
    
    				// Initialize other special beans in specific context subclasses.
    				onRefresh();
    
    				// Check for listener beans and register them.
    				registerListeners();
    
    				// Instantiate all remaining (non-lazy-init) singletons.
    				finishBeanFactoryInitialization(beanFactory);
    
    				// Last step: publish corresponding event.
    				finishRefresh();
    			}
    
    			catch (BeansException ex) {
    				// Destroy already created singletons to avoid dangling resources.
    				destroyBeans();
    
    				// Reset 'active' flag.
    				cancelRefresh(ex);
    
    				// Propagate exception to caller.
    				throw ex;
    			}
    		}
    	}
    而我们在xml文件里面配置的标签dubbo:service对应的类文件为ServiceBean,它实现了一个ApplicationListener,这个接口的主要是监听spring容器的启动事件,当spring容器启动完毕之后就会调用finishRefresh();最终触发ServiceBean类的onApplicationEvent执行,开始暴露服务。
    public void onApplicationEvent(ApplicationEvent event) {
            if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            	if (isDelay() && ! isExported() && ! isUnexported()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("The service ready on spring started. service: " + getInterface());
                    }
                    export();
                }
            }
        }
    2.下面开始检查相应的配置,延迟暴露,注册中心,应用,协议,提供者等等一系列的配置,加载注册信息汇总成dubbo的URL形式,然后根据配置的协议种类分别针对这个注册url进行暴露,

     private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    3.经过一系列的参数组装后开始根据具体的范围进行本地或者远程暴露,什么都不配置的话那两个地方都暴露
      String scope = url.getParameter(Constants.SCOPE_KEY);
            //配置为none不暴露
            if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    
                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                }
            }
            this.urls.add(url);
    4.首先是本地暴露,更换协议为injvm,这个协议很重要,和前面讲的获取自适应扩展产生的对象有直接关系,在ServiceClassHolder的ThreadLocal<Class> holder记录这次暴露的class,这个时候proxyFactory是一个javassist生成的自适应扩展类实例ProxyFactory$Adaptive

     private void exportLocal(URL url) {
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)
                        .setHost(NetUtils.LOCALHOST)
                        .setPort(0);
    
                // modified by lishen
                ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
    
                Exporter<?> exporter = protocol.export(
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
                exporters.add(exporter);
                logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
            }
        }
    首先执行proxyFactory.getInvoker方法,在这里到底会调用ProxyFactory接口的那个实现类呢,字符串生成的自适应扩展类在调用他所实现的接口的某个方法时会查看这个方法中满足那些规则,"protocol"的?hasInvocation?SPI注解上有没有默认值defaultExtName,根据这些然后生成getExtension(extName),最终获取具体的实例,如果这时候有包裹类型Wrapper,进行包裹之后返回实际的对象。综上所述,@SPI("javassist")并且这个ProxyFactory有包裹类型,所以函数调用首先经过StubProxyFactoryWrapper

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
            return proxyFactory.getInvoker(proxy, type, url);
        }
    然后到达目的地JavassistProxyFactory,针对具体的实现class用javassist生成对应的代理,然后包装成AbstractProxyInvoker这个类型返回,

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper类不能正确处理带$的类名
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName, 
                                          Class<?>[] parameterTypes, 
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    开始针对协议进行暴露,这个时候协议是injvm,所以实际对象为InjvmProtocol,但是因为有包装Wrapper,所以需要经过ProtocolFilterWrapper,针对不是registry类型的协议,会创建"service.filter""provider"类型的过滤器链

     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    包装必要的暴露监听

     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                            .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
        }
    针对参数警醒一下包装生成Exporter返回,最后加入到服务暴露列表

     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
        }
    5.远程暴露,先是一样的流程,包装代理并且返回Invoker

    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
                    }
    进行协议暴露,因为入参invoker有URL属性,并且它的协议是registry,所以最后执行的对象是RegistryProtocol,前面会经过过滤器和监听器包装,因为是注册协议,所以这两个类不对这个对象进行任何包装。

     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
            //registry provider
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            registry.register(registedProviderUrl);
            // 订阅override数据
            // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            //保证每次export都返回一个新的exporter实例
            return new Exporter<T>() {
                public Invoker<T> getInvoker() {
                    return exporter.getInvoker();
                }
                public void unexport() {
                	try {
                		exporter.unexport();
                	} catch (Throwable t) {
                    	logger.warn(t.getMessage(), t);
                    }
                    try {
                    	registry.unregister(registedProviderUrl);
                    } catch (Throwable t) {
                    	logger.warn(t.getMessage(), t);
                    }
                    try {
                    	overrideListeners.remove(overrideSubscribeUrl);
                    	registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                    } catch (Throwable t) {
                    	logger.warn(t.getMessage(), t);
                    }
                }
            };
        }
    简单包装一下URL和原始的invoker,开始dubbo协议的暴露,过滤器监听器,最后到DubboProtocol

    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return (ExporterChangeableWrapper<T>) exporter;
        }

    组装DubboExporter并把对应的键值对存进暴露服务缓存里exporterMap

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
                openServer(url);
            return exporter;
        }
    开始打开服务并且准备接受请求,设置心跳,获取网络服务类,绑定具体处理请求的requestHandler 

    private ExchangeServer createServer(URL url) {
            //默认开启server关闭时发送readonly事件
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            //默认开启heartbeat
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    //如果是callback 需要处理高版本调用低版本的问题
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1){
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods){
                                if (inv.getMethodName().equals(method)){
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod){
                            logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
        };
    开始把具体的URL和这个请求处理器绑定到到一起,获取具体的HeaderExchanger

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            return getExchanger(url).bind(url, handler);
        }
    public static Exchanger getExchanger(URL url) {
            String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
            return getExchanger(type);
        }
    public static Exchanger getExchanger(String type) {
            return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
        }
    包装具体请求处理类,加上前置处理HeaderExchangeHandler,编解码处理DecodeHandler,获取NettyTransporter进行绑定

     public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().bind(url, handler);
        }
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

    
    

    
    创建NettyServer服务端,对于请求处理器进行再一次包装,添加心跳以及处理多信息功能
    

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                            .getAdaptiveExtension().dispatch(handler, url)));
        }
    
     public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new AllChannelHandler(handler, url);
        }
    
    获取FixedThreadPool线程池,在SimpleDataStore保存端口号线程池相应的键值对
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
            this.handler = handler;
            this.url = url;
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    
            String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
            if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
                componentKey = Constants.CONSUMER_SIDE;
            }
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    设置完一系列属性后开始正式开启netty服务,这里的线程池就是上面创建的那个线程池

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
            String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                            || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                            ? NetUtils.ANYHOST : getUrl().getHost();
            bindAddress = new InetSocketAddress(host, getUrl().getPort());
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
            try {
                doOpen();
            } 
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
        }
    绑定ip端口,建立通讯通道

    protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
            
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org/browse/NETTY-365
            // https://issues.jboss.org/browse/NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }
    包装心跳功能,开始定时给绑定的channel发送心跳信息,返回具体的服务处理类存到缓存里exporterMap

    public HeaderExchangeServer(Server server) {
            if (server == null) {
                throw new IllegalArgumentException("server == null");
            }
            this.server = server;
            this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            startHeatbeatTimer();
        }
    返回暴露的Exporter,和最开始的originInvoker一块包装成ExporterChangeableWrapper

    //用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
        //providerurl <--> exporter
        private final Map<String, ExporterChangeableWrapper<?>> bounds 
    6.获取注册中心,这里的协议是zookeeper

    private Registry getRegistry(final Invoker<?> originInvoker){
            URL registryUrl = originInvoker.getUrl();
            return registryFactory.getRegistry(registryUrl);
        }
    public Registry getRegistry(URL url) {
        	url = url.setPath(RegistryService.class.getName())
        			.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
        			.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        	String key = url.toServiceString();
            // 锁定注册中心获取过程,保证注册中心单一实例
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // 释放锁
                LOCK.unlock();
            }
        }
    
    创建注册服务,这里默认使用的是zookeeper,定时在文件里保存相应的配置 loadProperties();通知订阅者变更事件notify(url.getBackupUrls());定时检测并连接注册中心retry();开始连接注册中心zkClient = zookeeperTransporter.connect(url);在ZkclientZookeeperClient里订阅相关 状态的改变,返回注册的信息中心Registry

    7.开始在注册中心zk里面注册提供者的URL,registry.register(registedProviderUrl);首先记录注册的URL,registered.add(url);然后具体向注册中心注册,对于zk而言就是创建节点,zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

    8.// 订阅override数据
            // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。

            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    记录URL与监听器的订阅信息ConcurrentMap<URL, Set<NotifyListener>> subscribed ,保存配置 saveProperties(url);,通知监听事件listener.notify(categoryList);这样的话如果提供者有任何变动的话就会触发监听器。

    {
                    List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            listeners.putIfAbsent(listener, new ChildListener() {
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                	ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                        	urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
                }
    9.把之前的ExporterChangeableWrapper用Exporter包装一下然后返回,使他能得到具体的invoker代理。最后把这个暴露服务加入暴露列表里。
    展开全文
  • Dubbo服务暴露

    2020-11-30 20:15:04
    Dubbo服务暴露分为两个步骤:先将服务封装成Invoker、在将Invoker导出。Dubbo的服务导出主要封装在ServiceBean。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XulyBPqU-1606738422173)...

    Dubbo服务暴露分为两个步骤:先将服务封装成Invoker、在将Invoker导出。Dubbo的服务导出主要封装在ServiceBean。

    在这里插入图片描述

    ServiceBean实现了ApplicationListener接口,监听Spring事件,在容器刷新的时候回触发服务导出操作。在导出之前先判断服务是否需要导出,对应Service中export属性。

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (!isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
    

    在ServiceBean中export方法中又调用了父类ServiceConfig的export方法,核心逻辑都封装在ServiceConfig中,publishExportEvent主要是发送服务导出事件。

    public void export() {
            super.export();
            publishExportEvent();
        }
    

    接下来我们重点看下ServiceConfig中export方法

    public synchronized void export() {
            checkAndUpdateSubConfigs();   // (1)
    
            if (!shouldExport()) {    //(2)
                return;
            }
    
            if (shouldDelay()) {   // (3)
                DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                doExport();   // (4)
            }
        }
    

    (1)、进行参数检验。

    (2)、判断服务是否需要导出。

    (3)、判断是否配置延迟导出,延迟导出的话会在一个定时线程中去导出。

    (4)、核心的导出服务逻辑。

    private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);  // (1)
            for (ProtocolConfig protocolConfig : protocols) {   // (2)
                String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);  // (3)
                ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
                ApplicationModel.initProviderModel(pathKey, providerModel);
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);  // (4)
            }
        }
    

    (1)、loadRegistries服务导出和服务引用共用的方法,获取注册中心地址,服务提供者需要注册中心地址注册服务,消费方需要从注册中心拉取服务信息。一个服务可以用多种协议发布,每个协议又存在多个地址(注册中心集群),拉取的注册中心地址排除了提供者配置register为false和消费者配置subscribe为false的路径,如果没有填写默认使用”0.0.0.0“。

    (2)、遍历每种协议,默认使用dubbo协议进行导出,可以进行多种协议同时导出,这里使用默认dubbo协议进行跟踪。

    (3)、获取路径的key,默认为包名+类名。

    (4)、获取注册中心的信息,根据协议暴露对应的url。

    接下来我们接着doExportUrlsFor1Protocol往下走。

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            ···  
            if (ProtocolUtils.isGeneric(generic)) {
                map.put(GENERIC_KEY, generic);
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();   // (1)
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));  // (2)
                }
            }
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put(TOKEN_KEY, UUID.randomUUID().toString());
                } else {
                    map.put(TOKEN_KEY, token);
                }
            }
            // export service
            String host = this.findConfigedHosts(protocolConfig, registryURLs, map);  
            Integer port = this.findConfigedPorts(protocolConfig, name, map);   
            URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);     // (3)
    
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    
            String scope = url.getParameter(SCOPE_KEY);
            // don't export when none is configured
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    exportLocal(url);   // (4)
                }
                // export to remote if the config is not local (export to local only when config is local)
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {   // (5)
                            //if protocol is only injvm ,not register
                            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                                continue;  // (6)
                            }
                            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
    
                            // For providers, this is used to enable custom proxy to generate invoker
                            String proxy = url.getParameter(PROXY_KEY);
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                            }
    
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));   // (7)
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);   // (8)
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                   
                    MetadataReportService metadataReportService = null;
                    if ((metadataReportService = getMetadataReportService()) != null) {
                        metadataReportService.publishProvider(url);
                    }
                }
            }
            this.urls.add(url);
        }
    

    (1)、获取服务提供者类中所有方法。

    (2)、如果存在方法将方法放在Map中。

    (3)、将协议名(默认dubbo)、本机地址、端口号信息封装成URL。

    (4)、如果配置不是远程的,则导出到本地(仅当配置为远程时才导出到远程)。

    (5)、遍历所有注册中心进行注册。

    (6)、忽略本地导出,本地带出在(4)已经做过了。

    (7)、将服务封装成Invoker,因为每个服务提供的类名、方法名、参数都是不同的,这里封装成Invoker可以进行统一调用。

    (8)、服务导出,这里是远程暴露的核心逻辑。

    我们先看一下本地服务导出是如何实现的

    private void exportLocal(URL url) {
            URL local = URLBuilder.from(url)
                    .setProtocol(LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();    // (1)
           
            Exporter<?> exporter = protocol.export(
                    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));   // (2)
            exporters.add(exporter);  
      
        }
        
        
        
      public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);  //
        }
        
        
        InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;
            this.exporterMap = exporterMap;
            exporterMap.put(key, this);
        }
    

    (1) 、将服务信息注册信息封装成URL。

    (2)、本地导出实现是InjvmExporter,在构造函数中只是将服务信息导出添加在exporterMap中。

    下面我们重点看下远程服务导出流程

    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
    

    在protocol的SPI中实现是RegistryProtocol,我们来看下RegistryProtocol中export是如何实现的。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);  // 注册中心地址
     
            // 获取注册中心的URL,比如zookeeper://127.0.0.1:2181
            URL providerUrl = getProviderUrl(originInvoker);
            
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);    // (1)
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);   // (2)
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker 这里就是前面说的dubbo:// 的暴露,并且还会打开端口等操作
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // (3)
    
            // url to registry 根据URL加载Registry的实现类,比如我们这里用的即使ZookeeperRegistry
            final Registry registry = getRegistry(originInvoker);
            // 获取注册的服务提供者的URL,这里就是刚才我们说的Dubbo://...
            final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
            // 将提供者信息注册到服务者与消费者注册表中
            ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                    registryUrl, registeredProviderUrl);
            //to judge if we need to delay publish  判断是否需要延迟发布
            boolean register = registeredProviderUrl.getParameter("register", true);
            if (register) {
                // 想注册中心注册服务,zookeeper
                register(registryUrl, registeredProviderUrl);   // (4)
                providerInvokerWrapper.setReg(true);
            }
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }
    

    (1)、获得提供者的URL。

    (2)、订阅本地服务变动,重新导出。

    (3)、真正的服务导出,走的是DubboProtocol的export。

    (4)、向注册中心注册提供者信息。

    我们从上面(3)、(4)两个步骤逐个跟进。先看下DubboProtocol是如何导出的,这个先说下为什么要先执行RegistryProtocol的export,这里好比是RegistryProtocol抽象了服务导出的公共逻辑。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);  // (1)
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {  // (2)
                String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
    
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url); // (3)
            optimizeSerialization(url);  //(4)
    
            return exporter;
        }
    

    (1)、将Invoker封装成DubboExporter放入导出Map中。

    (2)、如果服务设置了本地存根和服务回调,则将方法放入Map中。本地存根给服务端掉用异常的时候处理方法。

    (3)、使用Netty的方式初始化服务,消费方和提供方通过Netty通讯的。

    (4)、如果服务设置了序列化优化器,则放入Map中以供序列化的时候使用。

    Dubbo服务已经通过Netty暴露了,那么它还需要将暴露的信息注册到注册中心去,消费方才知道怎么去调用。这个先走的是FailbackRegistry,可以认为是封装Registry增强了失败重试功能。

    public void register(URL url) {
            super.register(url);   // (1)
            removeFailedRegistered(url);   // (2)
            removeFailedUnregistered(url);    // (3)
            try {
                // Sending a registration request to the server side
                // 发送消费者注册请求
                doRegister(url);   //(4)
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // Record a failed registration request to a failed list, retry regularly
                addFailedRegistered(url);   // (5)
            }
        }
    

    (1)、调用父类的方法,将URL添加到已注册URL集合中。

    (2)、从注册失败集合中移除之前注册失败的当前URL。

    (3)、从注册失败并且需要取消的集合中移除当前URL,该集合中提供了取消的定时任务去取消。

    (4)、有子类完成具体注册实现的注册任务,这里使用的是ZookeeperRegistry,直接向ZK中创建一个节点。

    (5)、如果注册失败,则由定时任务实现重试。

    到这里服务暴露的流程就结束啦。

    展开全文
  • Dubbo 服务暴露

    2019-06-03 10:28:28
    本篇文章主要是阅读了dubbo官方文档:...关于服务暴露和引用,感觉很多细节还不是十分清楚,所以决定从自己手上的项目看起,然后一步步探究其中的实现,顺便记...

    本篇文章主要是阅读了dubbo官方文档:http://dubbo.apache.org/zh-cn/docs/user/quick-start.html关于服务的暴露和引用,感觉很多细节还不是十分清楚,所以决定从自己手上的项目看起,然后一步步探究其中的实现,顺便记录下这个过程中学到的其他知识,由于dubbo是一个很成熟的框架了,用到的技术也很多,里面定义了很多类和接口十分复杂,所以我一步步去分析篇幅可能有些长,周边知识也非常多,我在文中都列了相关拓展知识的链接,建议大家clone一份源码跟着读,抛砖引玉,能对大家也有所帮助。dubbo的版本号是2.6.2。

    1.从dubbo的配置讲起:

    这里主要用到了spring自定标签功能,关于spring自定义配置,大家可以查看:https://www.cnblogs.com/mahuan2/p/7213866.html这篇文章.

    这里主要定义了一个dubbo的命名空间,然后编写了对应的xsd文档,dubbo的xsd文档在jar包中的META-INF/dubbo.xsd。

    然后dubbo.xsd中我们主要关注service和reference标签。

    然后对标签的处理类dubbo中的定义写在META-INF/spring.handlers

    我们看到dubbo标签的解析主要用到了DubboNamespaceHandler 这个类。然后我们打开DubboNamespaceHandler这个类的源码。

    我们可以看到dubbo自定义了一个DubboBeanDefinitionParser类去解析上面的标签,并且自定义了ServiceBean和ReferenceBean。然后我们再打开DubboBeanDefinitionParser这个类。

    这里我们主要关注parse这个方法,这个方法逻辑比较多,我也没有读得十分清楚,不过大体意思就是拿到xml中所有配置的基本信息,然后定义成spring中的BeanDefinition。关于BeanDefinition,读者可以看这篇文章:https://blog.csdn.net/windrui/article/details/52973915。大体就是对spring的Bean的抽象,主要保存类名、scope、属性、构造函数参数列表、依赖的bean、是否是单例类、是否是懒加载等,其实就是将Bean的定义信息存储到这个BeanDefinition相应的属性中,后面对Bean的操作就直接对BeanDefinition进行,例如拿到这个BeanDefinition后,可以根据里面的类名、构造函数、构造函数参数,使用反射进行对象创建。

    这里我对其中一点比较感兴趣,就是service里面的ref参数:

    因为这里把一个interface映射到了一个可实例化的class,而且还是运行时bean的名字,所以我看了这个ref的解析实现,这个解析主要有两个:

    (1).写class属性,不写ref:

    这段的意思是如果用了class标签,spring会生成相应的class的BeanDefinition,并创建了一个BeanDefinitionHolder来代表运行时的bean,并且这个bean的名字是id+Impl。

    (2)写ref,不写class。

    如果有ref,就用RuntimeBeanReference作为创建时的bean。

    这样就可以将本来是interface,实例化的时候是另外一个bean,并且这个bean也会存在于ioc的Bean创建链条中。

    如果ref和class都写了,则以ref为准,因为ref的代码在后面,哈哈^_^

    以上就是dubbo如何从xml读取到定义成spring的BeanDefinition,接下来,我们再看一下bean实例化的时候是如何暴露服务的。

     

    2.serviceBean是何时暴露服务的

     

    我们看ServiceBean里面实现了ApplicationListener<ContextRefreshedEvent>,ApplicationContextAware,InitializingBean接口,这3个接口分别会在spring启动的不同时机依次调用,他们调用的为顺序 2 -> 1 -> 3:

    然后我们看下ServiceBean的三个接口:

    没什么好说的,初始化一些变量

    基本做的也是一些初始化的变量,这里关注一个方法:

    BeanFactoryUtils.beansOfTypeIncludingAncestors , 大概的作用就是获取这个上下文所有指定class的Bean,如上面的providerConfigMap,传入的参数就是ProviderConfig.class。

    然后主要关注一下最后的方法export:

    然后这个serviceBean实现的第3个接口最后执行,也没太多东西,其实也是调用export,估计是为了防止前面的方法没有执行吧!

    然后我们关注这个export,这个export方法就是将本地服务暴露给外面调用的过程,这样就保证了spring容器在初始化完成的时候,所有的serivceBean都暴露服务了。接下来,我们再看看export做了哪些事情。

     

    3.serviceBean暴露服务做了哪些事情

    我们看看export的源码:

    主要就判断该服务是否已经打开,以及是否需要延迟打开,实际逻辑在doExport里面,我们再看doExport的代码。

    别的没什么好说的,monitor是监控中心的配置,registries是注册中心配置,protocols 是发布者配置,别的暂不知道干嘛用,暂时不管,这些都会在checkDefault里面初始化provider这个变量的实话初始化,打开这个类看到

    主要就是host,port,环境路径等信息,然后这些信息都是从系统变量中拿的,看checkDefault里面就知道。

    然后取的都是系统变量里面的dubbo.前缀的变量,dubbo每个服务运行的时候都自动会设置这些参数吧(我猜)。另外这里还有个初始化这个类的测试单元,很好理解了。

    然后继续回到doExport。

    这里复习了一下Class.forName,具体内容可以参考:https://www.cnblogs.com/xingzc/p/5760166.html,主要就是把接口类加载进来。

    checkInterfaceAndMethods检查interfaceClass不能为空,必须是interface类型,验证方法必须存在。

    checkRef检查ref不能为空,必须实现interface接口,具体代码自己看了。

    然后后面checkApplication初始化application变量

    checkRegistry初始化registries变量

    checkProtocol初始化protocols变量

    appendProperties方法前面有提过

    checkStubAndMock方法不知道干嘛的,忽略

    主要看看doExportUrls方法

    然后loadRegistries

    这个方法是将registries变量里面的每个地址,拼上application和registryconfig里面的参数,拼成一个registryUrl(不是最后生成的url)带参数的标准格式,如:www.xxx.com?key1=value1&key2=value2。然后返回这些url的列表,自己一个服务生成的url例子:

    registryUrl:

    registry://172.23.2.101:2181/com.alibaba.dubbo.registry.RegistryService?application=oic-dubbo-provider&dubbo=2.6.1&logger=slf4j&pid=15258&register=true&registry=zookeeper&timestamp=1528958780785

     

     

    然后再遍历protocols变量,将protocols列表中的每个protocol根据url暴露出去,主要是doExportUrlsFor1Protocol方法。

    然后这个方法前期就一堆塞参数到map,最后也是跟上面生成registryUrl差不多,只不过多加了一些module,provider和自己的一些参数,拼成一个更长的url。下面这个就是我上面那个服务生成的完整url:

    dubbo://10.8.0.28:12000/com.tyyd.oic.service.PushMessageService?accepts=1000&anyhost=true&application=oic-dubbo-provider&bind.ip=10.8.0.28&bind.port=12000&buffer=8192&charset=UTF-8&default.service.filter=dubboCallDetailFilter&dubbo=2.6.1&generic=false&interface=com.tyyd.oic.service.PushMessageService&iothreads=9&logger=slf4j&methods=deletePushMessage,getPushMessage,batchPushMessage,addPushMessage,updatePushMessage,qryPushMessage&payload=8388608&pid=15374&queues=0&retries=0&revision=1.0.0&serialization=hessian2&side=provider&threadpool=fixed&threads=100&timeout=6000&timestamp=1528959454516&version=1.0.0

    上面可以看到,url地址包含了版本号,接口名,方法列表,序列化方法,过期时间等这个接口bean所有需要用到的上下文信息,并且地址头也由registry改成了dubbo。因为包含了所有上下文的信息,所以这个url的用处很大,后面看代码就知道了。

    这部分后面url还拼加了一些拓展类,监控url等信息,具体就不细看了,主要看看转成invoker和并创建exporter这个阶段

    这里的话,我比较好奇proxyFactory这个变量是如何生成的,这个根据官方文档说可能是javassistProxyFactory或者JdkProxyFactory中的任意一个,当然,这里是一个重点,就是动态代理,关于动态代理的知识可以看这里:https://www.cnblogs.com/gonjan-blog/p/6685611.html

    然后javassistProxyFactory和JdkProxyFactory都是动态代理的生成技术,只不过一个是用字节码生成,一个是用jdk生成。

    然后这个proxyFactory变量的生成我们看到:

    然后网上查了下,原来这里有个很牛批的技术,叫做SPI,相关说明参考:

    https://blog.csdn.net/qiangcai/article/details/77750541

    大致就是同一个接口,可以有不同的实现类,仅通过配置就可以动态修改具体用哪个实现类,而且这个拿到的实现还是单例模式

    具体dubbo中的说明文章中也有描述,可以看到

    proxyFactory接口写了SPI标签,所以这里默认使用的就是javassistProxyFactory。

     

    然后我们看看javassistProxyFactory的实现里面写了什么东西.

    这里生成了一个Wrapper,这个Wrapper是通过字节码生成的,大概是对于传入的proxy实现类进一步抽象,可以根据方法名,参数等去调用相应实现类的方法。更多可以看看这篇文章:

    https://blog.csdn.net/synpore/article/details/79148260

    然后wrapper被封装到一个Invoker里面,url主要放在invoker里面。

    然后回到前面,用生成的invoker再封装成一个DelegateProviderMetaDataInvoker,这个类跟invoker区别不大,只是多放了this这个serviceBean的信息,方便后面使用

     

    然后调用用portocol的export方法,我们看看portocol是哪来的

    这个是又是熟悉的SPI技术,我们再看看Portocol接口的SPI标签

    标签是dubbo,使用我打开DubboPortocol类

    然后上面有一些获取url的参数,还是上面那个例子,打断点看到的值是:

    具体怎么拿就不看了。下面主要看看openServer方法。

    这里也直接跟进来,看到具体的值了,然后到createServer里面

    也没太多东西,主要关注Exchangers.bind(url,requestHandler).

    然后我们一路跟进去,发现

    默认生成的Exchanger是headerExchanger。我们再看看HeaderExchanger.

    HeaderExchanger的参数是一个Transporters.bind参数,我们继续跟进去

    然后getTransporter方法继续跟:

    老套路了,我们看Transporter接口

    看到是nettyTransporter,继续跟:

    到这里,终于看到有点熟悉的NettyServer了,我们再看看NettyServer这个类

    这个方法用了父类AbstractServer构造函数,还有ChannelHandlers.wrap方法,我们先看看这个方法

    可以看到ChannelHandlers是一个单例模式,这里又用了SPI创建了Dispatcher类

    AllDispatcher.NAME 是值是all ,我们打开AllDispatcher类

    再看AllChannelHandler

    super的父类

    定义了一个线程池,SPI创建的,是fixThreadPool,就不进去看了(终于找到jdk基本的类了)

    然后关于fixThreadPool的可以看这篇:https://blog.csdn.net/czd3355/article/details/52608567

     

    还有个dataStore:

    大体就是一个ConcurrentMap套ConcurrentMap的类,这边就看到这里吧

     

    回到nettyServer的父类AbstractServer的构造方法

    然后还是之前那个服务,我们能看到用到了url里面的参数,所以说dubbo的所有服务的上下文基本都在生成的url里面了。 这里我们主要看下doOpen方法

    然后这里开始基本就是些跟jdk本身比较相关的地方了,还有就是终于看到netty相关的包了。

    这里dubbo封装了一个NamedThreadFactory(发现dubbo也用了好多工厂模式),打开这个类

    看到这个类自定义了类名前缀和,线程组,感觉是个不错的工厂类,以后可以拿给自己用,哈哈~

    然后后面就是netty的使用了,另外说一下,dubbo默认用的还是比较老的netty3,用法介绍:

    https://www.cnblogs.com/java-zhao/p/7625557.html 然后netty的详解看这篇:https://blog.csdn.net/haoyuyang/article/details/53231585

    netty是同步非阻塞的,阻塞非阻塞是针对应用程序而言的,同步非同步的是针对操作系统而言的。

    然后这里自己实现了一个netty的编解码器,dubbo这边封装了很多自己的类,剥丝抽茧其实还是很简单的,可以参考这篇:https://www.cnblogs.com/ll409546297/p/8036954.html

    然后dubbo编码序列化效率的分析可以看这篇文章:https://blog.csdn.net/moonpure/article/details/53175519

    具体的源码我就不细看了,默认的序列化协议是hessian2.

     

    然后回到最前面,想想我们走了这么远到底做了什么?

    是不是给一个service标签的serviceBean生成了一个代理好的Invoker,这个invoker放在一个叫exporter的对象下,然后这个exporter放在了serviceBean的一个exporters变量下,准备被调用,然后创建的nettyServer放在了serviceBean的变量protocol下的一个变量serverMap里面,这样一个serverBean的netty服务,方法代理类是不是都生成好了。这里注意protocol是单例生成的,所以如果有bean打开过nettyServer,别的bean就不会再打开。

    然后回到很前面的ServiceConfig的doExport里面:

    发现他把生成好了serviceBean放到了一个ApplicationModel里面,后面consumer也会放到这里面,具体不知道干嘛用的。

    以上就是粗略看一个服务暴露的过程。有很多细节需要注意。

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,065
精华内容 1,226
关键字:

dubbo服务暴露