精华内容
参与话题
问答
  • ZeroMQ

    万次阅读 2017-06-13 16:08:18
    ZeroMQ概述 ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照...

    1  ZeroMQ概述

    ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。

    2  系统架构

    2.1总体架构

    ZeroMQ几乎所有的I/O操作都是异步的,主线程不会被阻塞。ZeroMQ会根据用户调用zmq_init函数时传入的接口参数,创建对应数量的I/O Thread。每个I/O Thread都有与之绑定的PollerPoller采用经典的Reactor模式实现,Poller根据不同操作系统平台使用不同的网络I/O模型(selectpollepolldevpollkequeue等)。主线程与I/O线程通过Mail Box传递消息来进行通信。Server开始监听或者Client发起连接时,在主线程中创建zmq_connecterzmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程会把zmq_connecterzmq_listener添加到Poller中用以侦听读/写事件。ServerClient在第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过pluginSession中的Engine来与kernel交换I/O数据。


    1总体架构

    2.2所处层次

             ZeroMQ不是单独的服务或者程序,仅仅是一套组件,其封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。


    2所处层次

    2.3消息模型

    ZeroMQ将消息通信分成4种模型,分别是一对一结对模型(Exclusive-Pair)、请求回应模型(Request-Reply)、发布订阅模型(Publish-Subscribe)、推拉模型(Push-Pull)。这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。

    2.3.1   一对一结对模型

             最简单的1:1消息通信模型,可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型。

    2.3.2   请求回应模型

    由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。该模型主要用于远程调用及任务分配等。Echo服务就是这种经典模型的应用。


    3请求回应模型

    2.3.3   发布订阅模型

             发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。


    4发布订阅模型

    2.3.4   推拉模型

    Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。


    推拉模型

    2.4通信协议

    提供进程内、进程间、机器间、广播等四种通信协议。通信协议配置简单,用类似于URL形式的字符串指定即可,格式分别为inproc://ipc://tcp://pgm://ZeroMQ会自动根据指定的字符串解析出协议、地址、端口号等信息。

    3  工作流程


    基本流程

    4  性能分析

    目前,市面上类似的产品不少,主要有4种:MSMQ(微软产品)、ActiveMQJava)、RabbitMQ(Erlang)ZeroMQC++)。除ZeroMQ外,其它3款产品都是一个单独服务或者进程,需要单独安装和运行,且对环境有一定依赖。其中,MSMQ在非Windows平台下安装非常复杂,ActiveMQ需要目标机器上已经安装了JavaRabbitMQ需要Erlang环境。而ZeroMQ是以库的形式存在,由应用程序加载、运行即可。但是ZeroMQ仅提供非持久性的消息队列。

    7是来自于Internet的性能测试数据。显示的是每秒钟发送和接受的消息数。整个过程共产生1百万条1K的消息,测试环境为Windows Vista。从测试数据可以看出,ZeroMQ的性能远远高于其它3MQ

    但是测试数据仅供参考,因为缺少必须的环境参数和性能指标,比如:CPU参数、内存参数、消息模型、通信协议、极限时消耗CPU百分比、极限时消耗内存百分比等。


    7性能测试

    5  应用场景

    应用ZeroMQPush-Pull模型实现联众游戏服务器的“热插拔”、负载均衡和消息派发。按照如图8部署服务器,Push端充当Gateway,作为一组游戏服务器集群最上层的一个Proxy,起负载均衡的作用,所有Gameserver作为Pull端。当一个请求到达Push端(Gateway)时,Push端根据一定的分配策略将任务派发到Pull端(Gameserver)。以联众某款游戏A为例,游戏A刚上线时,预计最大同时在线人数是10W,单台Gameserver并发处理能力为1W,需要10Gameserver,由于游戏A可玩性非常好,半个月后最大同时在线人数暴增到50W,那么不需要在某天的凌晨将GatewayGameserver停机,只需要随时在机房新添加40Gameserver,启动并连接到Gateway即可。

    ZeroMQ中对ClientServer的启动顺序没有要求,Gameserver之间如果需要通信的话,Gameserver的应用层不需要管理这些细节,ZeroMQ已经做了重连处理。


    8应用场景

    6  总结

    6.1简单

    1、仅仅提供24API接口,风格类似于BSD Socket

    2、处理了网络异常,包括连接异常中断、重连等。

    3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。

    4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。

    5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。

    6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。

    7、服务器端和客户端的启动没有先后顺序。

    6.2灵活

    1、支持多种通信协议,可以灵活地适应多种通信环境,包括进程内、进程间、机器间、广播。

    2、支持多种消息模型,消息模型之间可以相互组合,形成特定的解决方案。

    6.3跨平台

    支持LinuxWindowsOS X等。

    6.4多语言

    可以绑定CC++Java.NETPython30多种开发语言。

    6.5高性能

    相对同类产品,性能卓越。

    展开全文
  • Zeromq

    2017-05-11 18:01:00
    为什么80%的码农都做不了架构师?>>> ...

    近期在做STF的二次开发(其实也算不上,只是添加一些小功能)

    STF使用zeromq进行消息的传递.zeromq几乎支持所有的开发语言(这里可以查看示例)

    STF是基于nodejs开发的,使用的zeromq开发包是zmq

    push/pull

    单向通讯:push负责发送消息,pull负责接收消息,在回调中处理消息;
    服务端给客户端发送消息: push作为服务端,pull作为客户端
    客户端给服务端发送消息: pull作为服务端,push作为客户端
    push.send(msg)
    pull.on(‘message’, callback)
    多个Client(pull)同时连接Server(push)时:
    比如:Client1,Client2,Client3,Client4按顺序分别连接上Server后
    Server发送的第1个消息由Client1接收
    Server发送的第2个消息由Client2接收
    Server发送的第3个消息由Client3接收
    Server发送的第4个消息由Client4接收
    Server发送的第5个消息由Client1接收

    push.js

    var zmq = require('zmq');
    var sock = zmq.socket('push');
    
    sock.bindSync('tcp://127.0.0.1:3000');
    console.log('Publisher bound to port 3000');
    
    var count = 1 
    setInterval(function() {
        // console.log('sending a multipart message envelop');
        sock.send(['kitty cats', 'meow!' + count++]);
    }, 500);

    pull.js

    var zmq = require('zmq');
    var sock = zmq.socket('pull');
    
    sock.connect('tcp://127.0.0.1:3000');
    console.log('Connected to port 3000');
    
    sock.on('message', function(topic, message) {
      console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
    });

    pub/sub

    单向通讯:pub负责发布消息,sub负责订阅消息(只会收到订阅的消息)
    服务端给客户端发送消息: pub作为服务端,sub作为客户端
    客户端给服务端发送消息: sub作为服务端,pub作为客户端
    pub.send(msg)
    sub.subscribe(title)
    sub.on(‘message’, callback)
    有多个sub时,pub发送的title消息,每个sub都可以收到,sub不会接收非title消息

    dealer/dealer

    双向通讯:服务端可以给客户端发消息,客户端也可以给服务端发消息
    dealer.send(msg)
    dealer.on(‘message’, callback)
    多个Client同时连接Server时:
    比如:Client1,Client2,Client3,Client4按顺序分别连接上Server后
    Server发送的第1个消息由Client1接收
    Server发送的第2个消息由Client2接收
    Server发送的第3个消息由Client3接收
    Server发送的第4个消息由Client4接收
    Server发送的第5个消息由Client1接收
     

     

     

    转载于:https://my.oschina.net/lhplj/blog/897752

    展开全文
  • zeromq

    2019-09-14 20:41:41
    ./configure--host=arm-none-linux-gnueabi 转载于:https://my.oschina.net/julian115/blog/471307...
    ./configure --host=arm-none-linux-gnueabi


    转载于:https://my.oschina.net/julian115/blog/471307

    展开全文
  • zeroMQ

    2013-01-10 09:13:16
    [+] Zeromq资源阅读Zeromq示例 Zeromq模式: http://blog.codingnow.com/2011/02/zeromq_message_patterns.html zeromq主页: http://www.zeromq.org/ Zeromq Guild: ...
    [+]
    
    1. Zeromq资源阅读
    2. Zeromq示例

    Zeromq模式:

    http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

    zeromq主页:

    http://www.zeromq.org/

    Zeromq Guild:

    http://zguide.zeromq.org/page:all#Fixing-the-World

    Zeromq 中文简介:

    http://blog.csdn.net/program_think/article/details/6687076

    Zero wiki:

    http://en.wikipedia.org/wiki/%C3%98MQ

    zeromq系列:

    http://iyuan.iteye.com/blog/972949

    Zeromq资源阅读:

    ØMQ(Zeromq) 是一个更为高效的传输层

    优势是:

    1 程序接口库是一个并发框架

    2 在集群和超级计算机上表现得比TCP更快

    3 通过inproc, IPC, TCP, 和 multicast进行传播消息

    4 通过发散,订阅,流水线,请求的方式连接

    5 对于不定规模的多核消息传输应用使用异步IO

    6 有非常大并且活跃的开源社区

    7 支持30+的语言

    8 支持多种系统

     

    Zeromq定义为“史上最快的消息队列”

    从网络通信的角度看,它处于会话层之上,应用层之下。

    ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.


    Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

     

    Pub_Sub模式。

    the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

    在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

    Zeromq示例:

    1 获取例子

    git clone --depth=1 git://github.com/imatix/zguide.git

    2 服务器端:

    (当服务器收到消息的时候,服务器回复“World”)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <?php
        /*
        *  Hello World server
        *  Binds REP socket to tcp://*:5555
        *  Expects "Hello" from client, replies with "World"
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext(1);
         
        //  Socket to talk to clients
        $responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
        $responder->bind("tcp://*:5555");
         
        while(true) {
            //  Wait for next request from client
            $request = $responder->recv();
            printf ("Received request: [%s]\n", $request);
         
            //  Do some 'work'
            sleep (1);
         
            //  Send reply back to client
            $responder->send("World");   
     
    }

    3 客户端:

    (客户端发送消息)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <?php
        /*
        *  Hello World client
        *  Connects REQ socket to tcp://localhost:5555
        *  Sends "Hello" to server, expects "World" back
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  Socket to talk to server
        echo "Connecting to hello world server…\n";
        $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
        $requester->connect("tcp://localhost:5555");
         
        for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
            printf ("Sending request %d…\n", $request_nbr);
            $requester->send("Hello");
             
            $reply = $requester->recv();
            printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
     
    }
    1
     

    天气气候订阅系统:(pub-sub)

    1 server端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <?php
        /*
        *  Weather update server
        *  Binds PUB socket to tcp://*:5556
        *  Publishes random weather updates
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        //  Prepare our context and publisher
        $context = new ZMQContext();
        $publisher = $context->getSocket(ZMQ::SOCKET_PUB);
        $publisher->bind("tcp://*:5556");
        $publisher->bind("ipc://weather.ipc");
         
        while (true) {
            //  Get values that will fool the boss
            $zipcode     = mt_rand(0, 100000);
            $temperature = mt_rand(-80, 135);
            $relhumidity = mt_rand(10, 60);
         
            //  Send message to all subscribers
            $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);
            $publisher->send($update);
        }

    2 client端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    <?php
        /*
        *  Weather update client
        *  Connects SUB socket to tcp://localhost:5556
        *  Collects weather updates and finds avg temp in zipcode
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  Socket to talk to server
        echo "Collecting updates from weather server…", PHP_EOL;
        $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
        $subscriber->connect("tcp://localhost:5556");
         
        //  Subscribe to zipcode, default is NYC, 10001
        $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";
        $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
         
        //  Process 100 updates
        $total_temp = 0;
        for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {
            $string = $subscriber->recv();
            sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);
            $total_temp += $temperature;
        }
        printf ("Average temperature for zipcode '%s' was %dF\n",
            $filter, (int) ($total_temp / $update_nbr));
    1
    ------------------------
    1
    pub-sub的proxy模式:
    1
    图示是:

    clip_image001_thumb[2]

    Proxy节点的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    <?php
        /*
        *  Weather proxy device
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  This is where the weather server sits
        $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);
        $frontend->connect("tcp://192.168.55.210:5556");
         
        //  This is our public endpoint for subscribers
        $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);
        $backend->bind("tcp://10.1.1.0:8100");
         
        //  Subscribe on everything
        $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
         
        //  Shunt messages out to our own subscribers
        while(true) {
            while(true) {
                //  Process all parts of the message
                $message = $frontend->recv();
                $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);
                if(!$more) {
                    break; // Last message part
                }
            }
     
    }
    其实就是proxy同时是作为pub又作为sub的

    ----------------------

    作者:yjf512(轩脉刃)

    出处:http://www.cnblogs.com/yjf512/

    展开全文

空空如也

1 2 3 4 5 ... 20
收藏数 6,362
精华内容 2,544
关键字:

zeromq