Java高并發(fā)編程之CAS實(shí)現(xiàn)無鎖隊(duì)列代碼實(shí)例
一、什么是無鎖(Lock-Free)隊(duì)列
在多線程操作中,我們通常會(huì)添加鎖來保證線程的安全,那么這樣勢(shì)必會(huì)影響程序的性能。
那么為了解決這一問題,于是就有了在無鎖操作的情況下依然能夠保證線程的安全,實(shí)現(xiàn)無鎖的原理就是利用了Campare and swap(CAS)算法
而我們的無鎖隊(duì)列無疑也是使用了這一方法。
二、線程不安全的隊(duì)列
package com.brycen.concurrency03.collections.myqueue;
public class UnThreadSafeQueue<E> {
//定義Node節(jié)點(diǎn)
private static class Node<E> {
private E element;//節(jié)點(diǎn)內(nèi)存儲(chǔ)的元素
private Node<E> next;//下一個(gè)節(jié)點(diǎn)
public Node(E element, Node<E> next) {
super();
this.element = element;
this.next = next;
}
public E getElement() {
return element;
}
public void setElement(E element) {
this.element = element;
}
public Node<E> getNext() {
return next;
}
public void setNext(Node<E> next) {
this.next = next;
}
@Override
public String toString() {
return (element == null) ? "" : element.toString();
}
}
//定義隊(duì)列的頭和尾
private Node<E> head, last;
//初始化隊(duì)列長度為0
private int size = 0;
public int size() {
return size;
}
public boolean isEmpty() {
return size == 0;
}
//返回第一個(gè)元素
public E peekFirst() {
return head.element == null ? null : head.getElement();
}
//返回最后一個(gè)元素
public E peekLast() {
return last.element == null ? null : last.getElement();
}
//在尾部添加元素
public void addLast(E element) {
Node<E> newNode = new Node<E>(element, null);
//如果為0,則代表隊(duì)列沒有元素
if (size == 0) {
head = newNode;
}else {
//隊(duì)列有元素,則將最后一個(gè)元素的下一個(gè)值設(shè)置為新的元素
last.setNext(newNode);
}
//新元素賦值給last
last = newNode;
//隊(duì)列長度+1
size++;
}
//移除并返回第一個(gè)元素
public E removeFirst() {
//如果為null,直接返回null
if (isEmpty())
return null;
//拿到第一個(gè)Node中的元素
E result = head.getElement();
//獲取第一個(gè)Node中的下一個(gè)元素并賦值給head
head = head.getNext();
//隊(duì)列長度-1
size--;
//判斷隊(duì)列是否為null,如果為null,需要將last置為null
if (size==0)
last = null;
return result;
}
public static void main(String[] args) {
UnThreadSafeQueue<String> queue = new UnThreadSafeQueue<String>();
queue.addLast("Hello");
queue.addLast("World");
queue.addLast("Java");
System.out.println(queue.removeFirst());
System.out.println(queue.removeFirst());
System.out.println(queue.removeFirst());
}
}運(yùn)行結(jié)果:
HelloWorldJava
三、線程安全的無鎖隊(duì)列
package com.brycen.concurrency03.collections.myqueue;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
public class LockFreeQueue<E> {
//定義頭和尾的原子性節(jié)點(diǎn)
private AtomicReference<Node<E>> head, last;
//定義原子性size
private AtomicInteger size = new AtomicInteger(0);
//初始化隊(duì)列,將頭和尾都指向一個(gè)null的節(jié)點(diǎn)
public LockFreeQueue() {
Node<E> node = new Node<E>(null);
head = new AtomicReference<Node<E>>(node);
last = new AtomicReference<Node<E>>(node);
}
//定義節(jié)點(diǎn)類
private static class Node<E> {
E element;
//需要volatile,因?yàn)榉乐乖趎ext賦值的時(shí)候發(fā)生重排序,并且需要對(duì)其他線程可見
volatile Node<E> next;
public Node(E element) {
this.element = element;
}
@Override
public String toString() {
return element == null ? null : element.toString();
}
}
//添加元素到隊(duì)尾
public void addLast(E element) {
//元素不允許為null
if (element == null)
throw new NullPointerException("The null element not allow");
//新建一個(gè)新節(jié)點(diǎn)
Node<E> newNode = new Node<E>(element);
//getAndSet操作為原子性操作,先獲取last的節(jié)點(diǎn)再將新的節(jié)點(diǎn)賦值給last
Node<E> oldNode = last.getAndSet(newNode);
//將舊節(jié)點(diǎn)的next指向新的節(jié)點(diǎn)
oldNode.next = newNode;
//隊(duì)列長度+1
size.incrementAndGet();
}
//移除并返回隊(duì)首元素
public E removeFirst() {
//因?yàn)殛?duì)首節(jié)點(diǎn)是存在的,但是他可能沒有下一個(gè)節(jié)點(diǎn),所以需要一個(gè)valueNode來判斷
Node<E> headNode, valueNode;
do {
//獲取到隊(duì)首節(jié)點(diǎn)
headNode = head.get();
//判斷下一個(gè)節(jié)點(diǎn)是否為null
valueNode = headNode.next;
//當(dāng)valueNode不為null,并且headNode不等于隊(duì)列的head節(jié)點(diǎn)時(shí),代表該元素被別的線程拿走的,需要重新獲取。
//當(dāng)headNode等于隊(duì)列的head時(shí)則代表頭元素沒有被其他元素拿走,并將head節(jié)點(diǎn)替換為valueNode。
} while (valueNode != null && !head.compareAndSet(headNode, valueNode));
E result = valueNode != null ? valueNode.element : null;
//valueNode的元素被拿走了,所有將其置為null
if (valueNode != null) {
valueNode.element = null;
}
//隊(duì)列長度-1
size.decrementAndGet();
return result;
}
public static void main(String[] args) throws InterruptedException {
//創(chuàng)建線程池
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
//實(shí)例化隊(duì)列
LockFreeQueue<String> queue = new LockFreeQueue<String>();
//該map用于檢查該隊(duì)列是否是線程安全的,利用其key不能重復(fù)來判斷
ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<String, Object>();
//隨機(jī)數(shù)
Random random = new Random(System.currentTimeMillis());
//創(chuàng)建5個(gè)寫runnable
IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> {
int count = 0;
//每個(gè)runnable往隊(duì)列插入10個(gè)元素
while (count++<10) {
//這里值用系統(tǒng)的納秒+隨機(jī)數(shù)+count,以防止重復(fù)影響map集合對(duì)隊(duì)列線程安全的判斷
queue.addLast(System.nanoTime()+":"+random.nextInt(10000)+":"+count);
}
//提交任務(wù)
}).forEach(threadPool::submit);
//創(chuàng)建5個(gè)讀runnable
IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> {
int count = 10;
//每個(gè)runnable讀10個(gè)元素
while (count-->0) {
//休眠
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
//移除隊(duì)列中的隊(duì)首元素
String result = queue.removeFirst();
//輸出
System.out.println(result);
//將該元素加入map中,來判斷隊(duì)列中真實(shí)存入的元素個(gè)數(shù)
map.put(result, new Object());
}
//提交任務(wù)
}).forEach(threadPool::submit);
//關(guān)閉線程池
threadPool.shutdown();
//等待1小時(shí)候強(qiáng)制關(guān)閉線程池
threadPool.awaitTermination(1, TimeUnit.HOURS);
//打印map中的元素個(gè)數(shù)
System.out.println(map.size());
}
}運(yùn)行結(jié)果:
21135673377124:2114:1
21135673377124:1841:1
21135673535368:7640:2
21135673535316:7247:2
21135673430720:1589:1
21135673535143:670:2
21135673549201:8948:3
21135673549364:4671:3
21135673560864:9436:4
21135673551532:5637:3
21135673560412:6560:4
21135673570638:5363:5
21135673577820:9344:5
21135673570345:1147:5
21135673562713:1104:4
21135673580083:7526:6
21135673592905:8578:7
21135673589852:4333:7
21135673587482:4044:6
21135673585072:4774:6
21135673596794:8990:7
21135673600935:1491:8
21135673605719:7387:8
21135673602798:5391:8
21135673610435:7771:9
21135673610435:6732:9
21135673614788:9523:9
21135673623594:3529:10
21135673620198:3206:10
21135673620049:7079:10
21135673698937:3917:2
21135673722794:9326:5
21135673715108:8062:4
21135673683921:1847:1
21135673707190:7836:3
21135673730671:4207:6
21135673737982:9430:7
21135673756931:3648:9
21135673745386:6520:8
21135673764785:3733:10
21135673859035:6858:2
21135673840248:8995:1
21135673880691:7612:4
21135673871709:1741:3
21135673889204:9351:5
21135673897341:5110:6
21135673913246:9156:8
21135673918400:2077:9
21135673926590:1221:10
21135673905604:4850:7
50
到此這篇關(guān)于Java高并發(fā)編程之CAS實(shí)現(xiàn)無鎖隊(duì)列代碼實(shí)例的文章就介紹到這了,更多相關(guān)Java的CAS實(shí)現(xiàn)無鎖隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中的字符型文件流FileReader和FileWriter詳細(xì)解讀
這篇文章主要介紹了Java中的字符型文件流FileReader和FileWriter詳細(xì)解讀,與字節(jié)型文件流不同,字節(jié)型文件流讀取和寫入的都是一個(gè)又一個(gè)的字節(jié),而字符型文件流操作的單位是一個(gè)又一個(gè)的字符,字符型流認(rèn)為一個(gè)字母是一個(gè)字符,而一個(gè)漢字也是一個(gè)字符,需要的朋友可以參考下2023-10-10
Mybatis SqlSessionFactory與SqlSession詳細(xì)講解
SqlSessionFactory是MyBatis的核心類之一,其最重要的功能就是提供創(chuàng)建MyBatis的核心接口SqlSession,所以我們需要先創(chuàng)建SqlSessionFactory,為此我們需要提供配置文件和相關(guān)的參數(shù)2022-11-11
Java實(shí)現(xiàn)解析.xlsb文件的示例代碼
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)解析.xlsb文件的相關(guān)方法,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的可以了解一下2023-01-01
idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問題
這篇文章主要介紹了idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01
Netty結(jié)合Protobuf進(jìn)行編解碼的方法
這篇文章主要介紹了Netty結(jié)合Protobuf進(jìn)行編解碼,通過文檔表述和代碼實(shí)例充分說明了如何進(jìn)行使用和操作,需要的朋友可以參考下2021-06-06

