欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java9新特性Reactive?Stream響應(yīng)式編程?API

 更新時(shí)間:2022年03月15日 14:49:56   作者:字母哥哥  
這篇文章主要為大家介紹了java9新特性響應(yīng)式編程API的特點(diǎn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

file

我計(jì)劃在后續(xù)的一段時(shí)間內(nèi),寫(xiě)一系列關(guān)于java 9的文章,雖然java 9 不像Java 8或者Java 11那樣的核心java版本,但是還是有很多的特性值得關(guān)注。期待您能關(guān)注我,我將把java 9 寫(xiě)成一系列的文章,大概十篇左右。

java9第一篇-可以在interface中定義私有方法了

java9第二篇-Java9改進(jìn)try-with-resources語(yǔ)法

java9第三篇-支持多JDK版本下運(yùn)行的Jar文件打包方式

Java 9的 Reactive Streams是對(duì)異步流式編程的一種實(shí)現(xiàn)。它基于異步發(fā)布和訂閱模型,具有非阻塞“背壓”數(shù)據(jù)處理的特點(diǎn)。

Non-blocking Back Pressure(非阻塞背壓):它是一種機(jī)制,讓發(fā)布訂閱模型中的訂閱者避免接收大量數(shù)據(jù)(超出其處理能力),訂閱者可以異步通知發(fā)布者降低或提升數(shù)據(jù)生產(chǎn)發(fā)布的速率。它是響應(yīng)式編程實(shí)現(xiàn)效果的核心特點(diǎn)!

一、Java9 Reactive Stream API

Java 9提供了一組定義響應(yīng)式流編程的接口。所有這些接口都作為靜態(tài)內(nèi)部接口定義在java.util.concurrent.Flow類里面。

file

下面是Java 響應(yīng)式編程中的一些重要角色和概念,先簡(jiǎn)單理解一下

發(fā)布者(Publisher)是潛在的無(wú)限數(shù)量的有序數(shù)據(jù)元素的生產(chǎn)者。 它根據(jù)收到的需求(subscription)向當(dāng)前訂閱者發(fā)布一定數(shù)量的數(shù)據(jù)元素。

訂閱者(Subscriber)從發(fā)布者那里訂閱并接收數(shù)據(jù)元素。與發(fā)布者建立訂閱關(guān)系后,發(fā)布者向訂閱者發(fā)送訂閱令牌(subscription),訂閱者可以根據(jù)自己的處理能力請(qǐng)求發(fā)布者發(fā)布數(shù)據(jù)元素的數(shù)量。

訂閱令牌(subscription)表示訂閱者與發(fā)布者之間建立的訂閱關(guān)系。 當(dāng)建立訂閱關(guān)系后,發(fā)布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發(fā)布者進(jìn)行交互,例如請(qǐng)求數(shù)據(jù)元素的數(shù)量或取消訂閱。

二、Java響應(yīng)式編程四大接口

2.1.Subscriber Interface(訂閱者訂閱接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe:在發(fā)布者接受訂閱者的訂閱動(dòng)作之后,發(fā)布任何的訂閱消息之前被調(diào)用。新創(chuàng)建的Subscription訂閱令牌對(duì)象通過(guò)此方法傳遞給訂閱者。

onNext:下一個(gè)待處理的數(shù)據(jù)項(xiàng)的處理函數(shù)

onError:在發(fā)布者或訂閱遇到不可恢復(fù)的錯(cuò)誤時(shí)調(diào)用

onComplete:當(dāng)沒(méi)有訂閱者調(diào)用(包括onNext()方法)發(fā)生時(shí)調(diào)用。

2.2.Subscription Interface (訂閱令牌接口)

訂閱令牌對(duì)象通過(guò)Subscriber.onSubscribe()方法傳遞

public static interface Subscription {    public void request(long n);    public void cancel();}

request(long n)是無(wú)阻塞背壓概念背后的關(guān)鍵方法。訂閱者使用它來(lái)請(qǐng)求n個(gè)以上的消費(fèi)項(xiàng)目。這樣,訂閱者控制了它當(dāng)前能夠接收多少個(gè)數(shù)據(jù)。cancel()由訂閱者主動(dòng)來(lái)取消其訂閱,取消后將不會(huì)在接收到任何數(shù)據(jù)消息。

2.3.Publisher Interface(發(fā)布者接口)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

調(diào)用該方法,建立訂閱者Subscriber與發(fā)布者Publisher之間的消息訂閱關(guān)系。

2.4.Processor Interface(處理器接口)

處理者Processor 可以同時(shí)充當(dāng)訂閱者和發(fā)布者,起到轉(zhuǎn)換發(fā)布者——訂閱者管道中的元素的作用。用于將發(fā)布者T類型的數(shù)據(jù)元素,接收并轉(zhuǎn)換為類型R的數(shù)據(jù)并發(fā)布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

二、實(shí)戰(zhàn)案例

現(xiàn)在我們要去實(shí)現(xiàn)上面的四個(gè)接口來(lái)完成響應(yīng)式編程

Subscription Interface訂閱令牌接口通常不需要我們自己編程去實(shí)現(xiàn),我們只需要在知道request()方法和cancle()方法含義即可。

Publisher Interface發(fā)布者接口,Java 9 已經(jīng)默認(rèn)為我們提供了實(shí)現(xiàn)SubmissionPublisher,該實(shí)現(xiàn)類除了實(shí)現(xiàn)Publisher接口的方法外,提供了一個(gè)方法叫做submit()來(lái)完成消息數(shù)據(jù)的發(fā)送。

Subscriber Interface訂閱者接口,通常需要我們自己去實(shí)現(xiàn)。因?yàn)樵跀?shù)據(jù)訂閱接收之后,不同的業(yè)務(wù)有不同的處理邏輯。

Processor實(shí)際上是 Publisher Interface和Subscriber Interface的集合體,有需要數(shù)據(jù)類型轉(zhuǎn)換及數(shù)據(jù)處理的需求才去實(shí)現(xiàn)這個(gè)接口

下面的例子實(shí)現(xiàn)的式字符串的數(shù)據(jù)消息訂閱處理

實(shí)現(xiàn)訂閱者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //訂閱令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("訂閱關(guān)系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一個(gè)消息處理完成之后,可以繼續(xù)調(diào)用subscription.request(n);向發(fā)布者要求數(shù)據(jù)發(fā)送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher消息發(fā)布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立訂閱關(guān)系,可以有多個(gè)訂閱者
      sb.submit("數(shù)據(jù) 1");  //發(fā)送消息1
      sb.submit("數(shù)據(jù) 2"); //發(fā)送消息2
      sb.submit("數(shù)據(jù) 3"); //發(fā)送消息3
      executor.shutdown();
  }
}

控制臺(tái)打印輸出結(jié)果

訂閱關(guān)系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 數(shù)據(jù) 1
item: 數(shù)據(jù) 2

請(qǐng)注意:即使發(fā)布者submit了3條數(shù)據(jù),MySubscriber也僅收到了2條數(shù)據(jù)進(jìn)行了處理。是因?yàn)槲覀冊(cè)?code>MySubscriber#onSubscribe()方法中使用了subscription.request(2);。這就是“背壓”的響應(yīng)式編程效果,我有能力處理多少數(shù)據(jù),就會(huì)通知消息發(fā)布者給多少數(shù)據(jù)。

以上就是java9新特性Reactive Stream響應(yīng)式編程 API的詳細(xì)內(nèi)容,更多關(guān)于java9 Reactive Stream響應(yīng)式API的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java解析JSON數(shù)據(jù)詳解

    java解析JSON數(shù)據(jù)詳解

    這篇文章主要介紹了java解析JSON數(shù)據(jù)詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。
    2017-12-12
  • maven插件maven-assembly-plugin打包歸納文件zip/tar使用

    maven插件maven-assembly-plugin打包歸納文件zip/tar使用

    java項(xiàng)目運(yùn)行的文件需要jar或者war格式,同時(shí)還需要使用Java命令,本文主要介紹了maven插件maven-assembly-plugin打包歸納文件zip/tar使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-02-02
  • Java優(yōu)秀測(cè)試框架TestNG詳解

    Java優(yōu)秀測(cè)試框架TestNG詳解

    這篇文章主要為大家詳細(xì)介紹了Java優(yōu)秀測(cè)試框架TestNG,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-02-02
  • Kotlin中的抽象類實(shí)現(xiàn)

    Kotlin中的抽象類實(shí)現(xiàn)

    這篇文章主要介紹了Kotlin中的抽象類實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • 詳解IDEA搭建springBoot方式一(推薦)

    詳解IDEA搭建springBoot方式一(推薦)

    這篇文章主要介紹了IDEA搭建springBoot方式一(推薦),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-10-10
  • IDEA?Eval?Reset?使用方法匯總

    IDEA?Eval?Reset?使用方法匯總

    本文給大家介紹了IDEA?Eval?Reset?使用方法,安裝插件包括離線安裝方式和在線安裝方式,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2023-10-10
  • Java?IO之流的分類詳解

    Java?IO之流的分類詳解

    這篇文章主要為大家介紹了Java?IO之流的分類,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-01-01
  • SpringBoot升級(jí)到2.7.18后不兼容的地方及解決

    SpringBoot升級(jí)到2.7.18后不兼容的地方及解決

    這篇文章主要介紹了SpringBoot升級(jí)到2.7.18后不兼容的地方及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • Spring jackson原理及基本使用方法詳解

    Spring jackson原理及基本使用方法詳解

    這篇文章主要介紹了Spring jackson原理及基本使用方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • RocketMQ事務(wù)消息保證消息的可靠性和一致性

    RocketMQ事務(wù)消息保證消息的可靠性和一致性

    RocketMQ事務(wù)消息是一種能夠保證消息傳遞的可靠性和一致性的消息傳遞模式。它通過(guò)引入“半消息”和“事務(wù)狀態(tài)”機(jī)制,實(shí)現(xiàn)了消息發(fā)送和本地事務(wù)執(zhí)行的原子性,從而確保了消息的可靠性和一致性
    2023-04-04

最新評(píng)論