用JAVA實現(xiàn)一套背壓機制
Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標準,主流實現(xiàn)有RxJava和Reactor,Spring WebFlux默認集成的是Reactor。
Reactive Streams主要解決背壓(back-pressure)問題。當傳入的任務速率大于系統(tǒng)處理能力時,數(shù)據(jù)處理將會對未處理數(shù)據(jù)產(chǎn)生一個緩沖區(qū)。
背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調用request和cancel方法交互),可以調節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關鍵在于上面例子里面的訂閱關系Subscription這個接口,他有request和cancel 2個方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。
我們重點理解背壓在jdk9里面是如何實現(xiàn)的。關鍵在于發(fā)布者Publisher的實現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩沖池,默認為Flow.defaultBufferSize() = 256。當訂閱者的緩沖池滿了之后,發(fā)布者調用submit方法發(fā)布數(shù)據(jù)就會被阻塞,發(fā)布者就會停(慢)下來;訂閱者消費了數(shù)據(jù)之后(調用Subscription.request方法),緩沖池有位置了,submit方法就會繼續(xù)執(zhí)行下去,就是通過這樣的機制,實現(xiàn)了調節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費得快,生成就快,消費得慢,發(fā)布者就會被阻塞,當然就會慢下來了。
單線程版本:
一個生產(chǎn)者,一個消費者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 為了演示效果,這里讓主線程休眠一段時間
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模擬任務執(zhí)行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
public BackpressureSubscriber() {
this.running = true;
}
public boolean accept(List<Integer> batch) {
if (running) {
processDataLogic(batch);
return true;
} else {
return false;
}
}
public void start() {
// Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨開啟線程
}
public void stop() {
running = false;
}
}
}多線程版本
一個生產(chǎn)者,多個消費者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 為了演示效果,這里讓主線程休眠一段時間
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模擬任務執(zhí)行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
private final ExecutorService executor;
private final int workerSize = 2;
private final List<Future> futures;
public BackpressureSubscriber() {
this.running = true;
this.executor = Executors.newFixedThreadPool(workerSize);
futures = new ArrayList<>(workerSize);
}
public boolean accept(List<Integer> batch) {
if (running) {
Future f = executor.submit(() -> processDataLogic(batch));
futures.add(f);
waitForTaskDone(futures);
return true;
} else {
return false;
}
}
public void waitForTaskDone(List<Future> futures) {
while (futures.size() >= workerSize) {
for (Future future : futures) {
if (future.isDone()) {
// 只要有一個worker是空閑就重新獲取任務
futures.remove(future);
return;
}
}
}
}
public void start() {
// Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨開啟線程
}
public void stop() {
running = false;
executor.shutdown();
}
}
}到此這篇關于用JAVA自己實現(xiàn)一套背壓機制的文章就介紹到這了,更多相關java背壓機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
淺談JDK8中的Duration Period和ChronoUnit
在JDK8中,引入了三個非常有用的時間相關的API:Duration,Period和ChronoUnit。他們都是用來對時間進行統(tǒng)計的,本文將會詳細講解一下這三個API的使用2021-06-06
Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解
這篇文章主要為大家介紹了Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08
Debian 7 和 Debian 8 用戶安裝 Java 8的方法
Oracle Java 8 穩(wěn)定版本近期已發(fā)布,有很多新的特征變化。其中,有功能的程序支持通過“Lambda項目 ”,收到了一些安全更新和界面改進上的bug修復,使得開發(fā)人員的工作更容易。2014-03-03
Java的非阻塞隊列ConcurrentLinkedQueue解讀
這篇文章主要介紹了Java的非阻塞隊列ConcurrentLinkedQueue解讀,在并發(fā)編程中,有時候需要使用線程安全的隊列,如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法,需要的朋友可以參考下2023-12-12
java 從int數(shù)組中獲取最大數(shù)的方法
這篇文章主要介紹了java 從int數(shù)組中獲取最大數(shù)的方法,需要的朋友可以參考下2017-02-02

