精华内容
下载资源
问答
  • ESB企业服务总线

    千次阅读 2016-02-20 22:02:24
    ESB是企业服务总线(Enterprise Service Bus)缩写,是中间件技术与Web Service等技术结合产物,也是SOA系统中核心基础设施。ESB就是一个服务中介,形成服务使用者->ESB服务Proxy->服务提供者生物链,中介...

    ESB是企业服务总线(Enterprise Service Bus)的缩写,是中间件技术与Web Service等技术结合的产物,也是SOA系统中的核心基础设施。ESB就是一个服务的中介,形成服务使用者->ESB服务Proxy->服务提供者的生物链,中介的作用在不同应用中各有不同:

    解耦中介 :客户对实际服务提供者的身份、物理位置、传输协议和接口定义都是不知道也不关心的,交互集成代码提取到了业务逻辑之外,由ESB平台进行中央的宣告式定义。ESB平台实现协议转换 (WebService,Http,JMS…),消息转换 (转换、充实、过滤),消息路由 (同步/异步、发布/订阅、基于内容路由、分支与聚合…)。
    服务中介 :ESB平台作为中介提供服务交互中的基础服务。ESB平台实现SLA (可靠性保证,负载均衡,流量控制,缓存,事务控制,加密传输),服务管理监控 (异常处理,服务调用及消息数据记录,系统及服务的状态监控,ESB配置管理),统一安全管理 (这个有点理想主义)。
    服务编排 :多个服务进行编排形成新的服务。ESB支持一个直观的形式定义新组合服务的流程(工作流、BPEL 或 代码级编排)。
    从上面可以看到ESB的基本功能仍然是数据传输,消息协议转化,路由三大核心功能。有这三大核心功能也可以看到在进行异构系统的整合时候往往根据需要ESB提供这些功能。没有ESB时候也可以实现SOA,比如借助SCA和BPEL来实现SOA,当时却很难实现消息协议转化和动态路由。

    ESB在发展过程中有从原有的消息中间件转化为ESB产品的,这类消息中间件和数据总线产品在原有的EAI企业应用集成中应用比较多。而SOA根据强调了基于服务的集成,以Web Service服务为基本的管理单元。一个服务的定位是关于如何把业务逻辑表现成为一组相互独立的,自描述的且能互操作的实体。

    对于SOA关注的是服务全生命周期,通过服务实现业务价值。而ESB关注的是服务中介和服务的集成,是SOA的基础设施。SOA有两个核心组件,一个是ESB,一个是BPEL,而ESB是基础设施,BPEL是业务流程驱动下服务的集成和整合。离开了SOA,ESB将失去它所连接的服务,而仅仅是一个总线,同时也将变得毫无价值。Bobby做了一个比喻:路是没有任何价值的,除非你利用它把一个东西从一个地方移到另外一个地方。而离开SOA,ESB就像一个没人使用的道路。

    做SOA的事情不要先上来建立一个大而全的ESB,相反是关注你的业务问题,找到用SOA的方法来解决业务上的需求,在解决这个问题的过程当中,你会看到一系列的业务服务。这些业务服务是会产生业务价值的。它可以灵活地组装,动态地解决你变化的业务需求。这是它的价值,只有这样才能使你的业务敏捷起来,随需应变起来。而在服务的组装过程中,你再去考虑利用ESB来把他们连接起来。
    这里写图片描述
    ESB 需要某种形式的服务路由目录(service routing directory)来路由服务请求。然而,SOA 可能还有单独的业务服务目录(business service directory),其最基本的形式可能是设计时服务目录,用于在组织的整个开发活动中实现服务的重用。Web 服务远景在业务服务目录和服务路由目录的角色中都放置了一个 UDDI 目录,因而使得可以动态发现和调用服务。这样的目录可以视为 ESB 的一部分;然而,在这样的解决方案变得普遍之前,业务服务目录可能与 ESB 是分离的。
    标准的 ESB 功能
    这里写图片描述
    这里写图片描述
    上面的许多功能既可以使用专有技术实现,也可以通过利用开放标准实现。然而,使用不同的技术来实现 ESB 可能会使它们的性能、可伸缩性和可靠性这些特性显著不同,同时 ESB 功能和所支持的开放标准也会有所不同。由于这些原因,再加上最近制订和正在兴起的一些相关标准,当今实现 ESB 的许多关键决策都涉及到成熟的专有技术和不成熟的开放标准之间的权衡。
    支持 SOA 的最低功能的 ESB 实现
    如果在前面确定的功能中只有一部分和大多数 SOA 场景相关,我们可能会问:实现 ESB 所需的一组最低功能由什么构成?为此,考虑最被普遍认同的 ESB 定义的原理:
    ESB 是一种逻辑体系结构组件,它提供与 SOA 的原则保持一致的集成基础架构。
    SOA 原则需要使用与实现无关的的接口、强调位置透明性和可互操作性的通信协议、相对粗粒度和封装可重用功能的服务定义。
    ESB 可以作为分布式的异构基础架构进行实现。
    ESB 提供了管理服务基础架构的方法和在分布式异构环境中进行操作的功能。
    重点内容最低的 ESB 功能
    这里写图片描述
    请注意这些最低功能并不需要使用特别的技术,比如 EAI 中间件、Web 服务、J2EE 或 XML。这些技术的使用非常接近也非常符合需求,但是不必强制要求使用它们。相反,最低功能几乎只需简单地使用 SOAP/HTTP 和 WSDL 就可以实现(当然不是所有的情况都这样):

    URL 寻址和现有的 HTTP 和 DNS 基础架构提供了一个具有路由服务和位置透明性的“总线(bus)”。
    SOAP/HTTP 支持请求-响应(Request-Response)通信规范。
    HTTP 传输协议被广泛地使用。
    SOAP 和 WSDL 是开放、与实现无关的服务通信和连接模型。

    然而,这些 SOAP/HTTP 和 WSDL 的基本应用只是点到点(point-to-point)的集成,并不能实现一些 ESB 需要的关键功能:
    目前还没有用于控制服务寻址和命名的管理功能。服务名称通过每个适配器单独进行控制的,服务路由控制则分散在由服务客户端调用的地址、HTTP 基础架构和分配给适配器的服务名称之间。
    虽然这种方法依赖于实现细节,但是它往往并不能使服务实现的替代变得简单;服务请求者代码(也可能是开发工具生成的)通常通过特定地址 的特定协议直接绑定到具体的服务提供者实现。如果想要用另一个服务实现来替代原来的服务实现,就需要修改应用程序代码并重新部署这些代码。

    当然,在许多甚至是大多数情形中往往需要其他的功能,并且这种需要变得越来越常见。特别地,不管是现在还是以后,下面的需求类型可能会导致更复杂高级的技术的使用:

    服务质量和服务级别功能。
    高级 SOA 概念,例如服务编排、目录、转换等等。
    按需操作环境需求,比如管理与自治功能以及基础架构智能功能。
    跨越具有不同所有权的多个网络、多个协议以及多个域的真正意义上的异步操作。

    展开全文
  • SOA的提出在很大程度上就是为了更好的满足企业应用集成的需求。SOA强调复用和松耦合,注重...SOA的体系结构一般来说也需要企业服务总线(ESB)的支撑,只是它对总线上的服务和总线本身的作用和位置有着更加明确的要求。
  • 本文内容包括:引言调用服务其他集成功能开发企业服务总线结束语参考资料本文不仅仅是为架构师准备:使用企业服务总线(EnterpriseServiceBus),作为支持面向服务体系结构(SOA)基础架构,也将使开发人员能够...
  • ESB是一种在松散耦合的服务和应用之间标准集成方式。它可以作用于: 面向服务的架构 -分布式应用由可重用的服务组成 面向消息架构 - 应用之间通过ESB发送和接受消息 事件驱动架构 - 应用之间异步地产生和...
  • 企业服务总线需求说明Enterprzise Service Bus1 ESB主要作用ESB主要解决多终端多系统相互调用问题,减少系统间耦合,添加可维护性。2 ESB核心需求ESB核心需求主要包含:服务调用与数据适配、服务管理...
    

    企业服务总线需求说明

     

    Enterprzise Service Bus

     

    1     ESB的主要作用

    ESB主要解决多终端多系统的相互调用问题,减少系统间的耦合,添加可维护性。



     2     ESB的核心需求

    ESB的核心需求主要包含:服务调用与数据适配、服务管理、服务安全、服务监控。


    2.1   服务调用与数据适配

    支持系统间进行相互的服务调用,系统间数据协议相互独立,通过数据适配可以自己主动实现系统数据与ESB数据的传输转换。


    支持单个服务调用,多个服务调用(组合服务)。

     

    2.2   服务管理

    服务管理功能主要包含服务的注冊、公布、版本号控制以及服务信息维护功能。

     

    2.2.1      服务的属性

    业务系统须要与其他业务系统之间进行通信,是由ESB平台提供数据接口的服务进行,业务系统在使用服务前必需先进行注冊,并在服务有效的期间内。

     

    2.2.2      服务的注冊

    服务必须先进行注冊,记录具体的服务描写叙述信息。

    服务由管理员来统一行进管理,服务的功能包含:添加(注冊)、改动(变更)、删除、查询。

     

    2.2.3      服务的公布

    服务公布后,才干提供外部訪问。

    服务通过改动状态来控制是否向外公布,也可以通过时间段来控制服务的有效期(这个主要是针对平台向第三方公司提供的时效性服务)。

     

    2.2.4      服务的版本号控制

    支撑同一服务多个版本号的控制。

    因为业务系统的开发进度等其他原因,引起数据接口版本号的数据结构或数据源不同。服务的版本号由管理员来控制,版本号名称由管理员填写。服务版本号的功能包含:添加、改动、删除、查询。

     

    2.2.5      服务规则缓存

    为了提高效率,系统初始化时,从数据库的路由关系表中读取路由关系,通过定义缓存接口,将其路由关系表信息写入缓存。

    当路由关系表信息发生变更时,手动发送路由关系改动通知到改动缓存,又一次读取服务路由关系表,刷新服务路由缓存。

     

    2.2.6      服务信息管理

    服务信息的增删改查功能,以及信息的分类管理。

    服务信息分类可以是多级分类结构的树结构

     

    2.3   协议转换

    不同业务系统之间的数据交换

    不同的业务系统可能会使用不同的协议传递消息,ESB平台提供不同的接口类型以适应不同的入口协议或者出口,协议的转换在ESB平台的内部封装完毕,不须要在接入系统做相关改动。该平台支持的协议包含WEBSERVICE和HTTP等。

     

    2.4   服务安全

    建立良好的安全訪问机制,保证安全性。

    以服务数据作为基础,构成业务系统须要的数据流转的路由,每一个请求都须要推断所发起的请求是否合法,否则应反应相应的代码说明“不同意该请求”。请求接入时要进行的验证。

    2.4.1      验证流程

    訪问授权验证

    对URL參数_lic值的有效性进行验证,_lic和_valid通常是服务处理方提供,相当于用户/password的概念。

    參数签名验证

    对URL參数_sign值的有效性进行验证,用于防止传输数据过程中被拦截篡改,_sign的生成校验规则请看下文。

    业务数据验证

    具体业务级别的验证,依据服务路由规则的配置,读取服务路由关系验证。

     

     

     

    2.5   服务监控   

    监控每一个服务的调用,写入监控日志,为日后的服务调用分析、调优提供数据支撑。

    服务监控,记录每一个服务调度的过程相关的属性,以便以后系统的升级改造等提供可靠性的数据分析。

     

    2.6   非功能性需求

    2.6.1      高性能

    支撑高訪问量的服务调用请求,建立良好的缓存、队列等机制。

    组建一个高性能的数据调度的队列。定义作业调度的队列(List<WorkItem>),利用新线程进行分配资源,避免堵死。可视化设置队列的等待的数量或等待数量自己主动适应(Auto)。

    针对不同服务的业务要求,设置是否须要开启缓存,并统一设置缓存的生命周期。依据服务的调度的条件,把须要缓存的数据记录到一个缓存组(以Hastable)类型存储,有利于提高索引的效率。

     

    2.6.2      扩展性

    具有良好的扩展性,为日后的功能控制提供支撑。

     

    2.6.3      可维护性

    提供可视化、灵活的配置管理。

    建立ESB控制台,用来控制整个ESB平台的功能。

     

    3     通用功能组件

    ESB的为服务的统一接入提供良好的支撑,在ESB上进一步接入封装很多其他的通用功能组件可以较少各个系统反复功能的开发。

    3.1   系统日志

    3.1.1      业务日志

    支持多系统、多业务,依据数据量自己主动进行数据分表(按月或按日)。

     

    3.1.2      维护日志

    支持多系统、多业务,依据数据量自己主动进行数据分表(按月)。

     

    3.1.3      错误日志

    支持多系统错误日志记录,依据数据量自己主动进行数据分表(按年)。

     

    3.2   计划任务

    支持由周期时间或指定时间驱动的任务调度,触发运行指定的服务。

    支持计划任务的可视化配置管理。

    比如:

    每月1号调用报表服务生成月运营报表。

     

    3.3   事件管理

    可以监控制定的业务类型,当发生业务时自己主动产生相应的事件,触发指定的服务。

    支持事件的可视化配置管理。

    比如:

    当新审核通过一个学生档案的新增时,自己主动调用服务,为该学生开通会员账号。

    收集各业务系统的业务需求,确定须要制订的事件,兴许再进行事件开发。事件以组件方面作为功能扩展实现,并添加事件触发时所产生的过程记录,方便以后统计管理及问题排查。

    转载于:https://www.cnblogs.com/mfrbuaa/p/3866071.html

    展开全文
  • 在应用集成项目中,经常会遇到多个集成应用之间交易数据一致性问题,虽然很多成熟应用集成产品都会提供分布式事务和重试功能,但是这些功能往往在实际应用中作用不是很大。主要因为:1.大多数集成接口使用...

    在应用集成项目中,经常会遇到多个集成应用之间的交易数据一致性的问题,虽然很多成熟的应用集成产品都会提供分布式事务和重试的功能,但是这些功能往往在实际的应用中作用不是很大。主要因为:1.大多数集成接口使用的是基于HTTP的传输协议(Web Service、REST等),而分布式事务通常只能支持诸如JDBC,EJB,JMS之类的协议;2、大多数集成服务之间的调用异常或是因为网络原因、或是因为数据原因都不可能很快自动恢复,而集成产品所提供的重试一般都是在短时间内的重试,比如30秒重试一次,重试3次等,在很多情况下无法满足需要。

    为了保证集成项目实际应用中的交易数据一致性,我们需要在项目实施的过程中构建自己的服务补偿机制。根据需求的不同,服务补偿集成有如下两种:

    • 实时冲正

    对业务一致性要求高的集成业务,如果其采用的集成接口不支持分布式事务(基于HTTP的接口),需要采用实时冲正进行服务的补偿


    • 需要进行冲正补偿的系统服务,必须提供两个服务接口
      • 正常服务接口:用来进行正常的业务服务调用
      • 冲正服务接口:当集成过程中出现错误,调用冲正服务接口进行回滚操作

    • 冲正服务在服务的异常处理分支进行调用,目标系统需要实现业务去重的操

    • 批量补偿

    对业务一致性要求不是很高(主要是时效性)的业务场景,可采用补偿流程定时重发的方式进行服务补偿

    • 集成的过程中出现异常,将集成数据存储到冲正日志
    • 补偿流程定时检查冲正日志,发现需要冲正的数据,自动重新调用目标服务进行服务重做进行服务补偿
    • 如果多次调用补偿失败,可转为手工补偿

     

    欢迎关注我的微信公众号

    展开全文
  • 最后 , 基于一个简单场景中 , 我们将对提出体系结构实现细节 , 使用Azure 服务总线消息收发系统。最终交易 VS如名称所暗示 , 最终浓度约为一致 , 最终。在此上下文中意味着最终在稍...

    我打算通过本文说明什么最终一致性是 , 如何比较事务一致性并且 , 当考虑以这种方法设计的解决方案。接下来我们来看看事件域看他们的作用是 , 在最终的一致性 , 并且还关系到我们如何识别这些语言的域名。最后 , 基于一个简单的场景中 , 我们将对提出的体系结构的实现细节 , 使用Azure 服务总线消息收发系统。

    最终交易 VS

    如名称所暗示 , 最终浓度约为一致 , 最终。在此上下文中意味着最终在稍后的时间。为了理解该概念 , 当比较事务一致性 , 如在以下附图中 :

    Transactional_Consistency

    第 1 图的工作 , 也许是大多数开发人员非常熟悉。客户端 ( UI 、 API 等) 执行的指令的系统的内部运行的所有必要操作 , 以保持一致性域 (a 和 b) 在这里交易。当客户端获得响应的 :

    • 两个操作 A 和操作 B 成功
    • 两个操作 A 和操作 B 失效了

    因此 , 当在工作流事务一致性的客户得到一个 “成功” 响应 , 保证了所有必要的操作来维持该一致性域已经成功执行。

    必须注意 , 一致性域的 “边界” 并不是一个解决方案 , 而是业务组件边界定义。报告列举了 “空间” 内的域的集合的业务规则需要强制施行。

    Eventual_Consistency

    在第二工作流程的图形可能更熟悉分布式系统的开发人员。再次 , 该客户机执行一个命令的系统 , 但是这个时间只有一部分的所有必要操作的一致性维护运行 (此处内部) 的交易。现在 , 当客户端获得响应 , 则表示以下之一是真实 :

    • 操作 A 和操作 B 已经成功运行计划在稍后时间 , 即最终
    • 操作 A 和操作 B 失败 (will not run)

    换句话说 , 当在最终一致性的工作流客户端得到一个 “成功” 响应 , 它只保证一部分必要的操作以便维持该一致性域已被成功执行和调度运行。

    可以说 , 通过比较事务一致性似乎更直接和更容易实现。事实上 , 在大多数嵌入式架构。那么 , 为什么当选择与最终一致性 ?

    再看第一个图 , 以下场景 :

    1. 作业 B 需要很长的时间来执行。
      例如 , 设定大的值的计算、报告生成等。您不想让您网站的用户等待几秒钟 , 直到它们获得确认的帐 , 所以你需要这些长时间运行的操作 , 以在背景中运行 , 从而优化了前端性能。
    2. B 操作在本质上是异步的 , 即取决于异步机制。
      例如 , 发送电子邮件。这是一个经典的例子。你将很快向用户通知电子邮件将被发送 “不久” 为什么不在这一单独的组件 , 其还可以被重用和独立缩放。
    3. 执行操作 B 的不同骨料但不是在相同的操作限界上下文
      我稍后将讨论这种情形 , 并同时实现细节。
    4. 执行操作 B 的不同限界上下文比操作的。
      基于 Martin Fowler 的示例 , 考虑更新客户的名称销售的上下文将需要升级的客户的支持。这种事不会发生在相同的时间 , 但最终有界上下文将被同步 , 从而实现时域一致性。

    在现实应用场景 # 1 将几乎肯定会迫使你朝最终一致性在一些点由于性能要求。甚至场景 # 2 中 , 虽然这无可厚非是设计决策的部分 , 因为该响应时间的异步 API 发起机制可以为快速路由所述交易。

    原因 # 3 和 # 4 的 DDD 和特定主要做出明智的设计决策。你很可能会将该交易路由到最终一致性选择在这些情景下更好的设计。这促进了较小的、低耦合的组件执行特定操作。这将使该系统更容易扩展 , 而将默认地导致改进的性能的应用程序的增长 , 因为你很可能遇到场景 # 1 。

    事件域

    第二个图又不清楚如何操作 “B” 被触发。事件域在这里诞生。考虑更新的示意图 :

    EventualConsistency_with_DomainEvents2_
     
    这提供了更多细节 , 我们可以看到一个事件是 , 域出版后不久 , “操作” 的运行 , 同时技术的边界内 , 用一交易。域的事件随后被保存在队列中消耗的过程 , 以后谁还需要它来运行操作 “B” 。

    因此 , 一个域事件中的作用是便于最终一致性。在这方面 , 他们作为触发器的域信息和容器。

    事件域没有正式定义为 DDD 模式 (Eric Evans本书首次发布。概念引入后来并定义为 :

    事件域是一个正式的域模型的表示的道理。域活动 , 而忽略无关作出明确的事件 , 来自不同领域的专家要跟踪或通知的、或相关联的状态改变对象的其他模型。

    事件域也可以用于重建的特定状态 ;事件来源采用。在这个特定的上下文命令将创建的每个客户事件域。

    但这并不适合于 “传统” 方法 , 只有 ‘有意义’ 事件是捕获域的域模型。那么 , 什么是 “有意义” ?一个想法是 : “如果我忽略了的事件 X 发生的业务域的规则仍然是一致的吗 ?” 如果答案是不 , 那么你可能需要它来作为事件域模型。

    作为提示 , 在试图识别域事件 ,Vernon Vaughn还建议关注以下关键词组为业务专家谈话时 :

    • “当。 … …”
    • “如果出现这种情况”
    • “告诉我。 … …” 和 “通知我。 … …”
    • “发生”

    我还应提及 , 您可以使用事件域同步更新域内部的对象相同。我个人觉得这是一个稍微过度工程化实现事务一致性 , 但它是一种有效的方法。

    另一方法是使用同步事件来更新另一个域内) 的相同事务处理。从技术上讲这是错误的 , 因为它违背了聚集体每交易单DDD 规则 , 但是它可以应用在当没有可用的消息接发机制。这样做仍然可以获得一些益处的解耦设计的聚集体 , 但是不更新之间的性能和扩展性优势。

    在以下实现中焦点将放在 “最终一致性” 情景 , 完全利用域事件所带来的好处。

    提出的设计

    让我们考虑下面的方法的简化版本 (Vernon Vaughn 的办法在他的畅销书执行 DDD) :Architecture
    首先 , 一个域内聚集事件边界。本实体负责的领域或服务。

    然后 , 将新近创建的域是通过发布事件事件发布者。当这发生时 , 现有注册用户 , 在这种情况下 ,事件存储订户将接收的事件 , 并将其保存。事情至此内发生的相同交易。

    接下来的事件代理(后台处理) 将顺序读取事件并且将它们转发到专用消息队列。

    最后各种事件使用者(后台处理) 将读取的专用队列、串行化并运行必要的操作来实现时域一致性。

    有几个重要的事情要提一下这种设计方案。首先 , 你可能发现该事件不直接进入消息队列但被首先存储在模型存储队列作为用于事件转发器。这种情况的原因是基于这样的假设 , 该模型存储在消息队列并不共享同一交易 , 当真正使用 SQL Azure 服务总线和数据库消息传递系统。没有这个中间步骤 , 我们最终会在位置成功地提交事务 , 并且然后存储在模型中不能保存在事件域中的消息队列 , 这将留给我们一个不一致事件 , 因为模型中丢失。

    同样的事情不会发生的事件和消息队列的货代吗 ?技术上是的 , 但是现在我们更好地处理这样的情形。因为域是存储事件如今我们可以尝试着给它的消息队列 , 并且仅如果成功我们会将该事件标记为 “已转发” 中存储的模型。

    如果该事件被转发到消息队列中存储更新的模型 , 但是失败了怎么办 ?在此场景中 , 我们可以依靠两个东西 :

    1. 消息De - duplication。消息传送系统比如兔或 MQ Azure 服务总线提供此功能。自动去除重复消息基于唯一消息标识符的定制。
    2. 幂次事件域。这意味着 , 如果在相同域中只出现一次的事件 , 该事件后续将不改变的状态设定由第一域中发生。事件域具有幂等理想的场景 , 但它需要额外的工作 , 有时还可导致增加的复杂性和性能下降。因此 , 仔细选择消息传送系统 , 该系统支持去重总是一个好主意。

    实现

    现在 , 我们概念上澄清事情将如何工作的代码。在这里可以找到完整的解决方案 :https://github.com/florindpreda/eventualconsistency.domainevents.azureservicebus

    场景选择说明了最终一致性的使用是相同的 , 当我说工作单元图案。事实上 , 我还会重用相同的工作单元中执行的事务的一部分。

    快速重新迭代 , 我们有两个类别 : 产品和产品评论。— — 删除了相关产品的产品评论需要被删除的删除 (这里为 “软逻辑” 或 “删除” , 而不是物理地移除数据库中的记录所使用的 , “标记” 更新的 isDeleted 代替) 。

    首先我们说domainevent类别 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public abstract class DomainEvent
    {
        public DateTime OcurrendOn { get; protected set; }
     
        public DomainEvent()
        {
            this.OcurrendOn = DateTime.UtcNow;
        }
    }

    这是所有将来的事件。这是个好主意 , 所有事件具有时间戳的发生 , 主要用于伐木和调试方案。

    接下来我们具体productdeleted事件 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class ProductDeleted : DomainEvent
    {
        public Guid ProductId { get; protected set; }      
     
        public ProductDeleted(Guid productId)
        {
            this.ProductId = productId;        
        }
    }

    域事件应当仅承载所需的最少量信息的消费者的最佳性能。例如 , 如果事件需要五个消费者 ID 以便运行 , 就足以找到其他 4 , 但它需要复杂和耗时的查询 , 那么最好还是包括所有五个主体中的事件 ID 的事件时容易获得。在这种情况下 , 为了删除所有商品评论所有我们需要的是ProductID。

    如所讨论的 , 该事件将被创建并发布来自产品集合 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class Product : Entity
    {      
        public string Name { get; private set; }
     
        protected Product() {}
     
        public Product(string name)
        {      
            this.Name = name;
        }
     
        public override void Delete()
        {
            base.Delete();
            DomainEvents.Publisher.Publish<ProductDeleted>(new ProductDeleted(this.Id));         
        }
    }

    让我们看看如何像出版商 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class DomainEventPublisher
    {
        private readonly IDictionary<Type, IList<IDomainEventSubscriber>> _subscribers = new Dictionary<Type, IList<IDomainEventSubscriber>>();
     
        public void Publish<T>(T domainEvent) where T : DomainEvent
        {          
            var eventSubscribers = _subscribers.SelectMany(s => s.Value)
                                                .Where(sb => sb.SubscribedToEventType() == domainEvent.GetType()
                                                            || sb.SubscribedToEventType() == typeof(DomainEvent)
                                                        );
     
            foreach(var eventSubscriber in eventSubscribers)
            {
                eventSubscriber.Handle(domainEvent);
            }
        }      
     
        public void Subscribe<TEvent>(Action<DomainEvent> handle) where TEvent : DomainEvent
        {
            var subscriber = new DomainEventSubscriber(handle, typeof(TEvent));
            Subscribe(subscriber);
        }
     
        public void Subscribe(IDomainEventSubscriber domainEventSubscriber)
        {
            var eventType = domainEventSubscriber.SubscribedToEventType();         
            if (_subscribers.ContainsKey(eventType))
            {
                _subscribers[eventType].Add(domainEventSubscriber);
            }
            else
            {
                _subscribers[eventType] = new List<IDomainEventSubscriber>();
                _subscribers[eventType].Add(domainEventSubscriber);
            }
        }      
    }

    首先需要注意的是 , 出版商的订户列表分组的事件的类型是 “听” 到。当一个事件被公布 , 所有注册用户 , 并执行各自的处理。订户可以登记具体事件 , 比如productdeleted或者 , 所有的事件。在eventstoringsubscriber我们希望是后者 , 所有事件被存储用于将来转发 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class EventStoringSubscriber : IEventStoringSubscriber
    {      
        private readonly IStoredEventRepository _storedEventRepository;
        private readonly IEventSerializer _eventSerializer;
     
        public EventStoringSubscriber(IStoredEventRepository storedEventRepository, IEventSerializer eventSerializer)
        {          
            _storedEventRepository = storedEventRepository;
            _eventSerializer = eventSerializer;
        }
     
        public void Handle(DomainEvent domainEvent)
        {
            var serializedBody = _eventSerializer.Serialize(domainEvent);
            var storedEvent = new StoredEvent(domainEvent.GetType().ToString(), domainEvent.OcurrendOn, serializedBody);
            _storedEventRepository.Add(storedEvent);
        }
     
        public Type SubscribedToEventType()
        {
            return typeof(DomainEvent);
        }
    }

    在eventstoringsubscriberJSON 序列化的事件 ( 在这里 ) , 创造了storedevent然后将其保持。再次 , 重要的是要提到 ,storedeventrepository操作在相同事务下运行范围的存储库删除时涉及的产品。在storedevent域模型是一个类提供公共接口 , 用于存储所有事件以均匀的方式 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class StoredEvent : Entity
    {
        public string TypeName { get; private set; }
        public DateTime OccurredOn { get; private set; }
        public string SerializedBody { get; private set; }
        public bool IsForwarded { get; private set; }
     
        protected StoredEvent() {}
     
        public StoredEvent(string typeName, DateTime occurredOn, string serializedBody)
        {
            TypeName = typeName;
            OccurredOn = occurredOn;
            SerializedBody = serializedBody;
        }
     
        public void MarkAsForwarded()
        {
            IsForwarded = true;
        }
    }

    The last piece of the information of the publishing part is to see how and when are the subscribers registered. First it’s important to note that the DomainEventPublisher is a ‘singleton per request’, meaning we’ll use the same instance during the duration of each request. It’s the same approach used for the Unit of Work. This is the implementation using Unity IoC:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (HttpContext.Current != null)
    {
        container.RegisterType<IDatabaseContext, DatabaseContext>(new PerHttpRequestLifetimeManager());
        container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new PerHttpRequestLifetimeManager());
    }
    else
    {              
        container.RegisterType<IDatabaseContext, DatabaseContext>(new ContainerControlledLifetimeManager());
        container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new ContainerControlledLifetimeManager());
    }

    As we’ll get a new publisher for each request, we will also have to register the subscribers at the beginning of each request:

    1
    2
    3
    4
    protected void Application_BeginRequest()
    {
        UnityConfig.RegisterEventsSubscribers();
    }

    The code above is part of Global.asax and it calls the following method which is part of the IoC configuration:

    1
    2
    3
    4
    5
    6
    7
    public static void RegisterEventsSubscribers()
    {
        _container.Resolve<DomainEvents>();
     
        var eventStoringSubscriber = _container.Resolve<IEventStoringSubscriber>();          
        DomainEvents.Publisher.Subscribe(eventStoringSubscriber);
    }

    The DomainEvents is just a wrapper exposing the publisher via a static property. It’s implemented this way so it can be easily used inside entities without explicitly coupling them with an IDomainEventsPublisher interface:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class DomainEvents
    {
        private static DomainEventPublisher _publisher;
     
        public static DomainEventPublisher Publisher
        {
            get
            {
                if (_publisher == null)
                {
                    throw new Exception("Publisher is not initialized");
                }
                return _publisher;
            }
        }
     
        public DomainEvents(DomainEventPublisher publisher)
        {
            _publisher = publisher;
        }
    }

    This covers the publishing part. At this point the ProductDeleted event is stored in the storedeventsSQL 表。

    接下来 , 在 “事件背景货运工人需要推动它在 Azure 服务总线队列。在eventforwarderservice阶级 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public class EventForwarderService : IEventForwarderService
    {      
        private readonly IDictionary<string, string> _eventTypeQueueMapping = new Dictionary<string, string>()
        {
            { typeof(ProductDeleted).ToString(), "ProductDeletedQueue" },
            { typeof(PlaceholderEvent).ToString(), "PlaceholderQueue" }
        };
     
        private readonly IUnitOfWork _unitOfWork;
        private readonly IStoredEventRepository _storedEventRepository;
        private readonly IMessagingService _messagingService;
     
        public EventForwarderService(IUnitOfWork unitOfWork, IStoredEventRepository storedEventRepository, IMessagingService messagingService)
        {
            _unitOfWork = unitOfWork;
            _storedEventRepository = storedEventRepository;
            _messagingService = messagingService;
        }
     
        public void ForwardEvents()
        {
            using(_unitOfWork)
            {
                var newEvents = _storedEventRepository.GetNewEvents().ToList();
                 
                foreach(StoredEvent storedEvent in newEvents)
                {
                    var queueName = this.GetAssociatedQueueName(storedEvent.TypeName);
                    _messagingService.Send(storedEvent, queueName);
     
                    storedEvent.MarkAsForwarded();
                    _storedEventRepository.Update(storedEvent);
     
                    _unitOfWork.Commit();
                }              
            }
        }
     
        private string GetAssociatedQueueName(string eventType)
        {
            var queueName = string.Empty;
     
            try
            {
                queueName = _eventTypeQueueMapping[eventType];
            }
            catch(KeyNotFoundException ex)
            {
                throw new ArgumentOutOfRangeException(string.Format("No mapping defined for event: {0}", eventType), ex);
            }
     
            return queueName;
        }
    }

     

    因为上面的类封装了所有的 “转发” 逻辑可以被容易地再利用 , 不管什么类型的服务的事件背景是 “转发器” 。我们可以毫不费力的 Azure Web 任务之间进行切换 , 工人角色或 Windows 服务。在imessagingservice接口实现azureservicebusqueuemessagingservice类别 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    public class AzureServiceBusQueueMessagingService : IMessagingService
    {      
        private readonly NamespaceManager _namespaceManager;
        private readonly string _connectionString;
     
        private readonly IEventSerializer _eventSerializer;
     
        public AzureServiceBusQueueMessagingService(IEventSerializer eventSerializer)
        {
            _connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _namespaceManager = NamespaceManager.CreateFromConnectionString(_connectionString);        
            _eventSerializer = eventSerializer;
        }
     
        private void InitQueue(string queueName)
        {
            if(string.IsNullOrWhiteSpace(queueName))
            {
                throw new ArgumentNullException("Queue name is empty.");
            }
     
            //configure queue settings
            var queueDescription = new QueueDescription(queueName);
            queueDescription.RequiresDuplicateDetection = true;
            queueDescription.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromDays(7);
            queueDescription.LockDuration = TimeSpan.FromMinutes(5);
            queueDescription.EnableDeadLetteringOnMessageExpiration = true;
     
            //create queue if not exists
            if (!_namespaceManager.QueueExists(queueName))
            {
                _namespaceManager.CreateQueue(queueDescription);
            }
        }
     
        public void Send(StoredEvent storedEvent, string queueName)
        {
            this.InitQueue(queueName);
     
            var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);         
            var brokeredMessage = this.CreateBrokeredMessage(storedEvent);
     
            client.Send(brokeredMessage);
     
            client.Close();
        }
     
        private BrokeredMessage CreateBrokeredMessage(StoredEvent storedEvent)
        {
            var brokeredMessage = new BrokeredMessage(storedEvent.SerializedBody);
            brokeredMessage.MessageId = storedEvent.Id.ToString();
     
            return brokeredMessage;
        }
     
        ...
    }

     

    一个值得注意的initqueue方法是 , 我们设定requiresduplicatedetection标志为真 , 并且还在duplicatedetectionhistorytimewindow。前者允许重复检测 , 后者是多久 Azure 服务总线将存储MessageID所存储信息。我在这里设置为 7 天 , 但这个时间窗口 , 应谨慎考虑 , 因为它需要额外的队列空间。

    拼图的最后一块是消费的 “事件” 。就像在 “事件背景是货运代理人这一过程 , 但不同的是 , 我们可以有多个消费者 (每一个事件类型) 。在productdeletedeventconsumer仅接收productdeleted事件 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class ProductDeletedEventConsumer : IProductDeletedEventConsumer
    {
        private readonly string QUEUE_NAME = "ProductDeletedQueue";
        private readonly IMessagingService _messagingService;
        private readonly IUnitOfWork _unitOfWork;
        private readonly IProductReviewRepository _productReviewRepository;
     
        public ProductDeletedEventConsumer(IMessagingService messagingService, IUnitOfWork unitOfWork, IProductReviewRepository productReviewRepository)
        {
            _messagingService = messagingService;
            _unitOfWork = unitOfWork;
            _productReviewRepository = productReviewRepository;
        }
     
        public void ProcessNextEvent()
        {
            _messagingService.ProcessNextEvent<ProductDeleted>(pd => Process(pd), QUEUE_NAME);
        }
     
        private void Process(ProductDeleted productDeleted)
        {
            using(_unitOfWork)
            {
                var productReviews = _productReviewRepository.GetByProductId(productDeleted.ProductId);
                foreach (var productReview in productReviews)
                {
                    productReview.Delete();
                    _productReviewRepository.Update(productReview);
                }
     
                _unitOfWork.Commit();
            }
        }
    }

    每一个新时间productdeleted发生时 ,productdeletedeventconsumer该过程将通过删除所有相关评论。在processnextevent方法azureservicebusqueuemessagingservice如下所示 :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    ...continued
     
    public void ProcessNextEvent<TEvent>(Action<TEvent> handle, string queueName) where TEvent : DomainEvent
    {
        this.InitQueue(queueName);
     
        var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);
     
        var brokeredMessage = client.Receive(TimeSpan.FromSeconds(5));
     
        if (brokeredMessage != null)
        {
            Process<TEvent>(handle, brokeredMessage);
        }
    }
     
    private void Process<TEvent>(Action<TEvent> handle, BrokeredMessage brokeredMessage) where TEvent : DomainEvent
    {
        var jsonEvent = brokeredMessage.GetBody<string>();
        var productDeletedEvent = _eventSerializer.Deserialize<TEvent>(jsonEvent);
     
        handle(productDeletedEvent);
     
        try
        {              
            brokeredMessage.Complete();
        }
        catch (Exception ex)
        {
            //do something else, e.g log
            brokeredMessage.DeadLetter();//move to dead letter queue to inspect later  
        }
    }

     

    运行它

    1. 到了Azure 门户并创建一个新的 Azure 服务总线命名空间。您可以创建一个免费帐户吧 , 如果你还没有的话 ) 。
    2. 溶液从 GitHub 来下载 :https://github.com/florindpreda/eventualconsistency.domainevents.azureservicebus
    3. 更新所述服务总线连接的字符串在 app. configevcosample.eventforwarderworker和evcosample.productdeletedconsumerworker
    4. 运行 Web 项目evcosample.api( 这将创建一个数据库 LocalDB 和种子) 。
    5. 同时运行的后台进程evcosample.eventforwarderworker和evcosample.productdeletedconsumerworker。
    6. 发送 HTTP DELETE 请求productcontroller.deleteWeb API 方法传递 (ProductID) 被删除。
    7. 检查产品,productreviews和storedeventsSQL 表是否productdeleted事件已经被成功处理。

     

    结论

    总之 , 我们已经看到 :

    • “最终一致性” 方法是一种设计 , 可以提高性能和可扩展性的应用的执行的某些操作推迟到稍后的时间。
    • 在事件域 DDD 的战术元素和 “最终一致性” 的主持人也作为触发事件信息和容器的消费者域。
    • 一般的模式实现 “最终一致性” 是使用消息传递系统像 Azure 服务总线存储序列化事件域
    • 当存储模型和消息传递系统不共享同一事务范围 , 事件应当被初始地保存在模型存储并且然后转发到消息收发系统。这确保不会丢失域事件何时出现故障。
    • 为了避免相同事件域处理一次以上 , 使去重复的消息传递系统的支持 , 并尝试设计等领域。

    点关注,不迷路

    文章每周持续更新,可以微信搜索「 十分钟学编程 」第一时间阅读和催更,如果这个文章写得还不错,觉得有点东西的话 ~求点赞👍 求关注❤️ 求分享❤️ 
    各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

    展开全文
  • 改进应用之间集成(企业服务总线ESB)已成为企业IT重要方向 派拉ESB模块划分 管理平台 监控平台 运行平台 派拉基于ESB实施整体优势 行业需求深刻理解 优秀服务团队与方法论 成熟软件架构 ...
  • ESB是企业服务总线(Enterprise Service Bus)缩写,是中间件技术与Web Service等技术结合产物,也是SOA系统中核心基础设施。ESB就是一个服务中介,形成服务使用者-&gt;ESB服务Proxy-&gt;服务提供者...
  • ESB是企业服务总线(Enterprise Service Bus)缩写,是中间件技术与Web Service等技术结合产物,也是SOA系统中核心基础设施。ESB就是一个服务中介,形成服务使用者-&gt;ESB服务Proxy-&gt;服务提供者...
  • ESB是企业服务总线(Enterprise Service Bus)缩写,是中间件技术与Web Service等技术结合产物,也是SOA系统中核心基础设施。ESB就是一个服务中介,形成服务使用者-&gt;ESB服务Proxy-&gt;服务提供者...
  • 企业服务总线ESB之交易流水

    千次阅读 2014-09-22 17:07:25
    流水信息的记录是企业服务总线中非常重要的一个环节,其有两个核心的作用: 交易过程中问题的排查和跟踪; 冲正交易的执行;冲正是银行里特定的词汇。实际就是如何保证分布式事物的一致性。 这里面...
  • https://www.cnblogs.com/renzhitian/p/6853289.html 数据总线是起到调度服务的作用,数据总线不是集成服务,数据总线更新一个调度框架,每个服务需要根据约定向数据总线注册服务,那么如何注册那?其实数据总线就...

空空如也

空空如也

1 2 3 4 5 ... 17
收藏数 330
精华内容 132
关键字:

服务总线的作用