Java中的Semaphore信號量使用解析
1、Semaphore 是什么
Semaphore 通常我們叫它信號量, 可以用來控制同時(shí)訪問特定資源的線程數(shù)量,通過協(xié)調(diào)各個(gè)線程,以保證合理的使用資源。
可以把它簡單的理解成我們停車場入口立著的那個(gè)顯示屏,每有一輛車進(jìn)入停車場顯示屏就會顯示剩余車位減1,每有一輛車從停車場出去,顯示屏上顯示的剩余車輛就會加1,當(dāng)顯示屏上的剩余車位為0時(shí),停車場入口的欄桿就不會再打開,車輛就無法進(jìn)入停車場了,直到有一輛車從停車場出去為止。
2、使用場景
主要用于那些資源有明確訪問數(shù)量限制的場景,常用于限流 。
比如:數(shù)據(jù)庫連接池,同時(shí)進(jìn)行連接的線程有數(shù)量限制,連接不能超過一定的數(shù)量,當(dāng)連接達(dá)到了限制數(shù)量后,后面的線程只能排隊(duì)等前面的線程釋放了數(shù)據(jù)庫連接才能獲得數(shù)據(jù)庫連接。
public class TestPoolSemaphore { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(() -> { Connection conn = pool.borrow(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } } @Slf4j(topic = "c.Pool") class Pool { // 1. 連接池大小 private final int poolSize; // 2. 連接對象數(shù)組 private Connection[] connections; // 3. 連接狀態(tài)數(shù)組 0 表示空閑, 1 表示繁忙 private AtomicIntegerArray states; private Semaphore semaphore; // 4. 構(gòu)造方法初始化 public Pool(int poolSize) { this.poolSize = poolSize; // 讓許可數(shù)與資源數(shù)一致 this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("連接" + (i+1)); } } // 5. 借連接 public Connection borrow() {// t1, t2, t3 // 獲取許可 try { semaphore.acquire(); // 沒有許可的線程,在此等待 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { // 獲取空閑連接 if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } // 不會執(zhí)行到這里 return null; } // 6. 歸還連接 public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } } } class MockConnection implements Connection { private String name; public MockConnection(String name) { this.name = name; } @Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } @Override public Statement createStatement() throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql) throws SQLException { return null; } @Override public String nativeSQL(String sql) throws SQLException { return null; } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { } @Override public boolean getAutoCommit() throws SQLException { return false; } @Override public void commit() throws SQLException { } @Override public void rollback() throws SQLException { } @Override public void close() throws SQLException { } @Override public boolean isClosed() throws SQLException { return false; } @Override public DatabaseMetaData getMetaData() throws SQLException { return null; } @Override public void setReadOnly(boolean readOnly) throws SQLException { } @Override public boolean isReadOnly() throws SQLException { return false; } @Override public void setCatalog(String catalog) throws SQLException { } @Override public String getCatalog() throws SQLException { return null; } @Override public void setTransactionIsolation(int level) throws SQLException { } @Override public int getTransactionIsolation() throws SQLException { return 0; } @Override public SQLWarning getWarnings() throws SQLException { return null; } @Override public void clearWarnings() throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public Map<String, Class<?>> getTypeMap() throws SQLException { return null; } @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException { } @Override public void setHoldability(int holdability) throws SQLException { } @Override public int getHoldability() throws SQLException { return 0; } @Override public Savepoint setSavepoint() throws SQLException { return null; } @Override public Savepoint setSavepoint(String name) throws SQLException { return null; } @Override public void rollback(Savepoint savepoint) throws SQLException { } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return null; } @Override public Clob createClob() throws SQLException { return null; } @Override public Blob createBlob() throws SQLException { return null; } @Override public NClob createNClob() throws SQLException { return null; } @Override public SQLXML createSQLXML() throws SQLException { return null; } @Override public boolean isValid(int timeout) throws SQLException { return false; } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { } @Override public String getClientInfo(String name) throws SQLException { return null; } @Override public Properties getClientInfo() throws SQLException { return null; } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return null; } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return null; } @Override public void setSchema(String schema) throws SQLException { } @Override public String getSchema() throws SQLException { return null; } @Override public void abort(Executor executor) throws SQLException { } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } @Override public int getNetworkTimeout() throws SQLException { return 0; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { return null; } @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return false; } }
比如:停車場場景,車位數(shù)量有限,同時(shí)只能容納多少臺車,車位滿了之后只有等里面的車離開停車場外面的車才可以進(jìn)入。
/** * @author WGR * @create 2020/12/27 -- 22:19 */ public class Test1 { public static void main(String[] args) { // 1. 創(chuàng)建 semaphore 對象 Semaphore semaphore = new Semaphore(3); // 2. 10個(gè)線程同時(shí)運(yùn)行 for (int i = 0; i < 10; i++) { final int x = i; new Thread(() -> { // 3. 獲取許可 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println(x +"占到車位。。。"); Thread.sleep(1); System.out.println(x +"釋放車位。。。"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 4. 釋放許可 semaphore.release(); } }).start(); } } }
3、Semaphore實(shí)現(xiàn)原理
(1)、Semaphore初始化
Semaphore semaphore=new Semaphore(3);
1、當(dāng)調(diào)用new Semaphore(3) 方法時(shí),默認(rèn)會創(chuàng)建一個(gè)非公平的鎖的同步阻塞隊(duì)列。
2、把初始令牌數(shù)量賦值給同步隊(duì)列的state狀態(tài),state的值就代表當(dāng)前所剩余的令牌數(shù)量。
(2)獲取令牌
semaphore.acquire();
1、當(dāng)前線程會嘗試去同步隊(duì)列獲取一個(gè)令牌,獲取令牌的過程也就是使用原子的操作去修改同步隊(duì)列的state ,獲取一個(gè)令牌則修改為state=state-1。
2、 當(dāng)計(jì)算出來的state<0,則代表令牌數(shù)量不足,此時(shí)會創(chuàng)建一個(gè)Node節(jié)點(diǎn)加入阻塞隊(duì)列,掛起當(dāng)前線程。
3、當(dāng)計(jì)算出來的state>=0,則代表獲取令牌成功。
源碼:
/** * 獲取1個(gè)令牌 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
/** * 共享模式下獲取令牌,獲取成功則返回,失敗則加入阻塞隊(duì)列,掛起線程 * @param arg * @throws InterruptedException */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取令牌,arg為獲取令牌個(gè)數(shù),當(dāng)可用令牌數(shù)減當(dāng)前令牌數(shù)結(jié)果小于0,則創(chuàng)建一個(gè)節(jié)點(diǎn)加入阻塞隊(duì)列,掛起當(dāng)前線程。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * 1、創(chuàng)建節(jié)點(diǎn),加入阻塞隊(duì)列, * 2、重雙向鏈表的head,tail節(jié)點(diǎn)關(guān)系,清空無效節(jié)點(diǎn) * 3、掛起當(dāng)前節(jié)點(diǎn)線程 * @param arg * @throws InterruptedException */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //創(chuàng)建節(jié)點(diǎn)加入阻塞隊(duì)列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲得當(dāng)前節(jié)點(diǎn)pre節(jié)點(diǎn) final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回鎖的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重組雙向鏈表,清空無效節(jié)點(diǎn),掛起當(dāng)前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
Semaphore 有點(diǎn)像一個(gè)停車場,permits 就好像停車位數(shù)量,當(dāng)線程獲得了 permits 就像是獲得了停車位,然后停車場顯示空余車位減一剛開始,permits(state)為 3,這時(shí) 5 個(gè)線程來獲取資源
假設(shè)其中 Thread-1,Thread-2,Thread-4 cas 競爭成功,而 Thread-0 和 Thread-3 競爭失敗,進(jìn)入 AQS 隊(duì)列park 阻塞
(3)、釋放令牌
semaphore.release();
當(dāng)調(diào)用semaphore.release() 方法時(shí)
1、線程會嘗試釋放一個(gè)令牌,釋放令牌的過程也就是把同步隊(duì)列的state修改為state=state+1的過程
2、釋放令牌成功之后,同時(shí)會喚醒同步隊(duì)列的所有阻塞節(jié)共享節(jié)點(diǎn)線程
3、被喚醒的節(jié)點(diǎn)會重新嘗試去修改state=state-1 的操作,如果state>=0則獲取令牌成功,否則重新進(jìn)入阻塞隊(duì)列,掛起線程。
源碼:
/** * 釋放令牌 */ public void release() { sync.releaseShared(1); }
/** *釋放共享鎖,同時(shí)喚醒所有阻塞隊(duì)列共享節(jié)點(diǎn)線程 * @param arg * @return */ public final boolean releaseShared(int arg) { //釋放共享鎖 if (tryReleaseShared(arg)) { //喚醒所有共享節(jié)點(diǎn)線程 doReleaseShared(); return true; } return false; }
/** * 喚醒所有共享節(jié)點(diǎn)線程 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//是否需要喚醒后繼節(jié)點(diǎn) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改狀態(tài)為初始0 continue; unparkSuccessor(h);//喚醒h.nex節(jié)點(diǎn)線程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)); } if (h == head) // loop if head changed break; } }
這時(shí) Thread-4 釋放了 permits,狀態(tài)如下
接下來 Thread-0 競爭成功,state 再次設(shè)置為 0,設(shè)置自己為 head 節(jié)點(diǎn),斷開原來的 head 節(jié)點(diǎn),unpark 接下來的 Thread-3 節(jié)點(diǎn),但由于 state 是 0,因此 Thread-3 在嘗試不成功后再次進(jìn)入 park 狀態(tài)
到此這篇關(guān)于Java中的Semaphore信號量使用解析的文章就介紹到這了,更多相關(guān)Semaphore信號量內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot 直接用jar運(yùn)行項(xiàng)目的方法
這篇文章主要介紹了Spring Boot 直接用jar運(yùn)行項(xiàng)目的方法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下2018-02-02springboot內(nèi)置tomcat調(diào)優(yōu)并發(fā)線程數(shù)解析
這篇文章主要介紹了springboot內(nèi)置tomcat調(diào)優(yōu)并發(fā)線程數(shù)解析,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12如何自定義Jackson序列化?@JsonSerialize
這篇文章主要介紹了如何自定義Jackson序列化?@JsonSerialize,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12使用Java構(gòu)造和解析Json數(shù)據(jù)的兩種方法(詳解一)
JSON(JavaScript Object Notation) 是一種輕量級的數(shù)據(jù)交換格式,采用完全獨(dú)立于語言的文本格式,是理想的數(shù)據(jù)交換格式。接下來通過本文給大家介紹使用Java構(gòu)造和解析Json數(shù)據(jù)的兩種方法,需要的朋友參考下吧2016-03-03Java中實(shí)現(xiàn)線程的超時(shí)中斷方法實(shí)例
之前在使用Java實(shí)現(xiàn)熔斷降級組件的時(shí)候,需要實(shí)現(xiàn)接口請求的超時(shí)中斷,通過查找相關(guān)資料了解了相關(guān)的方法,下面這篇文章主要給大家介紹了關(guān)于Java中實(shí)現(xiàn)線程的超時(shí)中斷的相關(guān)資料,需要的朋友可以參考下2018-06-06