精华内容
下载资源
问答
  • 继续《Linux使用ZMQ实践“生产者-消费者”模型》 文章之后进一步思考: ZeroMQ通过隐藏了基础的socket操作,达到调用简明易懂的层次; 那么,如果某些场景下,又需要考虑到连接状态的维护,应该如何操作? ZeroMQ...

    1. 前言

    继续《Linux下使用ZMQ实践“生产者-消费者”模型》 文章之后进一步思考:
    ZeroMQ通过隐藏了基础的socket操作,达到调用简明易懂的层次;
    那么,如果某些场景下,又需要考虑到连接状态的维护,应该如何操作?
    ZeroMQ给出的解决方案就是zmq_socket_monitor

    2. 相关知识

    支持监控的事件:

    事件描述
    ZMQ_EVENT_CONNECTEDsocket已被成功连接
    ZMQ_EVENT_CONNECT_DELAYED连接动作被挂起
    ZMQ_EVENT_CONNECT_RETRIED连接失败,正在重试
    ZMQ_EVENT_LISTENING监听成功
    ZMQ_EVENT_BIND_FAILED绑定失败
    ZMQ_EVENT_ACCEPTED接受新连接
    ZMQ_EVENT_ACCEPT_FAILED接受新连接失败
    ZMQ_EVENT_CLOSEDsocket关闭(主动关闭)
    ZMQ_EVENT_CLOSE_FAILEDsocket关闭失败
    ZMQ_EVENT_DISCONNECTED连接意外关闭(被关闭)
    ZMQ_EVENT_MONITOR_STOPPED监控的socket消亡

    使用思路:将要监听的sock跟monitor关联,然后创建一个额外的ZMQ_PAIR,通过pair来获取sock上的事件。

    3.场景举例

    根据之前的“生产者-消费者”模型的一个改进:

    • 已知固定的消费者个数,如4个;
    • 生产者等待4个消费者全部启动后,才开始发送消息;
    • 生产者发送退出消息,等待消费者断开连接后才最后退出;

    在之前一对多的 Push-Pull 模式下,如果没有消费者连接,则生产者数据发送会一直阻塞,但如果有至少一个连接成功,则生产者进入发送数据阶段;在改进场景中,需求所有消费者就绪后,生产者才正式开始发送数据,达到一个理想的均衡状态。

    这样,我们就依赖monitor机制的实现,监听消费者的Push套件,额外增加一个监听器monitor:

    #define ADDR "tcp://127.0.0.1:555"
    #define MONITOR "inproc://monitor-server"
    	...
    	void *sock = zmq_socket(ctx, ZMQ_PUSH);
        void *mon  = zmq_socket(ctx, ZMQ_PAIR);
    	...
        zmq_bind(sock, ADDR);
        zmq_socket_monitor(sock, MONITOR, ZMQ_EVENT_ALL);
        zmq_connect(mon, MONITOR);
        ...
    

    下来,我们通过monitor等待4个消费者的连接事件,成功后才发送数据;
    发送数据完成后,我们通过发送“Quit”报文来通知消费者退出进程;
    完整的生产者代码如下:

    void test_producer(void *ctx, int times)
    {
        int ix = 0, cnt = 0, id = 0, event = 0;
        char request[1024];
    
        void *sock = zmq_socket(ctx, ZMQ_PUSH);
        void *mon  = zmq_socket(ctx, ZMQ_PAIR);
    
        s_set_id_num(sock, id);
        zmq_bind(sock, ADDR);
        zmq_socket_monitor(sock, MONITOR, ZMQ_EVENT_ALL);
        zmq_connect(mon, MONITOR);
    
        LOGN("Producer %d setup\n", id);
        for (cnt = 0; cnt < 4;) {
            event = get_monitor_event(mon, NULL, NULL);
            if (event == ZMQ_EVENT_ACCEPTED) {
                LOGN("Producer accepted\n");
                cnt++;
            }
        }
    
        LOGN("Producer %d start\n", id);
        for (ix = 0; ix < times; ix++) {
            snprintf(request, sizeof(request), "Data-%03d-%03d", id, ix);
            s_send(sock, request);
            LOGN("Producer %d send: %s\n", id, request);
            usleep(100 * 1000);
        }
    
        for (cnt = 0; cnt < 4;) {
            s_send(sock, "Quit"); // 通知一个消费者,退出一个消费者
            event = get_monitor_event(mon, NULL, NULL);
            if (event == ZMQ_EVENT_DISCONNECTED) {
                cnt++;
            }
        }
    
        LOGN("Producer %d stop\n", id);
        zmq_close(sock);
    }
    

    获取监听事件的接口为,get_monitor_event,该函数从ZeroMQ帮助手册摘抄下来:

    static int get_monitor_event (void *monitor, int *value, char **address)
    {   
        // First frame in message contains event number and value
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1; // Interrupted, presumably
        assert (zmq_msg_more (&msg));
        
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        uint16_t event = *(uint16_t *) (data);
        if (value) 
            *value = *(uint32_t *) (data + 2);
        
        // Second frame in message contains event address
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1; // Interrupted, presumably
        assert (!zmq_msg_more (&msg));
        
        if (address) {
            uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
            size_t size = zmq_msg_size (&msg); 
            *address = (char *) malloc (size + 1);
            memcpy (*address, data, size);
            (*address)[size] = 0;
        }
        return event;
    }
    

    然后消费者的实现,跟先前的例子差不多,就多了一个退出的判断:

    int test_consumer(void *ctx, int id)
    {
        int cnt = 0;
        char request[1024];
    
        void *sock = zmq_socket(ctx, ZMQ_PULL);
    
        s_set_id_num(sock, id);
        zmq_connect(sock, ADDR);
    
        LOGN("Consumer %d start\n", id);
        while (++cnt) {
            s_recv(sock, request);
            LOGN("Consumer %d recv: %s\n", id, request);
            usleep(300 * 1000);
    
            if (strcmp(request, "Quit") == 0) {
                break;
            }
        }
        LOGN("Consumer %d stop\n", id);
        zmq_close(sock);
    }
    

    最后,main函数功能,主要为fork,主进程做生产者,子进程做消费者;
    同时,为了方便起见,省略了waitpid回收子进程的动作;

    
    int main(int argc, char *argv[])
    {
        int ix = 0;
        void *ctx = zmq_ctx_new();
        srandom(time(NULL));
    
        /* 1x producter vs 4x consumer */
        for (ix= 1; ix <= 4; ix++) {
            pid_t pid = fork();
            if (pid == 0) {
                test_consumer(ctx, ix);
                goto out;
            }
        }
    
        test_producer(ctx, atoi(argv[1]));
        // TODO waitpid
    out:
        zmq_ctx_destroy(ctx);
        exit(EXIT_SUCCESS);
    }
    

    实际运行情况如下:

    [ 1561228921.433 ]: Consumer 1 start
    [ 1561228921.433 ]: Consumer 2 start
    [ 1561228921.434 ]: Consumer 4 start
    [ 1561228921.434 ]: Consumer 3 start
    [ 1561228921.434 ]: Producer 0 setup
    [ 1561228921.435 ]: Producer accepted
    [ 1561228921.496 ]: Producer accepted
    [ 1561228921.572 ]: Producer accepted
    [ 1561228921.572 ]: Producer accepted
    [ 1561228921.572 ]: Producer 0 start
    [ 1561228921.572 ]: Producer 0 send: Data-000-000
    [ 1561228921.574 ]: Consumer 3 recv: Data-000-000
    [ 1561228921.673 ]: Producer 0 send: Data-000-001
    [ 1561228921.774 ]: Producer 0 send: Data-000-002
    [ 1561228921.775 ]: Consumer 2 recv: Data-000-002
    [ 1561228921.876 ]: Producer 0 send: Data-000-003
    [ 1561228921.877 ]: Consumer 1 recv: Data-000-003
    [ 1561228921.978 ]: Producer 0 send: Data-000-004
    [ 1561228921.978 ]: Consumer 4 recv: Data-000-004
    [ 1561228922.079 ]: Producer 0 send: Data-000-005
    [ 1561228922.081 ]: Consumer 3 recv: Data-000-005
    [ 1561228922.183 ]: Producer 0 send: Data-000-006
    [ 1561228922.284 ]: Producer 0 send: Data-000-007
    [ 1561228922.285 ]: Consumer 2 recv: Data-000-007
    [ 1561228922.386 ]: Producer 0 send: Data-000-008
    [ 1561228922.387 ]: Consumer 1 recv: Data-000-008
    [ 1561228922.488 ]: Producer 0 send: Data-000-009
    [ 1561228922.488 ]: Consumer 4 recv: Data-000-009
    [ 1561228922.590 ]: Consumer 3 recv: Quit
    [ 1561228922.892 ]: Consumer 3 stop
    [ 1561228922.894 ]: Consumer 2 recv: Quit
    [ 1561228923.195 ]: Consumer 2 stop
    [ 1561228923.196 ]: Consumer 1 recv: Quit
    [ 1561228923.497 ]: Consumer 1 stop
    [ 1561228923.499 ]: Consumer 4 recv: Quit
    [ 1561228923.800 ]: Consumer 4 stop
    [ 1561228923.802 ]: Producer 0 stop
    

    可以看出,程序第一阶段,启动进程;第二阶段,发送数据,负载均衡;第三阶段,回收资源。

    4 结论

    ZMQ监控事件的方法,提供了一种可选的扩展场景支持,实际使用可以放主线程处理,也可以放独立的子线程处理。

    展开全文
  • 一、背景 上一篇文章《Linux使用ZMQ实践“请求-响应”服务器模型》中使用的是REP-REQ套件,该套件的特点是必须一个请求对应一个响应,如果在应用中不想使用同步处理的方式呢?ZMQ有没有提供异步处理的方法?答案...

    一、背景

        上一篇文章《Linux下使用ZMQ实践“请求-响应”服务器模型》中使用的是REP-REQ套件,该套件的特点是必须一个请求对应一个响应,如果在应用中不想使用同步处理的方式呢?ZMQ有没有提供异步处理的方法?答案是使用DEALER-ROUTER套件。

        另外如何在多线程中安全传递消息的方法可以参见《Linux下使用ZMQ实践“生产者-消费者”模型》ZMQ_PULL、ZMQ_PUSH的实践。

    二、相关知识

    1、ZMQ_DEALER

           ZMQ_DEALER
               A socket of type ZMQ_DEALER is an advanced pattern used for extending request/reply sockets.
               Each message sent is round-robined among all connected peers, and each message received is
               fair-queued from all connected peers.
    
               When a ZMQ_DEALER socket enters the mute state due to having reached the high water mark for
               all peers, or if there are no peers at all, then any zmq_send(3) operations on the socket
               shall block until the mute state ends or at least one peer becomes available for sending;
               messages are not discarded.
    
               When a ZMQ_DEALER socket is connected to a ZMQ_REP socket each message sent must consist of
               an empty message part, the delimiter, followed by one or more body parts.

        DEALER是一种用于请求/答应模式的更高级的扩展Socket,它可以自由的收发消息,没有ZMQ_REP/ZMQ_REQ那样的限制。对于每一个连接,接收消息也是使用了公平队列,发送使用了循环队列(RR)。

        ZMQ_DEALER受ZMQ_RCVHW和ZMQ_SNDHW两个阀值影响(可通过zmq_setsockopt函数设置),一旦发送或接收队列达到阀值,Socket就会进入mute状态,此时对DEALER的任何zmq_send操作都会阻塞,直到mute状态结束。如果当前没有有效的连接,zmq_send操作也会阻塞,直到有新的连接到来为止。DEALER发生阻塞并不会丢弃消息。

        注意:如果ZMQ_DEALER连接到ZMQ_REP,每一个消息包必须包含一个空帧,然后再紧跟着数据包体。

    2、ZMQ_ROUTER

        ZMQ_ROUTER           
               A socket of type ZMQ_ROUTER is an advanced socket type used for extending request/reply
               sockets. When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing
               the routing id of the originating peer to the message before passing it to the application.
               Messages received are fair-queued from among all connected peers. When sending messages a
               ZMQ_ROUTER socket shall remove the first part of the message and use it to determine the
               routing id of the peer the message shall be routed to. If the peer does not exist anymore,
               or has never existed, the message shall be silently discarded. However, if
               ZMQ_ROUTER_MANDATORY socket option is set to 1, the socket shall fail with EHOSTUNREACH in
               both cases.

        ZMQ_ROUTER是一种用于请求/答应模式的更高级的扩展Socket,它可以自由的收发消息。当ZMQ_ROUTER接收到消息时,会自动在消息顶部加入来源地址标识符,接收消息使用了公平队列。

        

        当发送消息时,ZMQ_ROUTER又会自动去掉这个标识符,并且根据这个标识符路由到相应的端点。

        如果此地址标识的端点不存在,默认会毫无征兆的丢弃消息,除非将ZMQ_ROUTER_MANDATORY 选项设置为1。

               When a ZMQ_ROUTER socket enters the mute state due to having reached the high water mark for
               all peers, then any messages sent to the socket shall be dropped until the mute state ends.
               Likewise, any messages routed to a peer for which the individual high water mark has been
               reached shall also be dropped. If, ZMQ_ROUTER_MANDATORY is set to 1, the socket shall block
               or return EAGAIN in both cases.
        当队列达到阀值时,ZMQ_ROUTER Socket就会进入mute状态,此时所有后续发送到此Socket的消息都会被丢弃,直到mute状态结束。同样的,如果对端的接收队列达到了阀值,消息也会被丢弃。但是如果设置了ZMQ_ROUTER_MANDATORY 选项,消息不会丢弃,接口将等待发送完成后返回。


    三、实践

        代码基于REQ-REP版本进行修改,读者可以通过ROUTER_DEALER宏来看出与原来代码的区别;

        服务端主要是配合客户端实现,代码修改不大,仍然保持着一次请求、一次响应的功能;

        需要注意的地方就是Router收到Dealer的消息是包含标识头部的(Dealer来源标识),发送的时候需要使用sendmore把标识先发出去。

    int main(int argc, char *argv[])
    {
            void *ctx = zmq_ctx_new();
            server_master(ctx);
            zmq_ctx_destroy(ctx);
            exit(EXIT_SUCCESS);
    }
    
    static void *server_master(void *ctx)
    {
            int ret = 0;
            char id[16] = {0};
            char request[1024];
            char respone[1024];
    
    #ifdef DEALER_ROUTER
            void *server = zmq_socket(ctx, ZMQ_ROUTER);
    #else
            void *server = zmq_socket(ctx, ZMQ_REP);
    #endif
    
            s_set_id_ex(server, id, sizeof(id));
            zmq_bind(server, "ipc://server.ipc");
            zmq_pollitem_t items[] = {
                    { server, 0, ZMQ_POLLIN, 0 },
            };
    
            LOGN("Server %s start\n", id);
            while (1) {
                    ret = zmq_poll(items, 1 /* size */, 1000 /* ms */);
                    assert(ret >= 0);
    
            if (items[0].revents & ZMQ_POLLIN) {
    #ifdef DEALER_ROUTER
                            char peer[16] = {0};
                            s_recv(server, peer);
    #endif
                            s_recv(server, request);
                            LOGN("Server %s recv: %s\n", id, request);
    
                            //TODO something handle
                            sleep(1);
    
    #ifdef DEALER_ROUTER
                            s_sendmore(server, peer);
    #endif
                            snprintf(respone, sizeof(respone), "%s-World", request);
                            s_send(server, respone);
                            LOGN("Server %s send: %s\n", id, respone);
                    }
            }
    
            LOGN("Server %s Finish\n", id);
            zmq_close(server);
    }

        客户端的代码修改的比较多,主要是使用主线程推送请求,子线程中对请求进行响应处理,来达到异步的目的(或者另一种实现就是在一个线程中zmq_poll统一处理)

    int main(int argc, char *argv[])
    {
            void *ctx = zmq_ctx_new();
            srandom(time(NULL));
    #ifdef DEALER_ROUTER
            void *thread = zmq_threadstart(client_worker, ctx);
            client_master(ctx);
            zmq_threadclose(thread);
    #else
            client_task(ctx);
    #endif
            zmq_ctx_destroy(ctx);
            exit(EXIT_SUCCESS);
    }
    void client_master(void *ctx)
    {
            int ix;
            int roll = randof(1000);
            char request[1024];
            void *pusher = zmq_socket(ctx, ZMQ_PUSH);
    
            zmq_connect(pusher, "inproc://client.inproc");
    
            for (ix = 0; ix < TEST_TIMES; ix++) {
                    snprintf(request, sizeof(request), "Request-%03d-%03d", roll, ix);
                    s_send(pusher, request);
            }
    
            zmq_close(pusher);
    }

    主线程通过 ZMQ_PUSH 与子线程的 ZMQ_PULL 进行对接,子线程再转送给 ZMQ_DEALER 发送出去;

    由于子线程需要同时监听两个socket的收事件,所以使用了 zmq_poll 进行IO复用;

    void client_worker(void *ctx)
    {
            int ret = 0;
            int cnt = 0;
            char id[16] = {0};
            char request[1024];
            char respone[1024];
            void *puller = zmq_socket(ctx, ZMQ_PULL);
            void *dealer = zmq_socket(ctx, ZMQ_DEALER);
            
            s_set_id_ex(dealer, id, sizeof(id));
            zmq_connect(dealer, "ipc://server.ipc");
            zmq_bind(puller, "inproc://client.inproc");
            zmq_pollitem_t items[] = {
                    { puller, 0, ZMQ_POLLIN, 0 },
                    { dealer, 0, ZMQ_POLLIN, 0 }
            };
            LOGN("Client %s start\n", id);
            while (cnt < TEST_TIMES) {
                    ret = zmq_poll(items, 2 /* size */, 1000 /* ms */);
                    assert(ret >= 0);
            if (items[0].revents & ZMQ_POLLIN) {
                            s_recv(puller, request);
                            s_sendmore(puller, ""); // dealer
                            s_send(dealer, request);
                            LOGN("Client %s send: %s\n", id, request);
                    }
            if (items[1].revents & ZMQ_POLLIN) {
                            s_recv(dealer, respone);
                            cnt++;
                            LOGN("Client %s recv: %s\n", id, respone);
                            //TODO something handle
                    }
            }
            LOGN("Client %s finish\n", id);
            zmq_close(puller);
            zmq_close(dealer);
    }

    开启两个client(00FA与0064)、一个server,运行结果如下:

    [ 1520703616.843 ]: Client 00FA start
    [ 1520703616.844 ]: Client 00FA send: Request-251-000
    [ 1520703616.844 ]: Client 00FA send: Request-251-001
    [ 1520703616.844 ]: Client 00FA send: Request-251-002
    [ 1520703616.844 ]: Client 00FA send: Request-251-003
    [ 1520703616.844 ]: Client 00FA send: Request-251-004
    [ 1520703616.844 ]: Client 00FA send: Request-251-005
    [ 1520703616.844 ]: Client 00FA send: Request-251-006
    [ 1520703616.844 ]: Client 00FA send: Request-251-007
    [ 1520703616.844 ]: Client 00FA send: Request-251-008
    [ 1520703616.844 ]: Client 00FA send: Request-251-009
    [ 1520703622.523 ]: Client 00FA recv: Request-251-000-World
    [ 1520703624.523 ]: Client 00FA recv: Request-251-001-World
    [ 1520703626.525 ]: Client 00FA recv: Request-251-002-World
    [ 1520703628.527 ]: Client 00FA recv: Request-251-003-World
    [ 1520703630.528 ]: Client 00FA recv: Request-251-004-World
    [ 1520703632.531 ]: Client 00FA recv: Request-251-005-World
    [ 1520703634.534 ]: Client 00FA recv: Request-251-006-World
    [ 1520703636.535 ]: Client 00FA recv: Request-251-007-World
    [ 1520703638.544 ]: Client 00FA recv: Request-251-008-World
    [ 1520703639.541 ]: Client 00FA recv: Request-251-009-World
    [ 1520703639.541 ]: Client 00FA finish
    [ 1520703617.217 ]: Client 0064 start
    [ 1520703617.217 ]: Client 0064 send: Request-609-000
    [ 1520703617.217 ]: Client 0064 send: Request-609-001
    [ 1520703617.217 ]: Client 0064 send: Request-609-002
    [ 1520703617.217 ]: Client 0064 send: Request-609-003
    [ 1520703617.217 ]: Client 0064 send: Request-609-004
    [ 1520703617.217 ]: Client 0064 send: Request-609-005
    [ 1520703617.217 ]: Client 0064 send: Request-609-006
    [ 1520703617.217 ]: Client 0064 send: Request-609-007
    [ 1520703617.217 ]: Client 0064 send: Request-609-008
    [ 1520703617.217 ]: Client 0064 send: Request-609-009
    [ 1520703620.521 ]: Client 0064 recv: Request-609-000-World
    [ 1520703621.521 ]: Client 0064 recv: Request-609-001-World
    [ 1520703623.522 ]: Client 0064 recv: Request-609-002-World
    [ 1520703625.524 ]: Client 0064 recv: Request-609-003-World
    [ 1520703627.526 ]: Client 0064 recv: Request-609-004-World
    [ 1520703629.527 ]: Client 0064 recv: Request-609-005-World
    [ 1520703631.529 ]: Client 0064 recv: Request-609-006-World
    [ 1520703633.531 ]: Client 0064 recv: Request-609-007-World
    [ 1520703635.534 ]: Client 0064 recv: Request-609-008-World
    [ 1520703637.537 ]: Client 0064 recv: Request-609-009-World
    [ 1520703637.537 ]: Client 0064 finish
    [ 1520703619.450 ]: Server 00C8 start
    [ 1520703619.519 ]: Server 00C8 recv: Request-609-000
    [ 1520703620.521 ]: Server 00C8 send: Request-609-000-World
    [ 1520703620.521 ]: Server 00C8 recv: Request-609-001
    [ 1520703621.521 ]: Server 00C8 send: Request-609-001-World
    [ 1520703621.521 ]: Server 00C8 recv: Request-251-000
    [ 1520703622.522 ]: Server 00C8 send: Request-251-000-World
    [ 1520703622.522 ]: Server 00C8 recv: Request-609-002
    [ 1520703623.522 ]: Server 00C8 send: Request-609-002-World
    [ 1520703623.523 ]: Server 00C8 recv: Request-251-001
    [ 1520703624.523 ]: Server 00C8 send: Request-251-001-World
    [ 1520703624.524 ]: Server 00C8 recv: Request-609-003
    [ 1520703625.524 ]: Server 00C8 send: Request-609-003-World
    [ 1520703625.524 ]: Server 00C8 recv: Request-251-002
    [ 1520703626.525 ]: Server 00C8 send: Request-251-002-World
    [ 1520703626.525 ]: Server 00C8 recv: Request-609-004
    [ 1520703627.526 ]: Server 00C8 send: Request-609-004-World
    [ 1520703627.526 ]: Server 00C8 recv: Request-251-003
    [ 1520703628.527 ]: Server 00C8 send: Request-251-003-World
    [ 1520703628.527 ]: Server 00C8 recv: Request-609-005
    [ 1520703629.527 ]: Server 00C8 send: Request-609-005-World
    [ 1520703629.528 ]: Server 00C8 recv: Request-251-004
    [ 1520703630.528 ]: Server 00C8 send: Request-251-004-World
    [ 1520703630.529 ]: Server 00C8 recv: Request-609-006
    [ 1520703631.529 ]: Server 00C8 send: Request-609-006-World
    [ 1520703631.530 ]: Server 00C8 recv: Request-251-005
    [ 1520703632.530 ]: Server 00C8 send: Request-251-005-World
    [ 1520703632.531 ]: Server 00C8 recv: Request-609-007
    [ 1520703633.531 ]: Server 00C8 send: Request-609-007-World
    [ 1520703633.532 ]: Server 00C8 recv: Request-251-006
    [ 1520703634.532 ]: Server 00C8 send: Request-251-006-World
    [ 1520703634.533 ]: Server 00C8 recv: Request-609-008
    [ 1520703635.533 ]: Server 00C8 send: Request-609-008-World
    [ 1520703635.534 ]: Server 00C8 recv: Request-251-007
    [ 1520703636.535 ]: Server 00C8 send: Request-251-007-World
    [ 1520703636.536 ]: Server 00C8 recv: Request-609-009
    [ 1520703637.537 ]: Server 00C8 send: Request-609-009-World
    [ 1520703637.538 ]: Server 00C8 recv: Request-251-008
    [ 1520703638.541 ]: Server 00C8 send: Request-251-008-World
    [ 1520703638.541 ]: Server 00C8 recv: Request-251-009
    [ 1520703639.541 ]: Server 00C8 send: Request-251-009-World

        从客户端信息来看,10个请求快速地就发送出去了,然后再下来的20秒内,大约每2秒能够获取到一次响应;

        从服务端信息上来看,对于两个客户端的处理基本是轮流地处理的;

    四、总结

        应用可以通过 ROUTER-DEALER套件来实现异步的“请求-响应" 处理,在消息的处理过程中只要注意空帧的发送、标识头的处理就行;



    参考文章:

    [1] http://zguide.zeromq.org/page:all



        ZMQ_DEALER受ZMQ_RCVHW和ZMQ_SNDHW两个阀值影响(可通过zmq_setsockopt函数设置),一旦发送或接收队列达到阀值,Socket就会进入mute状态,此时对DEALER的任何zmq_send操作都会阻塞,直到mute状态结束。如果当前没有有效的连接,zmq_send操作也会阻塞,直到有新的连接到来为止。DEALER发生阻塞并不会丢弃消息。
    展开全文
  • 一、背景 上一篇文章《Linux使用ZMQ实践“请求-响应”模型》引入了ZMQ的实践案例,本章继续实践编程模型中常用的“生产者-消费者”模型。二、相关知识 ZMQ_PUSH、ZMQ_PULL模型是单发单收的模型,你只能在ZMQ_...

    一、背景

        上一篇文章《Linux下使用ZMQ实践“请求-响应”模型》引入了ZMQ的实践案例,本章继续实践编程模型中常用的“生产者-消费者”模型。

    二、相关知识

        ZMQ_PUSH、ZMQ_PULL模型是单发单收的模型,你只能在ZMQ_PUSH套接字上进行send操作,而不能进行recv,反之一样:

           ZMQ_PUSH
               A socket of type ZMQ_PUSH is used by a pipeline node to send messages to
               downstream pipeline nodes. Messages are round-robined to all connected
               downstream nodes. The zmq_recv() function is not implemented for this socket
               type.
    
               When a ZMQ_PUSH socket enters the mute state due to having reached the high
               water mark for all downstream nodes, or if there are no downstream nodes at
               all, then any zmq_send(3) operations on the socket shall block until the mute
               state ends or at least one downstream node becomes available for sending;
               messages are not discarded.
           ZMQ_PULL
               A socket of type ZMQ_PULL is used by a pipeline node to receive messages from
               upstream pipeline nodes. Messages are fair-queued from among all connected
               upstream nodes. The zmq_send() function is not implemented for this socket
               type.

    三、实践

    1、“多个生产者-单个消费者”模型


    先从main函数入手,当有传参时(./queue 100)走生成者逻辑、无参数时(./queue)走消费者逻辑

    int main(int argc, char *argv[])
    {
            void *ctx = zmq_ctx_new();
            srandom(time(NULL));
    
            if (argc > 1) {
                    test_producer(ctx, atoi(argv[1]));
            }
            else {
                    test_consumer(ctx);
            }
            zmq_ctx_destroy(ctx);
            exit(EXIT_SUCCESS);
    }

    消费者循环等待数据到来

    int test_consumer(void *ctx)
    {
            int cnt = 0;
            char id[16] = {0};
            char request[1024];
            void *sock = zmq_socket(ctx, ZMQ_PULL);
            s_set_id_ex(sock, id, sizeof(id));
            zmq_bind(sock, "ipc://queue.ipc");
            LOGN("Consumer %s start\n", id);
            while (++cnt) {
                    s_recv(sock, request);
                    LOGN("Consumer %s recv: %s\n", id, request);
                    usleep(300 * 1000);
            }
            LOGN("Consumer %s stop\n", id);
            zmq_close(sock);
    }

    生产者则进行消息推送,在消息体中加入来源id、序列号信息

    void test_producer(void *ctx, int times)
    {
            int ix = 0;
            char id[16] = {0};
            char request[1024];
            void *sock = zmq_socket(ctx, ZMQ_PUSH);
            s_set_id_ex(sock, id, sizeof(id));
            zmq_connect(sock, "ipc://queue.ipc");
            LOGN("Producer %s start\n", id);
            for (ix = 0; ix < times; ix++) {
                    snprintf(request, sizeof(request), "Data-%03s-%03d", id, ix);
                    s_send(sock, request);
                    LOGN("Producer %s send: %s\n", id, request);
                    usleep(300 * 1000);
            }
            LOGN("Producer %s stop\n", id);
            zmq_close(sock);
    }

    运行结果:producer-0027、producer-0082同时给consumer-00C7推送消息,consumer侧进行类似“负载均衡”的消息处理

    ./queue 10                 
    [ 1520654736.979 ]: Producer 0027 start
    [ 1520654736.979 ]: Producer 0027 send: Data-0027-000
    [ 1520654737.293 ]: Producer 0027 send: Data-0027-001
    [ 1520654737.606 ]: Producer 0027 send: Data-0027-002
    [ 1520654737.917 ]: Producer 0027 send: Data-0027-003
    [ 1520654738.222 ]: Producer 0027 send: Data-0027-004
    [ 1520654738.535 ]: Producer 0027 send: Data-0027-005
    [ 1520654738.848 ]: Producer 0027 send: Data-0027-006
    [ 1520654739.158 ]: Producer 0027 send: Data-0027-007
    [ 1520654739.470 ]: Producer 0027 send: Data-0027-008
    [ 1520654739.775 ]: Producer 0027 send: Data-0027-009
    [ 1520654740.088 ]: Producer 0027 stop
    ./queue 10                 
    [ 1520654737.390 ]: Producer 0082 start
    [ 1520654737.390 ]: Producer 0082 send: Data-0082-000
    [ 1520654737.698 ]: Producer 0082 send: Data-0082-001
    [ 1520654738.011 ]: Producer 0082 send: Data-0082-002
    [ 1520654738.316 ]: Producer 0082 send: Data-0082-003
    [ 1520654738.629 ]: Producer 0082 send: Data-0082-004
    [ 1520654738.941 ]: Producer 0082 send: Data-0082-005
    [ 1520654739.251 ]: Producer 0082 send: Data-0082-006
    [ 1520654739.564 ]: Producer 0082 send: Data-0082-007
    [ 1520654739.869 ]: Producer 0082 send: Data-0082-008
    [ 1520654740.174 ]: Producer 0082 send: Data-0082-009
    [ 1520654740.487 ]: Producer 0082 stop
    ./queue                  
    [ 1520654721.190 ]: Consumer 00C7 start
    [ 1520654736.980 ]: Consumer 00C7 recv: Data-0027-000
    [ 1520654737.294 ]: Consumer 00C7 recv: Data-0027-001
    [ 1520654737.606 ]: Consumer 00C7 recv: Data-0082-000
    [ 1520654737.917 ]: Consumer 00C7 recv: Data-0082-001
    [ 1520654738.222 ]: Consumer 00C7 recv: Data-0082-002
    [ 1520654738.535 ]: Consumer 00C7 recv: Data-0082-003
    [ 1520654738.848 ]: Consumer 00C7 recv: Data-0082-004
    [ 1520654739.158 ]: Consumer 00C7 recv: Data-0082-005
    [ 1520654739.470 ]: Consumer 00C7 recv: Data-0082-006
    [ 1520654739.775 ]: Consumer 00C7 recv: Data-0082-007
    [ 1520654740.088 ]: Consumer 00C7 recv: Data-0082-008
    [ 1520654740.393 ]: Consumer 00C7 recv: Data-0082-009
    [ 1520654740.697 ]: Consumer 00C7 recv: Data-0027-002
    [ 1520654741.010 ]: Consumer 00C7 recv: Data-0027-003
    [ 1520654741.315 ]: Consumer 00C7 recv: Data-0027-004
    [ 1520654741.615 ]: Consumer 00C7 recv: Data-0027-005
    [ 1520654741.921 ]: Consumer 00C7 recv: Data-0027-006
    [ 1520654742.226 ]: Consumer 00C7 recv: Data-0027-007
    [ 1520654742.538 ]: Consumer 00C7 recv: Data-0027-008
    [ 1520654742.851 ]: Consumer 00C7 recv: Data-0027-009

    由于使用多生产者、单消费者,所以在生产者中使用 zmq_connect() 方法,消费者中使用 zmq_bind() 方法;

    (多个进程无法对同一个地址进行绑定)

    2、“单生产者-多消费者”模型

    代码只要进行微小改动:生产者中使用 zmq_bind() 方法,消费者中使用 zmq_connect() 方法;

    运行结果如下:producer-0084推出10个消息、分别被consumer-0003、consumer-00CA接收处理

    ./queue 10
    [ 1520656647.700 ]: Producer 0084 start
    [ 1520656647.848 ]: Producer 0084 send: Data-0084-000
    [ 1520656648.152 ]: Producer 0084 send: Data-0084-001
    [ 1520656648.464 ]: Producer 0084 send: Data-0084-002
    [ 1520656648.777 ]: Producer 0084 send: Data-0084-003
    [ 1520656649.089 ]: Producer 0084 send: Data-0084-004
    [ 1520656649.396 ]: Producer 0084 send: Data-0084-005
    [ 1520656649.709 ]: Producer 0084 send: Data-0084-006
    [ 1520656650.021 ]: Producer 0084 send: Data-0084-007
    [ 1520656650.324 ]: Producer 0084 send: Data-0084-008
    [ 1520656650.637 ]: Producer 0084 send: Data-0084-009
    [ 1520656650.950 ]: Producer 0084 stop
    ./queue 
    [ 1520656640.402 ]: Consumer 0003 start
    [ 1520656648.464 ]: Consumer 0003 recv: Data-0084-002
    [ 1520656649.089 ]: Consumer 0003 recv: Data-0084-004
    [ 1520656649.709 ]: Consumer 0003 recv: Data-0084-006
    [ 1520656650.324 ]: Consumer 0003 recv: Data-0084-008
    ./queue 
    [ 1520656638.389 ]: Consumer 00CA start
    [ 1520656647.848 ]: Consumer 00CA recv: Data-0084-000
    [ 1520656648.152 ]: Consumer 00CA recv: Data-0084-001
    [ 1520656648.777 ]: Consumer 00CA recv: Data-0084-003
    [ 1520656649.396 ]: Consumer 00CA recv: Data-0084-005
    [ 1520656650.021 ]: Consumer 00CA recv: Data-0084-007
    [ 1520656650.637 ]: Consumer 00CA recv: Data-0084-009

    四、总结

        使用ZMQ_PUSH、ZMQ_PULL队列模式可以方便地实现“单生产者-单消费者”、“多生产者-单消费者”、“单生产者-多消费者”模型,支持多线程、多进程的处理,而且在应用层可以完全不用考虑队列加锁的问题(ZMQ内部处理),开发起来效率更快;

        ZMQ的编程哲学就是消息传递来避免锁:我们需要并行地做一个计算处理,不是直接去调用计算单元的接口(这样会涉及到多线程竞争的问题),而是通过传递消息的形式(请求-响应、负载均衡、远程过程调用)给计算子单元去处理;

        看到这里,同时也引发了一个问题:“多生产者-多消费者”应该如何设计呢?这个场景在分布式处理用的就比较多了,而且实现难度更大(如何保证消息来源去向一致、如何保证消息可靠性),这也就不仅仅是PUSH、PULL能解决了,ZMQ还有其他套件呢,让我们拭目以待!


    参考文章:

    [1] http://zguide.zeromq.org/page:all

    展开全文
  • 一、背景 继续ZMQ系列,本期我们看一下“单生产者-多消费者”的编程场景,使用ZMQ_PUB/ZMQ_SUB实践“发布-订阅”模型二、相关知识2.1 ZMQ_PUBZMQ_PUB A socket of type ZMQ_PUB is used by a publisher to ...

    一、背景

        继续ZMQ系列,本期我们看一下“单生产者-多消费者”的编程场景,使用ZMQ_PUB/ZMQ_SUB实践“发布-订阅”模型

    二、相关知识

    2.1 ZMQ_PUB

    ZMQ_PUB
        A socket of type ZMQ_PUB is used by a publisher to distribute data. Messages sent are distributed in a fan out fashion to all connected peers. The
        zmq_recv(3) function is not implemented for this socket type.
    
        When a ZMQ_PUB socket enters the mute state due to having reached the high water mark for a subscriber, then any messages that would be sent to the
        subscriber in question shall instead be dropped until the mute state ends. The zmq_send() function shall never block for this socket type.

    ZMQ_PUB为发布端socket类型,用于消息分发,消息以扇出的方式分发到各个连接端上。该socket类型仅支持zmq_send进行发送,不支持zmq_recv()。注意当订阅者处理速度慢的时候,需要在PUB设置合适的高水位HWM来保证消息不会丢失。

    2.2 ZMQ_SUB

    ZMQ_SUB
        A socket of type ZMQ_SUB is used by a subscriber to subscribe to data distributed by a publisher. Initially a ZMQ_SUB socket is not subscribed to any
        messages, use the ZMQ_SUBSCRIBE option of zmq_setsockopt(3) to specify which messages to subscribe to. The zmq_send() function is not implemented for
        this socket type.

    ZMQ_SUB为订阅端socket类型,用于订阅接收发布者发送的消息。需要通过设置 ZMQ_SUBSCRIBE 选项来指定订阅哪种消息。该socket不支持zmq_send()方法。 

    2.3 使用方法

    使用方法为一个PUB对应多个SUB,如图所示:


    三、实现

    Publisher代码如下,绑定在127.0.0.1:5443 上进行监听

    void test_pub(void *ctx, int times)
    {
            int ret = 0, ix = 0;
            char id[16] = {0};
            char request[1024];
    
            void *sock = zmq_socket(ctx, ZMQ_PUB);
            assert(sock);
    
            s_set_id_ex(sock, id, sizeof(id));
    
            ret = zmq_bind(sock, "tcp://127.0.0.1:5443");
            assert(ret == 0);
    
            LOGN("Pub %s start\n", id);
            for (ix = 0; ix < times; ix++) {
                    snprintf(request, sizeof(request), "Data-%03s-%03d", id, ix);
                    s_send(sock, request);
                    LOGN("Pub %s send: %s\n", id, request);
                    usleep(300 * 1000);
            }
            LOGN("Pub %s stop\n", id);
    
            zmq_close(sock);
    }

    Subscriber则进行地址连接,进行接收:

    int test_sub(void *ctx)
    {
            int ret = 0, cnt = 0;
            char id[16] = {0};
            char request[1024];
    
            void *sock = zmq_socket(ctx, ZMQ_SUB);
            assert(sock);
    
            s_set_id_ex(sock, id, sizeof(id));
            ret = zmq_connect(sock, "tcp://127.0.0.1:5443");
            assert(ret == 0);
    
            ret = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
            assert(ret == 0);
    
            LOGN("Sub %s start\n", id);
            while (++cnt) {
                    s_recv(sock, request);
                    LOGN("Sub %s recv: %s\n", id, request);
                    usleep(300 * 1000);
            }
            LOGN("Sub %s stop\n", id);
    
            zmq_close(sock);
    }

    主函数入口:

    int main(int argc, char *argv[])
    {
            void *ctx = zmq_ctx_new();
            assert(ctx);
    
            srandom(time(NULL));
    
            if (argc > 1) {
                    test_pub(ctx, atoi(argv[1]));
            }
            else {
                    test_sub(ctx);
            }
            zmq_ctx_destroy(ctx);
            exit(EXIT_SUCCESS);
    }

    开启2个Subscriber、1个Publisher,执行结果:

    Subscriber#1、Subscriber#2 先启动后阻塞住,开启Publisher后才收到消息:

    ./pubsub 
    [ 1528395731.593 ]: Sub 0034 start
    [ 1528395745.306 ]: Sub 0034 recv: Data-00B9-001
    [ 1528395745.607 ]: Sub 0034 recv: Data-00B9-002
    [ 1528395745.907 ]: Sub 0034 recv: Data-00B9-003
    [ 1528395746.209 ]: Sub 0034 recv: Data-00B9-004
    [ 1528395746.509 ]: Sub 0034 recv: Data-00B9-005
    [ 1528395746.810 ]: Sub 0034 recv: Data-00B9-006
    [ 1528395747.111 ]: Sub 0034 recv: Data-00B9-007
    [ 1528395747.413 ]: Sub 0034 recv: Data-00B9-008
    [ 1528395747.714 ]: Sub 0034 recv: Data-00B9-009
    ./pubsub 
    [ 1528395738.672 ]: Sub 00B7 start
    [ 1528395745.306 ]: Sub 00B7 recv: Data-00B9-001
    [ 1528395745.607 ]: Sub 00B7 recv: Data-00B9-002
    [ 1528395745.907 ]: Sub 00B7 recv: Data-00B9-003
    [ 1528395746.209 ]: Sub 00B7 recv: Data-00B9-004
    [ 1528395746.509 ]: Sub 00B7 recv: Data-00B9-005
    [ 1528395746.810 ]: Sub 00B7 recv: Data-00B9-006
    [ 1528395747.111 ]: Sub 00B7 recv: Data-00B9-007
    [ 1528395747.413 ]: Sub 00B7 recv: Data-00B9-008
    [ 1528395747.714 ]: Sub 00B7 recv: Data-00B9-009

    Publisher:

    ./pubsub 10 
    [ 1528395745.004 ]: Pub 00B9 start
    [ 1528395745.004 ]: Pub 00B9 send: Data-00B9-000
    [ 1528395745.306 ]: Pub 00B9 send: Data-00B9-001
    [ 1528395745.606 ]: Pub 00B9 send: Data-00B9-002
    [ 1528395745.907 ]: Pub 00B9 send: Data-00B9-003
    [ 1528395746.208 ]: Pub 00B9 send: Data-00B9-004
    [ 1528395746.509 ]: Pub 00B9 send: Data-00B9-005
    [ 1528395746.809 ]: Pub 00B9 send: Data-00B9-006
    [ 1528395747.111 ]: Pub 00B9 send: Data-00B9-007
    [ 1528395747.412 ]: Pub 00B9 send: Data-00B9-008
    [ 1528395747.714 ]: Pub 00B9 send: Data-00B9-009
    [ 1528395748.014 ]: Pub 00B9 stop
    netstat -anpt|grep pubsub
    tcp        0      0 127.0.0.1:5443          0.0.0.0:*               LISTEN      31141/pubsub        
    tcp        0      0 127.0.0.1:36680         127.0.0.1:5443          ESTABLISHED 31074/pubsub        
    tcp        0      0 127.0.0.1:36678         127.0.0.1:5443          ESTABLISHED 31071/pubsub        
    tcp        0      0 127.0.0.1:5443          127.0.0.1:36680         ESTABLISHED 31141/pubsub        
    tcp        0      0 127.0.0.1:5443          127.0.0.1:36678         ESTABLISHED 31141/pubsub
    期间netstat查看连接状态,发现Publisher启动后,两个Subscriber才建立起socket连接;

    四、结论

        本文实践了ZeroMQ的 PUB/SUB模式,该模式可以实现一对多的消息分发功能。在实际场景中使用,需要根据订阅者的数量、消息大小进行HWM设置,来保证消息可靠性、内存使用情况。


    参考文章:

    [1] http://zguide.zeromq.org/page:all#toc49

    [2] https://blog.csdn.net/yahohi/article/details/76231389


    展开全文
  • 最近考虑到一个问题,项目中有同时处理socket、zeromq的逻辑需求,想通过libevent(I/O服用)一块将zmq-socket的事件也放一个线程中处理。         网上了解了一些实现,大部分都是通过将zmq的...
  • Linux——ZMQ-zmq_socket

    千次阅读 2014-10-27 20:55:14
    客户端调用ZMQ发送函数,然后调用函数zmq_send(3),然后调用函数zmq_recv(3),这是一个循环(或者如果需要的话只调用一次)。使用别的任何序列(如在一行发送两个消息)都会出错。类似的,服务器按照先调用函数zmq_...
  • linuxzmq的安装

    2019-10-05 09:18:11
    export LIBRARY_PATH=$LIBRARY_PATH:yourLibFilePath // 当编译的时候找到zmq库,否则会无法识别-lzmq export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:yourLibFilePath//设置当动态链接的时候寻找库文件的路径 转载...
  • 一、背景 在C/S编程模式中,经常需要进行进程间消息传递,常用的模式是“请求-应答”方式,客户端通过发起请求,服务端进行处理再进行回复,如果使用socket去实现,难免还要实现消息的分包、连接状态的维护的功能。...
  • Linux——ZMQ环境搭建

    千次阅读 2014-10-27 17:11:56
    编译安装zmq:  $ ./configure  $ make  $ sudo make install  $ sudo ldconfig 3、拷贝动态库:cp libzmq.so libzmq.so.3 libzmq.so.3.0.0 /lib -R 4、简易Makefile: OBJ = server ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,703
精华内容 1,081
关键字:

linux使用zmq

linux 订阅