java9新特性Reactive?Stream響應式編程?API
我計劃在后續(xù)的一段時間內(nèi),寫一系列關于java 9的文章,雖然java 9 不像Java 8或者Java 11那樣的核心java版本,但是還是有很多的特性值得關注。期待您能關注我,我將把java 9 寫成一系列的文章,大概十篇左右。
java9第二篇-Java9改進try-with-resources語法
java9第三篇-支持多JDK版本下運行的Jar文件打包方式
Java 9的 Reactive Streams是對異步流式編程的一種實現(xiàn)。它基于異步發(fā)布和訂閱模型,具有非阻塞“背壓”數(shù)據(jù)處理的特點。
Non-blocking Back Pressure(非阻塞背壓):它是一種機制,讓發(fā)布訂閱模型中的訂閱者避免接收大量數(shù)據(jù)(超出其處理能力),訂閱者可以異步通知發(fā)布者降低或提升數(shù)據(jù)生產(chǎn)發(fā)布的速率。它是響應式編程實現(xiàn)效果的核心特點!
一、Java9 Reactive Stream API
Java 9提供了一組定義響應式流編程的接口。所有這些接口都作為靜態(tài)內(nèi)部接口定義在java.util.concurrent.Flow
類里面。
下面是Java 響應式編程中的一些重要角色和概念,先簡單理解一下
發(fā)布者(Publisher)是潛在的無限數(shù)量的有序數(shù)據(jù)元素的生產(chǎn)者。 它根據(jù)收到的需求(subscription)向當前訂閱者發(fā)布一定數(shù)量的數(shù)據(jù)元素。
訂閱者(Subscriber)從發(fā)布者那里訂閱并接收數(shù)據(jù)元素。與發(fā)布者建立訂閱關系后,發(fā)布者向訂閱者發(fā)送訂閱令牌(subscription),訂閱者可以根據(jù)自己的處理能力請求發(fā)布者發(fā)布數(shù)據(jù)元素的數(shù)量。
訂閱令牌(subscription)表示訂閱者與發(fā)布者之間建立的訂閱關系。 當建立訂閱關系后,發(fā)布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發(fā)布者進行交互,例如請求數(shù)據(jù)元素的數(shù)量或取消訂閱。
二、Java響應式編程四大接口
2.1.Subscriber Interface(訂閱者訂閱接口)
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
:在發(fā)布者接受訂閱者的訂閱動作之后,發(fā)布任何的訂閱消息之前被調(diào)用。新創(chuàng)建的Subscription
訂閱令牌對象通過此方法傳遞給訂閱者。
onNext
:下一個待處理的數(shù)據(jù)項的處理函數(shù)
onError
:在發(fā)布者或訂閱遇到不可恢復的錯誤時調(diào)用
onComplete
:當沒有訂閱者調(diào)用(包括onNext()方法)發(fā)生時調(diào)用。
2.2.Subscription Interface (訂閱令牌接口)
訂閱令牌對象通過Subscriber.onSubscribe()
方法傳遞
public static interface Subscription { public void request(long n); public void cancel();}
request(long n)
是無阻塞背壓概念背后的關鍵方法。訂閱者使用它來請求n個以上的消費項目。這樣,訂閱者控制了它當前能夠接收多少個數(shù)據(jù)。cancel()
由訂閱者主動來取消其訂閱,取消后將不會在接收到任何數(shù)據(jù)消息。
2.3.Publisher Interface(發(fā)布者接口)
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
調(diào)用該方法,建立訂閱者Subscriber與發(fā)布者Publisher之間的消息訂閱關系。
2.4.Processor Interface(處理器接口)
處理者Processor 可以同時充當訂閱者和發(fā)布者,起到轉(zhuǎn)換發(fā)布者——訂閱者管道中的元素的作用。用于將發(fā)布者T類型的數(shù)據(jù)元素,接收并轉(zhuǎn)換為類型R的數(shù)據(jù)并發(fā)布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
二、實戰(zhàn)案例
現(xiàn)在我們要去實現(xiàn)上面的四個接口來完成響應式編程
Subscription Interface
訂閱令牌接口通常不需要我們自己編程去實現(xiàn),我們只需要在知道request()方法和cancle()方法含義即可。
Publisher Interface
發(fā)布者接口,Java 9 已經(jīng)默認為我們提供了實現(xiàn)SubmissionPublisher,該實現(xiàn)類除了實現(xiàn)Publisher接口的方法外,提供了一個方法叫做submit()來完成消息數(shù)據(jù)的發(fā)送。
Subscriber Interface
訂閱者接口,通常需要我們自己去實現(xiàn)。因為在數(shù)據(jù)訂閱接收之后,不同的業(yè)務有不同的處理邏輯。
Processor
實際上是 Publisher Interface和Subscriber Interface的集合體,有需要數(shù)據(jù)類型轉(zhuǎn)換及數(shù)據(jù)處理的需求才去實現(xiàn)這個接口
下面的例子實現(xiàn)的式字符串的數(shù)據(jù)消息訂閱處理
實現(xiàn)訂閱者Subscriber Interface
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //訂閱令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("訂閱關系建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一個消息處理完成之后,可以繼續(xù)調(diào)用subscription.request(n);向發(fā)布者要求數(shù)據(jù)發(fā)送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
SubmissionPublisher消息發(fā)布者
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立訂閱關系,可以有多個訂閱者 sb.submit("數(shù)據(jù) 1"); //發(fā)送消息1 sb.submit("數(shù)據(jù) 2"); //發(fā)送消息2 sb.submit("數(shù)據(jù) 3"); //發(fā)送消息3 executor.shutdown(); } }
控制臺打印輸出結果
訂閱關系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 數(shù)據(jù) 1
item: 數(shù)據(jù) 2
請注意:即使發(fā)布者submit了3條數(shù)據(jù),MySubscriber也僅收到了2條數(shù)據(jù)進行了處理。是因為我們在MySubscriber#onSubscribe()
方法中使用了subscription.request(2);
。這就是“背壓”的響應式編程效果,我有能力處理多少數(shù)據(jù),就會通知消息發(fā)布者給多少數(shù)據(jù)。
以上就是java9新特性Reactive Stream響應式編程 API的詳細內(nèi)容,更多關于java9 Reactive Stream響應式API的資料請關注腳本之家其它相關文章!
相關文章
maven插件maven-assembly-plugin打包歸納文件zip/tar使用
java項目運行的文件需要jar或者war格式,同時還需要使用Java命令,本文主要介紹了maven插件maven-assembly-plugin打包歸納文件zip/tar使用,具有一定的參考價值,感興趣的可以了解一下2024-02-02