Java高并發(fā)編程之CAS實現(xiàn)無鎖隊列代碼實例
一、什么是無鎖(Lock-Free)隊列
在多線程操作中,我們通常會添加鎖來保證線程的安全,那么這樣勢必會影響程序的性能。
那么為了解決這一問題,于是就有了在無鎖操作的情況下依然能夠保證線程的安全,實現(xiàn)無鎖的原理就是利用了Campare and swap(CAS)算法
而我們的無鎖隊列無疑也是使用了這一方法。
二、線程不安全的隊列
package com.brycen.concurrency03.collections.myqueue; public class UnThreadSafeQueue<E> { //定義Node節(jié)點(diǎn) private static class Node<E> { private E element;//節(jié)點(diǎn)內(nèi)存儲的元素 private Node<E> next;//下一個節(jié)點(diǎn) public Node(E element, Node<E> next) { super(); this.element = element; this.next = next; } public E getElement() { return element; } public void setElement(E element) { this.element = element; } public Node<E> getNext() { return next; } public void setNext(Node<E> next) { this.next = next; } @Override public String toString() { return (element == null) ? "" : element.toString(); } } //定義隊列的頭和尾 private Node<E> head, last; //初始化隊列長度為0 private int size = 0; public int size() { return size; } public boolean isEmpty() { return size == 0; } //返回第一個元素 public E peekFirst() { return head.element == null ? null : head.getElement(); } //返回最后一個元素 public E peekLast() { return last.element == null ? null : last.getElement(); } //在尾部添加元素 public void addLast(E element) { Node<E> newNode = new Node<E>(element, null); //如果為0,則代表隊列沒有元素 if (size == 0) { head = newNode; }else { //隊列有元素,則將最后一個元素的下一個值設(shè)置為新的元素 last.setNext(newNode); } //新元素賦值給last last = newNode; //隊列長度+1 size++; } //移除并返回第一個元素 public E removeFirst() { //如果為null,直接返回null if (isEmpty()) return null; //拿到第一個Node中的元素 E result = head.getElement(); //獲取第一個Node中的下一個元素并賦值給head head = head.getNext(); //隊列長度-1 size--; //判斷隊列是否為null,如果為null,需要將last置為null if (size==0) last = null; return result; } public static void main(String[] args) { UnThreadSafeQueue<String> queue = new UnThreadSafeQueue<String>(); queue.addLast("Hello"); queue.addLast("World"); queue.addLast("Java"); System.out.println(queue.removeFirst()); System.out.println(queue.removeFirst()); System.out.println(queue.removeFirst()); } }
運(yùn)行結(jié)果:
HelloWorldJava
三、線程安全的無鎖隊列
package com.brycen.concurrency03.collections.myqueue; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; public class LockFreeQueue<E> { //定義頭和尾的原子性節(jié)點(diǎn) private AtomicReference<Node<E>> head, last; //定義原子性size private AtomicInteger size = new AtomicInteger(0); //初始化隊列,將頭和尾都指向一個null的節(jié)點(diǎn) public LockFreeQueue() { Node<E> node = new Node<E>(null); head = new AtomicReference<Node<E>>(node); last = new AtomicReference<Node<E>>(node); } //定義節(jié)點(diǎn)類 private static class Node<E> { E element; //需要volatile,因為防止在next賦值的時候發(fā)生重排序,并且需要對其他線程可見 volatile Node<E> next; public Node(E element) { this.element = element; } @Override public String toString() { return element == null ? null : element.toString(); } } //添加元素到隊尾 public void addLast(E element) { //元素不允許為null if (element == null) throw new NullPointerException("The null element not allow"); //新建一個新節(jié)點(diǎn) Node<E> newNode = new Node<E>(element); //getAndSet操作為原子性操作,先獲取last的節(jié)點(diǎn)再將新的節(jié)點(diǎn)賦值給last Node<E> oldNode = last.getAndSet(newNode); //將舊節(jié)點(diǎn)的next指向新的節(jié)點(diǎn) oldNode.next = newNode; //隊列長度+1 size.incrementAndGet(); } //移除并返回隊首元素 public E removeFirst() { //因為隊首節(jié)點(diǎn)是存在的,但是他可能沒有下一個節(jié)點(diǎn),所以需要一個valueNode來判斷 Node<E> headNode, valueNode; do { //獲取到隊首節(jié)點(diǎn) headNode = head.get(); //判斷下一個節(jié)點(diǎn)是否為null valueNode = headNode.next; //當(dāng)valueNode不為null,并且headNode不等于隊列的head節(jié)點(diǎn)時,代表該元素被別的線程拿走的,需要重新獲取。 //當(dāng)headNode等于隊列的head時則代表頭元素沒有被其他元素拿走,并將head節(jié)點(diǎn)替換為valueNode。 } while (valueNode != null && !head.compareAndSet(headNode, valueNode)); E result = valueNode != null ? valueNode.element : null; //valueNode的元素被拿走了,所有將其置為null if (valueNode != null) { valueNode.element = null; } //隊列長度-1 size.decrementAndGet(); return result; } public static void main(String[] args) throws InterruptedException { //創(chuàng)建線程池 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); //實例化隊列 LockFreeQueue<String> queue = new LockFreeQueue<String>(); //該map用于檢查該隊列是否是線程安全的,利用其key不能重復(fù)來判斷 ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<String, Object>(); //隨機(jī)數(shù) Random random = new Random(System.currentTimeMillis()); //創(chuàng)建5個寫runnable IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> { int count = 0; //每個runnable往隊列插入10個元素 while (count++<10) { //這里值用系統(tǒng)的納秒+隨機(jī)數(shù)+count,以防止重復(fù)影響map集合對隊列線程安全的判斷 queue.addLast(System.nanoTime()+":"+random.nextInt(10000)+":"+count); } //提交任務(wù) }).forEach(threadPool::submit); //創(chuàng)建5個讀runnable IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> { int count = 10; //每個runnable讀10個元素 while (count-->0) { //休眠 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } //移除隊列中的隊首元素 String result = queue.removeFirst(); //輸出 System.out.println(result); //將該元素加入map中,來判斷隊列中真實存入的元素個數(shù) map.put(result, new Object()); } //提交任務(wù) }).forEach(threadPool::submit); //關(guān)閉線程池 threadPool.shutdown(); //等待1小時候強(qiáng)制關(guān)閉線程池 threadPool.awaitTermination(1, TimeUnit.HOURS); //打印map中的元素個數(shù) System.out.println(map.size()); } }
運(yùn)行結(jié)果:
21135673377124:2114:1
21135673377124:1841:1
21135673535368:7640:2
21135673535316:7247:2
21135673430720:1589:1
21135673535143:670:2
21135673549201:8948:3
21135673549364:4671:3
21135673560864:9436:4
21135673551532:5637:3
21135673560412:6560:4
21135673570638:5363:5
21135673577820:9344:5
21135673570345:1147:5
21135673562713:1104:4
21135673580083:7526:6
21135673592905:8578:7
21135673589852:4333:7
21135673587482:4044:6
21135673585072:4774:6
21135673596794:8990:7
21135673600935:1491:8
21135673605719:7387:8
21135673602798:5391:8
21135673610435:7771:9
21135673610435:6732:9
21135673614788:9523:9
21135673623594:3529:10
21135673620198:3206:10
21135673620049:7079:10
21135673698937:3917:2
21135673722794:9326:5
21135673715108:8062:4
21135673683921:1847:1
21135673707190:7836:3
21135673730671:4207:6
21135673737982:9430:7
21135673756931:3648:9
21135673745386:6520:8
21135673764785:3733:10
21135673859035:6858:2
21135673840248:8995:1
21135673880691:7612:4
21135673871709:1741:3
21135673889204:9351:5
21135673897341:5110:6
21135673913246:9156:8
21135673918400:2077:9
21135673926590:1221:10
21135673905604:4850:7
50
到此這篇關(guān)于Java高并發(fā)編程之CAS實現(xiàn)無鎖隊列代碼實例的文章就介紹到這了,更多相關(guān)Java的CAS實現(xiàn)無鎖隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中的字符型文件流FileReader和FileWriter詳細(xì)解讀
這篇文章主要介紹了Java中的字符型文件流FileReader和FileWriter詳細(xì)解讀,與字節(jié)型文件流不同,字節(jié)型文件流讀取和寫入的都是一個又一個的字節(jié),而字符型文件流操作的單位是一個又一個的字符,字符型流認(rèn)為一個字母是一個字符,而一個漢字也是一個字符,需要的朋友可以參考下2023-10-10Mybatis SqlSessionFactory與SqlSession詳細(xì)講解
SqlSessionFactory是MyBatis的核心類之一,其最重要的功能就是提供創(chuàng)建MyBatis的核心接口SqlSession,所以我們需要先創(chuàng)建SqlSessionFactory,為此我們需要提供配置文件和相關(guān)的參數(shù)2022-11-11idea創(chuàng)建spring boot項目及java版本只能選擇17和21的問題
這篇文章主要介紹了idea創(chuàng)建spring boot項目及java版本只能選擇17和21的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01Netty結(jié)合Protobuf進(jìn)行編解碼的方法
這篇文章主要介紹了Netty結(jié)合Protobuf進(jìn)行編解碼,通過文檔表述和代碼實例充分說明了如何進(jìn)行使用和操作,需要的朋友可以參考下2021-06-06