fanout發(fā)布訂閱模式
基本用法
生產(chǎn)者
import json
import rabbitmq
# 建立連接
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
) # mq用戶名和密碼
connection_target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(connection_target)
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 創(chuàng)建管道
channel = connection.channel()
# 聲明一個(gè)交換機(jī)
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)
# 向隊(duì)列中寫入數(shù)據(jù)
user = {"id": 1, "name": "張三", "age": 23}
message = json.dumps(user, ensure_ascii=True)
channel.basic_publish(
exchange=exchange_name,
routing_key=queue_name, # 隊(duì)列名
body=message.encode('utf8'),
properties=rabbitmq.BasicProperties(delivery_mode=2), # 聲明消息在隊(duì)列中持久化
)
print(message)
# 關(guān)閉連接
connection.close()
消費(fèi)者
import rabbitmq
import json
# 創(chuàng)建連接
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
)
target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)
# 創(chuàng)建管道
channel = connection.channel()
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 綁定交換機(jī)
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.fanout,
)
# 綁定隊(duì)列
result = channel.queue_declare(
queue=queue_name,
exclusive=True,
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
)
def callback(ch, method, properties, body):
"""每次接收到消息的消費(fèi)回調(diào)方法"""
ch.basic_ack(delivery_tag=method.delivery_tag)
data = body.decode("utf8")
print(json.loads(data))
# 開始消費(fèi)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False,
)
try:
channel.start_consuming()
finally:
connection.close()
簡化代碼
生產(chǎn)者
import rabbitmq
# 建立連接
connection = rabbitmq.get_connection()
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 創(chuàng)建管道
channel = connection.channel()
# 聲明一個(gè)交換機(jī)
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)
# 向隊(duì)列中寫入數(shù)據(jù)
user = {"id": 1, "name": "張三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)
# 關(guān)閉連接
connection.close()
消費(fèi)者
import rabbitmq
import json
# 創(chuàng)建連接
connection = rabbitmq.get_connection()
# 創(chuàng)建管道
channel = connection.channel()
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 綁定交換機(jī)
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.fanout,
)
# 綁定隊(duì)列
result = channel.queue_declare(
queue=queue_name,
exclusive=True,
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
)
def callback(ch, method, properties, body):
"""每次接收到消息的消費(fèi)回調(diào)方法"""
print(rabbitmq.receive_json(ch, method, body))
# 開始消費(fèi)
rabbitmq.consume(connection, queue_name, callback)
進(jìn)一步簡化代碼
生產(chǎn)者
import rabbitmq
# 建立連接
connection = rabbitmq.get_connection()
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 創(chuàng)建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name)
# 向隊(duì)列中寫入數(shù)據(jù)
user = {"id": 1, "name": "張三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)
# 關(guān)閉連接
connection.close()
消費(fèi)者
import rabbitmq
# 創(chuàng)建連接
connection = rabbitmq.get_connection()
# 隊(duì)列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 創(chuàng)建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name, queue_name)
def callback(ch, method, properties, body):
"""每次接收到消息的消費(fèi)回調(diào)方法"""
print(rabbitmq.receive_json(ch, method, body))
# 開始消費(fèi)
rabbitmq.consume(connection, queue_name, callback)
文章來源地址http://www.zghlxwxcb.cn/news/detail-798574.html
文章來源:http://www.zghlxwxcb.cn/news/detail-798574.html
到了這里,關(guān)于Python如何操作RabbitMQ實(shí)現(xiàn)fanout發(fā)布訂閱模式?有錄播直播私教課視頻教程的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!