欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

利用Java手寫阻塞隊(duì)列的示例代碼

 更新時(shí)間:2022年08月16日 08:30:04   作者:一無是處的研究僧  
在我們平時(shí)編程的時(shí)候一個(gè)很重要的工具就是容器,在本篇文章當(dāng)中主要給大家介紹阻塞隊(duì)列的原理,并且在了解原理之后自己動(dòng)手實(shí)現(xiàn)一個(gè)低配版的阻塞隊(duì)列,感興趣的可以嘗試一下

前言

在我們平時(shí)編程的時(shí)候一個(gè)很重要的工具就是容器,在本篇文章當(dāng)中主要給大家介紹阻塞隊(duì)列的原理,并且在了解原理之后自己動(dòng)手實(shí)現(xiàn)一個(gè)低配版的阻塞隊(duì)列。

需求分析

在前面的兩篇文章ArrayDeque(JDK雙端隊(duì)列)源碼深度剖析深入剖析(JDK)ArrayQueue源碼當(dāng)中我們仔細(xì)介紹了隊(duì)列的原理,如果大家感興趣可以查看一下!

而在本篇文章所談到的阻塞隊(duì)列當(dāng)中,是在并發(fā)的情況下使用的,上面所談到的是隊(duì)列是并發(fā)不安全的,但是阻塞隊(duì)列在并發(fā)下情況是安全的。阻塞隊(duì)列的主要的需求如下:

  • 隊(duì)列基礎(chǔ)的功能需要有,往隊(duì)列當(dāng)中放數(shù)據(jù),從隊(duì)列當(dāng)中取數(shù)據(jù)。
  • 所有的隊(duì)列操作都要是并發(fā)安全的。
  • 當(dāng)隊(duì)列滿了之后再往隊(duì)列當(dāng)中放數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)隊(duì)列當(dāng)中的數(shù)據(jù)被取出,讓隊(duì)列當(dāng)中有空間的時(shí)候線程需要被喚醒。
  • 當(dāng)隊(duì)列空了之后再往隊(duì)列當(dāng)中取數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)有線程往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候被掛起的線程需要被喚醒。
  • 在我們實(shí)現(xiàn)的隊(duì)列當(dāng)中我們使用數(shù)組去存儲(chǔ)數(shù)據(jù),因此在構(gòu)造函數(shù)當(dāng)中需要提供數(shù)組的初始大小,設(shè)置用多大的數(shù)組。

阻塞隊(duì)列實(shí)現(xiàn)原理

線程阻塞和喚醒

在上面我們已經(jīng)談到了阻塞隊(duì)列是并發(fā)安全的,而且我們還有將線程喚醒和阻塞的需求,因此我們可以選擇可重入鎖ReentrantLock保證并發(fā)安全,但是我們還需要將線程喚醒和阻塞,因此我們可以選擇條件變量Condition進(jìn)行線程的喚醒和阻塞操作,在Condition當(dāng)中我們將會(huì)使用到的,主要有以下兩個(gè)函數(shù):

  • signal用于喚醒線程,當(dāng)一個(gè)線程調(diào)用Conditionsignal函數(shù)的時(shí)候就可以喚醒一個(gè)被await函數(shù)阻塞的線程。
  • await用于阻塞線程,當(dāng)一個(gè)線程調(diào)用Conditionawait函數(shù)的時(shí)候這個(gè)線程就會(huì)阻塞。

數(shù)組循環(huán)使用

因?yàn)殛?duì)列是一端進(jìn)一端出,因此隊(duì)列肯定有頭有尾。

當(dāng)我們往隊(duì)列當(dāng)中加入一些數(shù)據(jù)之后,隊(duì)列的情況可能如下:

在上圖的基礎(chǔ)之上我們?cè)谶M(jìn)行四次出隊(duì)操作,結(jié)果如下:

