为您推荐:
精华内容
最热下载
问答
  • 5星
    50KB weixin_43145941 2021-04-30 17:15:27
  • 5星
    353KB qq_41934573 2021-06-24 22:06:26
  • 5星
    218.92MB xiaolong1126626497 2021-08-03 15:21:12
  • 5星
    167.56MB SUGARSY 2020-11-02 15:25:06
  • 1.54MB weixin_43817232 2020-03-31 09:49:08
  • 469KB weixin_38586186 2021-03-14 06:26:31
  • 73KB weixin_38576561 2020-09-28 04:20:09
  • 44KB weixin_38613154 2020-08-31 19:15:33
  • 2KB weixin_42176612 2021-05-27 16:58:34
  • 4星
    60.46MB W346850397 2020-05-10 11:32:57
  • 3.56MB sunlao6083 2017-11-03 11:54:03
  • 91KB weixin_38704786 2021-01-19 21:32:25
  • 76KB weixin_38621250 2020-08-26 15:11:07
  • 11.25MB shoneworn 2018-05-25 13:37:27
  • 1.54MB qq_35812205 2021-06-19 13:21:40
  • 进程通信的方法包括共享数据结构、消息队列等。消息传递是进程间传递数据的方法。消息队列有发送原语和接收原语。UNIX/Linux消息传递的编程有所不同。
    消息传递是进程间数据传递的一种方法,进程采用消息(message)的方法,由发送进程向接收进程的消息队列发送一个消息,接收进程在合适的时候取出。 
    

    因此,UNIX系统的消息传递就是消息队列,在进程通信中可不使用共享地址空间的方式通信。  消息队列又称为直接消息传递,而信箱称为间接消息传递。信箱不能算是发送进程的,也不能算是接收进程的,这与消息队列不同。消息队列必须由接收进程,有时候是发送进程向操作系统申请,因此属于这两个进程之一。消息队列是进程通信的方法。

    原语

     消息队列的原语包括发送原语和接收原语,具有原子性,不可分割。

    1.数据结构

    把消息缓冲作为进程通信工具,首先由Hansan提出,并在RC4000系统实现,广泛应用在系统内进程之间的通信。
    消息缓冲的数据结构是消息缓冲区,其描述为:
    type  message buffer=record
          sender: 消息发送者名
          size:消息长度
          text:消息正文
          next:指向下一个消息缓冲区的指针
       end
    为实现消息缓冲通信,还应在PCB中增加一些数据项,包括两种信号量,其描述为:
    type processcontrol block=record
        …….
        mq:  消息队列队首指针
        mutex:消息队列的互斥信号量,消息队列是临界资源
        Sm:  消息队列的计数信号量,发送进程和接收进程同步
        ……
    end

    2. 发送原语

    首先在进程的地址空间设置一个发送区a,再填写相关内容,然后调用发送原语send(receiver,message)把消息发送到接收进程的消息队列PCB(B).mq中。发送原语描述如下:
      procedure send(receiver,a)
        begin
           getbuf(a.size,i);   根据发送区a的长度申请消息缓冲区i
           i.sender:=a.sender;
           i.size:=a.size;
           i.text:=a.text;
           i.next:=0;
           getid(PCB set,receiver,j);  获得接收进程内部标识符j
           P(j.mutex);
           insert(j.mq.i);
           V(j.mutex);
           V(j.sm);
           end 
         

    3. 接收原语

     
         接收进程从自己的消息队列中摘下第一个消息缓冲区,并将它复制到指定的消息接收区b内,b在接收进程的地址空间内。所以消息队列并不在进程的地址空间中。
        procedure receiver(b)
        begin
           j:=internal name;
           P(j.sm);
    P(j.mutex);
           move(j.mq,i);
           V(j.mutex);
           b.sender:=i.sender;
           b.size:=i.size;
           b.text:=i.text;      
           end

    UNIX System V消息队列

    消息队列提供一种机制允许进程不必通过共享地址空间实现通信和同步,允许进程以消息的形式传递数据。

    1.数据结构

     
     消息缓冲区的数据结构:
    struct mymsh{
       long mtype;  //消息类型
       char mtext[];  //消息正文,可以是一个结构
    }
    消息的数据结构
    struct msg_msg{
       struct list_head m_list;
       long m_type;  //消息类型
       int m_ts;  //消息大小
       struct msg_message* next;  //下一个消息位置
       void *security;       //真正消息位置
      }
    消息队列的数据结构

    2. 系统调用

    (1)创建一个消息队列
       #include<sys/types.h>
       #include<sys/msg.h>
    int msgget(key_t  key, int msgflg);
           返回值:成功,返回消息队列标识符;错误,返回-1。
    第一个参数是IPC——PRIVATE或者ftok()返回的一个键。Msgflg参数是消息队列权限,可标记为OR( | ),IPC_CREAT,IPC_EXCL等。

    (2)发送消息
      int msgsnd(int msqid, void *msgp,size_t  msgsz, int msgflg);
      返回值:成功返回0,错误返回-1。
      参数1:指定的消息队列标识符,由msgget()生成。参数2用户定义的缓冲区,参数3:消息长度,参数4:越界处理。
       将新的消息发送到接收进程消息队列中。

    (3)接收消息
       ssize_t  msgrcv(int msqid, void *msgp,size_t maxmagsz,long msgtyp,int msgflg)
    返回值:成功接收的字符数,错误返回-1。
    参数1:从哪一个消息队列中读取消息,参数2:读取消息的缓冲区。参数4:消息类型。
    接收进程从消息队列取走消息。

    3.消息队列编程

    struct msgmbuf

    {

      long msg  type;

    char msg_text[512];

    };

    main()

    {

      int  qid;

      key_t  key;
     int len;

    struct msgmbuf msg;

    if((key=ftok(".",a))==-1)

    {

        printf("ftok fail\n");

      exit(-1);

    }

    if(qid=msgget(key,IPC_CREAT|0666))==-1)

    {

       printf("msgget fail\n");

       exit(-2);

    }

    printf("the message quene is %d\n",qid);

    puts("please input the message:");

    if((fgets((&msg)->msg_text,512,stdin))==NULL)

       {

         puts("no message");

        exit(-3);

    }

    msg.msg_type=getpid();

    len=strlen(msg.msg.text);

    if((msgsnd(qid,&msg,len,0))<0) //向消息队列中发消息,注意这个程序是简化的程序,并没有向接收进程的消息队列发消息

    {

        printf("msgsnd fail\n");

       exit(-4);

    }

    if((msgrcv(qid,&msg,512,0,0))<0)  //读消息队列的消息

    {

       printf(“msgrcv fail\n");

      exit(-5);

    }

    printf("reading message:%s\n",(&msg)->msg_text);//显示输出消息内容

    if((msgctl(qid,IPC_RMID,NULL))<0)

    {

       printf("msgctl fail\n");

      exit(-6);

    }

    exit(0);

    }
     

    4.常用命令

    显示系统中所有的消息队列
    $>ipcs –q
    $>./svmsg_lsaw
    Linux系统最多有16个消息队列,每个消息最大为8192字节。

    展开全文
    ronghuilin 2016-11-05 09:00:31
  • 49KB fsh364943092 2017-05-02 15:49:19
  • 3星
    1.17MB u011249211 2014-07-14 01:27:25
  • 2.09MB weixin_38695452 2021-03-07 08:51:31
  • 41KB gaojian19890807 2015-04-13 09:33:39
  • 论文《分布式系统的现代消息传递》Modern Messaging for Distributed Sytems L Magnoni 通过IOP出版有限公司出版许可物理学学报:会议系列,608卷,第1会议 作者电子邮件 luca.magnoni@cern.ch 作者隶属...

    论文 《分布式系统的现代消息传递》Modern Messaging for Distributed Sytems

    L Magnoni 

    通过IOP出版有限公司出版许可物理学学报:会议系列 608 1会议

     

    作者电子邮件

    luca.magnoni@cern.ch

    作者隶属关系

    1欧洲核子研究中心,欧洲粒子物理实验室(CERN),瑞士日内瓦

    引文

    L Magnoni 2015 J. 物理学Conf。序列。608 012038

     

    抽象

    现代软件应用程序很少孤立存在,而现在通常的做法是依赖服务或使用远程实体提供的信息。

    在这种分布式架构中,集成是关键。

    十多年来,消息传递是解决分布式性质挑战的参考解决方案,例如网络不可靠性,

    生产者和消费者的强烈耦合以及应用的异质性。

     

    由于强大的社区以及对标准和整合的共同努力,消息代理如今已成为许多项目和服务的传输层构建块,

    无论是在物理界还是在外面。此外,近年来出现了新一代的消息服务,

    重点关注低延迟和高性能用例,突破了消息传递应用程序的界限。

    本文将介绍分布式应用程序的消息传递解决方案,概述主要概念,技术和服务。

    1.介绍

    本文概述了消息传递概念,功能和现代技术。

    首先介绍分布式通信和系统集成的消息传递。

    然后提供对主要消息传递功能的回顾,然后概述从代理到无代理系统的消息传递的主要技术。

    最后,介绍了有关使用消息传递解决分布式应用程序通信问题的成功案例列表。

     

    2.用于松散耦合通信的消息传递

    现代分布式系统可以包括数百个(如果不是数千个)应用程序以多层操作,并为彼此提供不同的服务和功能。

    在这样的分布式架构,存在诸如网络不可靠性,强耦合等诸多挑战,生产者和消费者以及需要的应用程序的异构性,致力于建立一个坚实可靠的系统。

     

    2.1 面向连接的通信

    面向连接的通信是在远程实体之间交换信息的简单解决方案。如图1所示,考虑通过面向连接的协议(如TCP / IP)打开套接字,并通过它传输原始数据流。这将是一种快速而廉价的信息交换方式,但与此同时,紧密耦合的通信将基于许多假设,这些假设需要满足才能进行通信:

    1:紧密耦合的通信。

    • 时间依赖:所有组件必须同时可用。
    • 位置:每个组件必须知道对方地址。
    • 数据结构和表示:在最简单的实现中,所有组件必须就数据格式和二进制表示达成一致。

     

    2.2 用于松散耦合通信的消息传递

    耦合可以通过各方在沟通时相互作出的假设数来衡量。

    消息传递是松耦合通信解决方案的一个示例,其中消息是信息构建块,其目的在于最小化这些假设。

    它不是直接向特定地址发送信息,而是发送到可寻址信道,以解决位置依赖性。

    为了消除时间依赖性,可以增强该信道以对信息进行排队,直到远程组件准备好接收它为止。

    这样,生产者现在可以将请求发送到通道并继续处理,而不必担心交付。

    消息传递不对数据表示做出任何假设,因此标准数据格式(例如,自描述和与JSONXML无关的平台)可用于消除在所有组件之间共享数据处理逻辑的需要。

     

    2.3消息传递方案

    典型的消息传递用例是:

    信息发布:实体发布易变信息而没有关于谁感兴趣的先验知识(例如传感器)

    信息存储:实体从多个来源(例如日志收集器)收集信息

    远程过程调用:实体向一个或多个远程实体发送请求并期望回复。

     

    2.4消息传递中间件

    消息传递是一种松散耦合的通信解决方案,可最大限度地减少生产者和消费者的依赖,删除这些依赖项使整个架构更灵活,更容忍变更,但它带来了额外的复杂性。

    因此,多年来已经开发了专用消息中间件以提供消息传递功能而无需处理内部复杂性。

    下一节将介绍消息传递系统的主要概念和原理。

     

    3.消息系统

    如图2所示,消息传递系统充当想要通信的实体之间的间接层。

    通常称为消息代理,它负责将数据作为消息从一个应用程序传输到另一个应用程序,

    这样生产者和消费者就可以专注于分享内容,而不是如何分享内容。

    与许多其他技术一样,消息传递基于一些基本概念和属性,这些概念和属性在所有不同的风格和实现之间共享。

    2:松散耦合通信的消息传递。

     

    3.1 信息

    消息是信息构建块。它由一个主体组成,它是不可变的,包含通信的结构化数据(例如JSONXML,序列化协议)对象,以及一组,通常是可由代理处理并用于路由的键值对。

     

    3.2 通信模型:主题和队列

    消息传递系统支持不同的通信模型,每个模型定义了生产者和消费者之间如何交换信息。
    最常见的通信模型是队列和主题。
    队列用于实现点对点通信,其中,如果在生成信息时不存在消费者,则消息将保留在通道中以供以后传递,
    如果有多个消费者,则消息仅传送一次。
    主题是针对经典发布/订阅方案,
    如果不存在消费者,则消息被丢弃,并且在多个消费者的情况下,消息系统将消息传递给每个消费者。
    来自队列和主题的部分被广泛支持,更复杂的传递语义存在于协议级别(例如来自AMQP的交换/节点)以及许多其他中间件特定的。

     

    3.3 协议

    多年来,缺乏与消息代理交互的独特标准方式已成为消息传递技术的已知问题。
    AMQP协议由主要的消息传递参与者,公司和软件生产者设计,以克服这一限制。
    然而,定义有线通信和传递语义的统一协议的内部复杂性要求消息传递系统的主要开发工作变得完全兼容。
    本节概述了主消息系统当前支持的最常见的标准协议。
    对于面向消息的体系结构,协议选择是一个关键的设计决策,就其在应用程序中的强耦合而言。

     

    3.3.1 AMQP(高级消息队列协议)[1]是标准化工作的结果消息传递领域的主要贡献者(例如思科,微软,红帽,银行)。
    它是旨在实现不同消息传递系统之间的互操作性。 
    它提供了定义二进制线协议和完整的传递语义,理论上允许AMQP消息传递客户端能够与不同的代理实现无缝交互符合AMQP标准。 
    如今,该协议的最新稳定版本(1.1)的奉献是尚未广泛,但鉴于它已经得到了主要消息代理的支持,预计未来几年将更广泛地实施。

     

    3.3.2 STOMP(流式文本定向消息传递协议)[2]是基于文本的协议意味着简单且可广泛互操作。 
    它主要是一种有线协议,它非常有用基本的消息传递语义内置(例如,不支持通信模型,目的地是只是一个字符串消息头),
    需要在消息系统级别进行适当的配置(例如,目的地必须适当地映射到队列或主题)。 多亏了它简单,有许多语言提供的广泛的客户端,它是由支持大多数经纪人。

     

    3.3.3 MQTTMessage Queue遥测传输)[3]是轻量级协议设计的最初来自IBM 
    它适用于低带宽,高延迟的网络。 
    它定义了一个紧凑的二进制格式,通信开销非常有限,几十字节,这使得它适用于简单的物联网风格应用(例如移动电话,传感器)生产 - 忘记的情景。

     

    3.4功能
    如第2节所述,消息传递系统可以被视为中间通道通过排队等附加功能进行增强,以改善通信远程实体的经验。
    多年来,虽然没有正式的协议,但不同消息传递系统融合在一组共同的功能上,这些功能已成为事实上的标准用于消息传递中间件。
    功能列表包括Persistence,即保存功能永久存储上的消息,例如文件系统或数据库;故障转移,允许客户端经纪人失败时自动重新连接;
    保证交付,定义政策用于消息传递(例如,至少一次或完全一次);
    订购,发送消息它们的生产顺序;
    交易,将多个请求视为一部分的能力分布式事务,具有回滚选项和聚类,这是创建的可能性消息代理网络,用于高可用性和负载平衡。
    尽管如此,每个消息系统可以为相同的功能提供不同的解释。许多其他独特的经纪人特定存在功能,但它们的使用意味着将应用程序与特定代理硬连接味道。

     

    4.消息传递技术

    面向消息的中间件已经发展了十多年,现在已经成为一个丰富而稳固的服务和库生态系统。

    消息代理作为为分布式应用程序提供消息传递功能的中间独立服务,是最常见的消息传递系统类型。

    多年来,消息代理已广泛用于在分布式系统中实现通信和集成[4],但数据密集型和高性能用例除外,

    中间实体的存在不适合的选择。

    近年来,出现了新一代消息传递系统,重点关注低延迟和高性能用例,突破了消息传递应用程序的界限。

    下一节将介绍主要消息传递技术的概述。

     

    4.1 消息代理

    消息代理是消息系统最常见的实现。

    消息代理是独立的中间实体,它通过标准或自定义协议提供消息传递功能。

    存在许多消息代理,不同的功能,协议,实现语言,平台支持。

    本评论的重点是开源解决方案,但许多也作为企业商业软件的一部分存在。

    消息代理是功能最丰富的消息传递系统类型,具有协议支持的功能,如第2节所述。

    经纪人可以是多语言,允许生产者和消费者使用不同的协议(例如AMQP上的发送者,STOMP上的接收者)

    并且它们可以支持消息转换(例如,将消息有效负载从XML转换为JSON)以进一步解耦应用程序。

     

    4.1.1 ActiveMQ是最广泛采用的开源消息代理之一。

    它是一个Apache项目,用Java编写,得到了Red Hat的商业支持。

    ActiveMQ具有广泛的协议支持(例如AMQPSTOMPMQTTOpenwireHTTP和许多其他),

    它提供了许多跨语言客户端,并且完全符合JMS标准。

    ActiveMQ提供了许多高级功能,例如:丰富的语义传递(例如虚拟队列,复合目标,通配符),

    JDBC消息存储(例如,用于在任何JDBC兼容数据库中保留消息)和高级群集配置(例如,主从,经纪人网络)。

    ActiveMQ是一种功能完备的消息传递解决方案,可用于实现许多通信和集成模式[4]

     

    4.1.2 RabbitMQ  是一个用Erlang编写的轻量级开源消息代理,它从下面语言的消息传递功能中获益。

    RabbitMQ架构是深度模块化的,它主要支持AMQPSTOMP,但是附加协议可以作为插件加载(例如MQTTHTTP)。

    它支持主要的消息传递功能,例如持久性,群集,高可用性和联合。

    RabbitMQ仍然是一种轻量级的消息传递解决方案,由于其简单性和可靠性,可以嵌入到多个项目(例如Logstash)中。

     

    4.1.3性能和可伸缩性 对于消息传递系统,没有详细的上下文化,每秒消息的量化度量(msg / s)几乎没有意义。

    使用的协议(例如二进制或文本)起着重要作用,但存在许多其他延迟因素:

    持久性消息可以慢几个数量级,放大因子(例如主题消费者的数量)可以通过多个内存中的消息副本影响系统,

    对于有效载荷大小也是如此。次优的客户端可能导致大量的开/关连接,行为不端的消费者可能导致低用户问题,

    消息传递基础结构最常见的问题之一。

     

    [5]中的比较,其中几个消息代理通过STOMP协议在几个通信模型中进行评估,显示了在实际情况下如何

    性能可能在100000 msg / s1000 msg / s之间变化。

     

    4.2 Apache Kafka

    Apache Kafka是一个最初来自LinkedIn的开源项目,现在是Apache基金会的一部分。

    它已经被开发用于实时活动流分析,以解决对从生产者向许多潜在消费者移动大量数据(例如,用户指标,计算机农场监控)的有效方式的需求。

    规模和数据大小(数十亿条消息和每天数百千兆字节)和时间限制使得用例不适合标准经纪人,如[6]中的比较。

     

    Kafka的创新理念是成为无国籍经纪人,因此不保留任何有关消费者的信息。

    消费者必须保留其自己的状态(例如关于最后读取的数据的信息)并在需要时向Kafka轮询新数据。

    这允许Kafka独立于消费者的数量来保留单个消息副本(例如,消费时不会删除消息,而是通过保留期或其他策略删除消息),

    从而实现读取和写入操作的高吞吐量。

    Kafka持久性是作为分布式提交日志实现的,如图3所示,设计为易于扩展的分布式系统(基于Zookeeper),允许自动平衡消费者/生产者/代理。

    (a)Kafka集群

    (b)Kafka主题分区

    3Kafka架构。

    与标准消息代理相比,Kafka提供有限的消息传递功能(例如主要是主题语义,文件系统作为唯一持久存储,严格保证排序)。

    尽管有许多客户端库可用,但它仅支持TCP上的自定义二进制格式。

    Kafka是数据移动的最佳解决方案,经常被用作不同处理系统(例如HadoopStorm)的管道。

     

    4.3 ZeroMQ

    尽管名称如此,ZeroMQ(也称为0MQZMQ[7]不是标准的消息代理,而是一个提供消息传递功能的轻量级消息传递库。

    分布式应用程序可以使用ZeroMQ进行高吞吐量和低延迟通信,

    利用其在生产者和消费者之间实现直接联系的能力,

    没有涉及中间实体。虽然这可能与消息传递的主要假设之一相矛盾,

    ZeroMQ通过创新方法实现松耦合通信,充当网络堆栈的新层。

    它使用类似的API扩展了socket的概念,但内置了消息传递模式:

    请求/回复,发布/订阅,流水线和独占对,如图4所示。

    a)请求/回复

    b)发布/订阅

    4ZeroMQ套接字的示例。

     

    与经典套接字相比,每个ZeroMQ套接字都带有一个内部队列,以允许异步通信。

    结果是,例如在用于点对点通信的请求/回复场景中,

    如果在消费者未运行时生成数据,

    ZeroMQ库将负责延迟交付,而生产者方无需额外负载。

     

    ZeroMQ背后的理念是强大的,它允许高性能和低延迟的通信,但在应用程序级别具有额外的复杂性。

    ZeroMQ主要支持自己的二进制协议,并提供有限的消息传递功能(例如,故障转移,1-N拓扑的多播支持)。

    虽然可以使用ZMQ API轻松实现多个功能(例如确认),

    实现高级消息传递功能(例如保证传递,持久性)可能需要相当大的努力,

    使其适用于需要简单消息语义的数据广告场景。

     

    5.用例

    本节介绍了几种成功采用基于消息传递通信的用例,以解决分布式系统中的交换信息问题。

    5.1 CERN Beam Control中间件

    CERN实验室的光束控制部门正在为大型强子对撞机(LHC)的高可靠性控制/监控/报警应用使用信息。

    2005年以来,一组ActiveMQ经纪人,在商店和正向配置中,

    用于收集安全系统生成的关键数据(例如30个生产者,2MB / s4.5K msg / s)并将其转发给许多消费者(例如监控工具,仪表板)。

    作为安全数据关键任务,存储和转发配置允许将数据生成与消耗完全分离,防止行为不端的客户端进行数据收集和归档[8]

    此外,LHC Control框架最近已从CORBA迁移到ZeroMQ作为通信层[9]

     

    5.2 DAQ在线监测

    消息传递还广泛用于数据采集(DAQ)系统的几个监控工具,

    负责从检测器(例如高能物理实验)过滤和收集数据到存储设施。

     

    5.2.1 ATLAS TDAQ移位器助手项目[10]依靠消息传递将业务警报从私有TDAQ网络分发到GPN到许多异构消费者。

    ActiveMQ群集用于主/从配置,以最大限度地减少对单个出站连接所需的防火墙配置的影响。

     

    5.2.2 STAR Online框架依赖于基于AMQP的系统,可灵活,松散地耦合检测器元数据,

    使用消息传递作为统一传输层进行处理,

    存储和监控。 此外,已经进行了调查以重新编写MQTT上的控制框架,从协议的灵活性和互操作性中获益[11]

     

    5.3 WLCG消息服务

    消息传递也已成功用于大规模地理分布式基础设施。

    WLCG(全球LHC计算网格)消息服务是用于监控全球WLCG站点和服务的骨干传输层,

    拥有超过50000个客户端,平均消息速率为100 KHz 监视基础结构基于具有JSON有效负载的STOMP

    由于STOMP协议在多种代理风格中的互操作性,

    异构消息代理群集(ActiveMQApolloRabbitMQ)用于客户端应用程序生成任何内容并消费给所有人[12]的场景。

     

    6.总结

    消息传递从根本上说是对分布式系统问题的实用反应[4]

    如第2节所述,它允许松散耦合的通信作为生产者和消费者之间的中间层。

    它为分布式应用程序的灵活性和可伸缩性带来了许多好处,并对应用程序和基础架构的复

    消息系统仍然是不断发展的技术,如第3节所示,AMQP标准化工作指向了良好的方向,

    但仍然部分采用。

    消息代理是在许多项目和服务中用作传输层构建块的可靠且可靠的技术,

    无论是在物理界还是在外面。近年来,

    新一代系统正在推动低延迟/高吞吐量/数据密集型通信的消息传递,

    如第5节所述,缩小用例和放松假设,但将消息传递应用程序的界限推向新的领域。

     

    参考

    [1] AMQP(高级消息队列协议)http://www.amqp.org

    [2] STOMP(简单文本导向消息传递协议)http//stomp.gith ub.io [3] MQTTMQ遥测传输)http://mqtt.org

    [4] G HohpeB Woolf 2003 企业集成模式 Addison-Wesley Professional [5] Chirino H STOMP基准http://hiramchirino.com/stomp-benchmark

    [6] Kreps JNarkhede NRao J Kafka:用于日志处理的分布式消息系统。NetDB研讨会(雅典,希腊)

    [7] Hintjens P ZeroMQ:指南http://zeromq.org

    [8] Ehm F CERN的控制系统运行可靠的消息传递基础设施。 ICALEPCS2011会议录(法国格勒诺布尔)

    [9] Dworak AEhm FSliwinski WSobczak M 2011 中间件趋势和市场领导者2011

    ICALEPCS2011会议录(法国格勒诺布尔)

    [10] Kazarov AMiotto GLMagnoni L 2012 AAL项目:ATLAS数据采集基础设施的自动监测和智能分析。物理学杂志:会议系列,第368

    [11] Arkhipkin DLauret JBetts W 2011 STARs在线监控和元数据收集的消息排队框架。物理学杂志:会议系列,第331

    [12] Cons LPaladin M 2011 WLCG消息服务及其未来。物理学杂志:会议系列,第396

    展开全文
    21aspnet 2019-06-13 15:29:27
  • 今天学习的是谷歌大脑的同学 2017 年的工作《Neural Message Passing for Quantum Chemistry》,也就是我们经常提到的消息传递网络(Message Passing Neural Network,MPNN),目前引用数超过 900 次。 严格来说,...

    今天学习的是谷歌大脑的同学 2017 年的工作《Neural Message Passing for Quantum Chemistry》,也就是我们经常提到的消息传递网络(Message Passing Neural Network,MPNN),目前引用数超过 900 次。

    严格来说,MPNN 不是一个模型,而是一个框架。作者在这篇论文中主要将现有模型抽象其共性并提出成 MPNN 框架,同时利用 MPNN 框架在分子分类预测中取得了一个不错的成绩。

    1.Introduction

    深度学习被广泛应用于图像、音频、NLP 等领域,但在化学任务(分子分类等)中仍然使用中机器学习+特征工程的方式,其主要原因在于目前尚未有工作证明深度学习在这个领域能取得很大的成功。

    近年来,随着量子化学计算和分子动力学模拟等实验的展开产生了巨大的数据量,大多数经典的技术都无法有效利用目前的大数据集。而原子系统的对称性表明,能够应用于网络图中的神经网络也能够应用于分子模型。所以,找到一个更加强大的模型来解决目前的化学任务可以等价于找到一个适用于网络的模型。

    在这篇论文中,作者的目标是证明:能够应用于化学预测任务的模型可以直接从分子图中学习到分子的特征,并且不受到图同构的影响。为此,作者将应用于图上的监督学习框架称之为消息传递神经网络(MPNN),这种框架是从目前比较流行的支持图数据的神经网络模型中抽象出来的一些共性,抽象出来的目的在于理解它们之间的关系。

    鉴于目前已经有很多类似 MPNN 框架的模型,所以作者呼吁学者们应该将这个方法应用到实际的应用中,并且通过实际的应用来提出模型的改进版本,尽可能的去推广模型的实际应用。

    本文给出的一个例子是利用 MPNN 框架代替计算代价昂贵的 DFT 来预测有机分子的量子特性:

    2.MPNN

    本节内容分为两块,一块是看下作者如何从现有模型中抽象出 MPNN 框架,另一块是看下作者如何利用 MPNN 框架去解决实际问题。

    2.1 MPNN framework

    我们先来介绍下 MPNN 这一通用框架,并通过八篇文献来举例验证 MPNN 框架的通配性。

    简单起见,我们考虑无向图 G,节点 v 的特征为 x v x_v xv,边的特征为 e v w e_{vw} evw。前向传递有两个阶段:一个是消息传递阶段(Message Passing),另一个是读出阶段(Readout)。考虑消息传递阶段,消息函数定义为 M t M_t Mt,顶点更新函数定义为 U t U_t Ut,t 为运行的时间步。在消息传递过程中,隐藏层节点 v 的状态 h v t h_v^t hvt 可以被基于 m v t + 1 m_v^{t+1} mvt+1 进行更新:
    m v t + 1 = ∑ w ∈ N ( v ) M t ( h v t , h w t , e v w ) h v t + 1 = U t ( h v t , m v t + 1 ) \begin{aligned} m_v^{t+1} &= \sum_{w\in N(v)}M_t(h_v^t, h_w^t,e_{vw}) \\ h_v^{t+1} &= U_t(h_v^t, m_v^{t+1}) \end{aligned} \\ mvt+1hvt+1=wN(v)Mt(hvt,hwt,evw)=Ut(hvt,mvt+1)
    其中, N ( v ) N(v) N(v) 表示图 G 中节点 v 的邻居。

    读出阶段使用一个读出函数 R 来计算整张图的特征向量:
    y ^ = R ( h v T ∣ v ∈ G ) \hat y = R({h_v^T | v \in G}) \\ y^=R(hvTvG)
    消息函数 M t M_t Mt,向量更新函数 U t U_t Ut 和读出函数 R R R 都是可微函数。 R R R 作用于节点的状态集合,同时对节点的排列不敏感,这样才能保证 MPNN 对图同构保持不变。

    此外,我们也可以通过引入边的隐藏层状态来学习图中的每一条边的特征,并且同样可以用上面的等式进行学习和更新。

    接下来我们看下如何通过定义消息函数更新函数读出函数来适配不同种模型。

    Paper 1 : Convolutional Networks for Learning Molecular Fingerprints, Duvenaud et al. (2015)

    这篇论文中消息函数为:
    M ( h v , h w , e v w ) = ( h w , e v w ) M(h_v, h_w,e_{vw}) = (h_w,e_{vw}) \\ M(hv,hw,evw)=(hw,evw)
    其中 ( . , . ) (.,.) (.,.) 表示拼接(concat);

    节点的更新函数为:
    U t ( h v t , m v t + 1 ) = σ ( H t d e g ( v ) m v t + 1 ) U_t(h_v^t,m_v^{t+1}) = \sigma(H_t^{deg(v)}m_v^{t+1}) \\ Ut(hvt,mvt+1)=σ(Htdeg(v)mvt+1)
    其中 σ \sigma σ 为 sigmoid 函数, d e g ( v ) deg(v) deg(v) 表示节点 v 的度, H t v H_t^v Htv 是一个可学习的矩阵,t 为时间步,N 为节点度;

    读出函数 R 将先前所有隐藏层的状态 h v t h_v^t hvt 进行连接:
    R = f ( ∑ v , t s o f t m a x ( W t h v t ) ) R = f(\sum_{v,t}softmax(W_th_v^t)) \\ R=f(v,tsoftmax(Wthvt))
    其中 f 是一个神经网络, W t W_t Wt 是一个可学习的读出矩阵。

    这种消息传递阶段可能会存在一些问题,比如说最终的消息向量分别对连通的节点和连通的边求和 m v t + 1 = ( ∑ h w t , ∑ e v w ) m_v^{t+1}=(\sum h_w^t,\sum e_{vw}) mvt+1=(hwt,evw) 。由此可见,该模型实现的消息传递无法识别节点和边之间的相关性。

    Paper 2 : Gated Graph Neural Networks (GG-NN), Li et al. (2016)

    这篇论文比较有名,作者后续也是在这个模型的基础上进行改进的。

    GG-NN 使用的消息函数为:
    M t ( h v t , h w t , e v w ) = A e v w h w t M_t(h_v^t,h_w^t,e_{vw})=A_{e_{vw}}h_w^t \\ Mt(hvt,hwt,evw)=Aevwhwt
    其中 A e v w A_{e_{vw}} Aevw e v w e_{vw} evw 的一个可学习矩阵,每条边都会对应那么一个矩阵;

    更新函数为:
    U t ( h v t , m v t + 1 ) = G R U ( h v t , m v t + 1 ) U_t(h_v^t,m_v^{t+1}) = GRU(h_v^t, m_v^{t+1}) \\ Ut(hvt,mvt+1)=GRU(hvt,mvt+1)
    其中 G R U GRU GRU 为门控制单元(Gate Recurrent Unit)。该工作使用了权值捆绑,所以在每一个时间步 t 下都会使用相同的更新函数;

    读出函数 R 为:
    R = ∑ v ∈ V σ ( i ( h v ( T ) ) , h v 0 )    ⊙    ( j ( h v ( T ) ) ) R=\sum_{v\in V} \sigma(i(h_v^{(T)}),h_v^0)\; \odot \; (j(h_v^{(T)})) \\ R=vVσ(i(hv(T)),hv0)(j(hv(T)))
    其中 i 和 j 为神经网络, ⊙ \odot 表示元素相乘。

    Paper 3 : Interaction Networks, Battaglia et al. (2016)

    这篇论文考虑图中的节点和图结构,同时也考虑每个时间步下的节点级的影响。这种情况下更新函数的输入会多一些 ( h v , x v , m v ) (h_v,x_v,m_v) (hv,xv,mv),其中 $x_v $ 是一个外部向量,表示对顶点 v 的一些外部影响。

    这篇论文的消息函数 M ( h v , h w , e v w ) M(h_v,h_w,e_{vw}) M(hv,hw,evw) 是一个以 ( h v , h w , e v w ) (h_v,h_w,e_{vw}) (hv,hw,evw) 为输入的神经网络,节点更新函数 U ( h v , x v , m v ) U(h_v,x_v,m_v) U(hv,xv,mv) 是一个以 ( h v , x v , m v ) (h_v,x_v,m_v) (hv,xv,mv) 为输入的神经网络,最终会有一个图级别的输出 R = f ( ∑ v ∈ G h v T ) R=f(\sum_{v\in G}h_v^T) R=f(vGhvT) ,其中 f 是一个神经网络,输入是最终的隐藏层状态的和。在原论文中 T = 1 T=1 T=1

    Paper 4 : Molecular Graph Convolutions, Kearnes et al. (2016)

    这篇论文与其他 MPNN 稍微有些不同,主要区别在于考虑了边表示 e v , w t e_{v,w}^t ev,wt,并且在消息传递阶段会进行更新。

    消息传递函数用的是节点的消息:
    M t ( h v t , h w t , e v w t ) = e v w t M_t(h_v^t,h_w^t,e_{vw}^t)=e_{vw}^t Mt(hvt,hwt,evwt)=evwt
    节点的更新函数为:
    U t ( h v t , m v t + 1 ) = α ( W 1 ( α ( W 0 h v t ) , m v t + 1 ) ) U_t(h_v^t,m_v^{t+1}) = \alpha(W_1(\alpha(W_0h_v^t),m_v^{t+1})) Ut(hvt,mvt+1)=α(W1(α(W0hvt),mvt+1))
    其中 ( . , . ) (.,.) (.,.) 表示拼接(concat), α \alpha α 为 ReLU 激活函数, W 0 , W 1 W_0,W_1 W0,W1 为可学习权重矩阵;

    边状态的更新定义为:
    e v w t + 1 = U t ′ ( e v w t , h v t , h w t ) = α ( W 4 ( α ( W 2 , e v w t ) , α ( W 3 ( h v t , h w t ) ) ) ) \begin{aligned} e_{vw}^{t+1} &= U_t^{'}(e_{vw}^t, h_v^t, h_w^t) \\ &= \alpha(W_4(\alpha (W_2,e_{vw}^t), \alpha(W_3(h_v^t,h_w^t)))) \end{aligned} \\ evwt+1=Ut(evwt,hvt,hwt)=α(W4(α(W2,evwt),α(W3(hvt,hwt))))
    其中, W i W_i Wi 为可学习权重矩阵。

    Paper 5 : Deep Tensor Neural Networks, Schutt et al. (2017)

    消息函数为:
    M t = t a n h ( W f c ( ( W c f h w t + b 1 ) ⊙ ( W d f e v w + b 2 ) ) ) M_t = tanh(W^{fc}((W^{cf}h_w^t+b_1) \odot(W^{df}e_{vw}+b_2))) \\ Mt=tanh(Wfc((Wcfhwt+b1)(Wdfevw+b2)))
    其中 W f c , W c f , W d f W^{fc},W^{cf},W^{df} Wfc,Wcf,Wdf 为矩阵, b 1 , b 2 b_1,b_2 b1,b2 为偏置向量;

    更新函数为:
    U t ( h v t , m v t + 1 ) = h v t + m v t + 1 U_t(h_v^t,m_v^{t+1}) = h_v^t + m_v^{t+1} \\ Ut(hvt,mvt+1)=hvt+mvt+1
    读出函数通过单层隐藏层接受每个节点并且求和后输出:
    R = ∑ v N N ( h v T ) R = \sum_v NN(h_v^T) \\ R=vNN(hvT)
    Paper 6 : Laplacian Based Methods, Bruna et al. (2013); Defferrard et al. (2016); Kipf & Welling (2016)

    基于拉普拉斯矩阵的方法将图像中的卷积运算扩展到网络图 G 的邻接矩阵 A 中。

    在 Bruna et al. (2013); Defferrard et al. (2016); 的工作中,消息函数为:
    M t ( h v t , h w t ) = C v w t h w t M_t(h_v^t,h_w^t) = C_{vw}^t h_w^t \\ Mt(hvt,hwt)=Cvwthwt
    其中,矩阵 C v w t C_{vw}^t Cvwt 为拉普拉斯矩阵 L 的特征向量组成的矩阵;

    节点的更新函数为:
    U t ( h v t , m v t + 1 ) = σ ( m v t + 1 ) U_t(h_v^t, m_v^{t+1}) = \sigma(m_v^{t+1}) \\ Ut(hvt,mvt+1)=σ(mvt+1)
    其中, σ \sigma σ 为非线性的激活函数,比如说 ReLU。

    在 Kipf & Welling (2016) 的工作中,消息函数为:
    M t ( h v t , h w t ) = C v w h w t M_t(h_v^t,h_w^t) = C_{vw} h_w^t \\ Mt(hvt,hwt)=Cvwhwt
    其中, C v w = ( d e g ( v ) d e g ( w ) ) − 1 / 2 A v w C_{vw} = (deg(v)deg(w))^{-1/2}A_{vw} Cvw=(deg(v)deg(w))1/2Avw

    节点的更新函数为:
    U v t ( h v t , m v t + 1 ) = R e L U ( W t m v t + 1 ) U_v^t(h_v^t, m_v^{t+1}) = ReLU(W^t m_v^{t+1}) \\ Uvt(hvt,mvt+1)=ReLU(Wtmvt+1)
    可以看到以上模型都是 MPNN 框架的不同实例,所以作者呼吁大家应该致力于将这一框架应用于某个实际应用,并根据不同情况对关键部分进行修改,从而引导模型的改进,这样才能最大限度的发挥模型的能力。

    2.2 MPNN Variants

    本节来介绍下作者将 MPNN 框架应用于分子预测领域,提出了 MPNN 的变种,并以 QM9 数据集为例进行了实验。

    QM9 数据集中的分子大多数由碳氢氧氮等元素组成,并组成了约 134k 个有机分子,可以划分为四大类(具体类别不介绍了),任务是根据分子结构预测分子所属类别。

    作者主要是基于 GG-NN 来探索 MPNN 的多种改进方式(不同的消息函数、输出函数等),之所以用 GG-NN 是因为这是一个很强的 baseline。

    2.2.1 Message Functions

    首先来看下消息函数,可以以 GG-NN 中使用的消息函数开始,GG-NN 用的是矩阵乘法:
    M ( h v , h w , e v w ) = A e v w h w M(h_v,h_w,e_{vw}) = A_{e_{vw}}h_w \\ M(hv,hw,evw)=Aevwhw
    为了兼容边特征,作者提出了新的消息函数:
    M ( h v , h w , e v w ) = A ( e v w ) h w M(h_v,h_w,e_{vw}) = A(e_{vw})h_w \\ M(hv,hw,evw)=A(evw)hw
    其中, A ( e v w ) A(e_{vw}) A(evw) 是将边的向量 e v w e_{vw} evw 映射到 d×d 维矩阵的神经网络。

    矩阵乘法有一个特点,从节点 w 到节点 v 的函数仅与隐藏层状态 h w h_w hw 和边向量 e v w e_{vw} evw 有关,而和隐藏状态 h v t h_v^t hvt 无关。理论上来说,如果节点消息同时依赖于源节点 w 和目标节点 v 的话,网络的消息通道将会得到更有效的利用。所以也可以尝试去使用一种消息函数的变种:
    m v w = f ( h w t , h v t , e v w ) m_{vw} = f(h_w^t, h_v^t, e_{vw}) \\ mvw=f(hwt,hvt,evw)
    其中,f 为神经网络。

    2.2.2 Virtual Graph Elements

    其次看来下消息传递,作者探索了两种不同的消息传递方式。

    最简单的修改就是为没有连接的节点添加一个虚拟的边,这样消息便具有了更长的传播距离;

    此外,作者也尝试了使用潜在的“主”节点(master node),这个节点可以通过特殊的边来连接到图中任意一个节点。主节点充当了一个全局的暂存空间,每个节点都会在消息传递过程中通过主节点进行读取和写入。同时允许主节点具有自己的节点维度,以及内部更新函数(GRU)的单独权重。其目的同样是为了在传播阶段传播很长的距离。

    2.2.3 Readout Functions

    然后来看下读出函数,作者同样尝试了两种读出函数:

    首先是 GG-NN 中的读出函数:
    R = ∑ v ∈ V σ ( i ( h v ( T ) ) , h v 0 )    ⊙    ( j ( h v ( T ) ) ) R=\sum_{v\in V} \sigma(i(h_v^{(T)}),h_v^0)\; \odot \; (j(h_v^{(T)})) \\ R=vVσ(i(hv(T)),hv0)(j(hv(T)))
    此外也考虑 set2set 模型。set2set 模型是专门为在集合运算而设计的,并且相比简单累加节点的状态来说具有更强的表达能力。模型首先通过线性映射将数据映射到元组 ( h v t , x v ) (h_v^t, x_v) (hvt,xv) ,并将投影元组作为输入 T = { ( h v T , x v ) } T=\{(h_v^T,x_v) \} T={(hvT,xv)},然后经过 M 步计算后,set2set 模型会生成一个与节点顺序无关的 Graph-level 的 embeedding 向量,从而得到我们的输出向量。

    2.2.4 Multiple Towers

    最后考虑下 MPNN 的伸缩性。

    对一个稠密图来说,消息传递阶段的每一个时间步的时间复杂度为 O ( n 2 d 2 ) O(n^2d^2) O(n2d2),其中 n 为节点数,d 为向量维度,可以看到时间复杂度还是非常高的。

    为了解决这个问题作者将向量维度 d 拆分成 k 份,就变成了 k 个 d/k 维向量,并在传播过程中每个子向量分别进行传播和更新,最后再进行合并。此时的子向量时间复杂度为 O ( n 2 ( d / k ) 2 ) O(n^2(d/k)^2) O(n2(d/k)2),考虑 k 个子向量的时间复杂度为 O ( n 2 d 2 / k ) O(n^2d^2/k) O(n2d2/k)

    2.3 Input Representation

    这一节主要介绍 GNN 的输入。

    对于分子来说有很多可以提取的特征,比如说原子组成、化学键等,详细的特征列表如下图所示:

    对于邻接矩阵,作者模型尝试了三种边表示形式:

    化学图(Chemical Graph):在不考虑距离的情况下,邻接矩阵的值是离散的键类型:单键,双键,三键或芳香键;

    距离分桶(Distance bins):基于矩阵乘法的消息函数的前提假设是边信息是离散的,因此作者将键的距离分为 10 个 bin,比如说 [2,6] 中均匀划分 8 个 bin,[0,2] 为 1 个 bin,[6, +∞] 为 1 个 bin;

    原始距离特征(Raw distance feature):也可以同时考虑距离和化学键的特征,这时每条边都有自己的特征向量,此时邻接矩阵的每个实例都是一个 5 维向量,第一维是距离,其余思维是四种不同的化学键。

    4.Experiment

    来看一下实验结果,以 QM-9 数据集为例,共包含 130462 个分子,以 MAE 为评估指标。

    下图为现有算法和作者改进的算法之间的对比:

    下图为不考虑空间信息的结果:

    下图为考虑多塔模型和结果:

    5.Conclusion

    总结:作者从诸多模型中抽离出了 MPNN 框架,并且通过实验表明,具有消息函数、更新函数和读出函数的 MPNN 具有良好的归纳能力,可以用于预测分析特性,优于目前的 Baseline,并且无需进行复杂的特征工程。此外,实验结果也揭示了全局主节点和利用 set2set 模型的重要性,多塔模型也使得 MPNN 更具伸缩性,方便应用于大型图中。

    6.Reference

    1. 《Neural Message Passing for Quantum Chemistry》
    展开全文
    qq_27075943 2020-06-08 16:30:01
  • * 消息传递机制(同步回调,异步回调) * 作者:Ai * 时间:2018年4月29日13:12:39——2018年4月29日23:56:42 * 注释:在多线程和观察者模式的基础上,系统中使用消息传递(类似广播的方式 * 消息机制是面向对象...
    /**
     * 消息传递机制(同步回调,异步回调)
     * 作者:Ai
     * 时间:2018年4月29日13:12:39——2018年4月29日23:56:42
     * 注释:在多线程和观察者模式的基础上,系统中使用消息传递(类似广播的方式
     * 消息机制是面向对象中一种重要的编程思想
     * 我们每次点击鼠标,敲击键盘都相当于给操作系统一个消息,让操作系统或软件对我们的消息进行回应
     */
    import callback.Manager;
    import callback.Programmer;
    import message.IMessageProcess;
    import message.Message;
    import message.MessageType;
    import message.Windos;
    
    import java.util.Queue;
    import java.util.Timer;
    import java.util.concurrent.ArrayBlockingQueue;
    public class Start {
        public static void main(String[] args){
            //region 1.消息机制
            //消息队列:以队列(先进先出)存储消息
            //消息循环:不断的将位于队首的消息分发出去
            //消息处理:根据不同的消息类型做不同的处理
    
            //创建一个容量为10的消息队列
            Queue<Message> queue=new ArrayBlockingQueue<Message>(10);
    
            //创建一个窗体
            Windos windos=new Windos(queue);
            //为消息队列添加消息
            queue.add(new Message(windos, MessageType.系统事件,"系统启动"));
            queue.add(new Message(windos, MessageType.键盘事件,"键盘输入"));
            queue.add(new Message(windos, MessageType.鼠标事件,"鼠标点击"));
            //消息循环
            Message message=null;
            while ((message=queue.poll())!=null){
                ((IMessageProcess)message.getSource()).doMsg(message);
            }
            //endregion
    
            //region 2.同步回调,异步回调,
            /**
             * 回调方法:就是A类中调用B类中的某个方法C,然后B类中反过来调用A类中的方法D,D这个方法就叫回调方法
             * 假如我们现在有老板,组长,员工三个角色
             * 老板让组长去调查市场上的手机的使用率和电脑的使用率,方便下一个产品发布在那个平台
             * 组长当然不用自己亲自去调查,他可以让员工去调查
             */
            //假如说我们就是老板
            Manager manager=new Manager(new Programmer());
            //执行我下达的命令
            manager.entrust(Programmer.CallType.同步);
            //这样我们已经完成了回调,但是这样我们发现只有等员工完成调研后组长才会进行别的工作(同步回调)
            //在正常情况下,组长在安排好事情后就会去做其他事,只需员工在完成后通知他就行(异步回调)
            manager.entrust(Programmer.CallType.异步);
            //endregion
    
    
        }
    }
    

    message空间

    package message;
    
    /**
     * 处理消息接口
     */
    public interface IMessageProcess {
        void doMsg(Message message);
    }
    
    package message;
    
    /**
     * 消息类
     */
    public class Message {
        private Object source;//来源
        private MessageType messageType;//类型
        private String info;//消息
    
        //region get/set
        public Object getSource() {
            return source;
        }
    
        public void setSource(Object source) {
            this.source = source;
        }
    
        public MessageType getMessageType() {
            return messageType;
        }
    
        public void setMessageType(MessageType messageType) {
            this.messageType = messageType;
        }
    
        public String getInfo() {
            return info;
        }
    
        public void setInfo(String info) {
            this.info = info;
        }
        //endregion
    
    
        public Message(Object source, MessageType messageType, String info){
            this.source=source;
            this.messageType=messageType;
            this.info=info;
        }
    
    
    }
    
    package message;
    
    public enum MessageType {
        键盘事件,
        鼠标事件,
        系统事件
    }
    
    package message;
    
    import java.util.Queue;
    
    /**
     * 模拟窗体类
     */
    public class Windos implements IMessageProcess {
    
        //消息队列
        private Queue<Message> messageQueue;
        public Windos(Queue<Message> messageQueue){
            this.messageQueue=messageQueue;
        }
        //重写处理方法
        @Override
        public void doMsg(Message message) {
            switch (message.getMessageType()){
                case 系统事件:
                    onSystem(message);
                    break;
                case 键盘事件:
                    onKeybord(message);
                    break;
                case 鼠标事件:
                    onMouse(message);
                    break;
    
            }
        }
    
        /**
         * 系统事件
         */
        private static void onSystem(Message message){
            System.out.println("系统事件");
            System.out.println("消息:"+message.getInfo());
    
        }
    
        /**
         * 键盘事件
         * @param message
         */
        private static void onKeybord(Message message){
            System.out.println("键盘事件");
            System.out.println("消息:"+message.getInfo());
    
        }
    
        /**
         * 鼠标事件
         * @param message
         */
        private static void onMouse(Message message){
            System.out.println("鼠标事件");
            System.out.println("消息:"+message.getInfo());
        }
    
    }
    

    callback 空间

    package callback;
    
    public class Boss {
    
    }
    
    package callback;
    
    /**
     * 回调接口
     */
    public interface ICallback {
         boolean check(int result);
    }
    
    package callback;
    
    /**
     * 组长类
     */
    public class Manager implements ICallback {
        //要指派的员工
        private Programmer programmer;
    
        public Manager(Programmer programmer){
            this.programmer=programmer;
        }
    
        /**
         * 用于 Boss 下达的委托
         */
        public void entrust(Programmer.CallType type) {
            arrange(type);
        }
    
        // 进行安排下属进行 study 工作
        private void arrange(Programmer.CallType type) {
            System.out.println("Manager 正在为 Programmer 安排工作");
            programmer.study(Manager.this,type);
            System.out.println("为 Programmer 安排工作已经完成,Manager 做其他的事情去了。");
        }
    
        @Override
        public boolean check(int result) {
            if (result == 5) {
                return true;
            }
            return false;
        }
    }
    
    package callback;
    
    /**
     * 员工类
     */
    public class Programmer {
    
        public enum CallType {
            同步,
            异步
        }
    
        /**
         * 调研
         * @param callback
         * @param type
         */
        public void study(ICallback callback,CallType type) {
            if (type==CallType.同步){
                int result = 0;
                do {
                    result++;
                    System.out.println("第 " + result + " 次研究的结果");
                } while (!callback.check(result));
    
                System.out.println("调研任务结束");
            }
            else {
                //异步回调
                new StudyThread(callback).start();
            }
    
        }
    
    }
    
    package callback;
    
    /**
     * 调研线程
     */
    public class StudyThread extends Thread {
        ICallback callback;
        public StudyThread(ICallback callback){
            this.callback=callback;
        }
        @Override
        public void run() {
            int result = 0;
            do {
                result++;
                System.out.println("第 " + result + " 次研究的结果");
            } while (!callback.check(result));
    
            System.out.println("调研任务结束");
        }
    }
    

    以前写的,若有时间会增加详细说明

    展开全文
    qq_37446649 2019-04-24 19:58:24
  • 5星
    138KB cbnotes 2013-01-18 11:01:23
  • 5星
    56KB u010457371 2013-11-18 11:18:41
  • paladinzh 2019-03-09 17:13:17
  • 4星
    82KB shl329 2013-04-11 10:23:12
  • 20KB weixin_38744435 2019-09-18 10:28:02
  • 111KB weixin_38579899 2020-12-19 23:47:42
  • 121KB weixin_42105570 2021-02-12 23:10:49
  • 5星
    2KB zhuleiji 2012-03-20 14:17:43
  • qq_42698329 2020-08-15 15:31:22
  • z69183787 2019-07-04 14:22:39

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 643,368
精华内容 257,347
关键字:

消息传递