精华内容
下载资源
问答
  • 消息总线java实现

    2016-06-27 11:28:39
    软件体系结构作业 消息总线实现代码
  • SpringCloud——消息总线(Bus)之Spring Cloud Bus将分布式系统的节点与轻量级消息代理链接。
  • 消息总线是一种通信工具,可以在机器之间互相传输消息、文件等,这篇文章主要介绍了SpringCloud Bus消息总线的实现,Spring cloud bus 通过轻量消息代理连接各个分布的节点,小编觉得挺不错的,现在分享给大家,也给...
  • LiveEventBus是一款Android消息总线,基于LiveData,具有生命周期感知能力,支持Sticky,支持AndroidX,支持跨进程,支持跨APP 为什么要用LiveEventBus 生命周期感知 消息随时订阅,自动取消订阅 告别消息总线造成...
  • [角度消息总线] 该项目为 AngularJS 应用程序定义了一个消息总线。 通常的用例是在几个 AngularJS 控制器之间分派事件。 例如:您可以向总线发布一个事件,并且每个对此感兴趣的控制器都会收到通知,并且还可以...
  • 消息总线

    千次阅读 2018-04-24 15:29:17
    SpringCloudBus:事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署。 在上一篇写出了springcloud对微服务的集中配置,那么就出现了一个问题,如果修改配置...

    SpringCloudBus:事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署。

    在上一篇写出了springcloud对微服务的集中配置,那么就出现了一个问题,如果修改配置了怎么实现不需重启服务来实现配置的更新,下面有集中解决方法。

    1.使用/refresh手动刷新配置

    缺点:单点刷新,如果集群服务多的话,无论是工作量还是维护上都十分麻烦。

    使用上一篇的config-client服务,加入依赖,

    spring-boot-starter-actuator
    

    pom文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.dalaoyang</groupId>
    	<artifactId>springcloud_config_client</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>springcloud_config_client</name>
    	<description>springcloud_config_client</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR1</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-config</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-actuator</artifactId>
    		</dependency>
    
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    

    在Controller类上加入@RefreshScope注解,由于上一篇controller写在了启动类上,所以直接加在启动类上,代码如下:

    package com.dalaoyang;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @RestController
    @RefreshScope
    public class SpringcloudConfigClientApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(SpringcloudConfigClientApplication.class, args);
    	}
    
    
    	@Value("${title}")
    	String title;
    
    	@RequestMapping("/getTitle")
    	public String getTitle(){
    		return title;
    	}
    
    }
    

    配置文件新增配置management.security.enabled=false,在刷新时关闭安全验证。

    代码如下:

    spring.application.name=config-client
    spring.cloud.config.label=master
    spring.cloud.config.profile=test
    spring.cloud.config.uri= http://localhost:9000/
    
    ##注册中心地址
    eureka.client.service-url.defaultZone=http://eureka.dalaoyang.cn/eureka/
    
    ## 刷新时,关闭安全验证
    management.security.enabled=false
    

    分别启动项目config-server,config-client。访问http://localhost:8080/getTitle,结果如下图

    修改git上配置,修改为dalaoyang_test_change,在次请求,结果没有改变,使用postman或者其他工具post请求http://localhost:8080/getTitle看到返回如下结果。

    在次访问http://localhost:8080/getTitle,如下图

    2.使用springcloudbus刷新配置

    springcloudbus需要使用轻量消息代理,本文使用rabbitmq,启动rabbitmq如下图:

    访问http://localhost:15672/#/如下图

    新建项目springcloud_bus,同时改造config-client,pom文件加入bus依赖,代码如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.dalaoyang</groupId>
    	<artifactId>springcloud_bus</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>springcloud_bus</name>
    	<description>springcloud_bus</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR1</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-config</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-actuator</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-bus-amqp</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    

    加入rabbitmq配置,配置文件如下:

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    ##端口号
    server.port=8881
    
    ## 刷新时,关闭安全验证
    management.security.enabled=false
    
    spring.application.name=config-client
    spring.cloud.config.label=master
    spring.cloud.config.profile=test
    spring.cloud.config.uri= http://localhost:9000/
    
    ##注册中心地址
    eureka.client.service-url.defaultZone=http://eureka.dalaoyang.cn/eureka/
    

    启动类加入注解@RefreshScope,代码如下:

    package com.dalaoyang;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @RestController
    @RefreshScope
    public class SpringcloudBusApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(SpringcloudBusApplication.class, args);
    	}
    
    
    	@Value("${title}")
    	String title;
    
    	@RequestMapping("/getTitle")
    	public String getTitle(){
    		return title;
    	}
    }
    

    config-client加入同样依赖和配置,重启config-client,启动springcloud_bus,先去http://eureka.dalaoyang.cn/,可以看到

    先将git上配置改回dalaoyang_test,分别请求http://localhost:8881/getTitle和http://localhost:8080/getTitle结果如下:

    然后用postman使用post请求访问http://localhost:8881/bus/refresh

    再次分别请求http://localhost:8881/getTitle和http://localhost:8080/getTitle结果如下:

    从图中可以看出刷新配置成功。

    3.局部刷新配置,配置与第2种方法一样,只是在使用postman刷新时略加改变,在本文中使用http://localhost:8881/bus/refresh?destination=config-client:8881可以刷新服务名为config-client端口为8881的服务,如果想要刷新服务名为config-client的所有服务可以写成http://localhost:8881/bus/refresh?destination=config-client:**

    源码下载 :大老杨码云

    个人网站:https://www.dalaoyang.cn

    展开全文
  • ActiveMQ 企业消息总线(ESB)源代码 本代码包含ESB在实际项目中的应用例子。注意测试前请下载安装 apache-activemq-5.12.1
  • 作者codyer,源码ElegantBus,ElegantBus 是一款 Android 平台,基于 LivaData 的消息总线框架,这是一款非常 优雅 的消息总线框架。如果对 ElegantBus 的实现过程,以及考虑点感兴趣的可以看看前几节自吹如果只是...
  • .NET进程间广播消息总线。 它旨在在Windows桌面应用程序中进行快速广播消息传递,因此可以正常工作。 快速介绍 设计为无服务器 客户可以随时进出 消息在指定的超时后过期,默认为500毫秒 日志保持较小以提高性能,...
  • ngx-消息总线 用于 Angular 11+ 的消息总线,用于组件之间的通信。 对于Angular版本8.x,您需要选择8.0.0版本。 对于 Angular 7.x 版本,您需要选择 7.0.0 版本。 对于 Angular 6.x 版本,您需要选择 3.0.0 版本。 ...
  • Grav是用于Go应用程序的嵌入式分布式消息传递网格。 Grav允许系统的互连组件以可靠的异步方式有效地进行通信。 HTTP和RPC在现代分布式系统中很难很好地扩展,但是Grav旨在在各种分布式环境中具有高性能和弹性。 ...
  • 主要介绍了SpringCloud消息总线Bus配置中心实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 总线总线是命令的消息总线实现,也就是命令总线。 主要思想是提供一种以解耦方式与特定 CORE 层交互的方法。 这个项目开始于一个宠物项目,并在不同的项目中重用。 所以最终它成为开源的,使其更容易维护。 该...
  • 标准号DL/T 1660-2016发布日期2016-12-05实施日期2017-05-01中国标准分类号F21国际标准分类号29.020归口单位中国电力企业联合会主管部门国家能源局行业分类电力、热力、燃气及水生产和供应业
  • 主要介绍了SpringCloud之消息总线Spring Cloud Bus实例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • Prooph Service Bus - 轻量级消息总线支持CQRS和微服务
  • libiopcmsgbus 通过UDS的消息总线
  • 事件总线 分布式消息总线
  • service-bus, 支持CQRS的PHP轻量级消息总线 PSB - ProophServiceBus支持CQRS和微服务的PHP 7.1 轻量级消息总线 消息 APIprooph/服务总线是一个轻量级的消息传递 facade 。 它允许你用消息来定义你的模型的API 。
  • 消息总线文档管理

    2016-06-27 11:26:05
    消息总线文档管理
  • 基于层次消息总线的体系结构是一种新的软件体系风格
  • 微服务之消息总线

    2020-10-29 09:32:40
    简介 ...Spring Cloud作为微服务架构的一个综合解决方案,也提供了对应的解决方案Spring Cloud Bus,即消息总线。 这里要理解一个概念,消息总线。简单理解就是一个消息中心,众多微服务实例可以连接

    简介

    在上篇文章《微服务之配置中心》中写到,客户端可从服务端获取配置信息,当Git仓库中的配置文件修改后,为了让客户端获取最新的配置信息,可以通过执行refresh操作进行手动刷新。但是这样有问题,当客户端很多时(随之系统的不断扩大),如果需要每个客户端都执行一遍,那就蛋疼了,显然这种方案就不适合了。Spring Cloud作为微服务架构的一个综合解决方案,也提供了对应的解决方案Spring Cloud Bus,即消息总线。

    这里要理解一个概念,消息总线。简单理解就是一个消息中心,众多微服务实例可以连接到总线上,实例可以往消息中心发送或接收信息(通过监听)。比如:实例A发送一条消息到总线上,总线上的实例B可以接收到信息(实例B订阅了实例A),这样的话,消息总线就充当一个中间者的角色,使得实例A和实例B解偶了,很方便。

    图片发自简书App

    消息总线(Spring Cloud Bus)

    原理

    Spring Cloud Bus通过建立多个应用之间的通信频道,管理和传播应用间的消息,从技术角度来说,应用了AMQP消息代理作为通道,通过MQ的广播机制实现消息的发送和接收。以其典型应用——配置中心客户端刷新为例,说明下工作流程:

    图片发自简书App

    (1)修改配置文件,触发webhook向clientA发送bus/refresh;

    (2)clientA重新从配置中心获取新的配置信息,同时发送消息到Spring Cloud Bus;

    (3)Spring Cloud Bus收到消息,同时通知clientB、clientC(订阅配置更新事件);

    (4)clientB、clientC收到通知,重新请求配置中心,获取新的配置信息。

    这样,三个客户端均得到最新的配置。

    消息代理

    这个过程中,作为通道的AMQP消息代理很重要。AMQP(高级消息队列协议,是一个标准)是一个网络协议,从扮演角色来说,消息代理从生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers),这个过程中的发布者,消费者,消息代理可以存在于不同的设备上,下面简单介绍下工作流程(其实跟上面的类似):

    图片发自简书App

    消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

          AMQP作为一个标准协议,主要实现方案有RabbitMQ、ActiveMQ、Qpid等。这里我主要以RabbitMQ为例进行说明,它是一个优秀的微服务架构消息中间件,与Spring Cloud Bus能够很好的结合使用。

    下图显示了RabbitMQ的Web管理首页:

    图片发自简书App

    (1)Broker:消息队列服务器,即负责接收生产者消息,发送至消费者的;

    (2)Connections:连接,即发送者、消息接收者、消费者之间的物理连接;

    (3)Channel:通道,连接生产者、消费者的逻辑结构。一个Connection可以对应多个Channel;

    图片发自简书App

    (4)Exchange:消息交换机,消息第一个到达的地方,可以指定路由规则,决定消息分发到不同的消息队列中去;

    (5)Queue:消息队列,消息经Exchange路由转发至此,进入逻辑等待状态(等待消费,即客户端获取);

    (6)Binding:绑定,把Exchange和Queue按照路由规则进行绑定,即决定Exchange接收消息后,需要发送到哪些Queue中:

    图片发自简书App

    消息发送-接收原理图

    图片发自简书App

    配置步骤

    Config Server服务端

    (1)在Config Server添加RabbitMQ依赖,非常简单:

    图片发自简书App

    (2)修改配置文件,添加RabbitMQ的配置信息:

    图片发自简书App

    (3)启动类加注解:

    图片发自简书App

    Config Client客户端

    (1)在Config Client添加RabbitMQ依赖,非常简单:

    图片发自简书App

    (2)修改配置文件,添加RabbitMQ的配置信息:

    图片发自简书App

    Eureka Server注册中心

    省略。

    测试

    (1)启动Eureka Server注册中心、ConfigServer服务端、Config Client客户端(开启两个实例),如图:

    图片发自简书App

    (2)客户端获取配置文件的值,如图:

     

    图片发自简书App

    图片发自简书App

    图片发自简书App

    (3)修改配置文件值,改为apps.caac.net/demo,如图:

     

    图片发自简书App

    (4)通过curl执行刷新操作(配置服务端执行/bus/refresh),如图:

     

    图片发自简书App

    (5)客户端重新获取配置文件的值,可知已获取最新配置信息,如图:

     

    图片发自简书App

    图片发自简书App

    (6)至此完成配置自动刷新,当部署正式环境时,可以在配置文件修改时自动执行一个操作:curl -X POST http://localhost:8889/bus/refresh,在码云后台可以进行设置(这里测试环境,设置不了,必须是公网地址),如图:

    图片发自简书App

    局部刷新

    有时我们只想刷新部分微服务的配置,此时可通过/bus/refresh端点的destination参数来定位要刷新的应用程序。

    刷新指定实例(某一服务)

    执行:/bus/refresh?destination=customers:9000,

    其中,customers:9000指的是各个微服务的ApplicationContext ID(默认为:${spring.application.name}:${server.port})。

    刷新全部实例(某一服务)

    执行:/bus/refresh?destination=customers:**,这样就可以触发customers微服务所有实例的配置刷新。

    图片发自简书App

    注意:这里的${spring.application.name}区分大小写。



    作者:守候流星雨
    链接:https://www.jianshu.com/p/bdddca222c63
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    展开全文
  • 基于消息总线的工作流系统设计与实现,吴加志,詹舒波,本文介绍了工作流定义和工作流当前发展状况,为了实现企业经营过程中具体事务处理与流程管理的解耦分离,同时达到工作流程的自动
  • 本标准规定了电力系统消息总线的功能要求、接口功能、数据类型和接口定义等内容。
  • 软件体系结构 消息总线风格 文档管理
  • 基于上面需要异步以及等待,因此消息总线还需要一个线程池支持异步以及一个相对任务定时器来等待。 然后就跑下面的流程图就行了: 这里提供 MessageBus 的声明,可以看看消息总线有哪些功能: # include ...

    如果没有怎么写过项目,更确切地说是没有用面向对象的语言写过项目。就不会明白跨类通信这个需求是多么刚需。

    为什么要跨类通信?把所有的逻辑都写在一个类中,都是一家人,那就不需要通信了啊。是,确实是这样,当功能不是很多的时候,这样做确实好。但是,随着项目的规模增大,各种功能堆积,各种模块的增加,会使得这个类非常臃肿与庞大。臃肿庞大就意味着不好维护和扩展。

    因此,我们需要把功能划分出来,把模块划分得细一些,尽量做到类的职责单一且明显。这样可以做到高内聚,低耦合

    既然需要很多类,那么类与类之间必然会存在一些关系,如何来维护?对,就是跨类通信,沟通起来。

    想想之前,两个.c文件需要通信时,我们会怎么做,搞一个全局变量就可以了啊。

    对,跨类通信就是通过全局变量来做到的(好像也只有这个办法哦),这里也要说下原理:我们实现一个程序,这个程序有很多类,那么最终这些类肯定是组成了一个进程,而全局变量在一个进程中是共享的。

    既然全局变量可以做到跨类通信,那么任意两个类之间需要通信就要搞一个全局变量。这样子全局变量也变得臃肿,这不是我们想要的。

    概念

    我们可以参考计算机硬件的一个概念:消息总线。这个概念其实是把网状结构,变成了星性结构,类与类的通信有一个中心机构(消息总线)管理。

    在这里插入图片描述

    设计

    消息总线,有消息二字。这是什么意思呢?我需要你干什么,我给你发个消息,你就把这个事给干了。对,这就是消息总线的设计。大体设计就是这样,现在来看看消息总线需要哪些原料。

    首先,我们要提前写好处理消息的函数,然后等待别人发消息,这里,我们要把消息和函数对应起来,因此这里选择std::multimap,为什么不选择std::map呢,因为,同一个消息可能有几个函数对之感兴趣;

    设计的过程中一定要考虑到同步和异步,尤其是异步,因为潜意识中都会写同步,不会怎么考虑异步,考虑这样一种情况:我现在很渴,给你发个消息,让你去楼下买水,然而你不怎么锻炼,买水送给我这个过程需要一个小时,你说我是等一个小时再喝水还是等个十分钟,看看你回来没,如果没回来就叫别人去买?这里就牵扯到消息总线的同步和异步,这个在程序中就有体现。

    基于上面需要异步以及等待,因此消息总线还需要一个线程池支持异步以及一个相对任务定时器来等待。

    然后就跑下面的流程图就行了:

    在这里插入图片描述

    这里提供MessageBus的声明,可以看看消息总线有哪些功能:

    #include <experimental/any>
    
    class MessageBus
    {
    public:
        using any = std::experimental::any;
    
        typedef std::function<void(any, any)> FUN;
        typedef std::function<void(any, any)> AsynTimeoutCallback;
    
    public:
        MessageBus() = default;
        ~MessageBus() = default;
    private:
        MessageBus(const MessageBus &) = delete;
        MessageBus(MessageBus &&) = delete;
        MessageBus & operator = (const MessageBus &) = delete;
        MessageBus & operator = (MessageBus &&) = delete;
    
    public:
        uint32_t RigisterMessageByMainType(uint32_t mainType, const FUN & callback);
        uint32_t RigisterMessage(uint32_t mainType, uint32_t minorType, const FUN & callback);
    
        void CacelMessageByID(uint32_t id);
        void CacelMessageByMainType(uint32_t mainType);
        void CacelMessage(uint32_t mainType, uint32_t mimorType);
    
        void SyncSendMessageByMainType(uint32_t mainType, any mainAny = any(), any minorAny = any());
        void SyncSendMessage(uint32_t mainType, uint32_t minorType, any mainAny = any(), any minorAny = any());
    
        void AsynSendMessageByMainType(uint32_t mainType, std::chrono::seconds d = std::chrono::seconds(1),
                AsynTimeoutCallback = [] (any, any) {}, any mainAny = any(), any minorAny = any());
        void AsynSendMessage(uint32_t mainType, uint32_t minorType, std::chrono::seconds d = std::chrono::seconds(1),
                AsynTimeoutCallback = [] (any, any) {}, any mainAny = any(), any minorAny = any());
    
    private:
        void DoFunctionByMainType(uint32_t mainType, any mainAny, any minorAny);
        void DoFunction(uint32_t mainType, uint32_t minorType, any mainAny, any minorAny);
    private:
        struct Blob
        {
            Blob() {}
            Blob(uint32_t id, const FUN & fun) : id(id), fun(fun) {}
            uint32_t id;
            FUN fun = nullptr;
        };
        struct CompByMainType
        {
            bool operator () (const uint32_t val, const std::pair< std::pair<uint32_t , uint32_t>, std::shared_ptr<Blob> > & ele);
            bool operator () (const std::pair< std::pair<uint32_t , uint32_t>, std::shared_ptr<Blob> > & ele, const uint32_t val);
        };
    private:
        std::mutex mutex_;
        uint32_t id_;
        std::multimap< std::pair<uint32_t, uint32_t>, std::shared_ptr<Blob> > blobs_;
    
        ThreadPool pool_;
        RelativeTimer relativeTimer_;
    };
    

    实现

    bool MessageBus::CompByMainType::operator()(
            const std::pair<std::pair<uint32_t, uint32_t>, std::shared_ptr<MessageBus::Blob>> &ele, const uint32_t val)
    {
        return ele.first.first < val;
    }
    
    bool MessageBus::CompByMainType::operator()(const uint32_t val,
                                                const std::pair<std::pair<uint32_t, uint32_t>, std::shared_ptr<MessageBus::Blob>> &ele)
    {
        return val < ele.first.first;
    }
    
    uint32_t MessageBus::RigisterMessage(uint32_t mainType, uint32_t minorType, const MessageBus::FUN &callback)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        blobs_.emplace(std::make_pair(mainType, minorType), std::make_shared<Blob>(id_, callback));
        return id_++;
    }
    
    uint32_t MessageBus::RigisterMessageByMainType(uint32_t mainType, const MessageBus::FUN &callback)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        blobs_.emplace(std::make_pair(mainType, -1), std::make_shared<Blob>(id_, callback));
        return id_++;
    }
    
    void MessageBus::CacelMessage(uint32_t mainType, uint32_t mimorType)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto range = blobs_.equal_range(std::make_pair(mainType, mimorType));
        blobs_.erase(range.first, range.second);
    }
    
    void MessageBus::CacelMessageByMainType(uint32_t mainType)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto range = std::equal_range(std::begin(blobs_), std::end(blobs_), mainType, CompByMainType {});
        blobs_.erase(range.first, range.second);
    }
    
    void MessageBus::CacelMessageByID(uint32_t id)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        blobs_.erase(std::find_if(std::begin(blobs_), std::end(blobs_),
                [id] (const std::pair< std::pair<uint32_t , uint32_t >, std::shared_ptr<Blob> > & ele)
        {
            return id == ele.second->id;
        }));
    }
    
    void MessageBus::DoFunction(uint32_t mainType, uint32_t minorType, any mainAny, any minorAny)
    {
        std::vector< std::shared_ptr<Blob> > vec;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto range = blobs_.equal_range(std::make_pair(mainType, minorType));
            for (auto it = range.first; it != range.second; vec.push_back(it++->second)) {}
        }
    
        std::for_each(std::begin(vec), std::end(vec), [=] (const std::shared_ptr<Blob> & blob)
        {
            blob->fun(mainAny, minorAny);
        });
    }
    
    void MessageBus::DoFunctionByMainType(uint32_t mainType, MessageBus::any mainAny, MessageBus::any minorAny)
    {
        std::vector< std::shared_ptr<Blob> > vec;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto range = std::equal_range(std::begin(blobs_), std::end(blobs_), mainType, CompByMainType {});
            for (auto it = range.first; it != range.second; vec.push_back(it++->second)) {}
        }
    
        std::for_each(std::begin(vec), std::end(vec), [=] (const std::shared_ptr<Blob> & blob)
        {
            blob->fun(mainAny, minorAny);
        });
    }
    
    void MessageBus::SyncSendMessage(uint32_t mainType, uint32_t minorType, MessageBus::any mainAny, MessageBus::any minorAny)
    {
        DoFunction(mainType, minorType, mainAny, minorAny);
    }
    
    void MessageBus::SyncSendMessageByMainType(uint32_t mainType, MessageBus::any mainAny, MessageBus::any minorAny)
    {
        DoFunctionByMainType(mainType, mainAny, minorAny);
    }
    
    void MessageBus::AsynSendMessage(uint32_t mainType, uint32_t minorType, std::chrono::seconds d,
            MessageBus::AsynTimeoutCallback callback, MessageBus::any mainAny, MessageBus::any minorAny)
    {
        std::shared_future<void> future = pool_.Submit([=] ()
        {
            DoFunction(mainType, minorType, mainAny, minorAny);
        });
    
        relativeTimer_.AddTimerTask(std::to_string((static_cast<uint64_t>(mainType) << 32) + minorType), [=] ()
        {
            if (future.wait_for(std::chrono::seconds::zero()) != std::future_status::ready) callback(mainAny, minorAny);
        }, d, false, true);
    }
    
    void MessageBus::AsynSendMessageByMainType(uint32_t mainType, std::chrono::seconds d,
            MessageBus::AsynTimeoutCallback callback, MessageBus::any mainAny, MessageBus::any minorAny)
    {
        std::shared_future<void> future = pool_.Submit([=] ()
        {
            DoFunctionByMainType(mainType, mainAny, minorAny);
        });
    
        relativeTimer_.AddTimerTask(std::to_string((static_cast<uint64_t>(mainType) << 32) + static_cast<uint32_t>(-1)),
                [=] ()
        {
            if (future.wait_for(std::chrono::seconds::zero()) != std::future_status::ready) callback(mainAny, minorAny);
        }, d, false, true);
    }
    

    细节

    std::experimental::any

    为了实现函数签名的统一性,参数类型使用any,标准库中的any是C++17才有的内容,这里使用<experimental>头文件中的anyany可以擦除类型信息,有点类似void *指针。

    struct CompByMainType

    我设置了主消息号和副消息号,这样可以同一类的消息分在一个主消息号中。有利于管理和维护,但是有时候我们需要给所有注册主消息号的函数发消息,因此需要这个比较函数。

    异步

    仔细看看异步发送消息那个函数。把消息处理函数提交到线程池中,然后开一个定时任务,如果在规定时间内,消息处理函数没有完成,那么就调用超时处理函数;

    future是用了std::shared_future,这是因为std::future是不可复制的,而 l a m b d a lambda lambda表达式需要复制一次future,因此选用可复制的std::shared_future

    测试

    消息总线通常用来跨类通信,由于是测试的小程序,就在main函数中测试消息总线,只需要观察控制台的输出是否符合期望就好了。

    using namespace std::experimental;
    
    MessageBus messageBus;
    
    int main()
    {
        messageBus.RigisterMessage(0, 0, [] (any, any)
        {
            std::cout << "0 + 0" << std::endl;
        });
        messageBus.RigisterMessage(0, 0, [] (any, any)
        {
            std::cout << "00 + 00" << std::endl;
        });
        messageBus.RigisterMessageByMainType(0, [] (any, any)
        {
            std::cout << "0" << std::endl;
        });
        messageBus.RigisterMessageByMainType(2, [] (any, any)
        {
            /* // */std::this_thread::sleep_for(std::chrono::seconds(5));
            std::cout << "2222" << std::endl;
        });
    
        int data = 1, data2 = 1;
        messageBus.RigisterMessageByMainType(3, [] (any mainAny, any)
        {
            int *p = any_cast<int *>(mainAny);
            *p = 2;
            std::cout << "3333" << std::endl;
        });
        messageBus.RigisterMessageByMainType(4, [] (any mainAny, any)
        {
            int *p = any_cast<int *>(mainAny);
            *p = 2;
            std::cout << "4444" << std::endl;
            std::this_thread::sleep_for(std::chrono::hours(1));
        });
    
        messageBus.SyncSendMessage(0, 0);
        std::cout << "-------" << std::endl;
        messageBus.SyncSendMessageByMainType(0);
        std::cout << "-------" << std::endl;
    
        messageBus.AsynSendMessageByMainType(2, std::chrono::seconds(2), [] (any, any)
        {
            std::cout << "timeout" << std::endl;
        });
    
        std::cout << "-------" << std::endl;
        std::cout << "before data : " << data << std::endl;
        messageBus.SyncSendMessageByMainType(3, &data);
        std::cout << "after data : " << data << std::endl;
        std::cout << "-------" << std::endl;
    
        std::cout << "-------" << std::endl;
        std::cout << "before data2 : " << data2 << std::endl;
        messageBus.AsynSendMessageByMainType(4, std::chrono::seconds(2), [] (any mainAny, any)
        {
            int *p = any_cast<int *>(mainAny);
            *p = 2;
            std::cout << "timeout" << std::endl;
        }, &data2);
        std::cout << "after data2 : " << data2 << std::endl;
        std::cout << "-------" << std::endl;
    
        while (true)
        {
            std::this_thread::sleep_for(std::chrono::seconds(5));
        }
        return 0;
    }
    

    参考:

    展开全文
  • SpringCloud-消息总线BUS

    千次阅读 2019-06-27 18:16:33
      本文我们来介绍下SpringCloud中消息总线BUS。 一、什么是消息总线bus   SpringCloud Bus集成了市面上常用的消息中间件(rabbit mq,kafka等),连接微服务系统中的所有的节点,当有数据变更的时候,可以通过...

      本文我们来介绍下SpringCloud中消息总线BUS。

    一、什么是消息总线bus

      SpringCloud Bus集成了市面上常用的消息中间件(rabbit mq,kafka等),连接微服务系统中的所有的节点,当有数据变更的时候,可以通过消息代理广播通知微服务及时变更数据,例如微服务的配置更新。

    二、bus解决了什么问题?

      解决了微服务数据变更,及时同步的问题。

    刷新客户端服务
    在这里插入图片描述

    刷新服务端服务
    在这里插入图片描述

    案例前提需要启动RabbitMQ服务,不太清楚的可参考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404

    三、bus-客户端刷新

    1.创建项目

    在这里插入图片描述

    2.pom文件

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.13.RELEASE</version>
    	</parent>
    	<groupId>com.bobo</groupId>
    	<artifactId>config-client-refresh-bus</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>Dalston.SR5</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-config</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-actuator</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-bus-amqp</artifactId>
    		</dependency>
    
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    3.配置文件

    spring.application.name=config-client
    server.port=9051
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #默认 false,这里设置 true,表示开启读取配置中心的配置
    spring.cloud.config.discovery.enabled=true
    #对应 eureka 中的配置中心 serviceId,默认是 configserver
    spring.cloud.config.discovery.serviceId=config-server
    #指定环境
    spring.cloud.config.profile=dev
    #git 标签
    spring.cloud.config.label=master
    
    management.security.enabled=false
    
    # 消息队列RabbitMQ的连接配置信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    

    注意:controller的作用域要调整

    在这里插入图片描述

    4.再创建一个客户端项目

    在这里插入图片描述

    5.测试

    分别启动config-server和两个服务端,然后获取git中的属性信息

    在这里插入图片描述
    在这里插入图片描述

    显示修改git中的属性值
    在这里插入图片描述

    现在在客户端获取肯定是没有改变的,这时我们可以通过RestTemplate发送post请求来刷新客户端。比如我们刷新5051的服务

    在这里插入图片描述

    	public static void main(String[] args) {
    		RestTemplate template = new RestTemplate();
    		String url = "http://localhost:9051/bus/refresh";
    		template.postForLocation(url, null);
    		System.out.println("刷新客户端...");
    	}
    

    然后再访问:

    在这里插入图片描述
    在这里插入图片描述

    说明同步成功!同时在RabbitMQ中也可以看到队列信息

    在这里插入图片描述

    四、bus-服务端刷新

      接下来看下直接刷新config-server服务来通过bus及时通知client的效果。

    1.创建项目

    在这里插入图片描述

    2.pom

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.13.RELEASE</version>
    	</parent>
    	<groupId>com.bobo</groupId>
    	<artifactId>config-server-bus</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>Dalston.SR5</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-config-server</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-bus-amqp</artifactId>
    		</dependency>
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    3.配置文件

    spring.application.name=config-server
    server.port=9050
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #Git 配置
    spring.cloud.config.server.git.uri=https://gitee.com/dengpbs/config
    #spring.cloud.config.server.git.username=
    #spring.cloud.config.server.git.password=
    
    management.security.enabled=false
    
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    

    4.测试

      分别启动config-server和两个客户端。

    在这里插入图片描述

    在这里插入图片描述

    git修改属性

    在这里插入图片描述

    通过RestTemplate显示的refresh

    public static void main(String[] args) {
    	RestTemplate template = new RestTemplate();
    	String url = "http://localhost:9050/bus/refresh";
    	template.postForLocation(url, null);
    	System.out.println("刷新客户端...");
    }
    

    然后在访问:

    在这里插入图片描述

    在这里插入图片描述

    五、局部刷新服务

      参考下图,我们上面介绍的两种bus刷新的案例中,只要refresh了服务,那么订阅的有RabbitMQ服务的所有的服务都会刷新一遍配置信息,那这样就会造成一种情况,比如,我们仅仅修改了影响Server-A 服务的配置信息,但是一刷新,Service-B和Service-C服务都要刷新配置信息,显然这样是不行的。这时我们就可以通过局部刷新来实现刷新部分服务的需求。

    在这里插入图片描述

    1.刷新指定服务

      刷新服务的时候我们可以指定刷新的服务信息,具体实现如下:

    http://config-server/bus/refresh?destination=需要刷新的服务名称:端口
    

    2.刷新指定集群服务

      如果同一个服务有集群部署的场景,我们希望刷新服务的时候,集群的服务一并都刷新,提交的信息只需要端口通配就行

    http://config-server/bus/refresh?destination=需要刷新的服务名称:**
    
    展开全文
  • 一个根本上轻便且简单的并发消息总线库。 它有什么作用? libbus提供用于消息传递的共享总线。 客户端可以注册回调以从总线接收消息。 任何用户都可以向所有注册的客户端(广播)或特定的客户端发送消息。 该库...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 75,415
精华内容 30,166
关键字:

消息总线