Rabbitmq 简单生产消费者模型

生产者 send.py

#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
# 去邮局取邮件,必须得验证身份
credentials = pika.PlainCredentials("andy","123")
# 新建连接,这里localhost可以更换为服务器ip
# 找到这个邮局,等于连接上服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
# 创建频道
# 建造一个大邮箱,隶属于这家邮局的邮箱,就是个连接
channel = connection.channel()
# 声明一个队列,用于接收消息,队列名字叫“水许传”
channel.queue_declare(queue='水许传')
# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''),它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
channel.basic_publish(exchange='',
                      routing_key='水许传',
                      body='武松又去打老虎啦')
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()

接受者receive.py

import pika
# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("andy","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="水许传")

def callbak(ch,method,properties,body):
    print("消费者接收到了任务:%r"%body.decode("utf8"))
# 有消息来临,立即执行callbak,没有消息则夯住,等待消息
# 老百姓开始去邮箱取邮件啦,队列名字是水许传
channel.basic_consume(callbak,queue="水许传",no_ack=True)
# 开始消费,接收消息
channel.start_consuming()

说明:RabbitMq存在 ack和no ack机制:

no -ack 机制:不确认机制

也就是说每次消费者接收到数据后,不管是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除.

ack机制:

ACK机制用于保证消费者如果拿了队列的消息,客户端处理时出错了,那么队列中仍然还存在这个消息,提供下一位消费者继续取

 

ack确认机制不中,生产者不需要作任何修改,而消费者(接收者)则需要作修改:

一是要将no_ack设置为False,另外 需要告诉服务端,我已经取走了。

import pika

credentials = pika.PlainCredentials("andy","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='水浒传')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body.decode("utf-8"))
    # int('asdfasdf')
    # 我告诉rabbitmq服务端,我已经取走了消息
    # 回复方式在这
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 关闭no_ack,代表给与服务端ack回复,确认给与回复
channel.basic_consume(callback,queue='水浒传',no_ack=False)

channel.start_consuming()

 

 

上一篇:RabbitMQ 用户与权限管理

下一篇:Linux wget命令