Java進階之高并發(fā)核心Selector詳解
一、Selector設計
筆者下載得是openjdk8的源碼, 畫出類圖

比較清晰得看到,openjdk中Selector的實現(xiàn)是SelectorImpl,然后SelectorImpl又將職責委托給了具體的平臺,比如圖中框出的
- linux2.6以后才有的
EpollSelectorImpl - Windows平臺是
WindowsSelectorImpl - MacOSX平臺是
KQueueSelectorImpl
從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術來實現(xiàn)的I/O多路復用。
二、獲取Selector
眾所周知,Selector.open()可以得到一個Selector實例,怎么實現(xiàn)的呢?
// Selector.java
public static Selector open() throws IOException {
// 首先找到provider,然后再打開Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 這里就是打開Selector的真正方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
在openjdk中,每個操作系統(tǒng)都有一個sun.nio.ch.DefaultSelectorProvider實現(xiàn),以solaris為例:
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
// 獲取OS名稱
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
// 根據(jù)名稱來創(chuàng)建不同的Selctor
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
如果系統(tǒng)名稱是Linux的話,真正創(chuàng)建的是sun.nio.ch.EPollSelectorProvider。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider, 關于PollSelector有興趣的讀者自行了解下, 本文僅以實際常用的EpollSelector為例探討。
打開sun.nio.ch.EPollSelectorProvider查看openSelector方法
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
很直觀,這樣我們在Linux平臺就得到了最終的Selector實現(xiàn):sun.nio.ch.EPollSelectorImpl
三、EPollSelector如何進行select
epoll系統(tǒng)調(diào)用主要分為3個函數(shù)
epoll_create: 創(chuàng)建一個epollfd,并開辟epoll自己的內(nèi)核高速cache區(qū),建立紅黑樹,分配好想要的size的內(nèi)存對象,建立一個list鏈表,用于存儲準備就緒的事件。epoll_wait: 等待內(nèi)核返回IO事件epoll_ctl: 對新舊事件進行新增修改或者刪除
3.1 Epoll fd的創(chuàng)建
EPollSelectorImpl的構造器代碼如下:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe返回管道的2個文件描述符,編碼在一個long類型的變量中
// 高32位代表讀 低32位代表寫
// 使用pipe為了實現(xiàn)Selector的wakeup邏輯
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 新建一個EPollArrayWrapper
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
再看EPollArrayWrapper的初始化過程
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
// 創(chuàng)建epoll fd
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
private native int epollCreate();
在初始化過程中調(diào)用了epollCreate方法,這是個native方法。
打開
jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
// 創(chuàng)建epoll fd
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
private native int epollCreate();
可以看到最后還是使用了操作系統(tǒng)的api: epoll_create函數(shù)
3.2 Epoll wait等待內(nèi)核IO事件
調(diào)用Selector.select(),最后會委托給各個實現(xiàn)的doSelect方法,限于篇幅不貼出太詳細的,這里看下EpollSelectorImpl的doSelect方法
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// 真正的實現(xiàn)是這行
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
// 以下基本都是異常處理
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
然后我們?nèi)タ?code>pollWrapper.poll, 打開jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:
int poll(long timeout) throws IOException {
updateRegistrations();
// 這個epollWait是不是有點熟悉呢?
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
epollWait也是個native方法,打開c代碼一看:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
// 發(fā)起epoll_wait系統(tǒng)調(diào)用等待內(nèi)核事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
=
可以看到,最后還是發(fā)起的epoll_wait系統(tǒng)調(diào)用.
3.3 epoll control以及openjdk對事件管理的封裝
JDK中對于注冊到Selector上的IO事件關系是使用SelectionKey來表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.
調(diào)用Selector.register()時均會將事件存儲到EpollArrayWrapper的成員變量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用數(shù)組保存事件變更, 數(shù)組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超過數(shù)組長度的事件會緩存到這個map中,等待下次處理
private Map<Integer,Byte> eventsHigh;
/**
* Sets the pending update events for the given file descriptor. This
* method has no effect if the update events is already set to KILLED,
* unless {@code force} is {@code true}.
*/
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判斷fd和數(shù)組長度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
上面看到EpollArrayWrapper.poll()的時候, 首先會調(diào)用updateRegistrations
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 從保存的eventsLow和eventsHigh里取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 判斷操作類型以傳給epoll_ctl
// 沒有指定EPOLLET事件類型
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
在獲取到事件之后將操作委托給了epollCtl,這又是個native方法,打開相應的c代碼一看:
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 發(fā)起epoll_ctl調(diào)用來進行IO事件的管理
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
原來還是我們的老朋友epoll_ctl.
有個小細節(jié)是jdk沒有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認會用LT:)
在AbstractSelectorImpl中有3個set保存事件
// Public views of the key sets // 注冊的所有事件 private Set<SelectionKey> publicKeys; // Immutable // 內(nèi)核返回的IO事件封裝,表示哪些fd有數(shù)據(jù)可讀可寫 private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition // 取消的事件 private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
在EpollArrayWrapper.poll調(diào)用完成之后, 會調(diào)用updateSelectedKeys來更新上面的仨set
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
代碼很直白,拿出事件對set比對操作。
四、Selector類的相關方法

重點注意四個方法
- select():
這是一個阻塞方法,調(diào)用該方法,會阻塞,直到返回一個有事件發(fā)生的selectionKey集合 - selectNow() :非阻塞方法,
獲取不到有事件發(fā)生的selectionKey集合,也會立即返回 - select(long):阻塞方法,
如果沒有獲取到有事件發(fā)生的selectionKey集合,阻塞指定的long時間 - selectedKeys(): 返回全部selectionKey集合,不管是否有事件發(fā)生

可以理解:selector一直在監(jiān)聽select()
五、Selector、SelectionKey、ServerScoketChannel、ScoketChannel的關系
Server代碼:
public class NIOServer {
public static void main(String[] args) throws Exception{
//創(chuàng)建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一個Selecor對象
Selector selector = Selector.open();
//綁定一個端口6666, 在服務器端監(jiān)聽
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//設置為非阻塞
serverSocketChannel.configureBlocking(false);
//把 serverSocketChannel 注冊到 selector 關心 事件為 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注冊后的selectionkey 數(shù)量=" + selector.keys().size()); // 1
//循環(huán)等待客戶端連接
while (true) {
//這里我們等待1秒,如果沒有事件發(fā)生, 返回
if(selector.select(1000) == 0) { //沒有事件發(fā)生
System.out.println("服務器等待了1秒,無連接");
continue;
}
//如果返回的>0, 就獲取到相關的 selectionKey集合
//1.如果返回的>0, 表示已經(jīng)獲取到關注的事件
//2. selector.selectedKeys() 返回關注事件的集合
// 通過 selectionKeys 反向獲取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 數(shù)量 = " + selectionKeys.size());
//遍歷 Set<SelectionKey>, 使用迭代器遍歷
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//獲取到SelectionKey
SelectionKey key = keyIterator.next();
//根據(jù)key 對應的通道發(fā)生的事件做相應處理
if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客戶端連接
//該該客戶端生成一個 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客戶端連接成功 生成了一個 socketChannel " + socketChannel.hashCode());
//將 SocketChannel 設置為非阻塞
socketChannel.configureBlocking(false);
//將socketChannel 注冊到selector, 關注事件為 OP_READ, 同時給socketChannel
//關聯(lián)一個Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("客戶端連接后 ,注冊的selectionkey 數(shù)量=" + selector.keys().size()); //2,3,4..
}
if(key.isReadable()) { //發(fā)生 OP_READ
//通過key 反向獲取到對應channel
SocketChannel channel = (SocketChannel)key.channel();
//獲取到該channel關聯(lián)的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("form 客戶端 " + new String(buffer.array()));
}
//手動從集合中移動當前的selectionKey, 防止重復操作
keyIterator.remove();
}
}
}
}
Client代碼
public class NIOClient {
public static void main(String[] args) throws Exception{
//得到一個網(wǎng)絡通道
SocketChannel socketChannel = SocketChannel.open();
//設置非阻塞
socketChannel.configureBlocking(false);
//提供服務器端的ip 和 端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//連接服務器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("因為連接需要時間,客戶端不會阻塞,可以做其它工作..");
}
}
//...如果連接成功,就發(fā)送數(shù)據(jù)
String str = "hello, 尚硅谷~";
//Wraps a byte array into a buffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//發(fā)送數(shù)據(jù),將 buffer 數(shù)據(jù)寫入 channel
socketChannel.write(buffer);
System.in.read();
}
}
六、總結

jdk中Selector是對操作系統(tǒng)的IO多路復用調(diào)用的一個封裝,在Linux中就是對epoll的封裝。epoll實質(zhì)上是將event loop交給了內(nèi)核,因為網(wǎng)絡數(shù)據(jù)都是首先到內(nèi)核的,直接內(nèi)核處理可以避免無謂的系統(tǒng)調(diào)用和數(shù)據(jù)拷貝, 性能是最好的。jdk中對IO事件的封裝是SelectionKey, 保存Channel關心的事件。
到此這篇關于Java進階之高并發(fā)核心Selector詳解的文章就介紹到這了,更多相關高并發(fā)核心Selector內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring boot里增加表單驗證hibernate-validator并在freemarker模板里顯示錯誤信息(推
這篇文章主要介紹了spring boot里增加表單驗證hibernate-validator并在freemarker模板里顯示錯誤信息的相關資料,需要的朋友可以參考下2018-01-01
聊聊Spring?Cloud?Gateway過濾器精確控制異常返回問題
這篇文章主要介紹了Spring?Cloud?Gateway過濾器精確控制異常返回問題,本篇任務就是分析上述現(xiàn)象的原因,通過閱讀源碼搞清楚返回碼和響應body生成的具體邏輯,需要的朋友可以參考下2021-11-11

