Java Stream 并行流簡介、使用與注意事項小結(jié)
1. 并行流簡介
Java 8 引入了 Stream API,提供了一種高效的數(shù)據(jù)處理方式。而 ?并行流(Parallel Stream)? 則是 Stream 的并行版本,能夠?qū)⒘鞑僮鞣峙涞蕉鄠€線程中執(zhí)行,充分利用多核 CPU 的性能。
?特點:
- 默認使用
ForkJoinPool.commonPool()執(zhí)行任務。 - 適合處理 ?計算密集型 任務。
- 任務執(zhí)行順序不確定。
?2. 并行流的簡單使用
將普通流轉(zhuǎn)換為并行流非常簡單,只需調(diào)用 parallel() 方法即可。
?示例:并行流的基本使用
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 將流轉(zhuǎn)換為并行流
numbers.parallelStream()
.forEach(num -> System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + num));
}
}輸出示例:
線程: main, 處理: 6
線程: ForkJoinPool.commonPool-worker-1, 處理: 3
線程: ForkJoinPool.commonPool-worker-2, 處理: 8
...
?3. 配合自定義線程池
默認情況下,并行流使用 ForkJoinPool.commonPool() 執(zhí)行任務。你可以通過自定義線程池來控制并行流的執(zhí)行環(huán)境。
?示例:自定義線程池
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamCustomPool {
public static void main(String[] args) {
// 創(chuàng)建自定義線程池
ForkJoinPool customPool = new ForkJoinPool(4);
// 在自定義線程池中執(zhí)行并行流任務
customPool.submit(() -> {
List<Integer> result = IntStream.rangeClosed(1, 10)
.parallel()
.map(i -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2;
})
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果: " + result);
}).join(); // 等待任務完成
customPool.shutdown(); // 關閉線程池
}
}?示例:配合CompletableFuture實現(xiàn)異步
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamWithCompletableFuture {
public static void main(String[] args) {
// 創(chuàng)建一個并行流
List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10)
.parallel()
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2; // 模擬計算任務
}))
.collect(Collectors.toList());
// 等待所有任務完成并獲取結(jié)果
List<Integer> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("結(jié)果: " + results);
}
}好處:
- ?并行流:適合處理數(shù)據(jù)流中的計算密集型任務,能夠自動將任務分配到多個線程中執(zhí)行。
- ?CompletableFuture:提供強大的異步編程能力,可以處理任務的依賴關系、異常處理、結(jié)果合并等。
結(jié)合兩者的優(yōu)勢,可以實現(xiàn):
- ?異步并行處理:將并行流的任務異步化,進一步提升性能。
- ?任務依賴管理:通過
CompletableFuture管理任務之間的依賴關系。 - ?結(jié)果合并:將多個任務的結(jié)果合并處理。
?4. 控制有序性
并行流的任務執(zhí)行順序是不確定的。如果需要保持順序,可以使用 forEachOrdered() 方法。
?示例:保持順序
import java.util.Arrays;
import java.util.List;
public class ParallelStreamOrder {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream()
.forEachOrdered(System.out::println); // 輸出順序與流中元素順序一致
}
}?5. 共享資源的安全性
并行流在多個線程中執(zhí)行操作,如果操作共享可變狀態(tài),可能會導致線程安全問題。
?示例:線程安全問題
import java.util.ArrayList;
import java.util.List;
public class ParallelStreamThreadSafety {
public static void main(String[] args) {
List<Integer> result = new ArrayList<>();
IntStream.rangeClosed(1, 1000)
.parallel()
.forEach(result::add); // 這里會出現(xiàn)線程安全問題
System.out.println("結(jié)果大小: " + result.size()); // 結(jié)果可能小于 1000
}
}解決方法:
- 使用線程安全的集合,如
Collections.synchronizedList()。 - 使用
collect()方法將結(jié)果收集到線程安全的容器中。
?示例:線程安全的解決方案
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamThreadSafety {
public static void main(String[] args) {
List<Integer> result = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.collect(Collectors.toList()); // 使用 collect() 方法
System.out.println("結(jié)果大小: " + result.size()); // 輸出: 1000
}
}?6. 注意事項
- ?任務類型:
- 適合 ?計算密集型 任務,不適合 ?I/O 密集型 任務。
- ?線程安全:
- 避免在并行流中操作共享可變狀態(tài)。
- ?任務順序:
- 并行流的任務執(zhí)行順序不確定,使用
forEachOrdered()保持順序。
- 并行流的任務執(zhí)行順序不確定,使用
- ?線程池管理:
- 使用自定義線程池時,記得關閉線程池,避免資源泄漏。
?7. 總結(jié)
并行流是 Java 8 提供的一個強大工具,能夠顯著提升數(shù)據(jù)處理性能。但在使用時需要注意線程安全、任務順序和線程池管理等問題。通過合理使用并行流,可以編寫高效、靈活的代碼。
?附錄:完整代碼
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamDemo {
public static void main(String[] args) {
// 基本使用
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream()
.forEach(num -> System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + num));
// 自定義線程池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
List<Integer> result = IntStream.rangeClosed(1, 10)
.parallel()
.map(i -> {
System.out.println("線程: " + Thread.currentThread().getName() + ", 處理: " + i);
return i * 2;
})
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果: " + result);
}).join();
customPool.shutdown();
// 保持順序
numbers.parallelStream()
.forEachOrdered(System.out::println);
// 線程安全
List<Integer> safeResult = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println("結(jié)果大小: " + safeResult.size());
}
}希望這篇文章能幫助你更好地理解和使用 Java 的并行流!如果有任何問題,歡迎在評論區(qū)討論!
到此這篇關于Java Stream 并行流簡介、使用與注意事項小結(jié)的文章就介紹到這了,更多相關Java Stream 并行流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
輸出java進程的jstack信息示例分享 通過線程堆棧信息分析java線程
通過ps到java進程號將進程的jstack信息輸出。jstack信息是java進程的線程堆棧信息,通過該信息可以分析java的線程阻塞等問題。2014-01-01
spring-boot整合ehcache實現(xiàn)緩存機制的方法
spring-boot是一個快速的集成框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。這篇文章主要介紹了spring-boot整合ehcache實現(xiàn)緩存機制,需要的朋友可以參考下2018-01-01
Java讀取properties文件連接數(shù)據(jù)庫的方法示例
這篇文章主要介紹了Java讀取properties文件連接數(shù)據(jù)庫的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-04-04
java字符串轉(zhuǎn)數(shù)字及各種數(shù)字轉(zhuǎn)字符串的3種方法
這篇文章主要介紹了java字符串轉(zhuǎn)數(shù)字及各種數(shù)字轉(zhuǎn)字符串的3種方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-09-09
淺談Spring中@Transactional事務回滾及示例(附源碼)
本篇文章主要介紹了淺談Spring中@Transactional事務回滾及示例(附源碼),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12

