基于ZooKeeper實現(xiàn)隊列源碼
實現(xiàn)原理
先進(jìn)先出隊列是最常用的隊列,使用Zookeeper實現(xiàn)先進(jìn)先出隊列就是在特定的目錄下創(chuàng)建PERSISTENT_EQUENTIAL節(jié)點,創(chuàng)建成功時Watcher通知等待的隊列,隊列刪除序列號最小的節(jié)點用以消費。此場景下Zookeeper的znode用于消息存儲,znode存儲的數(shù)據(jù)就是消息隊列中的消息內(nèi)容,SEQUENTIAL序列號就是消息的編號,按序取出即可。由于創(chuàng)建的節(jié)點是持久化的,所以不必?fù)?dān)心隊列消息的丟失問題。
隊列(Queue)
分布式隊列是通用的數(shù)據(jù)結(jié)構(gòu),為了在 Zookeeper 中實現(xiàn)分布式隊列,首先需要指定一個 Znode 節(jié)點作為隊列節(jié)點(queue node), 各個分布式客戶端通過調(diào)用 create() 函數(shù)向隊列中放入數(shù)據(jù),調(diào)用create()時節(jié)點路徑名帶"qn-"結(jié)尾,并設(shè)置順序(sequence)節(jié)點標(biāo)志。 由于設(shè)置了節(jié)點的順序標(biāo)志,新的路徑名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增號。需要從隊列中獲取數(shù)據(jù)/移除數(shù)據(jù)的客戶端首先調(diào)用 getChildren() 函數(shù),有數(shù)據(jù)則獲?。ǐ@取數(shù)據(jù)后可以刪除也可以不刪),沒有則在隊列節(jié)點(queue node)上將 watch 設(shè)置為 true,等待觸發(fā)并處理最小序號的節(jié)點(即從序號最小的節(jié)點中取數(shù)據(jù))。
應(yīng)用場景
Zookeeper隊列不太適合要求高性能的場合,但可以在數(shù)據(jù)量不大的情況下考慮使用。比如已在項目中使用Zookeeper又需要小規(guī)模的隊列應(yīng)用,這時可以使用Zookeeper實現(xiàn)的隊列;畢竟引進(jìn)一個消息中間件會增加系統(tǒng)的復(fù)雜性和運維的壓力。
詳細(xì)代碼
ZookeeperClient工具類
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by Massive on 2016/12/18.
*/
public class ZooKeeperClient {
private static String connectionString = "localhost:2181";
private static int sessionTimeout = 10000;
public static ZooKeeper getInstance() throws IOException, InterruptedException {
//--------------------------------------------------------------
// 為避免連接還未完成就執(zhí)行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
// 這里等Zookeeper的連接完成才返回實例
//--------------------------------------------------------------
final CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
} else if (event.getState() == Event.KeeperState.Expired) {
}
}
});
connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
return zk;
}
public static int getSessionTimeout() {
return sessionTimeout;
}
public static void setSessionTimeout(int sessionTimeout) {
ZooKeeperClient.sessionTimeout = sessionTimeout;
}
}
ZooKeeperQueue
package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Created by Allen on 2016/12/22.
*/
public class ZooKeeperQueue {
private ZooKeeper zk;
private int sessionTimeout;
private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
private static String QUEUE_ROOT = "/QUEUE";
private String queueName;
private String queuePath;
private Object mutex = new Object();
public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
this.queueName = queueName;
this.queuePath = QUEUE_ROOT + "/" + queueName;
this.zk = ZooKeeperClient.getInstance();
this.sessionTimeout = zk.getSessionTimeout();
//----------------------------------------------------
// 確保隊列根目錄/QUEUE和當(dāng)前隊列的目錄的存在
//----------------------------------------------------
ensureExists(QUEUE_ROOT);
ensureExists(queuePath);
}
public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
List<String> nodes = null;
byte[] returnVal = null;
Stat stat = null;
do {
synchronized (mutex) {
nodes = zk.getChildren(queuePath, new ProduceWatcher());
//----------------------------------------------------
// 如果沒有消息節(jié)點,等待生產(chǎn)者的通知
//----------------------------------------------------
if (nodes == null || nodes.size() == 0) {
mutex.wait();
} else {
SortedSet<String> sortedNode = new TreeSet<String>();
for (String node : nodes) {
sortedNode.add(queuePath + "/" + node);
}
//----------------------------------------------------
// 消費隊列里序列號最小的消息
//----------------------------------------------------
String first = sortedNode.first();
returnVal = zk.getData(first, false, stat);
zk.delete(first, -1);
System.out.print(Thread.currentThread().getName() + " ");
System.out.print("consume a message from queue:" + first);
System.out.println(", message data is: " + new String(returnVal,"UTF-8"));
return returnVal;
}
}
} while (true);
}
class ProduceWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
//----------------------------------------------------
// 生產(chǎn)一條消息成功后通知一個等待線程
//----------------------------------------------------
synchronized (mutex) {
mutex.notify();
}
}
}
public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
//----------------------------------------------------
// 確保當(dāng)前隊列目錄存在
// example: /QUEUE/queueName
//----------------------------------------------------
ensureExists(queuePath);
String node = zk.create(queuePath + "/", data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.print(Thread.currentThread().getName() + " ");
System.out.print("produce a message to queue:" + node);
System.out.println(" , message data is: " + new String(data,"UTF-8"));
}
public void ensureExists(String path) {
try {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String queueName = "test";
final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
queue.consume();
System.out.println("--------------------------------------------------------");
System.out.println();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
queue.produce(("massive" + i).getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
},"Produce-thread").start();
}
}
測試
運行main方法,本機(jī)器的某次輸出結(jié)果
Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0 Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1 Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2 Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3 Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4 Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5 Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6 Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7 Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8 Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9 Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9
總結(jié)
以上就是本文有關(guān)于隊列和基于ZooKeeper實現(xiàn)隊列源碼介紹的全部內(nèi)容,希望對大家有所幫助。
感謝朋友們對本站的支持!
相關(guān)文章
使用@PathVariable時候無法將參數(shù)映射到變量中的解決
這篇文章主要介紹了使用@PathVariable時候無法將參數(shù)映射到變量中的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
基于Springboot2.3訪問本地路徑下靜態(tài)資源的方法(解決報錯:Not allowed to load local
這篇文章主要介紹了基于Springboot2.3訪問本地路徑下靜態(tài)資源的方法(解決報錯:Not allowed to load local resource),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
Spring?Boot攔截器和監(jiān)聽器實現(xiàn)對請求和響應(yīng)處理實戰(zhàn)
這篇文章主要介紹了Spring?Boot攔截器和監(jiān)聽器實現(xiàn)對請求和響應(yīng)處理實戰(zhàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
如何將復(fù)雜SQL轉(zhuǎn)換成Java對象的實例講解
轉(zhuǎn)換復(fù)雜SQL到Java代碼,我們需要確定數(shù)據(jù)庫連接方式和工具,使用JDBC的API來連接數(shù)據(jù)庫、執(zhí)行SQL語句,復(fù)雜SQL語句可以被拆分為多個步驟,每個步驟執(zhí)行一個特定的操作,通過將SQL語句拆分為多個步驟,我們可以更好地理解復(fù)雜SQL的邏輯,并且更容易將其轉(zhuǎn)換為Java代碼2024-05-05

