Java 線程池+分布式實(shí)現(xiàn)代碼
1. 線程池
在 Java 開發(fā)中,"池" 通過預(yù)先創(chuàng)建并管理一定數(shù)量的資源,避免頻繁創(chuàng)建和銷毀資源帶來的性能開銷,從而提高系統(tǒng)效率。常見的池包括:
- 線程池:管理線程資源,避免頻繁創(chuàng)建線程
- 字符串常量池:緩存字符串,減少內(nèi)存占用
- 連接池:管理數(shù)據(jù)庫連接、網(wǎng)絡(luò)連接等
核心思想:用一個(gè)集合來維護(hù)固定大小或可配置的資源,當(dāng)需要使用資源時(shí)從池中獲取,使用完畢后歸還給池,而不是直接銷毀。
1.1 自定義線程池實(shí)現(xiàn)
1.1.1 線程池核心
PoolInit:線程池初始化和任務(wù)管理
ThreadPool:線程池核心類,管理工作線程
WorkThread:工作線程類,執(zhí)行具體任務(wù)
1.1.2 代碼示例
1.1.2.1 PoolInit
package com.lj.demo5;
import java.util.LinkedList;
import java.util.Scanner;
/**
* 多線程與集合綜合應(yīng)用示例
* 演示線程池的初始化和任務(wù)提交過程
*
* 應(yīng)用場景:
* 如QQ聊天服務(wù)器,面對大量用戶連接時(shí),
* 使用線程池管理線程資源比為每個(gè)用戶創(chuàng)建新線程更高效
*/
public class PoolInit {
// 存儲(chǔ)待執(zhí)行任務(wù)的鏈表,作為任務(wù)隊(duì)列
public static LinkedList<String> linkTaskName = new LinkedList<String>();
// 線程池實(shí)例
public static ThreadPool t;
public PoolInit() {
// 初始化線程池,指定核心線程數(shù)為3
t = new ThreadPool(3);
// 從控制臺(tái)接收任務(wù)并提交給線程池
Scanner scanner = new Scanner(System.in);
String taskName = "";
while (true) {
System.out.println("請輸入線程需要處理的任務(wù)名稱:");
taskName = scanner.next();
// 將任務(wù)添加到任務(wù)隊(duì)列
linkTaskName.add(taskName);
// 提交第一個(gè)任務(wù)給線程池執(zhí)行
t.executeTask(linkTaskName.get(0));
}
}
public static void main(String[] args) {
// 啟動(dòng)線程池
new PoolInit();
}
}LinkTaskName:使用 LinkedList 作為任務(wù)隊(duì)列,存儲(chǔ)待執(zhí)行的任務(wù),構(gòu)造方法中初始化線程池,并通過控制臺(tái)接收用戶輸入的任務(wù),每當(dāng)有新任務(wù)輸入,先添加到任務(wù)隊(duì)列,再提交給線程池執(zhí)行。
1.1.2.2 ThreadPool
package com.lj.demo5;
import java.util.Vector;
/**
* 線程池核心類
* 管理工作線程,負(fù)責(zé)任務(wù)的分配與調(diào)度
*/
public class ThreadPool {
// 線程池大小
private int num;
// 存儲(chǔ)工作線程的集合,使用Vector保證線程安全
Vector<Runnable> vectors ;
/**
* 構(gòu)造方法,初始化線程池
* @param num 線程池中的線程數(shù)量
*/
public ThreadPool(int num) {
this.num = num;
// 初始化線程集合,容量為指定的線程數(shù)量
this.vectors = new Vector<Runnable>(this.num);
System.out.println("線程集合當(dāng)前大?。? + this.vectors.size());
System.out.println("線程集合容量:" + this.vectors.capacity());
// 創(chuàng)建指定數(shù)量的工作線程并啟動(dòng)
for(int i=0;i<this.vectors.capacity();i++) {
WorkThread w = new WorkThread();
this.vectors.add(w);
w.start(); // 啟動(dòng)工作線程
}
}
/**
* 執(zhí)行任務(wù)方法
* 查找空閑線程并分配任務(wù)
* @param taskName 任務(wù)名稱
*/
public void executeTask(String taskName) {
// 遍歷線程池中的線程,尋找空閑線程
for(int i=0;i<this.vectors.capacity();i++) {
WorkThread w = (WorkThread) this.vectors.get(i);
System.out.println("線程池中的線程" + w.getName() + "的狀態(tài)為:" + w.isFlag());
// 如果線程處于空閑狀態(tài)(flag為false)
if(!w.isFlag()) {
System.out.println(w.getName() + ",現(xiàn)在是空閑狀態(tài),分配新任務(wù)");
// 設(shè)置線程為忙碌狀態(tài)
w.setFlag(true);
// 分配任務(wù)
w.setTaskName(taskName);
// 從任務(wù)隊(duì)列移除已分配的任務(wù)
PoolInit.linkTaskName.remove(0);
break; // 找到空閑線程后退出循環(huán)
}
}
}
}使用 Vector 存儲(chǔ)工作線程,因?yàn)?Vector 是線程安全的集合,構(gòu)造方法中創(chuàng)建并啟動(dòng)指定數(shù)量的工作線程
executeTask()方法將任務(wù)分配給空閑線程:
遍歷所有工作線程,檢查線程狀態(tài)
找到空閑線程后,設(shè)置線程狀態(tài)為忙碌,分配任務(wù)
從任務(wù)隊(duì)列中移除已分配的任務(wù)
1.1.2.3 WorkThread
package com.lj.demo5;
/**
* 工作線程類
* 實(shí)際執(zhí)行任務(wù)的線程
*/
public class WorkThread extends Thread {
// 線程狀態(tài)標(biāo)志:false表示空閑,true表示忙碌
private boolean flag;
// 當(dāng)前執(zhí)行的任務(wù)名稱
private String taskName;
/**
* 線程運(yùn)行方法
* 循環(huán)處理任務(wù):有任務(wù)則執(zhí)行,無任務(wù)則等待
*/
@Override
public synchronized void run() {
while (true) {
if (this.isFlag()) { // 有任務(wù)要執(zhí)行
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行任務(wù):" + this.taskName);
try {
// 模擬任務(wù)執(zhí)行時(shí)間(30秒)
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "已完成任務(wù):" + this.taskName + ",狀態(tài)變?yōu)榭臻e");
// 任務(wù)完成,設(shè)置為空閑狀態(tài)
this.setFlag(false);
} else { // 無任務(wù)
System.out.println("任務(wù)隊(duì)列中等待的任務(wù)數(shù)量:" + PoolInit.linkTaskName.size());
// 檢查是否有未處理的任務(wù)
if (PoolInit.linkTaskName.size() > 0) {
String taskName = PoolInit.linkTaskName.get(0);
PoolInit.t.executeTask(taskName);
System.out.println(Thread.currentThread().getName() +
"發(fā)現(xiàn)未處理任務(wù),嘗試處理:" + taskName);
} else {
System.out.println(Thread.currentThread().getName() + ":當(dāng)前沒有任務(wù),進(jìn)入等待狀態(tài)...");
try {
// 進(jìn)入等待狀態(tài),釋放鎖
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
// getter和setter方法
public boolean isFlag() {
return flag;
}
/**
* 設(shè)置線程狀態(tài)
* 當(dāng)設(shè)置為忙碌狀態(tài)時(shí),喚醒等待的線程
*/
public synchronized void setFlag(boolean flag) {
this.flag = flag;
if (this.flag) {
// 有新任務(wù),喚醒線程
this.notify();
}
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
}flag變量用于標(biāo)識線程狀態(tài):false(空閑),true(忙碌)
run()方法是線程的核心,通過循環(huán)處理任務(wù):
- 當(dāng)有任務(wù)時(shí)(flag=true),執(zhí)行任務(wù)并休眠模擬處理時(shí)間
任務(wù)完成后,將線程狀態(tài)設(shè)為空閑
- 當(dāng)無任務(wù)時(shí)(flag=false),檢查任務(wù)隊(duì)列是否有等待任務(wù)
如果有等待任務(wù),通知線程池分配任務(wù);如果沒有,則進(jìn)入等待狀態(tài)
1.2 總結(jié)流程
- 初始化線程池,創(chuàng)建指定數(shù)量的工作線程并啟動(dòng)
- 工作線程啟動(dòng)后進(jìn)入等待狀態(tài),等待接收任務(wù)
- 用戶輸入任務(wù),添加到任務(wù)隊(duì)列
- 線程池查找空閑線程,分配任務(wù)并喚醒線程
- 線程執(zhí)行任務(wù),完成后回到空閑狀態(tài)
- 重復(fù)步驟 3-5,直到程序結(jié)束
2. Java分布式處理
分布式系統(tǒng)是由多個(gè)獨(dú)立計(jì)算機(jī)組成的系統(tǒng),這些計(jì)算機(jī)通過網(wǎng)絡(luò)協(xié)同工作,對外表現(xiàn)為一個(gè)整體。
下面我通過 WebService 實(shí)現(xiàn)一個(gè)簡單的分布式服務(wù)。
2.1 WebService 服務(wù)接口
package com.lj.data;
import javax.jws.WebMethod;
import javax.jws.WebService;
/**
* WebService服務(wù)接口
* 定義遠(yuǎn)程可調(diào)用的方法
*
* RPC(遠(yuǎn)程過程調(diào)用)機(jī)制:
* - RMI:客戶端和服務(wù)器端都必須是Java
* - WebService:支持跨語言、跨平臺(tái)通信
* - 其他:hessian, thrift, grpc, dubbo等
*/
@WebService(targetNamespace = "http://lj.com/wsdl")
public interface IAdminService {
/**
* 定義WebService方法
* @return 字符串結(jié)果
*/
@WebMethod
public String queryStr();
}2.2 WebService 服務(wù)實(shí)現(xiàn)
package com.lj.data.impl;
import javax.jws.WebService;
import com.lj.data.IAdminService;
/**
* WebService服務(wù)實(shí)現(xiàn)類
* 提供接口中定義的方法的具體實(shí)現(xiàn)
*/
@WebService(
portName = "admin",//端口名稱
serviceName = "AdminServiceImpl",//服務(wù)名稱
targetNamespace = "http://lj.com/wsdl",//必須與接口的命名空間一致
endpointInterface = "com.lj.data.IAdminService"http://指定實(shí)現(xiàn)的接口
)
public class AdminServiceImpl implements IAdminService {
/**
* 實(shí)現(xiàn)查詢方法
* @return 返回示例字符串
*/
@Override
public String queryStr() {
return "你好,世界";
}
}2.3 發(fā)布 WebService 服務(wù)
package com.lj.webserviceclient;
import javax.xml.ws.Endpoint;
import com.lj.data.IAdminService;
import com.lj.data.impl.AdminServiceImpl;
/**
* WebService服務(wù)發(fā)布類
* 將服務(wù)發(fā)布到指定地址,供客戶端調(diào)用
*/
public class App {
public static void main(String[] args) {
System.out.println("開始發(fā)布WebService遠(yuǎn)程服務(wù)...");
// 創(chuàng)建服務(wù)實(shí)現(xiàn)類實(shí)例
IAdminService adminService = new AdminServiceImpl();
// 發(fā)布服務(wù),指定服務(wù)地址
Endpoint.publish("http://127.0.0.1:8225/AdminServiceImpl/admin", adminService);
System.out.println("WebService服務(wù)發(fā)布成功!");
}
}3.4 WebService 客戶端調(diào)用
package com.lj.webserviceclient;
import java.net.URL;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import com.lj.data.IAdminService;
/**
* WebService客戶端
* 調(diào)用遠(yuǎn)程發(fā)布的WebService服務(wù)
*/
public class App {
public static void main(String[] args) {
try {
// 服務(wù)的URL地址
URL url = new URL("http://127.0.0.1:8225/AdminServiceImpl/admin");
// 創(chuàng)建QName對象,指定命名空間和服務(wù)名稱
QName qname = new QName("http://lj.com/wsdl", "AdminServiceImpl");
// 創(chuàng)建Service對象
Service service = Service.create(url, qname);
// 獲取服務(wù)接口實(shí)例
IAdminService adminService = service.getPort(IAdminService.class);
// 調(diào)用遠(yuǎn)程方法
String message = adminService.queryStr();
System.out.println("調(diào)用遠(yuǎn)程WebService服務(wù)的結(jié)果為: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}到此這篇關(guān)于Java 線程池+分布式實(shí)現(xiàn)代碼的文章就介紹到這了,更多相關(guān)Java 線程池+分布式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實(shí)現(xiàn)ThreadLocal數(shù)據(jù)在線程池間傳遞的解決方案
- Java中的xxl-job調(diào)度器線程池工作機(jī)制
- Java線程池內(nèi)部任務(wù)出異常后發(fā)現(xiàn)異常的3種方法
- Java 線程池核心參數(shù)、執(zhí)行流程與實(shí)戰(zhàn)建議全解析
- java分布式定時(shí)任務(wù)實(shí)現(xiàn)細(xì)節(jié)
- JAVA常用分布式鎖Redisson詳解
- JAVA實(shí)現(xiàn)redis分布式雙重加鎖的示例代碼
- java中的分布式事務(wù)解決方式
- java實(shí)現(xiàn)分布式鎖的常用三種方式
相關(guān)文章
分布式鎖在Spring Boot應(yīng)用中的實(shí)現(xiàn)過程
文章介紹在SpringBoot中通過自定義Lock注解、LockAspect切面和RedisLockUtils工具類實(shí)現(xiàn)分布式鎖,確保多實(shí)例并發(fā)操作的數(shù)據(jù)一致性,注解聲明式管理鎖,切面處理獲取與釋放,工具類基于Redisson操作Redis,簡化鎖機(jī)制集成2025-08-08
java實(shí)現(xiàn)Spring在XML配置java類的方法
下面小編就為大家?guī)硪黄猨ava實(shí)現(xiàn)Spring在XML配置java類的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-11-11
nodejs連接dubbo服務(wù)的java工程實(shí)現(xiàn)示例
這篇文章主要介紹了在項(xiàng)目遷移中,nodejs連接dubbo服務(wù)的java工程實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03
Spring Cloud-Feign服務(wù)調(diào)用的問題及處理方法
Feign 是一個(gè)聲明式的 REST 客戶端,它用了基于接口的注解方式,很方便實(shí)現(xiàn)客戶端配置。接下來通過本文給大家介紹Spring Cloud-Feign服務(wù)調(diào)用,需要的朋友可以參考下2021-10-10
RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例
這篇文章主要為大家介紹了RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11

