Java NIO Selector用法詳解【含多人聊天室實(shí)例】
本文實(shí)例講述了Java NIO Selector用法。分享給大家供大家參考,具體如下:
一、Java NIO 的核心組件
Java NIO的核心組件包括:Channel(通道),Buffer(緩沖區(qū)),Selector(選擇器),其中Channel和Buffer比較好理解
簡(jiǎn)單來(lái)說(shuō) NIO是面向通道和緩沖區(qū)的,意思就是:數(shù)據(jù)總是從通道中讀到buffer緩沖區(qū)內(nèi),或者從buffer寫入到通道中。
關(guān)于Channel 和 Buffer的詳細(xì)講解請(qǐng)看:Java NIO 教程
二、Java NIO Selector
1. Selector簡(jiǎn)介
選擇器提供選擇執(zhí)行已經(jīng)就緒的任務(wù)的能力.從底層來(lái)看,Selector提供了詢問(wèn)通道是否已經(jīng)準(zhǔn)備好執(zhí)行每個(gè)I/O操作的能力。Selector 允許單線程處理多個(gè)Channel。僅用單個(gè)線程來(lái)處理多個(gè)Channels的好處是,只需要更少的線程來(lái)處理通道。事實(shí)上,可以只用一個(gè)線程處理所有的通道,這樣會(huì)大量的減少線程之間上下文切換的開(kāi)銷。
在開(kāi)始之前,需要回顧一下Selector、SelectableChannel和SelectionKey:
選擇器(Selector)
Selector選擇器類管理著一個(gè)被注冊(cè)的通道集合的信息和它們的就緒狀態(tài)。通道是和選擇器一起被注冊(cè)的,并且使用選擇器來(lái)更新通道的就緒狀態(tài)。當(dāng)這么做的時(shí)候,可以選擇將被激發(fā)的線程掛起,直到有就緒的的通道。
可選擇通道(SelectableChannel)
SelectableChannel這個(gè)抽象類提供了實(shí)現(xiàn)通道的可選擇性所需要的公共方法。它是所有支持就緒檢查的通道類的父類。因?yàn)镕ileChannel類沒(méi)有繼承SelectableChannel因此是不是可選通道,而所有socket通道都是可選擇的,包括從管道(Pipe)對(duì)象的中獲得的通道。SelectableChannel可以被注冊(cè)到Selector對(duì)象上,同時(shí)可以指定對(duì)那個(gè)選擇器而言,那種操作是感興趣的。一個(gè)通道可以被注冊(cè)到多個(gè)選擇器上,但對(duì)每個(gè)選擇器而言只能被注冊(cè)一次。
選擇鍵(SelectionKey)
選擇鍵封裝了特定的通道與特定的選擇器的注冊(cè)關(guān)系。選擇鍵對(duì)象被SelectableChannel.register()返回并提供一個(gè)表示這種注冊(cè)關(guān)系的標(biāo)記。選擇鍵包含了兩個(gè)比特集(以整數(shù)的形式進(jìn)行編碼),指示了該注冊(cè)關(guān)系所關(guān)心的通道操作,以及通道已經(jīng)準(zhǔn)備好的操作。
下面是使用Selector管理多個(gè)channel的結(jié)構(gòu)圖:
2. Selector的使用
(1)創(chuàng)建Selector
Selector對(duì)象是通過(guò)調(diào)用靜態(tài)工廠方法open()來(lái)實(shí)例化的,如下:
Selector Selector=Selector.open();

類方法open()實(shí)際上向SPI1發(fā)出請(qǐng)求,通過(guò)默認(rèn)的SelectorProvider對(duì)象獲取一個(gè)新的實(shí)例。
(2)將Channel注冊(cè)到Selector
要實(shí)現(xiàn)Selector管理Channel,需要將channel注冊(cè)到相應(yīng)的Selector上,如下:
channel.configureBlocking(false); SelectionKey key= channel.register(selector,SelectionKey,OP_READ);
通過(guò)調(diào)用通道的register()方法會(huì)將它注冊(cè)到一個(gè)選擇器上。與Selector一起使用時(shí),Channel必須處于非阻塞模式下,否則將拋出IllegalBlockingModeException異常,這意味著不能將FileChannel與Selector一起使用,因?yàn)镕ileChannel不能切換到非阻塞模式,而套接字通道都可以。另外通道一旦被注冊(cè),將不能再回到阻塞狀態(tài),此時(shí)若調(diào)用通道的configureBlocking(true)將拋出BlockingModeException異常。
register()方法的第二個(gè)參數(shù)是“interest集合”,表示選擇器所關(guān)心的通道操作,它實(shí)際上是一個(gè)表示選擇器在檢查通道就緒狀態(tài)時(shí)需要關(guān)心的操作的比特掩碼。比如一個(gè)選擇器對(duì)通道的read和write操作感興趣,那么選擇器在檢查該通道時(shí),只會(huì)檢查通道的read和write操作是否已經(jīng)處在就緒狀態(tài)。
它有以下四種操作類型:
- Connect 連接
- Accept 接受
- Read 讀
- Write 寫
需要注意并非所有的操作在所有的可選擇通道上都能被支持,比如ServerSocketChannel支持Accept,而SocketChannel中不支持。我們可以通過(guò)通道上的validOps()方法來(lái)獲取特定通道下所有支持的操作集合。
Java中定義了四個(gè)常量來(lái)表示這四種操作類型:
SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE
如果Selector對(duì)通道的多操作類型感興趣,可以用“位或”操作符來(lái)實(shí)現(xiàn):
int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE;
當(dāng)通道觸發(fā)了某個(gè)操作之后,表示該通道的某個(gè)操作已經(jīng)就緒,可以被操作。因此,某個(gè)SocketChannel成功連接到另一個(gè)服務(wù)器稱為“連接就緒”(OP_CONNECT)。一個(gè)ServerSocketChannel準(zhǔn)備好接收新進(jìn)入的連接稱為“接收就緒”(OP_ACCEPT)。一個(gè)有數(shù)據(jù)可讀的通道可以說(shuō)是“讀就緒”(OP_READ)。等待寫數(shù)據(jù)的通道可以說(shuō)是“寫就緒”(OP_WRITE)。
我們注意到register()方法會(huì)返回一個(gè)SelectionKey對(duì)象,我們稱之為鍵對(duì)象。該對(duì)象包含了以下四種屬性:
- interest集合
- read集合
- Channel
- Selector
interest集合是Selector感興趣的集合,用于指示選擇器對(duì)通道關(guān)心的操作,可通過(guò)SelectionKey對(duì)象的interestOps()獲取。最初,該興趣集合是通道被注冊(cè)到Selector時(shí)傳進(jìn)來(lái)的值。該集合不會(huì)被選擇器改變,但是可通過(guò)interestOps()改變。我們可以通過(guò)以下方法來(lái)判斷Selector是否對(duì)Channel的某種事件感興趣:
int interestSet=selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
read集合是通道已經(jīng)就緒的操作的集合,表示一個(gè)通道準(zhǔn)備好要執(zhí)行的操作了,可通過(guò)SelctionKey對(duì)象的readyOps()來(lái)獲取相關(guān)通道已經(jīng)就緒的操作。它是interest集合的子集,并且表示了interest集合中從上次調(diào)用select()以后已經(jīng)就緒的那些操作。(比如選擇器對(duì)通道的read,write操作感興趣,而某時(shí)刻通道的read操作已經(jīng)準(zhǔn)備就緒可以被選擇器獲知了,前一種就是interest集合,后一種則是read集合。)。JAVA中定義以下幾個(gè)方法用來(lái)檢查這些操作是否就緒:
//int readSet=selectionKey.readOps(); selectionKey.isAcceptable();//等價(jià)于selectionKey.readyOps()&SelectionKey.OP_ACCEPT selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
需要注意的是,通過(guò)相關(guān)的選擇鍵的readyOps()方法返回的就緒狀態(tài)指示只是一個(gè)提示,底層的通道在任何時(shí)候都會(huì)不斷改變,而其他線程也可能在通道上執(zhí)行操作并影響到它的就緒狀態(tài)。另外,我們不能直接修改read集合。
取出SelectionKey所關(guān)聯(lián)的Selector和Channel
通過(guò)SelectionKey訪問(wèn)對(duì)應(yīng)的Selector和Channel:
Channel channel =selectionKey.channel(); Selector selector=selectionKey.selector();
關(guān)于取消SelectionKey對(duì)象的那點(diǎn)事
我們可以通過(guò)SelectionKey對(duì)象的cancel()方法來(lái)取消特定的注冊(cè)關(guān)系。該方法調(diào)用之后,該SelectionKey對(duì)象將會(huì)被”拷貝”至已取消鍵的集合中,該鍵此時(shí)已經(jīng)失效,但是該注冊(cè)關(guān)系并不會(huì)立刻終結(jié)。在下一次select()時(shí),已取消鍵的集合中的元素會(huì)被清除,相應(yīng)的注冊(cè)關(guān)系也真正終結(jié)。
(3)為SelectionKey綁定附加對(duì)象
可以將一個(gè)或者多個(gè)附加對(duì)象綁定到SelectionKey上,以便容易的識(shí)別給定的通道。通常有兩種方式:
1 在注冊(cè)的時(shí)候直接綁定:
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
2 在綁定完成之后附加:
selectionKey.attach(theObject);//綁定
綁定之后,可通過(guò)對(duì)應(yīng)的SelectionKey取出該對(duì)象:
selectionKey.attachment();
如果要取消該對(duì)象,則可以通過(guò)該種方式:
selectionKey.attach(null)
需要注意的是如果附加的對(duì)象不再使用,一定要人為清除,因?yàn)槔厥掌鞑粫?huì)回收該對(duì)象,若不清除的話會(huì)成內(nèi)存泄漏。
一個(gè)單獨(dú)的通道可被注冊(cè)到多個(gè)選擇器中,有些時(shí)候我們需要通過(guò)isRegistered()方法來(lái)檢查一個(gè)通道是否已經(jīng)被注冊(cè)到任何一個(gè)選擇器上。 通常來(lái)說(shuō),我們并不會(huì)這么做。
(4)通過(guò)Selector選擇通道
我們知道選擇器維護(hù)注冊(cè)過(guò)的通道的集合,并且這種注冊(cè)關(guān)系都被封裝在SelectionKey當(dāng)中。接下來(lái)我們簡(jiǎn)單的了解一下Selector維護(hù)的三種類型SelectionKey集合:
已注冊(cè)的鍵的集合(Registered key set)
所有與選擇器關(guān)聯(lián)的通道所生成的鍵的集合稱為已經(jīng)注冊(cè)的鍵的集合。并不是所有注冊(cè)過(guò)的鍵都仍然有效。這個(gè)集合通過(guò)keys()方法返回,并且可能是空的。這個(gè)已注冊(cè)的鍵的集合不是可以直接修改的;試圖這么做的話將引發(fā)java.lang.UnsupportedOperationException。
已選擇的鍵的集合(Selected key set)
已注冊(cè)的鍵的集合的子集。這個(gè)集合的每個(gè)成員都是相關(guān)的通道被選擇器(在前一個(gè)選擇操作中)判斷為已經(jīng)準(zhǔn)備好的,并且包含于鍵的interest集合中的操作。這個(gè)集合通過(guò)selectedKeys()方法返回(并有可能是空的)。
不要將已選擇的鍵的集合與ready集合弄混了。這是一個(gè)鍵的集合,每個(gè)鍵都關(guān)聯(lián)一個(gè)已經(jīng)準(zhǔn)備好至少一種操作的通道。每個(gè)鍵都有一個(gè)內(nèi)嵌的ready集合,指示了所關(guān)聯(lián)的通道已經(jīng)準(zhǔn)備好的操作。鍵可以直接從這個(gè)集合中移除,但不能添加。試圖向已選擇的鍵的集合中添加元素將拋出java.lang.UnsupportedOperationException。
已取消的鍵的集合(Cancelled key set)
已注冊(cè)的鍵的集合的子集,這個(gè)集合包含了cancel()方法被調(diào)用過(guò)的鍵(這個(gè)鍵已經(jīng)被無(wú)效化),但它們還沒(méi)有被注銷。這個(gè)集合是選擇器對(duì)象的私有成員,因而無(wú)法直接訪問(wèn)。
在剛初始化的Selector對(duì)象中,這三個(gè)集合都是空的。通過(guò)Selector的select()方法可以選擇已經(jīng)準(zhǔn)備就緒的通道(這些通道包含你感興趣的的事件)。比如你對(duì)讀就緒的通道感興趣,那么select()方法就會(huì)返回讀事件已經(jīng)就緒的那些通道。下面是Selector幾個(gè)重載的select()方法:
- select():阻塞到至少有一個(gè)通道在你注冊(cè)的事件上就緒了。
- select(long timeout):和select()一樣,但最長(zhǎng)阻塞事件為timeout毫秒。
- selectNow():非阻塞,只要有通道就緒就立刻返回。
select()方法返回的int值表示有多少通道已經(jīng)就緒,是自上次調(diào)用select()方法后有多少通道變成就緒狀態(tài)。之前在select()調(diào)用時(shí)進(jìn)入就緒的通道不會(huì)在本次調(diào)用中被記入,而在前一次select()調(diào)用進(jìn)入就緒但現(xiàn)在已經(jīng)不在處于就緒的通道也不會(huì)被記入。例如:首次調(diào)用select()方法,如果有一個(gè)通道變成就緒狀態(tài),返回了1,若再次調(diào)用select()方法,如果另一個(gè)通道就緒了,它會(huì)再次返回1。如果對(duì)第一個(gè)就緒的channel沒(méi)有做任何操作,現(xiàn)在就有兩個(gè)就緒的通道,但在每次select()方法調(diào)用之間,只有一個(gè)通道就緒了。
一旦調(diào)用select()方法,并且返回值不為0時(shí),則可以通過(guò)調(diào)用Selector的selectedKeys()方法來(lái)訪問(wèn)已選擇鍵集合。如下:
Set selectedKeys=selector.selectedKeys();
進(jìn)而可以放到和某SelectionKey關(guān)聯(lián)的Selector和Channel。如下所示:
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
關(guān)于Selector執(zhí)行選擇的過(guò)程
我們知道調(diào)用select()方法進(jìn)行通道,現(xiàn)在我們?cè)賮?lái)深入一下選擇的過(guò)程,也就是select()執(zhí)行過(guò)程。當(dāng)select()被調(diào)用時(shí)將執(zhí)行以下幾步:
- 首先檢查已取消鍵集合,也就是通過(guò)cancle()取消的鍵。如果該集合不為空,則清空該集合里的鍵,同時(shí)該集合中每個(gè)取消的鍵也將從已注冊(cè)鍵集合和已選擇鍵集合中移除。(一個(gè)鍵被取消時(shí),并不會(huì)立刻從集合中移除,而是將該鍵“拷貝”至已取消鍵集合中,這種取消策略就是我們常提到的“延遲取消”。)
- 再次檢查已注冊(cè)鍵集合(準(zhǔn)確說(shuō)是該集合中每個(gè)鍵的interest集合)。系統(tǒng)底層會(huì)依次詢問(wèn)每個(gè)已經(jīng)注冊(cè)的通道是否準(zhǔn)備好選擇器所感興趣的某種操作,一旦發(fā)現(xiàn)某個(gè)通道已經(jīng)就緒了,則會(huì)首先判斷該通道是否已經(jīng)存在在已選擇鍵集合當(dāng)中,如果已經(jīng)存在,則更新該通道在已注冊(cè)鍵集合中對(duì)應(yīng)的鍵的ready集合,如果不存在,則首先清空該通道的對(duì)應(yīng)的鍵的ready集合,然后重設(shè)ready集合,最后將該鍵存至已注冊(cè)鍵集合中。這里需要明白,當(dāng)更新ready集合時(shí),在上次select()中已經(jīng)就緒的操作不會(huì)被刪除,也就是ready集合中的元素是累積的,比如在第一次的selector對(duì)某個(gè)通道的read和write操作感興趣,在第一次執(zhí)行select()時(shí),該通道的read操作就緒,此時(shí)該通道對(duì)應(yīng)的鍵中的ready集合存有read元素,在第二次執(zhí)行select()時(shí),該通道的write操作也就緒了,此時(shí)該通道對(duì)應(yīng)的ready集合中將同時(shí)有read和write元素。
深入已注冊(cè)鍵集合的管理
到現(xiàn)在我們已經(jīng)知道一個(gè)通道的的鍵是如何被添加到已選擇鍵集合中的,下面我們來(lái)繼續(xù)了解對(duì)已選擇鍵集合的管理 。首先要記?。哼x擇器不會(huì)主動(dòng)刪除被添加到已選擇鍵集合中的鍵,而且被添加到已選擇鍵集合中的鍵的ready集合只能被設(shè)置,而不能被清理。如果我們希望清空已選擇鍵集合中某個(gè)鍵的ready集合該怎么辦?我們知道一個(gè)鍵在新加入已選擇鍵集合之前會(huì)首先置空該鍵的ready集合,這樣的話我們可以人為的將某個(gè)鍵從已注冊(cè)鍵集合中移除最終實(shí)現(xiàn)置空某個(gè)鍵的ready集合。被移除的鍵如果在下一次的select()中再次就緒,它將會(huì)重新被添加到已選擇的鍵的集合中。這就是為什么要在每次迭代的末尾調(diào)用keyIterator.remove()。
(5)停止選擇
選擇器執(zhí)行選擇的過(guò)程,系統(tǒng)底層會(huì)依次詢問(wèn)每個(gè)通道是否已經(jīng)就緒,這個(gè)過(guò)程可能會(huì)造成調(diào)用線程進(jìn)入阻塞狀態(tài),那么我們有以下三種方式可以喚醒在select()方法中阻塞的線程。
- 通過(guò)調(diào)用Selector對(duì)象的wakeup()方法讓處在阻塞狀態(tài)的select()方法立刻返回
該方法使得選擇器上的第一個(gè)還沒(méi)有返回的選擇操作立即返回。如果當(dāng)前沒(méi)有進(jìn)行中的選擇操作,那么下一次對(duì)select()方法的一次調(diào)用將立即返回。- 通過(guò)close()方法關(guān)閉Selector**
該方法使得任何一個(gè)在選擇操作中阻塞的線程都被喚醒(類似wakeup()),同時(shí)使得注冊(cè)到該Selector的所有Channel被注銷,所有的鍵將被取消,但是Channel本身并不會(huì)關(guān)閉。- 調(diào)用interrupt()
調(diào)用該方法會(huì)使睡眠的線程拋出InterruptException異常,捕獲該異常并在調(diào)用wakeup()
上面有些人看到“系統(tǒng)底層會(huì)依次詢問(wèn)每個(gè)通道”時(shí)可能在想如果已選擇鍵非常多是,會(huì)不會(huì)耗時(shí)較長(zhǎng)?答案是肯定的。但是我想說(shuō)的是通常你可以選擇忽略該過(guò)程,至于為什么,后面再說(shuō)。
三、NIO多人聊天室
服務(wù)端
public class ChatServer implements Runnable{
private Selector selector;
private SelectionKey serverKey;
private Vector<String> usernames;
private static final int PORT = 9999;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ChatServer(){
usernames = new Vector<String>();
init();
}
public void init(){
try {
selector = Selector.open();
//創(chuàng)建serverSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress(PORT));
//加入到selector中
serverChannel.configureBlocking(false);
serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("server starting.......");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while(true){
//獲取就緒channel
int count = selector.select();
if(count > 0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
//若此key的通道是等待接受新的套接字連接
if(key.isAcceptable()){
System.out.println(key.toString() + " : 接收");
//一定要把這個(gè)accpet狀態(tài)的服務(wù)器key去掉,否則會(huì)出錯(cuò)
iterator.remove();
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//接受socket
SocketChannel socket = serverChannel.accept();
socket.configureBlocking(false);
//將channel加入到selector中,并一開(kāi)始讀取數(shù)據(jù)
socket.register(selector, SelectionKey.OP_READ);
}
//若此key的通道是有數(shù)據(jù)可讀狀態(tài)
if(key.isValid() && key.isReadable()){
System.out.println(key.toString() + " : 讀");
readMsg(key);
}
//若此key的通道是寫數(shù)據(jù)狀態(tài)
if(key.isValid() && key.isWritable()){
System.out.println(key.toString() + " : 寫");
writeMsg(key);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void readMsg(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
//設(shè)置buffer緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//假如客戶端關(guān)閉了通道,這里在對(duì)該通道read數(shù)據(jù),會(huì)發(fā)生IOException,捕獲到Exception后,關(guān)閉掉該channel,取消掉該key
int count = channel.read(buffer);
StringBuffer buf = new StringBuffer();
//如果讀取到了數(shù)據(jù)
if(count > 0){
//讓buffer翻轉(zhuǎn),把buffer中的數(shù)據(jù)讀取出來(lái)
buffer.flip();
buf.append(new String(buffer.array(), 0, count));
}
String msg = buf.toString();
//如果此數(shù)據(jù)是客戶端連接時(shí)發(fā)送的數(shù)據(jù)
if(msg.indexOf("open_") != -1){
String name = msg.substring(5);//取出名字
printInfo(name + " --> online");
usernames.add(name);
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey skey = iter.next();
//若不是服務(wù)器套接字通道的key,則將數(shù)據(jù)設(shè)置到此key中
//并更新此key感興趣的動(dòng)作
if(skey != serverKey){
skey.attach(usernames);
skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);
}
}
//如果是下線時(shí)發(fā)送的數(shù)據(jù)
}else if(msg.indexOf("exit_") != -1){
String username = msg.substring(5);
usernames.remove(username);
key.attach("close");
//要退出的當(dāng)前channel加上close的標(biāo)示,并把興趣轉(zhuǎn)為寫,如果write中收到了close,則中斷channel的鏈接
key.interestOps(SelectionKey.OP_WRITE);
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey sKey = iter.next();
sKey.attach(usernames);
sKey.interestOps(sKey.interestOps() | SelectionKey.OP_WRITE);
}
//如果是聊天發(fā)送數(shù)據(jù)
}else{
String uname = msg.substring(0, msg.indexOf("^"));
msg = msg.substring(msg.indexOf("^") + 1);
printInfo("("+uname+")說(shuō):" + msg);
String dateTime = sdf.format(new Date());
String smsg = uname + " " + dateTime + "\n " + msg + "\n";
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey sKey = iter.next();
sKey.attach(smsg);
sKey.interestOps(sKey.interestOps() | SelectionKey.OP_WRITE);
}
}
buffer.clear();
} catch (IOException e) {
//當(dāng)客戶端關(guān)閉channel時(shí),服務(wù)端再往通道緩沖區(qū)中寫或讀數(shù)據(jù),都會(huì)報(bào)IOException,解決方法是:在服務(wù)端這里捕獲掉這個(gè)異常,并且關(guān)閉掉服務(wù)端這邊的Channel通道
key.cancel();
try {
channel.socket().close();
channel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
private void writeMsg(SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();
Object attachment = key.attachment();
//獲取key的值之后,要把key的值置空,避免影響下一次的使用
key.attach("");
channel.write(ByteBuffer.wrap(attachment.toString().getBytes()));
key.interestOps(SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
private void printInfo(String str) {
System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
}
public static void main(String[] args) {
ChatServer server = new ChatServer();
new Thread(server).start();
}
}
注意這里readMsg 和 writeMsg中,read操作的key重新設(shè)置interest要遍歷所有key,而write操作的key重新設(shè)置interest只需要設(shè)置傳入的當(dāng)前key,原因:
讀操作之所以要遍歷key,是因?yàn)檫@里channel的讀寫操作的流程是:
1. read到數(shù)據(jù)后,把數(shù)據(jù)加到每一個(gè)key的attach中
2. 寫數(shù)據(jù)時(shí),從key的attach中取出數(shù)據(jù),從而把該數(shù)據(jù)寫到buffer中
例如:當(dāng)選擇器有3個(gè)channel的情況下,實(shí)現(xiàn)多人聊天,流程:
1. 其中一個(gè)channel發(fā)送數(shù)據(jù),該channel接受到數(shù)據(jù)
2. 在該channel的讀操作中,遍歷所有的channel,為每一個(gè)channel的attach加上該數(shù)據(jù)
3. 每一個(gè)channel在寫操作時(shí),從key的attach中取出數(shù)據(jù),分別把該數(shù)據(jù)寫到各自的buffer中
4. 于是每一個(gè)channel的界面都能看到其中一個(gè)channel發(fā)送的數(shù)據(jù)
客戶端:
public class ChatClient {
private static final String HOST = "127.0.0.1";
private static int PORT = 9999;
private static SocketChannel socket;
private static ChatClient client;
private static byte[] lock = new byte[1];
//單例模式管理
private ChatClient() throws IOException{
socket = SocketChannel.open();
socket.connect(new InetSocketAddress(HOST, PORT));
socket.configureBlocking(false);
}
public static ChatClient getIntance(){
synchronized(lock){
if(client == null){
try {
client = new ChatClient();
} catch (IOException e) {
e.printStackTrace();
}
}
return client;
}
}
public void sendMsg(String msg){
try {
socket.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public String receiveMsg(){
String msg = null;
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuffer buf = new StringBuffer();
int count = 0;
//不一定一次就能讀滿,連續(xù)讀
while((count = socket.read(buffer)) > 0){
buf.append(new String(buffer.array(), 0, count));
}
//有數(shù)據(jù)
if(buf.length() > 0){
msg = buf.toString();
if(buf.toString().equals("close")){
//不過(guò)不sleep會(huì)導(dǎo)致ioException的發(fā)生,因?yàn)槿绻@里直接關(guān)閉掉通道,在server里,
//該channel在read(buffer)時(shí)會(huì)發(fā)生讀取異常,通過(guò)sleep一段時(shí)間,使得服務(wù)端那邊的channel先關(guān)閉,客戶端
//的channel后關(guān)閉,這樣就能防止read(buffer)的ioException
//但是這是一種笨方法
//Thread.sleep(100);
//更好的方法是,在readBuffer中捕獲異常后,手動(dòng)進(jìn)行關(guān)閉通道
socket.socket().close();
socket.close();
msg = null;
}
}
} catch (IOException e) {
e.printStackTrace();
}
return msg;
}
}
界面代碼:設(shè)置姓名
public class SetNameFrame extends JFrame {
private static final long serialVersionUID = 1L;
private static JTextField txtName;
private static JButton btnOK;
private static JLabel label;
public SetNameFrame() {
this.setLayout(null);
Toolkit kit = Toolkit.getDefaultToolkit();
int w = kit.getScreenSize().width;
int h = kit.getScreenSize().height;
this.setBounds(w / 2 - 230 / 2, h / 2 - 200 / 2, 230, 200);
this.setTitle("設(shè)置名稱");
this.setDefaultCloseOperation(EXIT_ON_CLOSE);
this.setResizable(false);
txtName = new JTextField(4);
this.add(txtName);
txtName.setBounds(10, 10, 100, 25);
btnOK = new JButton("OK");
this.add(btnOK);
btnOK.setBounds(120, 10, 80, 25);
label = new JLabel("[w:" + w + ",h:" + h + "]");
this.add(label);
label.setBounds(10, 40, 200, 100);
label.setText("<html>在上面的文本框中輸入名字<br/>顯示器寬度:" + w + "<br/>顯示器高度:" + h
+ "</html>");
btnOK.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
String uname = txtName.getText();
ChatClient service = ChatClient.getIntance();
ChatFrame chatFrame = new ChatFrame(service, uname);
chatFrame.show();
setVisible(false);
}
});
}
public static void main(String[] args) {
SetNameFrame setNameFrame = new SetNameFrame();
setNameFrame.setVisible(true);
}
}
界面代碼:聊天界面
public class ChatFrame {
private JTextArea readContext = new JTextArea(18, 30);// 顯示消息文本框
private JTextArea writeContext = new JTextArea(6, 30);// 發(fā)送消息文本框
private DefaultListModel modle = new DefaultListModel();// 用戶列表模型
private JList list = new JList(modle);// 用戶列表
private JButton btnSend = new JButton("發(fā)送");// 發(fā)送消息按鈕
private JButton btnClose = new JButton("關(guān)閉");// 關(guān)閉聊天窗口按鈕
private JFrame frame = new JFrame("ChatFrame");// 窗體界面
private String uname;// 用戶姓名
private ChatClient service;// 用于與服務(wù)器交互
private boolean isRun = false;// 是否運(yùn)行
public ChatFrame(ChatClient service, String uname) {
this.isRun = true;
this.uname = uname;
this.service = service;
}
// 初始化界面控件及事件
private void init() {
frame.setLayout(null);
frame.setTitle(uname + " 聊天窗口");
frame.setSize(500, 500);
frame.setLocation(400, 200);
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setResizable(false);
JScrollPane readScroll = new JScrollPane(readContext);
readScroll.setVerticalScrollBarPolicy(JScrollPane.VERTICAL_SCROLLBAR_AS_NEEDED);
frame.add(readScroll);
JScrollPane writeScroll = new JScrollPane(writeContext);
writeScroll.setVerticalScrollBarPolicy(JScrollPane.VERTICAL_SCROLLBAR_AS_NEEDED);
frame.add(writeScroll);
frame.add(list);
frame.add(btnSend);
frame.add(btnClose);
readScroll.setBounds(10, 10, 320, 300);
readContext.setBounds(0, 0, 320, 300);
readContext.setEditable(false);
readContext.setLineWrap(true);// 自動(dòng)換行
writeScroll.setBounds(10, 315, 320, 100);
writeContext.setBounds(0, 0, 320, 100);
writeContext.setLineWrap(true);// 自動(dòng)換行
list.setBounds(340, 10, 140, 445);
btnSend.setBounds(150, 420, 80, 30);
btnClose.setBounds(250, 420, 80, 30);
frame.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent e) {
isRun = false;
service.sendMsg("exit_" + uname);
System.exit(0);
}
});
btnSend.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
String msg = writeContext.getText().trim();
if(msg.length() > 0){
service.sendMsg(uname + "^" + writeContext.getText());
}
writeContext.setText(null);
writeContext.requestFocus();
}
});
btnClose.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
isRun = false;
service.sendMsg("exit_" + uname);
System.exit(0);
}
});
list.addListSelectionListener(new ListSelectionListener() {
@Override
public void valueChanged(ListSelectionEvent e) {
// JOptionPane.showMessageDialog(null,
// list.getSelectedValue().toString());
}
});
writeContext.addKeyListener(new KeyListener() {
@Override
public void keyTyped(KeyEvent e) {
// TODO Auto-generated method stub
}
@Override
public void keyReleased(KeyEvent e) {
if(e.getKeyCode() == KeyEvent.VK_ENTER){
String msg = writeContext.getText().trim();
if(msg.length() > 0){
service.sendMsg(uname + "^" + writeContext.getText());
}
writeContext.setText(null);
writeContext.requestFocus();
}
}
@Override
public void keyPressed(KeyEvent e) {
// TODO Auto-generated method stub
}
});
}
// 此線程類用于輪詢讀取服務(wù)器發(fā)送的消息
private class MsgThread extends Thread {
@Override
public void run() {
while (isRun) {
String msg = service.receiveMsg();
if (msg != null) {
//如果存在[],這是verctor裝的usernames的toString生成的
if (msg.indexOf("[") != -1 && msg.lastIndexOf("]") != -1) {
msg = msg.substring(1, msg.length() - 1);
String[] userNames = msg.split(",");
modle.removeAllElements();
for (int i = 0; i < userNames.length; i++) {
modle.addElement(userNames[i].trim());
}
} else {//如果是普通的消息
String str = readContext.getText() + msg;
readContext.setText(str);
readContext.selectAll();
}
}
}
}
}
// 顯示界面
public void show() {
this.init();
service.sendMsg("open_" + uname);
MsgThread msgThread = new MsgThread();
msgThread.start();
this.frame.setVisible(true);
}
}
分析整個(gè)程序的流程:
只有一個(gè)客戶端連接的注釋:
[2017-01-23 21:26:14] -> server starting…….
sun.nio.ch.SelectionKeyImpl@99436c6 : 接收
sun.nio.ch.SelectionKeyImpl@3ee5015 : 讀
[2017-01-23 21:26:19] -> a –> online
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
可以看出流程是:服務(wù)端接受通道 -> 通道進(jìn)行讀操作 -> 通道進(jìn)行寫操作
1. 當(dāng)客戶端的channel調(diào)用connect后,服務(wù)端接受到該Channel,于是把該通道的興趣改為read就緒
2. 客戶端connect后,立馬寫數(shù)據(jù)”open_”到通道緩沖區(qū)中,于是該通道進(jìn)入了有數(shù)據(jù)可讀狀態(tài)(即讀狀態(tài)),且該通道的興趣為read,所以select()的返回值為1,進(jìn)入了readMsg();
3. readMsg中把每一個(gè)key的狀態(tài)改為了寫狀態(tài),而此時(shí)客戶端一直在read數(shù)據(jù),要求你服務(wù)端要給我數(shù)據(jù),于是服務(wù)器的channel此時(shí)是寫狀態(tài),且該通道的興趣為write,所以select()的返回值為1,進(jìn)入了writeMsg();
有兩個(gè)個(gè)客戶端連接的注釋:
sun.nio.ch.SelectionKeyImpl@99436c6 : 接收
sun.nio.ch.SelectionKeyImpl@3ee5015 : 讀
[2017-01-23 21:26:19] -> a –> online
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
sun.nio.ch.SelectionKeyImpl@99436c6 : 接收
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
sun.nio.ch.SelectionKeyImpl@12cb94b7 : 讀
[2017-01-23 21:32:30] -> b –> online
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
sun.nio.ch.SelectionKeyImpl@12cb94b7 : 寫
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
可以看到,@99436c6是ServerSocketChannel,@3ee5015是第一個(gè)鏈接的Channel,@12cb94b7是第二個(gè)連接的Channel,可以看見(jiàn),第二個(gè)Channel連接之后
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
sun.nio.ch.SelectionKeyImpl@12cb94b7 : 讀
[2017-01-23 21:32:30] -> b –> online
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
sun.nio.ch.SelectionKeyImpl@12cb94b7 : 寫
sun.nio.ch.SelectionKeyImpl@3ee5015 : 寫
兩個(gè)Channel是交替運(yùn)行的,說(shuō)明Selector處理Channle,是輪詢處理的
更多java相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Java面向?qū)ο蟪绦蛟O(shè)計(jì)入門與進(jìn)階教程》、《Java數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Java操作DOM節(jié)點(diǎn)技巧總結(jié)》、《Java文件與目錄操作技巧匯總》和《Java緩存操作技巧匯總》
希望本文所述對(duì)大家java程序設(shè)計(jì)有所幫助。
- java socket實(shí)現(xiàn)聊天室 java實(shí)現(xiàn)多人聊天功能
- 基于java編寫局域網(wǎng)多人聊天室
- java編程實(shí)現(xiàn)多人聊天室功能
- Java基于Tcp/ip連接的多人交互聊天室
- java使用MulticastSocket實(shí)現(xiàn)基于廣播的多人聊天室
- Java SE實(shí)現(xiàn)多人聊天室功能
- java實(shí)現(xiàn)多人聊天工具(socket+多線程)
- Java多線程實(shí)現(xiàn)多人聊天室功能
- Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例
- java實(shí)現(xiàn)多人聊天系統(tǒng)
相關(guān)文章
Android中幾種圖片特效的處理的實(shí)現(xiàn)方法
這篇文章主要介紹了 Android中幾種圖片特效的處理的實(shí)現(xiàn)方法的相關(guān)資料,這里有放大縮小圖片,獲得圓角圖片,獲得帶倒影圖片的幾種方法,需要的朋友可以參考下2017-08-08
Java配置HTTP/Socks代理的簡(jiǎn)單快速上手方法
這篇文章主要為大家介紹了Java配置HTTP/Socks代理的簡(jiǎn)單快速上手方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
BeanUtils.copyProperties在拷貝屬性時(shí)忽略空值的操作
這篇文章主要介紹了BeanUtils.copyProperties在拷貝屬性時(shí)忽略空值的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
老生常談JVM的內(nèi)存溢出說(shuō)明及參數(shù)調(diào)整
下面小編就為大家?guī)?lái)一篇老生常談JVM的內(nèi)存溢出說(shuō)明及參數(shù)調(diào)整。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-03-03
SpringBoot單元測(cè)試框架Mockito介紹及使用
與集成測(cè)試將系統(tǒng)作為一個(gè)整體測(cè)試不同,單元測(cè)試更應(yīng)該專注于某個(gè)類。所以當(dāng)被測(cè)試類與外部類有依賴的時(shí)候,尤其是與數(shù)據(jù)庫(kù)相關(guān)的這種費(fèi)時(shí)且有狀態(tài)的類,很難做單元測(cè)試。但好在可以通過(guò)“Mockito”這種仿真框架來(lái)模擬這些比較費(fèi)時(shí)的類,從而專注于測(cè)試某個(gè)類內(nèi)部的邏輯2023-01-01

