从零开始的Linux运维屌丝之路,资源免费分享平台   运维人员首选:简单、易用、高效、安全、稳定、社区活跃的开源软件

Python消息队列(RabbitMQ)

发布:蔺要红07-04分类: Python


RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用(削峰)。可维护多个队列,可实现消息的一对一和广播等方式发送

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

RabbitMQ在Linux下安装:https://www.linyaohong.com/blog/linux/205.html

吐槽:版本不同写法有变动、对于老夫这个新手太伤了,搞了半个多小时

RabbitMQ在python中使用需要安装 : pip install pika  #Linux和pycharm需要安装pika
同时还需要注意安装的版本,1.01和0.12有稍微的变动

# pip 1.01版本写法
chan.basic_consume( queue='hello',#指定取消息的队列名
                    on_message_callback=callback,  #调用回调函数,从队列里取消息,收到消息调用callback来处理消息
                    auto_ack=True #取完一条消息后,不给生产者发送确认消息,默认是False的,即  默认给rabbitmq发送一个收到消息的确认,一般默认即可
#pip0.12版本写法
chan.basic_consume(callback,  #调用回调函数,从队列里取消息
                    queue='hello',#指定取消息的队列名
                    no_ack=True) #取完一条消息后,不给生产者发送确认消息,默认是False的,即  默认给rabbitmq发送一个收到消息的确认,一般默认即可
                    )

# pip 1.01版本写法
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
#pip0.12版本写法
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# pip 1.01版本写法
result = channel.queue_declare(queue='',exclusive=True)
#pip0.12版本写法
result = channel.queue_declare(exclusive=True)


消息不丢失的方法主要依赖于两点:

auto_ack=False 默认就是Fales,不加这个参数即可
处理完成以后,给生产者发送确认信息,生产者才会把消息从队列里删除 : ch.basic_ack(delivery_tag=method.delivery_tag) 
生产者
 
# -*- coding: UTF-8 -*-
#pip install pika  #Linux和pycharm需要安装pika
import pika
username = 'admin'
pwd = 'linyaohong'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190', credentials=user_pwd))#创建连接
chan = s_conn.channel()  #在连接上创建一个频道
chan.queue_declare(queue='hello') #声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
chan.basic_publish( exchange='',  #交换机
                    routing_key='hello',#路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
                    body='hello world'#生产者要发送的消息
                    )
print("[生产者]生产:'hello world")
s_conn.close()#当生产者发送完消息后,可选择关闭连接

消费者
 
# -*- coding: UTF-8 -*-
import pika,time
username = 'admin'#指定远程rabbitmq的用户名密码
pwd = 'linyaohong'
user_pwd = pika.PlainCredentials(username, pwd)
#s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190'))
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190', credentials=user_pwd))#创建连接
chan = s_conn.channel()#在连接上创建一个频道
chan.queue_declare(queue='hello')#声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
def callback(ch,method,properties,body): # 定义一个回调函数,用来接收生产者发送的消息
    # print('ch',ch) #连接频道的内存地址
    # print(method)  #
    # print(properties)
    # print(body)   #消息队列的内容
    #time.sleep(5)
    print("[消费者]收到消息:%s" % body)
# pip0.12版本写法
# chan.basic_consume(callback,  #调用回调函数,从队列里取消息
#                    queue='hello',#指定取消息的队列名
#                    no_ack=True) #取完一条消息后,不给生产者发送确认消息,默认是False的,即  默认给rabbitmq发送一个收到消息的确认,一般默认即可
# pip 1.01版本写法
chan.basic_consume( queue='hello',#指定取消息的队列名
                    on_message_callback=callback,  #调用回调函数,从队列里取消息,收到消息调用callback来处理消息
                    #auto_ack=True #取完一条消息后,不给生产者发送确认消息,默认是False的,即  默认给rabbitmq发送一个收到消息的确认,一般默认即可
                    # 这里注释掉代表:当消费者消费了消息以后,没有给生产发送确认信息,消息则不会被删除,除非消费者回复了确认信息,消息才会被删除
                    )
print('[消费者] 等待消息中.............')
chan.start_consuming()#开始循环取消息
 

消息持久化存储(Message durability)

 

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储

channel.queue_declare(queue='hello', durable=True) # 声明队列持久化


Ps: 但是这样程序会执行错误,因为‘hello’这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。因此需要重新定义一个队列

channel.queue_declare(queue='MMMMM', durable=True) # 声明队列持久化 (但是消息并不持久化,仅仅是对队列持久化)

注意:如果仅仅是设置了队列的持久化,仅队列本身可以在rabbit-server宕机后保留,队列中的信息依然会丢失,如果想让队列中的信息或者任务保留,还需要做以下设置:

chan.basic_publish( exchange='',  #交换机
                    routing_key='MMMMM',#路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
                    body='hello world',  #生产者要发送的消息
                    properties=pika.BasicProperties(
                        delivery_mode = 2,  #使消息或任务也持久化存储
                    )
                    )
 
消息队列持久化包括3个部分:
      (1)exchange持久化,在声明时指定durable => 1
      (2)queue持久化,在声明时指定durable => 1
      (3)消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
在rabbitMQ服务端重启查看效果:
[[email protected] ~]# rabbitmqctl  list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
MMMMM	6
[[email protected] ~]# systemctl restart rabbitmq-server
[[email protected] ~]# rabbitmqctl  list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
MMMMM	6

生产者(消息队列和消息持久化)两点设置:
1、durable=True
2、properties=pika.BasicProperties(
           delivery_mode = 2,   #使消息或任务也持久化存储
     )
# -*- coding: UTF-8 -*-
#pip install pika  #Linux和pycharm需要安装pika
import pika
username = 'admin'
pwd = 'linyaohong'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190', credentials=user_pwd))#创建连接
chan = s_conn.channel()  #在连接上创建一个频道
#durable=True 声明一个持久化消息队列
chan.queue_declare(queue='MMMMM',durable=True) #声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
chan.basic_publish( exchange='',  #交换机
                    routing_key='MMMMM',#路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
                    body='hello world',  #生产者要发送的消息
                    properties=pika.BasicProperties(
                        delivery_mode=2,  #使消息或任务也持久化存储
                    )
                    )
print("[生产者]生产:'hello world")
s_conn.close()#当生产者发送完消息后,可选择关闭连接

消费者:
chan.basic_qos( prefetch_count= 1 )   用来设定,当有正在处理的消息大于1个的时候,不接受新的消息
 
# -*- coding: UTF-8 -*-
import pika,time
username = 'admin'#指定远程rabbitmq的用户名密码
pwd = 'linyaohong'
user_pwd = pika.PlainCredentials(username, pwd)
#s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190'))
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.190', credentials=user_pwd))#创建连接
chan = s_conn.channel()#在连接上创建一个频道
chan.queue_declare(queue='MMMMM',durable=True)#声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
def callback(ch,method,properties,body): # 定义一个回调函数,用来接收生产者发送的消息
    print("[消费者]收到消息:%s" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 保证消息不丢失(处理完成以后,给生产者发送确认信息,生产者才会把消息从队列里删除)
chan.basic_qos( prefetch_count= 1 )  #设定当队列中有1条消息的时候,不再接受新消息
chan.basic_consume( queue='MMMMM',#指定取 消息的队列名
                    on_message_callback=callback,  #调用回调函数,从队列里取消息,收到消息调用callback来处理消息
                    #auto_ack=True #取完一条消息后,不给生产者发送确认消息,默认是False的,即  默认给rabbitmq发送一个收到消息的确认,一般默认即可
                    # 这里注释掉代表:当消费者消费了消息以后,没有给生产发送确认信息,消息则不会被删除,除非消费者回复了确认信息,消息才会被删除
                    )
print('[消费者] 等待消息中.............')
chan.start_consuming()#开始循环取消息


发布与订阅

RabbitMQ的发布与订阅,借助于交换机(Exchange)来实现。
交换机的工作原理:消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列,而后每个接收端(consumer)都能从各自的消息队列里接收到信息。

Exchange有三种工作模式,分别为:Fanout, Direct, Topic
 

模式1 Fanout 

 

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上
  1.可以理解为路由表的模式
  2.这种模式不需要routing_key(即使指定,也是无效的)
  3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
  4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

注意:这个时候必须先启动消费者,即订阅者。因为随机队列是在consumer启动的时候随机生成的,并且进行绑定的。producer仅仅是发送至exchange,并不直接与随机队列进行通信。

相当于收音机,生产者是广播模式,声明 exchange和类型、只负责广播,消费者进行声明exchange 负责接受,如果消费者关闭,则收不到消息,生产者也不会保留


生产者:
 

# -*- coding: UTF-8 -*-
import pika
credentials = pika.PlainCredentials('guest', 'guest')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel() #在连接上创建一个频道
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='logs_fanout',
                         exchange_type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='logs_fanout',  # 指定exchange
                      routing_key='',  # fanout下不需要配置,配置了也不会生效
                      body=message)
print(" [生产者] 发送消息: %r" % message)
connection.close()

消费者
 
# -*- coding: utf-8 -*-
#  rabbitmq 订阅者
import pika
credentials = pika.PlainCredentials('guest', 'guest')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='logs_fanout',
                         exchange_type='fanout')
