⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketioprocessor.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        // process idle sessions        long currentTime = System.currentTimeMillis();        if ((currentTime - lastIdleCheckTime) >= 1000) {            lastIdleCheckTime = currentTime;            Set<SelectionKey> keys = selector.keys();            if (keys != null) {                for (SelectionKey key : keys) {                    SocketSessionImpl session = (SocketSessionImpl) key                            .attachment();                    notifyIdleness(session, currentTime);                }            }        }    }    private void notifyIdleness(SocketSessionImpl session, long currentTime) {        notifyIdleness0(session, currentTime, session                .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session                        .getLastIdleTime(IdleStatus.BOTH_IDLE)));        notifyIdleness0(session, currentTime, session                .getIdleTimeInMillis(IdleStatus.READER_IDLE),                IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),                        session.getLastIdleTime(IdleStatus.READER_IDLE)));        notifyIdleness0(session, currentTime, session                .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),                IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),                        session.getLastIdleTime(IdleStatus.WRITER_IDLE)));        notifyWriteTimeout(session, currentTime, session                .getWriteTimeoutInMillis(), session.getLastWriteTime());    }    private void notifyIdleness0(SocketSessionImpl session, long currentTime,            long idleTime, IdleStatus status, long lastIoTime) {        if (idleTime > 0 && lastIoTime != 0                && (currentTime - lastIoTime) >= idleTime) {            session.increaseIdleCount(status);            session.getFilterChain().fireSessionIdle(session, status);        }    }    private void notifyWriteTimeout(SocketSessionImpl session,            long currentTime, long writeTimeout, long lastIoTime) {        SelectionKey key = session.getSelectionKey();        if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout                && key != null && key.isValid()                && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {            session.getFilterChain().fireExceptionCaught(session,                    new WriteTimeoutException());        }    }    private void doFlush() {        for (;;) {            SocketSessionImpl session = flushingSessions.poll();            if (session == null)                break;            if (!session.isConnected()) {                releaseWriteBuffers(session);                continue;            }            SelectionKey key = session.getSelectionKey();            // Retry later if session is not yet fully initialized.            // (In case that Session.write() is called before addSession() is processed)            if (key == null) {                scheduleFlush(session);                break;            }            // Skip if the channel is already closed.            if (!key.isValid()) {                continue;            }            try {                doFlush(session);            } catch (IOException e) {                scheduleRemove(session);                session.getFilterChain().fireExceptionCaught(session, e);            }        }    }    private void releaseWriteBuffers(SocketSessionImpl session) {        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();        WriteRequest req;        while ((req = writeRequestQueue.poll()) != null) {            try {                ((ByteBuffer) req.getMessage()).release();            } catch (IllegalStateException e) {                session.getFilterChain().fireExceptionCaught(session, e);            } finally {                req.getFuture().setWritten(false);            }        }    }    private void doFlush(SocketSessionImpl session) throws IOException {        // Clear OP_WRITE        SelectionKey key = session.getSelectionKey();        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));        SocketChannel ch = session.getChannel();        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();        for (;;) {            WriteRequest req = writeRequestQueue.peek();            if (req == null)                break;            ByteBuffer buf = (ByteBuffer) req.getMessage();            if (buf.remaining() == 0) {                writeRequestQueue.poll();                session.increaseWrittenMessages();                buf.reset();                session.getFilterChain().fireMessageSent(session, req);                continue;            }            if (key.isWritable()) {                int writtenBytes = ch.write(buf.buf());                if (writtenBytes > 0) {                    session.increaseWrittenBytes(writtenBytes);                }            }            if (buf.hasRemaining()) {                // Kernel buffer is full                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);                break;            }        }    }    private void doUpdateTrafficMask() {        if (trafficControllingSessions.isEmpty())            return;        for (;;) {            SocketSessionImpl session = trafficControllingSessions.poll();            if (session == null)                break;            SelectionKey key = session.getSelectionKey();            // Retry later if session is not yet fully initialized.            // (In case that Session.suspend??() or session.resume??() is            // called before addSession() is processed)            if (key == null) {                scheduleTrafficControl(session);                break;            }            // skip if channel is already closed            if (!key.isValid()) {                continue;            }            // The normal is OP_READ and, if there are write requests in the            // session's write queue, set OP_WRITE to trigger flushing.            int ops = SelectionKey.OP_READ;            Queue<WriteRequest> writeRequestQueue = session                    .getWriteRequestQueue();            synchronized (writeRequestQueue) {                if (!writeRequestQueue.isEmpty()) {                    ops |= SelectionKey.OP_WRITE;                }            }            // Now mask the preferred ops with the mask of the current session            int mask = session.getTrafficMask().getInterestOps();            key.interestOps(ops & mask);        }    }    private class Worker implements Runnable {        public void run() {            Thread.currentThread().setName(SocketIoProcessor.this.threadName);            for (;;) {                try {                    int nKeys = selector.select(1000);                    doAddNew();                    doUpdateTrafficMask();                    if (nKeys > 0) {                        process(selector.selectedKeys());                    }                    doFlush();                    doRemove();                    notifyIdleness();                    if (selector.keys().isEmpty()) {                        synchronized (lock) {                            if (selector.keys().isEmpty()                                    && newSessions.isEmpty()) {                                worker = null;                                try {                                    selector.close();                                } catch (IOException e) {                                    ExceptionMonitor.getInstance()                                            .exceptionCaught(e);                                } finally {                                    selector = null;                                }                                break;                            }                        }                    }                } catch (Throwable t) {                    ExceptionMonitor.getInstance().exceptionCaught(t);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e1) {                        ExceptionMonitor.getInstance().exceptionCaught(e1);                    }                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -