精华内容
下载资源
问答
  • RabbitMQ系列-实现RPC异步调用

    千次阅读 2018-10-12 17:52:48
    使用Spring AMQP实现RPC异步调用 示列 服务器端 应用启动类代码, import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation....

    使用Spring AMQP实现RPC异步调用

    示列

    服务器端

    应用启动类代码,

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("===server startup======");
            TimeUnit.SECONDS.sleep(120);
            context.close();
        }
    }
    

    配置类:
    监听了sms队列,这个队列将会是客户端请求消息发送到的队列,配置了适配器,适配器中去调用服务,适配器返回的值就是服务端返回给客户端的RPC调用的结果

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("sms");
            container.setAcknowledgeMode(AcknowledgeMode.NONE);
            //使用适配器的方式
            container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler()));
            return container;
        }
    }
    

    处理器,处理器中调用具体的服务,我们此列子中处理器方法返回的值是boolean类型

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public boolean handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return is;
        }
    }
    

    服务接口

    public class SendSMSTool {
    
        public static boolean sendSMS(String phone,String content){
            System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
            return phone.length() > 6;
        }
    }
    

    服务端步骤

    1. 消息处理方法,一定要有返回值,这个返回值就是就是server回复客户端的结果。比如我们SendSMSHandler.handleMessage方法返回的值。

    客户端
    应用启动类:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    
            //设置超时时间,单位是ms
            rabbitTemplate.setReplyTimeout(10000);
    
            String phone = "15634344321";
            String content ="周年庆,五折优惠";
    
            MessageProperties messageProperties = new MessageProperties();
            Message message = new Message((phone+":"+content).getBytes(),messageProperties);
    
            //rabbitTemplate.send("","sms",message);
    
            Message reply = rabbitTemplate.sendAndReceive("","sms",message,
                    new CorrelationData(UUID.randomUUID().toString()));
    
            System.out.println(reply);
            System.out.println("message,body:"+new String(reply.getBody()));
            System.out.println("message,properties:"+reply.getMessageProperties());
    
            TimeUnit.SECONDS.sleep(30);
            context.close();
        }
    }
    

    配置类代码:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    }
    

    如果服务端睡眠6s,则客户端通过sendAndReceive方法接收到的Message对象为空,怎样设置呢?
    客户端通过设置rabbitTemplate.setReplyTimeout(10000);就可以了。

    客户端步骤

    1. 使用sendAndReceive方法发送消息,该方法返回一个Message对象,该对象就是server返回的结果
    2. sendAndReceive如果超过5s还没有收到结果,则返回null,这个超时时间可以通过rabbitTemplate.setReplyTimeout()来进行设置
    3. server端返回的结果一定要注意,和MessageConverter有关,默认的org.springframework.amqp.support.converter.SimpleMessageConverter会把基本的数据类型转换成Serializable对象,这样的话,client端接收的也是序列化的java对象,所以,需要合理设置MessageConverter

    示列代码中服务端返回给客户端的是Boolean类型,

    启动服务端客户端代码:
    服务器打印控制台打印:

    15634344321:周年庆,五折优惠
    发送短信内容:【周年庆,五折优惠】到手机号:15634344321
    

    客户端控制台打印:

    message,body:����sr��java.lang.Boolean� r�՜�����Z��valuexp�
    message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFu0AAAABwI=.kHL9zxtdQmtcxl0mQF8zrg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
    

    我们发现客户端接收到的数据乱码,将服务端的处理器的返回值改写成String类型的,

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public String handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return is ? "success":"false";
        }
    }
    

    此时发现客户端接收的消息数据没有乱码,原因何在?我们总结一下就是服务器端处理器返回给客户端boolean类型,那么返回的消息数据就乱码,如果返回的是String类型,那么返回的消息数据就不会乱码。

    之前我们学习了org.springframework.amqp.support.converter.MessageConverter接口,当客户端向服务端发送消息的时候会进行消息类型转换,调用了fromMessage方法,而当服务器返回给客户端的时候会将服务端的对象转换成Message对象,很明显调用的是toMessage方法。

    我们知道org.springframework.amqp.support.converter.MessageConverter接口的默认实现是org.springframework.amqp.support.converter.SimpleMessageConverter,而toMessage方法的实现是在其继承的对象AbstractMessageConverter中,

    我们看到其AbstractMessageConverter.toMessage方法的实现逻辑是:

        @Override
        public final Message toMessage(Object object, MessageProperties messageProperties)
                throws MessageConversionException {
            if (messageProperties == null) {
                messageProperties = new MessageProperties();
            }
            //将对象转换成Message对象
            Message message = createMessage(object, messageProperties);
            messageProperties = message.getMessageProperties();
            if (this.createMessageIds && messageProperties.getMessageId() == null) {
                messageProperties.setMessageId(UUID.randomUUID().toString());
            }
            return message;
        }
    

    createMessage方法就是将对象转换成Message对象,

        /**
         * Creates an AMQP Message from the provided Object.
         */
        @Override
        protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            byte[] bytes = null;
            if (object instanceof byte[]) {
                bytes = (byte[]) object;
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
            }
            else if (object instanceof String) {
                try {
                    bytes = ((String) object).getBytes(this.defaultCharset);
                }
                catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException(
                            "failed to convert to Message content", e);
                }
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
                messageProperties.setContentEncoding(this.defaultCharset);
            }
            //因为boolean类型实现Serializable接口,所以会将其序列化
            else if (object instanceof Serializable) {
                try {
                    bytes = SerializationUtils.serialize(object);
                }
                catch (IllegalArgumentException e) {
                    throw new MessageConversionException(
                            "failed to convert to serialized Message content", e);
                }
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
            }
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            return new Message(bytes, messageProperties);
        }
    

    我们在程序中将序列化对象直接转换成字符串所以乱码,而返回的是String类型的情形的时候先将字符串转换成相应的字节数组,然后返回new Message(bytes, messageProperties);就不会乱码。

    继续探讨,当我们服务端返回的是一个对象的时候,客户端会返回空
    返回的对象:

    public class SendStatus {
    
        private String phone;
    
        private String result;
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public String getResult() {
            return result;
        }
    
        public void setResult(String result) {
            this.result = result;
        }
    }
    

    将该对象返回:

    public class SendSMSHandler {
    
        public SendStatus handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
            SendStatus sendStatus = new SendStatus();
            sendStatus.setPhone(phone);
            sendStatus.setResult(is ? "SUCCESS":"FAILURE");
            return sendStatus;
        }
    }
    

    服务端控制台:

    15634344321:周年庆,五折优惠
    发送短信内容:【周年庆,五折优惠】到手机号:15634344321
    

    客户端:

    message,body:
    message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/octet-stream, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFxBAAAABwI=.01fOGW/nvS2nz6gKza+cjg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
    

    原因何在,因为我们定义的SendStatus不走createMessage中的所有if分支,最后返回的是null,怎么解决呢,要么自己去定义一个org.springframework.amqp.support.converter.MessageConverter实现,要么换一个默认的org.springframework.amqp.support.converter.MessageConverter实现。

    改造后的示列

    使用AMQP自带的消息类型转换器Jackson2JsonMessageConverter
    服务端

    应用启动类,

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("===server startup======");
            TimeUnit.SECONDS.sleep(120);
            context.close();
        }
    }
    

    配置类,添加自定义的消息转换器

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("sms");
            container.setAcknowledgeMode(AcknowledgeMode.NONE);
            //使用适配器的方式
            container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler(),new Jackson2JsonMessageConverter()));
            return container;
        }
    }
    

    处理器handler,返回自定义的SendStatus类型

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public SendStatus handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
            SendStatus sendStatus = new SendStatus();
            sendStatus.setPhone(phone);
            sendStatus.setResult(is ? "SUCCESS":"FAILURE");
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return sendStatus;
        }
    }
    

    接口服务,

    public class SendSMSTool {
    
        public static boolean sendSMS(String phone,String content){
            System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
            return phone.length() > 6;
        }
    }
    
    

    服务端返回的对象,

    public class SendStatus {
        private String phone;
    
        private String result;
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public String getResult() {
            return result;
        }
    
        public void setResult(String result) {
            this.result = result;
        }
    }
    

    客户端
    应用启动类,

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    
            //设置超时时间,单位是ms
            rabbitTemplate.setReplyTimeout(10000);
    
            String phone = "15634344321";
            String content ="周年庆,五折优惠";
    
            MessageProperties messageProperties = new MessageProperties();
            Message message = new Message((phone+":"+content).getBytes(),messageProperties);
    
            //rabbitTemplate.send("","sms",message);
    
            Message reply = rabbitTemplate.sendAndReceive("","sms",message,
                    new CorrelationData(UUID.randomUUID().toString()));
    
            System.out.println(reply);
            System.out.println("message,body:"+new String(reply.getBody()));
            System.out.println("message,properties:"+reply.getMessageProperties());
    
            TimeUnit.SECONDS.sleep(30);
            context.close();
        }
    }
    

    配置类

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    }
    

    启动服务器客户端,客户端返回

    message,body:{"phone":"15634344321","result":"SUCCESS"}
    message,properties:MessageProperties [headers={__TypeId__=rpc.server.SendStatus}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to
    

    返回了SendStatus的JSON格式,因为使用了Jackson2JsonMessageConverter消息类型转换器。

    转自:https://www.jianshu.com/p/24b88ccfb019

     

    展开全文
  • 分布式 - RPC异步调用

    2018-02-22 22:36:53
    要实时就用同步,要吞吐率就用异步同步调用 流程略 实现负载均衡:连接池中建立了与一个RPC-server集群的连接,连接池在返回连接的时候,需要具备负载均衡策略。实现故障转移:连接池中建立了与一个RPC-server...

    要实时就用同步,要吞吐率就用异步。

    同步调用

    流程略

    实现负载均衡:连接池中建立了与一个RPC-server集群的连接,连接池在返回连接的时候,需要具备负载均衡策略。
    实现故障转移:连接池中建立了与一个RPC-server集群的连接,当连接池发现某一个机器的连接异常后,需要将这个机器的连接排除掉,返回正常的连接,在机器恢复后,再将连接加回来。
    实现发送超时:因为是同步阻塞调用,拿到一个连接后,使用底层socket带超时的send/recv即可实现带超时的发送和接收。

    异步调用

    流程略

    为什么要待发送队列、待接收队列?
    因为要将工作线程和io收发线程两者的同步关系解除,从而引入工作线程池和io收发线程池。

    为什么要上下文
    因为请求包的发送,响应包的callback回调不在同一个工作线程中完成,需要一个context来记录一个请求的上下文,把请求-响应-回调等一些信息匹配起来。通过rpc框架的内部请求id作为key,来保存调用开始时间time,超时时间timeout,回调函数callback,超时回调timeout_callback等信息。
    注意:请求id由client端服务调用时生成,会序列化成字节流发送给server端,server端会返回该请求id。

    负载均衡,故障转移
    与同步调用的连接池思路基本相同。
    注意:由于同步调用的连接池使用阻塞方式收发,需要与一个服务的一个server ip建立多条连接来保证client端多个服务同时路由到同一个server时不会阻塞。而由于异步调用,server端会很快返回response,所以client端多个服务同时路由到同一个server的情况是很少的,因此一个服务的一个server ip只需要建立少量的连接。

    超时发送与接收
    超时管理器启动timer对上下文管理器中的所有context进行扫描,看上下文中请求发送时间是否过长,如果过长,就不再等待result包,将该context从上下文管理器中移除,直接执行timeout_callback。
    注意:如果timeout_callback执行后,client端接收到了server端的result包,此时因为通过req-id在上下文管理器里找不到对应的context(说明已经超时处理过了),就直接将请求丢弃。

    展开全文
  • Iguazu RPC是生态系统的插件,它允许配置异步调用和缓存策略。 我们宽松地使用“ RPC”,因为您可以使用浏览器中可用的任何形式的通信策略来与服务器API进行通信(例如REST,GraphQL,甚至返回JSON或XML的非结构化...
  • 作者 | 李林锋10 年 Java NIO、平台中间件设计开发经验,精通 Netty、Mina、分布式服务框架、API Gateway、PaaS 等,《Netty 进阶...

    作者 | 李林锋

    10 年 Java NIO、平台中间件设计和开发经验,精通 Netty、Mina、分布式服务框架、API Gateway、PaaS 等,《Netty 进阶之路》、《分布式服务框架原理与实践》作者。目前在华为终端应用市场负责业务微服务化、云化、全球化等相关设计和开发工作。

    联系方式:

    新浪微博:Nettying

    微信:Nettying

    Email:neu_lilinfeng@sina.com

    01

    异步的一些常见误区

    1.1

    常见的理解误区

    在将近 10 年的平台中间件研发历程中,我们的平台和业务经历了从 C++ 到 Java,从同步的 BIO 到非阻塞的 NIO,以及纯异步的事件驱动 I/O(AIO)。服务器也从 Web 容器逐步迁移到了内部更轻量、更高性能的微容器。服务之间的 RPC 调用从最初的同步阻塞式调用逐步升级到了全栈异步非阻塞调用。

    每次的技术演进都会涉及到大量底层平台技术以及上层编程模型的切换,在实际工作中,我发现很多同学对通信框架的异步和 RPC 调用的异步理解有误,比较典型的错误理解包括:

    1.我使用的是 Tomcat8,因为 Tomcat8 支持 NIO,所以我基于 Tomcat 开发的 HTTP 调用都是异步的。

    2.因为我们的 RPC 框架底层使用的是 Netty、Vert.X 等异步框架,所以我们的 RPC 调用天生就是异步的。

    3.因为我们底层的通信框架不支持异步,所以 RPC 调用也无法异步化。

    1.2

    混淆Tomcat NIO与HTTP服务的异步化

    1.2.1 Tomcat 的 BIO 和 NIO

    在 Tomcat6.X 版本对 NIO 提供比较完善的支持之前,作为 Web 服务器,Tomcat 以 BIO 的方式接收并处理客户端的 HTTP 请求,当并发访问量比较大时,就容易发生拥塞等性能问题,它的工作原理示意如下所示:

    图 1 采用 BIO 做 HTTP 服务器的 Web 容器

    传统同步阻塞通信(BIO)面临的主要问题如下:

    1.性能问题:一连接一线程模型导致服务端的并发接入数和系统吞吐量受到极大限制。

    2.可靠性问题:由于 I/O 操作采用同步阻塞模式,当网络拥塞或者通信对端处理缓慢会导致 I/O 线程被挂住,阻塞时间无法预测。

    3.可维护性问题:I/O 线程数无法有效控制、资源无法有效共享(多线程并发问题),系统可维护性差。

    从上图我们可以看出,每当有一个新的客户端接入,服务端就需要创建一个新的线程(或者重用线程池中的可用线程),每个客户端链路对应一个线程。当客户端处理缓慢或者网络有拥塞时,服务端的链路线程就会被同步阻塞,也就是说所有的 I/O 操作都可能被挂住,这会导致线程利用率非常低,同时随着客户端接入数的不断增加,服务端的 I/O 线程不断膨胀,直到无法创建新的线程。

    同步阻塞 I/O 导致的问题无法在业务层规避,必须改变 I/O 模型,才能从根本上解决这个问题。

    Tomcat 6.X 提供了对 NIO 的支持,通过指定 Connector 的 protocol="org.apache.coyote.http11.Http11NioProtocol",就可以开启 NIO 模式,采用 NIO 之后,利用 Selector 的轮询以及 I/O 操作的非阻塞特性,可以实现使用更少的 I/O 线程处理更多的客户端连接,提升吞吐量和主机的资源利用率。Tomcat 8.X 之后提供了对 NIO2.0 的支持,默认也开启了 NIO 通信模式。

    1.2.2 Tomcat NIO 与 Servlet 异步

    事实上,Tomcat 支持 NIO,与 Tomcat 的 HTTP 服务是否是异步的,没有必然关系,这个可以从两个层面理解:

    1.HTTP 消息的读写:即便采用了 NIO,HTTP 请求和响应的消息处理仍然可能是同步阻塞的,这与协议栈的具体策略有关系。从 Tomcat 官方文档可以看到,Tomcat 6.X 版本即便采用 Http11NioProtocol,HTTP 请求消息和响应消息的读写仍然是 Blocking 的。

    2.HTTP 请求和响应的生命周期管理:本质上就是 Servlet 是否支持异步,如果 Servlet 是 3.X 之前的版本,则 HTTP 协议的处理仍然是同步的,这就意味着 Tomcat 的 Connector 线程需要同时处理 HTTP 请求消息、执行 Servlet Filter 以及业务逻辑,然后将业务构造的 HTTP 响应消息发送给客户端,整个 HTTP 消息的生命周期都采用了同步处理方式。

    Tomcat 与 Servlet 的版本配套关系如下所示:

    Servlet**** 规范版本Tomcat**** 版本JDK**** 版本
    4.09.0.X8+
    3.18.0.X7+
    3.07.0.X6+
    2.56.0.X5+
    2.45.5.X1.4+
    2.34.1.X1.3+
    表 1 Tomcat 与 Servlet 的版本配套关系

    1.2.3 Tomcat NIO 与 HTTP 服务调用

    以 Tomcat 6.X 版本为例,Tomcat HTTP 协议消息和后续的业务逻辑处理如下所示(Tomcat HTTP 协议处理非常复杂,为了便于理解,图示做了简化):

    图 2 Tomcat 6.X 的 HTTP 消息接入和处理原理

    从上图可以看出,HTTP 请求消息的读取、Servlet Filter 的执行、业务 Servlet 的逻辑处理,以及 HTTP 响应都是由 Tomcat 的 NIO 线程(Processor,实际更复杂些,这里做了简化处理)做处理,即 HTTP 消息的处理周期中都是串行同步执行的,尽管 Tomcat 使用 NIO 做接入,HTTP 服务端的处理仍然是同步的。它的弊端很明显,如果 Servlet 中的业务逻辑处理比较复杂,则会导致 Tomcat 的 NIO 线程被阻塞,无法读取其它 HTTP 客户端发送的 HTTP 请求消息,导致客户端读响应超时。

    可能有读者会有疑问,途中标识处,为什么不能创建一个业务线程池,由业务线程池异步处理业务逻辑,处理完成之后再填充 HttpServletResponse,发送响应。实际上在 Servlet 支持异步之前是无法实现的,原因是每个响应对象只有在 Servlet 的 service 方法或 Filter 的 doFilter 方法范围内有效,该方法一旦调用完成,Tomcat 就认为本次 HTTP 消息处理完成,它会回收 HttpServletRequest 和 HttpServletResponse 对象再利用,如果业务异步化之后再处理 HttpServletResponse,拿到的实际就不是之前请求消息对应的响应,会发生各种非预期问题,因此,业务逻辑必须在 service 方法结束前执行,无法做异步化处理。

    如果使用的是支持 Servlet3.0+ 版本的 Tomcat,通过开启异步处理模式,就能解决同步调用面临的各种问题,在后续章节中会有详细介绍。

    1.2.4 总结

    通过以上分析我们可以看出,除了将 Tomcat 的 Connector 配置成 NIO 模式之外,还需要 Tomcat 配套的 Servlet 版本支持异步化(3.0+),同时还需要在业务 Servlet 的代码中开启异步模式,HTTP 服务端才能够实现真正的异步化:I/O 异步以及业务逻辑处理的异步化。

    1.3

    混淆 RPC 异步与 I/O 异步

    1.3.1 Java 的各种 I/O 模型

    很多人喜欢将 JDK 1.4 提供的 NIO 框架称为异步非阻塞 I/O,但是,如果严格按照 UNIX 网络编程模型和 JDK 的实现进行区分,实际上它只能被称为非阻塞 I/O,不能叫异步非阻塞 I/O。在早期的 JDK 1.4 和 1.5 update10 版本之前,JDK 的 Selector 基于 select/poll 模型实现,它是基于 I/O 复用技术的非阻塞 I/O,不是异步 I/O。在 JDK 1.5 update10 和 Linux core2.6 以上版本,Sun 优化了 Selctor 的实现,它在底层使用 epoll 替换了 select/poll,上层的 API 并没有变化,可以认为是 JDK NIO 的一次性能优化,但是它仍旧没有改变 I/O 的模型。相关优化的官方说明如下图所示:

    图 3 JDK1.5_update10 支持 epoll

    由 JDK1.7 提供的 NIO 2.0 新增了异步的套接字通道,它是真正的异步 I/O,在异步 I/O 操作的时候可以传递信号变量,当操作完成之后会回调相关的方法,异步 I/O 也被称为 AIO。NIO 类库支持非阻塞读和写操作,相比于之前的同步阻塞读和写,它是异步的,因此很多人仍然习惯于称 NIO 为异步非阻塞 I/O,在此不需要太咬文嚼字。

    不同的 I/O 模型由于线程模型、API 等差别很大,所以用法的差异也非常大。各种 I/O 模型的优缺点对比如下:


    同步阻塞 I/O(BIO)非阻塞 I/O(NIO)异步 I/O(AIO)
    客户端个数:I/O 线程1:1M:1(1 个 I/O 线程处理多个客户端连接)M:0(不需要用户启动额外的 I/O 线程,被动回调)
    I/O 类型(阻塞)阻塞 I/O非阻塞 I/O非阻塞 I/O
    I/O 类型(同步)同步 I/O同步 I/O(I/O 多路复用)异步 I/O
    API 使用难度简单非常复杂复杂
    调试难度简单复杂复杂
    可靠性非常差
    吞吐量
    表 2 Java 各种 I/O 模型优缺点对比

    1.3.2 RPC 工作原理

    RPC 的全称是 Remote Procedure Call,它是一种进程间通信方式。允许像调用本地服务一样调用远程服务,它的具体实现方式可以不同,例如 Spring 的 HTTP Invoker,Facebook 的 Thrift 二进制私有协议通信。

    RPC 框架的目标就是让远程过程(服务)调用更加简单、透明,RPC 框架负责屏蔽底层的传输方式(TCP 或者 UDP)、序列化方式(XML/Json/ 二进制)和通信细节。框架使用者只需要了解谁在什么位置提供了什么样的远程服务接口即可,开发者不需要关心底层通信细节和调用过程。

    RPC 框架的调用原理图如下所示:

    图 4 RPC 框架原理图

    RPC 框架实现的几个核心技术点总结如下:

    1.远程服务提供者需要以某种形式提供服务调用相关的信息,包括但不限于服务接口定义、数据结构,或者中间态的服务定义文件,例如 Thrift 的 IDL 文件,WS-RPC 的 WSDL 文件定义,甚至也可以是服务端的接口说明文档;服务调用者需要通过一定的途径获取远程服务调用相关信息,例如服务端接口定义 Jar 包导入,获取服务端 IDL 文件等。

    2.远程代理对象:服务调用者调用的服务实际是远程服务的本地代理,对于 Java 语言,它的实现就是 JDK 的动态代理,通过动态代理的拦截机制,将本地调用封装成远程服务调用。

    3.通信:RPC 框架与具体的协议无关,例如 Spring 的远程调用支持 HTTP Invoke、RMI Invoke,MessagePack 使用的是私有的二进制压缩协议。

    4.序列化:远程通信,需要将对象转换成二进制码流进行网络传输,不同的序列化框架,支持的数据类型、数据包大小、异常类型以及性能等都不同。不同的 RPC 框架应用场景不同,因此技术选择也会存在很大差异。一些做的比较好的 RPC 框架,可以支持多种序列化方式,有的甚至支持用户自定义序列化框架(Hadoop Avro)。

    1.3.3 RPC 异步与 I/O 的异步

    RPC 异步与 I/O 的异步没有必然关系,当然,在大多数场景下,RPC 框架底层会使用异步 I/O,实现全栈异步。

    RPC 框架异步调度模型如下所示:

    图 5 异步 RPC 调用原理

    异步 RPC 调用的关键点有 2 个:

    1.不能阻塞调用方线程:接口调用通常会返回 Future 或者 Promise 对象,代表异步操作的一个回调对象,当异步操作完成之后,由 I/O 线程回调业务注册的 Listener,继续执行业务逻辑。

    2.请求和响应的上下文关联:除了 HTTP/1.X 协议,大部分二进制协议的 TCP 链路都是多路复用的,请求和响应消息的发送和接收顺序是无序的。所以,异步 RPC 调用需要缓存请求和响应的上下文关联关系,以及响应需要使用到的消息上下文。

    正如图5所示,当 RPC 调用请求消息发送到 I/O 线程的消息队列之后,业务线程就可以返回,至于 I/O 线程采用同步还是异步的方式读写消息,与 RPC 调用的同步和异步没必然的关联关系,当然,采用异步 I/O, 整体性能和可靠性会更好一些,所以现在大部分的 RPC 框架底层采用的都是异步 / 非阻塞 I/O。以 Netty 为例,无论 RPC 调用是同步还是异步,只要调用消息发送接口,Netty 都会将发送请求封装成 Task,加入到 I/O 线程的消息队列中统一处理,相关代码如下所示:

    异步回调的一些实现策略:

    1.Future/Promise:比较常用的有 JDK8 之前的 Future,通过添加 Listener 来做异步回调,JDK8 之后通常使用 CompletableFuture,它支持各种复杂的异步处理策略,例如自定义线程池、多个异步操作的编排、有返回值和无返回值异步、多个异步操作的级联操作等。

    2.线程池 +RxJava:最经典的实现就是 Netflix 开源的 Hystrix 框架,使用 HystrixCommand(创建线程池)做一层异步封装,将同步调用封装成异步调用,利用 RxJava API,通过订阅的方式对结果做异步处理,它的工作原理如下所示:

    图 6 利用 Hystix 做异步化封装

    1.3.4 总结

    通过以上分析可以得出如下结论:

    1.RPC 异步指的是业务线程发起 RPC 调用之后,不用同步等待服务端返回应答,而是立即返回,当接收到响应之后,回调执行业务的后续逻辑。

    2.I/O 的异步是通信层的具体实现策略,使用异步 I/O 会带来性能和可靠性提升,但是与 RPC 调用是同步还是异步没必然关系。

    02

    RPC 同步与异步调用

    很多 RPC 框架同时支持同步和异步调用,下面对同步和异步 RPC 调用的工作原理以及优缺点进行分析。

    2.1

    同步 RPC 调用

    2.1.1 同步 RPC 调用流行的原因

    在传统的单体架构中,以 Spring + Struts + MyBatis + Tomcat 为例,业务逻辑通常由各种 Controller(Spring Bean)来实现,它的逻辑架构如下所示:

    图 7 基于 MVC 的传统单体架构

    在单体架构中,本地方法调用都是同步方式,而且定义形式往往都是如下形式(请求参数 + 方法返回值):

    String sayHello(String hello);

    切换到 RPC 框架之后,很多都支持通过 XML 引用或者代码注解的方式引用远端的 RPC 服务,可以像使用本地接口一样调用远程的服务,这种开发模式与传统单体应用开发模式相似,编程简单,学习和切换成本低,调试也比较方便,因此,同步 RPC 调用成为大部分项目的首选。

    以 XML 方式导入远端服务提供者的 API 接口示例如下:

    
        

    <xxx:reference id="echoService" interface="edu.neu.EchoService" />




    <bean class="edu.neu.xxxAction" init-method="start">




    <property name="echoService" ref="echoService" />




    </bean>

    导入之后业务就可以直接在代码中调用 echoService 接口,与传统单体应用调用本地 Spring Bean 一样,无需感知远端服务接口的具体部署位置信息。

    2.1.2 同步 RPC 调用工作原理

    同步 RPC 调用是最常用的一种服务调用方式,它的工作原理如下:客户端发起远程 RPC 调用请求,用户线程完成消息序列化之后,将消息投递到通信框架,然后同步阻塞,等待通信线程发送请求并接收到应答之后,唤醒同步等待的用户线程,用户线程获取到应答之后返回。

    它的工作原理图如下所示:

    图 8 同步 RPC 调用

    主要流程如下:

    1.消费者调用服务端发布的接口,接口调用由 RPC 框架包装成动态代理,发起远程 RPC 调用。

    2.消费者线程调用通信框架的消息发送接口之后,直接或者间接调用 wait() 方法,同步阻塞等待应答。

    3.通信框架的 I/O 线程通过网络将请求消息发送给服务端。

    4.服务端返回应答消息给消费者,由通信框架负责应答消息的反序列化。

    5.I/O 线程获取到应答消息之后,根据消息上下文找到之前同步阻塞的业务线程,notify() 阻塞的业务线程,返回应答给消费者,完成 RPC 调用。

    2.1.3 同步 RPC 调用面临的挑战

    同步 RPC 调用的主要缺点如下:

    1.线程利用率低:线程资源是系统中非常重要的资源,在一个进程中线程总数是有限制的,提升线程使用率就能够有效提升系统的吞吐量,在同步 RPC 调用中,如果服务端没有返回响应,客户端业务线程就会一直阻塞,无法处理其它业务消息。

    2.纠结的超时时间:RPC 调用的超时时间配置是个比较棘手的问题。如果配置的过大,一旦服务端返回响应慢,就容易把客户端挂死。如果配置的过小,则超时失败率会增加。即便参考测试环境的平均和最大时延来设置,由于生产环境数据、硬件等与测试环境的差异,也很难一次设置的比较合理。另外,考虑到客户端流量的变化、服务端依赖的数据库、缓存、第三方系统等的性能波动,这都会导致服务调用时延发生变化,因此,依靠超时时间来保障系统的可靠性,难度很大。

    3.雪崩效应:在一个同步调用链中,只要下游某个服务返回响应慢,会导致故障沿着调用链向上游蔓延,最终把整个系统都拖垮,引起雪崩,示例如下:

    图 9 同步 RPC 调用级联故障

    2.2

    异步 RPC 调用

    2.2.1 异步 RPC 调用工作原理

    JDK 原生的 Future 主要用于异步操作,它代表了异步操作的执行结果,用户可以通过调用它的 get 方法获取结果。如果当前操作没有执行完,get 操作将阻塞调用线程。在实际项目中,往往会扩展 JDK 的 Future,提供 Future-Listener 机制,它支持主动获取和被动异步回调通知两种模式,适用于不同的业务场景。

    基于 JDK 的 Future-Listener 机制,可以实现异步 RPC 调用,它的工作原理如下所示:

    图 10 异步 RPC 调用原理图

    异步 RPC 调用的工作流程如下:

    1.消费者调用 RPC 服务端发布的接口,接口调用由 RPC 框架包装成动态代理,发起远程 RPC 调用。

    2.通信框架异步发送请求消息,如果没有发生 I/O 异常,返回。

    3.请求消息发送成功后,I/O 线程构造 Future 对象,设置到 RPC 上下文中。

    4.用户线程通过 RPC 上下文获取 Future 对象。

    5.构造 Listener 对象,将其添加到 Future 中,用于服务端应答异步回调通知。

    6.用户线程返回,不阻塞等待应答。

    7.服务端返回应答消息,通信框架负责反序列化等。

    8.I/O 线程将应答设置到 Future 对象的操作结果中。

    9.Future 对象扫描注册的监听器列表,循环调用监听器的 operationComplete 方法,将结果通知给监听器,监听器获取到结果之后,继续后续业务逻辑的执行,异步 RPC 调用结束。

    2.2.2 异步 RPC 调用编程模型的优化

    Java8 的 CompletableFuture 提供了非常丰富的异步功能,它可以帮助用户简化异步编程的复杂性,通过 Lambda 表达式可以方便的编写异步回调逻辑,除了普通的异步回调接口,它还提供了多个异步操作结果转换以及与或等条件表达式的编排能力,方便对多个异步操作结果进行逻辑编排。

    CompletableFuture 提供了大约 20 类比较实用的异步 API,接口定义示例如下:

    图 11 CompletableFuture 异步 API 定义

    利用 JDK 的 CompletableFuture 与 Netty 的 NIO,可以非常方便的实现异步 RPC 调用,设计思路如下所示:

    图 12 异步 RPC 调用设计原理

    异步 RPC 调用的工作流程如下:

    1.消费者通过 RPC 框架调用服务端。

    2.Netty 异步发送 HTTP 请求消息,如果没有发生 I/O 异常就正常返回。

    3.HTTP 请求消息发送成功后,I/O 线程构造 CompletableFuture 对象,设置到 RPC 上下文中。

    4.用户线程通过 RPC 上下文获取 CompletableFuture 对象。

    5.不阻塞用户线程,立即返回 CompletableFuture 对象。

    6.通过 CompletableFuture 编写 Function 函数,在 Lambda 表达式中实现异步回调逻辑。

    7.服务端返回 HTTP 响应,Netty 负责反序列化工作。

    8.Netty I/O 线程通过调用 CompletableFuture 的 complete 方法将应答设置到 CompletableFuture 对象的操作结果中。

    9.CompletableFuture 通过 whenCompleteAsync 等接口异步执行业务回调逻辑,实现 RPC 调用的异步化。

    2.2.3 异步 RPC 调用的优势

    异步 RPC 调用相比于同步调用有两个优点:

    1.化串行为并行,提升 RPC 调用效率,减少业务线程阻塞时间。

    2.化同步为异步,避免业务线程阻塞。

    假如一次阅读首页访问需要调用多个服务接口,采用同步调用方式,它的调用流程如下所示:

    图 13 同步调用多个服务场景

    由于每次 RPC 调用都是同步阻塞,三次调用总耗时为 T = T1 + T2 + T3。下面看下采用异步 RPC 调用之后的优化效果:

    图 14 异步多服务调用场景

    采用异步 RPC 调用模式,最后调用三个异步操作结果 Future 的 get 方法同步等待应答,它的总执行时间 T = Max(T1, T2,T3),相比于同步 RPC 调用,性能提升效果非常明显。

    2.3

    总结

    2.3.1 异步 RPC 调用性能未必会更高

    通常在实验室环境中测试,由于网络时延小、模拟业务又通常比较简单,所以异步 RPC 调用并不一定性能更高,但在生产环境中,异步 RPC 调用往往性能更高、可靠性也更好。主要原因是网络环境相对恶劣、真实的 RPC 调用耗时更多等,这种恶劣的运行环境正好可以发挥异步 RPC 调用的优势。

    2.3.2 最佳实践

    服务框架支持多种 RPC 调用方式,在实际项目中如何选择呢?建议从以下几个角度进行考虑:

    1.降低业务 E2E 时延:业务调用链是否太长、某些服务是否不太可靠,需要对服务调用流程进行梳理,看是否可以通过异步并行 RPC 调用来提升调用效率,降低 RPC 调用时延。

    2.可靠性角度:某些业务调用链上的关键服务不太可靠,一旦出故障会导致大量线程资源被挂住,可以考虑使用异步 RPC 调用防止故障扩散。

    3.传统的 RPC 调用:服务调用比较简单,对时延要求不高的场景,则可以考虑同步 RPC 调用,降低编程复杂度,以及调试难度,提升开发效率。

    往期精彩回顾

    Mesher集成Istio实践

    基于CSE的微服务架构实践-Spring Cloud技术栈选型

    ServiceComb Alpha 集群动态主节点实现

    扫码加群

    更多精彩

    本文为 InfoQ 中文站特供稿件,首发地址为:https://www.infoq.cn/article/q3iPeYQv-uF5YsISq62c

    好看你就点点我

    戳“阅读原文”直达现场~

    展开全文
  • 同步调用与异步调用

    千次阅读 2018-09-11 11:02:33
    同步调用和异步调用是两种提交任务的方式 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一步,同步调用下任务是串行执行。 异步调用:提交完任务后,不会再原地等待任务执行...

    同步调用和异步调用是两种提交任务的方式

    同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一步,同步调用下任务是串行执行。

    异步调用:提交完任务后,不会再原地等待任务执行完毕,直接执行下一行代码,异步调用时并发执行。

    异步调用,几乎同时下达任务

    from concurrent.futures import ProcessPoolExecutor
    import os, time,random
    
    
    
    def task(x):
        print("%s is running" % os.getpid())
        time.sleep(random.randint(1,3))
        return x**2
    
    if __name__=="__main__":
        p = ProcessPoolExecutor()
        futures = []
        for i in range(10):
            future = p.submit(task,i)#返回计算结果
            futures.append(future)
        p.shutdown(wait=True)#默认waiti为True 等待十个进程任务执行完,关闭进程池的入口。
        for future in futures:
            print(future.result())
        print("主")
    结果为:
    
    10760 is running
    10564 is running
    12848 is running
    3928 is running
    10564 is running
    12848 is running
    10760 is running
    3928 is running
    10760 is running
    10564 is running
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    主

    如果把p.shutdown(wait=True)去掉,则会出现结果穿插在进程中

    2908 is running
    8092 is running
    10376 is running
    13136 is running
    8092 is running
    2908 is running
    0
    1
    8092 is running
    10376 is running
    4
    2908 is running
    10376 is running
    9
    16
    25
    36
    49
    64
    81
    主

     

    同步调用:

    def task(x):
        print("%s is running" % os.getpid())
        time.sleep(random.randint(1,3))
        return x**2
    
    if __name__=="__main__":
        p = ProcessPoolExecutor()
        for i in range(10):
            res = p.submit(task,i).result()#返回计算结果
            print(res)
        print("主")
    
    结果为:
    
    8360 is running
    0
    472 is running
    1
    4888 is running
    4
    12980 is running
    9
    8360 is running
    16
    472 is running
    25
    4888 is running
    36
    12980 is running
    49
    8360 is running
    64
    472 is running
    81
    主

    串行执行,效率低下。

    展开全文
  • 《Netty 进阶之路》、《分布式服务框架原理与实践》作者李林锋深入剖析通信层 RPC 调用异步化。李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可以持续关注。1. 异步的一些常见误区1.1.常见的...
  • 分布式 - RPC同步和异步说明

    千次阅读 2018-09-13 17:20:12
    要实时就用同步,要吞吐率就用异步同步调用 流程略 实现负载均衡:连接池中建立了与一个RPC-server集群的连接,连接池在返回连接的时候,需要具备负载均衡策略。 实现故障转移:连接池中建立了与一个RPC-...
  • RPC(Remote Procedure Call Protocol)远程过程调用协议实例,学习使用thrift无痛入门代码。具体使用方法,可以看博客详解。
  • 常见的理解误区在将近 10 年的平台中间件研发历程中,我们的平台业务经历了从 C++ 到 Java,从同步的 BIO 到非阻塞的 NIO,以及纯异步的事件驱动 I/O(AIO)。服务器也从 Web 容器逐步迁移到了内部更轻量、更高性能...
  • 精通RabbitMQ之RPC同步调用

    千次阅读 2018-12-23 12:33:31
    精通RabbitMQ之RPC同步调用 前面我们对应用解耦做过分析,我们能够使用消息中间件来完成应用解耦,很大一部分原因是因为我们的系统之间可以...RabbitMQ RPC同步调用实际上是使用了两个异步调用完成的,生产者投递...
  • 【手写dubbo-5】rpc调用异步同步

    千次阅读 2021-05-13 17:05:26
    一般情况下每个service的调用的过程都是同步的,例如在一个service中通过RestTemplate调用一个接口,这样也可以认为是一个远程调用,这种是同步进行的,整个调用的思路如下图。调用线程只需要等待调用结果,并且返回...
  • RPC(Remote Procedure Call Protocol) 远程过程调用 远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。也就是说两台服务器A、B,一个应用部署在A服务器上,想要调用B...
  • 远程调用服务,同步RPC01、远程过程调用(remote procedure call,RPC): RPC类似于调用一个本地对象的一个方法。 是同步操作,会阻塞调用代码的执行,直到被调用的过程执行完毕。这是与消息队列MQ最大的区别。 ...
  • 文章内容 异步 RPC 调用的应用场景 ...对于一些逻辑上不存在互相依赖关系的服务,可以通过异步 RPC 调用,实现服务的并行调用,通过并行调用来降低服务调用总耗时,以手游购买道具流程为例,消费次数限制鉴权、...
  • 主要给大家介绍了关于如何在Spring异步调用中传递上下文的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
  • Dubbo 同步异步调用的几种方式

    千次阅读 2019-03-15 00:04:20
    我们知道,Dubbo 缺省协议采用单一长连接,底层实现是 Netty 的 NIO ... 异步调用 参数回调 事件通知 同步调用 同步调用是一种阻塞式的调用方式,即 Consumer 端代码一直阻塞等待,直到 Provider 端返回为止; ...
  • 直观讲解--RPC调用和HTTP调用的区别

    万次阅读 多人点赞 2018-08-07 15:04:00
    很长时间以来都没有怎么好好搞清楚RPC(即Remote Procedure Call,远程过程调用HTTP调用的区别,不都是写一个服务然后在客户端调用么?这里请允许我迷之一笑~Naive!本文简单地介绍一下两种形式的C/S架构,先说...
  • 多线程异步调用接口数据同步问题

    千次阅读 2019-08-27 18:16:00
    在多线程进行接口调用时如果调用的接口执行时间不同会直接跳过慢的接口,导致最终数据出错。 PrintUtil类模拟被调用的2个接口方法 ThreadDemo类是多线程的实现类 Test类是调用方。 注释掉f1.join()时执行结果:...
  • 这一篇,我们为大家带来了开发过程中,最常接触到的同步异步调用解析。本文会介绍下同步异步的使用场景,以及 SOFARPC 中的代码实现机制,为了方便大家理解阅读代码。不会过多的设计代码实现细节,更多的还是希望...
  • RabbitMQ实现异步同步RPC

    千次阅读 2017-05-06 10:50:33
    一、同步RPC 客户端:package com.rabbitmq.synchronization;import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; import java....
  • 实际上rpc同步调用与异步调用核心问题只在于,线程是否阻塞等待结果返回,如果不等待那么就是异步的调用,而等待wait的话那么就是同步调用。实现的原理大致如下: client一个线程调用远程接口,生成一个唯一的ID...
  • 由于netty通信是异步的,客户端请求之后就不再阻塞等待服务端的结果返回了,客户端可以去做其他的事情,而服务端处理完之后会将响应结果...下面用代码实现一次完整的,rpc通过netty调用之后,找到对应请求的response...
  • 异步调用的理解

    千次阅读 2018-10-16 22:06:05
    首先讲下个人对异步和同步,阻塞非阻塞的概念的理解。 关于这个概念看了许多解释,都是似是而非,并不能完全get到点。个人认为从进程间通信的角度理解比较好,在《操作系统》中关于的部分是这样解释的: 进程间...
  • RPC服务 从三个角度来介绍RPC服务:分别是RPC架构,同步异步调用以及流行的RPC框架。 RPC架构 先说说RPC服务的基本架构吧。允许我可耻地盗一幅图哈~我们可以很清楚地看到,一个完整的RPC架构里面包含了四个核心的...
  • 0. dubbo同步调用、异步调用和是否返回结果配置 (1)dubbo默认为同步调用,并且有返回结果。 (2)dubbo异步调用配置,设置 async="true",异步调用可以提高效率。 (3)dubbo默认是有返回结果,不需要返回,...
  • GRPC 异步调用 C++

    千次阅读 2017-02-28 21:36:42
    原文 ... protobuf原生的异步调用 void DoneCallback(PingMessage *response) { } void async_test() { RpcClient client("127.0.0.1", 8000); PingService::Stub stub(client.Channel())
  • 《Netty 进阶之路》、《分布式服务框架原理与实践》作者李林锋深入剖析通信层 RPC 调用异步化。李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可以持续关注。1. 异步RPC调用的应用场景1.1 ...
  • SOFAScalable Open Financial Architecture是蚂蚁金服自主研发的...《剖析 | SOFARPC 框架》系列由 SOFA 团队源码爱好者们出品 前言这一篇,我们为大家带来了开发过程中,最常接触到的同步异步调用解析。本文会...
  • 其实就是RPC请求的整体耗时,如果采用同步调用, CPU 大部分的时间都在等待而没有去计算,从而导致 CPU 的利用率不够。这就好比工地里面搬砖,砌墙,捣水泥都由一个人干,其他人旁观, 那效率就十分低下。 RPC 请求...
  • c#自带remote,入门级demo,看看即可。。。个人看法:分布式中,用rpc真不如异步消息队列。。。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 54,897
精华内容 21,958
关键字:

rpc异步调用和同步调用api