欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼

 更新時(shí)間:2024年12月26日 10:09:52   作者:米飯好好吃.  
本文主要介紹了Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

1. 阻塞隊(duì)列簡(jiǎn)介

1.1 阻塞隊(duì)列概念

阻塞隊(duì)列:是一種特殊的隊(duì)列,具有隊(duì)列"先進(jìn)先出"的特性,同時(shí)相較于普通隊(duì)列,阻塞隊(duì)列是線程安全的,并且?guī)в?code>阻塞功能,表現(xiàn)形式如下:

  • 當(dāng)隊(duì)列滿時(shí),繼續(xù)入隊(duì)列就會(huì)阻塞,直到有其他線程從隊(duì)列中取出元素
  • 當(dāng)隊(duì)列空時(shí),繼續(xù)出隊(duì)列就會(huì)阻塞,直到有其他線程往隊(duì)列中插入元素

基于阻塞隊(duì)列我們可以實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型,這在后端開發(fā)場(chǎng)景中是相當(dāng)重要的!

1.2 生產(chǎn)者-消費(fèi)者模型優(yōu)勢(shì)

基于阻塞隊(duì)列實(shí)現(xiàn)的 生產(chǎn)者消費(fèi)者模型 具有以下兩大優(yōu)勢(shì):

  • 解耦合:

image.png

以搜狗搜索的服務(wù)器舉例,用戶輸入搜索關(guān)鍵字 **美容,**客戶端的請(qǐng)求到達(dá)搜狗的"入口服務(wù)器"時(shí),會(huì)將請(qǐng)求轉(zhuǎn)發(fā)到 廣告服務(wù)器 和 大搜索服務(wù)器,此時(shí)廣告服務(wù)器返回相關(guān)廣告內(nèi)容,大搜索服務(wù)器根據(jù)搜索算法匹配對(duì)應(yīng)結(jié)果返回,如果按照這種方式通信,那么入口服務(wù)器需要編寫兩套代碼分別同廣告服務(wù)器和大搜索服務(wù)器進(jìn)行交互,并且一個(gè)嚴(yán)重問(wèn)題是如果其中廣告服務(wù)器宕機(jī)了,會(huì)導(dǎo)致入口服務(wù)器無(wú)法正常工作進(jìn)而影響大搜索服務(wù)器也無(wú)法正常工作??!

image.png

而引入阻塞隊(duì)列后,入口服務(wù)器不需要知曉廣告服務(wù)器和大搜索服務(wù)器的存在,只需要往阻塞隊(duì)列中發(fā)送請(qǐng)求即可,而廣告服務(wù)器和大搜索服務(wù)器也不需要知道入口服務(wù)器的存在,只需要從阻塞隊(duì)列中取出請(qǐng)求處理完畢返回給阻塞隊(duì)列即可,并且當(dāng)其中大搜索服務(wù)器宕機(jī)時(shí),不影響其他服務(wù)器以及入口服務(wù)器的正常運(yùn)作!

  • 削峰填谷:

image.png

如果沒(méi)有阻塞隊(duì)列,當(dāng)遇到一些突發(fā)場(chǎng)景例如"雙十一"大促等客戶請(qǐng)求量激增的時(shí)候,入口服務(wù)器轉(zhuǎn)發(fā)的請(qǐng)求量增多,壓力就會(huì)變大,同理廣告服務(wù)器和大搜索服務(wù)器處理過(guò)程復(fù)雜繁多,消耗的硬件資源就會(huì)激增,達(dá)到硬件瓶頸之后服務(wù)器就宕機(jī)了(直觀現(xiàn)象就是客戶端發(fā)送請(qǐng)求,服務(wù)器不會(huì)響應(yīng)了)

image.png

而引入阻塞隊(duì)列/消息隊(duì)列之后,由于阻塞隊(duì)列只負(fù)責(zé)存儲(chǔ)相應(yīng)的請(qǐng)求或者響應(yīng),無(wú)需額外的業(yè)務(wù)處理,因此抗壓能力比廣告服務(wù)器和大搜索服務(wù)器更強(qiáng),當(dāng)客戶請(qǐng)求量激增的時(shí)候交由阻塞隊(duì)列承受,而廣告服務(wù)器和大搜索服務(wù)器只需要按照特定的速率進(jìn)行讀取并返回處理結(jié)果即可,就起到了 削峰填谷 的作用!

注意:此處的阻塞隊(duì)列在現(xiàn)實(shí)場(chǎng)景中并不是一個(gè)單純的數(shù)據(jù)結(jié)構(gòu),往往是一個(gè)基于阻塞隊(duì)列的服務(wù)器程序,例如消息隊(duì)列(MQ)

2. 標(biāo)準(zhǔn)庫(kù)中的阻塞隊(duì)列

2.1 基本介紹

Java標(biāo)準(zhǔn)庫(kù)提供了現(xiàn)成的阻塞隊(duì)列數(shù)據(jù)結(jié)構(gòu)供開發(fā)者使用,即BlockingQueue接口
BlockingQueue:該接口具有以下實(shí)現(xiàn)類:

  • ArrayBlockingQueue:基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列
  • LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列
  • PriorityBlockingQueue:帶有優(yōu)先級(jí)的阻塞隊(duì)列

BlockingQueue方法:該接口具有以下常用方法

  • 帶有阻塞功能:
  • put:向隊(duì)列中入元素,隊(duì)列滿則阻塞等待
  • take:向隊(duì)列中取出元素,隊(duì)列空則阻塞等待
  • 不帶有阻塞功能:
  • peek:返回隊(duì)頭元素(不取出)
  • poll:返回隊(duì)頭元素(取出)
  • offer:向隊(duì)列中插入元素

2.2 代碼示例

/**
 * 測(cè)試Java標(biāo)準(zhǔn)庫(kù)提供的阻塞隊(duì)列實(shí)現(xiàn)
 */