在上面的狀態(tài)下,我們繼續(xù)加入8個(gè)數(shù)據(jù),那么布局情況如下:

我們知道上圖在加入數(shù)據(jù)的時(shí)候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊(duì)列內(nèi)部實(shí)現(xiàn)了一個(gè)循環(huán)使用的過程。

為了保證數(shù)組的循環(huán)使用,我們需要用一個(gè)變量記錄隊(duì)列頭在數(shù)組當(dāng)中的位置,用一個(gè)變量記錄隊(duì)列尾部在數(shù)組當(dāng)中的位置,還需要有一個(gè)變量記錄隊(duì)列當(dāng)中有多少個(gè)數(shù)據(jù)。

代碼實(shí)現(xiàn)

成員變量定義

根據(jù)上面的分析我們可以知道,在我們自己實(shí)現(xiàn)的類當(dāng)中我們需要有如下的類成員變量:

// 用于保護(hù)臨界區(qū)的鎖
private final ReentrantLock lock;
// 用于喚醒取數(shù)據(jù)的時(shí)候被阻塞的線程
private final Condition notEmpty;
// 用于喚醒放數(shù)據(jù)的時(shí)候被阻塞的線程
private final Condition notFull;
// 用于記錄從數(shù)組當(dāng)中取數(shù)據(jù)的位置 也就是隊(duì)列頭部的位置
private int takeIndex;
// 用于記錄從數(shù)組當(dāng)中放數(shù)據(jù)的位置 也就是隊(duì)列尾部的位置
private int putIndex;
// 記錄隊(duì)列當(dāng)中有多少個(gè)數(shù)據(jù)
private int count;
// 用于存放具體數(shù)據(jù)的數(shù)組
private Object[] items;

構(gòu)造函數(shù)

我們的構(gòu)造函數(shù)也很簡單,最核心的就是傳入一個(gè)數(shù)組大小的參數(shù),并且給上面的變量進(jìn)行初始化賦值。

@SuppressWarnings("unchecked")
public MyArrayBlockingQueue(int size) {
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.notFull = lock.newCondition();
  // 其實(shí)可以不用初始化 類會(huì)有默認(rèn)初始化 默認(rèn)初始化為0
  takeIndex = 0;
  putIndex = 0;
  count = 0;
  // 數(shù)組的長度肯定不能夠小于0
  if (size <= 0)
    throw new RuntimeException("size can not be less than 1");
  items = (E[])new Object[size];
}

put函數(shù)

這是一個(gè)比較重要的函數(shù)了,在這個(gè)函數(shù)當(dāng)中如果隊(duì)列沒有滿,則直接將數(shù)據(jù)放入到數(shù)組當(dāng)中即可,如果數(shù)組滿了,則需要將線程掛起。

public void put(E x){
  // put 函數(shù)可能多個(gè)線程調(diào)用 但是我們需要保證在給變量賦值的時(shí)候只能夠有一個(gè)線程
  // 因?yàn)槿绻鄠€(gè)線程同時(shí)進(jìn)行賦值的話 那么可能后一個(gè)線程的賦值操作覆蓋了前一個(gè)線程的賦值操作
  // 因此這里需要上鎖
  lock.lock();
 
  try {
    // 如果隊(duì)列當(dāng)中的數(shù)據(jù)個(gè)數(shù)等于數(shù)組的長度的話 說明數(shù)組已經(jīng)滿了
    // 這個(gè)時(shí)候需要將線程掛起
    while (count == items.length)
      notFull.await(); // 將調(diào)用 await的線程掛起
    // 當(dāng)數(shù)組沒有滿 或者在掛起之后再次喚醒的話說明數(shù)組當(dāng)中有空間了
    // 這個(gè)時(shí)候需要將數(shù)組入隊(duì) 
    // 調(diào)用入隊(duì)函數(shù)將數(shù)據(jù)入隊(duì)
    enqueue(x);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    // 解鎖
    lock.unlock();
  }
}
 
// 將數(shù)據(jù)入隊(duì)
private void enqueue(E x) {
  this.items[putIndex] = x;
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal(); // 喚醒一個(gè)被 take 函數(shù)阻塞的線程喚醒
}

offer函數(shù)

offer函數(shù)和put函數(shù)一樣,但是與put函數(shù)不同的是,當(dāng)數(shù)組當(dāng)中數(shù)據(jù)填滿之后offer函數(shù)返回false,而不是被阻塞。

public boolean offer(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 如果數(shù)組滿了 則直接返回false 而不是被阻塞
    if (count == items.length)
      return false;
    else {
      // 如果數(shù)組沒有滿則直接入隊(duì) 并且返回 true
      enqueue(e);
      return true;
    }
  } finally {
    lock.unlock();
  }
}

add函數(shù)

這個(gè)函數(shù)和上面兩個(gè)函數(shù)作用一樣,也是往隊(duì)列當(dāng)中加入數(shù)據(jù),但當(dāng)單隊(duì)列滿了之后這個(gè)函數(shù)會(huì)拋出異常。

public boolean add(E e) {
  if (offer(e))
    return true;
  else
    throw new RuntimeException("Queue full");
}

take函數(shù)

這個(gè)函數(shù)主要是從隊(duì)列當(dāng)中取出一個(gè)數(shù)據(jù),但是當(dāng)隊(duì)列為空的時(shí)候,這個(gè)函數(shù)會(huì)阻塞調(diào)用該函數(shù)的線程:

public E take() throws InterruptedException {
  // 這個(gè)函數(shù)也是不能夠并發(fā)的 否則可能不同的線程取出的是同一個(gè)位置的數(shù)據(jù)
  // 進(jìn)行加鎖操作
  lock.lock();
  try {
    // 當(dāng) count 等于0 說明隊(duì)列為空
    // 需要將線程掛起等待
    while (count == 0)
      notEmpty.await();
    // 當(dāng)被喚醒之后進(jìn)行出隊(duì)操作
    return dequeue();
  }finally {
    lock.unlock();
  }
}
 
private E  dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null; // 將對(duì)應(yīng)的位置設(shè)置為 null GC就可以回收了
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--; // 隊(duì)列當(dāng)中數(shù)據(jù)少一個(gè)了
  // 因?yàn)槌鲫?duì)了一個(gè)數(shù)據(jù) 可以喚醒一個(gè)被 put 函數(shù)阻塞的線程 如果這個(gè)時(shí)候沒有被阻塞的線程
  // 這個(gè)函數(shù)就不會(huì)起作用 也就說在這個(gè)函數(shù)調(diào)用之后被 put 函數(shù)掛起的線程也不會(huì)被喚醒
  notFull.signal(); // 喚醒一個(gè)被 put 函數(shù)阻塞的線程
  return x;
}

重寫toString函數(shù)

因?yàn)槲覀冊(cè)诤竺娴臏y(cè)試函數(shù)當(dāng)中會(huì)打印我們這個(gè)類,而打印這個(gè)類的時(shí)候會(huì)調(diào)用對(duì)象的toString方法得到一個(gè)字符串,最后打印這個(gè)字符串。

