⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingioprocessor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
     * @param length the length of the portion to send     * @return the number of written bytes     * @throws Exception any exception thrown by the underlying system calls     */    protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;    /**     * {@inheritDoc}     */    public final void add(T session) {        if (isDisposing()) {            throw new IllegalStateException("Already disposed.");        }        // Adds the session to the newSession queue and starts the worker        newSessions.add(session);        startupProcessor();    }    /**     * {@inheritDoc}     */    public final void remove(T session) {        scheduleRemove(session);        startupProcessor();    }    private void scheduleRemove(T session) {        removingSessions.add(session);    }    /**     * {@inheritDoc}     */    public final void flush(T session) {        boolean needsWakeup = flushingSessions.isEmpty();        if (scheduleFlush(session) && needsWakeup) {            wakeup();        }    }    private boolean scheduleFlush(T session) {        if (session.setScheduledForFlush(true)) {            flushingSessions.add(session);            return true;        }        return false;    }    /**     * {@inheritDoc}     */    public final void updateTrafficMask(T session) {        scheduleTrafficControl(session);        wakeup();    }    private void scheduleTrafficControl(T session) {        trafficControllingSessions.add(session);    }    /**     * Starts the inner Processor, asking the executor to pick a thread in its     * pool. The Runnable will be renamed      */    private void startupProcessor() {        synchronized (lock) {            if (processor == null) {                processor = new Processor();                executor.execute(new NamePreservingRunnable(processor, threadName));            }        }                // Just stop the select() and start it again, so that the processor        // can be activated immediately.         wakeup();    }    /**     * Handle newly created sessions     * @return The number of new sessions     */    private int handleNewSessions() {        int addedSessions = 0;                // Loop on the new sessions blocking queue, to count        // the number of sessions who has been created        for (;;) {            T session = newSessions.poll();            if (session == null) {                // We don't have new sessions                break;            }            if (addNow(session)) {                // A new session has been created                 addedSessions ++;            }        }        return addedSessions;    }    private boolean addNow(T session) {        boolean registered = false;        boolean notified = false;        try {            init(session);            registered = true;            // Build the filter chain of this session.            session.getService().getFilterChainBuilder().buildFilterChain(                    session.getFilterChain());            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here            // in AbstractIoFilterChain.fireSessionOpened().            ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);            notified = true;        } catch (Throwable e) {            if (notified) {                // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute                // and call ConnectFuture.setException().                scheduleRemove(session);                IoFilterChain filterChain = session.getFilterChain();                 filterChain.fireExceptionCaught(e);                wakeup();            } else {                ExceptionMonitor.getInstance().exceptionCaught(e);                try {                    destroy(session);                } catch (Exception e1) {                    ExceptionMonitor.getInstance().exceptionCaught(e1);                } finally {                    registered = false;                }            }        }        return registered;    }    private int remove() {        int removedSessions = 0;        for (; ;) {            T session = removingSessions.poll();            if (session == null) {                break;            }            SessionState state = state(session);            switch (state) {            case OPEN:                if (removeNow(session)) {                    removedSessions ++;                }                break;            case CLOSED:                // Skip if channel is already closed                break;            case PREPARING:                // Retry later if session is not yet fully initialized.                // (In case that Session.close() is called before addSession() is processed)                scheduleRemove(session);                return removedSessions;            default:                throw new IllegalStateException(String.valueOf(state));            }        }        return removedSessions;    }    private boolean removeNow(T session) {        clearWriteRequestQueue(session);        try {            destroy(session);            return true;        } catch (Exception e) {            IoFilterChain filterChain = session.getFilterChain();             filterChain.fireExceptionCaught(e);        } finally {            clearWriteRequestQueue(session);            ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);        }        return false;    }    private void clearWriteRequestQueue(T session) {        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();        WriteRequest req;        List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();        if ((req = writeRequestQueue.poll(session)) != null) {            Object m = req.getMessage();            if (m instanceof IoBuffer) {                IoBuffer buf = (IoBuffer) req.getMessage();                // The first unwritten empty buffer must be                // forwarded to the filter chain.                if (buf.hasRemaining()) {                    buf.reset();                    failedRequests.add(req);                } else {                    IoFilterChain filterChain = session.getFilterChain();                     filterChain.fireMessageSent(req);                }            } else {                failedRequests.add(req);            }            // Discard others.            while ((req = writeRequestQueue.poll(session)) != null) {                failedRequests.add(req);            }        }        // Create an exception and notify.        if (!failedRequests.isEmpty()) {            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);            for (WriteRequest r: failedRequests) {                session.decreaseScheduledBytesAndMessages(r);                r.getFuture().setException(cause);            }            IoFilterChain filterChain = session.getFilterChain();             filterChain.fireExceptionCaught(cause);        }    }    private void process() throws Exception {        for (Iterator<T> i = selectedSessions(); i.hasNext();) {        	T session = i.next();            process(session);            i.remove();        }    }    /**     * Deal with session ready for the read or write operations, or both.     */    private void process(T session) {        // Process Reads        if (isReadable(session) && !session.isReadSuspended()) {            read(session);        }        // Process writes        if (isWritable(session) && !session.isWriteSuspended()) {            scheduleFlush(session);        }    }    private void read(T session) {        IoSessionConfig config = session.getConfig();        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());        final boolean hasFragmentation =            session.getTransportMetadata().hasFragmentation();        try {            int readBytes = 0;            int ret;            try {                if (hasFragmentation) {                    while ((ret = read(session, buf)) > 0) {                        readBytes += ret;                        if (!buf.hasRemaining()) {                            break;                        }                    }                } else {                    ret = read(session, buf);                    if (ret > 0) {                        readBytes = ret;                    }                }            } finally {                buf.flip();            }            if (readBytes > 0) {                IoFilterChain filterChain = session.getFilterChain();                 filterChain.fireMessageReceived(buf);                buf = null;                if (hasFragmentation) {                    if (readBytes << 1 < config.getReadBufferSize()) {                        session.decreaseReadBufferSize();                    } else if (readBytes == config.getReadBufferSize()) {                        session.increaseReadBufferSize();                    }                }            }            if (ret < 0) {                scheduleRemove(session);            }        } catch (Throwable e) {            if (e instanceof IOException) {                scheduleRemove(session);            }            IoFilterChain filterChain = session.getFilterChain();             filterChain.fireExceptionCaught(e);        }    }    private void notifyIdleSessions(long currentTime) throws Exception {        // process idle sessions        if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {            lastIdleCheckTime = currentTime;            AbstractIoSession.notifyIdleness(allSessions(), currentTime);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -