📄 socketioprocessor.java
字号:
// NOTE(review): this chunk begins inside the no-argument notifyIdleness() method; its
// header and any earlier statements are outside the visible source.
        // process idle sessions
        // The idle scan runs only when at least 1000 ms have elapsed since
        // lastIdleCheckTime, which is then advanced to the current time.
        long currentTime = System.currentTimeMillis();
        if ((currentTime - lastIdleCheckTime) >= 1000) {
            lastIdleCheckTime = currentTime;
            Set<SelectionKey> keys = selector.keys();
            if (keys != null) {
                for (SelectionKey key : keys) {
                    // Each key's attachment is cast to the session it belongs to.
                    SocketSessionImpl session = (SocketSessionImpl) key
                            .attachment();
                    notifyIdleness(session, currentTime);
                }
            }
        }
    }

    /**
     * Runs the idle checks and the write-timeout check for one session.
     * <p>
     * For each of BOTH_IDLE, READER_IDLE and WRITER_IDLE the reference time
     * passed on is the later of the matching last I/O (or read, or write)
     * time and the last time that idle status was recorded for the session.
     *
     * @param session     the session to check
     * @param currentTime the current time in milliseconds
     */
    private void notifyIdleness(SocketSessionImpl session, long currentTime) {
        notifyIdleness0(session, currentTime, session
                .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
                        .getLastIdleTime(IdleStatus.BOTH_IDLE)));
        notifyIdleness0(session, currentTime, session
                .getIdleTimeInMillis(IdleStatus.READER_IDLE),
                IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
                        session.getLastIdleTime(IdleStatus.READER_IDLE)));
        notifyIdleness0(session, currentTime, session
                .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
                IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
                        session.getLastIdleTime(IdleStatus.WRITER_IDLE)));

        notifyWriteTimeout(session, currentTime, session
                .getWriteTimeoutInMillis(), session.getLastWriteTime());
    }

    /**
     * Increases the session's idle count for {@code status} and fires a
     * sessionIdle event on its filter chain when the idle period has elapsed.
     * <p>
     * Nothing happens unless {@code idleTime} is positive, {@code lastIoTime}
     * is non-zero and at least {@code idleTime} milliseconds separate
     * {@code lastIoTime} from {@code currentTime}.
     *
     * @param session     the session to check
     * @param currentTime the current time in milliseconds
     * @param idleTime    the idle threshold in milliseconds; values of zero or less skip the check
     * @param status      the idle status being evaluated
     * @param lastIoTime  the reference time in milliseconds; zero skips the check
     */
    private void notifyIdleness0(SocketSessionImpl session, long currentTime,
            long idleTime, IdleStatus status, long lastIoTime) {
        if (idleTime > 0 && lastIoTime != 0
                && (currentTime - lastIoTime) >= idleTime) {
            session.increaseIdleCount(status);
            session.getFilterChain().fireSessionIdle(session, status);
        }
    }

    /**
     * Fires an exceptionCaught event carrying a new
     * {@code WriteTimeoutException} when a write has been pending too long.
     * <p>
     * The event is fired only when {@code writeTimeout} is positive, at least
     * {@code writeTimeout} milliseconds have passed since {@code lastIoTime},
     * the session's selection key is non-null and valid, and OP_WRITE is part
     * of the key's interest set.
     *
     * @param session      the session to check
     * @param currentTime  the current time in milliseconds
     * @param writeTimeout the write timeout in milliseconds; values of zero or less skip the check
     * @param lastIoTime   the reference time in milliseconds (the caller passes the last write time)
     */
    private void notifyWriteTimeout(SocketSessionImpl session,
            long currentTime, long writeTimeout, long lastIoTime) {
        SelectionKey key = session.getSelectionKey();
        if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
                && key != null && key.isValid()
                && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
            session.getFilterChain().fireExceptionCaught(session,
                    new WriteTimeoutException());
        }
    }

    /**
     * Takes sessions from {@code flushingSessions} and flushes each one until
     * the queue is empty or a not-yet-initialized session is met.
     * <p>
     * Disconnected sessions only have their pending write buffers released.
     * An {@link IOException} from the per-session flush schedules the session
     * for removal and is reported through its filter chain.
     */
    private void doFlush() {
        for (;;) {
            SocketSessionImpl session = flushingSessions.poll();

            if (session == null)
                break;

            if (!session.isConnected()) {
                releaseWriteBuffers(session);
                continue;
            }

            SelectionKey key = session.getSelectionKey();
            // Retry later if session is not yet fully initialized.
            // (In case that Session.write() is called before addSession() is processed)
            // NOTE(review): the loop stops here instead of continuing, presumably
            // because scheduleFlush() puts the session back on the queue being
            // drained -- confirm against scheduleFlush().
            if (key == null) {
                scheduleFlush(session);
                break;
            }

            // Skip if the channel is already closed.
            if (!key.isValid()) {
                continue;
            }

            try {
                doFlush(session);
            } catch (IOException e) {
                scheduleRemove(session);
                session.getFilterChain().fireExceptionCaught(session, e);
            }
        }
    }

    /**
     * Drains the session's write request queue, releasing the buffer of every
     * pending request and marking its future as not written.
     * <p>
     * An {@link IllegalStateException} thrown by {@code release()} is reported
     * through the session's filter chain; the future is marked as not written
     * either way.
     *
     * @param session the session whose pending write requests are discarded
     */
    private void releaseWriteBuffers(SocketSessionImpl session) {
        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
        WriteRequest req;

        while ((req = writeRequestQueue.poll()) != null) {
            try {
                // Every queued message is cast to ByteBuffer without a type check.
                ((ByteBuffer) req.getMessage()).release();
            } catch (IllegalStateException e) {
                session.getFilterChain().fireExceptionCaught(session, e);
            } finally {
                req.getFuture().setWritten(false);
            }
        }
    }

    /**
     * Writes as many queued buffers of one session to its channel as possible.
     * <p>
     * OP_WRITE is removed from the key's interest set first and added back
     * only when a buffer still has data left after the write attempt.
     *
     * @param session the session to flush; its selection key is dereferenced
     *                without a null check, so the caller must have verified it
     * @throws IOException if writing to the channel fails
     */
    private void doFlush(SocketSessionImpl session) throws IOException {
        // Clear OP_WRITE
        SelectionKey key = session.getSelectionKey();
        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));

        SocketChannel ch = session.getChannel();
        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();

        for (;;) {
            // Peek rather than poll: the request stays queued until its buffer
            // has been written out completely.
            WriteRequest req = writeRequestQueue.peek();

            if (req == null)
                break;

            ByteBuffer buf = (ByteBuffer) req.getMessage();
            if (buf.remaining() == 0) {
                // Fully written: dequeue, count the message and notify the chain.
                writeRequestQueue.poll();

                session.increaseWrittenMessages();

                // NOTE(review): presumably restores the position to a mark set
                // when the request was queued -- confirm where mark() is called.
                buf.reset();
                session.getFilterChain().fireMessageSent(session, req);
                continue;
            }

            if (key.isWritable()) {
                int writtenBytes = ch.write(buf.buf());
                if (writtenBytes > 0) {
                    session.increaseWrittenBytes(writtenBytes);
                }
            }

            if (buf.hasRemaining()) {
                // Kernel buffer is full
                // (or the key was not writable); ask to be woken up for writing.
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                break;
            }
        }
    }

    /**
     * Applies the traffic mask of every session queued in
     * {@code trafficControllingSessions} to its selection key.
     * <p>
     * The interest set is OP_READ, plus OP_WRITE when the session's write
     * request queue is not empty, bitwise-ANDed with the interest ops of the
     * session's traffic mask.
     */
    private void doUpdateTrafficMask() {
        if (trafficControllingSessions.isEmpty())
            return;

        for (;;) {
            SocketSessionImpl session = trafficControllingSessions.poll();

            if (session == null)
                break;

            SelectionKey key = session.getSelectionKey();
            // Retry later if session is not yet fully initialized.
            // (In case that Session.suspend??() or session.resume??() is
            // called before addSession() is processed)
            if (key == null) {
                scheduleTrafficControl(session);
                break;
            }
            // skip if channel is already closed
            if (!key.isValid()) {
                continue;
            }

            // The normal is OP_READ and, if there are write requests in the
            // session's write queue, set OP_WRITE to trigger flushing.
            int ops = SelectionKey.OP_READ;
            Queue<WriteRequest> writeRequestQueue = session
                    .getWriteRequestQueue();
            synchronized (writeRequestQueue) {
                if (!writeRequestQueue.isEmpty()) {
                    ops |= SelectionKey.OP_WRITE;
                }
            }

            // Now mask the preferred ops with the mask of the current session
            int mask = session.getTrafficMask().getInterestOps();
            key.interestOps(ops & mask);
        }
    }

    /**
     * The selector loop of this processor.
     * <p>
     * Each iteration selects with a 1000 ms timeout, then adds new sessions,
     * updates traffic masks, processes the selected keys (if any), flushes
     * pending writes, removes sessions and runs the idle checks, in that
     * order. The loop ends once the selector has no registered keys and no
     * new sessions are waiting.
     */
    private class Worker implements Runnable {
        public void run() {
            Thread.currentThread().setName(SocketIoProcessor.this.threadName);

            for (;;) {
                try {
                    int nKeys = selector.select(1000);
                    doAddNew();
                    doUpdateTrafficMask();

                    if (nKeys > 0) {
                        process(selector.selectedKeys());
                    }

                    doFlush();
                    doRemove();
                    notifyIdleness();

                    if (selector.keys().isEmpty()) {
                        // Re-check while holding the lock before shutting down;
                        // a session may have been queued in the meantime.
                        synchronized (lock) {
                            if (selector.keys().isEmpty()
                                    && newSessions.isEmpty()) {
                                worker = null;

                                try {
                                    selector.close();
                                } catch (IOException e) {
                                    ExceptionMonitor.getInstance()
                                            .exceptionCaught(e);
                                } finally {
                                    selector = null;
                                }

                                break;
                            }
                        }
                    }
                } catch (Throwable t) {
                    // Any failure in an iteration is reported and the loop goes
                    // on after a one-second pause.
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // NOTE(review): the interrupt is only reported; the
                        // thread's interrupt status is not restored.
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -