精华内容
下载资源
问答
  • subscribe(订阅) pipe方法用于链接可观察的运算符,而subscribe方法用于激活可观察的并监听发射的值. ** subscribe完整的函数**如下: 下面完整的包含3个函数的对象被称为observer(观察者),表示的是对序列结果的...

    angular官方文档RxJS 库
    RxJS库官方文档

    RxJS库

    RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

    响应式编程是一种面向数据流和变更传播的异步编程范式(Wikipedia)。RxJS(响应式扩展的 JavaScript 版)是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单。

    RxJS 提供了一种对 Observable 类型的实现,直到 Observable 成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:

    • 把现有的异步代码转换成可观察对象
    • 迭代流中的各个值
    • 把这些值映射成其它类型
    • 对流进行过滤
    • 组合多个流

    2.安装

    npm install rxjs
    

    3. 方法

    方法作用
    of(…args)可以将普通JavaScript数据转为可观察序列
    fromPromise(promise)将Promise转化为Observable
    fromEvent(elment, eventName)从DOM事件创建序列,例如Observable.fromEvent($input, ‘click’)
    ajax(urlAjaxRequest)
    create(subscribe)这个属于万能的创建方法,一般用于只提供了回调函数的某些功能或者库,在你用这个方法之前先想想能不能用RxJS上的类方法来创建你所需要的序列
    import {of,fromPromise,fromEvent,Observable} from 'rxjs'
    import { ajax } from 'rxjs/ajax';
    const nums = of(1, 2, 3); //将普通JavaScript数据转为可观察序列
    
    //实际上Observable可以认为是加强版的Promise,它们之间是可以通过RxJS的API互相转换的
    const ob = Observable.fromPromise(somePromise); // Promise转为Observable
    const promise = someObservable.toPromise();        // Observable转为Promise
    
    //从DOM事件创建序列
    const el = document.getElementById('my-element')!;
    const mouseMoves = fromEvent<MouseEvent>(el, 'mousemove');
    
    //发送http请求
    const apiData = ajax('/api/data');
    
    //
    Observable.create((observer) => {})
    
    类别操作
    创建from, fromPromise,fromEvent, of,ajax
    组合combineLatest, concat, merge, startWith , withLatestFrom, zip
    过滤debounceTime, distinctUntilChanged, filter, take, takeUntil
    转换bufferTime, concatMap, map, mergeMap, scan, switchMap
    工具tap
    多播share

    4. 创建可观察对象的函数

    RxJS 提供了一些用来创建可观察对象的函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如事件、定时器、承诺等等。

    import {of } from 'rxjs'
    
    
    //创建一个Observable观察序列 
    //ob作为源会每隔1000ms发射一个递增的数据,即0 -> 1 -> 2
    const ob= interval(1000);
    
    
    //take(3)表示只取源发射的前3个数据,取完第3个后关闭源的发射
    //map表示将流中的数据进行映射处理,这里我们将数据翻倍
    //filter表示过滤掉出符合条件的数据,这里只有第三个数据会留下来
    //前面已经使用同步编程创建好了一个流的处理过程,但此时ob作为源并不会立刻发射数据,如果我们在map中打印n是不会得到任何输出的,因为ob作为Observable序列必须被“订阅”才能够触发上述过程,也就是subscribe(发布/订阅模式)。
    ob.take(3).map(n => n * 2).filter(n => n > 0).subscribe(n => console.log(n));
    

    4. subscribe(订阅)

    pipe方法用于链接可观察的运算符,而subscribe方法用于激活可观察的并监听发射的值.

    ** subscribe完整的函数**如下:
    下面完整的包含3个函数的对象被称为observer(观察者),表示的是对序列结果的处理方式。 通过Observable的subscribe函数,观察者去订阅可观察者的消息。

    • next表示数据正常流动,没有出现异常;
    • error表示流中出错,可能是运行出错,http报错等等;
    • complete表示流结束,不再发射新的数据

    ** 特点:**

    • 在一个流的生命周期中,error和complete只会触发其中一个,可以有多个next(表示多次发射数据),直到complete或者error。
    • observer.next可以认为是Promise中then的第一个参数,observer.error对应第二个参数或者Promise的catch。
    • RxJS同样提供了catch操作符,err流入catch后,catch必须返回一个新的Observable。被catch后的错误流将不会进入observer的error函数,除非其返回的新observable出错。
    ob.subscribe({
        next: d => console.log(d),
        error: err => console.error(err),
        complete: () => console.log('end of the stream')
    })
    
    import { of } from 'rxjs';
    
    // RxJS同样提供了catch操作符,err流入catch后,catch必须返回一个新的Observable。被catch后的错误流将不会进入observer的error函数,除非其返回的新observable出错
    of(1).map(n => n*n).catch(err => {
        // 此处处理catch之前发生的错误
        return of(0); // 返回一个新的序列,该序列成为新的流。
    });
    

    ** subscribe简写方式 **:传入一个函数,会被当做next函数

    6.pipe方法

    import {of} from 'rxjs'
    const ob = of(1,2,3);
    const newOb = ob.pipe(
    	tap(num=>console.log(num)),
    	map(num=>'hello:'+num)
    );
    newOb.subscribe(data=>console.log(data))
    

    map方法返回一个新的函数,
    pipe函数使用闭包返回函数,函数里使用reduce方法把前一个操作的结果作为后一个操作的输入值。
    刚开始tap和map里的函数还未执行,直到subscribe方法调用,传入的tap和map里的箭头函数才开始执行。

    对于序列里的1,2,3,先执行管道里的tap和map操作,再把map操作的输出,作为输入去执行subscribe里指定的回调。

    在这里插入图片描述

    7.使用RsJS实现搜索功能

    搜索是前端开发中很常见的功能:监听的keyup事件,然后将内容发送到后台,并展示后台返回的数据。

    正常实现代码会存在以下2个问题:

    • 多余的请求:
      当想搜索“爱迪生”时,输入框可能会存在三种情况,“爱”、“爱迪”、“爱迪生”。而这三种情况将会发起 3 次请求,存在 2 次多余的请求。

    • 已无用的请求仍然执行:
      一开始搜了“爱迪生”,然后马上改搜索“达尔文”。结果后台返回了“爱迪生”的搜索结果,执行渲染逻辑后结果框展示了“爱迪生”的结果,而不是当前正在搜索的“达尔文”,这是不正确的。

    减少多余请求次数:使用防抖;
    **已无用的请求仍然执行 **的解决方式,可以在发起请求前声明一个当前搜索的状态变量,后台将搜索的内容及结果一起返回,前端判断返回数据与当前搜索是否一致,一致才走到渲染逻辑。
    最终为以下代码:

    <input id="text"></input>
    <script>
       var text = document.querySelector('#text'),
            timer = null,
            currentSearch = '';
    
        text.addEventListener('keyup', (e) =>{
            clearTimeout(timer);// 在 250 毫秒内进行其他输入,则清除上一个定时器
            // 定时器,在 250 毫秒后触发
            timer = setTimeout(() => {
                // 声明一个当前所搜的状态变量
                currentSearch = '书'; 
    
                var searchText = e.target.value;
                $.ajax({
                    url: `/search/${searchText}`,
                    success: data => {
                        // 判断后台返回的标志与我们存的当前搜索变量是否一致
                        if (data.search === currentSearch) {
                            // 渲染展示
                            render(data);
                        } else {
                            // ..
                        }
                    }           
                });
            },250)
        })
    </script>
    
    

    上面代码基本满足需求,但代码开始显得乱糟糟。我们来使用RxJS实现上面代码功能,如下:

    import { fromEvent } from 'rxjs';
    var text = document.querySelector('#text');
    var inputStream = fromEvent(text, 'keyup') //为dom元素绑定'keyup'事件
                        .debounceTime(250) // 防抖动
                        .pluck('target', 'value') // 取值
                        .switchMap(url => Http.get(url)) // 将当前输入流替换为http请求
                        .subscribe(data => render(data)); // 接收数据
    

    RxJS能简化你的代码,它将与流有关的内部状态封装在流中,而不需要在流外定义各种变量来以一种上帝视角控制流程。Rx的编程方式使你的业务逻辑流程清晰,易维护,并显著减少出bug的概率。

    展开全文
  • SIP 请求方法(5)-SUBSCRIBE& NOTIFY

    千次阅读 2020-12-28 15:21:21
    UA 使用SUBSCRIBE方法来建立订阅关系,以获取特定事件的通知(通过NOTIFY方法),SUBSCRIBE和NOTIFY都定义于RFC6665。订阅成功后在UAC和UAS间建立一个dialog。订阅请求包含一个Expires头域,它说明订阅存在的持续时间...

            前面介绍了RFC3261里定义的六种SIP方法。接下来,我们看看SIP扩展的方法有哪些。

    SUBSCRIBE

             UA 使用SUBSCRIBE方法来建立订阅关系,以获取特定事件的通知(通过NOTIFY方法),SUBSCRIBE和NOTIFY都定义于RFC6665。订阅成功后在UAC和UAS间建立一个dialog。订阅请求包含一个Expires头域,它说明订阅存在的持续时间。期满之后,订阅关系自动终止。期满之前,可以再发一条SUBSCRIBE请求来刷新。服务端接受订阅时返回一条200 OK应答,它也包含一个Expires头域。限定的时间可以和请求一致,服务端可以缩短时间,不能延长时间。SIP中没有定义UNSUBSCRIBE方法,如果需要提前终止订阅,还是要通过SUBSCRIBE方法完成,Expires:0就表示终止。订阅终止时(不论是时间到期还是通过请求提前终止),都需要最后发一条NOTIFY消息来说明订阅已经终止。对SUBSCRIBE请求回应202 Accepted,并不表示订阅已经通过授权,它仅说明服务端已经了解订阅信息。

            下图显示了订阅的基本流程。客户端发出SUBSCRIBE请求,收到成功应答。服务端事件触发时,客户端会收到NOTIFY。在订阅到期之前,客户端重新发SUBSCRIBE请求来展期,以便获知更多通知。

            注意:客户端在收到SUBSCRIBE对应的200 OK应答之前,就应该准备好接收NOTIFY。此外,因为下游可能有分支,客户端必须准备好接收不同服务端的NOTIFY。尽管SUBSCRIBE只会收到一条对应的200 OK应答,但是,由于NOTIFY的To tag不同,因此需要建立独立的dialog。

     

    SUBSCRIBE 与 NOTIFY的交换流程题

    SUBSCRIBE request请求消息实例:

     

    SUBSCRIBE sip:ptolemy@rosettastone.example.com SIP/2.0
    Via SIP/2.0/UDP proxy.elasticity.example.org:5060;branch=z9hG4bK348471123
    Via SIP/2.0/UDP parlour.elasticity.example.org:5060;branch=z9hG4bKABDA ;received=192.0.3.4
    Max-Forwards: 70
    To: <sip:Ptolemy@rosettastone.example.com>
    From: Thomas Young <sip:tyoung@elasticity.example.org>;tag=1814
    Call-ID: 452k59252058dkfj34924lk34
    CSeq: 3412 SUBSCRIBE
    Allow-Events: dialog
    Contact: <sip:tyoung@parlour.elasticity.example.org>
    Event: dialog
    Content-Length: 0

            Event头域中说明订阅的事件类型,它是SUBSCRIBE请求的必要头域。每个SIP事件框架应用程序都通过一个唯一的事件tag定义数据包。每个数据包都定义了以下内容:

     

    • 默认订阅到期时间

    • 预期的SUBSCRIBE消息体

    • 什么事件触发NOTIFY 的发送,以及NOTIFY中预期的消息体

    • NOTIFY中包含完整的状态或增量

    • 最大通知率

     

            有一个协议叫PSTN与互联网互通(PSTN and Internet Interworking (PINT),RFC2848) ,它定义了SUBSCRIBE、NOTIFY和UNSUBSCRIBE方法,其语义与SIP相似。PINT请求中没有Event头域,服务器可以据此区分PINT SUBSCRIBE请求与SIP SUBSCRIBE。服务器应该通过Allow-Events头域说明它所支持的事件包。

            如果在dialog内发SUBSCRIBE刷新但收到481 Dialog Does Not Exist应答,这说明服务端已经中止订阅。客户端应该认为dialog与订阅已经终止,如果需要,可以发SUBSCRIBE建立新的订阅dialog。

    packet一种特殊的类型描述,它说明event消息里的事件类型。下表列出当前定义的SIP event与packet。

     

    SIP 事件包

    event packet name

    用途

    规范

    call-completion

    Call Completion [8]

    RFC 6910

    certificate

    Certificate (public key) [9]

    RFC 6072

    credential

    Credential (private key) [9]

    RFC 6072

    conference

    Conferencing [10]

    RFC 4579

    consent-pending-additions

    Consent Framework [11]

    RFC 5362

    dialog

    SIP Dialog Information [12]

    RFC 4235

    http-monitor

    Web Page Monitoring [13]

    RFC 5989

    kpml

    Key Press Markup Language[14]

    RFC 4730

    load-control

    Load Filtering Policy [15]

    RFC 7200

    message-summary

    Voicemail [16]

    RFC 3842

    poc-settings

    Push-to-Talk over Cellular [17]

    RFC 4354

    presence

    Presence [18]

    RFC 3845

    reg

    Registration [19]

    RFC 3680

    refer

    Refer [20]

    RFC 3515

    session-spec-policy

    Session-Specific Policy [21]

    RFC 6795

    ua-profile

    User Agent Profile (Configuration) [22]

    RFC 6808

    vq-rtcpxr

    RTCP VoIP Summary [23]

    RFC 6035

    winfo

    Watcher template [7]

    RFC 3857

    xcap-diff

    Changes in XCAP Files [24]

    RFC 5875

     

    SUBSCRIBE请求的必要头域

    SUBSCRIBE请求的必要头域
    Via
    To
    From
    Call-ID
    CSeq
    Max-Forwards
    Contact
    Event
    Allow-Events

     

    NOTIFY

            UA使用NOTIFY方法来传递某种特定事件发生的信息。NOTIFY通常在订阅者与能和者之间建立的dialog内发送。然而,可以使用非SIP方法建立订阅(不发SUBSCRIBE请求),还可以通过其它SIP请求建立隐式的订阅(比如说,REFER请求所建立的隐式订阅)。因为它是dialog内请求,所以NOTIFY携带To tag, From tag, 和 Call-ID。

            NOTIFY请求通常会收到200 OK应答,它说明消息已经被接收。如果收到481 Dialog/Transaction Does Not Exist应答,那么订阅自动终止,不再发后续NOTIFY消息。

            NOTIFY请求包含一个Event头域,它说明事件包类型;此外,Subscription-State头域说明当前的订阅状态。Event头域携带的是订阅时指定的事件名。Subscription-State头域可选值是active, pending, 或 terminated。

            订阅开始与结束时,总是要发一条NOTIFY消息。如果NOTIFY消息包含增量状态信息,那么消息体中应该有状态版本号,每发一条NOTIFY消息,这个版本号加1。通过这种方式,NOTIFY消息的接收方就能识别出信息丢失或接收乱序。

    NOTIFY请求消息实例:

    NOTIFY sip:tyoung@parlour.elasticity.example.org SIP/2.0
    Via SIP/2.0/UDP cartouche.rosettastone.example.com:5060;branch=z9hG4bK3841323
    Max-Forwards: 70
    To: Thomas Young <sip:tyoung@elasticity.example.org>;tag=1814
    From: <sip:ptolemy@rosettastone.example.com>;tag=5363956k
    Call-ID: 452k59252058dkfj34924lk34
    CSeq: 3 NOTIFY
    Contact: <sip:ptolemy@cartouche.rosettastone.example.com>
    Event: dialog
    Subscription-State: active;expires=180
    Allow-Events: dialog Content-Type: application/xml+dialog
    Content-Length: ...
    
    (XML Message body not shown...)

     

    NOTIFY请求的必要头域:

    NOTIFY请求的必要头域
    To
    Via
    To
    From
    Call-ID
    CSeq
    Max-Forwards
    Event
    Allow-Events
    Subscription-State

     

    展开全文
  • Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, const TransportHints& transport_hints = TransportHints())\ Subscriber NodeHandle::subscribe...

    一个简单的订阅程序如下:

    File: /home/lgy/ros_catkin_ws/src/ros_tutorials/roscpp_tutorials/listener/listener.cpp
    
    29: #include "ros/ros.h"
    30: #include "std_msgs/String.h"
    
    36: void chatterCallback(const std_msgs::String::ConstPtr& msg)
    37: {
    38:   ROS_INFO("I heard: [%s]", msg->data.c_str());
    39: }
    
    42: int main(int argc, char **argv)
    43: {
    54:   ros::init(argc, argv, "listener");
    
    61:   ros::NodeHandle n;
    
    79:   ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
    
    88:   ros::spin();
    
    91:   return 0;
    92: }
    

    我们分析
    ros::Subscriber sub = n.subscribe(“chatter”, 1000, chatterCallback);订阅消息时做了什么?
    订阅的消息时如何收到并如何处理的?
    ros::spin();做了什么?

    一、先分析subscribe

    ros::Subscriber sub = n.subscribe(“chatter”, 1000, chatterCallback);

    函数调用如下:

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/include/ros/node_handle.h
    401:   template<class M, class T>
    402:   Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, 
    403:                        const TransportHints& transport_hints = TransportHints())
    404:   {
    405:     SubscribeOptions ops;
    406:     ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, _1));
    407:     ops.transport_hints = transport_hints;
    408:     return subscribe(ops);
    409:   }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/include/ros/subscribe_options.h
    81:   template<class P>
    82:   void initByFullCallbackType(const std::string& _topic, uint32_t _queue_size,
    83:        const boost::function<void (P)>& _callback,
    84:        const boost::function<boost::shared_ptr<typename ParameterAdapter<P>::Message>(void)>& factory_fn = DefaultMessageCreator<typename ParameterAdapter<P>::Message>())
    85:   {
    86:     typedef typename ParameterAdapter<P>::Message MessageType;
    87:     topic = _topic;
    88:     queue_size = _queue_size;
    89:     md5sum = message_traits::md5sum<MessageType>();
    90:     datatype = message_traits::datatype<MessageType>();
    91:     helper = boost::make_shared<SubscriptionCallbackHelperT<P> >(_callback, factory_fn);
    92:   }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/include/ros/subscription_callback_helper.h
    102:   SubscriptionCallbackHelperT(const Callback& callback, 
    103: 			      const CreateFunction& create = DefaultMessageCreator<NonConstType>())
    104:     : callback_(callback)
    105:     , create_(create)
    106:   { }
    

    函数调用关系总结如下:

    Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj,...)
    	subscribe_options::initByFullCallbackType
    		 helper = boost::make_shared<SubscriptionCallbackHelperT<P> >(_callback, factory_fn);
    		 	SubscriptionCallbackHelperT(const Callback& callback, const CreateFunction& create = DefaultMessageCreator<NonConstType>())//这里请留意callback通过构造函数最终赋值给callback_中。
    	subscribe(ops);
    

    这里请留意callback通过构造函数最终赋值给callback_中。
    接着向下分析

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/node_handle.cpp
    322: Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
    323: {
    324:   ops.topic = resolveName(ops.topic);
    325:   if (ops.callback_queue == 0)
    326:   {
    327:     if (callback_queue_)
    328:     {
    329:       ops.callback_queue = callback_queue_;
    330:     }
    331:     else
    332:     {
    333:       ops.callback_queue = getGlobalCallbackQueue();
    334:     }
    335:   }
    336: 
    337:   if (TopicManager::instance()->subscribe(ops))
    338:   {
    339:     Subscriber sub(ops.topic, *this, ops.helper);
    340: 
    341:     {
    342:       boost::mutex::scoped_lock lock(collection_->mutex_);
    343:       collection_->subs_.push_back(sub.impl_);
    344:     }
    345: 
    346:     return sub;
    347:   }
    348: 
    349:   return Subscriber();
    350: }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/topic_manager.cpp
    245: // this function has the subscription code that doesn't need to be templated.
    246: 
    247: {
    248:   boost::mutex::scoped_lock lock(subs_mutex_);
    249: 
    250:   if (addSubCallback(ops))
    251:   {
    252:     return true;
    253:   }
    254: 
    255:   if (isShuttingDown())
    256:   {
    257:     return false;
    258:   }
    259: 
    260:   if (ops.md5sum.empty())
    261:   {
    262:     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
    263:   }
    264: 
    265:   if (ops.datatype.empty())
    266:   {
    267:     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
    268:   }
    269: 
    270:   if (!ops.helper)
    271:   {
    272:     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
    273:   }
    274: 
    275:   const std::string& md5sum = ops.md5sum;
    276:   std::string datatype = ops.datatype;
    277: 
    278:   SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
    279:   s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
    280: 
    281:   if (!registerSubscriber(s, ops.datatype))
    282:   {
    283:     ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
    284:     s->shutdown();
    285:     return false;
    286:   }
    287: 
    288:   subscriptions_.push_back(s);
    289: 
    290:   return true;
    291: }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/topic_manager.cpp
    198: bool TopicManager::addSubCallback(const SubscribeOptions& ops)
    199: {
    200:   // spin through the subscriptions and see if we find a match. if so, use it.
    201:   bool found = false;
    202:   bool found_topic = false;
    203: 
    204:   SubscriptionPtr sub;
    205: 
    206:   {
    207:     if (isShuttingDown())
    208:     {
    209:       return false;
    210:     }
    211: 
    212:     for (L_Subscription::iterator s = subscriptions_.begin();
    213:          s != subscriptions_.end() && !found; ++s)
    214:     {
    215:       sub = *s;
    216:       if (!sub->isDropped() && sub->getName() == ops.topic)
    217:       {
    218:         found_topic = true;
    219:         if (md5sumsMatch(ops.md5sum, sub->md5sum()))
    220:         {
    221:           found = true;
    222:         }
    223:         break;
    224:       }
    225:     }
    226:   }
    227: 
    228:   if (found_topic && !found)
    229:   {
    230:     std::stringstream ss;
    231:     ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
    232:     throw ConflictingSubscriptionException(ss.str());
    233:   }
    234:   else if (found)
    235:   {
    236:     if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
    237:     {
    238:       return false;
    239:     }
    240:   }
    241: 
    242:   return found;
    243: }
    

    我们看看函数调用关系:

    ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
    	Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, const TransportHints& transport_hints = TransportHints())\
    		Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
    			//情况1:掉用addSubCallback增加callback,如果是已经注册过的可以添加成功,否则失败继续向下执行
    			bool TopicManager::addSubCallback(const SubscribeOptions& ops)
    				bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
    
    			//情况2:这里创建订阅器 增加回调函数 并保存下来
    			subscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
    				bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
    
    			s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
    			subscriptions_.push_back(s);
    

    subscribe会调用addSubCallback增加callback,如果是已经注册过的可以添加成功;否则创建订阅器 增加回调函数。
    两种情况最终都调用Subscription::addCallback(…),该函数如下:

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/subscription.cpp
    681: bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
    682: {
    683:   ROS_ASSERT(helper);
    684:   ROS_ASSERT(queue);
    685: 
    686:   statistics_.init(helper);
    687: 
    688:   // Decay to a real type as soon as we have a subscriber with a real type
    689:   {
    690:     boost::mutex::scoped_lock lock(md5sum_mutex_);
    691:     if (md5sum_ == "*" && md5sum != "*")
    692:     {
    693: 
    694:       md5sum_ = md5sum;
    695:     }
    696:   }
    697: 
    698:   if (md5sum != "*" && md5sum != this->md5sum())
    699:   {
    700:     return false;
    701:   }
    702: 
    703:   {
    704:     boost::mutex::scoped_lock lock(callbacks_mutex_);
    705: 
    706:     CallbackInfoPtr info(boost::make_shared<CallbackInfo>());
    707:     info->helper_ = helper;
    708:     info->callback_queue_ = queue;
    709:     info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(name_, queue_size, allow_concurrent_callbacks);
    710:     info->tracked_object_ = tracked_object;
    711:     info->has_tracked_object_ = false;
    712:     if (tracked_object)
    713:     {
    714:       info->has_tracked_object_ = true;
    715:     }
    716: 
    717:     if (!helper->isConst())
    718:     {
    719:       ++nonconst_callbacks_;
    720:     }
    721: 
    722:     callbacks_.push_back(info);
    723:     cached_deserializers_.reserve(callbacks_.size());
    724: 
    725:     // if we have any latched links, we need to immediately schedule callbacks
    726:     if (!latched_messages_.empty())
    727:     {
    728:       boost::mutex::scoped_lock lock(publisher_links_mutex_);
    729: 
    730:       V_PublisherLink::iterator it = publisher_links_.begin();
    731:       V_PublisherLink::iterator end = publisher_links_.end();
    732:       for (; it != end;++it)
    733:       {
    734:         const PublisherLinkPtr& link = *it;
    735:         if (link->isLatched())
    736:         {
    737:           M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
    738:           if (des_it != latched_messages_.end())
    739:           {
    740:             const LatchInfo& latch_info = des_it->second;
    741: 
    742:             MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
    743:             bool was_full = false;
    744:             info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
    745:             if (!was_full)
    746:             {
    747:               info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
    748:             }
    749:           }
    750:         }
    751:       }
    752:     }
    753:   }
    754: 
    755:   return true;
    756: }
    

    722:
    Subscription::addCallback先检查参数合法性,然后创建一个CallbackInfoptr info,最终调用callbacks_.push_back(info)。

    二、订阅消息是怎么接收并处理的?

    在之前的文章中我们分析了同进程消息发布与订阅的处理流程:

    void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
        void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
            uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
    

    我们可以看到同进程的publish消息一路调用,直接调用到订阅器的消息处理了。

    在另外一篇文章中我们分析了通过TCP/UDP发布与订阅消息处理流程:

    TransportPublisherLink::onHeaderReceived-->TransportPublisherLink::onMessageLength-->TransportPublisherLink::onMessage-->TransportPublisherLink::handleMessage-->Subscription::handleMessage
    
    这样就完成了数据subscription节点的数据接收并调用Subscription::handleMessage,后续文章分析。
    

    可以看到这两种情况最终又调用到Subscription::handleMessage,我们分析如下流程:

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/subscription.cpp
    599: uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
    600: {
    601:   boost::mutex::scoped_lock lock(callbacks_mutex_);
    602: 
    603:   uint32_t drops = 0;
    604: 
    605:   // Cache the deserializers by type info.  If all the subscriptions are the same type this has the same performance as before.  If
    606:   // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
    607:   // garbage to the messages with different C++ types than the first one.
    608:   cached_deserializers_.clear();
    609: 
    610:   ros::Time receipt_time = ros::Time::now();
    611: 
    		//遍历callbacks_
    612:   for (V_CallbackInfo::iterator cb = callbacks_.begin();
    613:        cb != callbacks_.end(); ++cb)
    614:   {
    615:     const CallbackInfoPtr& info = *cb;
    616: 
    617:     ROS_ASSERT(info->callback_queue_);
    618: 
    619:     const std::type_info* ti = &info->helper_->getTypeInfo();
    620: 
    621:     if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
    622:     {
    623:       MessageDeserializerPtr deserializer;
    624: 
    625:       V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
    626:       V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
    627:       for (; des_it != des_end; ++des_it)
    628:       {
    629:         if (*des_it->first == *ti)
    630:         {
    631:           deserializer = des_it->second;
    632:           break;
    633:         }
    634:       }
    635: 
    636:       if (!deserializer)
    637:       {
    638:         deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
    639:         cached_deserializers_.push_back(std::make_pair(ti, deserializer));
    640:       }
    641: 
    642:       bool was_full = false;
    643:       bool nonconst_need_copy = false;
    644:       if (callbacks_.size() > 1)
    645:       {
    646:         nonconst_need_copy = true;
    647:       }
    648: 
    649:       info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
    650: 
    651:       if (was_full)
    652:       {
    653:         ++drops;
    654:       }
    655:       else
    656:       {
    657:         info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
    658:       }
    659:     }
    660:   }
    661: 
    662:   // measure statistics
    663:   statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
    664: 
    665:   // If this link is latched, store off the message so we can immediately pass it to new subscribers later
    666:   if (link->isLatched())
    667:   {
    668:     LatchInfo li;
    669:     li.connection_header = connection_header;
    670:     li.link = link;
    671:     li.message = m;
    672:     li.receipt_time = receipt_time;
    673:     latched_messages_[link] = li;
    674:   }
    675: 
    676:   cached_deserializers_.clear();
    677: 
    678:   return drops;
    679: }
    

    通过代码看这里遍历callbacks_,判断队列是否满:如果满的话就丢弃了,否则加入到处理队列。消息处理就结束了?怎么没有调用我们的回调函数呢?我们继续向后看。

    三、ros::spin();做了什么?

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/init.cpp
    560: void spin()
    561: {
    562:   SingleThreadedSpinner s;
    563:   spin(s);
    564: }
    565: 
    566: void spin(Spinner& s)
    567: {
    568:   s.spin();
    569: }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/spinner.cpp
    123: void SingleThreadedSpinner::spin(CallbackQueue* queue)
    124: {
    125:   if (!queue)
    126:   {
    127:     queue = getGlobalCallbackQueue();
    128:   }
    129: 
    130:   if (!spinner_monitor.add(queue, true))
    131:   {
    132:     std::string errorMessage = "SingleThreadedSpinner: " + DEFAULT_ERROR_MESSAGE + " You might want to use a MultiThreadedSpinner instead.";
    133:     ROS_FATAL_STREAM(errorMessage);
    134:     throw std::runtime_error(errorMessage);
    135:   }
    136: 
    137:   ros::WallDuration timeout(0.1f);
    138:   ros::NodeHandle n;
    139:   while (n.ok())
    140:   {
    141:     queue->callAvailable(timeout);
    142:   }
    143:   spinner_monitor.remove(queue);
    144: }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/callback_queue.cpp
    301: void CallbackQueue::callAvailable(ros::WallDuration timeout)
    302: {
    303:   setupTLS();
    304:   TLS* tls = tls_.get();
    305: 
    306:   {
    307:     boost::mutex::scoped_lock lock(mutex_);
    308: 
    309:     if (!enabled_)
    310:     {
    311:       return;
    312:     }
    313: 
    314:     if (callbacks_.empty())
    315:     {
    316:       if (!timeout.isZero())
    317:       {
    318:         condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
    319:       }
    320: 
    321:       if (callbacks_.empty() || !enabled_)
    322:       {
    323:         return;
    324:       }
    325:     }
    326: 
    327:     bool was_empty = tls->callbacks.empty();
    328: 
    329:     tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
    330:     callbacks_.clear();
    331: 
    332:     calling_ += tls->callbacks.size();
    333: 
    334:     if (was_empty)
    335:     {
    336:       tls->cb_it = tls->callbacks.begin();
    337:     }
    338:   }
    339: 
    340:   size_t called = 0;
    341: 
    342:   while (!tls->callbacks.empty())
    343:   {
    344:     if (callOneCB(tls) != Empty)
    345:     {
    346:       ++called;
    347:     }
    348:   }
    349: 
    350:   {
    351:     boost::mutex::scoped_lock lock(mutex_);
    352:     calling_ -= called;
    353:   }
    354: }
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/callback_queue.cpp
    356: CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
    357: {
    358:   // Check for a recursive call.  If recursive, increment the current iterator.  Otherwise
    359:   // set the iterator it the beginning of the thread-local callbacks
    360:   if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
    361:   {
    362:     tls->cb_it = tls->callbacks.begin();
    363:   }
    364: 
    365:   if (tls->cb_it == tls->callbacks.end())
    366:   {
    367:     return Empty;
    368:   }
    369: 
    370:   ROS_ASSERT(!tls->callbacks.empty());
    371:   ROS_ASSERT(tls->cb_it != tls->callbacks.end());
    372: 
    373:   CallbackInfo info = *tls->cb_it;
    374:   CallbackInterfacePtr& cb = info.callback;
    375: 
    376:   IDInfoPtr id_info = getIDInfo(info.removal_id);
    377:   if (id_info)
    378:   {
    379:     boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
    380: 
    381:     uint64_t last_calling = tls->calling_in_this_thread;
    382:     tls->calling_in_this_thread = id_info->id;
    383: 
    384:     CallbackInterface::CallResult result = CallbackInterface::Invalid;
    385: 
    386:     {
    387:       // Ensure that thread id gets restored, even if callback throws.
    388:       // This is done with RAII rather than try-catch so that the source
    389:       // of the original exception is not masked in a crash report.
    390:       BOOST_SCOPE_EXIT(&tls, &last_calling)
    391:       {
    392:         tls->calling_in_this_thread = last_calling;
    393:       }
    394:       BOOST_SCOPE_EXIT_END
    395: 
    396:       if (info.marked_for_removal)
    397:       {
    398:         tls->cb_it = tls->callbacks.erase(tls->cb_it);
    399:       }
    400:       else
    401:       {
    402:         tls->cb_it = tls->callbacks.erase(tls->cb_it);
    403:         result = cb->call();
    404:         if (result == CallbackInterface::Success)
    405:         {
    406:           condition_.notify_one();
    407:         }
    408:       }
    409:     }
    410: 
    411:     // Push TryAgain callbacks to the back of the shared queue
    412:     if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
    413:     {
    414:       boost::mutex::scoped_lock lock(mutex_);
    415:       callbacks_.push_back(info);
    416: 
    417:       return TryAgain;
    418:     }
    419: 
    420:     return Called;
    421:   }
    422:   else
    423:   {
    424:     tls->cb_it = tls->callbacks.erase(tls->cb_it);
    425:   }
    426: 
    427:   return Called;
    428: }
    

    函数调用关系如下:

    void spin()
    	void SingleThreadedSpinner::spin(CallbackQueue* queue)
    		void CallbackQueue::callAvailable(ros::WallDuration timeout)
    			CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
    		        CallbackInterfacePtr& cb = info.callback; result = cb->call();
    

    函数一路调用到cb->call()。CallbackInterfacePtr& cb是CallbackInterface指针。

    class ROSCPP_DECL SubscriptionQueue : public CallbackInterface, public boost::enable_shared_from_this<SubscriptionQueue>
    

    SubscriptionQueue继承自CallbackInterface,接着看subscriptionQueue::call()

    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/src/libros/subscription_queue.cpp
    102: CallbackInterface::CallResult SubscriptionQueue::call()
    103: {
    104:   // The callback may result in our own destruction.  Therefore, we may need to keep a reference to ourselves
    105:   // that outlasts the scoped_try_lock
    106:   boost::shared_ptr<SubscriptionQueue> self;
    107:   boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
    108: 
    109:   if (!allow_concurrent_callbacks_)
    110:   {
    111:     lock.try_lock();
    112:     if (!lock.owns_lock())
    113:     {
    114:       return CallbackInterface::TryAgain;
    115:     }
    116:   }
    117: 
    118:   VoidConstPtr tracker;
    119:   Item i;
    120: 
    121:   {
    122:     boost::mutex::scoped_lock lock(queue_mutex_);
    123: 
    124:     if (queue_.empty())
    125:     {
    126:       return CallbackInterface::Invalid;
    127:     }
    128: 
    129:     i = queue_.front();
    130: 
    131:     if (queue_.empty())
    132:     {
    133:       return CallbackInterface::Invalid;
    134:     }
    135: 
    136:     if (i.has_tracked_object)
    137:     {
    138:       tracker = i.tracked_object.lock();
    139: 
    140:       if (!tracker)
    141:       {
    142:         return CallbackInterface::Invalid;
    143:       }
    144:     }
    145: 
    146:     queue_.pop_front();
    147:     --queue_size_;
    148:   }
    149: 
    150:   VoidConstPtr msg = i.deserializer->deserialize();
    151: 
    152:   // msg can be null here if deserialization failed
    153:   if (msg)
    154:   {
    155:     try
    156:     {
    157:       self = shared_from_this();
    158:     }
    159:     catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr
    160:     {}
    161: 
    162:     SubscriptionCallbackHelperCallParams params;
    163:     params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
    164:     i.helper->call(params);
    165:   }
    166: 
    167:   return CallbackInterface::Success;
    168: }
    
    
    File: /home/lgy/ros_catkin_ws/src/ros_comm/roscpp/include/ros/subscription_callback_helper.h
    141:   virtual void call(SubscriptionCallbackHelperCallParams& params)
    142:   {
    143:     Event event(params.event, create_);
    144:     callback_(ParameterAdapter<P>::getParameter(event));
    145:   }
    

    函数调用如下:

     CallbackInterface::CallResult SubscriptionQueue::call()
     	i = queue_.front();
     	VoidConstPtr msg = i.deserializer->deserialize();
     	i.helper->call(params);
     		callback_(ParameterAdapter<P>::getParameter(event));
    

    请再次留意下subscribe做的工作:

     Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj,...)
    	subscribe_options::initByFullCallbackType
    		 helper = boost::make_shared<SubscriptionCallbackHelperT<P> >(_callback, factory_fn);
    		 	SubscriptionCallbackHelperT(const Callback& callback, const CreateFunction& create = DefaultMessageCreator<NonConstType>())//这里请留意callback通过构造函数最终赋值给callback_中。		 	
    

    这里的callback_就是Subscriber subscribe时的callback,这样终于调用到了用户的回调函数。

    展开全文
  • let disposeBag = DisposeBag() Observable.of("A", "B", "C") .subscribe { print($0) }.disposed(by: disposeBag) 该create运营商有一个名为一个参数subscribe。它的工作是提供可观察的调用订阅的实现。 使用...

    可观察的生命周期

    在这里插入图片描述

    在上图中,可观察到的发射了9个元素。当一个可观察对象发出一个元素时,它会在下一个事件中发出它。

    Observable发出三个轻击事件,然后结束。这称为completed事件。

    可观察对象发出error包含错误的事件。如果一个可观察对象发出一个error事件,则它也将终止并且不能再发出其他任何事件。

    一个observable发出next包含元素的事件。

    初始化环境

    创建一个项目命名为RXSwiftDemo,在命令行pod init, 在podfile 中填写如下内容,并运行pod install.

    # Uncomment the next line to define a global platform for your project
    # platform :ios, '9.0'
    
    target 'RXSwiftDemo' do
      # Comment the next line if you don't want to use dynamic frameworks
      use_frameworks!
    
      # Pods for RXSwiftDemo
      pod 'RxSwift', '6.1.0'
      pod 'RxCocoa', '6.1.0'
    
      target 'RXSwiftDemoUITests' do
        pod 'RxBlocking', '6.1.0'
        pod 'RxTest', '6.1.0'
      end
    
    end
    
    

    打开workspace就可以进行实战。

    创建observable

    在这里插入图片描述

    just恰当地命名,因为它所做的只是创建一个包含just单个元素的可观察序列。

    在操作者从元件的规则阵列创建一个可观察到的个体类型实例的from

    您可以of用来创建observable的数组或创建单个类型的可观察的数组。

    下面的示例可以辨别justoffrom

     let one = 1
      let two = 2
      let three = 3
      /// This is a singe element observable
      let observable: Observable<Int> = Observable<Int>.just(one)
      
      /// This is an observable of individual type instances from a array
      let observable2 = Observable.of(one, two, three)
      
      /// This is an observables array using of to create
      let observable3 = Observable.of([one, two, three])
      
      /// This is an observable of individual type instances from a array
      let observable4 = Observable.from([one, two, three])
    

    订阅观察值subscribe

    在这里插入图片描述
    观察者 是用来监听事件,然后它需要这个事件做出响应。例如:弹出提示框就是观察者,它对点击按钮这个事件做出响应。

    订阅RxSwift可观察对象是非常相似的;您称观察为可观察的subscribing。

    因此addObserver(),您可以使用代替subscribe()

    NotificationCenter开发人员通常仅使用其.default单例实例不同,Rx中的每个可观察实例都不同。

    更重要的是,observable只有拥有订阅者才能发送事件。
    遵守可观察变量的例子

    let one = 1
      let two = 2
      let three = 3
      /// This is an observable of individual type instances from a array
      let observable = Observable.of(one, two, three)
      
      /// Subscribes an element handler
      observable.subscribe(onNext: { element in
        print(element) // Result will be: 1,2,3
      })
      
      /// Subscribes an event handler to an observable sequence.
      observable.subscribe { event in
        print(event) // Result will be: next(1) next(2) next(3) completed
      }
    

    empty操作者创建具有零个元素的空可观察序列。它只会发出一个.completed事件。

    let observable = Observable<Void>.empty()
        observable
          .subscribe(
            onNext: { element in
              print(element)
          },
            /// When empty an observable, it will go to completed block
            onCompleted: {
              print("Completed")
          }
        )
    

    never操作创建一个可观察到的不排放任何东西,从来没有终止

    /// If never an observable, it will not emit anything
    let observable = Observable<Any>.never()
        observable
          .subscribe(
            onNext: { element in
              print(element)
          },
            onCompleted: {
              print("Completed")
          }
        )
    

    处置和终止dispose

    在这里插入图片描述

    要明确取消订阅,请调用dispose()它。取消订阅或取消订阅后,dispose当前示例中的observable将停止发出事件

    let observable = Observable.of("A", "B", "C")
      let subscription = observable.subscribe { event in
        print(event)
      }
      /// When dispose, the observable can not emit anything
      subscription.dispose()
    

    单独管理每个订阅将很乏味,因此RxSwift包含一个DisposeBag类型。处置袋可容纳通常使用此.disposed(by: disposeBag)方法添加的一次性用品,并dispose()在处置袋即将被释放时将呼叫每个处置袋。

    let disposeBag = DisposeBag()
            Observable.of("A", "B", "C")
                .subscribe {
                print($0)
                }.disposed(by: disposeBag)
    

    create运营商有一个名为一个参数subscribe。它的工作是提供可观察的调用订阅的实现。
    使用create定制的可观察

    let disposeBag = DisposeBag()
    Observable<String>.create { (observer) -> Disposable in
                observer.onNext("1")
                observer.onNext("?")
                
                return Disposables.create()
            }.subscribe(
                onNext: { print($0) },
                onError: { print($0) },
                onCompleted: { print("Completed") })
            .disposed(by: disposeBag)
    

    Operator - 操作符 - Filter

    操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。

    我们之前在输入验证例子中就多次运用到操作符。例如,通过 map 方法将输入的用户名,转换为用户名是否有效。然后用这个转化后来的序列来控制红色提示语是否隐藏。我们还通过 combineLatest 方法,将用户名是否有效和密码是否有效合并成两者是否同时有效。然后用这个合成后来的序列来控制按钮是否可点击。

    这里 map 和 combineLatest 都是操作符,它们可以帮助我们构建所需要的序列。现在,我们再来看几个例子:
    在这里插入图片描述
    filter - 过滤
    在这里插入图片描述

    let disposeBag = DisposeBag()
    let rxTemperature: Observable<Double> = Observable.of(10.0, 11.0, 20.0, 35.0, 40.0)
            rxTemperature.filter { temperature in
                temperature > 33
            }.subscribe(onNext: { temperature in
                print("high temperature \(temperature)°")
            }).disposed(by: disposeBag)
    

    建立可观察的工厂

    deferred 延迟读取值直到订阅.

    deferred有一个需要记住的特征。每次有新观察者时,deferred都会从其关闭中创建Observable。这意味着订阅2次将创建2个可观察对象。它会影响性能,但不太可能发生
    deferred订阅长计算功能时会有所帮助(如果使用create或,则会阻塞just

    let disposeBag = DisposeBag()
    var flip = false
            
            let factory: Observable<Int> = Observable.deferred { () -> Observable<Int> in
                flip = !flip
                
                if flip {
                    return Observable.of(1, 2, 3)
                } else {
                    return Observable.of(4, 5, 6)
                }
            }
            
            for _ in 0...1 {
                factory.subscribe(onNext: {
                    print($0, terminator: " ... ")
                }).disposed(by: disposeBag)
                print()
            }
    

    参考

    https://medium.com/@duydtdev/observables-in-rxswift-f2f451df49b7

    https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable.html

    https://github.com/ReactiveX/RxSwift

    展开全文
  • const action = {type:'addNum'} store.dispatch(action) 5-3.subscribe 订阅store中的状态数据 componentDidMount(){ store.subscribe(()=>{ this.setState(store.getState()) }) } 5-4.取消监听 // 组件挂载完成,...
  • 探究WebClient的block和subscribe call有无return的服务的区别 Java代码 Server Server1 @RequestMapping("/server1") public void server1(){ for (int i = 0; i < 5; i++) { try { Thread.sleep(1000); } ...
  • redis subscribe 连接中断

    2021-03-13 17:05:16
    115) at redis.clients.jedis.Jedis.subscribe(Jedis.java:2628) at redis.clients.jedis.JedisCluster$150.execute(JedisCluster.java:1624) at redis.clients.jedis.JedisCluster$150.execute(JedisCluster.java:...
  • 所以create操作符是用来创建一个Observable的,我们直接上示例代码: Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { ...
  • RabbitMQ的Publish/Subscribe发布与订阅模式 java_久孤是一名对技术持有独钟热爱的java资深程序员,崇尚程序界的开源精神,乐于做一个技术价值分享的博主,愿程序在你我这永远不迷茫 a.模式模型图: 发布订阅模式...
  • vuex中subscribe的使用

    2021-05-21 16:50:13
    会在每个 mutation 完成后调用,接收 mutation 和经过 mutation 后的... } } }) store.subscribe((mutation,state) => { console.log(mutation.type) console.log(state) }) /* 20 add { name : 'xxx', age : 20} */
  • Subscriber class xxx and its super classes have no public methods with the @Subscribe annotation 遇到异常不要慌,其实已经提示的很明显了,指定了具体的class,且明确告诉你这个类没有@Subscribe注解。 原因...
  • ros::Subscriber sub_lidarProp = n.subscribe("lidar_prop", 1000, imgCallBack); ros::spin(); return 0; } 编译通过,imgCallBack中无消息打印。 subscribe中字符串的名字改为 /adasim/lidar_prop 可以正确打印...
  • const aa = new Subject(); const sub2 = new Subject(); aa.subscribe(sub2); sub2.subscribe((data) => { console.log(data); }); aa.next(1); // sub2的订阅会收到aa.next的1;
  • 使用的模式主要有三种Publish/Subscribe,Request/Response,Send/Receive Publish/Subscribe //Publish代码 using EasyNetQ; using Message; using System; using System.Collections.Generic; using System....
  • 上一篇文章中,我们讲了如何利用Observer(观察者)模式实现多选框的全选, 本篇文章将带来Publish/SubScribe模式,并且利用该模式实现一个简易的消息通知功能,文章的最后还与Oberver进行对比。 什么是发布者/...
  • RabbitMQ的Publish/Subscribe 发布与订阅模式
  • RxJS入门笔记,关于Observable可观察对象、Observer观察者、Subscribe订阅,SubscriptionObservable可观察对象Observer观察者总结整体 本笔记可用于入门理解和记忆RxJS的基本概念和使用。 Observable可观察对象 官方...
  • <div class="parent-box"> <mt-button class="dealIllBtn btnCopy subscribe-btn" type="primary" >处理违章</mt-button > <wx-open-subscribe style="width: 50vw; z-index: 2; ...
  • 微信开放标签 wx-open-subscribe(复数模板)

    千次阅读 热门讨论 2021-02-03 13:52:04
    以为wx-open-subscribe例子,先公众号设置-》功能设置绑定JS接口安全域名 保证"weixin-js-sdk": “^1.6.0” 在1.6或以上 html代码:(这里用的是vue,原生看看官网就行) <wx-open-subscribe style="width: 40...
  • 订阅错误是这部分 .this.commentService.uploadCommentImage(formData).subscribe(data => {console.log(data);}, (err) => {console.error(err); // Error occurred!});commentService.uploadCommentImage - ...
  • Publish/Subscribe模式使用了一个主题/ 事件通道,这个通道介于希望接收到通知的对象(订阅者)和激活事件的对象之间(发布者)。 订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题...
  • 文章目录 一、创建Publisher 二、创建Subscribe 三、编译功能包 四、运行 4.1 启动roscore 4.2启动节点 4.3关闭退出 参考 一、创建Publisher Publisher的主要作用是针对指定话题发布特定数据类型的消息。 使用代码...
  • Symfony\Component\Console\Application->doRunCommand(Object(App\Console\Commands\RedisSubscribe), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\...
  • import createStore from 'Redux' const { store } = createStore(reducer) ...store.subscribe(放上view的更新函数)//对于React 则是render和setState 此后 更新函数的每一次变化都会触发view的重新自动渲染
  • 如果是调试测试号,这个错误的原因是需要你去关注公众号。 在测试号的后台会有一个二维码,用你需要调用微信接口的那个微信号扫码,成功之后会有显示已经关注的用户列表。 然后再次调用接口就可以了。...
  • SUBSCRIBE 报文1. 简介2. SUBSCRIBE 报文结构2.1 固定报头2.2 可变报头2.3 有效载荷3. 使用网络调试助手测试4. SUBACK - 订阅确认4.1 固定报头4.2 可变报头4.3 有效载荷 1. 简介 连接成功后,客户端可向服务端发送 ...
  • 其中 ros::NodeHandle nh,nh是实例化的节点句柄,可以根据需要自行修改,ros::Subscriber 是必须的,waypoint_sub可以修改,nh.subscribe句柄名可修改,是自己建立,格式为 功能包名::消息名,("/planning/pos_cmd...
  • 原文出处:Angular7入门辅助教程(五)——Observable(可观察对象) 方法一: import { of } from 'rxjs/observable/of';... observable.subscribe({ next: num => console.log(num) }); //.
  • 3.创建一个类,subscribe可以修改类中成员(第四个参数) Subscriber subscribe(const std::string& topic, uint32_t queue_size, const boost::function<void (const boost::shared_ptr<M const>&...
  • Store 允许使用store.subscribe方法设置监听函数,一旦 State 发生变化,就自动执行这个函数。 store.subscribe方法返回一个函数,调用这个函数就可以解除监听。 完整代码 import React, {useEffect} from 'react';...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 170,599
精华内容 68,239
关键字:

subscribe

友情链接: Email Spammer by Kaami.zip