📄 abstractpollingconnectionlessioacceptor.java
字号:
public void updateTrafficControl(T session) { throw new UnsupportedOperationException(); } public void dispose() { } public boolean isDisposed() { return false; } public boolean isDisposing() { return false; } } /** * Starts the inner Acceptor thread. */ private void startupAcceptor() { if (!selectable) { registerQueue.clear(); cancelQueue.clear(); flushingSessions.clear(); } synchronized (lock) { if (acceptor == null) { acceptor = new Acceptor(); executeWorker(acceptor); } } } private boolean scheduleFlush(T session) { if (session.setScheduledForFlush(true)) { flushingSessions.add(session); return true; } else { return false; } } /** * This private class is used to accept incoming connection from * clients. It's an infinite loop, which can be stopped when all * the registered handles have been removed (unbound). */ private class Acceptor implements Runnable { public void run() { int nHandles = 0; lastIdleCheckTime = System.currentTimeMillis(); while (selectable) { try { int selected = select(); nHandles += registerHandles(); if (selected > 0) { processReadySessions(selectedHandles()); } long currentTime = System.currentTimeMillis(); flushSessions(currentTime); nHandles -= unregisterHandles(); notifyIdleSessions(currentTime); if (nHandles == 0) { synchronized (lock) { if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { acceptor = null; break; } } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { } } } if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } } } } @SuppressWarnings("unchecked") private void processReadySessions(Iterator<H> handles) { while (handles.hasNext()) { H h = handles.next(); handles.remove(); try { if (isReadable(h)) { readHandle(h); } if (isWritable(h)) { for (IoSession session : getManagedSessions().values()) { scheduleFlush((T) session); } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } } } private void readHandle(H handle) throws Exception { IoBuffer readBuf = IoBuffer.allocate( getSessionConfig().getReadBufferSize()); SocketAddress remoteAddress = receive(handle, readBuf); if (remoteAddress != null) { IoSession session = newSessionWithoutLock( remoteAddress, localAddress(handle)); readBuf.flip(); IoBuffer newBuf = IoBuffer.allocate(readBuf.limit()); newBuf.put(readBuf); newBuf.flip(); session.getFilterChain().fireMessageReceived(newBuf); } } private void flushSessions(long currentTime) { for (; ;) { T session = flushingSessions.poll(); if (session == null) { break; } session.setScheduledForFlush(false); try { boolean flushedAll = flush(session, currentTime); if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { scheduleFlush(session); } } catch (Exception e) { session.getFilterChain().fireExceptionCaught(e); } } } private boolean flush(T session, long currentTime) throws Exception { // Clear OP_WRITE setInterestedInWrite(session, false); final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; try { for (; ;) { WriteRequest req = session.getCurrentWriteRequest(); if (req == null) { req = writeRequestQueue.poll(session); if (req == null) { break; } session.setCurrentWriteRequest(req); } IoBuffer buf = (IoBuffer) req.getMessage(); if (buf.remaining() == 0) { // Clear and fire event session.setCurrentWriteRequest(null); buf.reset(); session.getFilterChain().fireMessageSent(req); continue; } SocketAddress destination = req.getDestination(); if (destination == null) { destination = session.getRemoteAddress(); } int localWrittenBytes = send(session, buf, destination); if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) { // Kernel buffer is full or wrote too much setInterestedInWrite(session, true); return false; } else { setInterestedInWrite(session, false); // Clear and fire event session.setCurrentWriteRequest(null); writtenBytes += localWrittenBytes; buf.reset(); session.getFilterChain().fireMessageSent(req); } } } finally { session.increaseWrittenBytes(writtenBytes, currentTime); } return true; } private int registerHandles() { for (;;) { AcceptorOperationFuture req = registerQueue.poll(); if (req == null) { break; } Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>(); List<SocketAddress> localAddresses = req.getLocalAddresses(); try { for (SocketAddress a: localAddresses) { H handle = open(a); newHandles.put(localAddress(handle), handle); } boundHandles.putAll(newHandles); getListeners().fireServiceActivated(); req.setDone(); return newHandles.size(); } catch (Exception e) { req.setException(e); } finally { // Roll back if failed to bind all addresses. if (req.getException() != null) { for (H handle: newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } wakeup(); } } } return 0; } private int unregisterHandles() { int nHandles = 0; for (;;) { AcceptorOperationFuture request = cancelQueue.poll(); if (request == null) { break; } // close the channels for (SocketAddress a: request.getLocalAddresses()) { H handle = boundHandles.remove(a); if (handle == null) { continue; } try { close(handle); wakeup(); // wake up again to trigger thread death } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { nHandles ++; } } request.setDone(); } return nHandles; } private void notifyIdleSessions(long currentTime) { // process idle sessions if (currentTime - lastIdleCheckTime >= 1000) { lastIdleCheckTime = currentTime; AbstractIoSession.notifyIdleness( getListeners().getManagedSessions().values().iterator(), currentTime); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -