精华内容
下载资源
问答
  • Openstack_通用技术_RPC 远程异步调用

    千次阅读 2016-09-25 01:02:52
    目录目录 RPC 一个通过 HTTP Request 调用操作函数的 RPC 实现样例 ...为不同的进程服务提供了 call()(同步) cast()(异步) 两种调用方式。问题 1: 在一个 Openstack 项目中拥有多个不同的进程服务,EG. API Servic

    目录

    RPC

    RPC: 同一个项目内的不同服务进程之间的交互方式。为不同的进程服务提供了 call()(同步) 和 cast()(异步) 两种调用方式。

    问题 1: 在一个 Openstack 项目中拥有多个不同的进程服务,EG. API Service/Manage Service。 当我们通过 Client 发送 API Request 去调用 Manage Service 执行一个操作任务时,我们会希望这个调用的结果是能够快速响应到 Client 的(保证用户体验)。

    问题 2: 而且进程服务之间的调用我们还需要考虑如何有效的避免进程服务之间调用的阻塞问题。EG. API Service 调用 Manage Service 时,如果不能及时的将 API Service 释放掉,那么 API Request 就会因为被占用,而无法处理新的请求。

    对于上面两个问题,我们可以通过将具体的执行过程响应过程分离来达到理想的效果。这也是 RPC 和 API 存在的原因之一。

    一个通过 HTTP Request 调用操作函数的 RPC 实现样例

    包含了下列两个过程实现:

    • 接收 HTTP Request
    • RPC 调用操作函数

    环境

    Devstack 用的 L 版,样例在自定义的 Openstack 项目中实现,详见自动化生成 Openstack 新项目开发框架

    接收 HTTP Request

    # project/api/v1/router.py
    
    from project.api.v1 import new_resource
    
    class APIRouter(octopunch.api.openstack.APIRouter):
        """Routes requests on the API to the appropriate controller and method."""
        ExtensionManager = extensions.ExtensionManager
    
        def _setup_routes(self, mapper, ext_mgr):
            self.resources['versions'] = versions.create_resource()
            mapper.connect("versions", "/",
                           controller=self.resources['versions'],
                           action='show')
    
            mapper.redirect("", "/")
    
            self.resources['new_resource'] = new_resource.create_resource(ext_mgr)
            mapper.resource('new_resource', 'new_resource',
                            controller=self.resources['new_resource'])
    • 将 Resource 和 HTTP 方法绑定到 Controller 的 Action 函数中
    # project/api/v1/new_resource.py
    
    class NewResourceController(wsgi.Controller):
    
        def __init__(self, ext_mgr):
            self.ext_mgr = ext_mgr
            super(NewResourceController, self).__init__()
    
        # Create() 对应了 HTTP 的 POST 方法
        @wsgi.serializers()
        def create(self, req, body):
            """Create a NewResource."""
            context = req.environ['project.context']
    
            # Sync the project database.
            self.new_resource_api.db_sync(context)
    
    def create_resource(ext_mgr):
        """project resource factory method."""
        return wsgi.Resource(NewResourceController(ext_mgr))

    上述两个文件实现了通过 HTTP Request 的 POST 方法来执行指定的 Create() 函数。

    RPC 调用具体的操作函数

    • 通过 API 调用 RPC-API
    # project/new_resource/api.py
    
    # import rpcapi module
    from project.new_resource import rpcapi as new_resource_rpcapi
    
    class API(base.Base):
        """API for interacting with the new_resource manager."""
    
        def __init__(self, db_driver=None, image_service=None):
            self.new_resource_rpcapi = new_resource_rpcapi.NewResourceAPI()
            super(API, self).__init__(db_driver)
    
        # 定义调用 rpcapi 的接口函数
        def db_sync(self, context):
            """Call rpc api to start db sync for new_resource database."""
            self.new_resource_rpcapi.db_sync(context)

    API 的存在是为了能够快速的响应请求,至于之后的执行过程交由 RPC-API 和 Manager 来处理

    • rpc-api.py 调用 manager.py
      rpcapi.py 定义的 RPC 接口函数会自动的映射到 manager.py 中指定的处理函数。
    # project/new_resource/rpcapi.py
    
    class NewResourceAPI(object):
    
        def __init__(self):
            super(NewResourceAPI, self).__init__()
            target = messaging.Target(topic=CONF.manage_topic,
                                      version=self.RPC_API_VERSION)
            serializer = objects_base.ProjectObjectSerializer()
            self.client = rpc.get_client(target, version_cap='1.8',
                                         serializer=serializer)
    
        # 定义 rpcapi 函数,使用 cast 异步调用方式
        def db_sync(self, context):
            cctxt = self.client.prepare()
            # 指定 rpcapi 的调用方式,和指定映射到 manager.py 的处理函数
            cctxt.cast(context, 'db_sync')

    RPC-API 的存在是为了快速的响应进程服务之间的调用请求。

    # project/new_resource/manager.py
    class NewResourceManager(manager.Manager):
    
        RPC_API_VERSION = '1.8'
    
        target = messaging.Target(version=RPC_API_VERSION)
    
        def __init__(self, service_name=None, *args, **kwargs):
            super(NewResourceManager, self).__init__(*args, **kwargs)
            self._startup_delay = True
    
        def init_host_with_rpc(self):
            eventlet.sleep(CONF.periodic_interval)
            self._startup_delay = False
    
        def db_sync(self, context):
            print "这里是具体的 RPC 操作函数"

    小结
    Openstack 的 PRC 调用的过程为: api.pyrpcapi.pymanager.py
    详见:Openstack Nova 源码分析 — RPC 远程调用过程

    测试

    • 启动 API 服务
    project-api --config-file /etc/project/proname.conf
    • 启动 Manager 服务
    project-manager --config-file /etc/project/project.conf
    • 发送 HTTP 请求
    curl -i 'http://<Service_host_ip>:<service_port>/v1/<project_id>/<ResourceUrl>' -X POST -H "Content-Type: application/json" -H "X-Auth-Project-Id: admin" -H "X-Auth-Token: <token_id>" -d '<body_content_dict>'

    注意: 样例需要根据自身开发环境进行调整,

    展开全文
  • 这里的拦截器同时支持异步和同步两种模式,对于一些比较快的操作,可以直接使用同步拦截器。 <p><img alt="image" src=...
  • NettyRPC, 在联网的,但另一个RPC框架...特性简单,小代码库,易于学习 API非常快速,高性能完全非阻塞异步调用同步调用,单向调用。长期持久连接,自动重新连接到服务器高可用性,负载平衡故障转移多线程服务器与
  • 1.背景MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端...

    617592274544fe8da113338914b34df0.png

    1.背景

    MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。

    2.名词解释

    RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能。

    3.RRPC原理

    5e954787fa85524d1a787fe951aea7f2.png

    1. 物联网平台收到来自用户服务器的RRPC调用,下发一条RRPC请求消息给设备。消息体为用户传入的数据,Topic为物联网平台定义的Topic,其中含有唯一的RRPC消息ID。

    2. 设备收到下行消息后,按照指定Topic格式(包含之前云端下发的唯一的RRPC消息ID)回复一条RRPC响应消息给云端,云端提取出Topic中的消息ID,和之前的RRPC请求消息匹配上,然后回复给用户服务器。

    3. 如果调用时设备不在线,云端会给用户服务器返回设备离线的错误;如果设备没有在超时时间内(5秒内)回复RRPC响应消息,云端会给用户服务器返回超时错误。

    3.1 设备端标识 ext=1

    为了配合RRPC调用,设备端必须在进行MQTT CONNECT协议设置时,在clientId中增加ext=1参数:

    2a61d8259d31c2c21402ef9b5fcf318d.png

    3.2 RRPC通信Topic

    RRPC调用的Topic格式如下:

    5c9bc88f78763fa36bba85202ff2b987.png

    其中${messageId}是IoT物联网平台生成的唯一的RRPC消息id,黑体部分是IoT物联网平台约定,红色部分可以根据业务场景自定义。

    3.3 云端POP API调用

    5b18a2e1e0f3adc4d8e5e7d2369e59d3.png

    4.开发实战

    4.1 设备端代码(Nodejs)

    /**
    "dependencies": { "mqtt": "2.18.8" }
    */

    const crypto = require('crypto');
    const mqtt = require('mqtt');
    //设备身份三元组+区域 

    const deviceConfig = {
        productKey"替换productKey",
        deviceName"替换deviceName",
        deviceSecret"替换deviceSecret",
        regionId"cn-shanghai"
    };

    const url = `tcp://${deviceConfig.productKey}.iot-as-mqtt.${deviceConfig.regionId}.aliyuncs.com:1883`;
    //2.建立连接
    const client = mqtt.connect(url, makeMqttOptions(deviceConfig));
    //3.监听RRPC指令
    client.subscribe(`/ext/rrpc/+/this/is/my/topic`)
    client.on('message'function(topic, message{
        console.log("topic=" + topic)
        console.log("payloadJson=" + message)
        //4.响应RRPC指令
        if (topic.indexOf(`/ext/rrpc/`) > -1) {
            client.publish(topic, JSON.stringify({ code200msg"rrpc ok" }));
        }

    })
    /*
      生成MQTT连接参数
    */

    function makeMqttOptions() {

        const params = {
            productKey: deviceConfig.productKey,
            deviceName: deviceConfig.deviceName,
            timestampDate.now(),
            clientIdMath.random().toString(36).substr(2),
        }
        //CONNECT参数
        const options = {
            keepalive60//60s
            clean: false//cleanSession保持持久会话
            protocolVersion: 4 //MQTT v3.1.1
        }
        //1.生成clientId,username,password
        options.password = signHmacSha1(params, deviceConfig.deviceSecret);
        options.clientId = `${params.clientId}|securemode=3,signmethod=hmacsha1,timestamp=${params.timestamp},ext=1|`;
        options.username = `${params.deviceName}&${params.productKey}`;

        return options;
    }

    /*
      生成基于HmacSha1的password
      参考文档:https://help.aliyun.com/document_detail/73742.html?#h2-url-1
    */

    function signHmacSha1(params, deviceSecret{

        let keys = Object.keys(params).sort();
        // 按字典序排序
        keys = keys.sort();
        const list = [];
        keys.map((key) => {
            list.push(`${key}${params[key]}`);
        });
        const contentStr = list.join('');
        return crypto.createHmac('sha1', deviceSecret).update(contentStr).digest('hex');
    }

    4.2 服务端POP API调用代码(Nodejs)

    const co = require('co');
    const RPCClient = require('@alicloud/pop-core').RPCClient;

    const options = {
        accessKey:"替换accessKey",
        accessKeySecret"替换accessKeySecret"
    };

    //1.初始化client
    const client = new RPCClient({
        accessKeyId: options.accessKey,
        secretAccessKey: options.accessKeySecret,
        endpoint'https://iot.cn-shanghai.aliyuncs.com',
        apiVersion'2018-01-20'
    });
    //2.构造RRPC参数
    const params = {
        ProductKey"你的产品ProductKey",
        DeviceName"你的设备DeviceName",
        RequestBase64Byte: Buffer.from("Hello World").toString('base64'),
        Timeout5000,
        Topic:'/this/is/my/topic'
    };

    co(function*() {
        try {
            //3.发起API调用
            const response = yield client.request('RRpc', params);

            console.log(JSON.stringify(response));
        } catch (err) {
            console.log(err);
        }
    });

    4.3 实际运行效果

    # 设备端日志
    $ node rrpc_client.js 
    topic=/ext/rrpc/1128893568908287488/this/is/my/topic
    payloadJson=Hello World

    # 服务端日志
    $ node rrpc_Server.js 
    {
        "MessageId""1128893568908287488",
        "RequestId""F9540921-8FDB-4671-B50A-84D119DA56D4",
        "PayloadBase64Byte""eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==",
        "Success": true,
        "RrpcCode""SUCCESS"
    }

    其中 eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ== 
    解码后即:{ code: 200, msg: "rrpc ok" }

    4.4 IoT物联网平台 日志服务

    94e6d01df91841480d96f0ae2a961dba.png

    c7501675c655203dc81bd846e8c46d06.png

    展开全文
  • 原文链接:https://my.oschina.net/xianggao/blog/661085#0 系列目录#聊聊远程通信Java远程通讯技术及原理分析聊聊Socket、TCP/IP、HTTP、FTP及网络编程RMI原理及实现RPC原理...API对比与分析聊聊同步异步、阻塞与...

    原文链接:https://my.oschina.net/xianggao/blog/661085


    #0 系列目录#

    近来遇到了一些常见的概念,尤其是网络编程方面的概念,如:阻塞、非阻塞、异步I/O等等,对于这些概念自己也没有太清晰的认识,只是很模糊的概念,说了解吧也了解,但是要让自己准确的描述概念方面的具体细节,却说的不那么准确,这也是自己在这几个方面也没有细细考究过的原因吧。经过看了些这几个概念的资料,发现同步、异步、阻塞、非阻塞的概念其实也并不难以理解,在此写下此文,欢迎拍砖,希望多多交流。

    #1 同步与异步# 首先来解释同步和异步的概念,这两个概念与消息的通知机制有关。也就是同步与异步主要是从消息通知机制角度来说的。 ##1.1 概念描述## 所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

    所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

    ##1.2 消息通知## 异步的概念和同步相对。当一个同步调用发出后,调用者要一直等待返回消息(结果)通知后,才能进行后续的执行;当一个异步过程调用发出后,调用者不能立刻得到返回消息(结果)。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者

    这里提到执行部件和调用者通过三种途径返回结果:状态、通知和回调。使用哪一种通知机制,依赖于执行部件的实现,除非执行部件提供多种选择,否则不受调用者控制

    1. 如果执行部件用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一种很严重的错误);

    2. 如果是使用通知的方式,效率则很高,因为执行部件几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。

    ##1.2 场景比喻## 举个例子,比如我去银行办理业务,可能会有两种方式:

    1. 选择排队等候;

    2. 另种选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

    第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;

    第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。

    #2 阻塞与非阻塞# 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。 ##2.1 概念描述## 阻塞调用是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务。函数只有在得到结果之后才会返回。

    有人也许会把阻塞调用和同步调用等同起来,实际上它们是不同的。

    1. 对于同步调用来说,很多时候当前线程可能还是激活的,只是从逻辑上当前函数没有返回而已,此时,这个线程可能也会处理其他的消息。还有一点,在这里先扩展下:

    (a) 如果这个线程在等待当前函数返回时,仍在执行其他消息处理,那这种情况就叫做同步非阻塞;

    (b) 如果这个线程在等待当前函数返回时,没有执行其他消息处理,而是处于挂起等待状态,那这种情况就叫做同步阻塞;

    所以同步的实现方式会有两种:同步阻塞、同步非阻塞;同理,异步也会有两种实现:异步阻塞、异步非阻塞;

    1. 对于阻塞调用来说,则当前线程就会被挂起等待当前函数返回;

    非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。虽然表面上看非阻塞的方式可以明显的提高CPU的利用率,但是也带了另外一种后果就是系统的线程切换增加增加的CPU执行时间能不能补偿系统的切换成本需要好好评估

    ##2.2 场景比喻## 继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。

    相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。

    但是需要注意了,同步非阻塞形式实际上是效率低下的,想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

    #3 同步/异步与阻塞/非阻塞#

    1. 同步阻塞形式

    效率是最低的,

    拿上面的例子来说,就是你专心排队,什么别的事都不做。

    **实际程序中:**就是未对fd 设置O_NONBLOCK标志位的read/write 操作;

    1. 异步阻塞形式

    如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;

    异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

    比如select 函数,假如传入的最后一个timeout参数为NULL,那么如果所关注的事件没有一个被触发,程序就会一直阻塞在这个select 调用处

    1. 同步非阻塞形式

    实际上是效率低下的,

    想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

    很多人会写阻塞的read/write 操作,但是别忘了可以对fd设置O_NONBLOCK 标志位,这样就可以将同步操作变成非阻塞的了

    1. 异步非阻塞形式

    效率更高,

    因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换

    比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下(注册一个回调函数),那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

    如果使用异步非阻塞的情况,比如aio_*组的操作,当发起一个aio_read操作时,函数会马上返回不会被阻塞,当所关注的事件被触发时会调用之前注册的回调函数进行处理

    很多人会把同步和阻塞混淆,我想是因为很多时候同步操作会以阻塞的形式表现出来,比如很多人会写阻塞的read/write操作,但是别忘了可以对fd设置O_NONBLOCK标志位,这样就可以将同步操作变成非阻塞的了。但最根本是因为没有区分这两个概念,比如阻塞的read/write操作中,其实是把消息通知机制和等待消息通知的状态结合在了一起,在这里所关注的消息就是fd是否可读/写,而等待消息通知的状态则是对fd可读/写等待过程中程序(线程)的状态。当我们将这个fd设置为非阻塞的时候,read/write操作就不会在等待消息通知这里阻塞,如果fd不可读/写则操作立即返回。

    同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞,比如如果用select函数,当select返回可读时再去read一般都不会被阻塞,而是在select函数调用处阻塞

    #4 小明的故事# 对上面所讲的概念再次进行一个场景梳理,上面已经明确说明,同步/异步关注的是消息通知的机制,而阻塞/非阻塞关注的是程序(线程)等待消息通知时的状态。以小明下载文件打个比方,从这两个关注点来再次说明这两组概念,希望能够更好的促进大家的理解。

    1. 同步阻塞:小明一直盯着下载进度条,到 100% 的时候就完成。

    同步体现在:等待下载完成通知;

    阻塞体现在:等待下载完成通知过程中,不能做其他任务处理;

    1. 同步非阻塞:小明提交下载任务后就去干别的,每过一段时间就去瞄一眼进度条,看到 100% 就完成。

    同步体现在:等待下载完成通知;

    非阻塞体现在:等待下载完成通知过程中,去干别的任务了,只是时不时会瞄一眼进度条;【小明必须要在两个任务间切换,关注下载进度】

    1. 异步阻塞:小明换了个有下载完成通知功能的软件,下载完成就“叮”一声。不过小明仍然一直等待“叮”的声音(看起来很傻,不是吗)。

    异步体现在:下载完成“叮”一声通知;

    阻塞体现在:等待下载完成“叮”一声通知过程中,不能做其他任务处理;

    1. 异步非阻塞:仍然是那个会“叮”一声的下载软件,小明提交下载任务后就去干别的,听到“叮”的一声就知道完成了。

    异步体现在:下载完成“叮”一声通知;

    非阻塞体现在:等待下载完成“叮”一声通知过程中,去干别的任务了,只需要接收“叮”声通知即可;【软件处理下载任务,小明处理其他任务,不需关注进度,只需接收软件“叮”声通知,即可】

    也就是说,同步/异步是“下载完成消息”通知的方式(机制),而阻塞/非阻塞则是在等待“下载完成消息”通知过程中的状态(能不能干其他任务),在不同的场景下,同步/异步、阻塞/非阻塞的四种组合都有应用。

    所以,综上所述,同步和异步仅仅是关注的消息如何通知的机制,而阻塞与非阻塞关注的是等待消息通知时的状态。也就是说,同步的情况下,是由处理消息者自己去等待消息是否被触发,而异步的情况下是由触发机制来通知处理消息者,所以在异步机制中,处理消息者和触发机制之间就需要一个连接的桥梁

    在银行的例子中,这个桥梁就是小纸条上面的号码。

    在小明的例子中,这个桥梁就是软件“叮”的声音。

    最后,请大家注意理解“消息通知机制”和“等待消息通知时的状态”这两个概念,这是理解四个概念的关键所在。


    展开全文
  • Nova中的RPC

    2017-04-17 16:12:40
    远程过程调用RPC,Remote Procedure Call)通过远程过程调用,一个服务可以调用其他远程服务进程的方法,并且有两种调用方式:callcast。通过call调用,远程方法会被同步执行,调用者会被阻塞知道结果返回;通过...

    Openstack遵循这样的设计原则:项目之间用RESTful API通信;项目内部不同服务之间必须经过消息总线。
    #远程过程调用(RPC,Remote Procedure Call)
    通过远程过程调用,一个服务可以调用其他远程服务进程的方法,并且有两种调用方式:call和cast。通过call调用,远程方法会被同步执行,调用者会被阻塞知道结果返回;通过cast的方式调用,远程方法会被异步执行,结果并不会立即返回,调用者也不会被阻塞,但是调用者需要利用其他方式查询这次远程调用的结果。
    #怎样理解阻塞非阻塞与同步异步的区别(https://www.zhihu.com/question/19732473)

    同步与异步

    同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,*被调用者 *通过状态、通知来通知调用者,或通过回调函数处理这个调用。典型的异步编程模型比如Node.js举个通俗的例子:你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。

    阻塞与非阻塞

    阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。还是上面的例子,你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。

    引用说明:
    作者:严肃
    链接:https://www.zhihu.com/question/19732473/answer/20851256
    来源:知乎
    #消息队列
    Openstack所支持的消息总线类型中大部分都是基于AMQP(高级消息队列协议)
    ##消息队列相关架构
    ###AMQP架构
    AMQP是一个异步消息传递所使用的开放的应用层协议,主要包括消息的导向、队列、路由、可靠性和安全性。通过定义消息在网络上传输时的字节流格式,不同的具体AMQP实现之间可以进行互操作,AMQP的架构如图1-1

    AMQP架构
    图1-1

    AMQP可以看成三个组件,发布者、中间件、订阅者。

    • 发布者:消息发布方
    • 中间件:包括消息的路由、交换(Exchange实现),消息的存储(queue中存储)
    • 订阅者:消息接收方
      简单来说,发布者把消息发送给exchange,Exchange将详细存储到queue中,订阅者从queue中获取消息。下面细分一下。

    AMQP模型的四个重要角色:

    • Exchange:根据 Routing key 转发消息到对应的 Message Queue 中
      • direct: binding key 和 routing key 必须完全一致,不支持通配符
      • topic: 同direct类型,但支持通配符,“*” 匹配一个单子,“#” 匹配一个零个或者多个单字,单字之间由"."分隔
      • fanout: 忽略binding key和routing key,消息会被传递到所有绑定的队列上
    • Routing key:用于 Exchange 判断哪些消息需要发送对应的 Message Queue
    • Publisher:消息发送者,将消息发送的 Exchange 并指明 Routing Key,以便 Message Queue 可以正确的收到消息
    • Consumer:消息接受者,从 Message Queue 获取消息

    消息发布者 Publisher 将 Message 发送给 Exchange 并且说明 Routing Key。Exchange 负责根据 Message 的 Routing Key 进行路由,将 Message 正确地转发给相应的 Message Queue。监听在 Message Queue 上的 Consumer 将会从 Queue 中读取消息。
    Routing Key 是 Exchange 转发信息的依据,因此每个消息都有一个 Routing Key 表明可以接受消息的目的地址,而每个 Message Queue 都可以通过将自己想要接收的 Routing Key 告诉 Exchange 进行 binding,这样 Exchange 就可以将消息正确地转发给相应的 Message Queue。

    基于AMQP实现RPC如图1-2:

    在这里插入图片描述
    图1-2

    • Exchange:消息交换,客户端发送一个请求消息给Exchange,指定routing key为“op_queue”,同时指明一个消息队列名来获取响应,图中为“res_queue”。
    • Exchange把此消息转发到消息队列op_queue。
    • 消息队列op_queue把消息退给服务端,服务端执行此RPC调用所对应的任务。执行结束后,服务端把响应结果发送给消息队列,指明routing key为“res_queue”。
    • Exchange把此消息转发到消息队列res_queue。
    • 客户端从消息队列res_queue中获取相应。

    RPC.cast请求

    如图2-1(参考https://docs.openstack.org/developer/nova/rpc.html ):

    • 请求发起者的消息被实例化之后发往消息队列系统
    • 一旦消息被发送Exchange,Exchange会根据routing key发送给指定的消费者读取,并传递给请求处理者

    这里写图片描述
    图2-1

    源码分析

    Nova RPC

    Nova中各服务间的通信是基于AMQP实现的RPC机制,其中nova-compute、nova-conductor和nova-scheduler在启动时都会注册一个RPC Server,而nova-api因为Nova内部没有服务去调用他提供的接口,故无需注册。
    以nova-compute为例:

    # nova/compute/rpcapi.py
    class ComputeAPI(object):
      def __init__(self):
        super(ComputeAPI, self).__init__()
        target = messaging.Target(topic=CONF.compute_topic, version='4.0')
        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
                                               CONF.upgrade_levels.compute)
        serializer = objects_base.NovaObjectSerializer()
        self.client = self.get_client(target, version_cap, serializer)
        #self.client会在下面live_migration方法中用到
    
        def get_client(self, target, version_cap, serializer):
        #__init__方法中有调用get_client方法,这个方法是获取目标机的RPC client信息
        return rpc.get_client(target,
                              version_cap=version_cap,
                              serializer=serializer)
    
      def live_migration(self, ctxt, instance, dest, block_migration, host,
                       migration, migrate_data=None):
        #nova-compute对nova其他服务提供的远程可调用RPC接口,这儿以live_migration方法实现。
        args = {'migration': migration}
        version = '4.2'
        if not self.client.can_send_version(version):
            version = '4.0'
            args.pop('migration')
        cctxt = self.client.prepare(server=host, version=version)
        #调用__init__方法中的self.client获取目标机RPC Client信息,prepare为python元类属性
        cctxt.cast(ctxt, 'live_migration', instance=instance,
                   dest=dest, block_migration=block_migration,
                   migrate_data=migrate_data, **args)
        #RPC cast主要用于异步形式,例如创建虚拟机,在创建过程中可能需要较长时间,
        #如果使用RPC call显然对性能有很大影响。cast()的第二个参数是RPC调用的函数名,
        #后面的参数将作为参数传入该函数。
        #cast 函数未在nova/compute/rpcapi.py中以及rpcapi.py所import的包中找到,
        #但只在nova/compute/cells_api.py中有找到,此处不明晰,特希大牛指点。
    

    ComputeAPI中的函数就是Compute服务提供给RPC调用的端口,其他服务调用前需首先import这个模块(在 Nova Project 中大多数的服务都提供了 API 或 RPC API 的实现文件,这些 API 是服务进程自身为了能被其他的服务进程访问所提供出来的一种接口,当别的服务进程希望影响另一个服务进程时,就可以通过 import 另一个服务进程的 rpcapi 就可以实现了。)

    其他服务对Nova RPC的调用(以nova-conductor通过RPC通知nova-compute创建虚拟机为例)

    流程如图2-2
    在这里插入图片描述
    图2-2

    • nova-compute提供给Nova其他组件调用的rpcapi接口。nova-conductor通过import nova-compute的compute-rpcapi或者通过实例传参来compute_rpc对象。这样 nova-conductor 就拥有了通过 RPC 访问 nova-compue 的能力。
    • 在 nova-conductor 的代码实现中调用了 rpcapi 模块的方法,即 nova-conductor发送了一个请求到 Queue,并等待 nova-compute 接受和响应。
    • nova-compute 接收到 nova-conductor 的请求,调用nova-compute的manage模块来处理这个请求。
    #nova/nova-conductor/tasks/live_migrate.py
    class LiveMigrationTask(base.TaskBase):
        def __init__(self, context, instance, destination,
                     block_migration, disk_over_commit, migration, compute_rpcapi,
                     servicegroup_api, scheduler_client):
            super(LiveMigrationTask, self).__init__(context, instance)
            self.destination = destination
            self.block_migration = block_migration
            self.disk_over_commit = disk_over_commit
            self.migration = migration
            self.source = instance.host
            self.migrate_data = None
    
            self.compute_rpcapi = compute_rpcapi
            self.servicegroup_api = servicegroup_api
            self.scheduler_client = scheduler_client
    
        def _execute(self):
            self._check_instance_is_active()
            self._check_host_is_up(self.source)
    
            if not self.destination:
                self.destination = self._find_destination()
                self.migration.dest_compute = self.destination
                self.migration.save()
            else:
                self._check_requested_destination()
    
            # TODO(johngarbutt) need to move complexity out of compute manager
            # TODO(johngarbutt) disk_over_commit?
            #调用 ComputeAPI 类中的 live_migration() RPC接口,以RPC的方式发出一个请求到Queue再被nova-compute接收
            return self.compute_rpcapi.live_migration(self.context,
                    host=self.source,
                    instance=self.instance,
                    dest=self.destination,
                    block_migration=self.block_migration,
                    migration=self.migration,
                    migrate_data=self.migrate_data)
    

    ComputeAPI类只是暴露给其他服务的RPC调用接口,Compute的RPC Server接受到RPC请求后,真正完成任务的是nova.compute.manager模块。

    nova.compute.manager

    nova.compute.manager 会一直在监听 Queue ,当Queue中存在相关的 RPC 请求时,实际上是由 manager 来实现的。

    #nova/compute/manager.py
    class ComputeManager(manager.Manager):
      def live_migration(self, context, dest, instance, block_migration,
                       migration, migrate_data):
        """Executing live migration.
    
        :param context: security context
        :param dest: destination host
        :param instance: a nova.objects.instance.Instance object
        :param block_migration: if true, prepare for block migration
        :param migration: an nova.objects.Migration object
        :param migrate_data: implementation specific params
    
        """
    
        # NOTE(danms): Remove these guards in v5.0 of the RPC API
        if migration:
            migration.status = 'queued'
            migration.save()
    
        def dispatch_live_migration(*args, **kwargs):
            with self._live_migration_semaphore:
                self._do_live_migration(*args, **kwargs)
    
        # NOTE(danms): We spawn here to return the RPC worker thread back to
        # the pool. Since what follows could take a really long time, we don't
        # want to tie up RPC workers.
        utils.spawn_n(dispatch_live_migration,
                      context, dest, instance,
                      block_migration, migration,
                      migrate_data)  
    

    从ComputeAPI到ComputeManage的过程即是RPC调用过程。

    整个过程 wsgi接受到请求去找resource resource会具体实现动作
    resource-> compute.api->conductor.api->conductor.rpcapi (nova-api开始rpc调用nova-Conductor)
    Conductor.manager接到调用开始处理 Conductor针对live-migration有个单独的tasks
    Conductor rpc调用nova -compute
    nova-compute manager接到调用开始处理 将请求传给后端driver也就是libvirt migration进行迁移操作

    总结:

    rpc调用中,
    首先要有rpcapi
    其次调用时不能直接调用rpcapi,api会对rpcapi进行封装,暴露给外面的其实是api

    调用端: 调用的api–>要调用的api-要调用的rpcapi
    被调用端: 对应的manager接收调用请求,并对调用行为进行实际动作

    参考:
    http://blog.csdn.net/jmilk/article/details/52116458
    《Openstack设计与实现》
    http://blog.csdn.net/li_101357/article/details/52841107

    展开全文
  • openstack中的消息队列 1 openstack中消息队列的使用 nova中的每个组件都会连接消息服务器,一个组件...发送消息有两种方式:同步调用rpc.call和异步调用rpc.cast openstack内部一些对象: l Topic Publisher:...
  • 微服务简介

    2020-10-10 20:25:36
    微服务的定义: 将一个单体应用拆分为一组微小的服务组件,每个微小的服务组件运行在...IPC有两种实现方式:同步过程调用和异步消息调用。 同步过程调用的具体实现中,有一种实现方式为RPC(远程过程调用)的通信方式
  • 微服务架构特点

    千次阅读 2017-09-20 13:19:25
    1、将“巨石”服务,解耦成简单独立服务 2、独立进程 3、服务间通信简单 ...服务间调用:使用同步请求或者异步请求,同步请求包括restful、RPC(remote procedure call)。异步请求包括kafka、notifymetaQ 服务
  • 支付业务分析 1. 用户请求支付系统 2. 支付系统调用第三方支付平台API进行发起...Zookeeper(管理和同步分布式应用中的文件) RocketMQ(异步解耦,分布式事务的数据一致性,消息的顺序收发) Redis Mysql 5. 支付业务 5.1
  • openstack各项目之间通过restful api进行通信,相同项目内不同组件进程组件之间通过消息总线进行...1、RPC(remote procedure call)主要包含callcast两种(call:同步调用;cast:异步接口)。 2、事件通知(e...
  • 原文链接:https://my.oschina.net/xianggao/blog/654677#0 系列目录#聊聊远程通信Java远程通讯技术及原理分析聊聊Socket、TCP/IP、HTTP、FTP及网络编程RMI原理及实现RPC原理...API对比与分析聊聊同步异步、阻塞与...
  • 聊聊Linux 五种IO模型

    2016-06-30 11:03:35
    0 系列目录 ...RPC 框架使用 RMI + ZooKeeper 实现远程调用框架深入浅出SOA思想微服务、SOA API对比与分析聊聊同步异步、阻塞与非阻塞聊聊Linux 五种IO模型聊聊IO多路复用之select、poll、epol
  • java开源包1

    千次下载 热门讨论 2013-06-28 09:14:34
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包12

    热门讨论 2013-06-28 10:14:45
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • Java资源包01

    2016-08-31 09:16:25
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包101

    2016-07-13 10:11:08
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包11

    热门讨论 2013-06-28 10:10:38
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包6

    热门讨论 2013-06-28 09:48:32
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包10

    热门讨论 2013-06-28 10:06:40
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包8

    热门讨论 2013-06-28 09:55:26
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包9

    热门讨论 2013-06-28 09:58:55
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • java开源包7

    热门讨论 2013-06-28 09:52:16
    4、同步调用; 5、超时机制; 6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4...
  • COM 与 COM+ 从入门到精通 书籍

    热门讨论 2008-09-09 16:06:30
    RPC(RemoteProcedureCall,远程过程调用) 调动 数据传递 DCOM配置实用程序 DCOM应用程序的安全机制 验证 授权 加密 整性检查 小结 第9章 用VC++建立DCOM服务器 标准与自定义调动 标准调动 自定...
  • asciimatics:跨平台,全屏终端包(即鼠标/键盘输入彩色,定位文本输出),完整的复杂动画特殊效果的高级 API。 alive-progress:一款新的进度条,具有实时吞吐量信息以及非常酷的动画。 colorama:跨平台彩色...
  • 在有状态SessionBean中,用累加器,以对话状态存储起来,创建EJB对象,并将当前的计数器初始化,调用每一个EJB对象的count()方法,保证Bean正常被激活钝化,EJB对象是用完毕,从内存中清除…… Java Socket 聊天...
  • JAVA上百实例源码以及开源项目

    千次下载 热门讨论 2016-01-03 17:37:40
    6个目标文件,EJB来模拟银行ATM机的流程及操作:获取系统属性,初始化JNDI,取得Home对象的引用,创建EJB对象,并将当前的计数器初始化,调用每一个EJB对象的count()方法,保证Bean正常被激活钝化,EJB对象是用...

空空如也

空空如也

1 2 3
收藏数 46
精华内容 18
关键字:

rpc异步调用和同步调用api