精华内容
下载资源
问答
  • Conductor 3.2.0 客户端开发记录
    2021-09-22 21:52:23

    不使用Conductor Spring

    和之前版本差不多,但使用TaskRunnerConfigurer 替代WorkflowTaskCoordinator

    import java.util.ArrayList;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import 
    更多相关内容
  • Conductor是一个现代实用程序库,可帮助您使用函数式编程控制执行流程。 :person_in_tuxedo: 描述 它提供了一组实用程序功能,可与异步和同步代码一起使用,从而使您可以用最少的代码非常清楚地控制执行流。 该库...
  • openstack-nova-conductor

    2018-12-14 20:05:16
    openstack云计算的rpm包
  • Print Conductor 中文版是一款用于自动打印的智能软件解决方案。 批量打印 PDF 文件,办公文档,技术图纸,法律文档,协议,演示文稿,文本文件,带附件的电子邮件,图表,电子表格,发票,图像和许多其他类型的文件...
  • 本文是对Conductor微服务编排工具的使用说明,对初学者有很好的启蒙作用。Conductor是使用广泛的无微服务编排工具,具有良好的微服务编排功能和轻量化流程管理控制能力,在微服务架构中使用广泛。
  • Android代码-Conductor

    2019-08-06 05:42:37
    Conductor A small, yet full-featured framework that allows building View-based Android applications. Conductor provides a light-weight wrapper around standard Android Views that does just about ...
  • conductor

    2019-11-27 20:34:29
    这个https://github.com/Netflix/conductor是项目的github地址, 选择SSH或者HTTPs检出,git clonegit@github.com:Netflix/conductor.git(执行git命令需要先安装git)至任意位置。 二、启动本地服务 进入项目...

    一、首先从git上检出项目

    这个https://github.com/Netflix/conductor是项目的github地址,

     

    选择SSH或者HTTPs检出,git clone git@github.com:Netflix/conductor.git(执行git命令需要先安装git)至任意位置。

    二、启动本地服务

    进入项目目录下,右键git bash here,执行命令cd server进入server目录,再执行../gradlew server,然后等待项目构建成功即可。注意:此处JDK版本必须为1.8否则会报错:无效的源发行版: 1.8。启动成功后会创建一个kitchen workflow样例:

    此时可以打开http://localhost:8080/查看Swagger APIs。

    重新打开一个git命令窗口,执行cd ui,进入ui目录,执行gulp watch等待构建成功即可。注意:需要执行gulp命令,首先需要安装node.js,并且由于框架使用了import,node.js版本必须使用8.0以上,node装好后再安装glup。顺序应该为:安装node.js->全局安装gulp->项目安装gulp。

    成功后可打开http://localhost:3000查看任务相关,如下图:

     

    运行时模型:

    Conductor 遵循RPC的通讯模型,其中workers运行在与 服务器分离的应用机器上,并通过HTTP协议或者是轮询的方式来管理work队列。

    1.Workers作为远程系统,通过HTTP协议(或者任意支持的RPC机制)同Conductor 服务进行通信。

    2.任务队列用于为workers安排任务,Conductor使用内部的dyno-queues 作为任务队列,当然也可以使用其他SQS(简单队列)或者pub-sub机制的队列作为代替。

    3.conductor-redis-persistence模块使用Dynomite作为存储引擎,以及Elasticsearch作为索引的解决方案。

    4.对于存储和索引支持不同数据库。

    一个新的workflow的注册和执行需要以下步骤:

    1.定义这个workflow需要的task

    2.创建workflow。

    3.创建消费这些任务的workers并通过轮询的方式来获取任务。

    4.触发工作流执行:

    POST /workflow/{name}{

    ... //json payload as workflow input

    }

    5.轮询获取任务

    GET /tasks/poll/batch/{taskType}

    6.更新任务状态

    POST /tasks{

    "outputData": {

    "encodeResult":"success",

    "location": "http://cdn.example.com/file/location.png"

    //any task specific output

    },

    "status": "COMPLETED"

    }

    展开全文
  • Conductor一个微服务编排引擎
  • conductor.js

    2021-05-31 10:58:06
    Conductor.js 是一个用于创建沙盒化、可重复使用的应用程序的库,这些应用程序可以嵌入到主机应用程序中。 与标准<iframe>相比,使用 Conductor.js 的优势在于它使用了一组明确定义的事件,允许应用程序与其...
  • print conductor

    2018-01-21 09:19:16
    Print Conductor破解版下载 批量打印工具 汉化版本 一键打印
  • Conductor 是一个分布式系统的测试框架。很多测试框架只能测试单机的代码,而 Conductor 是一个分布式系统测试框架,使用 Python 开发,可用于协调一组测试服务。Conductor 系统允许通过单机来控制很多系统进行协调...
  • netflix conductor,用于服务编排,功能比较全,还是纯开源;
  • conductor, selenium 框架,将你带到你想去的地方 导体查看站点名称。 正在启动使用 Maven,将它的包括为一个依赖项:<dependency> <groupId>io.ddavison</groupId> <art
  • Conductor 在同一个地方跨组件组织各种动画
  • Conductor:使用Electron开发的用户界面创作和PHP依赖管理工具
  • 无须事先打开文件,使用Print Conductor同时自动打印不同格式的文件。支持超过75种文件格式。兼容所有打印机。
  • 简介:本文主要介绍netflix conductor的基本概念和主要运行机制。 作者 | 夜阳 来源 | 阿里技术公众号 本文主要介绍netflix conductor的基本概念和主要运行机制。 一 简介 netflix conductor是基于JAVA语言...

    简介:本文主要介绍netflix conductor的基本概念和主要运行机制。

    image.png

    作者 | 夜阳
    来源 | 阿里技术公众号

    本文主要介绍netflix conductor的基本概念和主要运行机制。

    一 简介

    netflix conductor是基于JAVA语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下特性:

    • 允许创建复杂的业务流程,流程中每个独立的任务都是由一个微服务所实现。
    • 基于JSON DSL 创建工作流,对任务的执行进行编排。
    • 工作流在执行的过程中可见、可追溯。
    • 提供暂停、恢复、重启等多种控制模型。
    • 提供一种简单的方式来最大限度重用微服务。
    • 拥有扩展到百万流程并发运行的服务能力。
    • 通过队列服务实现客户端与服务端的分离。
    • 支持 HTTP 或其他RPC协议进行数据传送

    二 基本概念

    1 Task

    Task是最小执行单元,承载了一段执行逻辑,如发送HTTP请求等。

    • System Task:被conductor服务执行,这些任务的执行与引擎在同一个JVM中。
    • Worker Task:被worker服务执行,执行与引擎隔离开,worker通过队列获取任务后,执行并更新结果状态到引擎。Worker的实现是跨语言的,其使用Http协议与Server通信。

    conductor提供了若干内置SystemTask:

    • 功能性Task:

      • HTTP:发送http请求
      • JSON_JQ_TRANSFORM:jq命令执行,一般用户json的转换,具体可见jq官方文档
      • KAFKA_PUBLISH: 发布kafka消息
    • 流程控制Task:

      • SWITCH(原Decision):条件判断分支,类似于代码中的switch case
      • FORK:启动并行分支,用于调度并行任务
      • JOIN:汇总并行分支,用于汇总并行任务
      • DO_WHILE:循环,类似于代码中的do while
      • WAIT:一直在运行中,直到外部时间触发更新节点状态,可用于等待外部操作
      • SUB_WORKFLOW:子流程,执行其他的流程
      • TERMINATE:结束流程,以指定输出提前结束流程,可以与SWITCH节点配合使用,类似代码中的提前return语句
    • 自定义Task:

      • 对于System Task,Conductor提供了WorkflowSystemTask 抽象类,可以自定义扩展实现。
      • 对于Worker Task,可以实现conductor的client Worker接口实现执行逻辑。

    2 Workflow

    • Workflow由一系列需要执行的Task组成,conductor采用json来描述Task的流转关系。
    • 除基本的顺序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任务,还能实现分支、并行、循环、提前结束等流程控制。

    3 Input&Output

    Task的输入是一种映射,其作为工作流实例化的一部分或某些其他Task的输出。允许将来自工作流或其他Task的输入/输出作为随后执行的Task的输入。

    • Task有自己的输入和输出,输入输出都是jsonobject类型。
    • Task可以引用其他Task的输入输出,使用${taskxxx.output}的方式引用。引用语法为json-path,除最基础的${taskxxx.output}的值解析方式外,还支持其他复杂操作,如过滤等,具体见json-path语法。
    • 启动Workflow时可以传入流程的输入数据,Task可以通过${workflow.input}的方式引用。

    Task实现原子操作的处理以及流程控制操作,Workflow定义描述Task的流转关系,Task引用Workflow或者其它Task的输入输出。通过这些机制,conductor实现了JSON DSL对流程的描述。

    三 整体架构

    image.png

    主要分为几个部分:

    • Orchestrator: 负责流程的流转调度工作;
    • Management/Execution Service: 提供流程、任务的管理更新等操作;
    • TaskQueues: 任务队列,Orchestrator解析出来的待执行Task会放到队列中;
    • Worker: 任务执行worker,从TaskQueues中获取任务,通过Execution Service更新任务状态与结果数据;
    • Database: 元数据&运行时数据库,用于保存运行时的Workflow、Task等状态信息,以及流程任务定义的等原信息;
    • Index: 索引数据库,用于存储执行历史;

    四 运行模型

    1 Task状态转移

    • SCHEDULED:待调度,task放到队列中还没有被poll出来执行时的状态
    • IN_PROGRESS:执行中,被poll出来执行但还没有完成时的状态
    • COMPLETED:执行完成
    • FAILED:执行失败
    • CANCELLED:被中止时为此状态,一般出现在两种情况:

      1. 手动中止流程时,正在运行中的task会被置为此状态;
      2. 多个fork分支,当某个分支的task失败时,其它分支中正在运行的task会被置为此状态;

    image.png

    2 任务队列

    任务的执行(同步的系统任务除外)都会先添加到任务队列中,是典型的生产者消费者模式。

    • 任务队列,是一个带有延迟、优先级功能的队列;
    • 每种类型的Task是一个单独的队列,此外,如果配置了domain、isolationGroup,还会拆分成多个队列实现执行隔离;
    • decider service是生产者,其根据流程配置与当前执行情况,解析出可执行的task后,添加到队列;
    • 任务执行器(SystemTaskWorker、Worker)是消费者,其长轮询对应的队列,从队列中获取任务执行;

    队列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的实现。

    3 核心功能实现机制

    conductor调度的核心是decider service,其根据当前流程运行的状态,解析出将要执行的任务列表,将任务入队交给worker执行。

    decide主要流程简化如下,详细代码见WorkflowExecutor.java的decide方法:

    image.png

    其中,调度任务处理流程简化如下,详细代码见WorkflowExecutor.java的scheduleTask方法:

    image.png

    decide的触发时机

    最主要的触发时机:

    1. 新启动执行时,会触发decide操作
    2. 系统任务执行完成时,会触发decide操作
    3. Workder任务通过ExecutionService更新任务状态时,会触发decide操作

    流程控制节点的实现机制

    1)Task & TaskMapper

    对于每一个Task来说,都有Task和TaskMapper两部分:

    1. Task:任务的执行逻辑代码,它的作用是Task的执行
    2. TaskMapper:任务的映射逻辑代码,它通过Task的定义配置、当前实例的执行状态等信息,返回实际需要执行的Task列表

    对于一般的任务来说,TaskMapper返回的是就是Task本身,补充一些执行实例的状态信息。但是对于控制节点来说,会有不同的逻辑。

    2)条件分支(SWITCH)的实现机制

    SWITCH用于根据条件判断,执行不同的分支。

    实际上,该节点的Task不做任何操作,TaskMapper根据分支条件,判断出要走的分之后,返回对应分支的第一个Task。

    SwitchTaskMapper.java getMappedTasks方法关键代码:

    // 待调度的Task list,最终返回结果
    List<Task> tasksToBeScheduled = new LinkedList<>();
    // evalResult是分支条件变量的值(case)
    // decisionCases是一个Map结构,key为分支的case值,value为对应分支的任务定义list(分支内的任务定义会有多个)
    // 根据分支变量的实际值,获取对应分支的任务定义list
    List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
    // default的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支
    if (selectedTasks == null || selectedTasks.isEmpty()) {
      selectedTasks = taskToSchedule.getDefaultCase();
    }
    if (selectedTasks != null && !selectedTasks.isEmpty()) {
      // 获取分支的第一个(下标0)task,返回给decider service去做调度(decider会把任务添加到队列里,交给worker去执行)
      WorkflowTask selectedTask = selectedTasks.get(0);
      // 调用了deciderService的getTasksToBeScheduled方法,此方法里又获取到TaskMapper调用了getMappedTasks。这里采用了递归调用的方式,解析嵌套的Task
      List<Task> caseTasks = taskMapperContext.getDeciderService()
        .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
      tasksToBeScheduled.addAll(caseTasks);
      switchTask.getInputData().put("hasChildren", "true");
    }
    return tasksToBeScheduled;

    3)并行(FORK)的实现机制

    FORK用于开启多个并行分支。

    实际上,该节点的Task不做任何操作,TaskMapper返回所有并行分支的第一个Task。
    ForkJoinTaskMapper.java getMappedTasks关键代码:

    // 待调度的Task list,最终返回结果
    List<Task> tasksToBeScheduled = new LinkedList<>();
    // 配置中的所有fork分支
    List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
    for (List<WorkflowTask> wfts : forkTasks) {
      // 每个分支取第一个Task
      WorkflowTask wft = wfts.get(0);
      // 调用了deciderService的getTasksToBeScheduled方法,此方法里又获取到TaskMapper调用了getMappedTasks。这里采用了递归调用的方式,解析嵌套的Task
      List<Task> tasks2 = taskMapperContext.getDeciderService()
        .getTasksToBeScheduled(workflowInstance, wft, retryCount);
      tasksToBeScheduled.addAll(tasks2);
    }
    return tasksToBeScheduled;

    总的来说,分支(SWITCH)、并行(FORK)节点本身没有执行逻辑,其通过TaskMapper返回到实际要执行的Task,然后交给Decider Service处理。

    重试的实现机制

    重试和其延迟时间设置,都是借助任务队列的功能实现的。

    重试:将任务重新添加到任务队列

    重试的延迟时间:添加到任务队列时设置延迟时间,延迟时间过后,任务才能在队列中被poll出来执行

    五 完整性保障机制

    由于调度过程中可能会出现因机器重启、网络异常、JVM崩溃等偶发情况,这些会导致的decide过程意外终止,流程执行不完整,展现出如流程一直运行中(实际已经没有在调度),或者其它状态错误等异常现象。

    1 WorkflowReconciler

    针对这种情况,conductor有一个WorkflowReconciler,会定期尝试decide所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时时间。

    2 decideQueue

    那么WorkflowReconciler是如何获取到当前运行中的流程呢,答案是decideQueue。
    decideQueue和任务队列相同,也是一个具有延迟功能的队列,其存放的是正在执行中的流程的实例id。在任务开始执行时(包括新启动执行、重试执行、恢复执行、重跑执行等),会将实例id push到decideQueue中;在执行结束(成功、失败)时,会从decideQueue中删除实例id。

    3 ExecutionLockService

    WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、维护流程一致性。但是流程本身正常执行也会触发decide,如果同一个执行同时触发两个decide,可能会导致状态混乱,执行卡住等问题。

    conductor采用了锁来解决这个问题,其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现。

    decide方法中最开始会尝试获取锁,如果获取失败则直接返回。通过锁来保障不会对同一个流程实例并发执行decide。

    if (!executionLockService.acquireLock(workflowId)) {
      return false;
    }

    由于锁是可配置的,可能会导致一个误区:单台机器的话不用配置锁。其实单机也是需要配置锁的,因为WorkflowReconciler和流程正常执行会产生冲突,可能会导致偶发的流程状态混乱问题。

    原文链接
    本文为阿里云原创内容,未经允许不得转载。 

    展开全文
  • 本文分析的webrtc的版本是:m84 平台:win10 WebRTC PeerConnection Client源码分析<1>-main window WebRTC PeerConnection Client源码分析<2>-PeerConnectionClient ...Conductor是Pee

    本文分析的webrtc的版本是:m84 平台:win10

    WebRTC PeerConnection Client源码分析1-main window

    WebRTC PeerConnection Client源码分析2-PeerConnectionClient

    WebRTC PeerConnection Client源码分析3-Conductor

    注:下文中所谓的回调函数,实际上是虚函数。把虚函数说成回调函数是为了描述方便。

    Conductor是PeerConnection Client的核心,现在按照Conductor实际调用顺序逐一分析。

    上一篇文章中我已经给一张PeerConnection Client与信令服务器交互的时序图,现在给出一张PeerConnection的时序图:

    在这里插入图片描述

    这张时序图引自:https://blog.csdn.net/ice_ly000/article/details/103204327

    1、创建Conductor对象

    int PASCAL wWinMain(HINSTANCE instance, HINSTANCE prev_instance, wchar_t* cmd_line,int cmd_show) 
    {
    	...
      /*创建Conductor对象*/      
      rtc::scoped_refptr<Conductor> conductor(new rtc::RefCountedObject<Conductor>(&client, &wnd));
    
        ...
    }
    
    Conductor::Conductor(PeerConnectionClient* client, MainWindow* main_wnd)
        : peer_id_(-1), loopback_(false), client_(client), main_wnd_(main_wnd) 
    {
      client_->RegisterObserver(this);        /*注册成为PeerConnectionClient的订阅者*/
      main_wnd->RegisterObserver(this);       /*注册成为MainWindow的订阅者*/
    }
    
    void PeerConnectionClient::RegisterObserver(PeerConnectionClientObserver* callback) 
    {
      RTC_DCHECK(!callback_);
      callback_ = callback;     /*保存订阅者*/
    }
    
    void MainWnd::RegisterObserver(MainWndCallback* callback) 
    {
      callback_ = callback;     /*保存订阅者*/
    }
    

    Conductor在构造器注册到PeerConnectionClient和MainWnd中。

    struct PeerConnectionClientObserver {
      /*成功登录信令服务器*/
      virtual void OnSignedIn() = 0;    
      /*登出信令服务器*/
      virtual void OnDisconnected() = 0;   
      /*其他客户端登录信令服务器*/  
      virtual void OnPeerConnected(int id, const std::string& name) = 0;  
      /*其他客户端登出信令服务器*/
      virtual void OnPeerDisconnected(int peer_id) = 0;    
      /*接收到其他客户端发送过来的信息*/
      virtual void OnMessageFromPeer(int peer_id, const std::string& message) = 0;  
      /*通知信息已发送*/
      virtual void OnMessageSent(int err) = 0;
      /*登录信令服务器失败*/
      virtual void OnServerConnectionFailure() = 0;
    
     protected:
      virtual ~PeerConnectionClientObserver() {}
    };
    

    Conductor通过覆写上面的虚函数,用于接收PeerConnectionClient中的消息。

    class MainWndCallback {
     public:
      /*通知登录信令服务器*/
      virtual void StartLogin(const std::string& server, int port) = 0;   
      /*通知登出信令服务器*/
      virtual void DisconnectFromServer() = 0;
      /*连接对端peer*/
      virtual void ConnectToPeer(int peer_id) = 0;
      /*与对端断开连接*/
      virtual void DisconnectFromCurrentPeer() = 0;
      /*自定义消息处理函数*/
      virtual void UIThreadCallback(int msg_id, void* data) = 0;
      /*关闭Conductor*/
      virtual void Close() = 0;
    
     protected:
      virtual ~MainWndCallback() {}
    };
    

    Conductor通过覆写上面的虚函数,用于接收MainWnd中的消息。

    2、主动端主动开启通话

    2.1、用户点击对端id

    登录信令服务器后,会进入peer list界面。如下图:

    image-20210927024714162

    通话双方进入peer list界面后,为了后续方便描述,后面把发起通话的一端称为主动peer,把被动发起通话的一端称为被动peer

    主动peerpeer list界面点击对方用户peer id后,会触发如下逻辑:

    void MainWnd::OnDefaultAction() 
    {
    ...
        
      else if (ui_ == LIST_PEERS) {       /*peer list界面peer id被双击*/
    	...
          if (peer_id != -1 && callback_) {
            /*回调Conductor::ConnectToPeer,传入要连接peer的id。*/
            callback_->ConnectToPeer(peer_id);    
          }
        }
      } 
    ...
    }
    

    双击用户的peer id后,通过消息处理循环函数进入上述函数,接着通过回调进入Conductor::ConnectToPeer()

    void Conductor::ConnectToPeer(int peer_id) {
      RTC_DCHECK(peer_id_ == -1);
      RTC_DCHECK(peer_id != -1);
    
      /*peer_connection_必须为空,返回返回。*/
      if (peer_connection_.get()) {
        main_wnd_->MessageBox(
            "Error", "We only support connecting to one peer at a time", true);
        return;
      }
    
      /*初始化PeerConnection*/
      if (InitializePeerConnection()) {
        peer_id_ = peer_id;
        /*生成offer*/
        peer_connection_->CreateOffer(
            this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
      } else {
        main_wnd_->MessageBox("Error", "Failed to initialize PeerConnection", true);
      }
    }
    

    后面的分析很多都会涉及到webrtc::PeerConnection,要好好配合文章开头给的时序以帮助理解。

    bool Conductor::InitializePeerConnection() 
    {
      RTC_DCHECK(!peer_connection_factory_);
      RTC_DCHECK(!peer_connection_);
    
      /*1、创建PeerConnectionFactory对象*/
      peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
          nullptr /* network_thread */, nullptr /* worker_thread */,
          nullptr /* signaling_thread */, nullptr /* default_adm */,
          webrtc::CreateBuiltinAudioEncoderFactory(),
          webrtc::CreateBuiltinAudioDecoderFactory(),
          webrtc::CreateBuiltinVideoEncoderFactory(),
          webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
          nullptr /* audio_processing */);
    
      if (!peer_connection_factory_) {
        main_wnd_->MessageBox("Error", "Failed to initialize PeerConnectionFactory",
                              true);
        DeletePeerConnection();
        return false;
      }
    
      /*2、创建PeerConnection对象*/
      if (!CreatePeerConnection(/*dtls=*/true)) {
        main_wnd_->MessageBox("Error", "CreatePeerConnection failed", true);
        DeletePeerConnection();
      }
    
      /*3、添加音视频源*/
      AddTracks();
    
      return peer_connection_ != nullptr;
    }
    

    先创建PeerConnectionFactory对象,然后通过该对象创建PeerConnection对象,最后添加音频、视频源。

    bool Conductor::CreatePeerConnection(bool dtls) 
    {
      RTC_DCHECK(peer_connection_factory_);
      RTC_DCHECK(!peer_connection_);
    
      webrtc::PeerConnectionInterface::RTCConfiguration config;
      /*sdp的格式*/
      config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
      /*是否使用dtls*/
      config.enable_dtls_srtp = dtls;
      /*配置stun/turn服务器*/
      webrtc::PeerConnectionInterface::IceServer server;
      server.uri = GetPeerConnectionString();
      config.servers.push_back(server);
      
      /*创建PeerConnection对象*/
      peer_connection_ = peer_connection_factory_->CreatePeerConnection(config, nullptr, nullptr, this);
    
      return peer_connection_ != nullptr;
    }
    

    创建PeerConnection对象

    void Conductor::AddTracks() {
      if (!peer_connection_->GetSenders().empty()) {
        return;  // Already added tracks.
      }
    
      /*创建audio track*/
      rtc::scoped_refptr<webrtc::AudioTrackInterface> audio_track(peer_connection_factory_->CreateAudioTrack(kAudioLabel, peer_connection_factory_->CreateAudioSource(cricket::AudioOptions())));
      /*添加audio track*/
      auto result_or_error = peer_connection_->AddTrack(audio_track, {kStreamId});
      if (!result_or_error.ok()) {
        RTC_LOG(LS_ERROR) << "Failed to add audio track to PeerConnection: " << result_or_error.error().message();
      }
      
      /*创建video device*/
      rtc::scoped_refptr<CapturerTrackSource> video_device = CapturerTrackSource::Create();
      if (video_device) {
        /*创建video track*/
        rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track_(peer_connection_factory_->CreateVideoTrack(kVideoLabel, video_device));
        /*将video track送至本地视频渲染器,用于本地视频的渲染。*/
        main_wnd_->StartLocalRenderer(video_track_);
    
        /*添加video track*/
        result_or_error = peer_connection_->AddTrack(video_track_, {kStreamId});
        if (!result_or_error.ok()) {
          RTC_LOG(LS_ERROR) << "Failed to add video track to PeerConnection: "
                            << result_or_error.error().message();
        }
      } else {
        RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed";
      }
      
      /*将界面从peer list界面切换至视频显示界面*/
      main_wnd_->SwitchToStreamingUI();
    }
    

    创建audio track、video track,并添加到PeerConnection中。本地视频需要通过视频渲染器显示出来。

    2.2、创建offer

    InitializePeerConnection()函数完成后,会调用CreateOffer()生成offer,此函数生成offer是异步的,生成的offer会通过回调函数Conductor::OnSuccess()传回。

    /*offer生成成功后的回调*/
    void Conductor::OnSuccess(webrtc::SessionDescriptionInterface* desc) 
    {
      /*生成的offer需要通过SetLocalDescription设置到PeerConnection中*/
      peer_connection_->SetLocalDescription(DummySetSessionDescriptionObserver::Create(), desc);
    
      std::string sdp;
      desc->ToString(&sdp);   /*将offer转成string*/
    
    ...
        
      /*将offer包装到json中*/
      Json::StyledWriter writer;
      Json::Value jmessage;
      jmessage[kSessionDescriptionTypeName] = webrtc::SdpTypeToString(desc->GetType());
      jmessage[kSessionDescriptionSdpName] = sdp;
        
      /*发送至信令服务器*/
      SendMessage(writer.write(jmessage));
    }
    
    /*offer生成失败后的回调*/
    void Conductor::OnFailure(webrtc::RTCError error) {
      RTC_LOG(LERROR) << ToString(error.type()) << ": " << error.message();
    }
    

    offer的生成是异步的,成功的生成offer或生成失败会调用对应的回调函数(虚函数)。

    class DummySetSessionDescriptionObserver : public webrtc::SetSessionDescriptionObserver 
    {
     public:
      static DummySetSessionDescriptionObserver* Create() {
        return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
      }
      virtual void OnSuccess() { RTC_LOG(INFO) << __FUNCTION__; }
      virtual void OnFailure(webrtc::RTCError error) {
        RTC_LOG(INFO) << __FUNCTION__ << "  " << ToString(error.type()) << ": " << error.message();
      }
    };
    

    在调用SetLocalDescriptionSetRemoteDescription时,其处理的结果是通过回调函数告知的。当成功时,会调用OnSuccess函数,失败时会调用OnFailure函数。

    传入SetLocalDescriptionSetRemoteDescription中用于反馈结果的是一个类对象,该类对象需要继承webrtc::SetSessionDescriptionObserver接口,同时覆写OnSuccess和OnFailure虚函数,这样就可形成多态,将处理的结果通过覆写的虚函数回调上来以供用户处理。

    void Conductor::SendMessage(const std::string& json_object) {
      std::string* msg = new std::string(json_object);
      /*投递的消息类型是SEND_MESSAGE_TO_PEER*/
      main_wnd_->QueueUIThreadCallback(SEND_MESSAGE_TO_PEER, msg);
    }
    

    通过回调返回的offer,并没有直接向信令服务器发送。信令的发送需要在主线程中进行,此时还在webrtc的内部线程中,所以通过上面的函数将待发送的offer信令投递给了主线程,主线程在处理这个消息时,会将对应的信令发送给信令服务器。

    /*自定义的消息类型*/
    enum WindowMessages {
      UI_THREAD_CALLBACK = WM_APP + 1,
    };
    
    /*向Windows Application消息队列投递自定义的消息*/
    void MainWnd::QueueUIThreadCallback(int msg_id, void* data) 
    {
      /*向消息队列投递自己定义的消息*/
      ::PostThreadMessage(ui_thread_id_, UI_THREAD_CALLBACK, static_cast<WPARAM>(msg_id), reinterpret_cast<LPARAM>(data));
    }
    
    bool MainWnd::PreTranslateMessage(MSG* msg)
    {
    ...
      /*处理自定义类型的消息*/
      } else if (msg->hwnd == NULL && msg->message == UI_THREAD_CALLBACK) {
        callback_->UIThreadCallback(static_cast<int>(msg->wParam), reinterpret_cast<void*>(msg->lParam));   /*进入Conductor处理自己发出的消息*/
        ret = true;
      }
    ...
    }
    

    在MainWnd中自定义了消息类型UI_THREAD_CALLBACK,通过MainWnd::QueueUIThreadCallback函数可以向Windows Application消息队列投递自定义的消息。在MainWnd::PreTranslateMessage中处理投递的消息。

    void Conductor::UIThreadCallback(int msg_id, void* data) 
    {
      switch (msg_id) {
        case PEER_CONNECTION_CLOSED:
        case SEND_MESSAGE_TO_PEER: {
        case NEW_TRACK_ADDED: {
        case TRACK_REMOVED: {
        default:
          RTC_NOTREACHED();
          break;
      }
    }
    

    通过MainWnd::QueueUIThreadCallback函数投递的消息,最终会在Conductor::UIThreadCallback得到处理。

    void Conductor::UIThreadCallback(int msg_id, void* data) 
    {
      switch (msg_id) {
    ...
        case SEND_MESSAGE_TO_PEER: {
          RTC_LOG(INFO) << "SEND_MESSAGE_TO_PEER";
          /*获取消息*/
          std::string* msg = reinterpret_cast<std::string*>(data);   
          if (msg) {
            pending_messages_.push_back(msg);  /*将消息存放到队列中,保证消息有序发送。*/
          }
    
          /*如果有消息待发送,且PeerConnectionClient可以发送信令。*/
          if (!pending_messages_.empty() && !client_->IsSendingMessage()) {
            msg = pending_messages_.front();    /*从队首获取一条待发送的消息*/
            pending_messages_.pop_front();
    
            /*通过PeerConnectionClient放该消息*/
            if (!client_->SendToPeer(peer_id_, *msg) && peer_id_ != -1) {
              RTC_LOG(LS_ERROR) << "SendToPeer failed";
              DisconnectFromServer();
            }
            delete msg;
          }
    
          if (!peer_connection_.get())
            peer_id_ = -1;
    
          break;
        }
    ...
    }
    

    上述在发送offer信令时,会通过上述代码发送给信令服务器。

    3、被动端被动开启视频通话

    image-20210927024714162

    被动peer登录信令服务器后,处于peer listen界面,如上图。

    被动peer收到信令服务器发送过来的offer信令后,其逻辑如下:

    void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) 
    {
    ...
          } else {
            /*用于处理offer、answer、candidate、bye信令*/
            OnMessageFromPeer(static_cast<int>(peer_id), notification_data_.substr(pos));
          }
    ...
    }
    

    当信令服务器发送过来offer信令时,PeerConnectionClient::OnHangingGetRead函数被触发。

    void PeerConnectionClient::OnMessageFromPeer(int peer_id, const std::string& message)
    {
    ...
      } else {
        /*收到的是offer、answer、candidate信令*/
        callback_->OnMessageFromPeer(peer_id, message);
      }
    }
    

    在PeerConnectionClient对象中,通过OnMessageFromPeer回调函数,将信令送至了Conductor::OnMessageFromPeer函数中。

    void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) 
    {
      RTC_DCHECK(peer_id_ == peer_id || peer_id_ == -1);
      RTC_DCHECK(!message.empty());
    
      /*此时被动peer还没有创建PeerConnection对象*/
      if (!peer_connection_.get()) {
        RTC_DCHECK(peer_id_ == -1);
        peer_id_ = peer_id;
        
        /*创建PeerConnection对象*/
        if (!InitializePeerConnection()) {
          RTC_LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance";
          client_->SignOut();
          return;
        }
      } else if (peer_id != peer_id_) {
        RTC_DCHECK(peer_id_ != -1);
        RTC_LOG(WARNING) << "Received a message from unknown peer while already in a "
                            "conversation with a different peer.";
        return;
      }
    
      Json::Reader reader;
      Json::Value jmessage;
      /*将收到的消息解析成json对象*/
      if (!reader.parse(message, jmessage)) {     
        RTC_LOG(WARNING) << "Received unknown message. " << message;
        return;
      }
      std::string type_str;
      std::string json_object;
      
      /*从json消息中解析出消息的类型*/
      rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionTypeName, &type_str);
      if (!type_str.empty()) {
        ...
        /*获取消息的类型*/
        absl::optional<webrtc::SdpType> type_maybe = webrtc::SdpTypeFromString(type_str);
    
        if (!type_maybe) {
          RTC_LOG(LS_ERROR) << "Unknown SDP type: " << type_str;
          return;
        }
        webrtc::SdpType type = *type_maybe;
        std::string sdp;
        
        /*从json消息中获取sdp,此处为offer。*/
        if (!rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionSdpName, &sdp)){
          RTC_LOG(WARNING) << "Can't parse received session description message.";
          return;
        }
        
        /*将offer转成webrtc可以理解的对象*/
        webrtc::SdpParseError error;
        std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
            webrtc::CreateSessionDescription(type, sdp, &error);
        if (!session_description) {
          RTC_LOG(WARNING) << "Can't parse received session description message. "
                              "SdpParseError was: " << error.description;
          return;
        }
          
        RTC_LOG(INFO) << " Received session description :" << message;
          
        /*将offer通过SetRemoteDescription设置到PeerConnection中*/
        peer_connection_->SetRemoteDescription(DummySetSessionDescriptionObserver::Create(),session_description.release());
          
        /*收到了对端的offer,本端需要产生answer。*/
        if (type == webrtc::SdpType::kOffer) {
          peer_connection_->CreateAnswer(
              this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
        }
      } else {
    ...
      }
    }
    

    被动peer在收到offer时,还没有创建PeerConnection对象,所以先创建PeerConnection对象,过程同主动peer

    将收到的消息解析成json,然后从json中获取typesdp对应的值,通过SetRemoteDescription设置offer,然后通过CreateAnswer生成answer,注意answer的生成也是异步的,生成后也会通过Conductor::OnSuccess回调函数传递上来,其处理方式同offer,且生成的answer会通过信令服务器发送给对端。

    offer信令的格式如下:

    {
       "sdp" : "v=0\r\no=- 7038993275920826226 ...",
       "type" : "offer"
    }
    

    4、处理candidate

    生成offer和answer后,WebRTC会通过Conductor::OnIceCandidate()函数上传生成的candidate。

    void Conductor::OnIceCandidate(const webrtc::IceCandidateInterface* candidate) {
      RTC_LOG(INFO) << __FUNCTION__ << " " << candidate->sdp_mline_index();
    
      Json::StyledWriter writer;
      Json::Value jmessage;
    
      /*将candidate包装到json中*/
      jmessage[kCandidateSdpMidName] = candidate->sdp_mid();
      jmessage[kCandidateSdpMlineIndexName] = candidate->sdp_mline_index();
      std::string sdp;
      if (!candidate->ToString(&sdp)) {
        RTC_LOG(LS_ERROR) << "Failed to serialize candidate";
        return;
      }
      jmessage[kCandidateSdpName] = sdp;
      
      /*通过信令服务器发送给对端*/
      SendMessage(writer.write(jmessage));
    }
    

    生成的candidate需要通过信令服务器发送给对端。

    candidate的信令格式如下:

    {
       "candidate" : "candidate:2999745851 1 udp 2122260223 192.168.56.1 58572 typ host generation 0 ufrag Cy2E network-id 3",
       "sdpMLineIndex" : 1,
       "sdpMid" : "1"
    }
    
    void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) 
    {
    ...
        
      if (!type_str.empty()) {
    ...
      } else {    /*处理收到的candidate*/
        std::string sdp_mid;
        int sdp_mlineindex = 0;
        std::string sdp;
          
        /*从json中解析处需要的数据*/
        if (!rtc::GetStringFromJsonObject(jmessage, kCandidateSdpMidName, &sdp_mid) ||
            !rtc::GetIntFromJsonObject(jmessage, kCandidateSdpMlineIndexName,
                                       &sdp_mlineindex) ||
            !rtc::GetStringFromJsonObject(jmessage, kCandidateSdpName, &sdp)) {
          RTC_LOG(WARNING) << "Can't parse received message.";
          return;
        }
          
        webrtc::SdpParseError error;
        /*根据接收的信息生成candidate对象*/
        std::unique_ptr<webrtc::IceCandidateInterface> candidate(
            webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp, &error));
        if (!candidate.get()) {
          RTC_LOG(WARNING) << "Can't parse received candidate message. "
                              "SdpParseError was: " << error.description;
          return;
        }
          
        /*给PeerConnection对象添加candidate*/
        if (!peer_connection_->AddIceCandidate(candidate.get())) {
          RTC_LOG(WARNING) << "Failed to apply the received candidate";
          return;
        }
        RTC_LOG(INFO) << " Received candidate :" << message;
      }
    }
    

    将收到的candidate通过AddIceCandidate函数添加到PeerConnection对象中。

    5、收到远端视频流

    void Conductor::OnAddTrack(
        rtc::scoped_refptr<webrtc::RtpReceiverInterface> receiver,
        const std::vector<rtc::scoped_refptr<webrtc::MediaStreamInterface>>&
            streams) {
      RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id();
      /*向主线程发送自定义的消息*/
      main_wnd_->QueueUIThreadCallback(NEW_TRACK_ADDED, receiver->track().release());   
    }
    

    经过上面的步骤后,建立了音视频的连接,当收到远端的视频流时,会通过OnAddTrack()通知用户。

    void Conductor::UIThreadCallback(int msg_id, void* data) 
    {
    ...
        case NEW_TRACK_ADDED: {
          auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data);
          if (track->kind() == webrtc::MediaStreamTrackInterface::kVideoKind) {
            /*获取远端video track*/
            auto* video_track = static_cast<webrtc::VideoTrackInterface*>(track);
            
            /*送至MainWnd处理*/
            main_wnd_->StartRemoteRenderer(video_track);
          }
          track->Release();
          break;
        }
    ...
    }
    
    void MainWnd::StartRemoteRenderer(webrtc::VideoTrackInterface* remote_video) 
    {
      /*生成远端视频渲染器,同时将远端视频渲染器注册到webrtc中。*/
      remote_renderer_.reset(new VideoRenderer(handle(), 1, 1, remote_video));
    }
    

    在这个函数中会将MainWnd中的remote_renderer_添加到WebRTC中,用于接收并渲染远端视频帧。

    void Conductor::OnRemoveTrack(
        rtc::scoped_refptr<webrtc::RtpReceiverInterface> receiver) {
      RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id();
      main_wnd_->QueueUIThreadCallback(TRACK_REMOVED, receiver->track().release());
    }
    
    void Conductor::UIThreadCallback(int msg_id, void* data) 
    {
    ...
        case TRACK_REMOVED: {
          auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data);
          track->Release();
          break;
        }
    ...
    }
    

    远端视频流移除的逻辑

    6、远端peer关闭

    当远端关闭时,会想信令服务器发送一条sign out信令,信令服务器会转发该信令。本端收到信令后的处理逻辑如下:

    void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) 
    {
    ...
              if (connected) {        
                peers_[id] = name;
                callback_->OnPeerConnected(id, name);
              } else {                /*收到远端peer关闭的信令。*/
                peers_.erase(id);
                callback_->OnPeerDisconnected(id);   /*通知Conductor远端关闭*/
              }
    ...
    }
    
    void Conductor::OnPeerDisconnected(int id) 
    {
      RTC_LOG(INFO) << __FUNCTION__;
      if (id == peer_id_) {
        RTC_LOG(INFO) << "Our peer disconnected";
        /*通知远端关闭*/
        main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CLOSED, NULL);
      } else {
        /*刷新peer list界面的用户列表*/
        if (main_wnd_->current_ui() == MainWindow::LIST_PEERS)
          main_wnd_->SwitchToPeerList(client_->peers());
      }
    }
    
    void Conductor::UIThreadCallback(int msg_id, void* data) 
    {
      switch (msg_id) {
        case PEER_CONNECTION_CLOSED:
          RTC_LOG(INFO) << "PEER_CONNECTION_CLOSED";
          DeletePeerConnection();    /*释放PeerConnection对象*/
    
          if (main_wnd_->IsWindow()) {       /*如果在视频渲染界面*/
            if (client_->is_connected()) {   /*如果处于连接状态,则将界面切换至peer list界面*/
              main_wnd_->SwitchToPeerList(client_->peers());
            } else {
              main_wnd_->SwitchToConnectUI();  /*将界面切换至connect界面*/
            }
          } else {
            DisconnectFromServer();       
          }
          break;
    ...
      }
    }
    

    7、本端关闭程序

    void Conductor::Close() 
    {
      client_->SignOut();      /*向信令服务器发送sign out信令*/
      DeletePeerConnection();  /*释放PeerConnection对象*/
    }
    

    关闭本端程序,首先需要向信令服务器发送sign out信令,然后销毁PeerConnection对象。

    展开全文
  • 要将无驱动程序AI与IBM Spectrum Conductor一起使用,您需要在计划运行无驱动程序AI的集群的每个节点上下载并安装无驱动程序AI。 首先下载TAR SH版本以在每个主机上安装: : 在每台主机上安装无驱动程序AI之后,...
  • conductor 的wait按官方说法可以通过消息队列及conductor api触发: 1、没有aws队列环境,没有测试队列触发模式 2、测试过程没有找到对应rest,/api/queue/update/{workflowId}/{taskRefName}/{status} 一直报告 { ...
  • conductor server配置项目

    2021-10-10 11:59:41
    conductor server配置项目说明是分散在各个additional-spring-configuration-metadata.json 及类ConductorProperties 中。 可以使用 find . -name additional*.json |grep -v build|xargs cat >>all-...
  • Conductor - 基于docker 任务运行器 基本上,这是一个便携式PHP环境,可以运行robo在任何地方docker可以运行。 在您的出色项目在他们的本地工作站上运行之前,要求您的开发人员同事安装包X和包Y ,哦和模块Z的日子...
  • Print Conductor可以批量打印所有流行的文档:PDF文件,Word和Excel文件,各种办公文件,技术图纸,法律文件,协议,演示文稿,文本文件,电子邮件,图表,电子表格,发票,图像和许多其他类型文件。Print Conductor...
  • conductor client 比较简单,包括以下两个子项目 1、client 2、client-spring client-spring是spring boot实现。 从以下spring boot核心类可以看到做什么 public class ConductorClientAutoConfiguration { @...
  • Print Conductor批量打印工具,Print Conductor|Crack版  Print Conductor是一种简单的批量打印工具。如果您必须定期打开并打印大量文件,则此方便的工具可以帮助您立即打印文件。当您将打印文件打印到程序时,...
  • 翻译自https://medium.com/netflix-techblog/netflix-conductor-a-microservices-orchestrator-2e8d4771bf40
  • Netflix Conductor框架是典型的服务编排框架,通过Conductor还可以实现工作流和分布式调度,性能非常卓越。 关于Conductor的基本概念在 https://netflix.github.io/conductor/intro/ 文中已经有深入介绍,本篇将以...
  • Build file '/Users/songzj/Downloads/sourceCode/conductor-3.0.1/build.gradle' line: 18 An exception occurred applying plugin request [id: 'nebula.netflixoss', version: '9.2.2'] Failed to
  • Netflix/conductor入门 Workflow 工作流是您的流程流的容器。它可以包括几种不同类型的任务,子工作流,相互连接的输入和输出,以有效地获得所需的结果。 Workflow 定义 工作流是使用基于JSON的DSL定义的,包括作为...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,057
精华内容 2,822
关键字:

conductor

友情链接: M16-RF905.zip