⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingioprocessor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    }

    /**
     * Makes one pass over the {@code flushingSessions} queue and tries to
     * write the pending requests of every session found there.
     * <p>
     * The pass ends when the queue becomes empty, or when the session at the
     * head of the queue is again the first session handled in this pass
     * (i.e. it was re-queued while being processed), so a single call never
     * loops forever on sessions that keep re-scheduling themselves.
     *
     * @param currentTime the timestamp forwarded to {@link #flushNow}
     * @throws IllegalStateException if {@code state(session)} yields a value
     *         that is not handled by the switch
     */
    private void flush(long currentTime) {
        final T firstSession = flushingSessions.peek();
        if (firstSession == null) {
            return;
        }

        T session = flushingSessions.poll(); // the same one with firstSession
        for (; ;) {
            session.setScheduledForFlush(false);
            SessionState state = state(session);

            switch (state) {
            case OPEN:
                try {
                    boolean flushedAll = flushNow(session, currentTime);
                    // Requests may have been queued while we were writing;
                    // make sure somebody comes back for them.
                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
                        !session.isScheduledForFlush()) {
                        scheduleFlush(session);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireExceptionCaught(e);
                }
                break;
            case CLOSED:
                // Skip if the channel is already closed.
                break;
            case PREPARING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.write() is called before addSession() is processed)
                scheduleFlush(session);
                return;
            default:
                throw new IllegalStateException(String.valueOf(state));
            }

            session = flushingSessions.peek();
            if (session == null || session == firstSession) {
                break;
            }
            session = flushingSessions.poll();
        }
    }

    /**
     * Writes as many pending requests of {@code session} as the per-call byte
     * quota allows.
     * <p>
     * Any exception raised while writing is forwarded to the session's filter
     * chain via {@code fireExceptionCaught} and is not rethrown.
     *
     * @param session     the session whose write request queue is drained
     * @param currentTime the timestamp passed on to the write helpers
     * @return {@code true} if the loop stopped because no further write
     *         request was available; {@code false} if the session is not
     *         connected (it is then scheduled for removal), if a message was
     *         only partially written or nothing could be written (write
     *         interest is then re-enabled), if the byte quota was reached
     *         (the session is then re-scheduled for flush), or if an
     *         exception occurred
     */
    private boolean flushNow(T session, long currentTime) {
        if (!session.isConnected()) {
            scheduleRemove(session);
            return false;
        }

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Set limitation for the number of written bytes for read-write
        // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
        // performance in my experience while not breaking fairness much.
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
                              (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        try {
            // Clear OP_WRITE
            setInterestedInWrite(session, false);
            do {
                // Check for pending writes.
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
                        break;
                    }
                    session.setCurrentWriteRequest(req);
                }

                int localWrittenBytes = 0;
                Object message = req.getMessage();
                if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes,
                            currentTime);
                    if (localWrittenBytes > 0 && ((IoBuffer) message).hasRemaining()) {
                        // The buffer isn't empty: register interest in
                        // writing again and resume later.
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else if (message instanceof FileRegion) {
                    localWrittenBytes = writeFile(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes,
                            currentTime);

                    // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                    // If there's still data to be written in the FileRegion, return false indicating
                    // that we need to pause until writing may resume.
                    if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else {
                    throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
                }

                if (localWrittenBytes == 0) {
                    // Kernel buffer is full.
                    setInterestedInWrite(session, true);
                    return false;
                }

                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    scheduleFlush(session);
                    return false;
                }
            } while (writtenBytes < maxWrittenBytes);
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
            return false;
        }

        return true;
    }

    /**
     * Writes the {@code IoBuffer} carried by {@code req} to the session.
     * <p>
     * The write is retried up to {@code WRITE_SPIN_COUNT} times while
     * {@code write} reports zero bytes. When the buffer has been drained, or
     * when the transport has no fragmentation and something was written, the
     * buffer is {@code reset()} and a message-sent event is fired.
     *
     * @param session          the session to write to
     * @param req              the write request whose message is an {@code IoBuffer}
     * @param hasFragmentation whether the write length is capped by {@code maxLength}
     * @param maxLength        the upper bound on bytes to write when
     *                         {@code hasFragmentation} is {@code true}
     * @param currentTime      the timestamp passed to {@code increaseWrittenBytes}
     * @return the number of bytes reported by the last {@code write} call,
     *         or 0 if the buffer had nothing remaining
     * @throws Exception whatever {@code write} throws
     */
    private int writeBuffer(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
        IoBuffer buf = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;
        if (buf.hasRemaining()) {
            int length;
            if (hasFragmentation) {
                length = Math.min(buf.remaining(), maxLength);
            } else {
                length = buf.remaining();
            }
            for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
                localWrittenBytes = write(session, buf, length);
                if (localWrittenBytes != 0) {
                    break;
                }
            }
        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if (!buf.hasRemaining() ||
                !hasFragmentation && localWrittenBytes != 0) {
            // Buffer has been sent, clear the current request.
            // NOTE(review): reset() presumably relies on a mark set by the
            // code that queued the buffer - confirm against the caller.
            buf.reset();
            fireMessageSent(session, req);
        }
        return localWrittenBytes;
    }

    /**
     * Transfers the {@code FileRegion} carried by {@code req} to the session.
     * <p>
     * After the transfer the region is updated with the number of bytes
     * written. When no bytes remain, or when the transport has no
     * fragmentation and something was written, a message-sent event is fired.
     *
     * @param session          the session to write to
     * @param req              the write request whose message is a {@code FileRegion}
     * @param hasFragmentation whether the transfer length is capped by {@code maxLength}
     * @param maxLength        the upper bound on bytes to transfer when
     *                         {@code hasFragmentation} is {@code true}
     * @param currentTime      the timestamp passed to {@code increaseWrittenBytes}
     * @return the number of bytes reported by {@code transferFile}, or 0 if
     *         the region had no remaining bytes
     * @throws Exception whatever {@code transferFile} throws
     */
    private int writeFile(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
        int localWrittenBytes;
        FileRegion region = (FileRegion) req.getMessage();
        if (region.getRemainingBytes() > 0) {
            int length;
            if (hasFragmentation) {
                length = (int) Math.min(region.getRemainingBytes(), maxLength);
            } else {
                // Clamp the long remaining count so the int cast cannot overflow.
                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
            }
            localWrittenBytes = transferFile(session, region, length);
            region.update(localWrittenBytes);
        } else {
            localWrittenBytes = 0;
        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if (region.getRemainingBytes() <= 0 ||
                    !hasFragmentation && localWrittenBytes != 0) {
            fireMessageSent(session, req);
        }

        return localWrittenBytes;
    }

    /**
     * Clears the session's current write request and fires a message-sent
     * event for {@code req} on the session's filter chain.
     *
     * @param session the session the request belongs to
     * @param req     the request that has been written
     */
    private void fireMessageSent(T session, WriteRequest req) {
        session.setCurrentWriteRequest(null);
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageSent(req);
    }

    /**
     * Drains the {@code trafficControllingSessions} queue and applies the
     * traffic control settings of every session in the OPEN state.
     * <p>
     * Processing stops early at the first session that is still PREPARING;
     * that session is re-queued so that it is retried on a later call.
     *
     * @throws IllegalStateException if {@code state(session)} yields a value
     *         that is not handled by the switch
     */
    private void updateTrafficMask() {
        for (; ;) {
            T session = trafficControllingSessions.poll();

            if (session == null) {
                break;
            }

            SessionState state = state(session);
            switch (state) {
            case OPEN:
                updateTrafficControl(session);
                break;
            case CLOSED:
                break;
            case PREPARING:
                // Retry later if session is not yet fully initialized.
                // (In case a suspend or resume call on the session is made
                // before addSession() is processed)
                scheduleTrafficControl(session);
                return;
            default:
                throw new IllegalStateException(String.valueOf(state));
            }
        }
    }

    /**
     * Re-computes the read and write interest of {@code session}.
     * <p>
     * Read interest is enabled unless the session is read-suspended. Write
     * interest is enabled only when the write request queue is not empty and
     * the session is not write-suspended. The two updates are attempted
     * independently; an exception from either one is forwarded to the
     * session's filter chain and does not prevent the other.
     *
     * @param session the session whose interest set is updated
     */
    public void updateTrafficControl(T session) {
        try {
            setInterestedInRead(session, !session.isReadSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }

        try {
            setInterestedInWrite(
                    session,
                    !session.getWriteRequestQueue().isEmpty(session) &&
                            !session.isWriteSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

    /**
     * The worker loop of the enclosing processor: selects, registers new
     * sessions, applies traffic control, processes ready sessions, flushes
     * pending writes, removes closed sessions and notifies idle ones.
     */
    private class Processor implements Runnable {
        /**
         * Runs until no session is left (the {@code processor} field is then
         * cleared under {@code lock}). Any {@code Throwable} raised by an
         * iteration is reported to {@code ExceptionMonitor} and followed by a
         * one second pause. After the loop, {@code dispose0()} is invoked if
         * disposal was requested, and {@code disposalFuture} is always
         * completed.
         */
        public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle sessions when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    int selected = select(SELECT_TIMEOUT);

                    nSessions += handleNewSessions();
                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= remove();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                processor = null;
                                break;
                            }
                        }
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext(); ) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        dispose0();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }

    /**
     * The session states distinguished by this processor. As used in this
     * class: OPEN sessions are processed, CLOSED sessions are skipped, and
     * PREPARING sessions are re-queued to be retried later.
     */
    protected static enum SessionState {
        OPEN,
        CLOSED,
        PREPARING,
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -