⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingioconnector.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * Register a new client socket for connection, add it to connection polling     * @param handle client socket handle      * @param request the associated {@link ConnectionRequest}     * @throws Exception any exception thrown by the underlying systems calls     */    protected abstract void register(H handle, ConnectionRequest request) throws Exception;        /**     * get the {@link ConnectionRequest} for a given client socket handle     * @param handle the socket client handle      * @return the connection request if the socket is connecting otherwise <code>null</code>     */    protected abstract ConnectionRequest getConnectionRequest(H handle);    /**     * {@inheritDoc}     */    @Override    protected final IoFuture dispose0() throws Exception {        if (!disposalFuture.isDone()) {            startupWorker();            wakeup();        }        return disposalFuture;    }    /**     * {@inheritDoc}     */    @Override    @SuppressWarnings("unchecked")    protected final ConnectFuture connect0(            SocketAddress remoteAddress, SocketAddress localAddress,            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {        H handle = null;        boolean success = false;        try {            handle = newHandle(localAddress);            if (connect(handle, remoteAddress)) {                ConnectFuture future = new DefaultConnectFuture();                T session = newSession(processor, handle);                initSession(session, future, sessionInitializer);                // Forward the remaining process to the IoProcessor.                session.getProcessor().add(session);                success = true;                return future;            }            success = true;        } catch (Exception e) {            return DefaultConnectFuture.newFailedFuture(e);        } finally {            if (!success && handle != null) {                try {                    close(handle);                } catch (Exception e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                }            }        }        ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);        connectQueue.add(request);        startupWorker();        wakeup();        return request;    }    private void startupWorker() {        if (!selectable) {            connectQueue.clear();            cancelQueue.clear();        }        synchronized (lock) {            if (connector == null) {                connector = new Connector();                executeWorker(connector);            }        }    }    private int registerNew() {        int nHandles = 0;        for (; ;) {            ConnectionRequest req = connectQueue.poll();            if (req == null) {                break;            }            H handle = req.handle;            try {                register(handle, req);                nHandles ++;            } catch (Exception e) {                req.setException(e);                try {                    close(handle);                } catch (Exception e2) {                    ExceptionMonitor.getInstance().exceptionCaught(e2);                }            }        }        return nHandles;    }    private int cancelKeys() {        int nHandles = 0;        for (; ;) {            ConnectionRequest req = cancelQueue.poll();            if (req == null) {                break;            }            H handle = req.handle;            try {                close(handle);            } catch (Exception e) {                ExceptionMonitor.getInstance().exceptionCaught(e);            } finally {                nHandles ++;            }        }        return nHandles;    }    /**     * Process the incoming connections, creating a new session for each     * valid connection.      */    private int processConnections(Iterator<H> handlers) {        int nHandles = 0;                // Loop on each connection request        while (handlers.hasNext()) {            H handle = handlers.next();            handlers.remove();            ConnectionRequest connectionRequest = getConnectionRequest(handle);                        if ( connectionRequest == null) {                continue;            }                        boolean success = false;            try {                if (finishConnect(handle)) {                    T session = newSession(processor, handle);                    initSession(session, connectionRequest, connectionRequest.getSessionInitializer());                    // Forward the remaining process to the IoProcessor.                    session.getProcessor().add(session);                    nHandles ++;                }                success = true;            } catch (Throwable e) {                connectionRequest.setException(e);            } finally {                if (!success) {                    // The connection failed, we have to cancel it.                    cancelQueue.offer(connectionRequest);                }            }        }        return nHandles;    }    private void processTimedOutSessions(Iterator<H> handles) {        long currentTime = System.currentTimeMillis();        while (handles.hasNext()) {            H handle = handles.next();            ConnectionRequest connectionRequest = getConnectionRequest(handle);            if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {                connectionRequest.setException(                        new ConnectException("Connection timed out."));                cancelQueue.offer(connectionRequest);            }        }    }    private class Connector implements Runnable {        public void run() {            int nHandles = 0;            while (selectable) {                try {                    // the timeout for select shall be smaller of the connect                    // timeout or 1 second...                    int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);                    int selected = select(timeout);                    nHandles += registerNew();                    if (selected > 0) {                        nHandles -= processConnections(selectedHandles());                    }                    processTimedOutSessions(allHandles());                    nHandles -= cancelKeys();                    if (nHandles == 0) {                        synchronized (lock) {                            if (connectQueue.isEmpty()) {                                connector = null;                                break;                            }                        }                    }                } catch (Throwable e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e1) {                        ExceptionMonitor.getInstance().exceptionCaught(e1);                    }                }            }            if (selectable && isDisposing()) {                selectable = false;                try {                    if (createdProcessor) {                        processor.dispose();                    }                } finally {                    try {                        synchronized (disposalLock) {                            if (isDisposing()) {                                destroy();                            }                        }                    } catch (Exception e) {                        ExceptionMonitor.getInstance().exceptionCaught(e);                    } finally {                        disposalFuture.setDone();                    }                }            }        }    }    public final class ConnectionRequest extends DefaultConnectFuture {        private final H handle;        private final long deadline;        private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;        public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {            this.handle = handle;            long timeout = getConnectTimeoutMillis();            if (timeout <= 0L) {                this.deadline = Long.MAX_VALUE;            } else {                this.deadline = System.currentTimeMillis() + timeout;            }            this.sessionInitializer = callback;        }        public H getHandle() {            return handle;        }        public long getDeadline() {            return deadline;        }        public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {            return sessionInitializer;        }        @Override        public void cancel() {            if ( !isDone() ) {                super.cancel();                cancelQueue.add(this);                startupWorker();                wakeup();            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -