⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingconnectionlessioacceptor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        public void updateTrafficControl(T session) {
            throw new UnsupportedOperationException();
        }

        public void dispose() {
        }

        public boolean isDisposed() {
            return false;
        }

        public boolean isDisposing() {
            return false;
        }
    }

    /**
     * Starts the inner {@link Acceptor} worker if none is currently active.
     * <p>
     * When the acceptor is not selectable, the pending register, cancel and
     * flush queues are emptied first. The worker is created and handed to
     * {@code executeWorker} while holding {@code lock}, so at most one
     * worker is started.
     */
    private void startupAcceptor() {
        if (!selectable) {
            registerQueue.clear();
            cancelQueue.clear();
            flushingSessions.clear();
        }

        synchronized (lock) {
            if (acceptor == null) {
                acceptor = new Acceptor();
                executeWorker(acceptor);
            }
        }
    }

    /**
     * Queues the session for flushing unless it is already marked as
     * scheduled.
     *
     * @param session the session whose pending writes should be flushed
     * @return {@code true} if the session was added to the flushing queue,
     *         {@code false} if {@code setScheduledForFlush(true)} reported
     *         that nothing changed
     */
    private boolean scheduleFlush(T session) {
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            return true;
        } else {
            return false;
        }
    }

    /**
     * Worker that drives this acceptor: on every iteration it selects,
     * binds newly requested addresses, processes ready handles, flushes
     * scheduled sessions, unbinds cancelled addresses and checks idleness.
     * The loop ends when the acceptor stops being selectable, or when no
     * handle remains bound and both request queues are empty.
     */
    private class Acceptor implements Runnable {
        public void run() {
            // Number of handles bound by this worker and not yet unbound.
            int nHandles = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            while (selectable) {
                try {
                    int selected = select();

                    nHandles += registerHandles();

                    if (selected > 0) {
                        processReadySessions(selectedHandles());
                    }

                    long currentTime = System.currentTimeMillis();
                    flushSessions(currentTime);
                    nHandles -= unregisterHandles();

                    notifyIdleSessions(currentTime);

                    if (nHandles == 0) {
                        // Re-check the queues under the lock so that a
                        // request queued concurrently is not lost when
                        // this worker retires.
                        synchronized (lock) {
                            if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                                acceptor = null;
                                break;
                            }
                        }
                    }
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Back off for a second before retrying the loop.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: an interrupt only shortens
                        // the back-off; the loop condition decides whether
                        // the worker keeps running.
                    }
                }
            }

            // Only reached with selectable still true via the break above.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    destroy();
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    disposalFuture.setValue(true);
                }
            }
        }
    }

    /**
     * Handles every selected handle: each one is removed from the iterator,
     * read if it is readable and, if it is writable, a flush is scheduled
     * for every managed session. Any {@link Throwable} raised while handling
     * a handle is reported to the {@code ExceptionMonitor} and processing
     * continues with the next handle.
     *
     * @param handles iterator over the handles returned by the selection
     */
    @SuppressWarnings("unchecked")
    private void processReadySessions(Iterator<H> handles) {
        while (handles.hasNext()) {
            H h = handles.next();
            handles.remove();
            try {
                if (isReadable(h)) {
                    readHandle(h);
                }

                if (isWritable(h)) {
                    for (IoSession session : getManagedSessions().values()) {
                        scheduleFlush((T) session);
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            }
        }
    }

    /**
     * Receives one message from the handle and, when a remote address is
     * returned, fires {@code messageReceived} on the filter chain of the
     * session obtained for that remote/local address pair.
     *
     * @param handle the readable handle
     * @throws Exception if receiving or session lookup fails
     */
    private void readHandle(H handle) throws Exception {
        IoBuffer readBuf = IoBuffer.allocate(
                getSessionConfig().getReadBufferSize());

        SocketAddress remoteAddress = receive(handle, readBuf);
        if (remoteAddress != null) {
            IoSession session = newSessionWithoutLock(
                    remoteAddress, localAddress(handle));

            readBuf.flip();

            // Hand the filter chain a copy sized to the received data
            // rather than the full-size read buffer.
            IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
            newBuf.put(readBuf);
            newBuf.flip();

            session.getFilterChain().fireMessageReceived(newBuf);
        }
    }

    /**
     * Drains the flushing queue, flushing each session once. A session is
     * rescheduled when its flush completed but its write queue is non-empty
     * again and nobody else has scheduled it in the meantime. Exceptions are
     * forwarded to the session's filter chain.
     *
     * @param currentTime the timestamp passed on to {@link #flush}
     */
    private void flushSessions(long currentTime) {
        for (;;) {
            T session = flushingSessions.poll();
            if (session == null) {
                break;
            }

            // Clear the flag first so that writes arriving during the
            // flush can schedule the session again.
            session.setScheduledForFlush(false);

            try {
                boolean flushedAll = flush(session, currentTime);
                if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
                    !session.isScheduledForFlush()) {
                    scheduleFlush(session);
                }
            } catch (Exception e) {
                session.getFilterChain().fireExceptionCaught(e);
            }
        }
    }

    /**
     * Sends the queued write requests of a session until the queue is
     * empty, a send returns zero bytes, or the per-pass byte budget has
     * been reached. All bytes sent during the pass, including those of a
     * send that hit the budget limit, are reported through
     * {@code increaseWrittenBytes}.
     *
     * @param session     the session to flush
     * @param currentTime the timestamp reported with the written byte count
     * @return {@code true} if the write queue was drained, {@code false} if
     *         the pass stopped early and write interest was re-enabled
     * @throws Exception if sending fails
     */
    private boolean flush(T session, long currentTime) throws Exception {
        // Clear OP_WRITE
        setInterestedInWrite(session, false);

        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Budget for one pass: 1.5 times the configured maximum read
        // buffer size.
        final int maxWrittenBytes =
            session.getConfig().getMaxReadBufferSize() +
            (session.getConfig().getMaxReadBufferSize() >>> 1);

        int writtenBytes = 0;
        try {
            for (;;) {
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
                        break;
                    }
                    session.setCurrentWriteRequest(req);
                }

                IoBuffer buf = (IoBuffer) req.getMessage();
                if (buf.remaining() == 0) {
                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                    continue;
                }

                SocketAddress destination = req.getDestination();
                if (destination == null) {
                    destination = session.getRemoteAddress();
                }

                int localWrittenBytes = send(session, buf, destination);
                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                    // Kernel buffer is full or wrote too much.
                    // Bytes that did go out on this last send must still
                    // be counted: the request stays current and a later
                    // pass only fires messageSent for it without adding
                    // to the byte count.
                    writtenBytes += localWrittenBytes;
                    setInterestedInWrite(session, true);
                    return false;
                } else {
                    setInterestedInWrite(session, false);

                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    writtenBytes += localWrittenBytes;
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                }
            }
        } finally {
            session.increaseWrittenBytes(writtenBytes, currentTime);
        }

        return true;
    }

    /**
     * Processes every pending bind request in the register queue. For each
     * request all of its local addresses are opened; on success the handles
     * are added to {@code boundHandles} and the request is completed, on
     * failure the request receives the exception and the handles opened for
     * it are closed again.
     *
     * @return the total number of handles bound during this call
     */
    private int registerHandles() {
        int nHandles = 0;

        for (;;) {
            AcceptorOperationFuture req = registerQueue.poll();
            if (req == null) {
                break;
            }

            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
            List<SocketAddress> localAddresses = req.getLocalAddresses();

            try {
                for (SocketAddress a : localAddresses) {
                    H handle = open(a);
                    newHandles.put(localAddress(handle), handle);
                }

                boundHandles.putAll(newHandles);

                getListeners().fireServiceActivated();
                req.setDone();

                // Keep draining the queue instead of returning here, so
                // that other queued bind requests do not have to wait for
                // another pass of the worker loop.
                nHandles += newHandles.size();
            } catch (Exception e) {
                req.setException(e);
            } finally {
                // Roll back if failed to bind all addresses.
                if (req.getException() != null) {
                    for (H handle : newHandles.values()) {
                        try {
                            close(handle);
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }
                    wakeup();
                }
            }
        }

        return nHandles;
    }

    /**
     * Processes every pending unbind request in the cancel queue: each
     * requested address that is currently bound is removed from
     * {@code boundHandles} and its handle is closed. Close failures are
     * reported to the {@code ExceptionMonitor}; the handle is counted as
     * unbound either way.
     *
     * @return the number of handles removed during this call
     */
    private int unregisterHandles() {
        int nHandles = 0;

        for (;;) {
            AcceptorOperationFuture request = cancelQueue.poll();
            if (request == null) {
                break;
            }

            // close the channels
            for (SocketAddress a : request.getLocalAddresses()) {
                H handle = boundHandles.remove(a);
                if (handle == null) {
                    continue;
                }

                try {
                    close(handle);
                    wakeup(); // wake up again to trigger thread death
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    nHandles++;
                }
            }

            request.setDone();
        }

        return nHandles;
    }

    /**
     * Runs the idleness check over all managed sessions, but only when at
     * least 1000 time units (milliseconds when fed from
     * {@code System.currentTimeMillis()}, as the worker does) have passed
     * since the previous check.
     *
     * @param currentTime the current timestamp
     */
    private void notifyIdleSessions(long currentTime) {
        // process idle sessions
        if (currentTime - lastIdleCheckTime >= 1000) {
            lastIdleCheckTime = currentTime;
            AbstractIoSession.notifyIdleness(
                    getListeners().getManagedSessions().values().iterator(),
                    currentTime);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -