用JAVA實(shí)現(xiàn)一套背壓機(jī)制
Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor,Spring WebFlux默認(rèn)集成的是Reactor。
Reactive Streams主要解決背壓(back-pressure)問(wèn)題。當(dāng)傳入的任務(wù)速率大于系統(tǒng)處理能力時(shí),數(shù)據(jù)處理將會(huì)對(duì)未處理數(shù)據(jù)產(chǎn)生一個(gè)緩沖區(qū)。
背壓依我的理解來(lái)說(shuō),是指訂閱者能和發(fā)布者交互(通過(guò)代碼里面的調(diào)用request和cancel方法交互),可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問(wèn)題。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。
我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會(huì)有一個(gè)緩沖池,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞,發(fā)布者就會(huì)停(慢)下來(lái);訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法),緩沖池有位置了,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過(guò)這樣的機(jī)制,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費(fèi)得快,生成就快,消費(fèi)得慢,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來(lái)了。
單線程版本:
一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 為了演示效果,這里讓主線程休眠一段時(shí)間
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模擬任務(wù)執(zhí)行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
public BackpressureSubscriber() {
this.running = true;
}
public boolean accept(List<Integer> batch) {
if (running) {
processDataLogic(batch);
return true;
} else {
return false;
}
}
public void start() {
// Subscriber 在 JDK 8 中沒(méi)有異步處理的能力,因此不需要單獨(dú)開啟線程
}
public void stop() {
running = false;
}
}
}多線程版本
一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 為了演示效果,這里讓主線程休眠一段時(shí)間
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模擬任務(wù)執(zhí)行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
private final ExecutorService executor;
private final int workerSize = 2;
private final List<Future> futures;
public BackpressureSubscriber() {
this.running = true;
this.executor = Executors.newFixedThreadPool(workerSize);
futures = new ArrayList<>(workerSize);
}
public boolean accept(List<Integer> batch) {
if (running) {
Future f = executor.submit(() -> processDataLogic(batch));
futures.add(f);
waitForTaskDone(futures);
return true;
} else {
return false;
}
}
public void waitForTaskDone(List<Future> futures) {
while (futures.size() >= workerSize) {
for (Future future : futures) {
if (future.isDone()) {
// 只要有一個(gè)worker是空閑就重新獲取任務(wù)
futures.remove(future);
return;
}
}
}
}
public void start() {
// Subscriber 在 JDK 8 中沒(méi)有異步處理的能力,因此不需要單獨(dú)開啟線程
}
public void stop() {
running = false;
executor.shutdown();
}
}
}到此這篇關(guān)于用JAVA自己實(shí)現(xiàn)一套背壓機(jī)制的文章就介紹到這了,更多相關(guān)java背壓機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談JDK8中的Duration Period和ChronoUnit
在JDK8中,引入了三個(gè)非常有用的時(shí)間相關(guān)的API:Duration,Period和ChronoUnit。他們都是用來(lái)對(duì)時(shí)間進(jìn)行統(tǒng)計(jì)的,本文將會(huì)詳細(xì)講解一下這三個(gè)API的使用2021-06-06
Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解
這篇文章主要為大家介紹了Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
Debian 7 和 Debian 8 用戶安裝 Java 8的方法
Oracle Java 8 穩(wěn)定版本近期已發(fā)布,有很多新的特征變化。其中,有功能的程序支持通過(guò)“Lambda項(xiàng)目 ”,收到了一些安全更新和界面改進(jìn)上的bug修復(fù),使得開發(fā)人員的工作更容易。2014-03-03
Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀
這篇文章主要介紹了Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀,在并發(fā)編程中,有時(shí)候需要使用線程安全的隊(duì)列,如果要實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法,需要的朋友可以參考下2023-12-12
java 從int數(shù)組中獲取最大數(shù)的方法
這篇文章主要介紹了java 從int數(shù)組中獲取最大數(shù)的方法,需要的朋友可以參考下2017-02-02

