精华内容
下载资源
问答
  • 数据包发送与接收

    千次阅读 2021-12-09 18:37:52
    数据包发送与接收的过程

    封装和解封装

    应用层---HTTP超文本传输协议---TCP 80端口

            HTTPS=HTTP+SSL/TLS(比HTTP更安全)---TCP 443端口

            FTP文件传输协议---TCP 20/21端口

    TFTP简单文件传输协议---UDP 69端口

    telent远程登录协议---TCP 23端口

    SSH ---TCP 22端口

    DHCP 动态主机配置协议(自动获取IP地址---UDP67/68端口

    DNS 域名解析协议---UDP/TCP53端口

    传输层---端口号---TCP协议/UDP协议

    TCP和UDP的区别

    1. TCP是面向连接的协议,UDP是无连接的协议;
    2. TCP协议传输是可靠的,UDP协议传输“尽力而为”;
    3. TCP可以进行流控,UDP不行;
    4. TCP可以进行分段,UDP不行;
    5. TCP传输速度较慢,占用资源较大;UDP传输速度较快,占用资源小。

    TCP和UDP的场景应用:TCP更适合对传输可靠性要求较高,但是对速度要求较小的场景;UDP更适合对苏的要求较高,对可靠性要求较低的场景(即时通讯类)

    网络层---IP地址---IP协议

    数据链路层---MAC地址---以太网协议---以太网:早期局域网的解决方案,现在也用在广域网中。就是依靠MAC地址寻址的一二层网络。

    物理层

     

    什么是面向连接

    在正式传输数据之前,先通过预备好的协议(TCP协议),建立点到点的连接,之后再传输数据。

    TCP头部:

    URG(urgent紧急):紧急标志。紧急标志为"1"表明该位有效。

    ACK(acknowledgement 确认) :确认标志。表明确认编号栏有效。大多数情况下该标志位是置位的。TCP报头内的确认编号栏内包含的确认编号(w+1)为下一个预期的序列编号,同时提示远端系统已经成功接收所有数据。

    PSH(push传送):推标志。该标志置位时,接收端不将该数据进行队列处理,而是尽可能快地将数据转由应用处理。在处理Telnet或rlogin等交互模式的连接时,该标志总是置位的。

    RST:复位标志。用于复位相应的TCP连接。

    SYN(synchronous建立联机):同步标志。表明同步序列编号栏有效。该标志仅在三次握手建立TCP连接时有效。它提示TCP连接的服务端检查序列编号,该序列编号为TCP连接初始端(一般是客户端)的初始序列编号。在这里,可以把TCP序列编号看作是一个范围从0到4,294,967,295的32位计数器。通过TCP连接交换的数据中每一个字节都经过序列编号。在TCP报头中的序列编号栏包括了TCP分段中第一个字节的序列编号。

    FIN(finish结束) :结束标志。

    Sequence number(顺序号码)
    Acknowledge number(确认号码)

    伪头部校验---32位源IP地址,32位目标IP地址,8位保留,8位协议,16位总长度---反码相加法

     

    UDP头部:

     

    TCP的三次握手---建立连接,不牵扯到任何数据,A发完后B即可同时回复同意和想要建立连接,所以为三次握手

     

    TCP的四次挥手---断开连接,存在数据传输,A先发完即可发送断开连接,B需要等自己全部发完才可发送断开连接,若A,B同时发完数据,则存在三次挥手的可能,但本质上还是四次挥手

     

    TCP传输可靠性的保障机制---确认,重传,排序,控流

    控流---滑动窗口流动机制:为了追求更快的传输效率,A会一直尝试增加win窗口的数量

     

    IP协议

     

    TCP和IP都是可变长头部

    TTL---生存时间---每当数据包经过一次路由器的转发,这个TTL值都将减1;当一个是数据包中的TTL值为0时,则路由器将不再对其进行转发,将直接丢弃。

    TCP---6

    UDP---17

    ICMP---1

    MTU---最大传输单元---1500字节(默认)---应用于网络层

    MSS---最大段长度---1460字节(减去IP协议和TCP协议)---在TCP三次握手过程中包含MSS,和SYN一起发送,取两者中小的那个。---应用于传输层

    <Huawei>---用户视图---<>

    用户视图只能执行查看操作,但是不能对设备进行配置

    <Huawei>display ip interface brief---查看接口IP配置情况摘要

    Physical---物理层面具备通信条件

    Protocol---协议层面具备通讯条件

    <Huawei>system-view---进入系统视图

    [Huawei]---系统视图---可以对设备进行全局类的配置

    [Huawei]sysname XX---修改设备名称

    [Huawei]interface GigabitEthernet 0/0/0------进入接口视图

    [Huawei-GigabitEthernet0/0/0]

    [Huawei-GigabitEthernet0/0/0]ip address 192.168.1.1 24---配置IP地址命令

    [Huawei-GigabitEthernet0/0/0]undo ip address 192.168.1.1 24---删除该操作---所有删除操作都是在原操作的基础上加上undo

    帮助系统

    Tab---自动补全命令

    ?---查看命令后面参数;可以查看该字母开头的所有命令

    展开全文
  • 后端开发时总是会用到邮箱开发的一些东西,那么就需要针对不同的邮箱服务商做出不同的邮件接收发送服务器地址和端口号的配置了,本文中主要介绍和记录的主要有两种邮件服务器:POP3和SMTP及其端口号(其实还有一种...

    各邮箱服务商的收、发邮件服务器地址、端口号

    后端开发邮件的收发功能(Java后端实现多附件邮件发送案例讲解)时总是需要针对不同的邮箱服务商做出不同的邮件接收、发送服务器地址和端口号的配置,本文中主要介绍和记录两种常用的邮件服务器:POP3和SMTP及其端口号(其实还有一种IMAP类型的,但是不建议使用,因为这种类型的权限太大,容易出问题),两种服务器协议分别是介绍分别如下:

    POP3

    • POP3是Post Office Protocol3的简称,即邮局协议的第3个版本,它规定怎样将个人计算机连接到Internet的邮件服务器和下载电子邮件的电子协议。它是因特网电子邮件的第一个离线协议标准,POP3允许用户从服务器上把邮件存储到本地主机(即自己的计算机)上,同时删除保存在邮件服务器上的邮件,而POP3服务器则是遵循POP3协议的接收邮件服务器,用来接收电子邮件的。

    SMTP

    • SMTP 的全称是“Simple Mail Transfer Protocol”,即简单邮件传输协议。它是一组用于从源地址到目的地址传输邮件的规范,通过它来控制邮件的中转方式。SMTP 协议属于TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。SMTP 服务器就是遵循 SMTP 协议的发送邮件服务器。
    • SMTP 认证,简单地说就是要求必须在提供了账户名和密码之后才可以登录 SMTP 服务器,这就使得那些垃圾邮件的散播者无可乘之机。
    • 增加 SMTP 认证的目的是为了使用户避免受到垃圾邮件的侵扰。

    国内常用各大邮箱的收发邮件服务器及其端口号:

    • 网易163邮箱(mail.163.com):
      POP3服务器地址:pop.163.com(端口:110)
      SMTP服务器地址:smtp.163.com(端口:25)

    • 网易126邮箱(mail.126.com):
      POP3服务器地址:pop.126.com(端口:110)
      SMTP服务器地址:smtp.126.com(端口:25)

    • 移动139邮箱(mail.10086.cn):
      POP3服务器地址:POP.139.com(端口:110)
      SMTP服务器地址:smtp.139.com(端口:25)

    • 腾讯QQ邮箱(mail.qq.com):
      POP3服务器地址:pop.qq.com(端口:110)
      SMTP服务器地址:smtp.qq.com (端口:25)

    • 腾讯QQ企业邮箱(exmail.qq.com) :
      POP3服务器地址:pop.exmail.qq.com (SSL启用 端口:995)
      SMTP服务器地址:smtp.exmail.qq.com(SSL启用 端口:587/465)

    • 谷歌Gmail邮箱(mail.google.com):
      POP3服务器地址:pop.gmail.com(SSL启用 端口:995)
      SMTP服务器地址:smtp.gmail.com(SSL启用 端口:587)

    • 腾讯Foxmail邮箱(mail.qq.com):
      POP3服务器地址:pop.foxmail.com(端口:110)
      SMTP服务器地址:smtp.foxmail.com(端口:25)

    • 新浪sina邮箱(mail.sina.com.cn):
      POP3服务器地址:pop3.sina.com.cn(端口:110)
      SMTP服务器地址:smtp.sina.com.cn(端口:25)

    • 新浪sinaVIP邮箱(mail.sina.com.cn):
      POP3服务器:pop3.vip.sina.com (端口:110)
      SMTP服务器:smtp.vip.sina.com (端口:25)

    • 搜狐sohu邮箱(mail.sohu.com):
      POP3服务器地址:pop3.sohu.com(端口:110)
      SMTP服务器地址:smtp.sohu.com(端口:25)

    • 雅虎yahoo邮箱(login.yahoo.com):
      POP3服务器地址:pop.mail.yahoo.com
      SMTP服务器地址:smtp.mail.yahoo.com

    • 雅虎yahoo.com.cn邮箱(login.yahoo.com):
      POP3服务器地址:pop.mail.yahoo.com.cn(端口:995)
      SMTP服务器地址:smtp.mail.yahoo.com.cn(端口:587 )

    • 微软HotMail邮箱(mail.live.com) :
      POP3服务器地址:pop3.live.com (端口:995)
      SMTP服务器地址:smtp.live.com (端口:587)

    • 263.net:
      POP3服务器地址:pop3.263.net(端口:110)
      SMTP服务器地址:smtp.263.net(端口:25)

    • 263.net.cn:
      POP3服务器地址:pop.263.net.cn(端口:110)
      SMTP服务器地址:smtp.263.net.cn(端口:25)

    • x263.net:
      POP3服务器地址:pop.x263.net(端口:110)
      SMTP服务器地址:smtp.x263.net(端口:25)

    • 21cn.com:
      POP3服务器地址:pop.21cn.com(端口:110)
      SMTP服务器地址:smtp.21cn.com(端口:25)

    • china.com:
      POP3服务器地址:pop.china.com(端口:110)
      SMTP服务器地址:smtp.china.com(端口:25)

    • tom.com:
      POP3服务器地址:pop.tom.com(端口:110)
      SMTP服务器地址:smtp.tom.com(端口:25)

    • etang.com:
      POP3服务器地址:pop.etang.com
      SMTP服务器地址:smtp.etang.com
      ————————————————
      本文为CSDN博主「Wjhsmart」原文链接:https://blog.csdn.net/Wjhsmart/article/details/109203361

    展开全文
  • AMQP(Advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。ActiveMQ是基于JMS...

    1.简介

    所有MQ产品从模型抽象上来说都是一样的过程。消费者订阅某个队列。生产者创建消息,然后发布到队列,最后将消息发送到监听的消费者。

    AMQP(Advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。

    ActiveMQ是基于JMS(Java Message Service)协议的消息中间件。区别如下:

    88369e219d8018135717de5b764db88f.png

    Rabbit模型如下:

    08879534a46cddb18016a3ce37156378.png

    1.Message。消息,是不具体的。由消息头和消息体组成。消息体是不透明的,而消息头是一系列可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(是否持久存储)等

    2.Publisher。消息的生产者,也是一个向交换机发布消息的客户端应用程序。

    3.Exchanger。交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。

    4.Binging。绑定,用于消息队列和交换器之间的管理。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。所以可以将交换器理解成一个由绑定构成的路由表。

    5.Queue。消息队列,用来保存消息知道发送给消费者。一个消息可投入一个或对个队列。

    6.Connection。网络连接,比如一个TCP连接。

    7.Channel。信道,多路复用连接中的一条独立的双向数据流通道,可读可写。一个Connection包括多个channel。因为对于操作系统来说建立和销毁TCP是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接。

    8.Consumer。消费者,从消息队列取得消息的客户端应用程序。

    9.VirtualHost。虚拟主机。表示一批交换机、消息队列和相关对象。vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、绑定、交换器和权限控制;vhost通过在各个实例间提供逻辑上分离,允许你为不同应用程序安全保密地运行数据;vhost是AMQP概念的基础,必须在连接时进行指定,RabbitMQ包含了默认vhost:“/”。

    10.Borker。表示消息队列服务器实体。表示启动一个rabbitmq所包含的进程。

    2.使用

    1.简单队列模式

    不涉及交换机的模型如下:

    2c3f4935a3f5eae2b8129151eb32c1e5.png

    pom文件引入如下依赖:

    com.rabbitmq

    amqp-client

    5.4.3

    1.消息生产者

    packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classProducer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

    ConnectionFactory factory = newConnectionFactory();

    factory.setHost("192.168.99.100");

    factory.setPort(5672);

    factory.setUsername("guest");

    factory.setPassword("guest");

    factory.setVirtualHost("/");returnfactory;

    }public static void main(String[] args) throwsIOException, TimeoutException {

    ConnectionFactory connectionFactory=getConnectionFactory();

    Connection newConnection= null;

    Channel createChannel= null;try{

    newConnection=connectionFactory.newConnection();

    createChannel=newConnection.createChannel();/*** 声明一个队列。

    * 参数一:队列名称

    * 参数二:是否持久化

    * 参数三:是否排外 如果排外则这个队列只允许有一个消费者

    * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

    * 参数五:队列的附加属性

    * 注意:

    * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

    * 2.队列名可以任意取值,但需要与消息接收者一致。

    * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);

    String message= "测试消息";/*** 发送消息到MQ

    * 参数一:交换机名称,为""表示不用交换机

    * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey

    * 参数三:消息的属性信息

    * 参数四:消息内容的字节数组*/createChannel.basicPublish("", "myQueue", null, message.getBytes());

    System.out.println("消息发送成功");

    }catch(Exception e) {

    e.printStackTrace();

    }finally{if (createChannel != null) {

    createChannel.close();

    }if (newConnection != null) {

    newConnection.close();

    }

    }

    }

    }

    注意:5672是rabbitmq暴露的端口,15672是management插件的端口。

    发送成功之后可以从15672端口查看,也可以从15672进行消费,如下:

    eb2712f48ec39aa3ba0b7b01818f9c15.png

    2.消息接收

    packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.rabbitmq.client.AMQP.BasicProperties;public classConsumer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

    ConnectionFactory factory = newConnectionFactory();

    factory.setHost("192.168.99.100");

    factory.setPort(5672);

    factory.setUsername("guest");

    factory.setPassword("guest");

    factory.setVirtualHost("/");returnfactory;

    }public static void main(String[] args) throwsIOException, TimeoutException {

    ConnectionFactory connectionFactory=getConnectionFactory();

    Connection newConnection= null;

    Channel createChannel= null;try{

    newConnection=connectionFactory.newConnection();

    createChannel=newConnection.createChannel();/*** 声明一个队列。

    * 参数一:队列名称

    * 参数二:是否持久化

    * 参数三:是否排外 如果排外则这个队列只允许有一个消费者

    * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

    * 参数五:队列的附加属性

    * 注意:

    * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

    * 2.队列名可以任意取值,但需要与消息接收者一致。

    * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);/*** 接收消息。会持续坚挺,不能关闭channel和Connection

    * 参数一:队列名称

    * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息

    * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。

    * 参数四:消息接收者*/createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(createChannel) {

    @Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throwsIOException {

    String string= new String(body, "UTF-8");

    System.out.println("接收到d消息: -》 " +string);

    }

    });

    }catch(Exception e) {

    e.printStackTrace();

    }finally{

    }

    }

    }

    注意:消息的确认模式可以为自动也可以为手动,自动确认读取完会自动从队列删除;手动需要自己ack,如果设为手动也没ack可能会造成消息重复消费。

    如果是多个消费者,会从队列以轮询的方式处理消息,这种称为工作队列模式。

    补充:这种实际也是用了rabbitmq的一个默认交换机,routing_key为队列名称。也可以理解为是Rabbitmq类型为System的交换机。

    02f579877e6c1be24e882e593950d660.png

    测试:修改消费者代码

    createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(createChannel) {

    @Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throwsIOException {

    String string= new String(body, "UTF-8");

    System.out.println(envelope);

    System.out.println(properties);

    System.out.println("接收到d消息: -》 " +string);

    }

    });

    结果:(可以看出是有路由key的,值为队列名称)

    Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=myQueue)

    #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

    接收到d消息:-》 测试消息

    2.涉及交换机的发送和接收

    Exchange类型根据分发策略分为四种。direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完全一致,目前几乎不用。Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,消息会丢失。所以只能收到监听之后生产者发送的消息。

    抽取工具类:

    packagerabbitmq;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classConnectionUtils {public static Connection getConnection() throwsException {//创建连接工程,下面给出的是默认的case

    ConnectionFactory factory = newConnectionFactory();

    factory.setHost("192.168.99.100");

    factory.setPort(5672);

    factory.setUsername("guest");

    factory.setPassword("guest");

    factory.setVirtualHost("/");returnfactory.newConnection();

    }

    }

    1.Direct类型交换-单播模式,也成为路由模式(Routing模式)

    精准绑定,消息中的路由键(RoutingKey)和Binding的bindingKey一致。

    生产者:

    packagerabbitmq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;public classDirectProducer {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//声明交换机- channel.exchangeDeclare(交换机名字,交换机类型)

    channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);//连续发3条消息

    for (int i = 0; i < 3; i++) {

    String routingKey= "";//发送消息的时候根据相关逻辑指定相应的routing key。

    switch(i){case 0: //假设i=0,为error消息

    routingKey = "log.error";break;case 1: //假设i=1,为info消息

    routingKey = "log.info";break;case 2: //假设i=2,为warning消息

    routingKey = "log.warning";break;

    }//10、创建消息-String m = xxx

    String message = "hello,message!" +i;//11、消息发送-channel.basicPublish(交换机[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)

    channel.basicPublish("routing_exchange",routingKey,null,message.getBytes("utf-8"));

    }//12、关闭资源-channel.close();connection.close()

    channel.close();

    connection.close();

    }

    }

    消费者一:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classDirectConsumerOne {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("routing_queue1",true,false,false,null);//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("routing_queue1", "routing_exchange", "log.error");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("routing_queue1",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    消费者二:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classDirectConsumerTwo {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("routing_queue2",true,false,false,null);//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("routing_queue2", "routing_exchange", "log.error");//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("routing_queue2", "routing_exchange", "log.info");//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("routing_queue2", "routing_exchange", "log.warning");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("routing_queue2",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    结果:

    (1)消费者一

    ad581098fdc0b7660550b1328eaf4191.png

    (2)消费者二

    6ce3ac9a1115462b6c7f78ceb694814b.png

    2.fanout多播模式,也称为Publish/Scribe模式

    每个发到fanout类型交换器的消息会被分发到所有的队列中。fanout不处理路由键,发消息最快。

    两个消费者:

    消费者1:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classFanoutConsumerOne {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("fanout_queue1",true,false,false,null);//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("fanout_queue1", "fanout_exchange", "");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("fanout_queue1",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    消费者二:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classFanoutConsumerTwo {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("fanout_queue2",true,false,false,null);//队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])

    channel.queueBind("fanout_queue2", "fanout_exchange", "");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("fanout_queue2",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    生产者:

    packagerabbitmq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;public classFanoutProducer {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//声明交换机- channel.exchangeDeclare(交换机名字,交换机类型)

    channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);//连续发10条消息

    for (int i = 0; i < 10; i++) {//10、创建消息-String m = xxx

    String message = "hello, message!" +i;//11、消息发送-channel.basicPublish(交换机[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)

    channel.basicPublish("fanout_exchange","",null,message.getBytes("utf-8"));

    System.out.println("发送消息成功: " +message);

    }//12、关闭资源-channel.close();connection.close()

    channel.close();

    connection.close();

    }

    }

    启动两个生产者,后启动消费者后消费消息。

    3.Topic类型

    处理routingKey和bindingKey,支持通配符。# 匹配0或多个单词,* 匹配单个单词。 Topic主题模式可以实现 Publish/Subscribe发布订阅模式 和 Routing路由模式 的双重功能

    生产者:

    packagerabbitmq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;public classTopicProducer {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//声明交换机- channel.exchangeDeclare(交换机名字,交换机类型)

    channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);//连续发3条消息

    for (int i = 0; i < 5; i++) {

    String routingKey= "";//发送消息的时候根据相关逻辑指定相应的routing key。

    switch(i){case 0: //假设i=0,为error消息

    routingKey = "log.error";break;case 1: //假设i=1,为info消息

    routingKey = "log.info";break;case 2: //假设i=2,为warning消息

    routingKey = "log.warning";break;case 3: //假设i=3,为log.info.add消息

    routingKey = "log.info.add";break;case 4: //假设i=4,为log.info.update消息

    routingKey = "log.info.update";break;

    }//10、创建消息-String m = xxx

    String message = "hello,message!" +i;//11、消息发送-channel.basicPublish(交换机[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)

    channel.basicPublish("topic_exchange",routingKey,null,message.getBytes("utf-8"));

    }//12、关闭资源-channel.close();connection.close()

    channel.close();

    connection.close();

    }

    }

    消费者一:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classTopicConsumerOne {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("topic_queue1",true,false,false,null);//队列绑定交换机与路由key

    channel.queueBind("topic_queue1", "topic_exchange", "log.*");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("topic_queue1",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    消费者二:

    packagerabbitmq;importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classTopicConsumerTwo {public static void main(String[] args) throwsException{

    Connection connection=ConnectionUtils.getConnection();//8、创建频道-channel = connection.createChannel()

    Channel channel =connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)

    channel.queueDeclare("topic_queue2",true,false,false,null);//队列绑定路由key

    channel.queueBind("topic_queue2", "topic_exchange", "log.#");//创建消费者

    Consumer callback = newDefaultConsumer(channel){/***@paramconsumerTag 消费者标签,在channel.basicConsume时候可以指定

    *@paramenvelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

    *@paramproperties 属性信息(生产者的发送时指定)

    *@parambody 消息内容

    *@throwsIOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {//路由的key

    String routingKey =envelope.getRoutingKey();//获取交换机信息

    String exchange =envelope.getExchange();//获取消息ID

    long deliveryTag =envelope.getDeliveryTag();//获取消息信息

    String message = new String(body,"utf-8");

    System.out.println("routingKey:" + routingKey +

    ",exchange:" + exchange +

    ",deliveryTag:" + deliveryTag +

    ",message:" +message);

    }

    };/*** 消息消费

    * 参数1:队列名称

    * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答

    * 参数3:消息接收到后回调*/channel.basicConsume("topic_queue2",true,callback);//注意,此处不建议关闭资源,让程序一直处于读取消息

    }

    }

    启动两个消费者后启动生产者,最终入下:

    (1)消费者一

    d45a79a323683935cd90ff2cbeff0df9.png

    (2)消费者二

    91913de951b8908f0e5bd9c127806fa6.png

    总结:

    1、简单模式

    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机,一个direct类型的交换机,routing_key为queue名称)

    2、工作队列模式Work Queue

    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机,一个direct类型的交换机,routing_key为queue名称)

    3、发布订阅模式Publish/subscribe

    需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。多播模式,不进行RoutingKey的判断。

    4、路由模式Routing

    需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

    5、通配符模式Topic

    需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

    补充一下,无论是fanout多播模式还是direct路由模式还是topic通配符模式,Exchanger收到消息是会发送到后面的queue列中。如果一个应用以多实例部署,多个实例监听一个Exchanger下面相同的队列,不会造成一个消息被相同的应用多实例重复消费,因为queue本质是不可重复消费。

    开发中可以一个应用一个交换机,不同的消息类型放到不同的队列中。如果涉及死信队列,可以对每个应用再建立一个死信交换机,队列名称相同,便于处理死信消息。

    补充:消息的属性可以通过BasicProperties进行设置

    BasicProperties源码如下:

    public static class BasicProperties extendscom.rabbitmq.client.impl.AMQBasicProperties {privateString contentType;privateString contentEncoding;private Mapheaders;privateInteger deliveryMode;privateInteger priority;privateString correlationId;privateString replyTo;privateString expiration;privateString messageId;privateDate timestamp;privateString type;privateString userId;privateString appId;private String clusterId;

    测试:

    (1)生产者发送消息时生成一些属性

    packagerabbitmq;importjava.io.IOException;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.AMQP.BasicProperties;public classProducer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

    ConnectionFactory factory = newConnectionFactory();

    factory.setHost("192.168.99.100");

    factory.setPort(5672);

    factory.setUsername("guest");

    factory.setPassword("guest");

    factory.setVirtualHost("/");returnfactory;

    }public static void main(String[] args) throwsIOException, TimeoutException {

    ConnectionFactory connectionFactory=getConnectionFactory();

    Connection newConnection= null;

    Channel createChannel= null;try{

    newConnection=connectionFactory.newConnection();

    createChannel=newConnection.createChannel();/*** 声明一个队列。

    * 参数一:队列名称

    * 参数二:是否持久化

    * 参数三:是否排外 如果排外则这个队列只允许有一个消费者

    * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

    * 参数五:队列的附加属性

    * 注意:

    * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

    * 2.队列名可以任意取值,但需要与消息接收者一致。

    * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);

    String message= "测试消息";//设置消息属性以及headers

    Map headers = new HashMap<>();

    headers.put("creator", "张三");

    BasicProperties build= new BasicProperties().builder().appId("test001").messageId("001").headers(headers).build();/*** 发送消息到MQ

    * 参数一:交换机名称,为""表示不用交换机

    * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey

    * 参数三:消息的属性信息

    * 参数四:消息内容的字节数组*/createChannel.basicPublish("", "myQueue", build, message.getBytes());

    System.out.println("消息发送成功");

    }catch(Exception e) {

    e.printStackTrace();

    }finally{if (createChannel != null) {

    createChannel.close();

    }if (newConnection != null) {

    newConnection.close();

    }

    }

    }

    }

    (2)消息接收者

    packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.rabbitmq.client.AMQP.BasicProperties;public classConsumer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

    ConnectionFactory factory = newConnectionFactory();

    factory.setHost("192.168.99.100");

    factory.setPort(5672);

    factory.setUsername("guest");

    factory.setPassword("guest");

    factory.setVirtualHost("/");returnfactory;

    }public static void main(String[] args) throwsIOException, TimeoutException {

    ConnectionFactory connectionFactory=getConnectionFactory();

    Connection newConnection= null;

    Channel createChannel= null;try{

    newConnection=connectionFactory.newConnection();

    createChannel=newConnection.createChannel();/*** 声明一个队列。

    * 参数一:队列名称

    * 参数二:是否持久化

    * 参数三:是否排外 如果排外则这个队列只允许有一个消费者

    * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

    * 参数五:队列的附加属性

    * 注意:

    * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

    * 2.队列名可以任意取值,但需要与消息接收者一致。

    * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);/*** 接收消息。会持续坚挺,不能关闭channel和Connection

    * 参数一:队列名称

    * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息

    * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。

    * 参数四:消息接收者*/createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(createChannel) {

    @Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throwsIOException {

    System.out.println(properties);

    System.out.println(envelope);

    String string= new String(body, "UTF-8");

    System.out.println("接收到d消息: -》 " +string);

    }

    });

    }catch(Exception e) {

    e.printStackTrace();

    }finally{

    }

    }

    }

    结果:

    #contentHeader(content-type=null, content-encoding=null, headers={creator=张三}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=001, timestamp=null, type=null, user-id=null, app-id=test001, cluster-id=null)

    Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=myQueue)

    接收到d消息:-》 测试消息

    补充: RabbitMQheaders消息类型的交换机使用方法如下:

    x-match 为all是匹配所有的请求头和值,必须所有相等才会发送;any是满足任意一个即可。

    packagerabbitmq;importjava.util.HashMap;importjava.util.Hashtable;importjava.util.Map;importcom.rabbitmq.client.AMQP.BasicProperties.Builder;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;public classHeadersProducer {public static voidmain(String[] args) {

    Connection connection= null;

    Channel channel= null;try{

    connection=ConnectionUtils.getConnection();

    channel=connection.createChannel();//声明交换机

    channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);//声明queue

    channel.queueDeclare("header_queue", true, false, false, null);//声明bind-x-match any 匹配任意一个头,x-match all 匹配所有key

    Map bindingArgs = new HashMap();

    bindingArgs.put("x-match", "all"); //any or all

    bindingArgs.put("headName1", "val1");

    bindingArgs.put("headName2", "val2");

    channel.queueBind("header_queue", "header_exchange", "", bindingArgs);//设置消息头键值对信息

    Map headers = new Hashtable();

    headers.put("headName1", "val1");

    headers.put("headName2", "val2");

    Builder builder= newBuilder();

    builder.headers(headers);

    String message= "这是headers测试消息234";

    channel.basicPublish("header_exchange", "", builder.build(), message.getBytes());

    System.out.println("发送消息: " +message);

    }catch(Exception e) {

    e.printStackTrace();

    }finally{try{if (channel != null) {//回滚。如果未异常会提交事务,此时回滚无影响

    channel.txRollback();

    channel.close();

    }if (connection != null) {

    connection.close();

    }

    }catch(Exception ignore) {//ignore

    }

    }

    }

    }

    展开全文
  • 发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除。 二、结构 1、消息队列中消息本身由消息类型和消息数据组成,通常使用如下结构: struct msgbuf { long mtype; ...

    一、定义
    1、消息队列是一种先进先出的队列型数据结构,实际上是系统内核中的一个内部链表。消息被顺序插入队列中,其中发送进程将消息添加到队列末尾,接受进程从队列头读取消息。
    2、多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息。发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除。
    二、结构
    1、消息队列中消息本身由消息类型和消息数据组成,通常使用如下结构:

    struct msgbuf
    {
    	long 	mtype;
    	char	mtext[1];
    }
    

    1)mtype指定了消息类型,为正整数。
    引入消息类型之后,消息队列在逻辑上由一个消息链表转化为多个消息链表。发送进程仍然无条件把消息写入队列的尾部,但接收进程却可以有选择地读取某个特定类型的消息中最接近队列头的一个,即使该消息不在队列头。相应消息一旦被读取,就从队列中删除,其它消息维持不变。
    2)成员mtext指定了消息的数据。我们可以定义任意的数据类型甚至包括结构来描述消息数据。
    例1 :定义消息结构,它的消息数据是一个整型数据。

    struct msgbuf
    {
    	long 	mtype;
    	int 	ntext;
    };
    

    例2:定义消息结构,它的消息数据是一个字符数组和一个整型数据。

    struct msgbuf
    {
    	long 	mtype;
    	char	ctext[100];
    	int 	ntext;
    };
    

    例3:定义消息结构,它的消息数据是一个结构,该结构由一个字符数组和一个整型数据组成。

    struct msgtext
    {
    	char	ctext[200];	
        int 	ntext;
    }
    struct msgbuf
    {
       long 	mtype;	
       struct  msgtext stext;
    };
    

    三、消息队列的创建
    1、在UNIX中,采用函数msgget创建消息队列,原型:

    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    int msgget(key_t key, int msgflg);
    

    函数创建一个新的消息队列,或访问一个已经存在的消息队列。
    1)参数key是消息队列的关键字。
    注:当参数key取值IPC_PRIVATE时,函数创建关键字为0的消息队列。在UNIX内核中虽然要求消息队列关键字唯一,但也可以创建多个关键字为0的消息队列。
    2)参数msgflg的低9位指定队列的属主、属组和其他用户的访问权限,其它位指定消息队列的创建方式。
    创建方式参数:
    IPC_CREAT:创建,如存在则打开;
    IPC_EXCL:与IPC_CREAT使用,单独使用无意义。创建时,如存在则失败。
    例1:创建关键字为0x1234,访问权限为0666的消息队列,如队列已存在返回其标识号。

    int msgid;
    msgid = msgget(0x1234, 0666|IPC_CREAT);
    

    例2:创建关键字为0x1234,访问权限为0666的消息队列,如队列已存在则报错。

    int msgid;
    msgid = msgget(0x1234, 0666|IPC_CREAT|IPC_EXCL);
    

    四、消息队列的发送与接收

    类似于底层文件编程的函数read和write,函数msgsnd应用于消息队列的发送,函数msgrcv用于消息队列的接收。

    1、在UNIX中函数msgsnd向消息队列发送消息,原型:

    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    int msgsnd(int msqid, void *msgp, int msgsz, int msgflg);
    

    1)函数msgsnd向队列消息msgid发送消息,相关参数的含义:
    msgid:指定发送消息队列的标识号;
    msgp:指向存储待发送消息内容的内存地址,用户可设计自己的消息结构;
    msgsz:指定长度,仅记载数据的长度,不包括消息类型部分,且必须大于0;
    msgflg:控制消息发送的方式,有阻塞和非阻塞(IPC_NOWAIT)两种方式。
    2)导致msgsnd函数阻塞的原因:
    消息队列满:阻塞条件为msg_cbytes + msgsz > msg_qbytes;
    (msg_cbytes:消息队列中已使用字节数;
    msg_qbytes:消息队列中可以容纳的最大字节数;)
    消息总数满:系统中所有消息队列记载的消息总数已达到系统上限值。
    3)以阻塞方式向阻塞队列(关键字为KEY)中写入字符串“Helo UNIX!”,消息类型为TYPE。
    全部过程分为5步:
    第一步:定义消息结构

    struct msgbuf{	
    	long mtype;		
    	char ctext[100];
    }	
    

    第二步:打开消息队列

    int msgid;
    msgid = msgget(KEY, 0666|IPC_CREAT);
    if(msgid < 0)	//打开或创建消息失败;
    

    第三步:组装消息,设置消息类型和拷贝消息数据

    struct msgbuf buf;
    buf.mtype = 100;
    strcpy(buf.ctext, “HELLO UNIX!);
    

    第四步:发送消息

    int ret;
    ret = msgsnd(msgid, &buf, strlen(buf.ctext), 0);
    

    第五步:发送判断

    if(ret == -1)
    {
    	if(errno == EINTR)	//信号中断,重新发送;
    	else //系统错误
    }
    

    进程在发送消息过程中如果接收到信号,将中止消息发送并返回EINTR错误,此时重新发送即可。

    2、实例:循环读取键盘输入,并将输入的字符串写入到消息队列(关键字为0x1234)。

    #include <sys/msg.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <stdio.h>
    #include <sys/errno.h>
    #include<string.h>
    
    extern int errno;
    struct mymsgbuf{
    		long mtype;
    		char ctext[100];
    };
    int main(){
    		struct mymsgbuf buf;	
    		int msgid;
    		if((msgid = msgget(0x1234, 0666|IPC_CREAT)) < 0)	
    		{
    			fprintf(stderr, "open msg %x failed.\n", 0x1234);
    			return;
    		}
    		while(strncmp(buf.ctext, "exit", 4))	
    		{
    			memset(&buf, 0, sizeof(buf));
    			fgets(buf.ctext, sizeof(buf.ctext), stdin);
    			buf.mtype = getpid();
    
    			while((msgsnd(msgid, &buf, strlen(buf.ctext),0)) < 0)
    			{
    				if(errno == EINTR)
    					continue;
    				return;		
    			}
    
    		}
    		return 0;
    }
    

    3、在UNIX中函数msgrcv从消息队列中接收消息,原型:

    #include <sys/types>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    int msgrcv(int msgid, void *msgp, int msgsz, long msgtyp, int msgflg);
    

    1)函数msgrcv从消息队列msgid中读取一条消息,参数含义:
    msgid:消息队列标识号;
    msgp:指向接收消息的内存缓冲区;
    msgsz:指定该缓冲区的最大容量,不包括消息类型占用的部分;
    msgtyp:指定读取消息的类型;
    ( 0:读取消息队列中第一个消息;
    正整数:读取消息队列中第一个类型为msgtyp的消息;
    负整数:读取消息队列中第一个类型小于或等于msgtyp的绝对值的消息。)
    msgflg:指定了消息的接收方式
    (IPC_NOWAIT:非阻塞方式读取信息;
    MSG_NOERROR:截断读取消息。)
    2)以阻塞方式从消息队列(关键字为KEY)接收消息,接收消息类型为TYPE。
    第一步:定义消息结构
    一般要求与发送消息程序中定义结构一致
    第二步:打开(创建)消息队列

    int msgid;
    msgid = msgget(KEY, 0666|IPC_CREAT);
    

    第三步:准备接收消息缓冲区

    struct msgbuf buf;
    memset(buf, 0, sizeof(buf));
    

    第四步:接收消息

    int ret;
    ret = msgrcv(msgid, &buf, sizeof(buf.ctext), TYPE, 0);
    

    第五步:接收判断

    if(ret == -1)
    {
    	if(errno == EINTR)	 //信号中断,重新接收;
    	else                 //系统错误
    }
    

    4、实例:以阻塞方式不断从消息队列(关键字为0x1234)中读取消息,并打印接收到的消息类型、长度和数据等,当接收到内容为“exit”的消息时程序结束。

    #include <sys/msg.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <stdio.h>
    #include <sys/errno.h>
    extern int errno;
    struct mymsgbuf{
    	long mtype;
    	char ctext[100];
    };
    int main(){
    	struct mymsgbuf buf;
    	int msgid;	
    	int ret;
    	if((msgid = msgget(0x1234, 0666|IPC_CREAT)) < 0)	{
    		fprintf(stderr, "open msg %X failed.\n", 0x1234);
    		return;
    	}
    	while(strncmp(buf.ctext, "exit", 4))	
    	{
    		memset(&buf, 0, sizeof(buf));
    		while((ret = msgrcv(msgid, &buf, sizeof(buf.ctext), buf.mtype, 0)) < 0)
    		{
    			if(errno == EINTR)	
    				continue;
    			return;		
    		}		
    		fprintf(stderr,"Msg:Type=%d,Len=%d,Text:%s",buf.mtype,ret, buf.ctext);
    	}
    	return 0;
    }
    

    综合以上两个实例:
    在这里插入图片描述
    五、小结
    1、采用消息队列通信比采用管道通信具有更多的灵活性,通信的进程不但没有血缘上的要求,也不需要进行同步处理。
    2、消息队列是一种先进先出的队列型数据结构;
    3、消息队列将输出的信息进行了打包处理,可以保证以消息为单位进行接收;
    4、消息队列对信息进行分类服务,根据消息的类别进行分别处理。
    5、提供消息数据自动拆分功能,同时不能接受两次发送的消息。
    6、消息队列提供了不完全随机读取的服务。
    7、消息队列提供了完全异步的读写服务。

    展开全文
  • RabbitMQ 是采用 Erlang 语言实现 ...【实例】SpringBoot整合RabbitMQ实现消息的发送与接收。 实例要求: 实现SpringBoot整合RabbitMQ框架。 实现RabbitMQ消息确认机制(ACK)。 实现RabbitMQ消息队列延迟功能。
  • 电子邮件设置注意事项:需开通手机上网业务1、位置...通讯协议:POP3或IMAP4(设定邮箱账户的通讯协议)④ 自动接收邮件:开启或关闭⑤ 最大接收容量:1024 (邮件最大接收容量)⑥ 答复地址:用户的邮箱2) 发送服务器...
  • 卸载网卡驱动,重新安装网卡驱动,然后分配IP地址。(备注:我曾经给别的企业用这个方法后,也没有解决问题,只好重新安装系统,问题解决。)3.如果网卡是10/100Mbps自适应,可以试着把网卡速率设置为10Mbps试一下...
  • "本地连接"有发送接收数据的解决方法发布时间:2013-07-23 23:34:03 作者:佚名 我要评论在日常的网络维护中,常常出现"本地连接"有发送接收,这个故障曾经让我伤透脑筋,现在也没有一个正确的解决方案,而下面...
  • 发送接收删除 Amazon SQS 消息本主题描述了如何发送接收删除 Amazon SQS 消息。始终使用 SQS 队列发送消息。发送消息通过调用 AmazonSQS 客户端的 sendMessage 方法,将单个消息添加到 Amazon SQS 队列。...
  • session为某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session){ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1 ...
  • 下面介绍 Java 中如何使用 RabbitMQ 发送接收消息。 使用Maven添加依赖文件 在pom.xml配置信息文件中,添加 RabbitMQ 客户端依赖: com.rabbitmq amqp-client 5.10.0 1、发送者客户端代码 首先发送发送一条消息...
  • 接收窗口之前的数据已经被接收,再次接收接收窗口之前的数据可以认为是重复发送的,不处理,接收窗口之后的数据不能接收,超出接收范围直接丢弃。 接收端通过tcp首部通告窗口字段告诉对端本地可以可以接收多少...
  • RabbitMQ简单消息发送与接收1、前言2、简单消息发送与接收实战2.1 引入依赖 1、前言 这里将编写两个java程序。发送单个消息的生产者和接收消并打印出来的消费者。 在下图中,p是我们的生产者,c是我们的消费者。中间...
  • Qt TCP/UDP 一端用自定义结构体发送消息,一端用QByteArray接收消息 ** 用自定义结构体发送消息 tcpClientSocket* perClient = tcpClientSocketList.at(index); char* sendData = new char[sizeof ...
  • mail 是 Linux 的邮件客户端命令,可以利用这个命令给其他用户发送邮件。...功能描述:发送接收电子邮件。【例 1】发送邮件。如果我们想要给其他用户发送邮件,则可以执行如下命令:[root@localhost ~]# mai...
  • 这是因为RabbitMQ默认会在消息被消费者接收后,立即确认。 但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。 另外一种情况就是,我们在spring中处理...
  • } 树莓派上用Python写的串口发送(16进制)、接收数据(16进制),把16进制的数组转成数组,然后提取出来,在转换成16进制,在转换成10进制。 import RPi.GPIO as GPIO import serial import time import binascii ...
  • 文章目录一、通过控制台直接发送数据1.创建新的C#控制台程序2.客户端代码3.服务器代码4.运行结果二、创建简单可视化界面进行通信1.创建新的窗体应用 一、通过控制台直接发送数据 1.创建新的C#控制台程序 2.客户端...
  • 接线 星瞳教程 发送数据 接收数据 不过我比较奇怪为何它们的uart不需要初始化 openmv传送数据 1、初始化以及uart参数设置 uart pyb.UART(3, 115200) #串口3 波特率115200uart.init(115目录参考接线星瞳教程openmv...
  • if (MsgType.UNKNOW ==msgEntity.getMsgType()) { log.info("# 客户端 发送数据 类型未定义... :"+msgEntity.toString());return; }if(!session.isActivity()){ session.setActivity(true); session.setImei...
  • 接收)3.1 MatlabJava发送字符串Java服务端:Matlab客户端:通讯效果3.2 MatlabJava发送Bytes字节Java服务端Matlab客户端:通讯效果 1. Java UDP通信(发送&接收) 1.1 接收端 import java.io.
  • STM32串口发送数据和接收数据方式总结

    万次阅读 多人点赞 2020-12-24 12:26:20
    关注、星标公众号,直达精彩内容串口发送数据 1、串口发送数据最直接的方式就是标准调用库函数 。voidUSART_SendData(USART_TypeDef*USARTx,uin...
  • linux网络报文接收流程 netif_rx-->netif_rx_schedule-->net_rx_action-->process_backlog-->netif_receive_skb-->上层协议栈处理(ptype_base的HASH表中已注册.type.func协议处理函数) linux...
  • 对象:银行柜员客户 (都是前端交互) 功能:视频办理相关业务 1. 先定义一个变量,用于存放WebSocket的实例对象 const webSocketObj = null; 2. 判断浏览器是否支持使用WebSocked window.WebSocked = window....
  • 网络病毒,因为有的病毒一旦发作后,能导致病毒发作计算机同处一个子网的所有计算机都出现无法上网故障或上网速度缓慢故障所以这个优先查下;2,计算机自身的操作系统有问题,从计算机操作系统来看:能够对网络...
  • 邮件接收协议则是一种基于“拉”的协议,主要包括POP协议和IMAP协议,在正式介绍这些协议之前,我们先给出邮件收发的体系结构: 从上图可以看出邮件收发的整个过程大致如下: (1)发件人调用用户代理编辑要...
  • STM32如何利用串口发送接收数据

    千次阅读 热门讨论 2021-05-08 11:38:49
    STM32如何利用串口发送接收数据? 我现在计划利用STM32F103X的串口对迪文屏发送及接收数据。 手中硬件:正点原子开发板(旗舰版),迪文屏(4.3寸),电脑 软件:MCU程序下载:FLYMCU;串口助手:XCOM;迪文屏配置:...
  • CANOE CAPL 实现以太网报文发送接收

    千次阅读 2021-03-18 14:01:51
    CANOE CAPL 实现以太网报文发送接收一、上一片主要讲解以太网的发送,以及如何以HEX形式发送这一篇主要讲解如何实现多端口发送,多端口接收报文,以及处理报文。你好! 这是你第一次使用 Markdown编辑器 所展示的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 391,826
精华内容 156,730
关键字:

发送与接收地址删除