精华内容
下载资源
问答
  • rabbitmq教程
    2021-02-08 10:59:59

    本教程以CentOS6为例说明:

    1、安装Erlang

    下载Zero-dependency Erlang from RabbitMQ 下载地址

    执行命令:

    rpm -ivh erlang-23.1.4-1.el6.x86_64.rpm
    

    2、安装RabbitMQ

    (1)安装socat

    yum -y install epel-release
    
    yum -y install socat
    

    (2)安装RabbitMQ

    下载RabbitMQ 下载地址

    rpm -ivh rabbitmq-server-3.8.9-1.el6.noarch.rpm
    

    (3)开放guest外部访问

    vim /etc/rabbitmq/rabbitmq.config
    
    [{rabbit, [{loopback_users, []}]}].
    

    3、启动、停止RabbitMQ

    service rabbitmq-server start
    
    service rabbitmq-server stop
    
    service rabbitmq-server restart
    

    4、开启RabbitMQ web管理工具

    rabbitmq-plugins enable rabbitmq_management
    
    service rabbitmq-server restart
    

    登录界面管理:http://IP:15672/

    5、设置开机启动

    chkconfig rabbitmq-server on
    

    6、防火墙开放15672端口

    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    
    /etc/rc.d/init.d/iptables save
    

    7、相关端口:

    5672: rabbitMq的编程语言客户端连接端口

    15672:rabbitMq管理界面端口

    25672:rabbitMq集群的端口

    8、RabbitMQ相关命令

    rabbitmqctl list_users

    rabbitmqctl add_user fzb fzb2019

    rabbitmqctl set_user_tags fzb administrator

    rabbitmqctl change_password fzb fzb2020

    rabbitmqctl delete_user fzb

    9、日志文件

    日志位置:/var/log/rabbitmq/

    • rabbit@{hostname}.log:输出rabbitmq运行相关的信息,如网络流量、用户、交换器、队列等信息
    • rabbit@{hostname}-sasl.log:Erlang运行相关信息
    更多相关内容
  • RabbitMQ教程

    2018-12-25 10:41:12
    rabbitmq 编辑 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过...
  • rabbitmq教程

    2018-08-05 22:08:53
    rabbitmq教程
  • rabbitMQ教程

    2019-01-23 17:51:33
    MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用...
  • 基于Python的rabbitmq教程. 文档链接: 安装:(推荐使用docker的镜像,这样也不会“污染”本地环境) 基于Ubuntu 下载docker: sudo apt-get install docker.io 给予你自己使用docker的权限: sudo chmod o+wr /var/run/...
  • RabbitMQ教程 这是我的中型文章中描述的演示的代码存储库。 中篇文章 1. 在本文中,我们将讨论如何在Docker上快速启动RabbitMQ实例。 我们将通过两种方法在容器中运行RabbitMQ Docker映像:(1)使用docker run命令...
  • RabbitMQ教程完整(精华干货)

    千次阅读 2020-12-10 13:49:00
    RabbitMQ教程(完整!!!) 一.RabbitMQ安装 1.1 为什么使用RabbitMQ 1.降低耦合度 2.RabbitMQ速度快,微秒级别 3.学习成本低 4.支持多种语言 1.2 什么是RabbitMQ RabbitMQ是一个由erlang开发的AMQP(Advanced ...

    RabbitMQ教程(完整!!!)

    一.RabbitMQ安装

    1.1 为什么使用RabbitMQ

    1.降低耦合度

    2.RabbitMQ速度快,微秒级别

    3.学习成本低

    4.支持多种语言

    1.2 什么是RabbitMQ

    1. RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议)的开源实现
    2. 能够实现异步消息处理

    1.3 安装RabbitMQ

    #在线安装
    docker pull rabbitmq:management
    #使用官方定义的端口号启动
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
    

    二.RabbitMQ架构

    2.1官方简单架构图

    1.Publish-生产者(发布消息到RabbitMQ中的Exchange

    2.Exchange-交换机(与生产者建立连接并接收生产者的消息

    3.Routes-路由(交换机以什么样的策略消息发布到Queue

    4.Queue-队列(Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

    5.Consumer-消费者(监听RabbitMQ中的Queue中的消息

    在这里插入图片描述

    2.2Rabbit完整架构图

    在这里插入图片描述

    2.3查看图形化界面并创建一个Virtual Host

    Virtual Host:相当于RabbitMQ上的虚拟机,可以创立多个,建立connection连接的时Virtual Host!

    创建一个全新的用户和全新的Virtual Host,并且将test用户设置上可以操作/test-1的权限!

    在这里插入图片描述

    三.RabbitMQ使用

    3.1RabbitMQ通讯方式

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    3.2 Java连接RabbitMQ

     public static Connection getConnection(){
            //创建Connection工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost("/test-1");
            factory.setPassword("test");
            factory.setUsername("test");
            factory.setHost("192.168.61.134");
            factory.setPort(5672);
            Connection connection = null;
            //拿到连接
            try {
                connection  = factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
            return connection;
        }
    

    3.3Hello-world

    在这里插入图片描述

    一个生产者,一个默认的交换机,一个队列,一个消费者

    1、创建生产者,创建一个channel,发送消息到exchange,指定路由规则

    //生产者
    public static void publish() throws Exception {
        //1、获取connection
        Connection connection = RabbitConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        //3、发送消息到exchange
          String msg = "hello world!!!";
        /**
         * 参数1:指定exchange,使用“”。默认的exchange
         * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
         * 参数3:制动传递的消息携带的properties
         * 参数4:指定传递的消息,byte[]类型
         */
        channel.basicPublish("", "helloworld", null,msg.getBytes());
        //PS:exchange是不会将消息持久化的,Queue可以持久化,得配置
    
        System.out.println("生产者发布消息成功!");
        //4、关闭管道和连接
        channel.close();
        connection.close();
    
    }
    

    2、创建一个消费者,创建一个channel,创建一个队列,并且消费队列

    //消费者  
    public static void consumer() throws Exception{
        //1、获取连对象、
        Connection connection = RabbitConfig.getConnection();
    
        //2、创建channel
        Channel channel = connection.createChannel();
    
        //3、创建队列-helloworld
        /**
             * 参数1:queue 指定队列名称
             * 参数2:durable 是否开启持久化(true)
             * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
             * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
             * 参数5:arguments 指定队列携带的信息
             *
             */
        channel.queueDeclare("helloworld",true,false,false,null);
    
    
        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+new String(body,"UTF-8"));
            }
        };
        /**
             * 参数1:queue 指定消费哪个队列
             * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
             * 参数1:cancelCallback 指定消费回调
             *
             */
        channel.basicConsume("helloworld",true,consumer);
        System.out.println("消费者开始监听队列");
    
        //5、键盘录入,让程序不结束!
        System.in.read();
    
        //6、释放资源
        channel.close();
        connection.close();
    
    }
    

    3.4 Work

    在这里插入图片描述

    一个生产者,一个默认的交换机,一个队列,两个消费者

    只需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了

    //1指定当前消费者一次能消费多少条消息
    channel.basicQos(1);
    
    //2.开启监听Queue
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者1号接收到消息:"+new String(body,"UTF-8"));
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //手动ACK(接收信息,指定书否批量操作)
            channel.basicAck(envelope.getDeliveryTag(),false);
        }
    };
    //3.关闭自动ACK
        channel.basicConsume("work",false,consumer);
    

    3.5 Publish/Subscribe

    在这里插入图片描述

    一个生产者,一个交换机,两个队列,两个消费者

    声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定规则直接绑定。

    1、让生产者创建一个exchange并且指定类型,和一个或多个队列绑定在一起。当生产者发送消息是会发送到exchange中,再由exchange到绑定的队列中

    //3、通过channel创建自己的exchange 并且绑定队列
        /**
         * 参数1:exchange的名称
         * 参数2:指定exchange的类型
         * FANOUT-Publish/Subscribe
         * DIRECT-Routing
         * TOPIC-Topics
         */
    
        channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("pubsub-queue1","publish-exchange","");
        channel.queueBind("pubsub-queue2","publish-exchange","");
    

    2、消费者还是监听指定队列即可。

    3.6 Routing

    在这里插入图片描述

    一个生产者,一个交换机,两个队列,两个消费者

    声明一个DIRECT类型的exchange,并且根据Routingkey绑定指定的队列

    绑定生产者和消费者都可以做,因为两边都有channel

    1、生产者在创建DIRECT类型的exchange后绑定相应的队列,并且指定Routingkey。在发送消息是也要指定消息的Routingkey

    //3、创建exchange并且指定类型
    channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
    
    //4、绑定队列 routing-queue-error routing-queue-info
    channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
    channel.queueBind("routing-queue-info", "routing-exchange", "INFO");
    
    //5、发送消息并且指定接收的队列的routingkey
    channel.basicPublish("routing-exchange", "ERROR", null, "ERROR-MSG".getBytes());
    channel.basicPublish("routing-exchange", "INFO", null, "INFO-1".getBytes());
    channel.basicPublish("routing-exchange", "INFO", null, "INFO-2".getBytes());
    channel.basicPublish("routing-exchange", "INFO", null, "INFO-3".getBytes());
    

    2、消费者没变化,监听自己的队列即可

    3.7 Topics

    在这里插入图片描述

    一个生产者,一个交换机,两个队列,两个消费者

    1、生产者创建Topic的exchange并且并且指定队列,这次绑定可以通过*和#匹配关键字,对指定RoutingKey内容进行匹配。

    *(星号)可以代替一个单词。

    #(哈希)可以替代零个或多个单词。

    //3、创建exchange
    channel.exchangeDeclare("topics-exchange", BuiltinExchangeType.TOPIC);
    
    //4、绑定队列 topics-queue-1 topics-queue-2
    channel.queueBind("topics-queue-1", "topics-exchange", "zhang.*");
    channel.queueBind("topics-queue-2", "topics-exchange", "wang.*");
    channel.queueBind("topics-queue-2", "topics-exchange", "wang.#;");
    
    //3、发送消息到exchange
    channel.basicPublish("topics-exchange", "zhang.sna", null, "张三".getBytes());
    channel.basicPublish("topics-exchange", "zhang.sna.sna", null, "张三三".getBytes());
    channel.basicPublish("topics-exchange", "wang.wu", null, "王五".getBytes());
    channel.basicPublish("topics-exchange", "wang.ergou", null, "王二狗".getBytes());
    System.out.println("生产者发布消息成功!");
    

    2、消费者无变化,监听指定的队列即可。

    四.Springboot整合RabbitMQ

    4.1 Springboot整合RabbitMQ

    1.创建Springboot工程

    2.导入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    3.写入配置文件application.yml

    spring:
      rabbitmq:
        host: 192.168.164.133
        virtual-host: /test-1
        username: test
        password: test
        port: 5672
    

    4.创建config配置,声明exchange和queue,并且绑定在一起

    @Configuration
    public class MQConfig {
    
    
        //1、创建exchange topic
        @Bean
        public TopicExchange getTopicExchage() {
            return new TopicExchange("boot-topic-exchange", true, false);
        }
    
        //2、创建queue
        @Bean
        public Queue getQueue(){
            return new Queue("topic-queue",true,false,false,null);
        }
    
    
        //3、绑定
        @Bean
        public Binding geteBinding(TopicExchange topicExchange,Queue queue){
            return BindingBuilder.bind(queue).to(topicExchange).with("ERROR");
        }
    
    }
    
    

    5.发布消息到RabbitMQ

      @Test
        void contextLoads() throws IOException {
            rabbitTemplate.convertAndSend("boot-topic-exchange","ERROR","error");
        }
    
    

    6.创建消费者监听器接收消息

    @Component
    public class Consumer {
        @RabbitListener(queues = "topic-queue")
        public void getMassage(Object massage){
            System.out.println("接收到消息:"+massage);
        }
    
    }
    
    

    4.2 Springboot整合RabbitMQ手动ACK

    1.修改配置文件 关闭自动ack 开启手动ack

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    

    2.修改消费者类为手动ack

    @RabbitListener(queues = "topic-queue")
    public void getMassage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:"+msg);
        //手动ack
       channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    

    五.RabbitMQ的其他操作

    5.1 消息的可靠性

    生产者和RabbitMQ的问题 publiser—>exchange

    5.1.1 confirm机制

    RabbitMQ的事务:保证生产者能将消息100%发送到RabbitMQ中。如果失败则回滚,并且记录日志,后面在定时发送。但是事务的效率太低,加了事务,RabbitMQ的效率要比平常低100倍

    除了事务,RabbitMQ还提供了Confirm机制。比事务效率高。

    1.普通Confirm方式

    //3.1、开启Confirm
    channel.confirmSelect();
    //3.2、发送消息
    channel.basicPublish("", "hello3world", null, "hello world".getBytes());
    //3.3、判断消息是否发送成功 返回boolean 返回flase则不会将消息发送到RabbitMQ
    if (channel.waitForConfirms()){
        System.out.println("消息发送成功!");
    }else {
      throw  new Exception("消息发送失败");
    }
    

    2.批量Confirm方式

    //3.1、开启Confirm
    channel.confirmSelect();
    //3.2、发送消息
    for (int i = 0; i <1000 ; i++) {
        channel.basicPublish("", "hello3world", null, ("hello world"+i).getBytes());
    }
    
    //3.3、判断消息是否发送成功 当你发送全部的消息,只要有一个消息发送失败,则所有发送失败,抛出IoException
    channel.waitForConfirmsOrDie();
    
    

    3.异步Confirm方式

    //3.1、开启Confirm
    channel.confirmSelect();
    //3.2、发送消息
    for (int i = 0; i <1000 ; i++) {
        channel.basicPublish("", "hello3world", null, ("hello world"+i).getBytes());
    }
    
    //3.3、开启异步回调
    channel.addConfirmListener(new ConfirmListener() {
       public void handleAck(long deliveryTag, boolean multiple) throws IOException {
           System.out.println("消息发送成功,标示为:"+deliveryTag+",是否为批量操作:"+multiple);
       }
    
       public void handleNack(long deliveryTag, boolean multiple) throws IOException {
           System.out.println("消息发送失败,标示为:"+deliveryTag+",是否为批量操作:"+multiple);
       }
    });
    System.in.read();
    

    在这里插入图片描述

    5.1.2 Return机制

    ​ Confirm只能保证消息到exchange,无法保证在RibbtMQ中exchange将消息分发到queue中

    ​ 而且exchange不能持久化消息,queue是可以持久化消息的

    ​ 采用Return机制来保证消息是否由exchange发送到了queue中!

    1.开启Return机制

    解决exchange->queue问题

    //3.1.2、开启Return机制
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //当送达失败是才会回调
            System.out.println(new String(body,"utf-8")+",消息没有送达到queue中");
        }
    });
    
    

    2.发送消息 boolean mandatory为trun才会有Return机制效果

    //3.2、发送消息,第三个设置为true才会有Return机制,默认为false
    //使用void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    for (int i = 0; i <1000 ; i++) {
        channel.basicPublish("", "xxx", true,null, ("hello world"+i).getBytes());
    }
    
    

    5.1.3 通过Springboot实现Confirm以及Return机制

    1、编写配置文件,开启Confirm和Return

    spring:
      rabbitmq:
        publisher-confirm-type: simple
        publisher-returns: true
    

    2、 指定RabbitTemplate对象,开启Confirm个Return,实现回调方法

    @Component
    public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //在类加载的时候回执行这个方法
    @PostConstruct
    public  void initMethod(){
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnCallback(this::returnedMessage);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            System.out.println("消息发送到Exchange成功!!!");
        }else {
            System.out.println("消息发送到Exchange失败");
        }
    
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息发送到Queue失败!!!");
    
    }
    
    }
    
    

    5.2消息重复消费

    重复消费消息,会对非幂等性操作造成问题

    重复消费消息的原因是,消费者1消费完消息后没有给RabbitMQ一个ack

    在这里插入图片描述

    为了解决消费者重复消费消息的问题,可以采用Redis,在消费者消费值前,将消息的id放入Redis中

    id-0(正在执行业务)

    id-1(业务执行成功)

    如果ack失败,在RabbitMQ将消息交给其他消费者是,先执行Reids的setnx方法,如果key存在,获取key的值,如果是0,什么都不做。如果是1,手动ack给RabbitMQ。

    极端情况:在第一个消费者执行业务时,出现了死锁,导致Reids中消息的key-value一直为0

    解决方法:在setnx的基础上给Redis的消息缓存设置失效时间

    5.2.1 消息重复消费

    1、生产者在发送消息时,指定messageId

    //4、添加携带属性
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
            .deliveryMode(1) //设置当前消息是否开启持久化 1-需要持久化 0-不需要持久化
            .messageId(UUID.randomUUID().toString()).build();
    //5、发送消息 Hello World方式 直接发送到queue中
    channel.basicPublish("", "helloworld", properties, "Hello World".getBytes());
    

    2、消费者在消费信息时,在监听队列回调方法中,根据业务逻辑来操作redis

    DefaultConsumer consumer = new DefaultConsumer(channel){
    Jedis jedis = new Jedis("192.168.164.136",6379);
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String messageId = properties.getMessageId();
        //4.1 通过setnx判断当前消息是否为首次消费 true:放入key-value
        String result = jedis.set(messageId,"0","NX","EX",10);
        if (null!=result&&result.equalsIgnoreCase("OK")){
            System.out.println("接收到消息:"+new String(body,"UTF-8"));
            //4.2消费成功 把messageId的Value设置为1
            jedis.set(messageId,"1");
            channel.basicAck(envelope.getDeliveryTag(),false);
        }else {
            //4.3如果当前setnx为false 如果是0那就直接return 如果为1那就手动ack
            String s = jedis.get(messageId);
            if ("1".equalsIgnoreCase(s)){
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }else{
                return;
            }
        }
    }
    };
    
    5.2.1 Springboot解决消息重复消费

    1、添加redis依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>]
    

    2、修改配置文件

      redis:
        host: 192.168.164.136
        port: 6379
    

    3、修改生产者

    @Test
    void contextLoads() throws IOException {
        //携带信息发送
        CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("boot-topic-exchange","ERROR","error",messageId);
        System.in.read();
    }
    

    4、修改消费者

    @RabbitListener(queues = "topic-queue")
    public void getMassage(String msg, Channel channel, Message message) throws IOException {
        //1、获取messageID
        String messageID = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        //2、用redis的setnx()方法放入值 放入成功返回true 放入失败返回false
        if (redisTemplate.opsForValue().setIfAbsent(messageID, "0", 10, TimeUnit.SECONDS)) {
            //3、消费消息
            System.out.println("接收到消息:" + msg);
            //4、设置value值为1
            redisTemplate.opsForValue().set(messageID, "1",10,TimeUnit.SECONDS);
            //5、手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            //6、如果放入值失败 获取messageID对应的value
            String s = redisTemplate.opsForValue().get(messageID);
            //7、value=0 什么都不做
            if ("0".equalsIgnoreCase(s)) {
                return;
                //8、value=1 手动ack
            } else {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
    
        }
    
    }
    
    展开全文
  • C#使用RabbitMQ教程【一】.rar,C#使用RabbitMQ教程【一】,Receive,bin,Release,Debug,Receive.exe.config,Receive.exe,Receive.vshost.exe.config,Receive.vshost.exe.manifest,RabbitMQ.Client.dll,RabbitMQ....
  • NodeJS的RabbitMQ教程实现 此代码使用为 nodeJS 实现官方 RabbitMq 教程。 要获取依赖项,请使用根项目目录中的 NPM ##tutorial1 - “你好世界!” ##tutorial2 - “工作队列” ##tutorial3 - “发布/订阅” ...
  • rabbitmq教程md.zip

    2020-12-18 08:48:46
    rabbitmq教程,里面为rabbitmq总结的md文档,包含使用示例!
  • RabbitMQ消费者向RabbitMQ教程学习-接收消息的项目遵循的Java教程在Docker上运行RabbitMQ我们在本地下载并运行Rabbitmq以在Docker中进行访问我们更改端口以响应预期的端口使用docker ps docker知道端口,或通过...
  • 消息队列1、MQ的相关概念1.1 什么时MQ1.2 为什么要用MQ1.3 MQ的分类1.4 MQ的选择2、RabbitMQ2.1 RabbitMQ的概念 1、MQ的相关概念 1.1 什么时MQ MQ(message queue),从字面意思上看,本质上是个队列,FIFO先入先出,...

    1、MQ的相关概念

    1.1 什么时MQ

    MQ(message queue),从字面意思上看,本质上是个队列,FIFO先入先出,只不过队列中存放的内容是Messahe而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不需要依赖其他服务。

    1.2 为什么要用MQ

    流量削峰
    举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
    在这里插入图片描述
    应用解耦
    以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
    在这里插入图片描述

    异步处理
    有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
    在这里插入图片描述

    1.3 MQ的分类

    1.3.1.ActiveMQ

    • 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据 缺点:官方社区现在对
    • ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
    • 尚硅谷官网视频:http://www.gulixueyuan.com/course/322

    1.3.2.Kafka

    大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

    • 优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
    • 缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

    1.3.3.RocketMQ

    RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

    • 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ
    • 缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

    1.3.4.RabbitMQ

    2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    • 优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
      https://www.rabbitmq.com/news.html
    • 缺点:商业版需要收费,学习成本较高

    1.4 MQ的选择

    1.4.1.Kafka

    Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。
    尚硅谷官网 kafka 视频连接 http://www.gulixueyuan.com/course/330/tasks

    1.4.2.RocketMQ

    天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

    1.4.3.RabbitMQ

    结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

    2、RabbitMQ

    2.1 RabbitMQ的概念

    RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

    2.2 四大核心概念

    生产者
    产生数据发送消息的程序是生产者
    交换机
    交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
    队列
    队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
    消费者
    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
    在这里插入图片描述

    2.3 RabbitMQ的核心部分

    在这里插入图片描述

    2.4. 各个名词介绍

    在这里插入图片描述

    • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
    • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个
      vhost,每个用户在自己的 vhost 创建 exchange/queue 等
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP
    • Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程 序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客
      户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的
      Connection 极大减少了操作系统建立 TCP connection 的开销
    • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe)
      and fanout (multicast)
    • Queue:消息最终被送到这里等待 consumer 取走
    • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

    2.5. 安装

    1.官网地址

    https://www.rabbitmq.com/download.html

    2.文件上传

    上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
    在这里插入图片描述

    3.安装文件(分别按照以下顺序安装)

    rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    yum install socat -y
    rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

    3.常用命令(按照以下顺序执行)

    添加开机启动 RabbitMQ 服务
    chkconfig rabbitmq-server on
    启动服务
    /sbin/service rabbitmq-server start
    查看服务状态
    /sbin/service rabbitmq-server status
    在这里插入图片描述

    停止服务(选择执行)
    /sbin/service rabbitmq-server stop
    开启 web 管理插件
    rabbitmq-plugins enable rabbitmq_management
    用默认账号密码(guest)访问地址 http://47.115.185.244:15672/出现权限问题

    4.添加一个新的用户

    创建账号
    rabbitmqctl add_user admin 123
    设置用户角色
    rabbitmqctl set_user_tags admin administrator
    设置用户权限
    set_permissions [-p ]
    rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
    用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
    当前用户和角色
    rabbitmqctl list_users
    在这里插入图片描述

    5.再次利用 admin 用户登录

    在这里插入图片描述

    6. 重置命令

    关闭应用的命令为
    rabbitmqctl stop_app
    清除的命令为
    rabbitmqctl reset
    重新启动命令为
    rabbitmqctl start_app

    展开全文
  • centos7安装RabbitMQ教程

    2022-09-03 17:39:30
    RabbitMQ是用Erlang语言编写的,在本教程中我们将安装最新版本的Erlang到服务器中。Erlang在默认的YUM存储库中不可用,因此您将需要安装EPEL存储库。我们已经安装了Erlang,我们可以进一步下载RabbitMQ。Erlang现在...

    更新基本系统

    在安装任何软件包之前,建议您使用以下命令更新软件包和存储库。

    yum -y update

    更新系统后,进一步安装Erlang。

    安装Erlang

    RabbitMQ是用Erlang语言编写的,在本教程中我们将安装最新版本的Erlang到服务器中。 Erlang在默认的YUM存储库中不可用,因此您将需要安装EPEL存储库。 运行以下命令相同。

    yum -y install epel-release
    
    yum -y update

    现在使用以下命令安装Erlang。

    yum -y install erlang socat

    您现在可以使用以下命令检查Erlang版本。

    erl -version

    您将得到以下输出。

    [root@liptan-pc ~]# erl -version
    Erlang (ASYNC_THREADS,HIPE) (BEAM) emulator version 5.10.4

    要切换到Erlang shell,可以键入以下命令。

    erl

    shell将更改,您将得到以下输出。

    Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]
    
    Eshell V5.10.4  (abort with ^G)
    1>
    

    您可以通过按ctrl + C两次退出shell。 Erlang现在安装在系统上,现在可以继续安装RabbitMQ。

    安装RabbitMQ

    RabbitMQ为预编译并可以直接安装的企业Linux系统提供RPM软件包。 唯一需要的依赖是将Erlang安装到系统中。 我们已经安装了Erlang,我们可以进一步下载RabbitMQ。 通过运行下载Erlang RPM软件包。

    RabbitMQhttps://download.csdn.net/download/Jifei5201314/86507905
    

    如果你没有安装wget ,可以运行yum -y install wget 。 您可以随时找到最新版本的RabbitMQ下载页面的链接。

    通过运行导入GPG密钥:

    rpm –import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

    运行RPM安装RPM包:

    rpm -Uvh rabbitmq-server-3.6.14-1.el7.noarch.rpm

    RabbitMQ现已安装在您的系统上。

    运行RabbitMQ

    您可以通过运行以下命令启动RabbitMQ服务器进程。

    systemctl start rabbitmq-server

    要在引导时自动启动RabbitMQ,请运行以下命令。

    systemctl enable rabbitmq-server

    要检查RabbitMQ服务器的状态,请运行:

    systemctl status rabbitmq-server
    访问web控制台
    

    启动RabbitMQ Web管理控制台,方法是运行:

    rabbitmq-plugins enable rabbitmq_management

    通过运行以下命令,将RabbitMQ文件的所有权提供给RabbitMQ用户:

    chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/

    现在,您将需要为RabbitMQ Web管理控制台创建管理用户。 运行以下命令相同。

    rabbitmqctl add_user admin StrongPassword
    rabbitmqctl set_user_tags admin administrator
    rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”
    

    将管理员更改为管理员用户的首选用户名。 确保将StrongPassword更改为非常强大的密码。

    要访问RabbitMQ的管理面板,请使用您最喜爱的Web浏览器并打开以下URL。

    http://Server_IP:15672/

    您将看到以下登录页面。

    展开全文
  • yum安装RabbitMQ教程

    2022-07-19 20:01:54
    yum按照RabbiMQ教程
  • 一文详解Windows安装RabbitMQ教程,同时解决在安装过程中出现的问题,并说明rabbitMq对应的erlang版本,又把rabbitMq和erlang存储到我的百度云盘和微云中,读者可以直接下载即可。
  • 在centos6安装rabbitmq教程,真实有效。需要有esl-erlang_17.3-1~centos~6_amd64.rpm,esl-erlang-compat-R14B-1.el6.noarch.rpm和rabbitmq-server-3.4.1-1.noarch.rpm
  • Spring Boot整合RabbitMQ教程

    千次阅读 2022-03-22 15:11:03
    创建RabbitMQ的消费方 第一步:首先创建消费者spring boot项目,然后引入maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp<...
  • Linux系统CentOS 8.x安装RabbitMQ教程

    千次阅读 2022-04-23 07:57:10
    运行环境准备,下载准备好安装包 基础编译工具库安装 yum install -y make gcc gcc-c++ glibc-devel...1、rabbitmq版本和 erlang 版本兼容分析 https://www.rabbitmq.com/which-erlang.html 2、下载 erlang、soca
  • 很好用的RabbitMQ教程,使用简介,和spring集成, 和springboot集成 ,解释简单易懂
  • Linux系统安装rabbitmq教程

    千次阅读 2021-12-31 10:56:02
    1,rabbitmq是用erlang语言写的,所以安装的话要下载erlang环境,两者安装包都在下面链接中链接: 注意:我的系统是centeros7! 不是的话可以去官网下载 https://pan.baidu.com/s/12zaoJ8M5qPyTY4D03mLhug 提取码:...
  • RabbitMQ教程+代码.rar

    2020-09-13 14:31:43
    RabbitMQ教程,内含代码,从安装到基础知识到最终实现都有详细指导。 实现方式:1.Java基础实现,2.整合springboot。
  • 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。 下载地址:http://www.erlang.org/downloads 我的电脑是64位的,各位根据自己的电脑配置选择相应的,速度较慢,可以去...
  • Ubuntu安装RabbitMQ教程 摘要 本篇主要给大家介绍ubuntu安装rabbitmq的过程以及相关遇到的坑,同时也避免自己忘记,所以写下这篇博文供大家学习。 相关版本 操作系统:Linux阿里云服务器 ubuntu:18.04(root用户) ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 19,780
精华内容 7,912
关键字:

rabbitmq教程