📄 abstractpollingioconnector.java
字号:
* Register a new client socket for connection, add it to connection polling
     * @param handle client socket handle
     * @param request the associated {@link ConnectionRequest}
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract void register(H handle, ConnectionRequest request) throws Exception;

    /**
     * get the {@link ConnectionRequest} for a given client socket handle
     * @param handle the socket client handle
     * @return the connection request if the socket is connecting otherwise <code>null</code>
     */
    protected abstract ConnectionRequest getConnectionRequest(H handle);

    /**
     * {@inheritDoc}
     */
    @Override
    protected final IoFuture dispose0() throws Exception {
        if (!disposalFuture.isDone()) {
            // Make sure a worker exists and is awake: the actual tear-down is
            // performed at the end of Connector.run().
            startupWorker();
            wakeup();
        }
        return disposalFuture;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    @SuppressWarnings("unchecked")
    protected final ConnectFuture connect0(
            SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        H handle = null;
        // Stays false only when an exception interrupts the attempt; the
        // finally block then closes the half-initialised handle.
        boolean success = false;
        try {
            handle = newHandle(localAddress);
            if (connect(handle, remoteAddress)) {
                // connect() reported completion right away: build the session
                // here instead of going through the polling worker.
                ConnectFuture future = new DefaultConnectFuture();
                T session = newSession(processor, handle);
                initSession(session, future, sessionInitializer);
                // Forward the remaining process to the IoProcessor.
                session.getProcessor().add(session);
                success = true;
                return future;
            }

            success = true;
        } catch (Exception e) {
            return DefaultConnectFuture.newFailedFuture(e);
        } finally {
            if (!success && handle != null) {
                try {
                    close(handle);
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }

        // The connection is still pending: queue it for the worker, which
        // registers the handle and completes the returned request later.
        ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
        connectQueue.add(request);
        startupWorker();
        wakeup();

        return request;
    }

    /**
     * Starts the {@link Connector} worker if none is currently installed.
     * When this connector is no longer selectable, the pending connect and
     * cancel queues are emptied first.
     */
    private void startupWorker() {
        if (!selectable) {
            // NOTE(review): the dropped requests are neither completed nor are
            // their handles closed here - confirm nothing can still be waiting
            // on them once the connector has stopped being selectable.
            connectQueue.clear();
            cancelQueue.clear();
        }

        synchronized (lock) {
            if (connector == null) {
                connector = new Connector();
                executeWorker(connector);
            }
        }
    }

    /**
     * Drains the connect queue and registers every pending handle for
     * polling. A request whose registration throws is failed with that
     * exception and its handle is closed.
     *
     * @return the number of handles that were registered successfully
     */
    private int registerNew() {
        int nHandles = 0;
        for (; ;) {
            ConnectionRequest req = connectQueue.poll();
            if (req == null) {
                break;
            }

            H handle = req.handle;
            try {
                register(handle, req);
                nHandles ++;
            } catch (Exception e) {
                req.setException(e);
                try {
                    close(handle);
                } catch (Exception e2) {
                    ExceptionMonitor.getInstance().exceptionCaught(e2);
                }
            }
        }
        return nHandles;
    }

    /**
     * Drains the cancel queue and closes the handle of every queued request.
     *
     * @return the number of requests taken from the queue; a request is
     *         counted even when closing its handle throws
     */
    private int cancelKeys() {
        int nHandles = 0;
        for (; ;) {
            ConnectionRequest req = cancelQueue.poll();
            if (req == null) {
                break;
            }

            H handle = req.handle;
            try {
                close(handle);
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                nHandles ++;
            }
        }
        return nHandles;
    }

    /**
     * Process the incoming connections, creating a new session for each
     * valid connection.
     *
     * @param handlers the handles reported as selected; each one is removed
     *                 from the iterator as it is consumed
     * @return the number of handles for which a session was created
     */
    private int processConnections(Iterator<H> handlers) {
        int nHandles = 0;

        // Loop on each connection request
        while (handlers.hasNext()) {
            H handle = handlers.next();
            handlers.remove();

            ConnectionRequest connectionRequest = getConnectionRequest(handle);
            if ( connectionRequest == null) {
                continue;
            }

            boolean success = false;
            try {
                // A false result leaves the request pending without counting it.
                if (finishConnect(handle)) {
                    T session = newSession(processor, handle);
                    initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
                    // Forward the remaining process to the IoProcessor.
                    session.getProcessor().add(session);
                    nHandles ++;
                }
                success = true;
            } catch (Throwable e) {
                connectionRequest.setException(e);
            } finally {
                if (!success) {
                    // The connection failed, we have to cancel it.
                    cancelQueue.offer(connectionRequest);
                }
            }
        }
        return nHandles;
    }

    /**
     * Fails every still-pending connection request whose deadline has passed
     * with a {@link ConnectException} and queues it for cancellation.
     *
     * @param handles the handles to inspect
     */
    private void processTimedOutSessions(Iterator<H> handles) {
        long currentTime = System.currentTimeMillis();

        while (handles.hasNext()) {
            H handle = handles.next();
            ConnectionRequest connectionRequest = getConnectionRequest(handle);

            // Requests that already completed - failed in processConnections()
            // or cancelled through ConnectionRequest.cancel() - are on the
            // cancel queue already. Queuing them a second time would make
            // cancelKeys() count the same handle twice and corrupt the
            // nHandles bookkeeping in Connector.run().
            // Assumes setException()/cancel() mark the future as done, which
            // the isDone() guard in ConnectionRequest.cancel() relies on too.
            if ((connectionRequest != null)
                    && !connectionRequest.isDone()
                    && (currentTime >= connectionRequest.deadline)) {
                connectionRequest.setException(
                        new ConnectException("Connection timed out."));
                cancelQueue.offer(connectionRequest);
            }
        }
    }

    /**
     * Worker that polls pending connections until none are left, and that
     * performs the final tear-down when the connector is being disposed.
     */
    private class Connector implements Runnable {

        public void run() {
            // Handles registered for polling whose connection has neither been
            // established nor cancelled yet.
            int nHandles = 0;
            while (selectable) {
                try {
                    // the timeout for select shall be smaller of the connect
                    // timeout or 1 second...
                    // A non-positive connect timeout means "no deadline" (see
                    // ConnectionRequest), so it must not reach select() as a
                    // zero or negative value; poll once per second instead.
                    long connectTimeout = getConnectTimeoutMillis();
                    int timeout = (int) (connectTimeout > 0L
                            ? Math.min(connectTimeout, 1000L) : 1000L);
                    int selected = select(timeout);

                    nHandles += registerNew();

                    if (selected > 0) {
                        nHandles -= processConnections(selectedHandles());
                    }

                    processTimedOutSessions(allHandles());

                    nHandles -= cancelKeys();

                    if (nHandles == 0) {
                        // Re-check under the lock used by startupWorker() so a
                        // request queued concurrently is not left without a
                        // worker.
                        synchronized (lock) {
                            if (connectQueue.isEmpty()) {
                                connector = null;
                                break;
                            }
                        }
                    }
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Pause before the next iteration after a failure.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        // Always signal completion, even if destroy() failed.
                        disposalFuture.setDone();
                    }
                }
            }
        }
    }

    /**
     * A pending connection attempt: the connect future returned to the caller
     * together with the handle being connected, the time at which the attempt
     * expires and the initializer to apply to the resulting session.
     */
    public final class ConnectionRequest extends DefaultConnectFuture {
        private final H handle;
        private final long deadline;
        private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;

        /**
         * @param handle the handle whose connection is pending
         * @param callback the session initializer kept for this request
         */
        public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
            this.handle = handle;
            long timeout = getConnectTimeoutMillis();
            if (timeout <= 0L) {
                // No connect timeout configured: the request never expires.
                this.deadline = Long.MAX_VALUE;
            } else {
                this.deadline = System.currentTimeMillis() + timeout;
            }
            this.sessionInitializer = callback;
        }

        /**
         * @return the handle whose connection is pending
         */
        public H getHandle() {
            return handle;
        }

        /**
         * @return the time, in milliseconds as measured by
         *         {@link System#currentTimeMillis()}, at which this request
         *         expires, or {@link Long#MAX_VALUE} when it never does
         */
        public long getDeadline() {
            return deadline;
        }

        /**
         * @return the session initializer given at construction time
         */
        public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
            return sessionInitializer;
        }

        /**
         * Cancels this request unless it is already done, then queues it so
         * that the worker closes its handle.
         */
        @Override
        public void cancel() {
            if ( !isDone() ) {
                super.cancel();
                cancelQueue.add(this);
                startupWorker();
                wakeup();
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -