欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

用JAVA實現(xiàn)一套背壓機制

 更新時間:2023年06月30日 08:47:27   作者:hwp0710  
背壓依我的理解來說,是指訂閱者能和發(fā)布者交互,可以調節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題,這篇文章主要介紹了用JAVA自己實現(xiàn)一套背壓機制,需要的朋友可以參考下

Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標準,主流實現(xiàn)有RxJava和Reactor,Spring WebFlux默認集成的是Reactor。

Reactive Streams主要解決背壓(back-pressure)問題。當傳入的任務速率大于系統(tǒng)處理能力時,數(shù)據(jù)處理將會對未處理數(shù)據(jù)產(chǎn)生一個緩沖區(qū)。

背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調用request和cancel方法交互),可以調節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關鍵在于上面例子里面的訂閱關系Subscription這個接口,他有request和cancel 2個方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。

我們重點理解背壓在jdk9里面是如何實現(xiàn)的。關鍵在于發(fā)布者Publisher的實現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩沖池,默認為Flow.defaultBufferSize() = 256。當訂閱者的緩沖池滿了之后,發(fā)布者調用submit方法發(fā)布數(shù)據(jù)就會被阻塞,發(fā)布者就會停(慢)下來;訂閱者消費了數(shù)據(jù)之后(調用Subscription.request方法),緩沖池有位置了,submit方法就會繼續(xù)執(zhí)行下去,就是通過這樣的機制,實現(xiàn)了調節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費得快,生成就快,消費得慢,發(fā)布者就會被阻塞,當然就會慢下來了。

單線程版本:

一個生產(chǎn)者,一個消費者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 為了演示效果,這里讓主線程休眠一段時間
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模擬任務執(zhí)行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        public BackpressureSubscriber() {
            this.running = true;
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                processDataLogic(batch);
                return true;
            } else {
                return false;
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨開啟線程
        }
        public void stop() {
            running = false;
        }
    }
}

多線程版本

一個生產(chǎn)者,多個消費者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 為了演示效果,這里讓主線程休眠一段時間
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模擬任務執(zhí)行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        private final ExecutorService executor;
        private final int workerSize = 2;
        private final List<Future> futures;
        public BackpressureSubscriber() {
            this.running = true;
            this.executor = Executors.newFixedThreadPool(workerSize);
            futures = new ArrayList<>(workerSize);
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                Future f = executor.submit(() -> processDataLogic(batch));
                futures.add(f);
                waitForTaskDone(futures);
                return true;
            } else {
                return false;
            }
        }
        public void waitForTaskDone(List<Future> futures) {
            while (futures.size() >= workerSize) {
                for (Future future : futures) {
                    if (future.isDone()) {
                        // 只要有一個worker是空閑就重新獲取任務
                        futures.remove(future);
                        return;
                    }
                }
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨開啟線程
        }
        public void stop() {
            running = false;
            executor.shutdown();
        }
    }
}

到此這篇關于用JAVA自己實現(xiàn)一套背壓機制的文章就介紹到這了,更多相關java背壓機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 淺談JDK8中的Duration Period和ChronoUnit

    淺談JDK8中的Duration Period和ChronoUnit

    在JDK8中,引入了三個非常有用的時間相關的API:Duration,Period和ChronoUnit。他們都是用來對時間進行統(tǒng)計的,本文將會詳細講解一下這三個API的使用
    2021-06-06
  • Java中捕獲線程異常的幾種方式總結

    Java中捕獲線程異常的幾種方式總結

    這篇文章主要介紹了Java中捕獲線程異常的幾種方式總結,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解

    Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解

    這篇文章主要為大家介紹了Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • Java線程生命周期及轉換過程

    Java線程生命周期及轉換過程

    這篇文章主要介紹了Java線程生命周期及轉換過程,線程的生命周期指的是線程從創(chuàng)建到銷毀的整個過程初始狀態(tài)、可運行狀態(tài)、運行狀態(tài)、休眠狀態(tài)、終止狀態(tài),更多詳細介紹,需要的小伙伴可以參考下面文章內(nèi)容
    2022-05-05
  • Java HashMap的工作原理

    Java HashMap的工作原理

    這篇文章主要介紹了Java HashMap的工作原理的相關資料,需要的朋友可以參考下
    2016-03-03
  • MyBatis的動態(tài)攔截sql并修改

    MyBatis的動態(tài)攔截sql并修改

    因工作需求,需要根據(jù)用戶的數(shù)據(jù)權限,來查詢并展示相應的數(shù)據(jù),那么就需要動態(tài)攔截sql,本文就來介紹了MyBatis的動態(tài)攔截sql并修改,感興趣的可以了解一下
    2023-11-11
  • Java并發(fā)編程信號量Semapher

    Java并發(fā)編程信號量Semapher

    這篇文章主要介紹了Java并發(fā)編程信號量Semapher,Semapher信號量也是Java中的一個同步器,下文關于信號量Semapher的更多內(nèi)容介紹,需要的小伙伴可以參考下面文章
    2022-04-04
  • Debian 7 和 Debian 8 用戶安裝 Java 8的方法

    Debian 7 和 Debian 8 用戶安裝 Java 8的方法

    Oracle Java 8 穩(wěn)定版本近期已發(fā)布,有很多新的特征變化。其中,有功能的程序支持通過“Lambda項目 ”,收到了一些安全更新和界面改進上的bug修復,使得開發(fā)人員的工作更容易。
    2014-03-03
  • Java的非阻塞隊列ConcurrentLinkedQueue解讀

    Java的非阻塞隊列ConcurrentLinkedQueue解讀

    這篇文章主要介紹了Java的非阻塞隊列ConcurrentLinkedQueue解讀,在并發(fā)編程中,有時候需要使用線程安全的隊列,如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法,需要的朋友可以參考下
    2023-12-12
  • java 從int數(shù)組中獲取最大數(shù)的方法

    java 從int數(shù)組中獲取最大數(shù)的方法

    這篇文章主要介紹了java 從int數(shù)組中獲取最大數(shù)的方法,需要的朋友可以參考下
    2017-02-02

最新評論