- Install the dependency
pip install rocketmq
- Producer
Note: if you are using the Java SDK, you need to specify `UNinname` (sic; probably the unit name setting).
The code below sets a tag and a key on the message; in the console page you can then look the message up by its key.
```python
from rocketmq.client import Producer, Message
import json

producer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024 * 1024)  # 1 MiB
# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')  # No need to call this function if you don't use Aliyun.
producer.start()

msg = Message('ellis')  # topic name
msg.set_keys('XXX')  # key, used to look the message up in the console
msg.set_tags('XXX')  # tag, used by consumers to filter messages
msg.set_body(json.dumps({"key1": "value1"}))
ret = producer.send_sync(msg)
print(ret.msg_id, ret.offset, ret.status)
producer.shutdown()
```
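Since the producer above caps messages at `1024*1024` bytes and sends a JSON body, it can help to check the size before calling `send_sync`. The following is a minimal sketch, not part of the rocketmq client: `build_body` and `MAX_MESSAGE_SIZE` are hypothetical names, and the check only approximates whatever the client or broker actually counts.

```python
import json

# Assumption: mirrors the value passed to set_max_message_size above.
MAX_MESSAGE_SIZE = 1024 * 1024


def build_body(payload, max_size=MAX_MESSAGE_SIZE):
    """Serialize a dict to JSON text and reject bodies over max_size bytes."""
    text = json.dumps(payload, ensure_ascii=False)
    size = len(text.encode("utf-8"))  # size on the wire, not character count
    if size > max_size:
        raise ValueError(f"body is {size} bytes, limit is {max_size}")
    return text


body = build_body({"key1": "value1"})
print(body)
```

The returned string can be passed to `msg.set_body(...)` in the same way as the `json.dumps(...)` call in the producer code.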
- Consuming with PullConsumer (consumes all messages; messages can be consumed repeatedly)
```python
from rocketmq.client import PullConsumer

consumer = PullConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')  # No need to call this function if you don't use Aliyun.
consumer.start()

# Pull the messages of the topic and print each one.
for msg in consumer.pull('YOUR-TOPIC'):
    print(msg.id, msg.body)

consumer.shutdown()
```
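The pull loop above prints each raw body. If the producer sent JSON, as in the producer example, the bodies can be decoded as they are pulled. This is a rough sketch under that assumption: `decode_messages` is a hypothetical helper, and the `SimpleNamespace` objects stand in for pulled messages (only the `id` and `body` attributes used in the article are assumed) so that it runs without a broker.

```python
import json
from types import SimpleNamespace


def decode_messages(messages):
    """Yield (msg_id, payload) for each message whose body is valid JSON."""
    for msg in messages:
        raw = msg.body
        try:
            text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
            payload = json.loads(text)
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON; skip the message
            # instead of aborting the whole pull loop.
            print(f"skipping message {msg.id}: body is not valid JSON")
            continue
        yield msg.id, payload


# Stand-in messages; with a real consumer this would be consumer.pull('YOUR-TOPIC').
fake_batch = [
    SimpleNamespace(id="m1", body=b'{"key1": "value1"}'),
    SimpleNamespace(id="m2", body=b"not json"),
]
decoded = list(decode_messages(fake_batch))
print(decoded)
```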
- Consuming with PushConsumer (real-time consumption; messages are not consumed repeatedly)
```python
import time
from rocketmq.client import PushConsumer


def callback(msg):
    print(msg.id, msg.body.decode())


consumer = PushConsumer('ellis1')
consumer.set_group("ellis1")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')  # No need to call this function if you don't use Aliyun.

# TAGS is the tag to subscribe to; 'XXX' is the placeholder tag the producer sets on its messages.
TAGS = 'XXX'
consumer.subscribe('ellis', callback, TAGS)
consumer.start()

# Keep the process alive so the callback keeps receiving messages,
# and shut the consumer down when the loop is interrupted.
try:
    while True:
        time.sleep(3600)
finally:
    consumer.shutdown()
```
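The third argument to `subscribe` above is a tag expression. In RocketMQ, `*` matches every tag and `||` separates alternative tags. The helper below is a small sketch for building such an expression from a list; `build_tag_expression` is a hypothetical name, not something the client provides.

```python
def build_tag_expression(tags):
    """Join tag names into a RocketMQ tag filter expression.

    An empty list falls back to '*', which subscribes to all tags.
    """
    cleaned = [t.strip() for t in tags if t and t.strip()]
    return " || ".join(cleaned) if cleaned else "*"


expression = build_tag_expression(["TagA", "TagB"])
print(expression)
```

The result could then be passed as the tag argument, for example `consumer.subscribe('ellis', callback, expression)`.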
- Producer: choosing the queue when sending, and sending messages in order
```python
from rocketmq.client import Producer, Message
import json


def queue_selector(mq_size, msg, arg):
    # Always pick the queue at index 1, so every message lands in the same queue.
    return 1


producer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024 * 1024)  # 1 MiB
# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')  # No need to call this function if you don't use Aliyun.
producer.start()

msg = Message('ellis')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body(json.dumps({"key1": "value1"}))
# One-way ordered send: the selector decides the target queue and no send result is checked.
producer.send_oneway_orderly(msg, arg=1, queue_selector=queue_selector)
producer.shutdown()
```
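The selector above always returns `1`, which keeps everything in one queue. A common alternative is to route all messages of one business entity (say, one order) to the same queue while spreading different entities across queues. The sketch below assumes only the `(mq_size, msg, arg)` selector signature shown in the article; `make_queue_selector` and the order ids are hypothetical. It captures the order id in a closure, so it does not depend on how the client hands `arg` to the selector.

```python
import zlib


def make_queue_selector(order_id):
    """Return a selector that maps order_id to a stable queue index."""
    key = str(order_id).encode("utf-8")

    def selector(mq_size, msg, arg):
        # crc32 is stable across processes, unlike the built-in hash() for str.
        return zlib.crc32(key) % mq_size

    return selector


selector_a = make_queue_selector("order-1001")
selector_b = make_queue_selector("order-1002")
# msg and arg are unused by this selector, so None stands in for both here.
print(selector_a(4, None, None), selector_b(4, None, None))
```

A selector built this way could be passed as `queue_selector=make_queue_selector(order_id)` when sending each message of that order.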
- Consumer: consuming messages in order
```python
import time
from rocketmq.client import PushConsumer


def callback(msg):
    print(msg)
    print(msg.id, msg.body.decode())


# orderly=True makes the consumer process the messages of each queue in order.
consumer = PushConsumer('ellis1', orderly=True)
consumer.set_group("ellis3")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')  # No need to call this function if you don't use Aliyun.
consumer.subscribe('ellis', callback)
consumer.start()

# Keep the process alive so the callback keeps receiving messages,
# and shut the consumer down when the loop is interrupted.
try:
    while True:
        time.sleep(3600)
finally:
    consumer.shutdown()
```
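To confirm that ordered consumption behaves as expected, the callback can record the last sequence number seen per order and flag anything that arrives out of order. This is only an illustrative sketch: the `order_id` and `seq` fields in the JSON body are assumptions about what the producer sends, `OrderChecker` is a hypothetical class, and the `SimpleNamespace` objects stand in for received messages.

```python
import json
from types import SimpleNamespace


class OrderChecker:
    """Track the last sequence number per order and record violations."""

    def __init__(self):
        self.last_seq = {}
        self.violations = []

    def callback(self, msg):
        payload = json.loads(msg.body.decode("utf-8"))
        order_id, seq = payload["order_id"], payload["seq"]
        previous = self.last_seq.get(order_id)
        if previous is not None and seq <= previous:
            # Arrived after a message with an equal or higher sequence number.
            self.violations.append((order_id, previous, seq))
        else:
            self.last_seq[order_id] = seq


def fake_message(order_id, seq):
    body = json.dumps({"order_id": order_id, "seq": seq}).encode("utf-8")
    return SimpleNamespace(id=f"{order_id}-{seq}", body=body)


checker = OrderChecker()
for order_id, seq in [("A", 1), ("A", 2), ("B", 1), ("A", 1)]:
    checker.callback(fake_message(order_id, seq))
print(checker.violations)
```

With a real consumer, `checker.callback` could be passed to `consumer.subscribe('ellis', checker.callback)` in place of the plain function.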
https://github.com/apache/rocketmq-clients