精华内容
下载资源
问答
  • dubbo远程调用源码分析,分成了三篇文章地址分别如下:dubbo远程调用源码分析(一):客户端发送请求dubbo远程调用源码分析(二):服务端接收请求dubbo远程调用源码分析(三):客户端接收反馈后的处理本文分为三...

    dubbo远程调用的源码分析,分成了三篇文章地址分别如下:

    dubbo远程调用源码分析(一):客户端发送请求

    dubbo远程调用源码分析(二):服务端接收请求

    dubbo远程调用源码分析(三):客户端接收反馈后的处理


    本文分为三部分,分别是:

    消费端注册部分

    消费端动态代理部分

    消费端事件处理器部分


    消费端注册部分

    在分析dubbo远程调用的时候,要从dubbo消费端(consumer)注册开始说起

    dubbo的consumer在注册的时候,在配置文件中是用以下形式注册的:

    <dubbo:reference id="sequenceService" interface="SequenceService"/>

    在服务启动的时候,这个配置项会生成一个ReferenceBean的对象,当我们在代码中使用SequenceService时,实际上我们使用的是这个ReferenceBean对象调用getObject()方法之后返回的对象,这个对象就是SequenceService的代理对象。

    先看一下ReferenceBean类的getObject()方法:

       public Object getObject() throws Exception {
    
            return get();
    
       }

    get()方法在ReferenceBean的父类ReferenceConfig中:

       public synchronized T get() {
    
            if (destroyed) {  
    
                throw new IllegalStateException("Already destroyed!");
    
            }
    
            if (ref == null) {
    
                init();
    
            }
    
            return ref;
    
       }

    返回的这个ref就是代理类,下面看一下用来初始化的init()方法:

       private void init() {
    
            if (initialized) {
    
                return;
    
            }
    
            initialized = true;
    
            if (interfaceName == null ||interfaceName.length() == 0) {
    
                throw new IllegalStateException("<dubbo:reference interface=\"\" />interface not allow null!");
    
            }
    
            // 获取消费者全局配置
    
            checkDefault();
    
            appendProperties(this);
    
            if (getGeneric() == null &&getConsumer() != null) {
    
               setGeneric(getConsumer().getGeneric());
    
            }
    
            if(ProtocolUtils.isGeneric(getGeneric())) {
    
                interfaceClass =GenericService.class;
    
            } else {
    
                try {
    
                    interfaceClass =Class.forName(interfaceName, true, Thread.currentThread()
    
                           .getContextClassLoader());
    
                } catch (ClassNotFoundException e){
    
                    throw newIllegalStateException(e.getMessage(), e);
    
                }
    
               checkInterfaceAndMethods(interfaceClass, methods);
    
            }
    
            String resolve =System.getProperty(interfaceName);
    
            String resolveFile = null;
    
            if (resolve == null || resolve.length()== 0) {
    
                resolveFile =System.getProperty("dubbo.resolve.file");
    
                if (resolveFile == null ||resolveFile.length() == 0) {
    
                    File userResolveFile = newFile(new File(System.getProperty("user.home")),"dubbo-resolve.properties");
    
                    if (userResolveFile.exists()) {
    
                        resolveFile =userResolveFile.getAbsolutePath();
    
                    }
    
                }
    
                if (resolveFile != null &&resolveFile.length() > 0) {
    
                    Properties properties = newProperties();
    
                    FileInputStream fis = null;
    
                    try {
    
                        fis = newFileInputStream(new File(resolveFile));
    
                        properties.load(fis);
    
                    } catch (IOException e) {
    
                        throw new IllegalStateException("Unload " + resolveFile + ", cause: "+ e.getMessage(), e);
    
                    } finally {
    
                        try {
    
                            if (null != fis)fis.close();
    
                        } catch (IOException e) {
    
                           logger.warn(e.getMessage(), e);
    
                        }
    
                    }
    
                    resolve =properties.getProperty(interfaceName);
    
                }
    
            }
    
            if (resolve != null &&resolve.length() > 0) {
    
               url = resolve;
    
                if (logger.isWarnEnabled()) {
    
                    if (resolveFile != null&& resolveFile.length() > 0) {
    
                        logger.warn("Usingdefault dubbo resolve file " + resolveFile + " replace " +interfaceName + "" + resolve + " to p2p invoke remoteservice.");
    
                    } else {
    
                        logger.warn("Using-D" + interfaceName + "=" + resolve + " to p2p invokeremote service.");
    
                    }
    
                }
    
            }
    
            if (consumer != null) {
    
               if (application == null){
    
                    application =consumer.getApplication();
    
                }
    
                if (module == null) {
    
                    module = consumer.getModule();
    
                }
    
                if (registries == null) {
    
                    registries =consumer.getRegistries();
    
                }
    
                if (monitor == null) {
    
                    monitor =consumer.getMonitor();
    
                }
    
            }
    
            if (module != null) {
    
                if (registries == null) {
    
                    registries = module.getRegistries();
    
                }
    
                if (monitor == null) {
    
                    monitor = module.getMonitor();
    
                }
    
            }
    
            if (application != null) {
    
                if (registries == null) {
    
                    registries =application.getRegistries();
    
                }
    
                if (monitor == null) {
    
                    monitor =application.getMonitor();
    
                }
    
            }
    
            checkApplication();
    
            checkStubAndMock(interfaceClass);
    
            Map<String, String> map = new HashMap<String, String>();
    
            Map<Object, Object> attributes =new HashMap<Object, Object>();
    
            map.put(Constants.SIDE_KEY,Constants.CONSUMER_SIDE);
    
            map.put(Constants.DUBBO_VERSION_KEY,Version.getVersion());
    
           map.put(Constants.TIMESTAMP_KEY,String.valueOf(System.currentTimeMillis()));
    
            if (ConfigUtils.getPid() > 0) {
    
                map.put(Constants.PID_KEY,String.valueOf(ConfigUtils.getPid()));
    
            }
    
            if (!isGeneric()) {
    
                String revision =Version.getVersion(interfaceClass, version);
    
                if (revision != null &&revision.length() > 0) {
    
                    map.put("revision",revision);
    
                }
    
     
    
                String[] methods =Wrapper.getWrapper(interfaceClass).getMethodNames();
    
                if (methods.length == 0) {
    
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
    
                    map.put("methods",Constants.ANY_VALUE);
    
                } else {
    
                    map.put("methods",StringUtils.join(new HashSet<String>(Arrays.asList(methods)),","));
    
                }
    
            }
    
            map.put(Constants.INTERFACE_KEY,interfaceName);
    
            appendParameters(map, application);
    
            appendParameters(map, module);
    
            appendParameters(map, consumer,Constants.DEFAULT_KEY);
    
            appendParameters(map, this);
    
            String prifix =StringUtils.getServiceKey(map);
    
            if (methods != null &&methods.size() > 0) {
    
                for (MethodConfig method : methods){
    
                    appendParameters(map, method,method.getName());
    
                    String retryKey =method.getName() + ".retry";
    
                    if (map.containsKey(retryKey)){
    
                        String retryValue =map.remove(retryKey);
    
                        if ("false".equals(retryValue)) {
    
                           map.put(method.getName() + ".retries", "0");
    
                        }
    
                    }
    
                    appendAttributes(attributes,method, prifix + "." + method.getName());
    
                    checkAndConvertImplicitConfig(method,map, attributes);
    
                }
    
            }
    
            //attributes通过系统context进行存储.
    
           StaticContext.getSystemContext().putAll(attributes);
    
            ref = createProxy(map);
    
       }

    基本意思就是集成各种配置,比如类名、方法、版本等等,组成一个map,最终通过createProxy()方法生成代理类,createProxy()方法的代码如下:

       @SuppressWarnings({"unchecked", "rawtypes","deprecation"})
    
       private T createProxy(Map<String, String> map) {
    
            URL tmpUrl = new URL("temp","localhost", 0, map);
    
            final boolean isJvmRefer;
    
            if (isInjvm() == null) {
    
                if (url != null &&url.length() > 0) { //指定URL的情况下,不做本地引用
    
                    isJvmRefer = false;
    
                } else if(InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
    
                    //默认情况下如果本地有服务暴露,则引用本地服务.
    
                    isJvmRefer = true;
    
                } else {
    
                    isJvmRefer = false;
    
                }
    
            } else {
    
                isJvmRefer =isInjvm().booleanValue();
    
            }
    
     
    
            if (isJvmRefer) {
    
                URL url = new URL(Constants.LOCAL_PROTOCOL,NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    
                invoker =refprotocol.refer(interfaceClass, url);
    
                if (logger.isInfoEnabled()) {
    
                    logger.info("Using injvmservice " + interfaceClass.getName());
    
                }
    
            } else {
    
                if (url != null &&url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
    
                    String[] us =Constants.SEMICOLON_SPLIT_PATTERN.split(url);
    
                    if (us != null &&us.length > 0) {
    
                        for (String u : us) {
    
                            URL url =URL.valueOf(u);
    
                            if (url.getPath() ==null || url.getPath().length() == 0) {
    
                                url =url.setPath(interfaceName);
    
                            }
    
                            if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    
                               urls.add(url.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map)));
    
                            } else {
    
                               urls.add(ClusterUtils.mergeUrl(url, map));
    
                            }
    
                        }
    
                    }
    
                } else { // 通过注册中心配置拼装URL
    
                    List<URL> us =loadRegistries(false);
    
                    if (us != null &&us.size() > 0) {
    
                        for (URL u : us) {
    
                            URL monitorUrl =loadMonitor(u);
    
                            if (monitorUrl != null){
    
                               map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    
                            }
    
                           urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map)));
    
                        }
    
                    }
    
                    if (urls == null || urls.size()== 0) {
    
                        throw newIllegalStateException("No such any registry to reference " +interfaceName + " on the consumer " + NetUtils.getLocalHost() +" use dubbo version " + Version.getVersion() + ", please config<dubbo:registry address=\"...\" /> to your springconfig.");
    
                    }
    
                }
    
     
    
                if (urls.size() == 1) {
    
                    invoker =refprotocol.refer(interfaceClass, urls.get(0));
    
                } else {
    
                    List<Invoker<?>>invokers = new ArrayList<Invoker<?>>();
    
                   URL registryURL =null;
    
                    for (URL url : urls) {
    
                       invokers.add(refprotocol.refer(interfaceClass, url));
    
                        if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    
                            registryURL = url; // 用了最后一个registryurl
    
                        }
    
                    }
    
                    if (registryURL != null) { // 有 注册中心协议的URL
    
                        // 对有注册中心的Cluster只用 AvailableCluster
    
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY,AvailableCluster.NAME);
    
                        invoker = cluster.join(new StaticDirectory(u, invokers));
    
                    } else { // 不是 注册中心的URL
    
                        invoker = cluster.join(new StaticDirectory(invokers));
    
                    }
    
                }
    
            }
    
     
    
            Boolean c = check;
    
            if (c == null && consumer !=null) {
    
                c = consumer.isCheck();
    
            }
    
            if (c == null) {
    
                c = true; // default true
    
            }
    
            if (c && !invoker.isAvailable()){
    
                throw new IllegalStateException("Failed to check the status of the service " +interfaceName + ". No provider available for the service " + (group== null ? "" : group + "/") + interfaceName + (version ==null ? "" : ":" + version) + " from the url " +invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() +" use dubbo version " + Version.getVersion());
    
            }
    
            if (logger.isInfoEnabled()) {
    
                logger.info("Refer dubboservice " + interfaceClass.getName() + " from url " +invoker.getUrl());
    
            }
    
            // 创建服务代理
    
            return (T)proxyFactory.getProxy(invoker);
    
       }

    一开始调用isInjvm()方法判断目标接口是否在本地就有,如果本地就有,直接调用本地的接口。

    如果本地没有,就在配置中找有没有用户指定的url,如果指定了就使用用户指定的url提供的接口。

    如果没有指定url,则从注册中心中获得目标url列表。

    如果urls.size()==1,则直接用这个url获得invoker,这个invoker就是最后用来创建动态代理用的。

    当urls.size()>1时,有registryURL属性,如果配置了注册中心协议Protocol,则只用AvailableCluster得到invoker。

    cluster.join()方法是用来获得invoker的,cluster属性的定义:

    private transient volatile Invoker<?> invoker;

    Invoker是个接口,根据配置的不同会使用不同的实现类,比如上面的AvailableCluster,他的join()方法是这样的:

       public <T> Invoker<T> join(Directory<T> directory)throws RpcException {
    
            return new AbstractClusterInvoker<T>(directory) {
    
                public Result doInvoke(Invocationinvocation, List<Invoker<T>> invokers, LoadBalance loadbalance)throws RpcException {
    
                    for (Invoker<T> invoker :invokers) {
    
                        if (invoker.isAvailable()){
    
                            returninvoker.invoke(invocation);
    
                        }
    
                    }
    
                    throw new RpcException("No provider available in " + invokers);
    
                }
    
            };
    
       }

    实际上这个join()方法返回了一个AbstractClusterInvoker对象,并重写了他的doInvoke()方法,这个方法在动态代理实际被调用时会用到。

    现在回到createProxy()方法,最后用得到的invoker通过proxyFactory创建动态代理,至此动态代理就创建完了。

     

    消费端动态代理部分

    当我们在代码中配置好的SequenceService进行远程调用时,实际上调用的是对应Invoker的invoke()方法,invoker是一个接口,对于这个接口的实现大概是这样的:

    Invoker

    ----AbstractInvoker

    ----AbstractClusterInvoker

    ----AbstractProxyInvoker

    ----DelegateInvoker

    ----MockClusterInvoker

    ----MergeableClusterInvoker

    ----InvokerListenerAdapter

    ----InvokerListenerAdapter

    ……

    还有很多

    AbstractInvoker就是用来远程通信的Invoker

    AbstractClusterInvoker是provider是集群时使用Invoker,比AbstractInvoker多了负载均衡,选择provider的过程,最终确定了调用的provider之后还是会调用AbstractInvoker中的invoke()方法。

    我们先看AbstractClusterInvoker的invoke()方法:

        public Result invoke(final Invocation invocation) throws RpcException {
    
    
           checkWhetherDestroyed();
    
    
            LoadBalanceloadbalance;
     
    
           List<Invoker<T>> invokers = list(invocation);
    
            if (invokers !=null && invokers.size() > 0) {
    
                loadbalance =ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
    
                       .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    
            } else {
    
                loadbalance=ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    
            }
    
           RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            return doInvoke(invocation, invokers, loadbalance);
    
        }

    首先判断当前consumer是否已经destory了,然后用list(invocation)方法获得所有的provider信息,获得负载均衡算法LoadBalance,设置同步属性,最后调用doInvoke方法。

    AbstractClusterInvoker的doInvoke()方法是个抽象方法:

    protected abstract Result doInvoke(Invocation invocation,List<Invoker<T>> invokers,
    
                                          LoadBalance loadbalance) throws RpcException;

    他的子类有很多,比如:

    AvailableClusterInvoker 选择第一个可用的provider。 

    FailBackClusterInvoker失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作。

    FailfastClusterInvoker快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作。

    FailoverClusterInvoker失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。

    FailsafeClusterInvoker失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作。

    ForkingClusterInvoker并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源。

    具体使用哪个得看配置


    我们以之前提到的AvailableClusterInvoker为例,看一下doInvoke()方法:

        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,LoadBalance loadbalance) throws RpcException {
    
            for(Invoker<T> invoker : invokers) {
    
                if(invoker.isAvailable()) {
    
                    return invoker.invoke(invocation);
    
                }
    
            }
    
            throw new RpcException("No provider available in " + invokers);
    
        }

    就是判断invoker是否可用,可用就直接调用invoker的invoke()方法,实际上调用的还是AbstractInvoker的invoke()方法,如果不是集群就直接调这个方法了,该方法代码如下:

        public Result invoke(Invocation inv) throws RpcException {
    
            if(destroyed.get()) {
    
                throw new RpcException("Rpc invoker for service " + this + " on consumer" + NetUtils.getLocalHost()
    
                        +" use dubbo version " + Version.getVersion()
    
                        +" is DESTROYED, can not be invoked any more!");
    
            }
    
            RpcInvocation invocation = (RpcInvocation) inv;
    
           invocation.setInvoker(this);
    
            if (attachment!= null && attachment.size() > 0) {
    
                invocation.addAttachmentsIfAbsent(attachment);
    
            }
    
            Map<String,String> context = RpcContext.getContext().getAttachments();
    
            if (context !=null) {
    
               invocation.addAttachmentsIfAbsent(context);
    
            }
    
            if(getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY,false)) {
    
               invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    
            }
    
           RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
    
            try {
    
                return doInvoke(invocation);
    
            } catch(InvocationTargetException e) { // biz exception
    
                Throwablete = e.getTargetException();
    
                if (te ==null) {
    
                    return new RpcResult(e);
    
                } else {
    
                    if (te instanceof RpcException) {
    
                       ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
    
                    }
    
                    return new RpcResult(te);
    
                }
    
            } catch(RpcException e) {
    
                if(e.isBiz()) {
    
                    return new RpcResult(e);
    
                } else {
    
                    throw e;
    
                }
    
            } catch(Throwable e) {
    
                return new RpcResult(e);
    
            }
    
        }

    还是先判断consumer是否是destory的,其实destroyed是destory的过去分词,不是人家拼错了。

    然后经历一堆和AbstractClusterInvoker的invoke一样的参数设置,最后调用doInvoke()方法,而且这个方法在这个Invoker里面也是抽象的。

    AbstractInvoker的doInvoke()方法在DubboInvoker类里面有具体实现,这个DubboInvoker是AbstractInvoker的子类,doInvoke()方法如下:

        @Override
    
        protected Result doInvoke(final Invocation invocation) throws Throwable {
    
            RpcInvocation inv = (RpcInvocation) invocation;
    
            final String methodName = RpcUtils.getMethodName(invocation);
    
           inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    
           inv.setAttachment(Constants.VERSION_KEY, version);
    
    
            ExchangeClient currentClient;
    
            if(clients.length == 1) {
    
               currentClient = clients[0];
    
            } else {
    
               currentClient = clients[index.getAndIncrement() % clients.length];
    
            }
    
            try {
    
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    
                int timeout= getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    
                if(isOneway) {
    
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    
                    currentClient.send(inv, isSent);
    
                   RpcContext.getContext().setFuture(null);
    
                    return new RpcResult();
    
                } else if(isAsync) {
    
                   ResponseFuture future = currentClient.request(inv, timeout);
    
                    RpcContext.getContext().setFuture(newFutureAdapter<Object>(future));
    
                    return new RpcResult();
    
                } else {
    
                   RpcContext.getContext().setFuture(null);
    
                    return(Result) currentClient.request(inv, timeout).get();
    
                }
    
            } catch(TimeoutException e) {
    
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: "+ getUrl() + ", cause: " + e.getMessage(), e);
    
            } catch(RemotingException e) {
    
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " +getUrl() + ", cause: " + e.getMessage(), e);
    
            }
    
        }

    在经过一堆的设置参数(地址、版本)之后,dubbo获得了两个参数,isAsync和isOneway,isAsync为true时代表异步调用,isOneway为true时代表没有返回值。

    当isOneway为true时,调用send()方法然后返回一个空的RpcResult,ExchangeClient的send()方法就是用来把消息发给provider的,send()方法的返回值类型是void。

    而当isAsync为true时,设置了一个ResponseFuture之后返回一个空的RpcResult

    最后的else就是普通的同步调用,不需要设置Future,一直等到provider端返回处理结果,currentClient.request方法负责把请求发出。

     ExchangeClient是个接口,request()方法的实现类在HeaderExchangeClient类中,HeaderExchangeClient的request()方法只有一行,直接调用了HeaderExchangeChannel的request方法,这个request方法如下:

        public ResponseFuture request(Object request, int timeout) throws RemotingException {
    
            if (closed) {
    
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request" + request + ", cause: The channel " + this + " isclosed!");
    
            }
    
            // create request.
    
            Request req =new Request();
    
           req.setVersion("2.0.0");
    
           req.setTwoWay(true);
    
           req.setData(request);
    
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
    
            try {
    
               channel.send(req);
    
            } catch(RemotingException e) {
    
               future.cancel();
    
                throw e;
    
            }
    
            return future;
    
        }

    其中的channel就是dubbo集成的Netty的Channel类,负责服务器间消息传输,这个类在dubbo中和netty中都能找到,这里调用了他的send()方法。

    Channel的send()方法来自EndPoint接口

    Channel接口实现了EndPoint接口

    AbstractChannel抽象类实现了Channel接口,然而他的send()方法的功能只是判断当前channel是否已关闭

        public void send(Object message, boolean sent) throws RemotingException {
    
            if (isClosed()){
    
                throw new RemotingException(this, "Failed to send message "
    
                        +(message == null ? "" : message.getClass().getName()) + ":"+ message
    
                        +", cause: Channel closed. channel: " + getLocalAddress() + "-> " + getRemoteAddress());
    
            }
    
        }

    最后NettyChannel类继承了AbstractChannel类,重写了父类的send()方法,代码如下:

        public void send(Object message, boolean sent) throws RemotingException {
    
           super.send(message, sent);
    
    
            boolean success= true;
    
            int timeout =0;
    
            try {
    
               ChannelFuture future = channel.write(message);
    
                if (sent) {
    
                    timeout= getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    
                    success= future.await(timeout);
    
                }
    
                Throwable cause = future.getCause();
    
                if (cause!= null) {
    
                    throw cause;
    
                }
    
            } catch(Throwable e) {
    
                throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    
            }
    
     
    
            if (!success) {
    
                throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress()
    
                        +"in timeout(" + timeout + "ms) limit");
    
            }
    
        }

    一开始调用了父类的send()方法,判断是否关闭

    channel.write()方法就是Channel负责发送消息的方法,至此,消息只要再通过一些事件处理器(主要是编码),就可以发到provider端了。

     

    消费端事件处理器部分

    NettyClient在初始化时添加了三个事件处理器用来处理发送消息和接收消息的事件,分别是NettyCodecAdapter.DeCoder,NettyCodecAdapter.Encoder,NettyHandler,代码在NettyClient类的doOpen()方法里:

        @Override
    
        protected void doOpen() throws Throwable {
    
           NettyHelper.setNettyLoggerFactory();
    
            bootstrap = new ClientBootstrap(channelFactory);
    
            // config
    
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
    
            bootstrap.setOption("keepAlive",true);
    
           bootstrap.setOption("tcpNoDelay", true);
    
           bootstrap.setOption("connectTimeoutMillis", getTimeout());
    
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    
           bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    
                public ChannelPipeline getPipeline() {
    
                   NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),NettyClient.this);
    
                   ChannelPipeline pipeline = Channels.pipeline();
    
                   pipeline.addLast("decoder", adapter.getDecoder());
    
                   pipeline.addLast("encoder", adapter.getEncoder());
    
                   pipeline.addLast("handler", nettyHandler);
    
                    returnpipeline;
    
                }
    
            });
    
        }

    几种事件处理器的在添加时顺序是DeCoder,Encoder,NettyHandler。

    当线程给对方发送信息时,叫做下行事件,下行事件会先经过NettyHandler再经过Encoder。

    当线程接收对方发来的信息时,叫做上行事件,上行事件会先经过DeCoder再经过NettyHandler。

    在调用Channel.write()时,会调用事件处理器中的NettyHandler和Encoder,反过来当provider给consumer返回信息时调用的是DeCoder和NettyHandler。

     

    第一步,NettyHandler的writeRequested方法会首先被调用:

        @Override
    
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    
           super.writeRequested(ctx, e);
    
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    
            try {
    
               handler.sent(channel, e.getMessage());
    
            } finally {
    
               NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    
            }
    
        }

    这里是进行一些dubbo的回调功能。

    第二步是调用NettyCodecAdapter.Encoder,encoder的定义和实现类就在NettyCodecAdapter类中:

        private final ChannelHandler encoder = new InternalEncoder();
    
    
        private final Codec2codec;
    
    
        @Sharable
    
        private class InternalEncoder extends OneToOneEncoder {
    
    
            @Override
    
            protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
    
               com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
    
                        com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
    
               NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
    
                try {
    
                   codec.encode(channel, buffer, msg);
    
                } finally {
    
                   NettyChannel.removeChannelIfDisconnected(ch);
    
                }
    
                return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    
            }
    
        }

    Codec2是提供encode和decode的接口,该接口由DubboCodec类实现,而具体的实现代码在DubboCodec类的父类ExchangeCodec中:

        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    
            if (msg instanceof Request) {
    
               encodeRequest(channel, buffer, (Request) msg);
    
            } else if (msg instanceof Response) {
    
                encodeResponse(channel,buffer, (Response) msg);
    
            } else {
    
               super.encode(channel, buffer, msg);
    
            }
    
        }

    在发起调用时,msg是一个Request,调用encodeRequest()方法:

        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    
            Serialization serialization = getSerialization(channel);
    
            // header.
    
            byte[] header =new byte[HEADER_LENGTH];
    
            // set magicnumber.
    
           Bytes.short2bytes(MAGIC, header);
    
     
            // set requestand serialization flag.
    
            header[2] =(byte) (FLAG_REQUEST | serialization.getContentTypeId());
    
    
            if(req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    
            if (req.isEvent()) header[2] |= FLAG_EVENT;
    
     
            // set requestid.
    
           Bytes.long2bytes(req.getId(), header, 4);
    
     
            // encoderequest data.
    
            int savedWriteIndex = buffer.writerIndex();
    
           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    
           ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    
            if(req.isEvent()) {
    
               encodeEventData(channel, out, req.getData());
    
            } else {
    
               encodeRequestData(channel, out, req.getData());
    
            }
    
           out.flushBuffer();
    
            bos.flush();
    
            bos.close();
    
            int len =bos.writtenBytes();
    
           checkPayload(channel, len);
    
           Bytes.int2bytes(len, header, 12);
    
     
    
            // write
    
           buffer.writerIndex(savedWriteIndex);
    
           buffer.writeBytes(header); // write header.
    
           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    
        }

    这个方法写的是把请求序列化为二进制的过程,其中的encodeRequestData()方法,经过了好几个只有一行的方法调用,最终执行的是ExchangeCodec类的如下方法:

        protected void encodeRequestData(ObjectOutput out, Object data) throws IOException {
    
           out.writeObject(data);
    
        }

    至此数据会被发送到provider端。

    展开全文
  • Nova中RPC远程过程调用 nova-compute RPC API的实现 novacomputemanager 模块 最后Nova Project Services nova-api:捕获novaclient发送过来的HTTP请求,并且将它转换为AMQP消息,通过Queue来与别的services

    目录

    Nova Project Services

    这里写图片描述

    • nova-api:捕获novaclient发送过来的HTTP请求,并且将它转换为AMQP消息,通过Queue来与别的services通信。

    • nova-conductor:为数据库访问提供了一层安全保障。
      NOTE:除了nova-conductor可以访问数据库之外,因为nova-scheduler是只读数据库,而nova-api对数据库的操作有Policy保护,所以它们也都是可以访问数据库的。但是最好还是仅通过

    展开全文
  • 风险管理信息 带有命令的远程方法调用 Chat-Server-Application 写在 Technische Hochschule Mittelhessen 的模块操作系统中
  • NULL 博文链接:https://quicker.iteye.com/blog/853016
  • 一个使用Node.js编写微服务的模块,使用redis pub / sub抽象从一个节点服务器到另一个节点服务器远程过程调用。 在你开始之前 它能做什么? 假设您有一个由两个节点服务器组成的应用程序。 处理用户和路由的...
  • dubbo源码浅析-远程服务调用流程

    千次阅读 2018-05-29 17:22:49
    转载自:dubbo源码浅析(五)-远程服务调用流程非商业转载,如造成侵权,请联系本人删除消费端调用远程服务接口时,使用上和调用普通的java接口是没有任何区别,但是服务消费者和提供者是跨JVM和主机的,客户端如何...

    转载自:dubbo源码浅析(五)-远程服务调用流程

    非商业转载,如造成侵权,请联系本人删除

    文中标红色的文字都是dubbo调用到netty框架的地方

    消费端调用远程服务接口时,使用上和调用普通的java接口是没有任何区别,但是服务消费者和提供者是跨JVM和主机的,客户端如何封装请求让服务端理解请求并且解析服务端返回的接口调用结果,服务端如何解析客户端的请求并且向客户端返回调用结果,这些框架是如何实现的,下面就来看下这部分的代码。 
    消费端调用提供端服务的过程要执行下面几个步骤: 
    1. 消费端触发请求 
    2. 消费端请求编码 
    3. 提供端请求解码 
    4. 提供端处理请求 
    5. 提供端响应结果编码 
    6. 消费端响应结果解码

    消费端触发请求

    在消费者初始化的时候,会生成一个消费者代理注册到容器中,该代理回调中持有一个MockClusterInvoker实例,消费调用服务接口时它的invoke会被调用,此时会构建一个RpcInvocation对象,把服务接口的method对象和参数放到RpcInvocation对象中,作为MockClusterInvoker.invoke方法的参数,在这个invoke方法中,判断请求是否需要mock,是否配置了mock属性,是强制mock还是失败后mock,关于mock这里先不详细展开,这里只看下核心流程。 
    MockClusterInvoker.invoke会调用FailfastClusterInvoker.invoke,大系统中为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者Invoker对应关系保存在RegistryDirectory的methodInvokerMap中,通过调用的方法名称(或方法名称+第一个参数)改方法对应的提供者invoker列表,如注册中心设置了路由规则,对这些invoker根据路由规则进行过滤。 
    com.alibaba.dubbo.registry.integration.RegistryDirectory.doList(Invocation) 
    这里写图片描述 
    com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory.list(Invocation) 
    这里写图片描述 
    读取到所有符合条件的服务提供者invoker之后,由LoadBalance组件执行负载均衡,从中挑选一个invoker进行调用,框架内置支持的负载均衡算法包括random(随机)、roundrobin(R-R循环)、leastactive(最不活跃)、consistenthash(一致性hash),应用可配置,默认random。 
    methodInvokerMap保存的是持有DubboInvoker(dubbo协议)实例的InvokerDelegete对象,是Invoker-Filter链的头部,先激活Filter连然后最终调到DubboInvoker.invoke(RpcInvocation),此时远程调用分三种类型: 
    1. 单向调用,无需获取关注调用结果的,无需等待接口返回结果,注意调用结果不要单纯跟返回值混淆了,异常也是调用结果。 
    2. 异步调用,需要关注返回结果,但是不会同步等待接口调用结束,会异步的获取返回返回结果,这种情况给调用者返回一个Future,但是不同步等待Future.get返回调用结果 
    3. 同步调用,需要同步等待服务调用结束获取调用结果,给调用者返回一个Future并且Future.get等待结果,此时接口调用线程会挂起等待响应。 

    这里写图片描述 
    我们大部分使用场景都是同步调用,所以主要看一下同步调用。如果使用者配置了多个connections按顺序选择一个ExchangeClient和服务器通信,同步调用时调用HeaderExchangeClient.request->HeaderExchangeChannel.request。

    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int) 
    这里写图片描述 
    这里的request参数是RpcInvocation对象,包含调用的方法、参数等信息,timeout参数是接口超时时间,把这些信息封装在Request对象中,调用channel.send,这个channel对象就是和服务端打交道的NettyClient实例,NettyClient.send调用NettyChannel.send。

    com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(Object, boolean) 
    这里写图片描述 
    这里的sent参数决定是否等待请求消息发出,sent=true 等待消息发出,消息发送失败将抛出异常,sent=false 不等待消息发出,将消息放入IO队列,即刻返回。默认情况下都是false。NettyChannel中有channel属性,这个channel是Netty框架中的组件,负责客户端和服务端链路上的消息传递,channel.write把请求消息写入,这里的message是上面封装的Request对象。这里的IO模型是非阻塞的,线程不用同步等待所有消息写完,而是直接返回。调用Netty框架的IO事件之后会触发Netty框架的IO事件处理链。

    消费端请求编码

    在消费者初始化创建NettyClient时了解到了,NettyClient添加了三个事件处理器组成处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler,其中NettyCodecAdapter.encoder下行事件处理器(实现了netty3的ChannelDownstreamHandler接口),NettyCodecAdapter. decoder是上行事件处理器(实现了netty3的ChannelUpstreamHandler接口),NettyHandler是上行事件+下行时间处理器(同时实现了netty3的ChannelUpstreamHandler和ChannelDownstreamHandler接口)。channel.write在Netty框架中是一个下行事件,所以NettyCodecAdapter.encoder和NettyHandler处理器会被回调,下行事件的事件处理器调用顺序是从后到前,即后添加的处理器先执行。 
    NettyHandler没有对请求消息做任何加工,只是触发dubbo框架的一些回调,这些回调里面没有做任何核心的事情,

    com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) 
    这里写图片描述 
    encoder顾名思义就是编码器,它的主要工作就是把数据按照客户端-服务端的约定协议对请求信息和返回结果进行编码。看下它的encode方法: 
    这里写图片描述 
    下行事件触发之后依次调用handleDownstream->doEncode->encode,在encode中对Request对象进行编码。这个msg参数就是上面被write的Request对象,这里的Codec2组件是DubboCountCodec实现,DubboCountCodec.encode调用DubboCodec.Encode

    com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(Channel, ChannelBuffer, Object) 
    这里写图片描述 
    根据协议,消息中写入16个字节的消息头: 
    1、1-2字节,固定的魔数 
    2、第3个字节,第8位存储数据类型是请求数据还是响应数据,其它7位存储序列化类型,约定和服务端的序列化-反序列化协议 
    3、5-12个字节,请求id 
    4、13-16个字节,请求数据长度

    com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(Channel, ChannelBuffer, Request) 
    这里写图片描述 
    从URL中查找序列化扩展点名称,加载序列化组件把请求对象序列化成二进制。消费端和提供端的序列化反序列化协议要配套,所以这个序列化协议一般是在提供端指定的,指定的协议类型会在提供者和消费者初始化的时候写入到URL对象中,框架中默认的序列化协议是hessian2。消息体数据包含dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,把它们按顺序依次序列化,数据写入到类型为ChannelBuffer的buffer参数中,然后把ChannelBuffer封装成Netty框架的org.jboss.netty.buffer.ChannelBuffer。如果参数中有回调接口,还需要在消费端启动端口监听提供端的回调,这里不展开。

    com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(Channel, ObjectOutput, Object) 
    这里写图片描述 
    然后把封装好的ChannelBuffer写到链路发送到服务端,这里消费端前半部分的工作就完成,接下来目光要转移到服务端。

    org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(ChannelHandlerContext, MessageEvent) 
    这里写图片描述

    提供端请求解码

    在看提供端初始化代码的时候看到,框架在创建NettyServer时,也会创建netty框架的IO事件处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler

    com.alibaba.dubbo.remoting.transport.netty.NettyServer.doOpen() 
    这里写图片描述 
    客户端发送数据到服务端时会触发服务端的上行IO事件并且启动处理器回调,NettyCodecAdapter.decoder和NettyHandler是上行事件处理器,上行事件处理器调用顺序是从前到后执行,即先添加的处理器先执行,所以先触发NettyCodecAdapter.decoder再触发NettyHandler。 
    由NettyCodecAdapter.decoder对请求进行解码,把消息翻译成提供端可理解的,上行事件调用decoder的handleUpstream->messageReceived

    com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent) 
    这里写图片描述 
    com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(Channel, ChannelBuffer, int, byte[]) 
    这里写图片描述 
    把数据读取到ChannelBuffer之后扔给Codec2组件进行解码处理,这里有个半包传输处理,因为这里使用的是非阻塞式的IO模型,非阻塞IO的特点是线程的读取数据是事件触发式,是由一个Selector组件轮询准备就绪的IO事件,发现准备就绪的事件之后通知线程读取,这种模式的好处是可以极大的优化线程模型,只需少数几个线程处理所有客户端和服务端连接,而阻塞IO需要线程和连接要一对一,但是非阻塞IO远高于阻塞式IO,不像阻塞式IO读写数据时只有数据读完或者超时才会返回,这样能保证读到的数据肯定是完整,而非阻塞模式方法返回之后可能只读到一部分数据,框架的处理是在解析消息时检查消息的长度确定是否有完整的数据,如果数据不完整返回NEED_MORE_INPUT,保存当前解析的位置等待链路的下次IO事件,在下次IO事件到达时从上次保存的位置开始解析。 
    读取到完整的数据之后解析数据头,读取魔数、序列化类型、以及请求id,读取第3个字节判断改数据是消费端请求数据还是提供端响应数据(因为消费端和提供端解码器代码是共用的),并且从1-7位从读出序列化类型,并且根据此序列化类型加载序列化组件对消息进行反序列化按顺序读取消费端写入的dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,写入DecodeableRpcInvocation对象对应的属性中。

    com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[]) 
    这里写图片描述 
    com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation.decode(Channel, InputStream) 
    这里写图片描述 
    创建一个Request对象,把DecodeableRpcInvocation对象对象设置到Request对象的data属性中。 
    com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[]) 
    这里写图片描述 
    解码完成之后,激活下一个处理器的messageReceived事件,并且把解码后的对象封装在MessageEvent中。

    com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent) 
    这里写图片描述 
    org.jboss.netty.channel.Channels.fireMessageReceived(ChannelHandlerContext, Object, SocketAddress) 
    这里写图片描述 
    Decoder执行完之后,事件进入到下一个处理器NettyHandler,看下NettyHandler中的代码: 
    这里写图片描述 
    这里直接交给handler处理了,这个handler封装了很多层:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,中间封装了好几万层这里只把重要的列出来,源头是从创建NettyServer的时候传过来的。

    com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL) 
    这里写图片描述 
    这里写图片描述 
    先会走到DecodeHandler.received:

    com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object) 
    这里写图片描述 
    这个message是Request类型的,要先decode一下,因为在之前已经解码过了,所以这里不会做任何事情,直接走下一个handler.received,这个handler就是HeaderExchangeHandler:

    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object) 
    这里写图片描述 
    普通的同步接口twoWay属性是true走handleRequest方法处理请求,处理结束之后调用channel.send把结果返回到客户端。

    提供端处理请求

    请求处理再走下一个handler的reply,这个handler就是DubboProtocol.requestHandler,把request对象中的data取出来传到requestHandler中,这个data就是前面的解码后的DecodeableRpcInvocation对象它是Invocation接口的一个实现: 
    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request) 
    这里写图片描述 
    com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol 
    这里写图片描述 
    查找提供端请求对应的Invoker,在接口提供者初始化时,每个接口都会创建一个Invoker和Exporter,Exporter持有invoker实例,Exporter对象保存在DubboProtocol的exporterMap中,key是由URL生成的serviceKey,此时通过Invocation中的信息就可还原该serviceKey并且找到对应的Exporter和Invoker,在分析提供者初始化代码时知道它是Invoker-Filter的头节点,激活Filter后调用由ProxyFactory生成的Invoker: 
    这里写图片描述 
    调用invoker.invoke时,通过反射调用最终的服务实现执行相关逻辑。 
    服务执行结束之后,创建一个Response对象返回给客户端。在执行服务实现时会出现两种结果:成功和失败,如果成功,把返回值设置到Response的result中,Response的status设置成OK,如果失败,把失败异常设置到Response的errorMessage中,status设置成SERVICE_ERROR。 
    回到HeaderExchangeHandler.received中的代码,在handleRequest之后,调用channel.send把Response发送到客户端,这个channel封装客户端-服务端通信链路,最终会调用Netty框架,把响应写回到客户端。 
    这里写图片描述

    提供端响应结果编码

    提供端要按照和消费端的协议把Response按照特定的协议进行编码,把编码后的数据写回到消费端,从上面的代码可以看到,在NettyServer初始化的时候,定义了三个IO事件处理器,服务端往客户端回写响应时产生下行事件,处理下行事件处理器,NettyCodecAdapter.encoder和NettyHandler是下行事件处理器,先激活NettyHandler,再激活NettyCodecAdapter. encoder,在NettyCodecAdapter. encoder对响应结果进行编码,还是通过Code2组件和请求编码时使用的组件一样,把响应类型和响应结果依次写回到客户端,根据协议会写入16个字节的数据头,包括: 
    1、1-2字节魔数 
    2、第3个字节,序列化组件类型,约定和客户端的序列化-反序列化协议 
    3、第4个字节,响应状态,是OK还是error 
    4、5-13个字节,响应id,这里的id和request中的id一样 
    5、13-16个字节,响应数据长度

    com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeResponse(Channel, ChannelBuffer, Response) 
    这里写图片描述 
    返回结果有三种结果:1、没有返回值即返回类型是void;2、有返回值并且执行成功;3、服务调用异常。

    com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeResponseData(Channel, ObjectOutput, Object) 
    这里写图片描述 
    解码后的数据会写入到通信链路中。

    消费端响应结果解码

    服务端给客户端回写数据之后,客户端会收到IO事件,一个上行事件。NettyClient中有两个上行事件处理器NettyCodecAdapter.decoder和NettyHandler,按照顺序decoder先执行对服务端传过来的数据进行解码,解析出序列化协议、响应状态、响应id(即请求id)。把响应body数据读到DecodeableRpcResult对象中,进行解析同时加载处理原始Request数据,这个Request对象在请求时会被缓存到DefaultFuture中,加载Request的目的是因为Request中Invocation中携带了服务接口的返回值类型信息,需要根据这个类型把响应解析创建对应类型的对象。

    com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(Channel, InputStream) 
    这里写图片描述 
    创建Response对象并且把解析出结果或异常设置到Response中。 
    decoder把响应解析成Response对象中,NettyHandler接着往下处理,同样触发它的messageReceive事件,在提供端解码的时候看到了,它的handler封装关系是:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,主要处理在HeaderExchangeHandler中:

    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleResponse(Channel, Response) 
    这里写图片描述 
    com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.doReceived(Response) 
    这里写图片描述 
    这里主要做的事情是唤醒调用者线程,并且把Response设置到DefaultFuture中,在消费者触发请求的代码中可以看到,消费端调用接口的时候请求写到提供端之后,会调用DefaultFuture.get阻塞等待响应结果

    com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(int) 
    这里写图片描述 
    com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.isDone() 
    这里写图片描述 
    在done这个Condition上进行条件等待,DefaultFuture.doReceive时,设置response唤醒done,此时调用线程被唤醒并且检查是否已经有了response(避免假唤醒),唤醒之后返回response中的result,调用端即拿到了接口的调用结果(返回值或异常),整个远程服务接口的调用流程就完成了。

    超时处理

    前面说了在进行接口调用时会出现两种情况:接口调用成功、接口调用异常,其实还有一种情况就是接口调用超时。在消费端等待接口返回时,有个timeout参数,这个时间是使用者设置的,可在消费端设置也可以在提供端设置,done.await等待时,会出现两种情况跳出while循环,一是线程被唤醒并且已经有了response,二是等待时间已经超过timeout,此时也会跳出while,当跳出while循环并且Future中没有response时,就说明接口已超时抛出TimeoutException,框架把TimeoutException封装成RpcException抛给应用层。


    展开全文
  • 远程过程调用服务器和Crystal客户端。 实现msgpack-rpc原型调用。 设计可靠且稳定(捕获所有可能的protocall /套接字错误)。 它也表现出色:基准测试在池模式下(单个服务器核心,单个客户端核心)显示了约200K ...
  • 基于java aio 的RPC 远程调用框架 组件介绍 Serializer 序列化和反序列的工具类,项目的实现为基于Gson的序列化工具 IOHandler 从Channel中读取数据并交由Serializer处理的类,本身是异步读取数据 在读取数据时 提供...
  • Asyncio-rpc:远程过程调用框架 用于异步远程过程调用的Python包 免费软件:BSD许可证 文档: : 。 概述 特征 Asyncio RPC客户端/服务器 Msgpack序列化,带有使用自己的数据类的选项(Python 3.7) Redis通信层 ...
  • 通过微信 JSD-SDK开发文档,调用微信接口获取摄像头图片上传到服务器,保存图片到本地服务器
  • SpringBoot使用Netty实现远程调用

    千次阅读 多人点赞 2020-10-24 00:12:29
    SpringBoot使用Netty实现远程调用 前言 众所周知我们在进行网络连接的时候,建立套接字连接是一个非常消耗性能的事情,特别是在分布式的情况下,用线程池去保持多个客户端连接,是一种非常消耗线程的行为。那么我们...

    SpringBoot使用Netty实现远程调用

    前言

    众所周知我们在进行网络连接的时候,建立套接字连接是一个非常消耗性能的事情,特别是在分布式的情况下,用线程池去保持多个客户端连接,是一种非常消耗线程的行为。那么我们该通过什么技术去解决上述的问题呢,那么就不得不提一个网络连接的利器——Netty.

    正文

    Netty

    Netty是一个NIO客户端服务器框架:

    • 它可快速轻松地开发网络应用程序,例如协议服务器和客户端。
    • 它极大地简化和简化了网络编程,例如TCPUDP套接字服务器。

    NIO是一种非阻塞IO ,它具有以下的特点

    • 单线程可以连接多个客户端。
    • 选择器可以实现但线程管理多个Channel,新建的通道都要向选择器注册。
    • 一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。
    • selector进行select()操作可能会产生阻塞,但是可以设置阻塞时间,并且可以用wakeup()唤醒selector,所以NIO是非阻塞IO

    Netty模型selector模式

    它相对普通NIO的在性能上有了提升,采用了:

    • NIO采用多线程的方式可以同时使用多个selector
    • 通过绑定多个端口的方式,使得一个selector可以同时注册多个ServerSocketServer
    • 单个线程下只能有一个selector,用来实现Channel的匹配及复用
      在这里插入图片描述

    半包问题
    TCP/IP在发送消息的时候,可能会拆包,这就导致接收端无法知道什么时候收到的数据是一个完整的数据。在传统的BIO中在读取不到数据时会发生阻塞,但是NIO不会。为了解决NIO的半包问题,NettySelector模型的基础上,提出了reactor模式,从而解决客户端请求在服务端不完整的问题。

    netty模型reactor模式

    • selector的基础上解决了半包问题。

    在这里插入图片描述

    上图,简单地可以描述为"boss接活,让work干":manReactor用来接收请求(会与客户端进行握手验证),而subReactor用来处理请求(不与客户端直接连接)。

    SpringBoot使用Netty实现远程调用

    maven依赖

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.2</version>
      <optional>true</optional>
    </dependency>
    
    <!--netty-->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.17.Final</version>
    </dependency>
    

    服务端部分

    NettyServer.java:服务启动监听器

    @Slf4j
    public class NettyServer {
        public void start() {
            InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
            //new 一个主线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            //new 一个工作线程组
            EventLoopGroup workGroup = new NioEventLoopGroup(200);
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerChannelInitializer())
                    .localAddress(socketAddress)
                    //设置队列大小
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            //绑定端口,开始接收进来的连接
            try {
                ChannelFuture future = bootstrap.bind(socketAddress).sync();
                log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("服务器开启失败", e);
            } finally {
                //关闭主线程组
                bossGroup.shutdownGracefully();
                //关闭工作线程组
                workGroup.shutdownGracefully();
            }
        }
    }
    

    ServerChannelInitializer.java:netty服务初始化器

    /**
    * netty服务初始化器
    **/
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //添加编解码
            socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
            socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
            socketChannel.pipeline().addLast(new NettyServerHandler());
        }
    }
    

    NettyServerHandler.java:netty服务端处理器

    /**
    * netty服务端处理器
    **/
    @Slf4j
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * 客户端连接会触发
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("Channel active......");
        }
    
        /**
         * 客户端发消息会触发
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("服务器收到消息: {}", msg.toString());
            ctx.write("你也好哦");
            ctx.flush();
        }
    
    
        /**
         * 发生异常触发
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    RpcServerApp.java:SpringBoot启动类

    /**
    * 启动类
    *
    */
    @Slf4j
    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    public class RpcServerApp extends SpringBootServletInitializer {
        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
            return application.sources(RpcServerApp.class);
        }
    
        /**
         * 项目的启动方法
         *
         * @param args
         */
        public static void main(String[] args) {
            SpringApplication.run(RpcServerApp.class, args);
            //开启Netty服务
            NettyServer nettyServer =new  NettyServer ();
            nettyServer.start();
            log.info("======服务已经启动========");
        }
    }
    

    客户端部分

    NettyClientUtil.java:NettyClient工具类

    /**
    * Netty客户端
    **/
    @Slf4j
    public class NettyClientUtil {
    
        public static ResponseResult helloNetty(String msg) {
            NettyClientHandler nettyClientHandler = new NettyClientHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast("decoder", new StringDecoder());
                            socketChannel.pipeline().addLast("encoder", new StringEncoder());
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });
            try {
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
                log.info("客户端发送成功....");
                //发送消息
                future.channel().writeAndFlush(msg);
                // 等待连接被关闭
                future.channel().closeFuture().sync();
                return nettyClientHandler.getResponseResult();
            } catch (Exception e) {
                log.error("客户端Netty失败", e);
                throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
            } finally {
                //以一种优雅的方式进行线程退出
                group.shutdownGracefully();
            }
        }
    }
    

    NettyClientHandler.java:客户端处理器

    /**
    * 客户端处理器
    **/
    @Slf4j
    @Setter
    @Getter
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        private ResponseResult responseResult;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("客户端Active .....");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("客户端收到消息: {}", msg.toString());
            this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
            ctx.close();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    验证

    测试接口

    @RestController
    @Slf4j
    public class UserController {
    
        @PostMapping("/helloNetty")
        @MethodLogPrint
        public ResponseResult helloNetty(@RequestParam String msg) {
            return NettyClientUtil.helloNetty(msg);
        }
    }
    

    访问测试接口
    在这里插入图片描述

    服务端打印信息
    在这里插入图片描述

    客户端打印信息
    在这里插入图片描述

    源码

    项目源码可从的我的github中获取:github源码地址

    在这里插入图片描述

    展开全文
  • 快网 通过直接远程方法调用(RMI)在基于Java的服务器和android软件之间进行通信的工具添加了完整版本的android-客户端和服务器的重构版本org.fastnet.android
  • #Java 远程方法调用 ##Calculator 示例 Java 远程方法调用 (Java RMI) 是一个 Java API,它执行远程过程调用 (RPC) 的面向对象等价物,支持序列化 Java 类的直接传输和分布式垃圾收集。 远程过程调用:消息传递远程...
  • 米 分布式系统的 Java 远程方法调用 符合 制作 运行服务器 光盘仓 伺服器 运行客户端 光盘仓 java客户端#
  • //这里就是当远程调用失败时,feign会自动进入接口的实现类,就是之前所说的熔断, // 在这里,你可以做对应的处理,比如返回一个空的User对象,或者启用备用方案,调用联外一台服务等等。 //这里我们直接返回空...
  • 1、RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的。经常用于分布式网络通信中。 2、Hadoop的进程间交互都...
  • 关注“Java艺术”一起来充电吧!在前面分析Dubbo注册中心层源码的文章中,我们知道,服务的导出与引入由RegistryProtocol调度完成。对于服务提供者,服务是先导出再注册到注...
  • 使用Akka的远程调用

    万次阅读 2017-01-06 15:01:25
    正如其它RPC或者RMI框架那样,Akka也提供了远程调用的能力。服务端在监听的端口上接收客户端的调用。本文将在《Spring与Akka的集成》一文的基础上介绍Akka的remote调用,本文很多代码和例子来源于Akka官网的代码示例...
  • RMI-JAVA中的远程方法调用 第1步-将项目克隆到您的计算机 存储库URL- 步骤2-然后运行以下命令来编译Java代码 javac * .java-这将一次编译所有Java代码 步骤3-打开您的CMD并运行以下命令以启动RMI注册表 启动...
  • Hadoop源码剖析07-远程过程调用(一)

    千次阅读 2019-12-30 22:49:13
    为什么要用远程过程调用 作为典型的分布式系统,Hadoop中各个实体间存在着大量的交互,远程过程调用让用 户可以像调用本地方法一样调用另外一个应用程序提供的服务,而不必设计和开发相关的信 息发送、处理和接收等...
  • 调用 WTSRegisterSessionNotification APIzc会话监控消息,接收来自远程桌面用户登入或登出事件。 再调用WTSQuerySessionInformation cha询 出会话用户名与IP地址,可做成系统服务后台监控,发现非白名单用户发送...
  • 提示:RPC 代表远程过程调用。 RPC# 旨在提供一种简单的方法来触发远程代码执行和远程对象处理。 无论您使用何种数据提供程序,无论您以何种方式连接到您的服务器(WCF、套接字、HttpClient、ServiceStack...)。 ...
  • NymphRPC是一个紧凑的,基于C ++的远程过程调用(RPC)库。 可以查看一下test文件夹中的示例服务器和客户端实现,以了解如何将NymphRPC集成到应用程序中。 计划的港口 除了当前的C ++实现之外,还计划使用该库的Ada...
  • 想想JavaScript世界中的Java远程方法调用(RMI)。 RMI.js是一些JavaScript客户端代码和Node.js模块的组合,该模块可以实现远程方法的隐式调用。 开发人员可以使用本地,远程或两者的方法定义对象,然后在客户端...
  • Java RMI远程方法调用详解

    万次阅读 多人点赞 2016-07-22 17:45:43
    远程方法调用RMI(Remote Method Invocation),是允许运行在一个Java虚拟机的对象调用运行在另一个Java虚拟机上的对象的方法。...它使客户机上运行的程序可以调用远程服务器上的对象。远程方法调用特性使
  • 1.Spring中,HTTPInvoker(HTTP调用器)是通过基于HTTP协议的分布式远程调用解决方案,和java ...向服务器发送远程调用请求: 远程调用信息——>封装为远程调用对象——>序列化写入到远程调用HTTP请求中——>向服务器
  • 使用RMI(远程方法调用),安全管理器和策略的概念对服务器JAVA的简单实现。 这是Netbeans IDE的项目。 由巴拉那州联邦技术大学(巴西)的学生开发的项目: 雨果·阿吉亚尔(Hugo Aguiar) 乔斯·卡洛斯·德·莫...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 61,671
精华内容 24,668
热门标签
关键字:

网站服务器源码远程调用