@Override
public String toString() {
  StringBuilder stringBuilder = new StringBuilder();
  stringBuilder.append("[");
  // 這里需要上鎖 因?yàn)槲覀冊(cè)诖蛴〉臅r(shí)候需要打印所有的數(shù)據(jù)
  // 打印所有的數(shù)據(jù)就需要對(duì)數(shù)組進(jìn)行遍歷操作 而在進(jìn)行遍歷
  // 操作的時(shí)候是不能進(jìn)行插入和刪除操作的 因?yàn)榇蛴〉氖悄?
  // 個(gè)時(shí)刻的數(shù)據(jù)
  lock.lock();
  try {
    if (count == 0)
      stringBuilder.append("]");
    else {
      int cur = 0;
      // 對(duì)數(shù)據(jù)進(jìn)行遍歷 一共遍歷 count 次 因?yàn)閿?shù)組當(dāng)中一共有 count
      // 個(gè)數(shù)據(jù)
      while (cur != count) {
        // 從 takeIndex 位置開始進(jìn)行遍歷 因?yàn)閿?shù)據(jù)是從這個(gè)位置開始的
        stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");
        cur += 1;
      }
      // 刪除掉最后一次沒用的 ", "
      stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
      stringBuilder.append(']');
    }
  }finally {
    lock.unlock();
  }
  return stringBuilder.toString();
}

完整代碼

整個(gè)我們自己完成的阻塞隊(duì)列的代碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class MyArrayBlockingQueue<E> {
 
  // 用于保護(hù)臨界區(qū)的鎖
  private final ReentrantLock lock;
  // 用于喚醒取數(shù)據(jù)的時(shí)候被阻塞的線程
  private final Condition notEmpty;
  // 用于喚醒放數(shù)據(jù)的時(shí)候被阻塞的線程
  private final Condition notFull;
  // 用于記錄從數(shù)組當(dāng)中取數(shù)據(jù)的位置 也就是隊(duì)列頭部的位置
  private int takeIndex;
  // 用于記錄從數(shù)組當(dāng)中放數(shù)據(jù)的位置 也就是隊(duì)列尾部的位置
  private int putIndex;
  // 記錄隊(duì)列當(dāng)中有多少個(gè)數(shù)據(jù)
  private int count;
  // 用于存放具體數(shù)據(jù)的數(shù)組
  private Object[] items;
 
 
  @SuppressWarnings("unchecked")
  public MyArrayBlockingQueue(int size) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.notFull = lock.newCondition();
    // 其實(shí)可以不用初始化 類會(huì)有默認(rèn)初始化 默認(rèn)初始化為0
    takeIndex = 0;
    putIndex = 0;
    count = 0;
    if (size <= 0)
      throw new RuntimeException("size can not be less than 1");
    items = (E[])new Object[size];
  }
 
  public void put(E x){
    lock.lock();
 
    try {
      while (count == items.length)
        notFull.await();
      enqueue(x);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
  }
 
  private void enqueue(E x) {
    this.items[putIndex] = x;
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    notEmpty.signal();
  }
 
  private E  dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    notFull.signal();
    return x;
  }
 
  public boolean add(E e) {
    if (offer(e))
      return true;
    else
      throw new RuntimeException("Queue full");
  }
 
  public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      if (count == items.length)
        return false;
      else {
        enqueue(e);
        return true;
      }
    } finally {
      lock.unlock();
    }
  }
 
  public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return (count == 0) ? null : dequeue();
    } finally {
      lock.unlock();
    }
  }
 
  public E take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
        notEmpty.await();
      return dequeue();
    }finally {
      lock.unlock();
    }
  }
 
  @Override
  public String toString() {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("[");
    lock.lock();
    try {
      if (count == 0)
        stringBuilder.append("]");
      else {
        int cur = 0;
        while (cur != count) {
          stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
          cur += 1;
        }
        stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
        stringBuilder.append(']');
      }
    }finally {
      lock.unlock();
    }
    return stringBuilder.toString();
  }
 
}

現(xiàn)在對(duì)上面的代碼進(jìn)行測(cè)試:

我們現(xiàn)在使用阻塞隊(duì)列模擬一個(gè)生產(chǎn)者消費(fèi)者模型,設(shè)置阻塞隊(duì)列的大小為5,生產(chǎn)者線程會(huì)往隊(duì)列當(dāng)中加入數(shù)據(jù),數(shù)據(jù)為0-9的10個(gè)數(shù)字,消費(fèi)者線程一共會(huì)消費(fèi)10次。

