精华内容
下载资源
问答
  • pip install pika使用API操作RabbitMQ基于Queue实现生产者消费者模型View Code对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。#!/usr/bin/env ...

    pip install pika

    使用API操作RabbitMQ

    基于Queue实现生产者消费者模型

    View Code

    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。#!/usr/bin/env python

    importpika########################## 生产者 #########################

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.queue_declare(queue='hello')

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!')print("[x] Sent 'Hello World!'")

    connection.close()#!/usr/bin/env python

    importpika########################### 消费者 ##########################

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=True)print('[*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()1、acknowledgment 消息不丢失

    no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection islost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

    消费者2、durable 消息不丢失

    生产者

    消费者3、消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

    消费者4、发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    exchange type=fanout

    发布者

    订阅者5、关键字发送

    exchange type=direct

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    消费者

    生产者6、模糊匹配

    exchange type=topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。#表示可以匹配 0 个 或 多个 单词

    *表示只能匹配 一个 单词

    发送者路由值 队列中

    old.boy.python old.* --不匹配

    old.boy.python old.#-- 匹配

    消费者

    生产者

    展开全文
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且...

    rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

    2231313-e091f493c3f2b67e

    安装

    #安装配置epel源

    rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm#安装erlang

    yum -y install erlang#安装RabbitMQ

    yum -y install rabbitmq-server#启动/停止

    service rabbitmq-server start/stop

    rabbitMQ工作模型

    简单模式

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpika

    connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

    channel=connection.channel()

    channel.queue_declare(queue='hello')

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!')print("[x] Sent 'Hello World!'")

    connection.close()

    生产者

    ContractedBlock.gif

    ExpandedBlockStart.gif

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    channel=connection.channel()

    channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)

    channel.basic_consume( callback,

    queue='hello',

    no_ack=True)print('[*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    消费者

    相关参数

    1,no-ack = False

    如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

    回调函数中的 ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_comsume中的no_ack=False

    接收消息端应该这么写:

    importpika

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='10.211.55.4'))

    channel=connection.channel()

    channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)importtime

    time.sleep(10)print 'ok'ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)print('[*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    2,durable :消息不丢失

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpika

    connection= pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel=connection.channel()#make message persistent

    channel.queue_declare(queue='hello', durable=True)

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!',

    properties=pika.BasicProperties(

    delivery_mode=2, #make message persistent

    ))print("[x] Sent 'Hello World!'")

    connection.close()

    生产者

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpika

    connection= pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel=connection.channel()#make message persistent

    channel.queue_declare(queue='hello', durable=True)defcallback(ch, method, properties, body):print("[x] Received %r" %body)importtime

    time.sleep(10)print 'ok'ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)print('[*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    消费者

    3,消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

    importpika

    connection= pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel=connection.channel()#make message persistent

    channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)importtime

    time.sleep(10)print 'ok'ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)print('[*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    exchange模型

    1,发布订阅

    425762-20160717140730998-2143093474.png

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    exchange type = fanout

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpikaimportsys

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    message= ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='logs',

    routing_key='',

    body=message)print("[x] Sent %r" %message)

    connection.close()

    生产者

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpika

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    result= channel.queue_declare(exclusive=True)

    queue_name=result.method.queue

    channel.queue_bind(exchange='logs',

    queue=queue_name)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r" %body)

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    消费则

    2,关键字发送

    425762-20160717140748795-1181706200.png

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    exchange type = direct

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpikaimportsys

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result= channel.queue_declare(exclusive=True)

    queue_name=result.method.queue

    severities= sys.argv[1:]if notseverities:

    sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])

    sys.exit(1)for severity inseverities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    View Code

    3,模糊匹配

    425762-20160717140807232-1395723247.png

    exchange type = topic

    发送者路由值 队列中

    old.boy.python old.* --不匹配

    old.boy.python old.#-- 匹配

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    # 表示可以匹配 0 个 或 多个 单词

    * 表示只能匹配 一个 单词

    ContractedBlock.gif

    ExpandedBlockStart.gif

    importpikaimportsys

    connection=pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel=connection.channel()

    channel.exchange_declare(exchange='topic_logs',

    type='topic')

    result= channel.queue_declare(exclusive=True)

    queue_name=result.method.queue

    binding_keys= sys.argv[1:]if notbinding_keys:

    sys.stderr.write("Usage: %s [binding_key]...\n" %sys.argv[0])

    sys.exit(1)for binding_key inbinding_keys:

    channel.queue_bind(exchange='topic_logs',

    queue=queue_name,

    routing_key=binding_key)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    栗子

    基于rabbitMQ的RPC

    Callback queue 回调队列

    一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址 reply_to。

    Correlation id 关联标识

    一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

    客户端发送请求:

    某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息

    服务端工作流:

    等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中

    客户端接受处理结果:

    客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

    展开全文
  • 引入RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的...

    引入

    RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

    rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

    2019031910040622.jpg

    安装

    # 安装配置epel源

    rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

    # 安装erlang

    yum -y install erlang

    # 安装RabbitMQ

    yum -y install rabbitmq-server

    # 启动/停止

    service rabbitmq-server start/stop

    rabbitMQ工作模型

    简单模式

    生产者

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!')

    print(" [x] Sent 'Hello World!'")

    connection.close()

    消费者

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    channel.basic_consume( callback,

    queue='hello',

    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    相关参数

    1,no-ack = False

    如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

    回调函数中的 ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_comsume中的no_ack=False

    接收消息端应该这么写:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='10.211.55.4'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print 'ok'

    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    2,durable :消息不丢失

    生产者

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel = connection.channel()

    # make message persistent

    channel.queue_declare(queue='hello', durable=True)

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!',

    properties=pika.BasicProperties(

    delivery_mode=2, # make message persistent

    ))

    print(" [x] Sent 'Hello World!'")

    connection.close()

    3,消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel = connection.channel()

    # make message persistent

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print 'ok'

    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    exchange模型

    1,发布订阅

    2019031910040623.png

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    exchange type = fanout

    生产者

    import pika

    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"

    channel.basic_publish(exchange='logs',

    routing_key='',

    body=message)

    print(" [x] Sent %r" % message)

    connection.close()

    消费者

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    channel.queue_bind(exchange='logs',

    queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r" % body)

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    2,关键字发送

    2019031910040624.png

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    exchange type = direct

    import pika

    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    severities = sys.argv[1:]

    if not severities:

    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

    sys.exit(1)

    for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    3,模糊匹配

    2019031910040725.png

    exchange type = topic

    发送者路由值 队列中

    old.boy.python old.* -- 不匹配

    old.boy.python old.# -- 匹配

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    # 表示可以匹配 0 个 或 多个 单词

    * 表示只能匹配 一个 单词

    import pika

    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',

    type='topic')

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    binding_keys = sys.argv[1:]

    if not binding_keys:

    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])

    sys.exit(1)

    for binding_key in binding_keys:

    channel.queue_bind(exchange='topic_logs',

    queue=queue_name,

    routing_key=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    基于rabbitMQ的RPC

    Callback queue 回调队列

    一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址 reply_to 。

    Correlation id 关联标识

    一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有 correlation_id 属性,这样客户端在回调队列中根据 correlation_id 字段的值就可以分辨此响应属于哪个请求。

    客户端发送请求:

    某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息

    服务端工作流:

    等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中

    客户端接受处理结果:

    客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

    服务者

    import pika

    # 建立连接,服务器地址为localhost,可指定ip地址

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    # 建立会话

    channel = connection.channel()

    # 声明RPC请求队列

    channel.queue_declare(queue='rpc_queue')

    # 数据处理方法

    def fib(n):

    if n == 0:

    return 0

    elif n == 1:

    return 1

    else:

    return fib(n-1) + fib(n-2)

    # 对RPC请求队列中的请求进行处理

    def on_request(ch, method, props, body):

    n = int(body)

    print(" [.] fib(%s)" % n)

    # 调用数据处理方法

    response = fib(n)

    # 将处理结果(响应)发送到回调队列

    ch.basic_publish(exchange='',

    routing_key=props.reply_to,

    properties=pika.BasicProperties(correlation_id = \

    props.correlation_id),

    body=str(response))

    ch.basic_ack(delivery_tag = method.delivery_tag)

    # 负载均衡,同一时刻发送给该服务器的请求不超过一个

    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(on_request, queue='rpc_queue')

    print(" [x] Awaiting RPC requests")

    channel.start_consuming()

    客户端

    import pika

    import uuid

    class FibonacciRpcClient(object):

    def __init__(self):

    """

    客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应

    """

    # 建立连接,指定服务器的ip地址

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    # 建立一个会话,每个channel代表一个会话任务

    self.channel = self.connection.channel()

    # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次

    result = self.channel.queue_declare(exclusive=True)

    # 将次队列指定为当前客户端的回调队列

    self.callback_queue = result.method.queue

    # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;

    self.channel.basic_consume(self.on_response, no_ack=True,

    queue=self.callback_queue)

    # 对回调队列中的响应进行处理的函数

    def on_response(self, ch, method, props, body):

    if self.corr_id == props.correlation_id:

    self.response = body

    # 发出RPC请求

    def call(self, n):

    # 初始化 response

    self.response = None

    #生成correlation_id

    self.corr_id = str(uuid.uuid4())

    # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`

    self.channel.basic_publish(exchange='',

    routing_key='rpc_queue',

    properties=pika.BasicProperties(

    reply_to = self.callback_queue,

    correlation_id = self.corr_id,

    ),

    body=str(n))

    while self.response is None:

    self.connection.process_data_events()

    return int(self.response)

    # 建立客户端

    fibonacci_rpc = FibonacciRpcClient()

    # 发送RPC请求

    print(" [x] Requesting fib(30)")

    response = fibonacci_rpc.call(30)

    print(" [.] Got %r" % response)

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    展开全文
  • 一、rabbitmqRabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。...消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼...

    一、rabbitmq

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    1.1 安装rabbitmq

    RabbitMQ安装

    1

    2

    3

    4

    5

    6

    7

    8

    安装配置epel源

    $ rpm-ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

    安装erlang

    $ yum-y install erlang

    安装RabbitMQ

    $ yum-y install rabbitmq-server

    注意:service rabbitmq-server start/stop

    安装API:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    pip install pika

    or

    easy_install pika

    or

    源码

    or

    pycharm

    https://pypi.python.org/pypi/pika

    1.3 用python操作rabbitmq

    1.3.1 基于Queue实现生产者消费者模型

    ExpandedBlockStart.gif

    copycode.gif

    #!/usr/bin/env python

    # -*- coding:utf-8 -*-

    import Queue

    import threading

    message = Queue.Queue(10)

    def producer(i):

    while True:

    message.put(i)

    def consumer(i):

    while True:

    msg = message.get()

    for i in range(12):

    t = threading.Thread(target=producer, args=(i,))

    t.start()

    for i in range(10):

    t = threading.Thread(target=consumer, args=(i,))

    t.start()

    copycode.gif

    1.3.2 rabbitmq实现消息队列

    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

    先运行消费者脚本,让它监听队列消息,然后运行生产者脚本,生产者往队列里发消息。然后消费者往队列里取消息。

    copycode.gif

    import pika

    # ########################### 消费者 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.queue_declare(queue='abc') # 如果队列没有创建,就创建这个队列

    def callback(ch, method, propertities,body):

    print(" [x] Received %r" % body)

    channel.basic_consume(callback,

    queue='abc', # 队列名

    no_ack=True) # 不通知已经收到,如果连接中断可能消息丢失

    print(' [*] Waiting for message. To exit press CTRL+C')

    channel.start_consuming()

    copycode.gif

    copycode.gif

    import pika

    # ############################## 生产者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'

    ))

    channel = connection.channel()

    channel.queue_declare(queue='abc') # 如果队列没有创建,就创建这个队列

    channel.basic_publish(exchange='',

    routing_key='abc', # 指定队列的关键字为,这里是队列的名字

    body='Hello World!') # 往队列里发的消息内容

    print(" [x] Sent 'Hello World!'")

    connection.close()

    copycode.gif

    先运行消费者,然后再运行生产者:

    ExpandedBlockStart.gif

    copycode.gif

    '''

    打印:

    生产者:

    [x] Sent 'Hello World!'

    消费者:

    [*] Waiting for message. To exit press CTRL+C

    [x] Received b'Hello World!'

    '''

    copycode.gif

    1.4 no-ack=False:rabbitmq消费者连接断了 消息不丢失

    rabbitmq支持一种方式:应答。比如我从消息里拿一条消息,如果全处理完,你就不要帮我记着了。如果没处理完,突然断开了,再连接上的时候,消息队列就会重新发消息。

    总结:

    Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。

    Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。

    在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。

    来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系

    注意:

    1)只有在Consumer(消费者)断开连接时,RabbitMQ才会重新发送未经确认的消息。

    2)超时的情况并未考虑:无论Consumer需要处理多长时间,RabbitMQ都不会重发消息。

    消息不丢失的关键代码:

    1)在接收端的callback最后:

    channel.basic_ack(delivery_tag=method.delivery_tag)

    1

    2

    3

    ack即acknowledge(承认,告知已收到)

    也就是消费者每次收到消息,要通知一声:已经收到,如果消费者连接断了,rabbitmq会重新把消息放到队列里,下次消费者可以连接的时候,就能重新收到丢失消息。

    A message MUSTnot be acknowledged morethan once. The receiving peer MUST validate that a non-zero delivery-tag refersto a delivered message,
    and raise a channel exceptionif thisis not the case.

    2)除了callback函数,还要在之前设置接收消息时指定no_ack(默认False):

    channel.basic_consume(callback, queue='hello', no_ack=False)

    消费者:

    copycode.gif

    import pika

    # ########################### 消费者 ##########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='10.211.55.4'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print('ok')

    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    copycode.gif

    消费者断掉连接,再次连接,消息还会收到。

    1.5 durable:rabbitmq服务端宕机 消息不丢失

    发数据的时候,就说了:我这条数据要持久化保存。

    如果rabbitmq服务端机器如果挂掉了,会给这台机器做持久化。如果启动机器后,消息队列还在。

    生产者.py:

    copycode.gif

    import pika

    # ############################## 生产者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel = connection.channel()

    # make message persistent

    channel.queue_declare(queue='hello', durable=True)

    channel.basic_publish(exchange='',

    routing_key='hello',

    body='Hello World!',

    properties=pika.BasicProperties(

    delivery_mode=2, # make message persistent

    ))print(" [x] Sent 'Hello World!'")

    connection.close()

    copycode.gif

    消费者.py:

    copycode.gif

    import pika

    # ########################### 消费者 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

    channel = connection.channel()

    # make message persistent

    channel.queue_declare(queue='hello', durable=True)

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print('ok')

    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(callback,

    queue='hello',

    no_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    copycode.gif

    测试:

    1)把生产者.py执行三次。

    2)然后在linux上停掉rabbitmq服务,然后再开启rabbitmq服务

    1

    2

    3

    4

    5

    6

    [root@localhost ~]# /etc/init.d/rabbitmq-server stop

    Stopping rabbitmq-server: rabbitmq-server.

    [root@localhost ~]# /etc/init.d/rabbitmq-server start

    Starting rabbitmq-server: SUCCESS

    rabbitmq-server.

    3)运行:消费者.py:三条消息都打印了:

    1

    2

    3

    4

    5

    6

    7

    [*] Waitingfor messages. To exit press CTRL+C

    [x] Received b'Hello World!'

    ok

    [x] Received b'Hello World!'

    ok

    [x] Received b'Hello World!'

    ok

    1.6 消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    因为默认是跳着取得。第一个消费者取得很快,已经执行到20了,但是第二个消费者只取到13,可能消息执行的顺序就有问题了。

    如果多个消费者,如果不想跳着取,就按消息的顺序取,而不是按着自己的间隔了。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

    copycode.gif

    #!/usr/bin/env python

    # -*- coding:utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    # ########################### 消费者 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.137.208'))

    channel = connection.channel()

    # make message persistent

    channel.queue_declare(queue='hello1')

    def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print('ok')

    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(callback,

    queue='hello1',

    no_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

    copycode.gif

    1.7发布订阅

    发布订阅原理:

    1)发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。

    2)所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    3)exchange 可以帮你发消息到多个队列!type设为什么值,就把消息发给哪些队列。

    发布订阅应用到监控上:

    模板就是写上一段脚本,放在服务器上,

    客户端每5分钟,从服务端拿到监控模板,根据模板来取数据,

    然后把数据结果发步到服务端的redis频道里。

    服务端收到数据,1)处理历史记录 2)报警 3)dashboard显示监控信息

    服务端有三处一直来订阅服务端频道(一直来收取客户端监控数据)

    1.7.1 发布给所有绑定队列

    exchange type = fanout

    exchange 可以帮你发消息到多个队列,type = fanout表示:跟exchange绑定的所有队列,都会收到消息。

    958407-20160729164334669-873173505.png

    发布者:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei

    import pika

    import sys

    # ########################### 发布者 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"

    channel.basic_publish(exchange='logs',

    routing_key='',

    body=message)

    print(" [x] Sent %r" % message)

    connection.close()

    copycode.gif

    订阅者:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    # ########################### 订阅者 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='logs',

    type='fanout')

    # 随机创建队列

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    # 绑定

    channel.queue_bind(exchange='logs',

    queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r" % body)

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    '''

    多次执行这个文件,就会随机生成多个队列。并且exchange都绑定这些队列。

    然后发布者只需要给exchange发送消息,然后exchange绑定的多个队列都有这个消息了。订阅者就收到这个消息了。

    '''

    copycode.gif

    1.7.2关键字发送

    一个队列还可以绑定多个关键字

    958407-20160729170459559-1350792192.png

    对一个随机队列,绑定三个关键字

    再次执行,对另一个随机队列,只绑定一个关键字。

    消费者:每执行一次可以生成一个队列。通过使用命令行传参的方式,来传入队列的关键字。

    copycode.gif

    #!/usr/bin/env python

    import pika

    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    severities:]if not severities:

    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

    sys.exit(1)

    for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    容易测试的版本:

    消费者1:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    # ########################### 消费者1 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True) # 随机生成队列

    queue_name = result.method.queue

    severities]if not severities:

    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

    sys.exit(1)

    for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    消费者2:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    # ########################### 消费者2 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True) # 随机生成队列

    queue_name = result.method.queue

    severities]for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    生产者:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    # ############################## 生产者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    severity= 'info'

    message = 'Hello World!'

    channel.basic_publish(exchange='direct_logs',

    routing_key=severity,

    body=message)

    print(" [x] Sent %r:%r" % (severity, message))

    connection.close()

    '''

    同时运行消费者1,消费者2,然后修改生产者的关键字,运行生产者。

    当生产者:severity = 'info',则消费者1收到消息,消费者2没收到消息

    当生产者:severity = 'error',则消费者1、消费者2 都收到消息

    '''

    copycode.gif

    1.7.2模糊匹配

    958407-20160729174021919-1441740600.png

    exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    # 表示可以匹配 0 个 或 多个 字符

    * 表示只能匹配 一个 任意字符

    1

    2

    3

    发送者路由值 队列中

    old.boy.python old.* -- 不匹配

    old.boy.python old.# -- 匹配

    消费者:

    copycode.gif

    #!/usr/bin/env python

    import pika

    import sys

    # ############################## 消费者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',

    type='topic')

    result = channel.queue_declare(exclusive=True)

    queue_name = result.method.queue

    binding_keys = "*.orange.*"

    for binding_key in binding_keys:

    channel.queue_bind(exchange='topic_logs',

    queue=queue_name,

    routing_key=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    生产者:

    copycode.gif

    #!/usr/bin/env python

    import pika

    import sys

    # ############################## 生产者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',

    type='topic')

    # routing_key = 'abc.new.qiaomei.old'

    routing_key = 'neworangeold'

    message = 'Hello World!'

    channel.basic_publish(exchange='topic_logs',

    routing_key=routing_key,

    body=message)

    print(" [x] Sent %r:%r" % (routing_key, message))

    connection.close()

    '''

    #.orange.# 匹配:new.orange.old neworangeold

    *.orange.* 匹配:neworangeold,不匹配:new.orange.old

    '''

    copycode.gif

    1.8 saltstack原理实现

    saltstack:zeromq:放到内存里的,会更快,会基于这个做rcp

    openstack:大量使用:rabbitmq

    saltstack上有master,有三个队列。,让三个客户端每个人取一个队列的任务

    saltstack的原理:

    1)发一条命令ifconfig,想让所有nginx主机组的机器,都执行。

    2)在master我们可以发命令给exchange,nginx总共有10台服务器,创建10个带有nginx关键字的10个队列,

    3)master随机生成队列,md5是一个队列的名字,exchange把命令和md5这个消息推送到nginx关键字的队列里。

    4)nginx10台服务器从队列中取出消息,执行命令,并且把主机名和执行的结果返回给这个队列里。

    5)master变为消费者,取出队列里的主机名和执行结果,并打印到终端上。

    服务器1:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    # ########################### 消费者1 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True) # 随机生成队列

    queue_name = result.method.queue

    severities = ]if not severities:

    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

    sys.exit(1)

    for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    queue_md5=body.decode().split(",")[1]

    hostname = 'nginx1'

    channel.queue_declare(queue=queue_md5) # 如果队列没有创建,就创建这个队列

    channel.basic_publish(exchange='',

    routing_key=queue_md5, # 指定队列的关键字为,这里是队列的名字

    body='%s|cmd_result1' %hostname) # 往队列里发的消息内容

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    服务器2:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    # ########################### 消费者2 ###########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    result = channel.queue_declare(exclusive=True) # 随机生成队列

    queue_name = result.method.queue

    severities = ["nginx"]

    for severity in severities:

    channel.queue_bind(exchange='direct_logs',

    queue=queue_name,

    routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):

    print(" [x] %r:%r" % (method.routing_key, body))

    queue_md5=body.decode().split(",")[1]

    hostname = 'nginx2'

    channel.queue_declare(queue=queue_md5) # 如果队列没有创建,就创建这个队列

    channel.basic_publish(exchange='',

    routing_key=queue_md5, # 指定队列的关键字为,这里是队列的名字

    body='%s|cmd_result2' %hostname) # 往队列里发的消息内容

    channel.basic_consume(callback,

    queue=queue_name,

    no_ack=True)

    channel.start_consuming()

    copycode.gif

    master:

    copycode.gif

    #!/usr/bin/env python

    # -*- coding: utf-8 -*-

    __author__ = 'WangQiaomei'

    import pika

    import sys

    import hashlib

    # ############################## 生产者 ##############################

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',

    type='direct')

    severity = 'nginx'

    m2 = hashlib.md5()

    m2.update(severity.encode('utf-8'))

    md5_security=m2.hexdigest()

    print('md5_security:',md5_security)

    message = 'cmd,%s' % md5_security

    channel.basic_publish(exchange='direct_logs',

    routing_key=severity,

    body=message)

    print(" [x] Sent %r:%r" % (severity, message))

    connection.close()

    #################################3

    connection = pika.BlockingConnection(pika.ConnectionParameters(

    host='192.168.137.208'))

    channel = connection.channel()

    channel.queue_declare(queue=md5_security) # 如果队列没有创建,就创建这个队列

    def callback(ch, method, propertities,body):

    print(" [x] Received %r" % body)

    channel.basic_consume(callback,

    queue=md5_security, # 队列名

    no_ack=True) # 不通知已经收到,如果连接中断消息就丢失

    print(' [*] Waiting for message. To exit press CTRL+C')

    channel.start_consuming()

    copycode.gif

    打印:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    '''

    服务器1,和服务器2都打印:

    [*] Waiting for logs. To exit press CTRL+C

    [x] 'nginx':b'cmd,ee434023cf89d7dfb21f63d64f0f9d74'

    master打印:

    md5_security: ee434023cf89d7dfb21f63d64f0f9d74

    [x] Sent 'nginx':'cmd,ee434023cf89d7dfb21f63d64f0f9d74'

    [*] Waiting for message. To exit press CTRL+C

    [x] Received b'nginx2|cmd_result2'

    [x] Received b'nginx1|cmd_result1'

    '''

    展开全文
  • RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息...
  • python操作rabbitmq操作数据

    千次阅读 2018-08-27 14:55:13
    RabbitMQ也是消息队列,那RabbitMQ和之前python的Queue有什么区别么? py 消息队列: 线程 queue(同一进程下线程之间进行交互) 进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)...
  • 先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4RabbitMQ服务器sudo apt-get install rabbitmq-serverPython使用RabbitMQ需要Pika库sudo pip install pika远程结果返回消息发送端发送消息出去后没有结果...
  • 基本用法生产者1 importpika2 importsys34 username = 'wt' #指定远程rabbitmq的用户名密码5 pwd = '111111'6 user_pwd =pika.PlainCredentials(username, pwd)7 s_conn = pika.BlockingConnection(pika....
  • 1. RabbitMQ WorkQueue基本工作模式介绍上一篇我们使用C#语言讲解了单个消费者从消息队列中处理消息的模型,这一篇我们使用Python语言来讲解多个消费者同时工作从一个Queue处理消息的模型。工作队列(又称:任务队列...
  • (一)安装一个消息中间件,如:rabbitMQ(二)生产者sendmq.pyimport pikaimport sysimport time# 远程rabbitmq服务的配置信息username = 'admin' # 指定远程rabbitmq的用户名密码pwd = 'admin'ip_addr = '10.1.7.7...
  • 一:介绍简介RabbitMq本质就是一个消息... 在这个比喻中, RabbitMQ 就是一个邮筒, 同时也是邮局和邮递员在上述比喻中不同点在于RabbitMq不处理纸质信件,而是接收(accept),存储(storage),转发(forwards)二进制数据消息...
  • 原标题:Python操作rabbitMQRabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用...
  • Python操作rabbitmq1.基本用法生产者[](javascript:void(0)????1 import pika2 import sys34 username = 'wt' #指定远程rabbitmq的用户名密码5 pwd = '111111'6 user_pwd = pika.PlainCredentials(username, pwd)7 s...
  • RabbitMQ也是消息队列,那RabbitMQ和之前python的Queue有什么区别么? py 消息队列: 线程 queue(同一进程下线程之间进行交互) 进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互) ...
  • python操作RabbitMQ一、rabbitmqRabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用...
  • Python连接RabbitMQ

    2021-05-06 15:52:09
    之前写过RabbitMQ的安装这里记下如何用Python连接RabbitMQ 1.安装好后,登录rabbitmq http://localhost:15672 (如果是按照开头提到的博客安装的,用户名密码都是admin) 2.新建Exchanges(图里忘了填写Name,我的...
  • python操作RabbitMQ

    2018-10-30 08:55:38
    一、安装RabbitMQ ...二 用Python操作 RabbitMQ RabbitMQ 是一个在AMQP 基础上完成的, 可复用的企业消息系统。他遵循Mozilla Public License开源协议。 2.1 基于queue实现的消费者生成者模型 import Queue ...
  • Python操作rabbitmq

    2019-03-06 11:51:00
    Python操作rabbitmq 1.基本用法 生产者 1 import pika 2 import sys 3 4 username = 'wt' #指定远程rabbitmq的用户名密码 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn =...
  • Python操作RabbitMQ

    2018-08-30 03:10:33
    来源:http://www.cnblogs.com/phennry/p/5713274.html 本篇博客主要介绍如何通过Python来操作管理RabbitMQ消息队列,大家在工作中可能遇到很多类似RabbitMQ这种消息队列的中间件,如:ZeroMQ...
  • RabbitMQ是一款基于MQ的服务器,Python可以通过Pika库来进行程序操控,这里我们将来详解Python操作RabbitMQ服务器消息队列的远程结果返回:
  • pythonrabbitmq

    2018-09-09 12:04:33
    centos7 YUM安装rabbitmq-server 安装pika模块(pip安装) .1 基本用法 生产者 #!bin/env python # -*- coding: UTF-8 -*- import pika #指定链接参数 connection = pika.BlockingConnection( pika.Connection...
  • python操作rabbitMQ

    2019-01-17 15:51:25
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息...
  • PythonrabbitMQ

    2021-05-29 10:10:45
    Python实现的消息队列 一、消息队列介绍 MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取...
  • Python操作RabbitMQ初体验

    千次阅读 2018-09-03 15:57:11
    Python操作RabbitMQ初体验(一) 2015-11-24来源:python人气:939 ...
  • Python操作RabbitMq详解

    2020-02-16 13:57:47
    目录一、简介:二、VirtualHost三、RabbitMq的应用场景四、RabbitM去中的Connection和Channel五、RabbitMq生产者消费者模型六、RabbitMq持久化七、RabbitMq发布与订阅 一、简介: RabbitMq是实现了高级消息队列协议...
  • python 操作RabbitMQ

    2018-07-23 17:45:23
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,445
精华内容 4,978
关键字:

python读取rabbitmq数据

python 订阅