RabbitMQ消息隊(duì)列中多路復(fù)用Channel信道詳解
什么叫消息隊(duì)列
消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。
消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)雜,可能包含嵌入對(duì)象。
消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回,由消息系統(tǒng)來確保消息的可靠傳遞。
消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的。
這樣發(fā)布者和使用者都不用知道對(duì)方的存在。
最簡(jiǎn)單的理解是開了兩個(gè)定時(shí)器的線程,分別是上傳參數(shù)和下載數(shù)據(jù)兩個(gè)不同的線程,他們之間通過數(shù)據(jù)庫(kù)(也就是這里的queue)進(jìn)行異步聯(lián)通,但這里并不是用的定時(shí)器而是用的觀察者模式
RabbitMQ 基本概念
上面只是最簡(jiǎn)單抽象的描述,具體到 RabbitMQ 則有更詳細(xì)的概念需要解釋。
上面介紹過 RabbitMQ 是 AMQP 協(xié)議的一個(gè)開源實(shí)現(xiàn),所以其內(nèi)部實(shí)際上也是 AMQP 中的基本概念:
RabbitMQ 內(nèi)部結(jié)構(gòu)
- Message
- 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
- Publisher
- 消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
- Exchange
- 交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
- Binding
- 綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
- Queue
- 消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
- Connection
- 網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
- Channel
- 信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
- Consumer
- 消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
- Virtual Host
- 虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。
- Broker
- 表示消息隊(duì)列服務(wù)器實(shí)體。
對(duì)于理解多路復(fù)用,需要講下NIO:
其實(shí),多路復(fù)用是一種思想,多路是指多個(gè)客戶端連接線路(TCP、Channel),復(fù)用是指使用一個(gè)線程重復(fù)使用,總的來說,就是單線程能同時(shí)處理多個(gè)請(qǐng)求。要實(shí)現(xiàn)這一點(diǎn),就得改造BIO的連接模式了,BIO是客戶端直接連接服務(wù)端,NIO采用的是多路復(fù)用器 (Selector),相當(dāng)于客戶端的連接不會(huì)直接連接服務(wù)端,而是連接到多路復(fù)用器。
這樣做的好處,就是把服務(wù)端和客戶端隔離了,如果不直接連接,服務(wù)器端就不會(huì)阻塞,多路復(fù)用器會(huì)將收到的消息做為事件請(qǐng)求發(fā)送給服務(wù)端,但服務(wù)端在處理事件的時(shí)候?qū)τ谄渌蛻舳藖碚f還是阻塞的,這些事件有不同類型。
Channel
了解了多路復(fù)用這個(gè)設(shè)計(jì)后,再講下Channel部分,Channel是一個(gè)雙向讀寫通道,是異步傳輸?shù)模跀?shù)據(jù)塊結(jié)構(gòu)傳輸,BIO使用的是Stream流,基于字節(jié)傳輸。
關(guān)于性能方面,我也沒做過測(cè)試,但我知道一口咬定NIO比BIO性能要高效的言論,是錯(cuò)誤的,NIO主要解決的不是性能的問題 Channel有很多實(shí)現(xiàn),因?yàn)楸疚慕榻B的是Socket,客戶端使用SocketChannel,服務(wù)端使用ServerSocketChannel
代碼實(shí)現(xiàn)
服務(wù)端:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NioServer { public static void main(String[] args) throws IOException, InterruptedException { // 打開Channel服務(wù)端并綁定監(jiān)聽一個(gè)端口 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(8459)); ssc.configureBlocking(false); // 打開多路復(fù)用器 并注冊(cè)到 ServerSocketChannel 并監(jiān)聽連接事件 Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服務(wù)器已啟動(dòng)..."); while (true) { // 如果沒有事件發(fā)生 則select() 處于阻塞狀態(tài) selector.select(); // 發(fā)生事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 處理事件 while (iterator.hasNext()) { // 拿到具體事件 SelectionKey selectionKey = iterator.next(); // 判斷事件的類型 if (selectionKey.isAcceptable()) { System.out.println("客戶端連接事件"); SocketChannel channel = ssc.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); if (channel.finishConnect()) { System.out.println("完成連接"); } } else if (selectionKey.isReadable()) { SocketChannel sc = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = sc.read(buffer); System.out.println("收到的消息:" + new String(buffer.array(), 0, read)); // 響應(yīng)客戶端 這里可有可無 buffer.clear(); buffer.put("已收到消息".getBytes()); // 將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位 buffer.flip(); sc.write(buffer); // 設(shè)置監(jiān)聽事件的集合 這里把寫入事件加入 selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE); System.out.println("服務(wù)器向客戶端發(fā)送已確認(rèn)消息"); } else if (selectionKey.isWritable()) { System.out.println("觸發(fā)往客戶端寫入數(shù)據(jù)事件"); // 發(fā)送完了就取消監(jiān)聽寫事件,否則會(huì)無限循環(huán)觸發(fā)寫事件 selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); } // 事件完成后,將其移除 iterator.remove(); } } }
客戶端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class NioClient { public static void main(String[] args) throws IOException { //打開選擇器 Selector selector = Selector.open(); //打開通道 SocketChannel socketChannel = SocketChannel.open(); //配置非阻塞模型 socketChannel.configureBlocking(false); //連接遠(yuǎn)程主機(jī) socketChannel.connect(new InetSocketAddress("127.0.0.1", 8459)); //注冊(cè)事件 socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ ); //循環(huán)處理 new Thread(() -> { while (true) { try { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isConnectable()) { //連接建立或者連接建立不成功 SocketChannel channel = (SocketChannel) key.channel(); //完成連接的建立 if (channel.finishConnect()) { System.out.println("完成連接"); } } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); channel.read(buffer); System.out.println("客戶端收到消息:" + new String(buffer.array())); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else if (key.isWritable()) { System.out.println("客戶端向服務(wù)端寫入數(shù)據(jù)"); // 設(shè)置監(jiān)聽事件的集合 這里把寫入事件加入 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); } iter.remove(); } } catch (IOException e) { e.printStackTrace(); break; } } }).start(); Scanner scanner = new Scanner(System.in); while (true) { System.out.println("請(qǐng)輸入要發(fā)送的字符串"); String str = scanner.nextLine(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(str.getBytes()); buffer.flip(); socketChannel.write(buffer); } } }
講完多路復(fù)用,接下來就是TCP實(shí)現(xiàn)的多路復(fù)用,整體和NIO很相似,都是把請(qǐng)求注冊(cè)到selector中,根據(jù)SelectionKey進(jìn)行阻塞式的通信,因?yàn)橛辛俗?cè)和SelectionKey使得單線程能同時(shí)處理多個(gè)請(qǐng)求成為可能
IO多路復(fù)用(multiplexing)屬于同步IO網(wǎng)絡(luò)模型
是以Reactor模式實(shí)現(xiàn)
常見的IO多路復(fù)用應(yīng)用有:select、poll、epoll
本篇文章采用Java的NIO框架來實(shí)現(xiàn)單線程的IO多路復(fù)用
Reactor模式的組成角色
1. Reactor:負(fù)責(zé)派發(fā)IO事件給對(duì)應(yīng)的角色處理。為了監(jiān)聽I(yíng)O事件,select必須實(shí)現(xiàn)在Reactor中。
2. Acceptor:負(fù)責(zé)接受client的連線,然后給client綁定一個(gè)Handler并注冊(cè)IO事件到Reactor上監(jiān)聽。
3. Handler:負(fù)責(zé)處理與client交互的事件或行為。通常因?yàn)镠andler要處理與所對(duì)應(yīng)client交互的多個(gè)事件或行為,為了簡(jiǎn)化設(shè)計(jì),會(huì)以狀態(tài)模式來實(shí)現(xiàn)Handler。
代碼實(shí)現(xiàn)
[TCPReactor.java]
// Reactor線程 package server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; public class TCPReactor implements Runnable { private final ServerSocketChannel ssc; private final Selector selector; public TCPReactor(int port) throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); InetSocketAddress addr = new InetSocketAddress(port); ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口 ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞 SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊(cè)一個(gè)OP_ACCEPT事件,然後返回該通道的key sk.attach(new Acceptor(selector, ssc)); // 給定key一個(gè)附加的Acceptor對(duì)象 } @Override public void run() { while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行 System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "..."); try { if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行 continue; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合 Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度 it.remove(); } } } /* * name: dispatch(SelectionKey key) * description: 調(diào)度方法,根據(jù)事件綁定的對(duì)象開新線程 */ private void dispatch(SelectionKey key) { Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對(duì)象開新線程 if (r != null) r.run(); } }
[Acceptor.java]
// 接受連線請(qǐng)求線程 package server; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class Acceptor implements Runnable { private final ServerSocketChannel ssc; private final Selector selector; public Acceptor(Selector selector, ServerSocketChannel ssc) { this.ssc=ssc; this.selector=selector; } @Override public void run() { try { SocketChannel sc= ssc.accept(); // 接受client連線請(qǐng)求 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected."); if(sc!=null) { sc.configureBlocking(false); // 設(shè)置為非阻塞 SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊(cè)一個(gè)OP_READ事件,然後返回該通道的key selector.wakeup(); // 使一個(gè)阻塞住的selector操作立即返回 sk.attach(new TCPHandler(sk, sc)); // 給定key一個(gè)附加的TCPHandler對(duì)象 } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
我們先來簡(jiǎn)單點(diǎn)的,Handler不以??狀態(tài)模式實(shí)現(xiàn),只以比較直覺的方式實(shí)現(xiàn)。
[TCPHandler.java]
// Handler線程 package server; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TCPHandler implements Runnable { private final SelectionKey sk; private final SocketChannel sc; int state; public TCPHandler(SelectionKey sk, SocketChannel sc) { this.sk = sk; this.sc = sc; state = 0; // 初始狀態(tài)設(shè)定為READING } @Override public void run() { try { if (state == 0) read(); // 讀取網(wǎng)絡(luò)數(shù)據(jù) else send(); // 發(fā)送網(wǎng)絡(luò)數(shù)據(jù) } catch (IOException e) { System.out.println("[Warning!] A client has been closed."); closeChannel(); } } private void closeChannel() { try { sk.cancel(); sc.close(); } catch (IOException e1) { e1.printStackTrace(); } } private synchronized void read() throws IOException { // non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking byte[] arr = new byte[1024]; ByteBuffer buf = ByteBuffer.wrap(arr); int numBytes = sc.read(buf); // 讀取字符串 if(numBytes == -1) { System.out.println("[Warning!] A client has been closed."); closeChannel(); return; } String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài) if ((str != null) && !str.equals(" ")) { process(str); // 邏輯處理 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " > " + str); state = 1; // 改變狀態(tài) sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊(cè)的事件 sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回 } } private void send() throws IOException { // get message from message queue String str = "Your message has sent to " + sc.socket().getLocalSocketAddress().toString() + "\r\n"; ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動(dòng)把buf的position設(shè)為0,所以不需要再flip() while (buf.hasRemaining()) { sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容 } state = 0; // 改變狀態(tài) sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊(cè)的事件 sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回 } void process(String str) { // do process(decode, logically process, encode).. // .. } }
最后是主程序代碼
[Main.java]
package server; import java.io.IOException; public class Main { public static void main(String[] args) { // TODO Auto-generated method stub try { TCPReactor reactor = new TCPReactor(1333); reactor.run(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
下面附上客戶端代碼:
[Client.java]
package main.pkg; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class Client { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub String hostname=args[0]; int port = Integer.parseInt(args[1]); //String hostname="127.0.0.1"; //int port=1333; System.out.println("Connecting to "+ hostname +":"+port); try { Socket client = new Socket(hostname, port); // 連接至目的地 System.out.println("Connected to "+ hostname); PrintWriter out = new PrintWriter(client.getOutputStream()); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in)); String input; while((input=stdIn.readLine()) != null) { // 讀取輸入 out.println(input); // 發(fā)送輸入的字符串 out.flush(); // 強(qiáng)制將緩衝區(qū)內(nèi)的數(shù)據(jù)輸出 if(input.equals("exit")) { break; } System.out.println("server: "+in.readLine()); } client.close(); System.out.println("client stop."); } catch (UnknownHostException e) { // TODO Auto-generated catch block System.err.println("Don't know about host: " + hostname); } catch (IOException e) { // TODO Auto-generated catch block System.err.println("Couldn't get I/O for the socket connection"); } } }
到此這篇關(guān)于RabbitMQ消息隊(duì)列中多路復(fù)用Channel信道詳解的文章就介紹到這了,更多相關(guān)RabbitMQ多路復(fù)用Channel內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot配置devtools實(shí)現(xiàn)熱部署
本篇文章主要介紹了詳解SpringBoot配置devtools實(shí)現(xiàn)熱部署 ,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05java使用this調(diào)用構(gòu)造函數(shù)的實(shí)現(xiàn)方法示例
這篇文章主要介紹了java使用this調(diào)用構(gòu)造函數(shù)的實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了java面向?qū)ο蟪绦蛟O(shè)計(jì)中函數(shù)調(diào)用相關(guān)操作技巧,需要的朋友可以參考下2019-08-08SpringSecurity默認(rèn)登錄頁(yè)的使用示例教程
Spring 是非常流行和成功的 Java 應(yīng)用開發(fā)框架,Spring Security 正是 Spring 家族中的成員,Spring Security 基于 Spring 框架,提供了一套 Web 應(yīng)用安全性的完整解決方案,本文給大家介紹SpringSecurity的默認(rèn)登錄頁(yè)的使用教程,感興趣的朋友一起看看吧2023-12-12手工體驗(yàn)smtp和pop3協(xié)議 郵件實(shí)現(xiàn)詳解(二)
POP3/IMAP協(xié)議定義了郵件客戶端軟件和POP3郵件服務(wù)器的通信規(guī)則,這篇文章我們就來手工體驗(yàn)SMTP和POP3協(xié)議的奧秘,感興趣的小伙伴們可以參考一下2017-10-10Spring?Boot?Shiro?auto-configure工作流程詳解
這篇文章主要為大家介紹了Spring?Boot?Shiro?auto-configure工作流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02