精华内容
下载资源
问答
  • 文章目录1、RabbitMQ基本概念2、三种队列模式2.1、Direct2.2、Fanout2.3、Topic 1、RabbitMQ基本概念 消息队列有很多种,常见的是ActiveMQ、RabbitMQ、Kafka,三者依次能处理更好的数据量但是安全性也随之降低并且很...

    1、RabbitMQ基本概念

    消息队列有很多种,常见的是ActiveMQ、RabbitMQ、Kafka,三者依次能处理更好的数据量但是安全性也随之降低并且很可能出现数据的丢失,但是目的却是一致的:异步处理、应用解耦,流量削锋和消息通讯等问题实现高性能,高可用,可伸缩性和最终一致性。

    RabbitMQ是用Erlang语言开发的AMQR的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗,具体特点如下:

    • 可靠性:使用一些机制来保证可靠性,如持久化、传输确认、发布确认
    • 灵活的理由:进入消息队列之前,通过Exchange来路由信息的
    • 消息集群:多个RabbitMQ服务器可以组成一个集群,形成一个逻辑broker
    • 高可用:队列可以在集群中的机器上进行镜像,使得在部分节点出问题情况下队列仍然可用
    • 多种协议:支持多种消息队列协议,如STOMP、MQTT等等
    • 多语言客户端:几乎支持所有的语言
    • 管理界面:提供了一个易使用的用户界面,使得用户可用监控和管理消息Broker的许多方面
    • 跟踪机制:如果消息异常,RabbitMQ提供了消息跟踪机制,使用者可以找出发生了什么
    • 插件机制:RabbitMQ提供了许多插件,来从多方面进行扩展,同样也可以编写自己的插件

    架构图:
    在这里插入图片描述
    主要的几个概念:

    1. RabbitMQ Server:也叫做Broker Server,是一种传输服务。角色就是维护一条从Producer到Customer的路线,保证数据能够按照指定的方式进行传输
    2. Producer:消息生产者,消息生产者连接RabbitMQ服务器将消息投递到Exchange
    3. Consumer:消息消费者,消息消费者订阅,RabbitMQ将Queue中的消息发送到消息消费者
    4. Exchange:交换器,有Direct,Fanout,Topic,headers四种类型每种拥有不同的路由规则,本身不存储数据。
    5. Queue:队列,在RabbitMQ中负责存储消息。消息消费者通过订阅队列来获取消息,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,然后消费者再去使用。多对多的关系。
    6. RoutingKey:路由规则,通过指定路由规则指定消息流向哪。

    安装:
    使用docker 进行安装

    docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
    

    在这里插入图片描述

    2、三种队列模式

    2.1、Direct

    在单节点进行消息传递的时候,可以使用这种方式。
    在这里插入图片描述
    这种情况下指定好RouteKey对应的那个指定的Queue就可以了。

    使用步骤:

    1. 一般情况下可以使用RabbitMQ自带的Exchange
    2. 这种模式下不需要将Exchange进行任何绑定操作
    3. 消息传递,指定RouteKey,不然没法指向Queue
    4. 如果vhost中不存在RouteKey中指定的队列名,那么该消息会被抛弃。

    具体实现:
    在这里插入图片描述
    不需要设置Exchange,是使用默认的。
    代码实现分为两个部分:

    • 生产者
    • 消费者

    pom.xml

    		<dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
    
    

    application.yml

    server:
      port: 7100
    spring:
      rabbitmq:
        host: localhost
    

    生产者
    我们可以先看看上面的架构图,首先将数据产生,然后进去Exchange,利用你指定的RouteKey去到达指定的Queue。

    首先我们来到web界面,先创建好队列的RouteKey,方便我们去一会进行指定。

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = RabbitMQApplication.class)
    public class Product1 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 直接模式
         */
        @Test
        public void send_msg1(){
         rabbitTemplate.convertAndSend("zxc", "直接模式测试");
        }
    }
    

    还需要一个消费者将队列中的消息消费掉

    @Component
    @RabbitListener(queues = "zxc")
    public class Custom1 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc:" + msg);
        }
    }
    

    在这里插入图片描述

    2.2、Fanout

    • 什么是分裂模式

    需要将消息一次发给多个队列时,需要使用这种模式:
    在这里插入图片描述

    • 新建两个队列
      在这里插入图片描述
    • 创建Exchange并绑定相应的Queue

    由于我们要传送到不同Queue中,因此要去运送信息的Exchange也不能只是一个了,所以要额外创建Fanout的Exchange,然后将创建出的Exchange去指定好对应的Queue
    在这里插入图片描述
    在这里插入图片描述

    启动方法写好,因为没有匹配规则,所以RouteKey为null就可以了,只是选好Exchange的名字就行

    	/**
         * 分裂模式
         */
        @Test
        public void send_msg2(){
            rabbitTemplate.convertAndSend("zxczxc", "", "分裂模式测试");
        }
    
    

    编写消费者

    @Component
    @RabbitListener(queues = "zxc1")
    public class Custom2 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc1:" + msg);
        }
    }
    
    @Component
    @RabbitListener(queues = "zxc2")
    public class Custom3 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc2:" + msg);
        }
    }
    

    在这里插入图片描述

    2.3、Topic

    简单说一下主题模式的概念,刚才说到了分列模式意思就是没有RouteKey,而主题模式相当于分列模式的加强版有匹配规则的分列算法。
    在这里插入图片描述
    和之前的分列模式是差不多的,只是增加了RoutingKey
    在这里插入图片描述
    编写生产者:

    	/**
         * 主题模式
         */
        @Test
        public void send_msg3(){
            rabbitTemplate.convertAndSend("topic", "love.you", "主题模式测试");
        }
    
    

    消费者:

    @Component
    @RabbitListener(queues = "zxc")
    public class Custom1 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc:" + msg);
        }
    }
    
    @Component
    @RabbitListener(queues = "zxc1")
    public class Custom2 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc1:" + msg);
        }
    }
    
    @Component
    @RabbitListener(queues = "zxc2")
    public class Custom3 {
    
        @RabbitHandler
        public void receive_msg(String msg){
            System.out.println("zxc2:" + msg);
        }
    }
    

    在这里插入图片描述

    展开全文
  • aliyun的topic说明

    千次阅读 2019-04-25 21:11:10
    说在前面,所有的topic都是需要配合物模型使用 https://help.aliyun.com/document_detail/73727.html?spm=a2c4g.11186623.6.566.727b77a1urids6 属性 所有properties下面的都是属性,对应使用 /sys/a1PODPi5...

    说在前面,所有的topic都是需要配合物模型使用

    https://help.aliyun.com/document_detail/73727.html?spm=a2c4g.11186623.6.566.727b77a1urids6

    属性

    所有properties下面的都是属性,对应使用

    /sys/a1PODPi5ohj/${deviceName}/thing/event/property/post

    /sys/a1PODPi5ohj/${deviceName}/thing/service/property/set

    事件

    Events下面是事件,对应

    /sys/a1HnMJCnqYK/${deviceName}/thing/event/${tsl.event.identifer}/post

     

    ${tsl.event.identifer}字段说明

    事件都是上报,默认会添加属性上报的topic

    thing.event.property.post

    对应topic就是

    /sys/a1HnMJCnqYK/${deviceName}/thing/event/property/post

    其实也就是我们第一部分属性上报的topic

     

    自己添加的事件说明

    事件标识对应identifier,我这里是shijian,上报的topic就是thing.event.shijian.post

    /sys/a1HnMJCnqYK/${deviceName}/thing/event/shijian/post 

    服务

     

    Services下面是服务对应

    /sys/a1PODPi5ohj/${deviceName}/thing/service/${tsl.event.identifer}

    和事件相反,服务只能订阅

    默认添加thing.service.property.set,和thing.service.property.get

    /sys/a1PODPi5ohj/${deviceName}/thing/service/property/set

    /sys/a1PODPi5ohj/${deviceName}/thing/service/property/get

    可以看到其实还是第一部分的属性获取

    第二个topic我没用过不知道干啥的,估计可以通过这个下发属性。

     

    自己添加服务说明,这里选用异步,同步需要rrpc

    其实有了上面例子很好理解thing.service.fuwu,也就是

    /sys/a1PODPi5ohj/${deviceName}/thing/service/fuwu 

    关于同步rrpc

    https://help.aliyun.com/document_detail/90567.html?spm=a2c4g.11186623.6.624.217bf9bc6sgX4I

    ·RRPC请求消息Topic/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/${messageId}

    ·RRPC响应消息Topic/sys/${YourProductKey}/${YourDeviceName}/rrpc/response/${messageId}

    ·RRPC订阅Topic/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/+

    关键点就是订阅那个通配符topic,因为每次messageId是不同的,也就是说每次都需要按照发过来的messageId重新组合topic

    展开全文
  • tag可以理解为topic的二级标签,同一个topic可以发送不用的tag消息,消费者可以指定接收某个tag的消息,也可以使用通配符如“*”,null来接收所有tag的消息,是对消息的进一步分类 groupname是组的感念,理解起来...

    先对三者的概念做个个人理解的介绍:

    topic是消息通道的重要标识,可以理解成消息的标题,生产者和消费者必须在用一个topic下才能通讯

    tag可以理解为topic的二级标签,同一个topic可以发送不用的tag消息,消费者可以指定接收某个tag的消息,也可以使用通配符如“*”,null来接收所有tag的消息,是对消息的进一步分类

    groupname是组的感念,理解起来比较抽象,groupname可以配置也可以不配置,不配置默认都在一个组。但是不影响消息的正常发送和接收,但是会影响到消息的消费者会被谁消费

    举个例子,如果有一个生产者,两个消费者A和B,如果生产者配置的组名为group1,消费者A和B的组名也配置的组名为group1,那么消费者A和B只能有一个消费到消息,

    但是如果消费者A和消费者B 配置了不通的组名,则两个消费者都能收到生产者发送的消息。

    其实groupname是为了项目的负载均衡用的,比如你的消费者程序部署多处,为了防止消息重复处理,只需要配置相同的groupname就可以了

    展开全文
  • 由于很多业务表因为历史原因或者性能原因,都使用了违反第一范式的设计模式。即同一个列中存储了多个属性值(具体结构见下表)。 这种模式下,应用常常需要将这个列依据分隔符进行分割,并得到列转行的结果。 源表...

    行列转换常见场景

    由于很多业务表因为历史原因或者性能原因,都使用了违反第一范式的设计模式。即同一个列中存储了多个属性值(具体结构见下表)。 这种模式下,应用常常需要将这个列依据分隔符进行分割,并得到列转行的结果。

    源表数据:

    期望得到目标结果:

    思路分析:

     这个join最基本原理是笛卡尔积,通过实现循环,依次按照逗号分割截取后,改变列的数值

    方法:

    id构建:如有一行数据value有100个逗号分割的值,需要循环100次,mysql内部也有属性表help_topic自增id help_topic_id可用,共有659个连续的id,id最大值大于符合分割value值的个数,能满足于大部分需求了

    mysql中截取字符串的函数:substring_index。

    substring_index(“待截取有用部分的字符串”,“截取数据依据的字符”,截取字符的位置N)

    通过 b.help_topic_id < (length(a.value) - length(replace(a.value,',',''))+1)限制循环的次数,再截取每次循环的最后一位。

    测试过程如下:

    select 
    a.id,a.value,substring_index(a.value,',',b.help_topic_id+1),substring_index(substring_index(a.value,',',b.help_topic_id+1),',',-1) 
    from 
    source_table a
    join
    mysql.help_topic b
    on b.help_topic_id < (length(a.value) - length(replace(a.value,',',''))+1)
    order by a.id;

    整体sql:

    select a.id,substring_index(substring_index(a.value,',',b.help_topic_id+1),',',-1)  as value
    from 
    source_table a
    join
    mysql.help_topic b
    on b.help_topic_id < (length(a.value) - length(replace(a.value,',',''))+1)
    order by a.id;
    

     

    展开全文
  • 使用到数据库自带的mysql.help_topic表来属性拆分,help_topic表就是实现行转列功能 SELECT SUBSTRING_INDEX( SUBSTRING_INDEX('1,2,3,4,5',',',help_topic_id + 1),',' ,- 1) lzh FROM mysql.help_topic WHERE ...
  • 只需使用您的主题名称对您的 nsqd 实例调用 create 方法,您就会摇摆不定。 // point it to the http port on your nsqd server var nsqd = 'localhost:4151'; var nsqTopic = require('nsq-topic'); nsqTopic....
  • springboot整合activeMQ案例,queue、topic两种模式

    万次阅读 热门讨论 2018-07-06 14:43:40
    #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置 spring.jms.pub-sub-domain=true #账号 spring.activemq.user=admin # 密码 spring.activemq.password=admin # 是否信任所有包 #spring....
  • 因业务需要,本人在测试机上部署了一个activemq。突然奇想 要用 Stomp方式请求 TOPIC 数据。 打开activemq stomp 文档,发现内容十分的节省啊,不过还好给了两个demo.
  • mysql.help_topic表无权限使用解决方法

    千次阅读 2020-12-19 14:30:30
    我想要分开,每个人单独一条,于是用到了mysql的help_topic表 代码: select id, substring_index(substring_index(a.owner,';' ,b.help_topic_id+1),';',-1) owner from test a join mysql.help_topic b...
  • kafka基本概念和使用

    千次阅读 2018-12-03 00:06:13
    kafka基本概念和使用 文章目录kafka基本概念和使用kafka的概念基本概念Kafka的使用首先kafka的安装kafka的简单实用和理解搭建集群(3个节点)windows版本环境搭建 本文在[初谈Kafka]...
  • public static final String TOPIC_EXCHANGE_NAME = "topic_exchange"; /** * 定义bindingKey,模糊匹配 */ public static final String BINGDING_KEY_TEST1 = "*.orange.*"; public static final String ...
  • 概述上篇文章中间件系列五...这节我们引入Topic exchange(主题交换机),支持对路由键的模糊匹配实现生产者发送一个消息,这个消息同时被传送给所有队列。但是有时我们不希望所有的消息都被所有队列接收,我们希望可以指
  • Kafka删除topic

    千次阅读 2019-09-26 10:28:51
    topic中的元数据存储在ZooKeeper中的/brokers/topics和/config/topics路径下,topic中的消息数据存储在log.dir或log.dirs配置的路径下,所以删除topic就需要删除这些路径下的内容。 准备工作 创建topic: bin/...
  • topic_modelling-源码

    2021-06-16 17:39:54
    就是这样,我使用了 python 库和来完成这些艰苦的工作。 它显示了不同的算法方法,TFIDF、LSI 和 LDA 算法。 用法 weighting_algorithms.py : 用你想分析的输入文本文件修改脚本并运行脚本,然后oyu可以写一个单词...
  • 但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。 网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。 自主搭建的...
  • Kafka使用JavaAPI-AdminClient操作topic

    热门讨论 2020-12-04 09:51:19
    Kafka使用JavaAPI-AdminClient操作topic 首先需要初始化AdminClient对象(具体编写位置与修饰权限看自己代码结构); private static final String borker = "localhost:port"; public static AdminClient ...
  • ActiveMQ的topic的简单使用

    千次阅读 2018-07-20 11:16:04
    上一篇写了queue的简单使用,实际差不多,直接上代码 这个topic就类似与微信公众号的感觉,一个生产者可以对应多个消费者 代码在运行的时候需要先运行一下消费者,相当于你订阅了这个topic(类似于关注了一个微信...
  • 如何永久删除Kafka的Topic

    千次阅读 2019-05-27 14:25:06
    使用kafka-topics --delete命令删除topic时并没有真正的删除,而是把topic标记为:“marked for deletion”,导致重新创建相同名称的Topic时报错“already exists”。 2.问题复现 1.登录Kafka集群所在的服务器,...
  • Apollo如何通知/订阅主题topic

    千次阅读 热门讨论 2018-04-20 22:21:46
    How to advertise and subscribe a topic 导读 众所周知,Apollo是基于ROS开发的,所以其底层也是基于消息的机制进行节点通信的。但是它在ROS的基础上做了一些改动,如下: P2P——由于原生ROS的...
  • kafka topic的基本操作

    2017-07-28 11:30:51
    转载自 http://www.cnblogs.com/xiaodf/p/6093261.html ... 创建kafka topic bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partit
  • topic创建详解

    千次阅读 2020-03-24 12:38:56
    1、自动创建 如果kafka broker中的config/server....那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(默认值为1)个副本的对应to...
  • 所以我们使用topic的指令格式应该都类似: kafka-topics --zookeeper cluster2-4:2181/kafka ...... A.创建一个名为 test 的主题(Topic): kafka-topics --zookeeper cluster2-4:2181/kafka --create -...
  • AOSC OS APT主题生成器 这是AOSC OS APT主题生成器。 它以JSON格式为二进制包生成主题元数据。 生成的格式在。 该项目是AOSC基础架构的一部分。 建筑 只需运行cargo build --...运行./topic-manifest -d 来启动。
  • 查看kafka的topic清单以及topic的内容

    千次阅读 2020-11-12 16:54:09
    对于云端服务器上运行着的kafka集群,由于没有开放相关的端口,之前查看都是使用端口转发,ssh练上去之后,使用端口转发,转发到本地查看。 使用kafka时发现,zk连接上...查看kafka中topic清单:kafka-topics.sh --l
  • 这个问题是最近一个朋友问我的,用sparkstreaming消费kafka的多个topic,怎么获取topic的信息,然后根据不同topic的数据做不同的逻辑处理.其实这个问题非常简单,...
  • 文章目录前言回顾 Queue 类型消息的发送和请求过程ActiveMQTopic 和 JmsTemplateTopic 类型消息的特性 前言 本篇文章的内容会非常之少,因为 Topic 类型的数据的发送和接收本身在代码上并没有太大的差别,唯一的...
  • kafka 创建topic,查看topic

    万次阅读 2018-09-28 16:46:21
    创建 创建kafka topic bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper ...方法一: 执行linux命令: bin/kafka-topics.sh --create --topic topicname --rep...
  • Rocketmq topic 管理及自动创建分析

    千次阅读 2019-06-08 12:06:07
    rocketmq topic 信息是保存在broker 上 borker会定期保存到store/config 下topics.json 每次broker启动会默认创建系统级topic,其中就...禁止自动创建topic方法:修改TBW102 PERM为0并且需要注意每个新broker增加的...
  • 前言: 删除kafka topic及其数据,严格来...本文总结多个删除kafka topic的应用场景,总结一套删除kafka topic的标准操作方法。 step1: 如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费...
  • kafka彻底删除topic的两种方法

    千次阅读 2016-11-27 15:54:11
    方法一:快速配置删除法 1.kafka启动之前,在server.properties配置delete.topic.enable=true  2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181 或者使用kafka-manager集群管理工具删除

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 129,863
精华内容 51,945
关键字:

topic的用法