# 随机创建队列

# result = channel.queue_declare(exclusive=True)  #pip老版本用这种写法 0.12 pika写法
result = channel.queue_declare(queue='',exclusive=True)  # pipa:1.01版本写法:exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue #得到随机队列的名字
print(queue_name)
# 将队列与exchange进行绑定
channel.queue_bind(exchange='logs_fanout',
                   queue=queue_name)
print(' [消费者接受消息...........]. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [收到消息:] %r" % body)

# 从队列获取信息
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
channel.start_consuming()
 
模式2  Direct



 

路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键。发送端通过交换机发送信息时,可以指明路由键 ,交换机会根据路由键把消息发送到相应的消息队列,这样接收端就能接收到消息了。  

任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue:

  1.一般情况可以使用rabbitMQ自带的Exchange:””  (该Exchange的名字为空字符串), 也可以自定义Exchange   
  2.这种模式下不需要将Exchange进行任何绑定(bind)操作。当然也可以进行绑定。可以将不同的routing_key与不同的queue进行绑定,不同的    queue与不同exchange进行绑定
  3.消息传递时需要一个“routing_key”
  4.如果消息中中不存在routing_key中绑定的队列名,则该消息会被抛弃。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

生产者代码

 

# -*- coding: utf-8 -*-
# 发布者
import pika,sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()


消费者代码
 

# -*- coding: utf-8 -*-
import pika,sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:  #循环脚本后的参数
    channel.queue_bind( queue=queue_name,
                        exchange='direct_logs',
                        routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(  queue=queue_name,
                        on_message_callback=callback,
                        auto_ack=True)
channel.start_consuming()


实现效果:消费者可以选择不同错误级,别选择性收到信息



模式3 Topic

路由键模糊匹配,其实是路由键(routing_key)的扩展,就是可以使用正则表达式,和常用的正则表示式不同,这里的话“#”表示所有、全部的意思;“*”只匹配到一个词。

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

  1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),Exchange会将消息转发到所有关注主题能与  routing_key模糊匹配的队列。
  2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。
  3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。
  4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
  5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

具体代码这里不在多余写,参照第二种模式的就可以,唯一变动的地方就是exchange type的声明,以及进行绑定和发送的时候routing_key使用正则模式即可。

生产者代码

 

# -*- coding: utf-8 -*-
# 发布者
import pika,sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()


消费者代码

 

# -*- coding: utf-8 -*-
import pika,sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind( queue=queue_name,
                        exchange='topic_logs',
                        routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(  queue=queue_name,
                        on_message_callback=callback,
                        auto_ack=True)
channel.start_consuming()





Remote procedure call(RPC)

消费者:--server端

# -*- coding:utf-8 -*-
import pika,time
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',credentials))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def func(n):
    name = "results"
    return name
def on_request(ch, method,props,body):
    n = body  #(n为生产者传过来的message)
    print(" [消费者-服务端执行命令] fib(%s)" % n)
    response = func(n) #收到信息,执行什么
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #客户端随机生成的Q
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=response)
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # 设定当队列中有1条消息的时候,不再接受新消息
channel.basic_consume(queue='rpc_queue',
                      on_message_callback=on_request
                      )
print(" [消费者--服务端] 等待接受指令")
channel.start_consuming()


生产者-客户端

# -*- coding:utf-8 -*-
import pika,uuid,time
cmd = 'dir'
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.11',5672,'/',pika.PlainCredentials('guest', 'guest')))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue='',exclusive=True)
        self.callback_queue = result.method.queue #生成一个随机Q
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response, #回调函数
                                   auto_ack=True)
    def on_response(self,ch,method,props,body):
        if self.corr_id == props.correlation_id:
            self.response = body
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        #发送消息
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   body=str(n),
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue, #随机Q
                                       correlation_id=self.corr_id,)
                                   )
        while self.response is None:
            #self.channel.start_consuming()
            self.connection.process_data_events() #  执行call方法后,先发送消息,然后开始接收消息 *******  非阻塞的  chan.start_consuming()#开始循环取消息 【发送完消息后,开始接受消息】
            # print("没有消息")
            time.sleep(1)
        return self.response
fibonacci_rpc = FibonacciRpcClient()
print(" [生产者开始发送指令: %s]"%cmd)
response = fibonacci_rpc.call(cmd)
print(" [生产者得到指令的执行结果:]%s" %response)

 

温馨提示如有转载或引用以上内容之必要,敬请将本文链接作为出处标注,如有侵权我会在24小时之内删除!

欢迎使用手机扫描访问本站