⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingioacceptor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * {@inheritDoc}
     */
    @Override
    protected final Set<SocketAddress> bindInternal(
            List<? extends SocketAddress> localAddresses) throws Exception {
        // Create a bind request as a Future operation. When the acceptor
        // worker has handled the registration, it will signal this future.
        AcceptorOperationFuture request = new AcceptorOperationFuture(
                localAddresses);

        // Add the registration request to the queue for the acceptor
        // worker to handle.
        registerQueue.add(request);

        // Create the Acceptor instance (if there is none yet) and have
        // the executor kick it off.
        startupAcceptor();

        // As we just started the acceptor, we have to unblock the select()
        // in order to process the bind request we just added to the
        // registerQueue.
        wakeup();

        // Now, we wait until this request is completed.
        request.awaitUninterruptibly();

        // Propagate to the caller any failure recorded by the worker.
        if (request.getException() != null) {
            throw request.getException();
        }

        // Update the local addresses.
        // setLocalAddresses() shouldn't be called from the worker thread
        // because of deadlock.
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));
        }

        return newLocalAddresses;
    }

    /**
     * This method is called by the {@code bindInternal()} and
     * {@code unbind0()} methods.  If the acceptor is null, the acceptor
     * object will be created and kicked off by the executor.  If the
     * acceptor object is not null, it has already been created and is
     * now working, so nothing will happen and the method will just return.
     */
    private void startupAcceptor() {
        // If the acceptor is not ready, clear the queues
        // TODO : they should already be clean : do we have to do that ?
        if (!selectable) {
            registerQueue.clear();
            cancelQueue.clear();
        }

        // Start the acceptor if not already started. The check and the
        // assignment are done under the same lock the Acceptor takes
        // before resetting the field to null.
        synchronized (lock) {
            if (acceptor == null) {
                acceptor = new Acceptor();
                executeWorker(acceptor);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected final void unbind0(List<? extends SocketAddress> localAddresses)
            throws Exception {
        // Queue the unbind request for the acceptor worker.
        AcceptorOperationFuture future = new AcceptorOperationFuture(
                localAddresses);

        cancelQueue.add(future);

        // Make sure a worker is running and is not blocked in select(),
        // then wait for it to process the request.
        startupAcceptor();
        wakeup();

        future.awaitUninterruptibly();

        if (future.getException() != null) {
            throw future.getException();
        }
    }

    /**
     * This class is instantiated by the startupAcceptor() method and
     * handed to executeWorker().
     * It accepts incoming connections from clients.
     * The loop is stopped when all the bound handles are unbound and no
     * bind or unbind request is pending.
     */
    private class Acceptor implements Runnable {
        public void run() {
            // Number of handles currently registered by this worker.
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woken up if some new connection
                    // has occurred, or if the selector has been explicitly
                    // woken up
                    int selected = select();

                    // Process the pending bind requests and account for
                    // the handles that have been opened.
                    nHandles += registerHandles();

                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();

                    // Now, if the number of registered handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                        synchronized (lock) {
                            // Only quit if no request slipped in meanwhile;
                            // resetting the field under the lock lets
                            // startupAcceptor() create a new worker later.
                            if (registerQueue.isEmpty()
                                    && cancelQueue.isEmpty()) {
                                acceptor = null;
                                break;
                            }
                        }
                    }
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Pause before retrying the loop after a failure.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;

                try {
                    // Dispose the processor only when createdProcessor is set.
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        // Always signal the disposal future, even when
                        // destroy() failed.
                        disposalFuture.setDone();
                    }
                }
            }
        }

        /**
         * This method will process new sessions for the Acceptor.  Every
         * handle supplied by the iterator is removed from it and passed to
         * accept(); each resulting session is initialized and added to
         * its processor.
         * <p/>
         * A handle for which accept() returns <code>null</code> is skipped,
         * and the remaining handles are still processed.
         *
         * @param handles the handles that are ready, as returned by
         *                selectedHandles()
         * @throws Exception if accepting or initializing a session fails
         */
        @SuppressWarnings("unchecked")
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                T session = accept(processor, handle);

                if (session == null) {
                    // No session for this handle: move on to the next one
                    // instead of aborting the loop, otherwise the remaining
                    // ready handles would not be accepted on this pass.
                    continue;
                }

                initSession(session, null, null);

                // add the session to its processor
                session.getProcessor().add(session);
            }
        }
    }

    /**
     * Processes the pending bind requests found in the registerQueue.
     * For each address of a request, a handle is opened through open()
     * and stored in boundHandles once all the addresses of the request
     * have been opened.  If opening any address fails, the handles already
     * opened for that request are closed, the exception is stored in the
     * request's future, and the next request is processed.
     * <p/>
     * NOTE(review): the actual socket setup (blocking mode, socket
     * options, bind, selector registration) presumably happens in open(),
     * which is not defined in this part of the file.
     *
     * @return the number of handles opened for the first request that
     *         succeeded, or 0 if the queue has been drained without any
     *         successful request
     */
    private int registerHandles() {
        for (;;) {
            // The register queue contains the list of services to manage
            // in this acceptor.
            AcceptorOperationFuture future = registerQueue.poll();

            if (future == null) {
                return 0;
            }

            // We create a temporary map to store the bound handles,
            // as we may have to remove them all if there is an exception
            // during the sockets opening.
            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
            List<SocketAddress> localAddresses = future.getLocalAddresses();

            try {
                // Process all the addresses
                for (SocketAddress a : localAddresses) {
                    H handle = open(a);
                    newHandles.put(localAddress(handle), handle);
                }

                // Everything went ok, we can now update the map storing
                // all the bound sockets.
                boundHandles.putAll(newHandles);

                // and notify.
                future.setDone();
                return newHandles.size();
            } catch (Exception e) {
                // We store the exception in the future
                future.setException(e);
            } finally {
                // Roll back if failed to bind all addresses.
                if (future.getException() != null) {
                    for (H handle : newHandles.values()) {
                        try {
                            close(handle);
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }

                    // TODO : add some comment : what is the wakeup() waking up ?
                    wakeup();
                }
            }
        }
    }

    /**
     * This method drains the cancellation queue.  Each entry of the
     * cancelQueue is an AcceptorOperationFuture; in the visible part of
     * this class they are only added by the unbind0() method.  For every
     * address of a request, the matching bound handle (if any) is removed
     * from boundHandles and closed, then the request's future is marked
     * as done.
     *
     * @return the number of handles removed from boundHandles, counted
     *         even when closing a handle failed
     */
    private int unregisterHandles() {
        int cancelledHandles = 0;

        for (;;) {
            AcceptorOperationFuture future = cancelQueue.poll();

            if (future == null) {
                break;
            }

            // close the channels
            for (SocketAddress a : future.getLocalAddresses()) {
                H handle = boundHandles.remove(a);

                // Nothing is bound to this address: ignore it.
                if (handle == null) {
                    continue;
                }

                try {
                    close(handle);
                    wakeup(); // wake up again to trigger thread death
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    // The handle has left boundHandles either way, so it
                    // is always counted.
                    cancelledHandles++;
                }
            }

            future.setDone();
        }

        return cancelledHandles;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * This implementation always throws UnsupportedOperationException.
     */
    public final IoSession newSession(SocketAddress remoteAddress,
            SocketAddress localAddress) {
        throw new UnsupportedOperationException();
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -