一、簡介
訂閱發(fā)布模式(Publish-Subscribe Pattern)是一種行之有效的解耦框架與業(yè)務(wù)邏輯的方式,也是一種常見的觀察者設(shè)計模式,它被廣泛應(yīng)用于事件驅(qū)動架構(gòu)中。
在這個模式中,發(fā)布者(或者說是主題)并不直接發(fā)送消息給訂閱者,而是通過調(diào)度中心(或者叫消息代理)來傳遞消息。 發(fā)布者(或者說是主題)并不知道訂閱者的存在,而訂閱者也不知道發(fā)布者的存在。他們彼此唯一的關(guān)系就是在調(diào)度中心注冊成為訂閱者或者發(fā)布者。
當(dāng)一個發(fā)布者有新消息時,就將這個消息發(fā)布到調(diào)度中心。調(diào)度中心就會將這個消息通知給所有訂閱者。這就實現(xiàn)了發(fā)布者和訂閱者之間的解耦,發(fā)布者和訂閱者不再直接依賴于彼此,他們可以獨立地擴展自己。
在具體的實現(xiàn)中,可以通過消息隊列、事件總線等機制來實現(xiàn)調(diào)度中心,不同語言和平臺都有實現(xiàn)的庫和框架,例如 Java 中的 ActiveMQ、RabbitMQ、Kafka等。
訂閱發(fā)布模式有以下優(yōu)點:
- 性能好,發(fā)布者發(fā)送消息后直接返回不需要等待消費者處理完畢。
- 解耦性較強,發(fā)布者和訂閱者之間不存在直接依賴,滿足高內(nèi)聚低耦合的設(shè)計思想。
- 可以支持一對多、多對多的消息通信模型,提供了更加靈活的消息傳遞方式。
- 可以動態(tài)地增加或刪除發(fā)布者和訂閱者,擴展性較好。
二、Java實現(xiàn)發(fā)布訂閱模式
- 創(chuàng)建訂閱者接口,用于接受消息通知。
interface Subscriber {
void update(String message);
}
- 創(chuàng)建發(fā)布者,用于發(fā)布消息。實現(xiàn)了增加、刪除和發(fā)布的功能,并且維護了一個訂閱列表,
class Publisher {
private Map<String, List<Subscriber>> subscribers = new HashMap<>();
public void subscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList == null) {
subscriberList = new ArrayList<>();
subscribers.put(topic, subscriberList);
}
subscriberList.add(subscriber);
}
public void unsubscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
subscriberList.remove(subscriber);
}
}
public void publish(String topic, String message) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
for (Subscriber subscriber : subscriberList) {
subscriber.update(message);
}
}
}
}
- 我們還實現(xiàn)了兩個不同的 Subscriber 實現(xiàn),一個是 EmailSubscriber,另一個是 SMSSubscriber,用于接受發(fā)布者的消息并將其分別發(fā)送到郵箱和手機上。
class EmailSubscriber implements Subscriber {
private String email;
public EmailSubscriber(String email) {
this.email = email;
}
public void update(String message) {
System.out.println("Send email to " + email + ": " + message);
}
}
class SMSSubscriber implements Subscriber {
private String phoneNumber;
public SMSSubscriber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public void update(String message) {
System.out.println("Send SMS to " + phoneNumber + ": " + message);
}
}
- 在 Main 類中,我們創(chuàng)建了一個 Publisher 對象,并添加了兩個 EmailSubscriber 和兩個 SMSSubscriber,分別訂閱了 news 主題的更新。我們先給這個主題發(fā)送一條消息,然后取消 news 主題的其中一個訂閱者,最后我們再次給 news 主題發(fā)送一條消息。
public class Main {
public static void main(String[] args) {
Publisher publisher = new Publisher();
Subscriber emailSubscriber1 = new EmailSubscriber("foo@example.com");
Subscriber smsSubscriber1 = new SMSSubscriber("1234567890");
publisher.subscribe("news", emailSubscriber1);
publisher.subscribe("news", smsSubscriber1);
publisher.publish("news", "發(fā)布新消息1");
publisher.unsubscribe("news", smsSubscriber1);
publisher.publish("news", "發(fā)布新消息2");
}
}
打印輸出如下:
Send email to foo@example.com: 發(fā)布新消息1
Send SMS to 1234567890: 發(fā)布新消息1
Send email to foo@example.com: 發(fā)布新消息2
三、Spring中自帶的訂閱發(fā)布模式
Spring的訂閱發(fā)布模式是通過發(fā)布事件、事件監(jiān)聽器和事件發(fā)布器3個部分來完成的
這里我們通過 newbee-mall-pro 項目中已經(jīng)實現(xiàn)訂閱發(fā)布模式的下單流程給大家講解,項目地址:https://github.com/wayn111/newbee-mall-pro
- 自定義訂單發(fā)布事件,繼承 ApplicationEvent
public class OrderEvent extends ApplicationEvent {
void onApplicationEvent(Object event) {
...
}
}
- 定義訂單監(jiān)聽器,實現(xiàn) ApplicationListener
@Component
public class OrderListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
// 生成訂單、刪除購物車、扣減庫存
...
}
}
- 下單流程,通過事件發(fā)布器 applicationEventPublisher 發(fā)布訂單事件,然后再訂單監(jiān)聽器中處理訂單保存邏輯。
@Resource
private ApplicationEventPublisher applicationEventPublisher;
private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) {
// 訂單檢查
...
// 生成訂單號
String orderNo = NumberUtil.genOrderNo();
// 發(fā)布訂單事件,在事件監(jiān)聽中處理下單邏輯
applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList));
// 所有操作成功后,將訂單號返回
return orderNo;
...
}
通過事件監(jiān)聽機制,我們將下單邏輯拆分成如下步驟:
- 訂單檢查
- 生成訂單號
- 發(fā)布訂單事件,在事件監(jiān)聽中處理訂單保存邏輯
- 所有操作成功后,將訂單號返回
每個步驟都是各自獨立不互相影響
如上的代碼已經(jīng)實現(xiàn)了訂閱發(fā)布模式,成功解耦了下單邏輯。但是在性能上還沒有得到優(yōu)化,因為 Spring Boot 項目中,默認(rèn)情況下事件監(jiān)聽器是同步處理的,也就是說這里下單流程會等待事件監(jiān)聽器處理完畢才返回,最終影響接口響應(yīng)時長。
四、使用異步的事件監(jiān)聽發(fā)布類
Spring Boot 項目中事件監(jiān)聽發(fā)布類是由 SimpleApplicationEventMulticaster
這個類實現(xiàn)的,源碼中通知訂閱者代碼如下:
可以看到,代碼里是有判斷 getTaskExecutor()
方法返回不為空的話,就交由 executor 執(zhí)行,負(fù)責(zé)同步執(zhí)行。這個時候大家就要問了,這里不是有線程池在異步通知訂閱者嗎?
不急,博主帶大家繼續(xù)查看源碼。
可以看到 getTaskExecutor()
方法返回一個成員屬性,這個成員屬性在 SimpleApplicationEventMulticaster
類中是通過 setTaskExecutor(@Nullable Executor taskExecutor)
方法設(shè)置的。我們通過 ctrl + f7
查一下 setTaskExecutor(...)
方法在哪里被調(diào)用過,
Ok,到此水落石出,SimpleApplicationEventMulticaster
類的 taskExecutor 成員屬性一直為 null,所以在通過訂閱者的時候一直是同步處理,等待訂閱者處理完畢。
對于異步處理,我們可以從2個方面入手:
- 事件監(jiān)聽器入手,將事件監(jiān)聽器的事件觸發(fā)方法改為異步執(zhí)行,例如將生成訂單、刪除購物車、扣減庫存邏輯放入線程池異步執(zhí)行,或者是在訂閱者的通知方法
onApplicationEvent
上加上@Async
注解,表示該方法異步執(zhí)行。 - 事件監(jiān)聽發(fā)布類入手,設(shè)置默認(rèn)事件監(jiān)聽發(fā)布類的
taskExecutor
屬性,通過源碼可知,也可以解決。
這里博主給大家介紹下怎么修改事件監(jiān)聽發(fā)布類來解決。
/**
* 系統(tǒng)啟動時執(zhí)行
*/
@Component
public class SpringBeanStartupRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
// 設(shè)置spring默認(rèn)的事件監(jiān)聽為異步執(zhí)行
SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(500),
new CustomizableThreadFactory("newbee—event-task"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
multicaster.setTaskExecutor(threadPoolExecutor);
}
}
在系統(tǒng)啟動時反射修改SimpleApplicationEventMulticaster
類的taskExecutor
屬性,從而讓SimpleApplicationEventMulticaster
類支持異步事件通知。
總結(jié)
建議大家在日常開發(fā)中多加思考哪些業(yè)務(wù)流程可以適用,例如微服務(wù)項目中訂單支付成功后需要通知用戶、商品、活動等多個服務(wù)時,可以考慮使用訂閱發(fā)布模式。解耦發(fā)布者和訂閱者,發(fā)布者只管發(fā)布消息,不需要知道有哪些訂閱者,也不需要知道訂閱者的具體實現(xiàn)。訂閱者只需要關(guān)注自己感興趣的消息即可。這種松耦合的設(shè)計使得系統(tǒng)更容易擴展和維護。文章來源:http://www.zghlxwxcb.cn/news/detail-460386.html
關(guān)注公眾號【waynblog】每周分享技術(shù)干貨、開源項目、實戰(zhàn)經(jīng)驗、高效開發(fā)工具等,您的關(guān)注將是我的更新動力!文章來源地址http://www.zghlxwxcb.cn/news/detail-460386.html
到了這里,關(guān)于設(shè)計模式之訂閱發(fā)布模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!