📄 abstractpollingioacceptor.java
字号:
* {@inheritDoc} */
    @Override
    protected final Set<SocketAddress> bindInternal(
            List<? extends SocketAddress> localAddresses) throws Exception {
        // Create a bind request as a Future operation. Once the acceptor
        // loop has handled the registration, it will signal this future.
        AcceptorOperationFuture request = new AcceptorOperationFuture(
                localAddresses);

        // Add the registration request to the queue so that the acceptor
        // loop can pick it up.
        registerQueue.add(request);

        // Create the Acceptor instance (if there is none yet) and have the
        // executor kick it off.
        startupAcceptor();

        // The acceptor may be blocked in select(): unblock it so that it
        // processes the bind request we have just added to the
        // registerQueue.
        wakeup();

        // Now, we wait until this request is completed.
        request.awaitUninterruptibly();

        // A failure recorded by the acceptor loop is rethrown to the caller.
        if (request.getException() != null) {
            throw request.getException();
        }

        // Update the local addresses.
        // setLocalAddresses() shouldn't be called from the worker thread
        // because of deadlock.
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));
        }

        return newLocalAddresses;
    }

    /**
     * Starts the acceptor worker if it is not already running. This method
     * is called by {@code bindInternal()} and {@code unbind0()}.
     * <p/>
     * If the acceptor is null, a new {@code Acceptor} is created and handed
     * to {@code executeWorker()}. If the acceptor is not null, it has
     * already been created and is working, so nothing happens and the
     * method just returns.
     */
    private void startupAcceptor() {
        // If the acceptor is not ready, clear the queues
        // TODO : they should already be clean : do we have to do that ?
        if (!selectable) {
            registerQueue.clear();
            cancelQueue.clear();
        }

        // Start the acceptor if not already started. The check and the
        // assignment are done under the same lock the Acceptor takes
        // before resetting the field to null.
        synchronized (lock) {
            if (acceptor == null) {
                acceptor = new Acceptor();
                executeWorker(acceptor);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected final void unbind0(List<? extends SocketAddress> localAddresses)
            throws Exception {
        // Queue a cancellation request for the given addresses.
        AcceptorOperationFuture future = new AcceptorOperationFuture(
                localAddresses);

        cancelQueue.add(future);

        // Make sure a worker is running and not blocked in select(), so
        // that the request is actually processed.
        startupAcceptor();
        wakeup();

        // Wait for the acceptor loop to complete the request, then rethrow
        // any failure it recorded.
        future.awaitUninterruptibly();
        if (future.getException() != null) {
            throw future.getException();
        }
    }

    /**
     * The worker created by {@code startupAcceptor()} and handed to
     * {@code executeWorker()}. It accepts incoming connections from
     * clients. The loop stops when no handle is registered any more and
     * both request queues are empty, or when {@code selectable} becomes
     * false.
     */
    private class Acceptor implements Runnable {
        /**
         * Runs the accept loop: select, process pending bind requests,
         * accept the ready connections, process pending unbind requests.
         * When the loop ends while the service is being disposed, the
         * processor (if created by this acceptor) and the acceptor
         * resources are released.
         */
        public void run() {
            // Number of handles currently registered by this worker.
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed.
                    // The select() will be woken up if some new connection
                    // has occurred, or if the selector has been explicitly
                    // woken up.
                    int selected = select();

                    // This actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on.
                    nHandles += registerHandles();

                    if (selected > 0) {
                        // We have some connection requests, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // Check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();

                    // Now, if the number of registered handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connections.
                    if (nHandles == 0) {
                        synchronized (lock) {
                            // Only stop if no request slipped in meanwhile;
                            // resetting the field lets startupAcceptor()
                            // create a fresh worker later on.
                            if (registerQueue.isEmpty()
                                    && cancelQueue.isEmpty()) {
                                acceptor = null;
                                break;
                            }
                        }
                    }
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Back off for one second before the next iteration.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // NOTE(review): the interrupt is only reported, the
                        // interrupt status is not restored - confirm this
                        // is intended.
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    // Only dispose the processor when the createdProcessor
                    // flag is set.
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        // Always signal the disposal, even if destroy()
                        // failed.
                        disposalFuture.setDone();
                    }
                }
            }
        }

        /**
         * Processes the handles that are ready to accept a connection.
         * Each handle is removed from the iterator, then {@code accept()}
         * is asked to create a session for it. Every session obtained is
         * initialized and added to its processor.
         *
         * @param handles the ready handles, as returned by
         *        {@code selectedHandles()}
         * @throws Exception if accepting or initializing a session fails
         */
        @SuppressWarnings("unchecked")
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                T session = accept(processor, handle);

                if (session == null) {
                    // Nothing was accepted on this handle. Keep going: the
                    // other ready handles must still be processed, or
                    // their pending connections would be left unaccepted
                    // in this pass.
                    continue;
                }

                initSession(session, null, null);

                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }
    }

    /**
     * Processes the pending bind requests of the register queue. For each
     * request, every address is opened with {@code open()} (presumably the
     * place where the socket is configured, bound and registered for
     * OP_ACCEPT - see the implementation) and stored in the bound handles
     * map.
     * <p/>
     * If one address of a request cannot be opened, all the handles already
     * opened for that request are closed, the exception is stored in the
     * request future, and the next request is processed.
     *
     * @return the number of handles opened for the first request that
     *         succeeded, or 0 if the queue was emptied without any success
     */
    private int registerHandles() {
        for (;;) {
            // The register queue contains the list of services to manage
            // in this acceptor.
            AcceptorOperationFuture future = registerQueue.poll();

            if (future == null) {
                return 0;
            }

            // We create a temporary map to store the bound handles,
            // as we may have to remove them all if there is an exception
            // during the sockets opening.
            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
            List<SocketAddress> localAddresses = future.getLocalAddresses();

            try {
                // Process all the addresses
                for (SocketAddress a : localAddresses) {
                    H handle = open(a);
                    newHandles.put(localAddress(handle), handle);
                }

                // Everything went ok, we can now update the map storing
                // all the bound sockets.
                boundHandles.putAll(newHandles);

                // and notify.
                future.setDone();
                return newHandles.size();
            } catch (Exception e) {
                // We store the exception in the future
                future.setException(e);
            } finally {
                // Roll back if failed to bind all addresses.
                if (future.getException() != null) {
                    for (H handle : newHandles.values()) {
                        try {
                            close(handle);
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }

                    // TODO : add some comment : what is the wakeup() waking up ?
                    wakeup();
                }
            }
        }
    }

    /**
     * Processes the pending unbind requests of the cancel queue. For each
     * requested address that is currently bound, the handle is removed from
     * the bound handles map and closed. Each request future is marked as
     * done once all its addresses have been handled.
     *
     * @return the number of handles removed from the bound handles map; a
     *         handle is counted even if closing it failed
     */
    private int unregisterHandles() {
        int cancelledHandles = 0;

        for (;;) {
            AcceptorOperationFuture future = cancelQueue.poll();

            if (future == null) {
                break;
            }

            // close the channels
            for (SocketAddress a : future.getLocalAddresses()) {
                H handle = boundHandles.remove(a);

                // Addresses that are not bound are silently ignored.
                if (handle == null) {
                    continue;
                }

                try {
                    close(handle);
                    wakeup(); // wake up again to trigger thread death
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    cancelledHandles++;
                }
            }

            future.setDone();
        }

        return cancelledHandles;
    }

    /**
     * {@inheritDoc}
     */
    public final IoSession newSession(SocketAddress remoteAddress,
            SocketAddress localAddress) {
        throw new UnsupportedOperationException();
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -