連接RabbitMQ
1.創(chuàng)建ConnectionFactory,給定參數(shù)ip地址,端口號,用戶名和密碼等
2.創(chuàng)建ConnectionFactory,使用uri方式實現(xiàn),創(chuàng)建channel。
注意:
Connection可以用來創(chuàng)建多個channel實例,但channel實例不能在線程間共享,應(yīng)用程序為每個線程開辟一個channel。多線程間共享channel實例是非線程安全的。
Channel或connection有個isOpen方法可以用來檢測其是否已處于開啟狀態(tài),但不推薦使用有可能會產(chǎn)生競爭。
在調(diào)用createxx或newxx方法后,可以認為Connection和channel已經(jīng)處于開啟狀態(tài),而不會使用isopen檢測,如果使用channel時已經(jīng)處于關(guān)閉狀態(tài),那么程序會拋出一個ShutdownSignalException,只需捕獲異常即可。
使用交換器和隊列
交換器和隊列是amqp中high-level層面的構(gòu)建模塊,需確保在使用他們的時候就已經(jīng)存在,在使用前需要聲明他們
聲明
//創(chuàng)建一個持久化的,非自動刪除的,綁定類型為direct的交換器
//創(chuàng)建一個非持久化,排他的,自動刪除的隊列
//使用路由鍵將隊列和交換器綁定起來
channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);
何時創(chuàng)建
rabbitmq消息存儲在隊列中,交換器不真正耗費服務(wù)器的性能,隊列會。實際業(yè)務(wù)中,需要對所創(chuàng)建的隊列的流量、內(nèi)存占用及網(wǎng)卡占用有一個清晰的認知,預估其平均值和峰值。
預先創(chuàng)建好資源可以確保交換器和隊列之間正確綁定匹配。
發(fā)送消息
使用channel類的basicPublish方法
byte[] messageBodyBytes = “Hello,world”.getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);
mandatory參數(shù)可以控制發(fā)送
消費消息
消費模式分為推模式和拉模式
推模式Basic.Consume
一般通過實現(xiàn)Consumer接口或繼承DefaultConsumer類實現(xiàn)。當調(diào)用與Consumer相關(guān)的api方法時,不同的訂閱采用不同的消費者標簽來區(qū)分彼此,在同一個channel中的消費者也需要通過唯一的消費者標簽作區(qū)分。
拉模式Basic.Get
通過channel.basicGet方法可以單條獲取消息,返回值GetRespone。
消費端的確認與拒絕
消息確認
消費者訂閱隊列時,指定autoAck參數(shù)
- autoAck==false:rabbitmq會等待消費者顯式恢復確認信號才從內(nèi)存中移去消息。
- 對于rabbitmq服務(wù)端而言,隊列中消息分為兩部分
- 等待投遞給消費者的消息
- 已經(jīng)投遞給消費者但還沒有收到消費者確認信號的消息
- 如果rabbitmq一直沒有收到消費者的確認信號,并且消費此消息的消費者已經(jīng)斷開連接,則rabbitmq會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
- rabbitmq不會為未確認的消息設(shè)置過期時間,他判斷此消息是否需要重新投遞給消費者的唯一依據(jù)是消費該消息的消費者連接是否已經(jīng)斷開。rabbitmq允許消費者消費一條消息的時間可以很久很久
- 對于rabbitmq服務(wù)端而言,隊列中消息分為兩部分
- autoAck==true:rabbitmq會自動把發(fā)送的消息置為確認,然后從內(nèi)存中刪除,不管消費者是否真正地消費到這些消息
消息拒絕
- Basic.Reject:消費者客戶端可以調(diào)用與其對應(yīng)的channel.basicReject
- Basic.Nack:reject命令一次只能拒絕一條消息,如果想批量拒絕,使用Basic.Nack
- Basic.Reover:具備可重入隊列特性,用來請求rabbitmq重新發(fā)送還未被確認的消息。
- true:未被確認的消息會被重新加入到隊列中,這樣對于同一條消息來說,可能會被分配給與之前不同的消費者,默認為true
- false:同一條消息會被分配給與之前相同的消費者。
關(guān)閉連接
channel.close();
conn.close();
Connection和Channel生命周期
-
open():開啟狀態(tài),代表當前對象可以使用
-
closing():正在關(guān)閉狀態(tài),當前對象被顯示地通知調(diào)用關(guān)閉方法,這樣產(chǎn)生了一個關(guān)閉請求讓其內(nèi)部對象進行相應(yīng)的操作,并等待這些關(guān)閉操作完成
-
closed():已經(jīng)關(guān)閉狀態(tài),當前對象已經(jīng)接收到所有的內(nèi)部對象已完成關(guān)閉動作的通知,并且也關(guān)閉了自身。
-
getCloseReason:獲取對象關(guān)閉原因
-
當觸發(fā)shutdownListener時,可以獲取到ShutdownSignalException,包含了關(guān)閉的原因,可以通過調(diào)用getCloseReason獲取。文章來源:http://www.zghlxwxcb.cn/news/detail-677033.html
- isHardError可以知道是Connection還是Channel的錯誤
- getReason可以獲取cause相關(guān)的信息
參考:《RabbitMQ實戰(zhàn)指南》文章來源地址http://www.zghlxwxcb.cn/news/detail-677033.html
到了這里,關(guān)于rabbitmq筆記-rabbitmq客戶端開發(fā)使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!