public class TestStandardBlockingQueue {

    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生產(chǎn)者
        Thread t1 = new Thread(() -> {
            int i = 0;
            while (true) {
                try {
                    queue.put(i);
                    System.out.println("生產(chǎn)數(shù)據(jù):" + i);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        // 消費(fèi)者
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    int ele = queue.take();
                    System.out.println("消費(fèi)數(shù)據(jù):" + ele);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

運(yùn)行效果

image.png

我們?cè)谥骶€程中創(chuàng)建了兩個(gè)線程,其中t1線程作為生產(chǎn)者不斷循環(huán)生產(chǎn)元素,而線程t2作為消費(fèi)者每隔1s消費(fèi)一個(gè)數(shù)據(jù),所以我們很快看到當(dāng)生產(chǎn)數(shù)據(jù)個(gè)數(shù)達(dá)到容量capacity時(shí)就會(huì)繼續(xù)生產(chǎn)就會(huì)阻塞等待,直到消費(fèi)者線程消費(fèi)數(shù)據(jù)后才可以繼續(xù)入隊(duì)列,這樣就實(shí)現(xiàn)了一個(gè) 生產(chǎn)者-消費(fèi)者模型 !

3. 自定義實(shí)現(xiàn)阻塞隊(duì)列

首先我們需要明確實(shí)現(xiàn)一個(gè)阻塞隊(duì)列需要哪些步驟?

  • 首先我們需要實(shí)現(xiàn)一個(gè)普通隊(duì)列
  • 使用鎖機(jī)制將普通隊(duì)列變成線程安全的
  • 通過(guò)特殊機(jī)制讓該隊(duì)列能夠帶有"阻塞"功能

3.1 實(shí)現(xiàn)普通隊(duì)列

相信大家如果學(xué)過(guò) 數(shù)據(jù)結(jié)構(gòu)與算法 相關(guān)課程,應(yīng)該對(duì)隊(duì)列這種數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)并不陌生!實(shí)現(xiàn)隊(duì)列有基于數(shù)組的也有基于鏈表的,我們此處采用基于數(shù)組實(shí)現(xiàn)的,基于數(shù)組實(shí)現(xiàn)的循環(huán)隊(duì)列也有以下兩種方式:

  • 騰出一個(gè)空間用來(lái)判斷隊(duì)列空或者滿
  • 使用額外的變量size用來(lái)記錄當(dāng)前元素的個(gè)數(shù)

我們使用第二種方式實(shí)現(xiàn),實(shí)現(xiàn)代碼如下:

/**
 * 自定義實(shí)現(xiàn)阻塞隊(duì)列
 */
public class MyBlockingQueue {
    private int head = 0; // 頭指針
    private int tail = 0; // 尾指針
    private int size = 0; // 當(dāng)前元素個(gè)數(shù)
    private String[] array = null;
    private int capacity; // 容量

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入隊(duì)列方法
     */
    public void put(String elem) {
        if (size == capacity) {
            // 隊(duì)列已經(jīng)滿了
            return;
        }
        array[tail] = elem;
        tail++;
        if (tail >= capacity) {
            tail = 0;
        }
        size++;
    }

    /**
     * 出隊(duì)列方法
     */
    public String take() {
        // 判斷隊(duì)列是否為空
        if (size == 0) {
            return null;
        }
        String topElem = array[head];
        head++;
        if (head >= capacity) {
            head = 0;
        }
        size--;
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(3);
        queue.put("11");
        queue.put("22");
        queue.put("33");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

3.2 引入鎖機(jī)制實(shí)現(xiàn)線程安全

引入synchronized關(guān)鍵字在原有隊(duì)列實(shí)現(xiàn)的基礎(chǔ)上實(shí)現(xiàn)線程安全,代碼如下:

/**
 * 自定義實(shí)現(xiàn)阻塞隊(duì)列
 */
public class MyBlockingQueue {
    private int head = 0; // 頭指針
    private int tail = 0; // 尾指針
    private int size = 0; // 當(dāng)前元素個(gè)數(shù)
    private String[] array = null;
    private int capacity; // 容量
    private Object locker = new Object(); // 鎖對(duì)象

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入隊(duì)列方法
     */
    public void put(String elem) {
        synchronized (locker) {
            if (size == capacity) {
                // 隊(duì)列已經(jīng)滿了
                return;
            }
            array[tail] = elem;
            tail++;
            if (tail >= capacity) {
                tail = 0;
            }
            size++;
        }
    }

    /**
     * 出隊(duì)列方法
     */
    public String take() {
        String topElem = "";
        synchronized (locker) {
            // 判斷隊(duì)列是否為空
            if (size == 0) {
                return null;
            }
            topElem = array[head];
            head++;
            if (head >= capacity) {
                head = 0;
            }
            size--;
        }
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(3);
        queue.put("11");
        queue.put("22");
        queue.put("33");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

我們?cè)?code>put、take等關(guān)鍵方法上將 多個(gè)線程修改同一個(gè)變量 部分的操作進(jìn)行加鎖處理,實(shí)現(xiàn)線程安全!

3.3 加入阻塞功能

在普通隊(duì)列的實(shí)現(xiàn)中,如果隊(duì)列滿或者空我們直接使用return關(guān)鍵字返回,但是在多線程環(huán)境下我們希望實(shí)現(xiàn)阻塞等待的功能,這就可以使用Object類提供的wait/notify這組方法實(shí)現(xiàn)阻塞與喚醒機(jī)制了!我們就需要考慮阻塞與喚醒的時(shí)機(jī)了!
何時(shí)阻塞:這個(gè)問(wèn)題非常簡(jiǎn)單,當(dāng)隊(duì)列滿時(shí)入隊(duì)列操作就應(yīng)該阻塞等待,而當(dāng)隊(duì)列為空時(shí)出隊(duì)列操作就需要阻塞等待
何時(shí)喚醒:想必大家都可以想到,對(duì)于入隊(duì)列操作來(lái)說(shuō),只要隊(duì)列不滿就可以被喚醒,而對(duì)于出隊(duì)列操作來(lái)說(shuō),隊(duì)列不為空就可以被喚醒,因此,只要有線程調(diào)用take操作出隊(duì)列,那么入隊(duì)列的線程就可以被喚醒,而只要有線程調(diào)用put操作入隊(duì)列,那么出隊(duì)列的線程就可以被喚醒

/**
 * 自定義實(shí)現(xiàn)阻塞隊(duì)列
 */
public class MyBlockingQueue {
    private int head = 0; // 頭指針
    private int tail = 0; // 尾指針
    private int size = 0; // 當(dāng)前元素個(gè)數(shù)
    private String[] array = null;
    private int capacity; // 容量
    private Object locker = new Object(); // 鎖對(duì)象

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入隊(duì)列方法
     */
    public void put(String elem) {
        synchronized (locker) {
            while (size == capacity) {
                // 隊(duì)列已經(jīng)滿了(進(jìn)行阻塞)
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            array[tail] = elem;
            tail++;
            if (tail >= capacity) {
                tail = 0;
            }
            size++;
            locker.notifyAll();
        }
    }

    /**
     * 出隊(duì)列方法
     */
    public String take() {
        String topElem = "";
        synchronized (locker) {
            // 判斷隊(duì)列是否為空
            while (size == 0) {
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            topElem = array[head];
            head++;
            if (head >= capacity) {
                head = 0;
            }
            size--;
            locker.notifyAll();
        }
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(10);
        // 生產(chǎn)者
        Thread producer = new Thread(() -> {
            int i = 0;
            while (true) {
                queue.put(i + "");
                System.out.println("生產(chǎn)元素:" + i);
                i++;
            }
        });
        // 消費(fèi)者
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String elem = queue.take();
                System.out.println("消費(fèi)元素" + elem);
            }
        });
        producer.start();
        consumer.start();
    }
}

我們使用wait/notify這組操作實(shí)現(xiàn)了阻塞/喚醒功能,并且滿足必須使用在synchronized關(guān)鍵字內(nèi)部的使用條件,這里有一個(gè)注意點(diǎn)

為什么我們將if判斷條件改成了while循環(huán)呢???這是需要考慮清楚的!

image.png

如圖所示:一開始由于隊(duì)列滿所以生產(chǎn)者1進(jìn)入阻塞狀態(tài),釋放鎖,然后生產(chǎn)者2也進(jìn)入阻塞狀態(tài)釋放鎖,此時(shí)消費(fèi)者消費(fèi)一個(gè)元素后喚醒生產(chǎn)者1,然后生產(chǎn)者1生產(chǎn)一個(gè)元素后(記住此時(shí)隊(duì)列已滿)繼續(xù)喚醒,但是此時(shí)喚醒的恰恰是 生產(chǎn)者2 ,生產(chǎn)者2繼續(xù)執(zhí)行生產(chǎn)元素,于是就出現(xiàn)問(wèn)題,我們總結(jié)一下出現(xiàn)問(wèn)題的原因:

  • notifyAll是隨機(jī)喚醒,無(wú)法指定喚醒線程,因此可能出現(xiàn)生產(chǎn)者喚醒生產(chǎn)者,消費(fèi)者喚醒消費(fèi)者的情況
  • if判定條件一經(jīng)執(zhí)行就無(wú)法繼續(xù)判定,所以生產(chǎn)者2被喚醒后沒(méi)有再次判斷當(dāng)前隊(duì)列是否滿

于是我們的應(yīng)對(duì)策略就是使用while循環(huán),當(dāng)線程被喚醒使重新判斷,如果隊(duì)列仍滿,入隊(duì)列操作繼續(xù)阻塞,而隊(duì)列仍空,出隊(duì)列操作繼續(xù)阻塞!Java標(biāo)準(zhǔn)也推薦我們使用 while 關(guān)鍵字和 wait 關(guān)鍵字一起使用!

image.png

4. 應(yīng)用場(chǎng)景(實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型)

我們繼續(xù)基于我們自定義實(shí)現(xiàn)的阻塞隊(duì)列再來(lái)實(shí)現(xiàn) 生產(chǎn)者-消費(fèi)者模型代碼示例(主函數(shù))

public static void main(String[] args) {
    MyBlockingQueue queue = new MyBlockingQueue(10);
    // 生產(chǎn)者
    Thread producer = new Thread(() -> {
        int i = 0;
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            queue.put(i + "");
            System.out.println("生產(chǎn)元素:" + i);
            i++;
        }
    });
    // 消費(fèi)者
    Thread consumer = new Thread(() -> {
        while (true) {
            String elem = queue.take();
            System.out.println("消費(fèi)元素" + elem);
        }
    });
    producer.start();
    consumer.start();
}

運(yùn)行效果

image.png

此時(shí)我們創(chuàng)建兩個(gè)兩個(gè)線程,producer作為生產(chǎn)者線程每隔1s生產(chǎn)一個(gè)元素,consumer作為消費(fèi)者線程不斷消費(fèi)元素,此時(shí)我們看到的就是消費(fèi)者消費(fèi)很快,當(dāng)阻塞隊(duì)列空時(shí)就進(jìn)入阻塞狀態(tài),直到生產(chǎn)者線程生產(chǎn)元素后才被喚醒繼續(xù)執(zhí)行!此時(shí)我們真正模擬實(shí)現(xiàn)了 阻塞隊(duì)列 這樣的數(shù)據(jù)結(jié)構(gòu)!

到此這篇關(guān)于Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Java 阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論