import java.util.concurrent.TimeUnit;
 
public class Test {
 
  public static void main(String[] args) throws InterruptedException {
    MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
    Thread thread = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        System.out.println(Thread.currentThread().getName() + " 往隊(duì)列當(dāng)中加入數(shù)據(jù):" + i);
        queue.put(i);
      }
    }, "生產(chǎn)者");
 
 
    Thread thread1 = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          System.out.println(Thread.currentThread().getName() + " 從隊(duì)列當(dāng)中取出數(shù)據(jù):" + queue.take());
          System.out.println(Thread.currentThread().getName() + " 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):" + queue);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }, "消費(fèi)者");
    thread.start();
    TimeUnit.SECONDS.sleep(3);
    thread1.start();
 
  }
}

上面代碼的輸出如下所示:

生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):0
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):1
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):2
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):3
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):4
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):5
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):0
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):6
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[1, 2, 3, 4, 5]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):1
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[2, 3, 4, 5]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):2
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[3, 4, 5, 6]
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):7
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):3
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[4, 5, 6, 7]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):4
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[5, 6, 7]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):5
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[6, 7]
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):8
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):6
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[7, 8]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):7
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[8]
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):8
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[]
生產(chǎn)者 往隊(duì)列當(dāng)中加入數(shù)據(jù):9
消費(fèi)者 從隊(duì)列當(dāng)中取出數(shù)據(jù):9
消費(fèi)者 當(dāng)前隊(duì)列當(dāng)中的數(shù)據(jù):[]

從上面的輸出結(jié)果我們知道,生產(chǎn)者線程打印5之后被掛起了,因?yàn)槿绻麤]有被掛起,生產(chǎn)者線程肯定可以一次性輸出完成,因?yàn)橄M(fèi)者線程阻塞了3秒。但是他沒有輸出完成說明在打印5之后,因?yàn)樽枞?duì)列滿了,因而生產(chǎn)者線程被掛起了。然后消費(fèi)者開始消費(fèi),這樣阻塞隊(duì)列當(dāng)中就有空間了,生產(chǎn)者線程就可以繼續(xù)生產(chǎn)了。

總結(jié)

在本篇文章當(dāng)中,主要向大家介紹了阻塞隊(duì)列的原理并且實(shí)現(xiàn)了一個(gè)低配版的數(shù)組阻塞隊(duì)列,其實(shí)如果你了解數(shù)組隊(duì)列和鎖的話,這個(gè)代碼實(shí)現(xiàn)起來還是相對(duì)比較簡單的,我們只需要使用鎖去保證我們的程序并發(fā)安全即可。

我們?cè)趯?shí)現(xiàn)put函數(shù)的時(shí)候,如果當(dāng)前隊(duì)列已經(jīng)滿了,則當(dāng)前線程需要調(diào)用await函數(shù)進(jìn)行阻塞,當(dāng)線程被喚醒或者隊(duì)列沒有滿可以繼續(xù)執(zhí)行的時(shí)候,我們?cè)谕?duì)列當(dāng)中加入數(shù)據(jù)之后需要調(diào)用一次signal函數(shù),因?yàn)檫@樣可以喚醒在調(diào)用take函數(shù)的時(shí)候因?yàn)殛?duì)列空而阻塞的線程。

我們實(shí)現(xiàn)take函數(shù)的時(shí)候,如果當(dāng)前隊(duì)列已經(jīng)空了,則當(dāng)前線程也需要調(diào)用await函數(shù)進(jìn)行阻塞,當(dāng)線程被喚醒或者隊(duì)列不為空線程可以繼續(xù)執(zhí)行,在出隊(duì)之后需要調(diào)用一次signal函數(shù),因?yàn)檫@樣可以喚醒在調(diào)用put函數(shù)的時(shí)候因?yàn)殛?duì)列滿而阻塞的線程。

以上就是利用Java手寫阻塞隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java阻塞隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論