訂閱發(fā)布模式(Publish-Subscribe Pattern)是一種行之有效的解耦框架與業(yè)務(wù)邏輯的方式,也是一種常見的觀察者設(shè)計(jì)模式,它被廣泛應(yīng)用于事件驅(qū)動架構(gòu)中。
在這個模式中,發(fā)布者(或者說是主題)并不直接發(fā)送消息給訂閱者,而是通過調(diào)度中心(或者叫消息代理)來傳遞消息。 發(fā)布者(或者說是主題)并不知道訂閱者的存在,而訂閱者也不知道發(fā)布者的存在。他們彼此唯一的關(guān)系就是在調(diào)度中心注冊成為訂閱者或者發(fā)布者。
【資料圖】
當(dāng)一個發(fā)布者有新消息時(shí),就將這個消息發(fā)布到調(diào)度中心。調(diào)度中心就會將這個消息通知給所有訂閱者。這就實(shí)現(xiàn)了發(fā)布者和訂閱者之間的解耦,發(fā)布者和訂閱者不再直接依賴于彼此,他們可以獨(dú)立地?cái)U(kuò)展自己。
在具體的實(shí)現(xiàn)中,可以通過消息隊(duì)列、事件總線等機(jī)制來實(shí)現(xiàn)調(diào)度中心,不同語言和平臺都有實(shí)現(xiàn)的庫和框架,例如 Java 中的 ActiveMQ、RabbitMQ、Kafka等。
訂閱發(fā)布模式有以下優(yōu)點(diǎn):
性能好,發(fā)布者發(fā)送消息后直接返回不需要等待消費(fèi)者處理完畢。解耦性較強(qiáng),發(fā)布者和訂閱者之間不存在直接依賴,滿足高內(nèi)聚低耦合的設(shè)計(jì)思想??梢灾С忠粚Χ?、多對多的消息通信模型,提供了更加靈活的消息傳遞方式??梢詣討B(tài)地增加或刪除發(fā)布者和訂閱者,擴(kuò)展性較好。二、Java實(shí)現(xiàn)發(fā)布訂閱模式創(chuàng)建訂閱者接口,用于接受消息通知。interface Subscriber { void update(String message);}
創(chuàng)建發(fā)布者,用于發(fā)布消息。實(shí)現(xiàn)了增加、刪除和發(fā)布的功能,并且維護(hù)了一個訂閱列表,class Publisher { private Map> subscribers = new HashMap<>(); public void subscribe(String topic, Subscriber subscriber) { List subscriberList = subscribers.get(topic); if (subscriberList == null) { subscriberList = new ArrayList<>(); subscribers.put(topic, subscriberList); } subscriberList.add(subscriber); } public void unsubscribe(String topic, Subscriber subscriber) { List subscriberList = subscribers.get(topic); if (subscriberList != null) { subscriberList.remove(subscriber); } } public void publish(String topic, String message) { List subscriberList = subscribers.get(topic); if (subscriberList != null) { for (Subscriber subscriber : subscriberList) { subscriber.update(message); } } }}
我們還實(shí)現(xiàn)了兩個不同的 Subscriber 實(shí)現(xiàn),一個是 EmailSubscriber,另一個是 SMSSubscriber,用于接受發(fā)布者的消息并將其分別發(fā)送到郵箱和手機(jī)上。class EmailSubscriber implements Subscriber { private String email; public EmailSubscriber(String email) { this.email = email; } public void update(String message) { System.out.println("Send email to " + email + ": " + message); }}class SMSSubscriber implements Subscriber { private String phoneNumber; public SMSSubscriber(String phoneNumber) { this.phoneNumber = phoneNumber; } public void update(String message) { System.out.println("Send SMS to " + phoneNumber + ": " + message); }}
在 Main 類中,我們創(chuàng)建了一個 Publisher 對象,并添加了兩個 EmailSubscriber 和兩個 SMSSubscriber,分別訂閱了 news 主題的更新。我們先給這個主題發(fā)送一條消息,然后取消 news 主題的其中一個訂閱者,最后我們再次給 news 主題發(fā)送一條消息。public class Main { public static void main(String[] args) { Publisher publisher = new Publisher(); Subscriber emailSubscriber1 = new EmailSubscriber("foo@example.com"); Subscriber smsSubscriber1 = new SMSSubscriber("1234567890"); publisher.subscribe("news", emailSubscriber1); publisher.subscribe("news", smsSubscriber1); publisher.publish("news", "發(fā)布新消息1"); publisher.unsubscribe("news", smsSubscriber1); publisher.publish("news", "發(fā)布新消息2"); }}
打印輸出如下:
Send email to foo@example.com: 發(fā)布新消息1Send SMS to 1234567890: 發(fā)布新消息1Send email to foo@example.com: 發(fā)布新消息2
三、Spring中自帶的訂閱發(fā)布模式Spring的訂閱發(fā)布模式是通過發(fā)布事件、事件監(jiān)聽器和事件發(fā)布器3個部分來完成的
這里我們通過 newbee-mall-pro項(xiàng)目中已經(jīng)實(shí)現(xiàn)訂閱發(fā)布模式的下單流程給大家講解,項(xiàng)目地址:https://github.com/wayn111/newbee-mall-pro
自定義訂單發(fā)布事件,繼承 ApplicationEventpublic class OrderEvent extends ApplicationEvent { void onApplicationEvent(Object event) { ... }}
定義訂單監(jiān)聽器,實(shí)現(xiàn) ApplicationListener@Componentpublic class OrderListener implements ApplicationListener { @Override public void onApplicationEvent(OrderEvent event) { // 生成訂單、刪除購物車、扣減庫存 ... }}
下單流程,通過事件發(fā)布器 applicationEventPublisher 發(fā)布訂單事件,然后再訂單監(jiān)聽器中處理訂單保存邏輯。@Resourceprivate ApplicationEventPublisher applicationEventPublisher;private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List shopcatVOList, String orderNo) { // 訂單檢查 ... // 生成訂單號 String orderNo = NumberUtil.genOrderNo(); // 發(fā)布訂單事件,在事件監(jiān)聽中處理下單邏輯 applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList)); // 所有操作成功后,將訂單號返回 return orderNo; ...}
通過事件監(jiān)聽機(jī)制,我們將下單邏輯拆分成如下步驟:
訂單檢查生成訂單號發(fā)布訂單事件,在事件監(jiān)聽中處理訂單保存邏輯所有操作成功后,將訂單號返回每個步驟都是各自獨(dú)立不互相影響
如上的代碼已經(jīng)實(shí)現(xiàn)了訂閱發(fā)布模式,成功解耦了下單邏輯。但是在性能上還沒有得到優(yōu)化,因?yàn)?Spring Boot 項(xiàng)目中,默認(rèn)情況下事件監(jiān)聽器是同步處理的,也就是說這里下單流程會等待事件監(jiān)聽器處理完畢才返回,最終影響接口響應(yīng)時(shí)長。
四、使用異步的事件監(jiān)聽發(fā)布類Spring Boot 項(xiàng)目中事件監(jiān)聽發(fā)布類是由 SimpleApplicationEventMulticaster
這個類實(shí)現(xiàn)的,源碼中通知訂閱者代碼如下:可以看到,代碼里是有判斷 getTaskExecutor()
方法返回不為空的話,就交由 executor 執(zhí)行,負(fù)責(zé)同步執(zhí)行。這個時(shí)候大家就要問了,這里不是有線程池在異步通知訂閱者嗎?
不急,博主帶大家繼續(xù)查看源碼??梢钥吹?getTaskExecutor()
方法返回一個成員屬性,這個成員屬性在 SimpleApplicationEventMulticaster
類中是通過 setTaskExecutor(@Nullable Executor taskExecutor)
方法設(shè)置的。我們通過 ctrl + f7
查一下 setTaskExecutor(...)
方法在哪里被調(diào)用過,Ok,到此水落石出,SimpleApplicationEventMulticaster
類的 taskExecutor 成員屬性一直為 null,所以在通過訂閱者的時(shí)候一直是同步處理,等待訂閱者處理完畢。
對于異步處理,我們可以從2個方面入手:
事件監(jiān)聽器入手,將事件監(jiān)聽器的事件觸發(fā)方法改為異步執(zhí)行,例如將生成訂單、刪除購物車、扣減庫存邏輯放入線程池異步執(zhí)行,或者是在訂閱者的通知方法onApplicationEvent
上加上@Async
注解,表示該方法異步執(zhí)行。事件監(jiān)聽發(fā)布類入手,設(shè)置默認(rèn)事件監(jiān)聽發(fā)布類的taskExecutor
屬性,通過源碼可知,也可以解決。這里博主給大家介紹下怎么修改事件監(jiān)聽發(fā)布類來解決。
/** * 系統(tǒng)啟動時(shí)執(zhí)行 */@Componentpublic class SpringBeanStartupRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { // 設(shè)置spring默認(rèn)的事件監(jiān)聽為異步執(zhí)行 SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(500), new CustomizableThreadFactory("newbee—event-task"), new ThreadPoolExecutor.CallerRunsPolicy() ); multicaster.setTaskExecutor(threadPoolExecutor); }}
在系統(tǒng)啟動時(shí)反射修改SimpleApplicationEventMulticaster
類的taskExecutor
屬性,從而讓SimpleApplicationEventMulticaster
類支持異步事件通知。
建議大家在日常開發(fā)中多加思考哪些業(yè)務(wù)流程可以適用,例如微服務(wù)項(xiàng)目中訂單支付成功后需要通知用戶、商品、活動等多個服務(wù)時(shí),可以考慮使用訂閱發(fā)布模式。解耦發(fā)布者和訂閱者,發(fā)布者只管發(fā)布消息,不需要知道有哪些訂閱者,也不需要知道訂閱者的具體實(shí)現(xiàn)。訂閱者只需要關(guān)注自己感興趣的消息即可。這種松耦合的設(shè)計(jì)使得系統(tǒng)更容易擴(kuò)展和維護(hù)。
關(guān)注公眾號【waynblog】每周分享技術(shù)干貨、開源項(xiàng)目、實(shí)戰(zhàn)經(jīng)驗(yàn)、高效開發(fā)工具等,您的關(guān)注將是我的更新動力!
標(biāo)簽: