RabbitMQ消息隊(duì)列中多路復(fù)用Channel信道詳解
什么叫消息隊(duì)列
消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。
消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)雜,可能包含嵌入對(duì)象。
消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回,由消息系統(tǒng)來確保消息的可靠傳遞。
消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的。
這樣發(fā)布者和使用者都不用知道對(duì)方的存在。
最簡(jiǎn)單的理解是開了兩個(gè)定時(shí)器的線程,分別是上傳參數(shù)和下載數(shù)據(jù)兩個(gè)不同的線程,他們之間通過數(shù)據(jù)庫(kù)(也就是這里的queue)進(jìn)行異步聯(lián)通,但這里并不是用的定時(shí)器而是用的觀察者模式
RabbitMQ 基本概念
上面只是最簡(jiǎn)單抽象的描述,具體到 RabbitMQ 則有更詳細(xì)的概念需要解釋。
上面介紹過 RabbitMQ 是 AMQP 協(xié)議的一個(gè)開源實(shí)現(xiàn),所以其內(nèi)部實(shí)際上也是 AMQP 中的基本概念:

RabbitMQ 內(nèi)部結(jié)構(gòu)
- Message
- 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
- Publisher
- 消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
- Exchange
- 交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
- Binding
- 綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
- Queue
- 消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
- Connection
- 網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
- Channel
- 信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
- Consumer
- 消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
- Virtual Host
- 虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。
- Broker
- 表示消息隊(duì)列服務(wù)器實(shí)體。
對(duì)于理解多路復(fù)用,需要講下NIO:
其實(shí),多路復(fù)用是一種思想,多路是指多個(gè)客戶端連接線路(TCP、Channel),復(fù)用是指使用一個(gè)線程重復(fù)使用,總的來說,就是單線程能同時(shí)處理多個(gè)請(qǐng)求。要實(shí)現(xiàn)這一點(diǎn),就得改造BIO的連接模式了,BIO是客戶端直接連接服務(wù)端,NIO采用的是多路復(fù)用器 (Selector),相當(dāng)于客戶端的連接不會(huì)直接連接服務(wù)端,而是連接到多路復(fù)用器。


這樣做的好處,就是把服務(wù)端和客戶端隔離了,如果不直接連接,服務(wù)器端就不會(huì)阻塞,多路復(fù)用器會(huì)將收到的消息做為事件請(qǐng)求發(fā)送給服務(wù)端,但服務(wù)端在處理事件的時(shí)候?qū)τ谄渌蛻舳藖碚f還是阻塞的,這些事件有不同類型。

Channel
了解了多路復(fù)用這個(gè)設(shè)計(jì)后,再講下Channel部分,Channel是一個(gè)雙向讀寫通道,是異步傳輸?shù)模跀?shù)據(jù)塊結(jié)構(gòu)傳輸,BIO使用的是Stream流,基于字節(jié)傳輸。

關(guān)于性能方面,我也沒做過測(cè)試,但我知道一口咬定NIO比BIO性能要高效的言論,是錯(cuò)誤的,NIO主要解決的不是性能的問題 Channel有很多實(shí)現(xiàn),因?yàn)楸疚慕榻B的是Socket,客戶端使用SocketChannel,服務(wù)端使用ServerSocketChannel
代碼實(shí)現(xiàn)
服務(wù)端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
public static void main(String[] args) throws IOException, InterruptedException {
// 打開Channel服務(wù)端并綁定監(jiān)聽一個(gè)端口
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(8459));
ssc.configureBlocking(false);
// 打開多路復(fù)用器 并注冊(cè)到 ServerSocketChannel 并監(jiān)聽連接事件
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務(wù)器已啟動(dòng)...");
while (true) {
// 如果沒有事件發(fā)生 則select() 處于阻塞狀態(tài)
selector.select();
// 發(fā)生事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 處理事件
while (iterator.hasNext()) {
// 拿到具體事件
SelectionKey selectionKey = iterator.next();
// 判斷事件的類型
if (selectionKey.isAcceptable()) {
System.out.println("客戶端連接事件");
SocketChannel channel = ssc.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
if (channel.finishConnect()) {
System.out.println("完成連接");
}
} else if (selectionKey.isReadable()) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = sc.read(buffer);
System.out.println("收到的消息:" + new String(buffer.array(), 0, read));
// 響應(yīng)客戶端 這里可有可無
buffer.clear();
buffer.put("已收到消息".getBytes());
// 將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
buffer.flip();
sc.write(buffer);
// 設(shè)置監(jiān)聽事件的集合 這里把寫入事件加入
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
System.out.println("服務(wù)器向客戶端發(fā)送已確認(rèn)消息");
} else if (selectionKey.isWritable()) {
System.out.println("觸發(fā)往客戶端寫入數(shù)據(jù)事件");
// 發(fā)送完了就取消監(jiān)聽寫事件,否則會(huì)無限循環(huán)觸發(fā)寫事件
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
}
// 事件完成后,將其移除
iterator.remove();
}
}
}客戶端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class NioClient {
public static void main(String[] args) throws IOException {
//打開選擇器
Selector selector = Selector.open();
//打開通道
SocketChannel socketChannel = SocketChannel.open();
//配置非阻塞模型
socketChannel.configureBlocking(false);
//連接遠(yuǎn)程主機(jī)
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8459));
//注冊(cè)事件
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ );
//循環(huán)處理
new Thread(() -> {
while (true) {
try {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isConnectable()) {
//連接建立或者連接建立不成功
SocketChannel channel = (SocketChannel) key.channel();
//完成連接的建立
if (channel.finishConnect()) {
System.out.println("完成連接");
}
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
channel.read(buffer);
System.out.println("客戶端收到消息:" + new String(buffer.array()));
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
System.out.println("客戶端向服務(wù)端寫入數(shù)據(jù)");
// 設(shè)置監(jiān)聽事件的集合 這里把寫入事件加入
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}).start();
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("請(qǐng)輸入要發(fā)送的字符串");
String str = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(str.getBytes());
buffer.flip();
socketChannel.write(buffer);
}
}
}講完多路復(fù)用,接下來就是TCP實(shí)現(xiàn)的多路復(fù)用,整體和NIO很相似,都是把請(qǐng)求注冊(cè)到selector中,根據(jù)SelectionKey進(jìn)行阻塞式的通信,因?yàn)橛辛俗?cè)和SelectionKey使得單線程能同時(shí)處理多個(gè)請(qǐng)求成為可能
IO多路復(fù)用(multiplexing)屬于同步IO網(wǎng)絡(luò)模型
是以Reactor模式實(shí)現(xiàn)
常見的IO多路復(fù)用應(yīng)用有:select、poll、epoll
本篇文章采用Java的NIO框架來實(shí)現(xiàn)單線程的IO多路復(fù)用
Reactor模式的組成角色
1. Reactor:負(fù)責(zé)派發(fā)IO事件給對(duì)應(yīng)的角色處理。為了監(jiān)聽I(yíng)O事件,select必須實(shí)現(xiàn)在Reactor中。
2. Acceptor:負(fù)責(zé)接受client的連線,然后給client綁定一個(gè)Handler并注冊(cè)IO事件到Reactor上監(jiān)聽。
3. Handler:負(fù)責(zé)處理與client交互的事件或行為。通常因?yàn)镠andler要處理與所對(duì)應(yīng)client交互的多個(gè)事件或行為,為了簡(jiǎn)化設(shè)計(jì),會(huì)以狀態(tài)模式來實(shí)現(xiàn)Handler。

代碼實(shí)現(xiàn)
[TCPReactor.java]
// Reactor線程
package server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public TCPReactor(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
InetSocketAddress addr = new InetSocketAddress(port);
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊(cè)一個(gè)OP_ACCEPT事件,然後返回該通道的key
sk.attach(new Acceptor(selector, ssc)); // 給定key一個(gè)附加的Acceptor對(duì)象
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行
System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
try {
if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行
continue;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調(diào)度方法,根據(jù)事件綁定的對(duì)象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對(duì)象開新線程
if (r != null)
r.run();
}
} [Acceptor.java]
// 接受連線請(qǐng)求線程
package server;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.ssc=ssc;
this.selector=selector;
}
@Override
public void run() {
try {
SocketChannel sc= ssc.accept(); // 接受client連線請(qǐng)求
System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
if(sc!=null) {
sc.configureBlocking(false); // 設(shè)置為非阻塞
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊(cè)一個(gè)OP_READ事件,然後返回該通道的key
selector.wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
sk.attach(new TCPHandler(sk, sc)); // 給定key一個(gè)附加的TCPHandler對(duì)象
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} 我們先來簡(jiǎn)單點(diǎn)的,Handler不以??狀態(tài)模式實(shí)現(xiàn),只以比較直覺的方式實(shí)現(xiàn)。
[TCPHandler.java]
// Handler線程
package server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
int state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = 0; // 初始狀態(tài)設(shè)定為READING
}
@Override
public void run() {
try {
if (state == 0)
read(); // 讀取網(wǎng)絡(luò)數(shù)據(jù)
else
send(); // 發(fā)送網(wǎng)絡(luò)數(shù)據(jù)
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
private void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private synchronized void read() throws IOException {
// non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài)
if ((str != null) && !str.equals(" ")) {
process(str); // 邏輯處理
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
state = 1; // 改變狀態(tài)
sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊(cè)的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
}
private void send() throws IOException {
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動(dòng)把buf的position設(shè)為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
}
state = 0; // 改變狀態(tài)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊(cè)的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
void process(String str) {
// do process(decode, logically process, encode)..
// ..
}
} 最后是主程序代碼
[Main.java]
package server;
import java.io.IOException;
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
reactor.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} 下面附上客戶端代碼:
[Client.java]
package main.pkg;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
String hostname=args[0];
int port = Integer.parseInt(args[1]);
//String hostname="127.0.0.1";
//int port=1333;
System.out.println("Connecting to "+ hostname +":"+port);
try {
Socket client = new Socket(hostname, port); // 連接至目的地
System.out.println("Connected to "+ hostname);
PrintWriter out = new PrintWriter(client.getOutputStream());
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String input;
while((input=stdIn.readLine()) != null) { // 讀取輸入
out.println(input); // 發(fā)送輸入的字符串
out.flush(); // 強(qiáng)制將緩衝區(qū)內(nèi)的數(shù)據(jù)輸出
if(input.equals("exit"))
{
break;
}
System.out.println("server: "+in.readLine());
}
client.close();
System.out.println("client stop.");
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
System.err.println("Don't know about host: " + hostname);
} catch (IOException e) {
// TODO Auto-generated catch block
System.err.println("Couldn't get I/O for the socket connection");
}
}
}到此這篇關(guān)于RabbitMQ消息隊(duì)列中多路復(fù)用Channel信道詳解的文章就介紹到這了,更多相關(guān)RabbitMQ多路復(fù)用Channel內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot配置devtools實(shí)現(xiàn)熱部署
本篇文章主要介紹了詳解SpringBoot配置devtools實(shí)現(xiàn)熱部署 ,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05
java使用this調(diào)用構(gòu)造函數(shù)的實(shí)現(xiàn)方法示例
這篇文章主要介紹了java使用this調(diào)用構(gòu)造函數(shù)的實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了java面向?qū)ο蟪绦蛟O(shè)計(jì)中函數(shù)調(diào)用相關(guān)操作技巧,需要的朋友可以參考下2019-08-08
SpringSecurity默認(rèn)登錄頁(yè)的使用示例教程
Spring 是非常流行和成功的 Java 應(yīng)用開發(fā)框架,Spring Security 正是 Spring 家族中的成員,Spring Security 基于 Spring 框架,提供了一套 Web 應(yīng)用安全性的完整解決方案,本文給大家介紹SpringSecurity的默認(rèn)登錄頁(yè)的使用教程,感興趣的朋友一起看看吧2023-12-12
手工體驗(yàn)smtp和pop3協(xié)議 郵件實(shí)現(xiàn)詳解(二)
POP3/IMAP協(xié)議定義了郵件客戶端軟件和POP3郵件服務(wù)器的通信規(guī)則,這篇文章我們就來手工體驗(yàn)SMTP和POP3協(xié)議的奧秘,感興趣的小伙伴們可以參考一下2017-10-10
Spring?Boot?Shiro?auto-configure工作流程詳解
這篇文章主要為大家介紹了Spring?Boot?Shiro?auto-configure工作流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02

