-
2021-09-22 21:52:23更多相关内容
-
conductor:轻松混合同步和异步代码
2021-04-26 10:16:41Conductor是一个现代实用程序库,可帮助您使用函数式编程控制执行流程。 :person_in_tuxedo: 描述 它提供了一组实用程序功能,可与异步和同步代码一起使用,从而使您可以用最少的代码非常清楚地控制执行流。 该库... -
openstack-nova-conductor
2018-12-14 20:05:16openstack云计算的rpm包 -
文档批量打印工具 Print Conductor 7.0.2003.16190 中文多语免费版.zip
2021-05-25 14:46:21Print Conductor 中文版是一款用于自动打印的智能软件解决方案。 批量打印 PDF 文件,办公文档,技术图纸,法律文档,协议,演示文稿,文本文件,带附件的电子邮件,图表,电子表格,发票,图像和许多其他类型的文件... -
服务编排conductor介绍.docx
2020-02-21 19:28:10本文是对Conductor微服务编排工具的使用说明,对初学者有很好的启蒙作用。Conductor是使用广泛的无微服务编排工具,具有良好的微服务编排功能和轻量化流程管理控制能力,在微服务架构中使用广泛。 -
Android代码-Conductor
2019-08-06 05:42:37Conductor A small, yet full-featured framework that allows building View-based Android applications. Conductor provides a light-weight wrapper around standard Android Views that does just about ... -
conductor
2019-11-27 20:34:29这个https://github.com/Netflix/conductor是项目的github地址, 选择SSH或者HTTPs检出,git clonegit@github.com:Netflix/conductor.git(执行git命令需要先安装git)至任意位置。 二、启动本地服务 进入项目...一、首先从git上检出项目
这个https://github.com/Netflix/conductor是项目的github地址,
选择SSH或者HTTPs检出,git clone git@github.com:Netflix/conductor.git(执行git命令需要先安装git)至任意位置。
二、启动本地服务
进入项目目录下,右键git bash here,执行命令cd server进入server目录,再执行../gradlew server,然后等待项目构建成功即可。注意:此处JDK版本必须为1.8否则会报错:无效的源发行版: 1.8。启动成功后会创建一个kitchen workflow样例:
此时可以打开http://localhost:8080/查看Swagger APIs。
重新打开一个git命令窗口,执行cd ui,进入ui目录,执行gulp watch等待构建成功即可。注意:需要执行gulp命令,首先需要安装node.js,并且由于框架使用了import,node.js版本必须使用8.0以上,node装好后再安装glup。顺序应该为:安装node.js->全局安装gulp->项目安装gulp。
成功后可打开http://localhost:3000查看任务相关,如下图:
运行时模型:
Conductor 遵循RPC的通讯模型,其中workers运行在与 服务器分离的应用机器上,并通过HTTP协议或者是轮询的方式来管理work队列。
1.Workers作为远程系统,通过HTTP协议(或者任意支持的RPC机制)同Conductor 服务进行通信。
2.任务队列用于为workers安排任务,Conductor使用内部的dyno-queues 作为任务队列,当然也可以使用其他SQS(简单队列)或者pub-sub机制的队列作为代替。
3.conductor-redis-persistence模块使用Dynomite作为存储引擎,以及Elasticsearch作为索引的解决方案。
4.对于存储和索引支持不同数据库。
一个新的workflow的注册和执行需要以下步骤:
1.定义这个workflow需要的task
2.创建workflow。
3.创建消费这些任务的workers并通过轮询的方式来获取任务。
4.触发工作流执行:
POST /workflow/{name}{
... //json payload as workflow input
}
5.轮询获取任务
GET /tasks/poll/batch/{taskType}
6.更新任务状态
POST /tasks{
"outputData": {
"encodeResult":"success",
"location": "http://cdn.example.com/file/location.png"
//any task specific output
},
"status": "COMPLETED"
}
-
Node.js-Conductor一个微服务编排引擎
2019-08-10 03:53:02Conductor一个微服务编排引擎 -
conductor.js
2021-05-31 10:58:06Conductor.js 是一个用于创建沙盒化、可重复使用的应用程序的库,这些应用程序可以嵌入到主机应用程序中。 与标准<iframe>相比,使用 Conductor.js 的优势在于它使用了一组明确定义的事件,允许应用程序与其... -
print conductor
2018-01-21 09:19:16Print Conductor破解版下载 批量打印工具 汉化版本 一键打印 -
分布式系统测试框架Conductor.zip
2019-07-18 22:03:55Conductor 是一个分布式系统的测试框架。很多测试框架只能测试单机的代码,而 Conductor 是一个分布式系统测试框架,使用 Python 开发,可用于协调一组测试服务。Conductor 系统允许通过单机来控制很多系统进行协调... -
conductor-server-2.25.5-all.jar1
2020-03-10 17:05:50netflix conductor,用于服务编排,功能比较全,还是纯开源; -
conductor, selenium 框架,将你带到你想去的地方.zip
2019-09-18 06:37:46conductor, selenium 框架,将你带到你想去的地方 导体查看站点名称。 正在启动使用 Maven,将它的包括为一个依赖项:<dependency> <groupId>io.ddavison</groupId> <art -
Conductor在同一个地方跨组件组织各种动画
2019-08-10 04:30:25Conductor 在同一个地方跨组件组织各种动画 -
Conductor使用Electron开发的用户界面创作和PHP依赖管理工具
2019-08-08 07:32:31Conductor:使用Electron开发的用户界面创作和PHP依赖管理工具 -
Print Conductor多文档批量打印v6.2注册版
2018-12-01 12:07:47无须事先打开文件,使用Print Conductor同时自动打印不同格式的文件。支持超过75种文件格式。兼容所有打印机。 -
开源微服务编排框架:Netflix Conductor
2021-12-07 10:48:55简介:本文主要介绍netflix conductor的基本概念和主要运行机制。 作者 | 夜阳 来源 | 阿里技术公众号 本文主要介绍netflix conductor的基本概念和主要运行机制。 一 简介 netflix conductor是基于JAVA语言...简介:本文主要介绍netflix conductor的基本概念和主要运行机制。
作者 | 夜阳
来源 | 阿里技术公众号本文主要介绍netflix conductor的基本概念和主要运行机制。
一 简介
netflix conductor是基于JAVA语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下特性:
- 允许创建复杂的业务流程,流程中每个独立的任务都是由一个微服务所实现。
- 基于JSON DSL 创建工作流,对任务的执行进行编排。
- 工作流在执行的过程中可见、可追溯。
- 提供暂停、恢复、重启等多种控制模型。
- 提供一种简单的方式来最大限度重用微服务。
- 拥有扩展到百万流程并发运行的服务能力。
- 通过队列服务实现客户端与服务端的分离。
- 支持 HTTP 或其他RPC协议进行数据传送
二 基本概念
1 Task
Task是最小执行单元,承载了一段执行逻辑,如发送HTTP请求等。
- System Task:被conductor服务执行,这些任务的执行与引擎在同一个JVM中。
- Worker Task:被worker服务执行,执行与引擎隔离开,worker通过队列获取任务后,执行并更新结果状态到引擎。Worker的实现是跨语言的,其使用Http协议与Server通信。
conductor提供了若干内置SystemTask:
-
功能性Task:
- HTTP:发送http请求
- JSON_JQ_TRANSFORM:jq命令执行,一般用户json的转换,具体可见jq官方文档
- KAFKA_PUBLISH: 发布kafka消息
-
流程控制Task:
- SWITCH(原Decision):条件判断分支,类似于代码中的switch case
- FORK:启动并行分支,用于调度并行任务
- JOIN:汇总并行分支,用于汇总并行任务
- DO_WHILE:循环,类似于代码中的do while
- WAIT:一直在运行中,直到外部时间触发更新节点状态,可用于等待外部操作
- SUB_WORKFLOW:子流程,执行其他的流程
- TERMINATE:结束流程,以指定输出提前结束流程,可以与SWITCH节点配合使用,类似代码中的提前return语句
-
自定义Task:
- 对于System Task,Conductor提供了WorkflowSystemTask 抽象类,可以自定义扩展实现。
- 对于Worker Task,可以实现conductor的client Worker接口实现执行逻辑。
2 Workflow
- Workflow由一系列需要执行的Task组成,conductor采用json来描述Task的流转关系。
- 除基本的顺序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任务,还能实现分支、并行、循环、提前结束等流程控制。
3 Input&Output
Task的输入是一种映射,其作为工作流实例化的一部分或某些其他Task的输出。允许将来自工作流或其他Task的输入/输出作为随后执行的Task的输入。
- Task有自己的输入和输出,输入输出都是jsonobject类型。
- Task可以引用其他Task的输入输出,使用${taskxxx.output}的方式引用。引用语法为json-path,除最基础的${taskxxx.output}的值解析方式外,还支持其他复杂操作,如过滤等,具体见json-path语法。
- 启动Workflow时可以传入流程的输入数据,Task可以通过${workflow.input}的方式引用。
Task实现原子操作的处理以及流程控制操作,Workflow定义描述Task的流转关系,Task引用Workflow或者其它Task的输入输出。通过这些机制,conductor实现了JSON DSL对流程的描述。
三 整体架构
主要分为几个部分:
- Orchestrator: 负责流程的流转调度工作;
- Management/Execution Service: 提供流程、任务的管理更新等操作;
- TaskQueues: 任务队列,Orchestrator解析出来的待执行Task会放到队列中;
- Worker: 任务执行worker,从TaskQueues中获取任务,通过Execution Service更新任务状态与结果数据;
- Database: 元数据&运行时数据库,用于保存运行时的Workflow、Task等状态信息,以及流程任务定义的等原信息;
- Index: 索引数据库,用于存储执行历史;
四 运行模型
1 Task状态转移
- SCHEDULED:待调度,task放到队列中还没有被poll出来执行时的状态
- IN_PROGRESS:执行中,被poll出来执行但还没有完成时的状态
- COMPLETED:执行完成
- FAILED:执行失败
-
CANCELLED:被中止时为此状态,一般出现在两种情况:
- 手动中止流程时,正在运行中的task会被置为此状态;
- 多个fork分支,当某个分支的task失败时,其它分支中正在运行的task会被置为此状态;
2 任务队列
任务的执行(同步的系统任务除外)都会先添加到任务队列中,是典型的生产者消费者模式。
- 任务队列,是一个带有延迟、优先级功能的队列;
- 每种类型的Task是一个单独的队列,此外,如果配置了domain、isolationGroup,还会拆分成多个队列实现执行隔离;
- decider service是生产者,其根据流程配置与当前执行情况,解析出可执行的task后,添加到队列;
- 任务执行器(SystemTaskWorker、Worker)是消费者,其长轮询对应的队列,从队列中获取任务执行;
队列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的实现。
3 核心功能实现机制
conductor调度的核心是decider service,其根据当前流程运行的状态,解析出将要执行的任务列表,将任务入队交给worker执行。
decide主要流程简化如下,详细代码见WorkflowExecutor.java的decide方法:
其中,调度任务处理流程简化如下,详细代码见WorkflowExecutor.java的scheduleTask方法:
decide的触发时机
最主要的触发时机:
- 新启动执行时,会触发decide操作
- 系统任务执行完成时,会触发decide操作
- Workder任务通过ExecutionService更新任务状态时,会触发decide操作
流程控制节点的实现机制
1)Task & TaskMapper
对于每一个Task来说,都有Task和TaskMapper两部分:
- Task:任务的执行逻辑代码,它的作用是Task的执行
- TaskMapper:任务的映射逻辑代码,它通过Task的定义配置、当前实例的执行状态等信息,返回实际需要执行的Task列表
对于一般的任务来说,TaskMapper返回的是就是Task本身,补充一些执行实例的状态信息。但是对于控制节点来说,会有不同的逻辑。
2)条件分支(SWITCH)的实现机制
SWITCH用于根据条件判断,执行不同的分支。
实际上,该节点的Task不做任何操作,TaskMapper根据分支条件,判断出要走的分之后,返回对应分支的第一个Task。
SwitchTaskMapper.java getMappedTasks方法关键代码:
// 待调度的Task list,最终返回结果 List<Task> tasksToBeScheduled = new LinkedList<>(); // evalResult是分支条件变量的值(case) // decisionCases是一个Map结构,key为分支的case值,value为对应分支的任务定义list(分支内的任务定义会有多个) // 根据分支变量的实际值,获取对应分支的任务定义list List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult); // default的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支 if (selectedTasks == null || selectedTasks.isEmpty()) { selectedTasks = taskToSchedule.getDefaultCase(); } if (selectedTasks != null && !selectedTasks.isEmpty()) { // 获取分支的第一个(下标0)task,返回给decider service去做调度(decider会把任务添加到队列里,交给worker去执行) WorkflowTask selectedTask = selectedTasks.get(0); // 调用了deciderService的getTasksToBeScheduled方法,此方法里又获取到TaskMapper调用了getMappedTasks。这里采用了递归调用的方式,解析嵌套的Task List<Task> caseTasks = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId()); tasksToBeScheduled.addAll(caseTasks); switchTask.getInputData().put("hasChildren", "true"); } return tasksToBeScheduled;
3)并行(FORK)的实现机制
FORK用于开启多个并行分支。
实际上,该节点的Task不做任何操作,TaskMapper返回所有并行分支的第一个Task。
ForkJoinTaskMapper.java getMappedTasks关键代码:// 待调度的Task list,最终返回结果 List<Task> tasksToBeScheduled = new LinkedList<>(); // 配置中的所有fork分支 List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks(); for (List<WorkflowTask> wfts : forkTasks) { // 每个分支取第一个Task WorkflowTask wft = wfts.get(0); // 调用了deciderService的getTasksToBeScheduled方法,此方法里又获取到TaskMapper调用了getMappedTasks。这里采用了递归调用的方式,解析嵌套的Task List<Task> tasks2 = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, wft, retryCount); tasksToBeScheduled.addAll(tasks2); } return tasksToBeScheduled;
总的来说,分支(SWITCH)、并行(FORK)节点本身没有执行逻辑,其通过TaskMapper返回到实际要执行的Task,然后交给Decider Service处理。
重试的实现机制
重试和其延迟时间设置,都是借助任务队列的功能实现的。
重试:将任务重新添加到任务队列
重试的延迟时间:添加到任务队列时设置延迟时间,延迟时间过后,任务才能在队列中被poll出来执行
五 完整性保障机制
由于调度过程中可能会出现因机器重启、网络异常、JVM崩溃等偶发情况,这些会导致的decide过程意外终止,流程执行不完整,展现出如流程一直运行中(实际已经没有在调度),或者其它状态错误等异常现象。
1 WorkflowReconciler
针对这种情况,conductor有一个WorkflowReconciler,会定期尝试decide所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时时间。
2 decideQueue
那么WorkflowReconciler是如何获取到当前运行中的流程呢,答案是decideQueue。
decideQueue和任务队列相同,也是一个具有延迟功能的队列,其存放的是正在执行中的流程的实例id。在任务开始执行时(包括新启动执行、重试执行、恢复执行、重跑执行等),会将实例id push到decideQueue中;在执行结束(成功、失败)时,会从decideQueue中删除实例id。3 ExecutionLockService
WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、维护流程一致性。但是流程本身正常执行也会触发decide,如果同一个执行同时触发两个decide,可能会导致状态混乱,执行卡住等问题。
conductor采用了锁来解决这个问题,其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现。
decide方法中最开始会尝试获取锁,如果获取失败则直接返回。通过锁来保障不会对同一个流程实例并发执行decide。
if (!executionLockService.acquireLock(workflowId)) { return false; }
由于锁是可配置的,可能会导致一个误区:单台机器的话不用配置锁。其实单机也是需要配置锁的,因为WorkflowReconciler和流程正常执行会产生冲突,可能会导致偶发的流程状态混乱问题。
原文链接
本文为阿里云原创内容,未经允许不得转载。 -
WebRTC PeerConnection Client源码分析3-Conductor
2021-10-02 19:14:36本文分析的webrtc的版本是:m84 平台:win10 WebRTC PeerConnection Client源码分析<1>-main window WebRTC PeerConnection Client源码分析<2>-PeerConnectionClient ...Conductor是Pee本文分析的webrtc的版本是:
m84
平台:win10
WebRTC PeerConnection Client源码分析1-main window
WebRTC PeerConnection Client源码分析2-PeerConnectionClient
WebRTC PeerConnection Client源码分析3-Conductor
注:下文中所谓的回调函数,实际上是虚函数。把虚函数说成回调函数是为了描述方便。
Conductor是PeerConnection Client的核心,现在按照Conductor实际调用顺序逐一分析。
上一篇文章中我已经给一张PeerConnection Client与信令服务器交互的时序图,现在给出一张PeerConnection的时序图:
这张时序图引自:https://blog.csdn.net/ice_ly000/article/details/103204327
1、创建Conductor对象
int PASCAL wWinMain(HINSTANCE instance, HINSTANCE prev_instance, wchar_t* cmd_line,int cmd_show) { ... /*创建Conductor对象*/ rtc::scoped_refptr<Conductor> conductor(new rtc::RefCountedObject<Conductor>(&client, &wnd)); ... } Conductor::Conductor(PeerConnectionClient* client, MainWindow* main_wnd) : peer_id_(-1), loopback_(false), client_(client), main_wnd_(main_wnd) { client_->RegisterObserver(this); /*注册成为PeerConnectionClient的订阅者*/ main_wnd->RegisterObserver(this); /*注册成为MainWindow的订阅者*/ } void PeerConnectionClient::RegisterObserver(PeerConnectionClientObserver* callback) { RTC_DCHECK(!callback_); callback_ = callback; /*保存订阅者*/ } void MainWnd::RegisterObserver(MainWndCallback* callback) { callback_ = callback; /*保存订阅者*/ }
Conductor在构造器注册到PeerConnectionClient和MainWnd中。
struct PeerConnectionClientObserver { /*成功登录信令服务器*/ virtual void OnSignedIn() = 0; /*登出信令服务器*/ virtual void OnDisconnected() = 0; /*其他客户端登录信令服务器*/ virtual void OnPeerConnected(int id, const std::string& name) = 0; /*其他客户端登出信令服务器*/ virtual void OnPeerDisconnected(int peer_id) = 0; /*接收到其他客户端发送过来的信息*/ virtual void OnMessageFromPeer(int peer_id, const std::string& message) = 0; /*通知信息已发送*/ virtual void OnMessageSent(int err) = 0; /*登录信令服务器失败*/ virtual void OnServerConnectionFailure() = 0; protected: virtual ~PeerConnectionClientObserver() {} };
Conductor通过覆写上面的虚函数,用于接收PeerConnectionClient中的消息。
class MainWndCallback { public: /*通知登录信令服务器*/ virtual void StartLogin(const std::string& server, int port) = 0; /*通知登出信令服务器*/ virtual void DisconnectFromServer() = 0; /*连接对端peer*/ virtual void ConnectToPeer(int peer_id) = 0; /*与对端断开连接*/ virtual void DisconnectFromCurrentPeer() = 0; /*自定义消息处理函数*/ virtual void UIThreadCallback(int msg_id, void* data) = 0; /*关闭Conductor*/ virtual void Close() = 0; protected: virtual ~MainWndCallback() {} };
Conductor通过覆写上面的虚函数,用于接收MainWnd中的消息。
2、主动端主动开启通话
2.1、用户点击对端id
登录信令服务器后,会进入peer list界面。如下图:
通话双方进入peer list界面后,为了后续方便描述,后面把发起通话的一端称为主动peer,把被动发起通话的一端称为被动peer。
主动peer在peer list界面点击对方用户peer id后,会触发如下逻辑:
void MainWnd::OnDefaultAction() { ... else if (ui_ == LIST_PEERS) { /*peer list界面peer id被双击*/ ... if (peer_id != -1 && callback_) { /*回调Conductor::ConnectToPeer,传入要连接peer的id。*/ callback_->ConnectToPeer(peer_id); } } } ... }
双击用户的peer id后,通过消息处理循环函数进入上述函数,接着通过回调进入Conductor::ConnectToPeer()。
void Conductor::ConnectToPeer(int peer_id) { RTC_DCHECK(peer_id_ == -1); RTC_DCHECK(peer_id != -1); /*peer_connection_必须为空,返回返回。*/ if (peer_connection_.get()) { main_wnd_->MessageBox( "Error", "We only support connecting to one peer at a time", true); return; } /*初始化PeerConnection*/ if (InitializePeerConnection()) { peer_id_ = peer_id; /*生成offer*/ peer_connection_->CreateOffer( this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions()); } else { main_wnd_->MessageBox("Error", "Failed to initialize PeerConnection", true); } }
后面的分析很多都会涉及到webrtc::PeerConnection,要好好配合文章开头给的时序以帮助理解。
bool Conductor::InitializePeerConnection() { RTC_DCHECK(!peer_connection_factory_); RTC_DCHECK(!peer_connection_); /*1、创建PeerConnectionFactory对象*/ peer_connection_factory_ = webrtc::CreatePeerConnectionFactory( nullptr /* network_thread */, nullptr /* worker_thread */, nullptr /* signaling_thread */, nullptr /* default_adm */, webrtc::CreateBuiltinAudioEncoderFactory(), webrtc::CreateBuiltinAudioDecoderFactory(), webrtc::CreateBuiltinVideoEncoderFactory(), webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */, nullptr /* audio_processing */); if (!peer_connection_factory_) { main_wnd_->MessageBox("Error", "Failed to initialize PeerConnectionFactory", true); DeletePeerConnection(); return false; } /*2、创建PeerConnection对象*/ if (!CreatePeerConnection(/*dtls=*/true)) { main_wnd_->MessageBox("Error", "CreatePeerConnection failed", true); DeletePeerConnection(); } /*3、添加音视频源*/ AddTracks(); return peer_connection_ != nullptr; }
先创建PeerConnectionFactory对象,然后通过该对象创建PeerConnection对象,最后添加音频、视频源。
bool Conductor::CreatePeerConnection(bool dtls) { RTC_DCHECK(peer_connection_factory_); RTC_DCHECK(!peer_connection_); webrtc::PeerConnectionInterface::RTCConfiguration config; /*sdp的格式*/ config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan; /*是否使用dtls*/ config.enable_dtls_srtp = dtls; /*配置stun/turn服务器*/ webrtc::PeerConnectionInterface::IceServer server; server.uri = GetPeerConnectionString(); config.servers.push_back(server); /*创建PeerConnection对象*/ peer_connection_ = peer_connection_factory_->CreatePeerConnection(config, nullptr, nullptr, this); return peer_connection_ != nullptr; }
创建PeerConnection对象
void Conductor::AddTracks() { if (!peer_connection_->GetSenders().empty()) { return; // Already added tracks. } /*创建audio track*/ rtc::scoped_refptr<webrtc::AudioTrackInterface> audio_track(peer_connection_factory_->CreateAudioTrack(kAudioLabel, peer_connection_factory_->CreateAudioSource(cricket::AudioOptions()))); /*添加audio track*/ auto result_or_error = peer_connection_->AddTrack(audio_track, {kStreamId}); if (!result_or_error.ok()) { RTC_LOG(LS_ERROR) << "Failed to add audio track to PeerConnection: " << result_or_error.error().message(); } /*创建video device*/ rtc::scoped_refptr<CapturerTrackSource> video_device = CapturerTrackSource::Create(); if (video_device) { /*创建video track*/ rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track_(peer_connection_factory_->CreateVideoTrack(kVideoLabel, video_device)); /*将video track送至本地视频渲染器,用于本地视频的渲染。*/ main_wnd_->StartLocalRenderer(video_track_); /*添加video track*/ result_or_error = peer_connection_->AddTrack(video_track_, {kStreamId}); if (!result_or_error.ok()) { RTC_LOG(LS_ERROR) << "Failed to add video track to PeerConnection: " << result_or_error.error().message(); } } else { RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed"; } /*将界面从peer list界面切换至视频显示界面*/ main_wnd_->SwitchToStreamingUI(); }
创建audio track、video track,并添加到PeerConnection中。本地视频需要通过视频渲染器显示出来。
2.2、创建offer
InitializePeerConnection()函数完成后,会调用CreateOffer()生成offer,此函数生成offer是异步的,生成的offer会通过回调函数Conductor::OnSuccess()传回。
/*offer生成成功后的回调*/ void Conductor::OnSuccess(webrtc::SessionDescriptionInterface* desc) { /*生成的offer需要通过SetLocalDescription设置到PeerConnection中*/ peer_connection_->SetLocalDescription(DummySetSessionDescriptionObserver::Create(), desc); std::string sdp; desc->ToString(&sdp); /*将offer转成string*/ ... /*将offer包装到json中*/ Json::StyledWriter writer; Json::Value jmessage; jmessage[kSessionDescriptionTypeName] = webrtc::SdpTypeToString(desc->GetType()); jmessage[kSessionDescriptionSdpName] = sdp; /*发送至信令服务器*/ SendMessage(writer.write(jmessage)); } /*offer生成失败后的回调*/ void Conductor::OnFailure(webrtc::RTCError error) { RTC_LOG(LERROR) << ToString(error.type()) << ": " << error.message(); }
offer的生成是异步的,成功的生成offer或生成失败会调用对应的回调函数(虚函数)。
class DummySetSessionDescriptionObserver : public webrtc::SetSessionDescriptionObserver { public: static DummySetSessionDescriptionObserver* Create() { return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>(); } virtual void OnSuccess() { RTC_LOG(INFO) << __FUNCTION__; } virtual void OnFailure(webrtc::RTCError error) { RTC_LOG(INFO) << __FUNCTION__ << " " << ToString(error.type()) << ": " << error.message(); } };
在调用SetLocalDescription和SetRemoteDescription时,其处理的结果是通过回调函数告知的。当成功时,会调用
OnSuccess
函数,失败时会调用OnFailure
函数。传入SetLocalDescription和SetRemoteDescription中用于反馈结果的是一个类对象,该类对象需要继承webrtc::SetSessionDescriptionObserver接口,同时覆写OnSuccess和OnFailure虚函数,这样就可形成多态,将处理的结果通过覆写的虚函数回调上来以供用户处理。
void Conductor::SendMessage(const std::string& json_object) { std::string* msg = new std::string(json_object); /*投递的消息类型是SEND_MESSAGE_TO_PEER*/ main_wnd_->QueueUIThreadCallback(SEND_MESSAGE_TO_PEER, msg); }
通过回调返回的offer,并没有直接向信令服务器发送。信令的发送需要在主线程中进行,此时还在webrtc的内部线程中,所以通过上面的函数将待发送的offer信令投递给了主线程,主线程在处理这个消息时,会将对应的信令发送给信令服务器。
/*自定义的消息类型*/ enum WindowMessages { UI_THREAD_CALLBACK = WM_APP + 1, }; /*向Windows Application消息队列投递自定义的消息*/ void MainWnd::QueueUIThreadCallback(int msg_id, void* data) { /*向消息队列投递自己定义的消息*/ ::PostThreadMessage(ui_thread_id_, UI_THREAD_CALLBACK, static_cast<WPARAM>(msg_id), reinterpret_cast<LPARAM>(data)); } bool MainWnd::PreTranslateMessage(MSG* msg) { ... /*处理自定义类型的消息*/ } else if (msg->hwnd == NULL && msg->message == UI_THREAD_CALLBACK) { callback_->UIThreadCallback(static_cast<int>(msg->wParam), reinterpret_cast<void*>(msg->lParam)); /*进入Conductor处理自己发出的消息*/ ret = true; } ... }
在MainWnd中自定义了消息类型
UI_THREAD_CALLBACK
,通过MainWnd::QueueUIThreadCallback函数可以向Windows Application消息队列投递自定义的消息。在MainWnd::PreTranslateMessage中处理投递的消息。void Conductor::UIThreadCallback(int msg_id, void* data) { switch (msg_id) { case PEER_CONNECTION_CLOSED: case SEND_MESSAGE_TO_PEER: { case NEW_TRACK_ADDED: { case TRACK_REMOVED: { default: RTC_NOTREACHED(); break; } }
通过MainWnd::QueueUIThreadCallback函数投递的消息,最终会在Conductor::UIThreadCallback得到处理。
void Conductor::UIThreadCallback(int msg_id, void* data) { switch (msg_id) { ... case SEND_MESSAGE_TO_PEER: { RTC_LOG(INFO) << "SEND_MESSAGE_TO_PEER"; /*获取消息*/ std::string* msg = reinterpret_cast<std::string*>(data); if (msg) { pending_messages_.push_back(msg); /*将消息存放到队列中,保证消息有序发送。*/ } /*如果有消息待发送,且PeerConnectionClient可以发送信令。*/ if (!pending_messages_.empty() && !client_->IsSendingMessage()) { msg = pending_messages_.front(); /*从队首获取一条待发送的消息*/ pending_messages_.pop_front(); /*通过PeerConnectionClient放该消息*/ if (!client_->SendToPeer(peer_id_, *msg) && peer_id_ != -1) { RTC_LOG(LS_ERROR) << "SendToPeer failed"; DisconnectFromServer(); } delete msg; } if (!peer_connection_.get()) peer_id_ = -1; break; } ... }
上述在发送offer信令时,会通过上述代码发送给信令服务器。
3、被动端被动开启视频通话
被动peer登录信令服务器后,处于peer listen界面,如上图。
当被动peer收到信令服务器发送过来的offer信令后,其逻辑如下:
void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) { ... } else { /*用于处理offer、answer、candidate、bye信令*/ OnMessageFromPeer(static_cast<int>(peer_id), notification_data_.substr(pos)); } ... }
当信令服务器发送过来offer信令时,PeerConnectionClient::OnHangingGetRead函数被触发。
void PeerConnectionClient::OnMessageFromPeer(int peer_id, const std::string& message) { ... } else { /*收到的是offer、answer、candidate信令*/ callback_->OnMessageFromPeer(peer_id, message); } }
在PeerConnectionClient对象中,通过OnMessageFromPeer回调函数,将信令送至了Conductor::OnMessageFromPeer函数中。
void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) { RTC_DCHECK(peer_id_ == peer_id || peer_id_ == -1); RTC_DCHECK(!message.empty()); /*此时被动peer还没有创建PeerConnection对象*/ if (!peer_connection_.get()) { RTC_DCHECK(peer_id_ == -1); peer_id_ = peer_id; /*创建PeerConnection对象*/ if (!InitializePeerConnection()) { RTC_LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance"; client_->SignOut(); return; } } else if (peer_id != peer_id_) { RTC_DCHECK(peer_id_ != -1); RTC_LOG(WARNING) << "Received a message from unknown peer while already in a " "conversation with a different peer."; return; } Json::Reader reader; Json::Value jmessage; /*将收到的消息解析成json对象*/ if (!reader.parse(message, jmessage)) { RTC_LOG(WARNING) << "Received unknown message. " << message; return; } std::string type_str; std::string json_object; /*从json消息中解析出消息的类型*/ rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionTypeName, &type_str); if (!type_str.empty()) { ... /*获取消息的类型*/ absl::optional<webrtc::SdpType> type_maybe = webrtc::SdpTypeFromString(type_str); if (!type_maybe) { RTC_LOG(LS_ERROR) << "Unknown SDP type: " << type_str; return; } webrtc::SdpType type = *type_maybe; std::string sdp; /*从json消息中获取sdp,此处为offer。*/ if (!rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionSdpName, &sdp)){ RTC_LOG(WARNING) << "Can't parse received session description message."; return; } /*将offer转成webrtc可以理解的对象*/ webrtc::SdpParseError error; std::unique_ptr<webrtc::SessionDescriptionInterface> session_description = webrtc::CreateSessionDescription(type, sdp, &error); if (!session_description) { RTC_LOG(WARNING) << "Can't parse received session description message. " "SdpParseError was: " << error.description; return; } RTC_LOG(INFO) << " Received session description :" << message; /*将offer通过SetRemoteDescription设置到PeerConnection中*/ peer_connection_->SetRemoteDescription(DummySetSessionDescriptionObserver::Create(),session_description.release()); /*收到了对端的offer,本端需要产生answer。*/ if (type == webrtc::SdpType::kOffer) { peer_connection_->CreateAnswer( this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions()); } } else { ... } }
被动peer在收到offer时,还没有创建PeerConnection对象,所以先创建PeerConnection对象,过程同主动peer。
将收到的消息解析成json,然后从json中获取
type
和sdp
对应的值,通过SetRemoteDescription设置offer,然后通过CreateAnswer生成answer,注意answer的生成也是异步的,生成后也会通过Conductor::OnSuccess回调函数传递上来,其处理方式同offer,且生成的answer会通过信令服务器发送给对端。offer信令的格式如下:
{ "sdp" : "v=0\r\no=- 7038993275920826226 ...", "type" : "offer" }
4、处理candidate
生成offer和answer后,WebRTC会通过Conductor::OnIceCandidate()函数上传生成的candidate。
void Conductor::OnIceCandidate(const webrtc::IceCandidateInterface* candidate) { RTC_LOG(INFO) << __FUNCTION__ << " " << candidate->sdp_mline_index(); Json::StyledWriter writer; Json::Value jmessage; /*将candidate包装到json中*/ jmessage[kCandidateSdpMidName] = candidate->sdp_mid(); jmessage[kCandidateSdpMlineIndexName] = candidate->sdp_mline_index(); std::string sdp; if (!candidate->ToString(&sdp)) { RTC_LOG(LS_ERROR) << "Failed to serialize candidate"; return; } jmessage[kCandidateSdpName] = sdp; /*通过信令服务器发送给对端*/ SendMessage(writer.write(jmessage)); }
生成的candidate需要通过信令服务器发送给对端。
candidate的信令格式如下:
{ "candidate" : "candidate:2999745851 1 udp 2122260223 192.168.56.1 58572 typ host generation 0 ufrag Cy2E network-id 3", "sdpMLineIndex" : 1, "sdpMid" : "1" }
void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) { ... if (!type_str.empty()) { ... } else { /*处理收到的candidate*/ std::string sdp_mid; int sdp_mlineindex = 0; std::string sdp; /*从json中解析处需要的数据*/ if (!rtc::GetStringFromJsonObject(jmessage, kCandidateSdpMidName, &sdp_mid) || !rtc::GetIntFromJsonObject(jmessage, kCandidateSdpMlineIndexName, &sdp_mlineindex) || !rtc::GetStringFromJsonObject(jmessage, kCandidateSdpName, &sdp)) { RTC_LOG(WARNING) << "Can't parse received message."; return; } webrtc::SdpParseError error; /*根据接收的信息生成candidate对象*/ std::unique_ptr<webrtc::IceCandidateInterface> candidate( webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp, &error)); if (!candidate.get()) { RTC_LOG(WARNING) << "Can't parse received candidate message. " "SdpParseError was: " << error.description; return; } /*给PeerConnection对象添加candidate*/ if (!peer_connection_->AddIceCandidate(candidate.get())) { RTC_LOG(WARNING) << "Failed to apply the received candidate"; return; } RTC_LOG(INFO) << " Received candidate :" << message; } }
将收到的candidate通过AddIceCandidate函数添加到PeerConnection对象中。
5、收到远端视频流
void Conductor::OnAddTrack( rtc::scoped_refptr<webrtc::RtpReceiverInterface> receiver, const std::vector<rtc::scoped_refptr<webrtc::MediaStreamInterface>>& streams) { RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id(); /*向主线程发送自定义的消息*/ main_wnd_->QueueUIThreadCallback(NEW_TRACK_ADDED, receiver->track().release()); }
经过上面的步骤后,建立了音视频的连接,当收到远端的视频流时,会通过OnAddTrack()通知用户。
void Conductor::UIThreadCallback(int msg_id, void* data) { ... case NEW_TRACK_ADDED: { auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data); if (track->kind() == webrtc::MediaStreamTrackInterface::kVideoKind) { /*获取远端video track*/ auto* video_track = static_cast<webrtc::VideoTrackInterface*>(track); /*送至MainWnd处理*/ main_wnd_->StartRemoteRenderer(video_track); } track->Release(); break; } ... } void MainWnd::StartRemoteRenderer(webrtc::VideoTrackInterface* remote_video) { /*生成远端视频渲染器,同时将远端视频渲染器注册到webrtc中。*/ remote_renderer_.reset(new VideoRenderer(handle(), 1, 1, remote_video)); }
在这个函数中会将MainWnd中的remote_renderer_添加到WebRTC中,用于接收并渲染远端视频帧。
void Conductor::OnRemoveTrack( rtc::scoped_refptr<webrtc::RtpReceiverInterface> receiver) { RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id(); main_wnd_->QueueUIThreadCallback(TRACK_REMOVED, receiver->track().release()); } void Conductor::UIThreadCallback(int msg_id, void* data) { ... case TRACK_REMOVED: { auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data); track->Release(); break; } ... }
远端视频流移除的逻辑
6、远端peer关闭
当远端关闭时,会想信令服务器发送一条sign out信令,信令服务器会转发该信令。本端收到信令后的处理逻辑如下:
void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) { ... if (connected) { peers_[id] = name; callback_->OnPeerConnected(id, name); } else { /*收到远端peer关闭的信令。*/ peers_.erase(id); callback_->OnPeerDisconnected(id); /*通知Conductor远端关闭*/ } ... } void Conductor::OnPeerDisconnected(int id) { RTC_LOG(INFO) << __FUNCTION__; if (id == peer_id_) { RTC_LOG(INFO) << "Our peer disconnected"; /*通知远端关闭*/ main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CLOSED, NULL); } else { /*刷新peer list界面的用户列表*/ if (main_wnd_->current_ui() == MainWindow::LIST_PEERS) main_wnd_->SwitchToPeerList(client_->peers()); } } void Conductor::UIThreadCallback(int msg_id, void* data) { switch (msg_id) { case PEER_CONNECTION_CLOSED: RTC_LOG(INFO) << "PEER_CONNECTION_CLOSED"; DeletePeerConnection(); /*释放PeerConnection对象*/ if (main_wnd_->IsWindow()) { /*如果在视频渲染界面*/ if (client_->is_connected()) { /*如果处于连接状态,则将界面切换至peer list界面*/ main_wnd_->SwitchToPeerList(client_->peers()); } else { main_wnd_->SwitchToConnectUI(); /*将界面切换至connect界面*/ } } else { DisconnectFromServer(); } break; ... } }
7、本端关闭程序
void Conductor::Close() { client_->SignOut(); /*向信令服务器发送sign out信令*/ DeletePeerConnection(); /*释放PeerConnection对象*/ }
关闭本端程序,首先需要向信令服务器发送sign out信令,然后销毁PeerConnection对象。
-
Conductor-h2o-driverlessai:将H2O无人驾驶AI集成为IBM Spectrum Conductor的应用程序实例
2021-02-12 23:42:34要将无驱动程序AI与IBM Spectrum Conductor一起使用,您需要在计划运行无驱动程序AI的集群的每个节点上下载并安装无驱动程序AI。 首先下载TAR SH版本以在每个主机上安装: : 在每台主机上安装无驱动程序AI之后,... -
conductor event 和wait 分析及测试记录
2021-10-06 09:42:50conductor 的wait按官方说法可以通过消息队列及conductor api触发: 1、没有aws队列环境,没有测试队列触发模式 2、测试过程没有找到对应rest,/api/queue/update/{workflowId}/{taskRefName}/{status} 一直报告 { ... -
conductor server配置项目
2021-10-10 11:59:41conductor server配置项目说明是分散在各个additional-spring-configuration-metadata.json 及类ConductorProperties 中。 可以使用 find . -name additional*.json |grep -v build|xargs cat >>all-... -
conductor:一个基于 http 的 docker 任务运行器
2021-06-13 17:41:24Conductor - 基于docker 任务运行器 基本上,这是一个便携式PHP环境,可以运行robo在任何地方docker可以运行。 在您的出色项目在他们的本地工作站上运行之前,要求您的开发人员同事安装包X和包Y ,哦和模块Z的日子... -
Print Conductor v8.0.2203.14130一款图片文件批量打印工具打印机必备工具.exe
2022-03-16 18:43:11Print Conductor可以批量打印所有流行的文档:PDF文件,Word和Excel文件,各种办公文件,技术图纸,法律文件,协议,演示文稿,文本文件,电子邮件,图表,电子表格,发票,图像和许多其他类型文件。Print Conductor... -
conductor client 代码快速分析
2021-10-10 13:03:24conductor client 比较简单,包括以下两个子项目 1、client 2、client-spring client-spring是spring boot实现。 从以下spring boot核心类可以看到做什么 public class ConductorClientAutoConfiguration { @... -
Print Conductor批量打印工具,Print Conductor|Crack版
2021-03-26 19:36:22Print Conductor批量打印工具,Print Conductor|Crack版 Print Conductor是一种简单的批量打印工具。如果您必须定期打开并打印大量文件,则此方便的工具可以帮助您立即打印文件。当您将打印文件打印到程序时,... -
Netflix Conductor:微服务编排器
2019-10-30 23:31:14翻译自https://medium.com/netflix-techblog/netflix-conductor-a-microservices-orchestrator-2e8d4771bf40 -
深入浅出Netflix Conductor使用
2020-05-28 14:22:45Netflix Conductor框架是典型的服务编排框架,通过Conductor还可以实现工作流和分布式调度,性能非常卓越。 关于Conductor的基本概念在 https://netflix.github.io/conductor/intro/ 文中已经有深入介绍,本篇将以... -
Win10 安装 Netflix Conductor
2021-07-21 14:28:26Build file '/Users/songzj/Downloads/sourceCode/conductor-3.0.1/build.gradle' line: 18 An exception occurred applying plugin request [id: 'nebula.netflixoss', version: '9.2.2'] Failed to -
Netflix/conductor学习笔记(1)
2021-04-06 15:11:03Netflix/conductor入门 Workflow 工作流是您的流程流的容器。它可以包括几种不同类型的任务,子工作流,相互连接的输入和输出,以有效地获得所需的结果。 Workflow 定义 工作流是使用基于JSON的DSL定义的,包括作为...