欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于ZooKeeper實(shí)現(xiàn)隊(duì)列源碼

 更新時(shí)間:2017年09月18日 15:43:17   作者:MassiveStars  
這篇文章主要介紹了基于ZooKeeper實(shí)現(xiàn)隊(duì)列源碼的相關(guān)內(nèi)容,包括其實(shí)現(xiàn)原理和應(yīng)用場(chǎng)景,以及對(duì)隊(duì)列的簡(jiǎn)單介紹,具有一定參考價(jià)值,需要的朋友可以了解下。

實(shí)現(xiàn)原理

先進(jìn)先出隊(duì)列是最常用的隊(duì)列,使用Zookeeper實(shí)現(xiàn)先進(jìn)先出隊(duì)列就是在特定的目錄下創(chuàng)建PERSISTENT_EQUENTIAL節(jié)點(diǎn),創(chuàng)建成功時(shí)Watcher通知等待的隊(duì)列,隊(duì)列刪除序列號(hào)最小的節(jié)點(diǎn)用以消費(fèi)。此場(chǎng)景下Zookeeper的znode用于消息存儲(chǔ),znode存儲(chǔ)的數(shù)據(jù)就是消息隊(duì)列中的消息內(nèi)容,SEQUENTIAL序列號(hào)就是消息的編號(hào),按序取出即可。由于創(chuàng)建的節(jié)點(diǎn)是持久化的,所以不必?fù)?dān)心隊(duì)列消息的丟失問(wèn)題。

隊(duì)列(Queue)

分布式隊(duì)列是通用的數(shù)據(jù)結(jié)構(gòu),為了在 Zookeeper 中實(shí)現(xiàn)分布式隊(duì)列,首先需要指定一個(gè) Znode 節(jié)點(diǎn)作為隊(duì)列節(jié)點(diǎn)(queue node), 各個(gè)分布式客戶端通過(guò)調(diào)用 create() 函數(shù)向隊(duì)列中放入數(shù)據(jù),調(diào)用create()時(shí)節(jié)點(diǎn)路徑名帶"qn-"結(jié)尾,并設(shè)置順序(sequence)節(jié)點(diǎn)標(biāo)志。 由于設(shè)置了節(jié)點(diǎn)的順序標(biāo)志,新的路徑名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增號(hào)。需要從隊(duì)列中獲取數(shù)據(jù)/移除數(shù)據(jù)的客戶端首先調(diào)用 getChildren() 函數(shù),有數(shù)據(jù)則獲?。ǐ@取數(shù)據(jù)后可以刪除也可以不刪),沒(méi)有則在隊(duì)列節(jié)點(diǎn)(queue node)上將 watch 設(shè)置為 true,等待觸發(fā)并處理最小序號(hào)的節(jié)點(diǎn)(即從序號(hào)最小的節(jié)點(diǎn)中取數(shù)據(jù))。

應(yīng)用場(chǎng)景

Zookeeper隊(duì)列不太適合要求高性能的場(chǎng)合,但可以在數(shù)據(jù)量不大的情況下考慮使用。比如已在項(xiàng)目中使用Zookeeper又需要小規(guī)模的隊(duì)列應(yīng)用,這時(shí)可以使用Zookeeper實(shí)現(xiàn)的隊(duì)列;畢竟引進(jìn)一個(gè)消息中間件會(huì)增加系統(tǒng)的復(fù)雜性和運(yùn)維的壓力。

詳細(xì)代碼

ZookeeperClient工具類

package org.massive.common; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.ZooKeeper; 
import java.io.IOException; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
/** 
 * Created by Massive on 2016/12/18. 
 */ 
public class ZooKeeperClient { 
 private static String connectionString = "localhost:2181"; 
 private static int sessionTimeout = 10000; 
 public static ZooKeeper getInstance() throws IOException, InterruptedException { 
 //-------------------------------------------------------------- 
 // 為避免連接還未完成就執(zhí)行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss) 
 // 這里等Zookeeper的連接完成才返回實(shí)例 
 //-------------------------------------------------------------- 
 final CountDownLatch connectedSignal = new CountDownLatch(1); 
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { 
  @Override 
  public void process(WatchedEvent event) { 
   if (event.getState() == Event.KeeperState.SyncConnected) { 
   connectedSignal.countDown(); 
   } else if (event.getState() == Event.KeeperState.Expired) { 
   } 
  } 
  }); 
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS); 
 return zk; 
 } 
 public static int getSessionTimeout() { 
 return sessionTimeout; 
 } 
 public static void setSessionTimeout(int sessionTimeout) { 
 ZooKeeperClient.sessionTimeout = sessionTimeout; 
 } 
}

ZooKeeperQueue

package org.massive.queue; 
import org.apache.commons.lang3.RandomUtils; 
import org.apache.zookeeper.*; 
import org.apache.zookeeper.data.Stat; 
import org.massive.common.ZooKeeperClient; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
import java.util.List; 
import java.util.SortedSet; 
import java.util.TreeSet; 
/** 
 * Created by Allen on 2016/12/22. 
 */ 
public class ZooKeeperQueue { 
 private ZooKeeper zk; 
 private int sessionTimeout; 
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34}; 
 private static String QUEUE_ROOT = "/QUEUE"; 
 private String queueName; 
 private String queuePath; 
 private Object mutex = new Object(); 
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException { 
 this.queueName = queueName; 
 this.queuePath = QUEUE_ROOT + "/" + queueName; 
 this.zk = ZooKeeperClient.getInstance(); 
 this.sessionTimeout = zk.getSessionTimeout(); 
 //---------------------------------------------------- 
 // 確保隊(duì)列根目錄/QUEUE和當(dāng)前隊(duì)列的目錄的存在 
 //---------------------------------------------------- 
 ensureExists(QUEUE_ROOT); 
 ensureExists(queuePath); 
 } 
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException { 
 List<String> nodes = null; 
 byte[] returnVal = null; 
 Stat stat = null; 
 do { 
  synchronized (mutex) { 
  nodes = zk.getChildren(queuePath, new ProduceWatcher()); 
  //---------------------------------------------------- 
  // 如果沒(méi)有消息節(jié)點(diǎn),等待生產(chǎn)者的通知 
  //---------------------------------------------------- 
  if (nodes == null || nodes.size() == 0) { 
   mutex.wait(); 
  } else { 
   SortedSet<String> sortedNode = new TreeSet<String>(); 
   for (String node : nodes) { 
   sortedNode.add(queuePath + "/" + node); 
   } 
   //---------------------------------------------------- 
   // 消費(fèi)隊(duì)列里序列號(hào)最小的消息 
   //---------------------------------------------------- 
   String first = sortedNode.first(); 
   returnVal = zk.getData(first, false, stat); 
   zk.delete(first, -1); 
   System.out.print(Thread.currentThread().getName() + " "); 
   System.out.print("consume a message from queue:" + first); 
   System.out.println(", message data is: " + new String(returnVal,"UTF-8")); 
   return returnVal; 
  } 
  } 
 } while (true); 
 } 
 class ProduceWatcher implements Watcher { 
 @Override 
 public void process(WatchedEvent event) { 
  //---------------------------------------------------- 
  // 生產(chǎn)一條消息成功后通知一個(gè)等待線程 
  //---------------------------------------------------- 
  synchronized (mutex) { 
  mutex.notify(); 
  } 
 } 
 } 
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException { 
 //---------------------------------------------------- 
 // 確保當(dāng)前隊(duì)列目錄存在 
 // example: /QUEUE/queueName 
 //---------------------------------------------------- 
 ensureExists(queuePath); 
 String node = zk.create(queuePath + "/", data, 
  ZooDefs.Ids.OPEN_ACL_UNSAFE, 
  CreateMode.PERSISTENT_SEQUENTIAL); 
 System.out.print(Thread.currentThread().getName() + " "); 
 System.out.print("produce a message to queue:" + node); 
 System.out.println(" , message data is: " + new String(data,"UTF-8")); 
 } 
 public void ensureExists(String path) { 
 try { 
  Stat stat = zk.exists(path, false); 
  if (stat == null) { 
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
  } 
 } catch (KeeperException e) { 
  e.printStackTrace(); 
 } catch (InterruptedException e) { 
  e.printStackTrace(); 
 } 
 } 
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException { 
 String queueName = "test"; 
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName); 
 for (int i = 0; i < 10; i++) { 
  new Thread(new Runnable() { 
  @Override 
  public void run() { 
   try { 
   queue.consume(); 
   System.out.println("--------------------------------------------------------"); 
   System.out.println(); 
   } catch (InterruptedException e) { 
   e.printStackTrace(); 
   } catch (KeeperException e) { 
   e.printStackTrace(); 
   } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
   } 
  } 
  }).start(); 
 } 
 new Thread(new Runnable() { 
  @Override 
  public void run() { 
  for (int i = 0; i < 10; i++) { 
   try { 
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i)); 
   queue.produce(("massive" + i).getBytes()); 
   } catch (InterruptedException e) { 
   e.printStackTrace(); 
   } catch (KeeperException e) { 
   e.printStackTrace(); 
   } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
   } 
  } 
  } 
 },"Produce-thread").start(); 
 } 
}

測(cè)試

運(yùn)行main方法,本機(jī)器的某次輸出結(jié)果

Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0 
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1 
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2 
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3 
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4 
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5 
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6 
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7 
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8 
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9 
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9 

總結(jié)

以上就是本文有關(guān)于隊(duì)列和基于ZooKeeper實(shí)現(xiàn)隊(duì)列源碼介紹的全部?jī)?nèi)容,希望對(duì)大家有所幫助。

感謝朋友們對(duì)本站的支持!

相關(guān)文章

  • 使用@PathVariable時(shí)候無(wú)法將參數(shù)映射到變量中的解決

    使用@PathVariable時(shí)候無(wú)法將參數(shù)映射到變量中的解決

    這篇文章主要介紹了使用@PathVariable時(shí)候無(wú)法將參數(shù)映射到變量中的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java并發(fā)計(jì)數(shù)器的深入理解

    Java并發(fā)計(jì)數(shù)器的深入理解

    這篇文章主要給大家介紹了關(guān)于Java并發(fā)計(jì)數(shù)器的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-05-05
  • Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例

    Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例

    這篇文章主要介紹了Java基于中介者模式實(shí)現(xiàn)多人聊天室功能,詳細(xì)分析了中介者模式的概念、原理以及使用中介模式實(shí)現(xiàn)多人聊天的步驟、操作技巧與注意事項(xiàng),需要的朋友可以參考下
    2018-05-05
  • 基于Springboot2.3訪問(wèn)本地路徑下靜態(tài)資源的方法(解決報(bào)錯(cuò):Not allowed to load local resource)

    基于Springboot2.3訪問(wèn)本地路徑下靜態(tài)資源的方法(解決報(bào)錯(cuò):Not allowed to load local

    這篇文章主要介紹了基于Springboot2.3訪問(wèn)本地路徑下靜態(tài)資源的方法(解決報(bào)錯(cuò):Not allowed to load local resource),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • Spring?Boot攔截器和監(jiān)聽(tīng)器實(shí)現(xiàn)對(duì)請(qǐng)求和響應(yīng)處理實(shí)戰(zhàn)

    Spring?Boot攔截器和監(jiān)聽(tīng)器實(shí)現(xiàn)對(duì)請(qǐng)求和響應(yīng)處理實(shí)戰(zhàn)

    這篇文章主要介紹了Spring?Boot攔截器和監(jiān)聽(tīng)器實(shí)現(xiàn)對(duì)請(qǐng)求和響應(yīng)處理實(shí)戰(zhàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-06-06
  • SpringBoot中VO/DTO/PO的具體使用

    SpringBoot中VO/DTO/PO的具體使用

    VO/DTO/PO等實(shí)體類中字段常常會(huì)存在多數(shù)相同,根據(jù)業(yè)務(wù)需求少數(shù)不同,本文主要介紹了SpringBoot中VO/DTO/PO的具體使用,感興趣的可以了解一下
    2024-03-03
  • MybatisPlus中的save方法詳解

    MybatisPlus中的save方法詳解

    save方法是Mybatis-plus框架提供的一個(gè)添加記錄的方法,它用于將一個(gè)實(shí)體對(duì)象插入到數(shù)據(jù)庫(kù)表中,這篇文章主要介紹了MybatisPlus中的save方法,需要的朋友可以參考下
    2023-11-11
  • Spring中@Primary注解的作用詳解

    Spring中@Primary注解的作用詳解

    這篇文章主要介紹了Spring中@Primary注解的作用詳解,@Primary 注解是Spring框架中的一個(gè)注解,用于標(biāo)識(shí)一個(gè)Bean作為默認(rèn)的實(shí)現(xiàn)類,當(dāng)存在多個(gè)實(shí)現(xiàn)類時(shí),通過(guò)使用@Primary注解,可以指定其中一個(gè)作為默認(rèn)的實(shí)現(xiàn)類,以便在注入時(shí)自動(dòng)選擇該實(shí)現(xiàn)類,需要的朋友可以參考下
    2023-10-10
  • 如何將復(fù)雜SQL轉(zhuǎn)換成Java對(duì)象的實(shí)例講解

    如何將復(fù)雜SQL轉(zhuǎn)換成Java對(duì)象的實(shí)例講解

    轉(zhuǎn)換復(fù)雜SQL到Java代碼,我們需要確定數(shù)據(jù)庫(kù)連接方式和工具,使用JDBC的API來(lái)連接數(shù)據(jù)庫(kù)、執(zhí)行SQL語(yǔ)句,復(fù)雜SQL語(yǔ)句可以被拆分為多個(gè)步驟,每個(gè)步驟執(zhí)行一個(gè)特定的操作,通過(guò)將SQL語(yǔ)句拆分為多個(gè)步驟,我們可以更好地理解復(fù)雜SQL的邏輯,并且更容易將其轉(zhuǎn)換為Java代碼
    2024-05-05
  • Java基礎(chǔ)之throw和throws的示例詳解

    Java基礎(chǔ)之throw和throws的示例詳解

    throw是用來(lái)拋出一個(gè)具體的異常實(shí)例,而throws是用來(lái)聲明方法可能會(huì)拋出哪些類型的異常,是對(duì)調(diào)用者的一種通知和要求,這篇文章主要介紹了Java基礎(chǔ):throw和throws的詳解,需要的朋友可以參考下
    2024-06-06

最新評(píng)論