• 使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色 首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色 Producer Broker Consumer 整体架构如下所示 自定义协议 首先从上一...

    使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)

    主要角色

    首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

    Producer
    Broker
    Consumer
    

    整体架构如下所示

    自定义协议

    首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

    消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

    消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

    消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

    消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

    消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

    流程顺序

    项目构建流程

    下面将整个MQ的构建流程过一遍

    1. 新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能
    2. 新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;
    3. 新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法
    4. 测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

    具体使用流程

    1. 生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.
    2. 消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

    代码演示

    消息处理中心 Broker

    /**
     * 消息处理中心
     */
    public class Broker {
        // 队列存储消息的最大数量
        private final static int MAX_SIZE = 3;
    
        // 保存消息数据的容器
        private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(MAX_SIZE);
    
        // 生产消息
        public static void produce(String msg) {
            if (messageQueue.offer(msg)) {
                System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
            } else {
                System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
            }
            System.out.println("=======================");
        }
    
        // 消费消息
        public static String consume() {
            String msg = messageQueue.poll();
            if (msg != null) {
                // 消费条件满足情况,从消息容器中取出一条消息
                System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
            } else {
                System.out.println("消息处理中心内没有消息可供消费!");
            }
            System.out.println("=======================");
    
            return msg;
        }
    
    }

    消息处理中心服务 BrokerServer

    /**
     * 用于启动消息处理中心
     */
    public class BrokerServer implements Runnable {
    
        public static int SERVICE_PORT = 9999;
    
        private final Socket socket;
    
        public BrokerServer(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try (
                    BufferedReader in = new BufferedReader(new InputStreamReader(
                            socket.getInputStream()));
                    PrintWriter out = new PrintWriter(socket.getOutputStream())
            )
            {
                while (true) {
                    String str = in.readLine();
                    if (str == null) {
                        continue;
                    }
                    System.out.println("接收到原始数据:" + str);
    
                    if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息
                        //从消息队列中消费一条消息
                        String message = Broker.consume();
                        out.println(message);
                        out.flush();
                    } else if (str.contains("SEND:")){
                        //接受到的请求包含SEND:字符串 表示生产消息放到消息队列中
                        Broker.produce(str);
                    }else {
                        System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
            ServerSocket server = new ServerSocket(SERVICE_PORT);
            while (true) {
                BrokerServer brokerServer = new BrokerServer(server.accept());
                new Thread(brokerServer).start();
            }
        }
    }

    客户端 MqClient

    /**
     * 访问消息队列的客户端
     */
    public class MqClient {
    
        //生产消息
        public static void produce(String message) throws Exception {
            //本地的的BrokerServer.SERVICE_PORT 创建SOCKET
            Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
            try (
                    PrintWriter out = new PrintWriter(socket.getOutputStream())
            ) {
                out.println(message);
                out.flush();
            }
        }
    
        //消费消息
        public static String consume() throws Exception {
            Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
            try (
                    BufferedReader in = new BufferedReader(new InputStreamReader(
                            socket.getInputStream()));
                    PrintWriter out = new PrintWriter(socket.getOutputStream())
            ) {
                //先向消息队列发送命令
                out.println("CONSUME");
                out.flush();
    
                //再从消息队列获取一条消息
                String message = in.readLine();
    
                return message;
            }
        }
    
    }

    测试MQ

    public class ProduceClient {
    
        public static void main(String[] args) throws Exception {
            MqClient client = new MqClient();
    
            client.produce("SEND:Hello World");
        }
    
    }
    
    public class ConsumeClient {
    
        public static void main(String[] args) throws Exception {
            MqClient client = new MqClient();
            String message = client.consume();
    
            System.out.println("获取的消息为:" + message);
        }
    }

    我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

    接收到原始数据:SEND:Hello World
    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:1
    =======================
    接收到原始数据:SEND:Hello World
    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:2
    =======================
    接收到原始数据:SEND:Hello World
    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:3
    =======================
    接收到原始数据:SEND:Hello World
    消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
    =======================
    接收到原始数据:Hello World
    原始数据:Hello World没有遵循协议,不提供相关服务
    
    接收到原始数据:CONSUME
    已经消费消息:SEND:Hello World,当前暂存的消息数量是:2
    =======================
    接收到原始数据:CONSUME
    已经消费消息:SEND:Hello World,当前暂存的消息数量是:1
    =======================
    接收到原始数据:CONSUME
    已经消费消息:SEND:Hello World,当前暂存的消息数量是:0
    =======================
    接收到原始数据:CONSUME
    消息处理中心内没有消息可供消费!
    =======================

    小结

    本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 "生产者,消费者,消费处理中心,协议" 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

    展开全文
  • 近期公司项目中,听同事提起MQ、JMS,因为之前没有使用过不太了解,所以抽出半天时间学习下,了解下相关概念、功能及其用途。便于再次与人沟通时可以言之有物,后续工作中遇到了,可立马上手,提高工作效率。 学习...
  • 消息队列系列分享大纲:  一、消息队列的概述 二、消息队列之RabbitMQ的使用 三、消息队列之Kafka的使用 四、消息队列之RabbitMQ的原理详解 五、消息队列之Kafka的原理详解 六、消息队列之面试集锦 1.消息...
  • java 消息队列学习 2016-07-04 15:21:09
    消息队列的定义:  JMS介绍  Java Message Service (JMS) 是SUM提出的旨在统一各种MOM(Message-Oriented Middleware)系统接口的规范,它包含点对点(Point to Point, PTP) 和发布/订阅(Publish/...
  • java消息队列rabbitmq学习 2019-05-27 10:47:41
    https://blog.csdn.net/zpcandzhj/article/details/81436980
  • java 技术:消息队列 2017-03-21 20:36:30
    学习过程分为三个步骤: 1 查找资料2 实验实践3 归纳总结 ...2 消息队列MQ技术的介绍和原理,介绍了消息队列MQ的基本知识点。 此处不再对技术内容进行复制粘贴,仅提供学习思路。首先用wha
  • Java实现消息队列服务 2019-08-30 16:35:03
    使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色 首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色 ProducerBrokerConsumer 整体架构如下所示 自定义协议 首先从上一篇中介绍了协议的...
  • 大型网站架构之分布式消息队列   以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统)。 本次分享大纲 消息队列概述消息队列应用场景消息中间件示例...
  • 学习java消息队列的实现项目,使用jfinal + jfinal-ext + activeMQ + quartz快速构建。 1.消息队列 消息队列,其实是一种基于数据结构实现的服务。而java语言中的实现,有apache的activeMQ,比较主流。 2.环境...
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ ...
  • 消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。“消息队列”是在...
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ ...
  • 主流消息队列对比 2018-08-12 19:15:29
    Ckafka Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于...主流消息队列对比   RabbitMQ Ro...
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ...
  • Java 帝国之消息队列原创: 刘欣 码农翻身 2017-02-06张家村的历史Java 帝国的张家村正在迎来一次重大的变革。5年前网上购物兴起的时候, 帝国非常看好, 决定向这个领域进军, 于是兴建了张家村, 在这里安装了...
  • 一、消息队列使用场景或者其好处消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。在项目启动之...
  • Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:  由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。  所以可以...
  • 文章目录消息队列其实很简单一 什么是消息队列二 为什么要用消息队列(1) 通过异步处理提高系统性能(削峰、减少响应所需时间)(2) 降低系统耦合性三 使用消息队列带来的一些问题四 JMS VS AMQP4.1 JMS4.1.1 JMS 简介...
  • 消息队列介绍及选型 2018-03-30 21:42:25
    消息队列提供了异步处理机制,允许用户把消息放入队列,但并不立即处理它。想在队列中放入多少消息就放多少,然后在需要的时候再去处理他。解耦 降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来...
1 2 3 4 5 ... 20
收藏数 101,161
精华内容 40,464