国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

python rocketmq生產(chǎn)者消費(fèi)者

這篇具有很好參考價(jià)值的文章主要介紹了python rocketmq生產(chǎn)者消費(fèi)者。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

  1. 安裝依賴包
pip install rocketmq
  1. 生產(chǎn)者
    需要注意的是假如你用的java SDK 需要只是UNinname
    我們可以看到下列代碼設(shè)置了tag以及key,在頁(yè)面可以根據(jù)key查找消息

from rocketmq.client import Producer, Message
import json

producer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024*1024)

# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
producer.start()

msg = Message('ellis')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body(json.dumps({"key1":"value1"}))
ret = producer.send_sync(msg)

print(ret.msg_id,ret.offset,ret.status)
producer.shutdown()
  1. 消費(fèi)方式PullConsumer(全部消費(fèi))(可重復(fù)消費(fèi))

```python
from rocketmq.client import PullConsumer


consumer = PullConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.start()

for msg in consumer.pull('YOUR-TOPIC'):
    print(msg.id, msg.body)

consumer.shutdown()
  1. 消費(fèi)方式PushConsumer(即時(shí)消費(fèi))(不可重復(fù)消費(fèi))
import time

from rocketmq.client import PushConsumer


def callback(msg):
    print(msg.id, msg.body.decode())


consumer = PushConsumer('ellis1')
consumer.set_group("ellis1")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
其中tags是要訂閱的TAG
consumer.subscribe('ellis', callback,TAGS)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()
  1. 生產(chǎn)者發(fā)送消息選擇隊(duì)列,以及設(shè)置順序
from rocketmq.client import Producer, Message
import json


def queue_selector(mq_size, msg, arg):
    return 1


producer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024*1024)

# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
producer.start()

msg = Message('ellis')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body(json.dumps({"key1":"value1"}))

ret = producer.send_oneway_orderly(msg,arg=1,queue_selector=queue_selector)


producer.shutdown()
  1. 消費(fèi)者順序消費(fèi)
import time

from rocketmq.client import PushConsumer


def callback(msg):
    print(msg)
    print(msg.id, msg.body.decode())


consumer = PushConsumer('ellis1',orderly=True)
consumer.set_group("ellis3")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.subscribe('ellis', callback)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()

https://github.com/apache/rocketmq-clients文章來源地址http://www.zghlxwxcb.cn/news/detail-622221.html

到了這里,關(guān)于python rocketmq生產(chǎn)者消費(fèi)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • python爬蟲,多線程與生產(chǎn)者消費(fèi)者模式

    使用隊(duì)列完成生產(chǎn)者消費(fèi)者模式 使用類創(chuàng)建多線程提高爬蟲速度 通過隊(duì)列可以讓線程之間進(jìn)行通信 創(chuàng)建繼承Thread的類創(chuàng)建線程,run()會(huì)在線程start時(shí)執(zhí)行 吃cpu性能

    2024年02月09日
    瀏覽(18)
  • 生產(chǎn)者-消費(fèi)者模型

    生產(chǎn)者-消費(fèi)者模型

    目錄 1、生產(chǎn)者-消費(fèi)者模型是什么 2、Java中的實(shí)現(xiàn) 3、應(yīng)用于消息隊(duì)列 3.1 引入依賴 3.2?rabbitmq網(wǎng)站新建隊(duì)列queue 3.3 模塊中配置application.yml 3.4 生產(chǎn)者實(shí)現(xiàn)類 3.5 單元測(cè)試,發(fā)送msg到rabbitmq的隊(duì)列(my_simple_queue) 3.6 消費(fèi)者實(shí)現(xiàn)類 3.7?從rabbitmq隊(duì)列(my_simple_queue)消費(fèi)數(shù)據(jù) 3.8 隊(duì)列的配

    2024年02月06日
    瀏覽(19)
  • 生產(chǎn)者與消費(fèi)者問題

    生產(chǎn)者與消費(fèi)者問題

    ????????本篇文章我們使用C++探討一下生產(chǎn)者與消費(fèi)者問題.? ? ? ? ? 我們學(xué)習(xí)了操作系統(tǒng), 知道了進(jìn)程和線程的概念, 但是如果不進(jìn)行代碼實(shí)戰(zhàn)的話, 會(huì)很難理解它們. 特別是編程的初學(xué)者(比如我), 在了解了進(jìn)程和線程后通常會(huì)感到疑惑: 多線程怎么用? 為啥我平時(shí)寫代碼

    2024年02月12日
    瀏覽(25)
  • 多線程之生產(chǎn)者消費(fèi)者

    目的是回顧多線程的幾個(gè)api 多生產(chǎn)者+多消費(fèi)者+共享池

    2024年02月07日
    瀏覽(27)
  • Linux——生產(chǎn)者消費(fèi)者模型

    Linux——生產(chǎn)者消費(fèi)者模型

    目錄 一.為何要使用生產(chǎn)者消費(fèi)者模型 ?二.生產(chǎn)者消費(fèi)者模型優(yōu)點(diǎn) ?三.基于BlockingQueue的生產(chǎn)者消費(fèi)者模型 1.BlockingQueue——阻塞隊(duì)列 2.實(shí)現(xiàn)代碼 ?四.POSIX信號(hào)量 五.基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)模型 生產(chǎn)者消費(fèi)者模式就是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者

    2024年02月08日
    瀏覽(22)
  • kafka生產(chǎn)者消費(fèi)者練習(xí)

    需求:寫一個(gè)生產(chǎn)者,不斷的去生產(chǎn)用戶行為數(shù)據(jù),寫入到kafka的一個(gè)topic中 生產(chǎn)的數(shù)據(jù)格式: 造數(shù)據(jù) {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    瀏覽(28)
  • linux:生產(chǎn)者消費(fèi)者模型

    linux:生產(chǎn)者消費(fèi)者模型

    個(gè)人主頁(yè) : 個(gè)人主頁(yè) 個(gè)人專欄 : 《數(shù)據(jù)結(jié)構(gòu)》 《C語言》《C++》《Linux》 本文是對(duì)于生產(chǎn)者消費(fèi)者模型的知識(shí)總結(jié) 生產(chǎn)者消費(fèi)者模型就是通過一個(gè)容器來解決生產(chǎn)者消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而是通過之間的容器來進(jìn)行通訊,所以生產(chǎn)者

    2024年04月15日
    瀏覽(19)
  • 【JavaEE】生產(chǎn)者消費(fèi)者模式

    【JavaEE】生產(chǎn)者消費(fèi)者模式

    作者主頁(yè): paper jie_博客 本文作者:大家好,我是paper jie,感謝你閱讀本文,歡迎一建三連哦。 本文于《JavaEE》專欄,本專欄是針對(duì)于大學(xué)生,編程小白精心打造的。筆者用重金(時(shí)間和精力)打造,將基礎(chǔ)知識(shí)一網(wǎng)打盡,希望可以幫到讀者們哦。 其他專欄:《MySQL》《C語言》

    2024年02月05日
    瀏覽(18)
  • rabbitmq消費(fèi)者與生產(chǎn)者

    rabbitmq消費(fèi)者與生產(chǎn)者

    在第一次學(xué)習(xí)rabbitmq的時(shí)候,遇到了許多不懂得 第一步導(dǎo)包 第二步新增生產(chǎn)者 在這里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填寫自己的隊(duì)列名稱,如果你的為”/“則填寫\\\'\\\'/\\\'\\\' 第三步新增消費(fèi)者 消息獲取成功 注意如果你用的云服務(wù)器需要打開這兩個(gè)端口 5672 15672 如果你使

    2024年02月11日
    瀏覽(28)
  • LabVIEW建立生產(chǎn)者消費(fèi)者

    LabVIEW建立生產(chǎn)者消費(fèi)者

    LabVIEW建立生產(chǎn)者消費(fèi)者 生產(chǎn)者/消費(fèi)者設(shè)計(jì)模式由并行循環(huán)組成,這些循環(huán)分為兩類:生產(chǎn)者循環(huán)和消費(fèi)者循環(huán)。生產(chǎn)者循環(huán)和消費(fèi)者循環(huán)間的通信可以使用隊(duì)列或通道連線來實(shí)現(xiàn)。 隊(duì)列 LabVIEW內(nèi)置的隊(duì)列操作VI可在函數(shù)選板數(shù)據(jù)通信隊(duì)列操作(?Functions?Data?Communication??Que

    2024年02月07日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包