一、關(guān)于 異步驅(qū)動
從3.0 版本開始,MongoDB 開始提供異步方式的驅(qū)動(Java Async Driver),這為應(yīng)用提供了一種更高性能的選擇。
但實質(zhì)上,使用同步驅(qū)動(Java Sync Driver)的項目也不在少數(shù),或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者是為了兼容舊的 MongoDB 版本。
無論如何,由于 Reactive 的發(fā)展,未來使用異步驅(qū)動應(yīng)該是一個趨勢。
在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。
二、理解 Reactive (響應(yīng)式)
響應(yīng)式(Reactive)是一種異步的、面向數(shù)據(jù)流的開發(fā)方式,最早是來自于.NET 平臺上的 Reactive Extensions 庫,隨后被擴(kuò)展為各種編程語言的實現(xiàn)。
在著名的 Reactive Manifesto(響應(yīng)式宣言) 中,對 Reactive 定義了四個特征:
及時響應(yīng)(Responsive):系統(tǒng)能及時的響應(yīng)請求。 有韌性(Resilient):系統(tǒng)在出現(xiàn)異常時仍然可以響應(yīng),即支持容錯。 有彈性(Elastic):在不同的負(fù)載下,系統(tǒng)可彈性伸縮來保證運(yùn)行。 消息驅(qū)動(Message Driven):不同組件之間使用異步消息傳遞來進(jìn)行交互,并確保松耦合及相互隔離。
在響應(yīng)式宣言的所定義的這些系統(tǒng)特征中,無一不與響應(yīng)式的流有若干的關(guān)系,于是乎就有了 2013年發(fā)起的 響應(yīng)式流規(guī)范(Reactive Stream Specification)。
https://www.reactive-streams.org/
其中,對于響應(yīng)式流的處理環(huán)節(jié)又做了如下定義:
具有處理無限數(shù)量的元素的能力,即允許流永不結(jié)束 按序處理 異步地傳遞元素 實現(xiàn)非阻塞的負(fù)壓(back-pressure)
Java 平臺則是在 JDK 9 版本上發(fā)布了對 Reactive Streams 的支持。
下面介紹響應(yīng)式流的幾個關(guān)鍵接口:
Publisher
Publisher 是數(shù)據(jù)的發(fā)布者。Publisher 接口只有一個方法 subscribe,用于添加數(shù)據(jù)的訂閱者,也就是 Subscriber。
Subscriber
Subscriber 是數(shù)據(jù)的訂閱者。Subscriber 接口有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發(fā)布者之后,其 onSubscribe(Subscription s) 方法會被調(diào)用。
Subscription 表示的是當(dāng)前的訂閱關(guān)系。
當(dāng)訂閱成功后,可以使用 Subscription 的 request(long n) 方法來請求發(fā)布者發(fā)布 n 條數(shù)據(jù)。發(fā)布者可能產(chǎn)生3種不同的消息通知,分別對應(yīng) Subscriber 的另外3個回調(diào)方法。
數(shù)據(jù)通知:對應(yīng) onNext 方法,表示發(fā)布者產(chǎn)生的數(shù)據(jù)。
錯誤通知:對應(yīng) onError 方法,表示發(fā)布者產(chǎn)生了錯誤。
結(jié)束通知:對應(yīng) onComplete 方法,表示發(fā)布者已經(jīng)完成了所有數(shù)據(jù)的發(fā)布。
在上述3種通知中,錯誤通知和結(jié)束通知都是終結(jié)通知,也就是在終結(jié)通知之后,不會再有其他通知產(chǎn)生。
Subscription
Subscription 表示的是一個訂閱關(guān)系。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要注意的是,在 cancel 方法調(diào)用之后,發(fā)布者仍然有可能繼續(xù)發(fā)布通知。但訂閱最終會被取消。
這幾個接口的關(guān)系如下圖所示:
圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html
MongoDB 的異步驅(qū)動為 mongo-java-driver-reactivestreams 組件,其實現(xiàn)了 Reactive Stream 的上述接口。
> 除了 reactivestream 之外,MongoDB 的異步驅(qū)動還包含 RxJava 等風(fēng)格的版本,有興趣的讀者可以進(jìn)一步了解
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
三、使用示例
接下來,通過一個簡單的例子來演示一下 Reactive 方式的代碼風(fēng)格:
A. 引入依賴
org.mongodbmongodb-driver-reactivestreams1.11.0
> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件
B. 連接數(shù)據(jù)庫
//服務(wù)器實例表Listservers=newArrayList();servers.add(newServerAddress(localhost,27018));//配置構(gòu)建器MongoClientSettings.BuildersettingsBuilder=MongoClientSettings.builder();//傳入服務(wù)器實例settingsBuilder.applyToClusterSettings(builder->builder.hosts(servers));//構(gòu)建Client實例MongoClientmongoClient=MongoClients.create(settingsBuilder.build());
C. 實現(xiàn)文檔查詢
//獲得數(shù)據(jù)庫對象MongoDatabasedatabase=client.getDatabase(databaseName);//獲得集合MongoCollectioncollection=database.getCollection(collectionName);//異步返回PublisherFindPublisherpublisher=collection.find();//訂閱實現(xiàn)publisher.subscribe(newSubscriber(){@OverridepublicvoidonSubscribe(Subscriptions){System.out.println(start...);//執(zhí)行請求s.request(Integer.MAX_VALUE);}@OverridepublicvoidonNext(Documentdocument){//獲得文檔System.out.println(Document:+document.toJson());}@OverridepublicvoidonError(Throwablet){System.out.println(erroroccurs.);}@OverridepublicvoidonComplete(){System.out.println(finished.);}});
注意到,與使用同步驅(qū)動不同的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher接口的一層擴(kuò)展。
而且,在返回 Publisher 對象時,此時并沒有產(chǎn)生真正的數(shù)據(jù)庫IO請求。真正發(fā)起請求需要通過調(diào)用 Subscription.request()方法。
在上面的代碼中,為了讀取由 Publisher 產(chǎn)生的結(jié)果,通過自定義一個Subscriber,在onSubscribe 事件觸發(fā)時就執(zhí)行 數(shù)據(jù)庫的請求,之后分別對 onNext、onError、onComplete進(jìn)行處理。
盡管這種實現(xiàn)方式是純異步的,但在使用上比較繁瑣。試想如果對于每個數(shù)據(jù)庫操作都要完成一個Subscriber 邏輯,那么開發(fā)的工作量是巨大的。
為了盡可能復(fù)用重復(fù)的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:
使用 List 容器對請求結(jié)果進(jìn)行緩存 實現(xiàn)阻塞等待結(jié)果的方法,可指定超時時間 捕獲異常,在等待結(jié)果時拋出
代碼如下:
publicclassObservableSubscriberimplementsSubscriber{//響應(yīng)數(shù)據(jù)privatefinalListreceived;//錯誤信息privatefinalListerrors;//等待對象privatefinalCountDownLatchlatch;//訂閱器privatevolatileSubscriptionsubscription;//是否完成privatevolatilebooleancompleted;publicObservableSubscriber(){this.received=newArrayList();this.errors=newArrayList();this.latch=newCountDownLatch(1);}@OverridepublicvoidonSubscribe(finalSubscriptions){subscription=s;}@OverridepublicvoidonNext(finalTt){received.add(t);}@OverridepublicvoidonError(finalThrowablet){errors.add(t);onComplete();}@OverridepublicvoidonComplete(){completed=true;latch.countDown();}publicSubscriptiongetSubscription(){returnsubscription;}publicListgetReceived(){returnreceived;}publicThrowablegetError(){if(errors.size()>0){returnerrors.get(0);}returnnull;}publicbooleanisCompleted(){returncompleted;}/***阻塞一定時間等待結(jié)果**@paramtimeout*@paramunit*@return*@throwsThrowable*/publicListget(finallongtimeout,finalTimeUnitunit)throwsThrowable{returnawait(timeout,unit).getReceived();}/***一直阻塞等待請求完成**@return*@throwsThrowable*/publicObservableSubscriberawait()throwsThrowable{returnawait(Long.MAX_VALUE,TimeUnit.MILLISECONDS);}/***阻塞一定時間等待完成**@paramtimeout*@paramunit*@return*@throwsThrowable*/publicObservableSubscriberawait(finallongtimeout,finalTimeUnitunit)throwsThrowable{subscription.request(Integer.MAX_VALUE);if(!latch.await(timeout,unit)){thrownewMongoTimeoutException(PublisheronCompletetimedout);}if(!errors.isEmpty()){throwerrors.get(0);}returnthis;}}
借助這個基礎(chǔ)的工具類,我們對于文檔的異步操作就變得簡單多了。
比如對于文檔查詢的操作可以改造如下:
ObservableSubscribersubscriber=newObservableSubscriber();collection.find().subscribe(subscriber);//結(jié)果處理subscriber.get(15,TimeUnit.SECONDS).forEach(d->{System.out.println(Document:+d.toJson());});
當(dāng)然,這個例子還有可以繼續(xù)完善,比如使用 List 作為緩存,則要考慮數(shù)據(jù)量的問題,避免將全部(或超量) 的文檔一次性轉(zhuǎn)入內(nèi)存。
作者:唐卓章
華為技術(shù)專家,多年互聯(lián)網(wǎng)研發(fā)/架設(shè)經(jīng)驗,關(guān)注NOSQL 中間件高可用及彈性擴(kuò)展,在分布式系統(tǒng)架構(gòu)性能優(yōu)化方面有豐富的實踐經(jīng)驗,目前從事物聯(lián)網(wǎng)平臺研發(fā)工作,致力于打造大容量高可用的物聯(lián)網(wǎng)服務(wù)。
分享名稱:Reactive-MongoDB異步JavaDriver解讀
URL標(biāo)題:http://www.rwnh.cn/article26/cpeocg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗、網(wǎng)頁設(shè)計公司、定制網(wǎng)站、自適應(yīng)網(wǎng)站、網(wǎng)站改版、軟件開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)