📄 abstractpollingioprocessor.java
字号:
}

    /**
     * Walks the queue of sessions scheduled for flush and writes their pending
     * data.
     * <p>
     * Each polled session has its "scheduled for flush" flag cleared and is then
     * handled according to its {@link SessionState}. The walk stops when the queue
     * is empty, when the session that was first in the queue shows up at the head
     * again (it was re-scheduled during this pass), or when a session that is
     * still {@code PREPARING} is met.
     *
     * @param currentTime the timestamp passed down to the write methods
     * @throws IllegalStateException if a session reports an unknown state
     */
    private void flush(long currentTime) {
        final T firstSession = flushingSessions.peek();
        if (firstSession == null) {
            return;
        }

        T session = flushingSessions.poll(); // the same one with firstSession
        for (; ;) {
            session.setScheduledForFlush(false);
            SessionState state = state(session);

            switch (state) {
            case OPEN:
                try {
                    boolean flushedAll = flushNow(session, currentTime);
                    // Requests may have been queued while we were writing:
                    // make sure the session gets another flush pass.
                    if (flushedAll
                            && !session.getWriteRequestQueue().isEmpty(session)
                            && !session.isScheduledForFlush()) {
                        scheduleFlush(session);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireExceptionCaught(e);
                }
                break;

            case CLOSED:
                // Skip if the channel is already closed.
                break;

            case PREPARING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.write() is called before addSession() is processed)
                scheduleFlush(session);
                return;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }

            session = flushingSessions.peek();
            if (session == null || session == firstSession) {
                break;
            }
            session = flushingSessions.poll();
        }
    }

    /**
     * Writes as many pending write requests of the given session as possible,
     * bounded by 1.5 times the session's configured maximum read buffer size.
     *
     * @param session the session to flush
     * @param currentTime the timestamp passed to the written-bytes statistics
     * @return {@code true} if the loop ended because the write queue was drained;
     *         {@code false} if the session is not connected, a message could not
     *         be written completely, nothing could be written, the byte limit was
     *         reached, or an exception was caught (it is forwarded to the filter
     *         chain)
     */
    private boolean flushNow(T session, long currentTime) {
        if (!session.isConnected()) {
            scheduleRemove(session);
            return false;
        }

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Set limitation for the number of written bytes for read-write
        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
        // performance in my experience while not breaking fairness much.
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;

        try {
            // Clear OP_WRITE
            setInterestedInWrite(session, false);

            do {
                // Check for pending writes.
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
                        break;
                    }
                    session.setCurrentWriteRequest(req);
                }

                int localWrittenBytes = 0;
                Object message = req.getMessage();

                if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes, currentTime);

                    if (localWrittenBytes > 0 && ((IoBuffer) message).hasRemaining()) {
                        // the buffer isn't empty, we re-interest it in writing
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else if (message instanceof FileRegion) {
                    localWrittenBytes = writeFile(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes, currentTime);

                    // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                    // If there's still data to be written in the FileRegion, return 0 indicating that we need
                    // to pause until writing may resume.
                    if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else {
                    throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
                }

                if (localWrittenBytes == 0) {
                    // Kernel buffer is full.
                    setInterestedInWrite(session, true);
                    return false;
                }

                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    scheduleFlush(session);
                    return false;
                }
            } while (writtenBytes < maxWrittenBytes);
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
            return false;
        }

        return true;
    }

    /**
     * Writes the {@link IoBuffer} carried by the given request to the session.
     * <p>
     * The write is attempted up to {@code WRITE_SPIN_COUNT} times until a
     * non-zero number of bytes is written. When the buffer has been sent, the
     * buffer is reset to its mark before {@code messageSent} is fired, and its
     * position is restored afterwards so that callers checking
     * {@code hasRemaining()} do not mistake a fully written buffer for a
     * partially written one.
     *
     * @param session the session to write to
     * @param req the write request whose message is an {@link IoBuffer}
     * @param hasFragmentation whether the transport may fragment messages; if so
     *        the write length is capped to {@code maxLength}
     * @param maxLength the maximum number of bytes to write when fragmentation
     *        is supported
     * @param currentTime the timestamp passed to the written-bytes statistics
     * @return the number of bytes written by the last write attempt
     * @throws Exception if the underlying write fails
     */
    private int writeBuffer(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
        IoBuffer buf = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;

        if (buf.hasRemaining()) {
            int length;
            if (hasFragmentation) {
                length = Math.min(buf.remaining(), maxLength);
            } else {
                length = buf.remaining();
            }

            for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
                localWrittenBytes = write(session, buf, length);
                if (localWrittenBytes != 0) {
                    break;
                }
            }
        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if (!buf.hasRemaining() || (!hasFragmentation && localWrittenBytes != 0)) {
            // Buffer has been sent, clear the current request.
            // Remember where the write stopped: reset() rewinds the buffer to
            // its mark for the messageSent event, which would otherwise make a
            // completely written buffer report hasRemaining() == true to
            // flushNow() and trigger a spurious write-interest registration.
            int pos = buf.position();
            buf.reset();

            fireMessageSent(session, req);

            // And set it back to the position reached by the write.
            buf.position(pos);
        }

        return localWrittenBytes;
    }

    /**
     * Transfers the {@link FileRegion} carried by the given request to the
     * session and fires {@code messageSent} once the region is exhausted (or,
     * for transports without fragmentation, once anything was written).
     *
     * @param session the session to write to
     * @param req the write request whose message is a {@link FileRegion}
     * @param hasFragmentation whether the transport may fragment messages; if so
     *        the transfer length is capped to {@code maxLength}
     * @param maxLength the maximum number of bytes to transfer when
     *        fragmentation is supported
     * @param currentTime the timestamp passed to the written-bytes statistics
     * @return the number of bytes transferred, {@code 0} if the region had no
     *         remaining bytes
     * @throws Exception if the underlying transfer fails
     */
    private int writeFile(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
        int localWrittenBytes;
        FileRegion region = (FileRegion) req.getMessage();

        if (region.getRemainingBytes() > 0) {
            int length;
            if (hasFragmentation) {
                length = (int) Math.min(region.getRemainingBytes(), maxLength);
            } else {
                // Clamp to the int range before narrowing.
                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
            }

            localWrittenBytes = transferFile(session, region, length);
            region.update(localWrittenBytes);
        } else {
            localWrittenBytes = 0;
        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if (region.getRemainingBytes() <= 0 || (!hasFragmentation && localWrittenBytes != 0)) {
            fireMessageSent(session, req);
        }

        return localWrittenBytes;
    }

    /**
     * Clears the session's current write request and fires the
     * {@code messageSent} event for the given request on its filter chain.
     *
     * @param session the session the request was written to
     * @param req the request that has been sent
     */
    private void fireMessageSent(T session, WriteRequest req) {
        session.setCurrentWriteRequest(null);
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageSent(req);
    }

    /**
     * Drains the queue of sessions waiting for a traffic-control update and
     * applies the update to every open session. Processing stops at the first
     * session that is still {@code PREPARING}; that session is re-scheduled.
     *
     * @throws IllegalStateException if a session reports an unknown state
     */
    private void updateTrafficMask() {
        for (; ;) {
            T session = trafficControllingSessions.poll();
            if (session == null) {
                break;
            }

            SessionState state = state(session);
            switch (state) {
            case OPEN:
                updateTrafficControl(session);
                break;

            case CLOSED:
                break;

            case PREPARING:
                // Retry later if session is not yet fully initialized.
                // (In case that session.suspendXxx() or session.resumeXxx() is
                // called before addSession() is processed)
                scheduleTrafficControl(session);
                return;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }
        }
    }

    /**
     * Synchronizes the read and write interest of the session with its
     * suspension flags: read interest is enabled unless reads are suspended,
     * write interest is enabled when the write queue is not empty and writes are
     * not suspended. Exceptions raised by either update are forwarded to the
     * session's filter chain; a failure of the read update does not prevent the
     * write update.
     *
     * @param session the session whose interest set must be refreshed
     */
    public void updateTrafficControl(T session) {
        try {
            setInterestedInRead(session, !session.isReadSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }

        try {
            setInterestedInWrite(
                    session,
                    !session.getWriteRequestQueue().isEmpty(session)
                            && !session.isWriteSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

    /**
     * The processing loop: selects, registers new sessions, updates traffic
     * masks, processes ready sessions, flushes pending writes, removes closing
     * sessions and notifies idle sessions, until no session is left.
     */
    private class Processor implements Runnable {
        /**
         * Runs the loop until the number of managed sessions drops to zero and,
         * under {@code lock}, no new session is pending and the selector is
         * empty. After the loop, resources are disposed if disposal was
         * requested, and the disposal future is always completed.
         */
        public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle sessions when we get out of the select
                    // periodically. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    int selected = select(SELECT_TIMEOUT);

                    nSessions += handleNewSessions();
                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= remove();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                processor = null;
                                break;
                            }
                        }
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext(); ) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    // Back off for a second before retrying the loop.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // NOTE(review): the interrupt status is not restored
                        // here; restoring it could make the following
                        // select()/sleep() return immediately - confirm intent.
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        dispose0();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }

    /**
     * The states a session can be in from the processor's point of view, as
     * returned by {@code state(session)}.
     */
    protected static enum SessionState {
        OPEN,
        CLOSED,
        PREPARING,
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -