📄 abstractpollingioprocessor.java
字号:
* @param length the length of the portion to send * @return the number of written bytes * @throws Exception any exception thrown by the underlying system calls */ protected abstract int transferFile(T session, FileRegion region, int length) throws Exception; /** * {@inheritDoc} */ public final void add(T session) { if (isDisposing()) { throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker newSessions.add(session); startupProcessor(); } /** * {@inheritDoc} */ public final void remove(T session) { scheduleRemove(session); startupProcessor(); } private void scheduleRemove(T session) { removingSessions.add(session); } /** * {@inheritDoc} */ public final void flush(T session) { boolean needsWakeup = flushingSessions.isEmpty(); if (scheduleFlush(session) && needsWakeup) { wakeup(); } } private boolean scheduleFlush(T session) { if (session.setScheduledForFlush(true)) { flushingSessions.add(session); return true; } return false; } /** * {@inheritDoc} */ public final void updateTrafficMask(T session) { scheduleTrafficControl(session); wakeup(); } private void scheduleTrafficControl(T session) { trafficControllingSessions.add(session); } /** * Starts the inner Processor, asking the executor to pick a thread in its * pool. The Runnable will be renamed */ private void startupProcessor() { synchronized (lock) { if (processor == null) { processor = new Processor(); executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup(); } /** * Handle newly created sessions * @return The number of new sessions */ private int handleNewSessions() { int addedSessions = 0; // Loop on the new sessions blocking queue, to count // the number of sessions who has been created for (;;) { T session = newSessions.poll(); if (session == null) { // We don't have new sessions break; } if (addNow(session)) { // A new session has been created addedSessions ++; } } return addedSessions; } private boolean addNow(T session) { boolean registered = false; boolean notified = false; try { init(session); registered = true; // Build the filter chain of this session. session.getService().getFilterChainBuilder().buildFilterChain( session.getFilterChain()); // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session); notified = true; } catch (Throwable e) { if (notified) { // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). scheduleRemove(session); IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); wakeup(); } else { ExceptionMonitor.getInstance().exceptionCaught(e); try { destroy(session); } catch (Exception e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } finally { registered = false; } } } return registered; } private int remove() { int removedSessions = 0; for (; ;) { T session = removingSessions.poll(); if (session == null) { break; } SessionState state = state(session); switch (state) { case OPEN: if (removeNow(session)) { removedSessions ++; } break; case CLOSED: // Skip if channel is already closed break; case PREPARING: // Retry later if session is not yet fully initialized. // (In case that Session.close() is called before addSession() is processed) scheduleRemove(session); return removedSessions; default: throw new IllegalStateException(String.valueOf(state)); } } return removedSessions; } private boolean removeNow(T session) { clearWriteRequestQueue(session); try { destroy(session); return true; } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } finally { clearWriteRequestQueue(session); ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session); } return false; } private void clearWriteRequestQueue(T session) { WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); WriteRequest req; List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(); if ((req = writeRequestQueue.poll(session)) != null) { Object m = req.getMessage(); if (m instanceof IoBuffer) { IoBuffer buf = (IoBuffer) req.getMessage(); // The first unwritten empty buffer must be // forwarded to the filter chain. if (buf.hasRemaining()) { buf.reset(); failedRequests.add(req); } else { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageSent(req); } } else { failedRequests.add(req); } // Discard others. while ((req = writeRequestQueue.poll(session)) != null) { failedRequests.add(req); } } // Create an exception and notify. if (!failedRequests.isEmpty()) { WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests); for (WriteRequest r: failedRequests) { session.decreaseScheduledBytesAndMessages(r); r.getFuture().setException(cause); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(cause); } } private void process() throws Exception { for (Iterator<T> i = selectedSessions(); i.hasNext();) { T session = i.next(); process(session); i.remove(); } } /** * Deal with session ready for the read or write operations, or both. */ private void process(T session) { // Process Reads if (isReadable(session) && !session.isReadSuspended()) { read(session); } // Process writes if (isWritable(session) && !session.isWriteSuspended()) { scheduleFlush(session); } } private void read(T session) { IoSessionConfig config = session.getConfig(); IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize()); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { scheduleRemove(session); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } } private void notifyIdleSessions(long currentTime) throws Exception { // process idle sessions if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) { lastIdleCheckTime = currentTime; AbstractIoSession.notifyIdleness(allSessions(), currentTime); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -