⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datagramconnectordelegate.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            // Now mask the preferred ops with the mask of the current session            int mask = session.getTrafficMask().getInterestOps();            key.interestOps(ops & mask);        }    }    private class Worker implements Runnable {        public void run() {            Thread.currentThread().setName("DatagramConnector-" + id);            for (;;) {                try {                    int nKeys = selector.select();                    registerNew();                    doUpdateTrafficMask();                    if (nKeys > 0) {                        processReadySessions(selector.selectedKeys());                    }                    flushSessions();                    cancelKeys();                    if (selector.keys().isEmpty()) {                        synchronized (lock) {                            if (selector.keys().isEmpty()                                    && registerQueue.isEmpty()                                    && cancelQueue.isEmpty()) {                                worker = null;                                try {                                    selector.close();                                } catch (IOException e) {                                    ExceptionMonitor.getInstance()                                            .exceptionCaught(e);                                } finally {                                    selector = null;                                }                                break;                            }                        }                    }                } catch (IOException e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e1) {                        ExceptionMonitor.getInstance().exceptionCaught(e1);                    }                }            }        }    }    private void processReadySessions(Set<SelectionKey> keys) {        Iterator<SelectionKey> it = keys.iterator();        while (it.hasNext()) {            SelectionKey key = it.next();            it.remove();            DatagramSessionImpl session = (DatagramSessionImpl) key                    .attachment();            // Let the recycler know that the session is still active.             getSessionRecycler(session).recycle(session.getLocalAddress(),                    session.getRemoteAddress());            if (key.isReadable() && session.getTrafficMask().isReadable()) {                readSession(session);            }            if (key.isWritable() && session.getTrafficMask().isWritable()) {                scheduleFlush(session);            }        }    }    private IoSessionRecycler getSessionRecycler(IoSession session) {        IoServiceConfig config = session.getServiceConfig();        IoSessionRecycler sessionRecycler;        if (config instanceof DatagramServiceConfig) {            sessionRecycler = ((DatagramServiceConfig) config)                    .getSessionRecycler();        } else {            sessionRecycler = defaultConfig.getSessionRecycler();        }        return sessionRecycler;    }    private void readSession(DatagramSessionImpl session) {        ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());        try {            int readBytes = session.getChannel().read(readBuf.buf());            if (readBytes > 0) {                readBuf.flip();                ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());                newBuf.put(readBuf);                newBuf.flip();                session.increaseReadBytes(readBytes);                session.getFilterChain().fireMessageReceived(session, newBuf);            }        } catch (IOException e) {            session.getFilterChain().fireExceptionCaught(session, e);        } finally {            readBuf.release();        }    }    private void flushSessions() {        if (flushingSessions.size() == 0)            return;        for (;;) {            DatagramSessionImpl session = flushingSessions.poll();            if (session == null)                break;            try {                flush(session);            } catch (IOException e) {                session.getFilterChain().fireExceptionCaught(session, e);            }        }    }    private void flush(DatagramSessionImpl session) throws IOException {        DatagramChannel ch = session.getChannel();        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();        for (;;) {            WriteRequest req = writeRequestQueue.peek();            if (req == null)                break;            ByteBuffer buf = (ByteBuffer) req.getMessage();            if (buf.remaining() == 0) {                // pop and fire event                writeRequestQueue.poll();                session.increaseWrittenMessages();                buf.reset();                session.getFilterChain().fireMessageSent(session, req);                continue;            }            SelectionKey key = session.getSelectionKey();            if (key == null) {                scheduleFlush(session);                break;            }            if (!key.isValid()) {                continue;            }            int writtenBytes = ch.write(buf.buf());            if (writtenBytes == 0) {                // Kernel buffer is full                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);            } else if (writtenBytes > 0) {                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));                // pop and fire event                writeRequestQueue.poll();                session.increaseWrittenBytes(writtenBytes);                session.increaseWrittenMessages();                buf.reset();                session.getFilterChain().fireMessageSent(session, req);            }        }    }    private void registerNew() {        if (registerQueue.isEmpty())            return;        for (;;) {            RegistrationRequest req = registerQueue.poll();            if (req == null)                break;            DatagramSessionImpl session = new DatagramSessionImpl(wrapper,                    this, req.config, req.channel, req.handler, req.channel                            .socket().getRemoteSocketAddress(), req.channel                            .socket().getLocalSocketAddress());            // AbstractIoFilterChain will notify the connect future.            session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);            boolean success = false;            try {                SelectionKey key = req.channel.register(selector,                        SelectionKey.OP_READ, session);                session.setSelectionKey(key);                buildFilterChain(req, session);                getSessionRecycler(session).put(session);                // The CONNECT_FUTURE attribute is cleared and notified here.                getListeners().fireSessionCreated(session);                success = true;            } catch (Throwable t) {                // The CONNECT_FUTURE attribute is cleared and notified here.                session.getFilterChain().fireExceptionCaught(session, t);            } finally {                if (!success) {                    try {                        req.channel.disconnect();                        req.channel.close();                    } catch (IOException e) {                        ExceptionMonitor.getInstance().exceptionCaught(e);                    }                }            }        }    }    private void buildFilterChain(RegistrationRequest req, IoSession session)            throws Exception {        getFilterChainBuilder().buildFilterChain(session.getFilterChain());        req.config.getFilterChainBuilder().buildFilterChain(                session.getFilterChain());        req.config.getThreadModel().buildFilterChain(session.getFilterChain());    }    private void cancelKeys() {        if (cancelQueue.isEmpty())            return;        for (;;) {            DatagramSessionImpl session = cancelQueue.poll();            if (session == null)                break;            else {                SelectionKey key = session.getSelectionKey();                DatagramChannel ch = (DatagramChannel) key.channel();                try {                    ch.disconnect();                    ch.close();                } catch (IOException e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                }                getListeners().fireSessionDestroyed(session);                session.getCloseFuture().setClosed();                key.cancel();                selector.wakeup(); // wake up again to trigger thread death            }        }    }    private static class RegistrationRequest extends DefaultConnectFuture {        private final DatagramChannel channel;        private final IoHandler handler;        private final IoServiceConfig config;        private RegistrationRequest(DatagramChannel channel, IoHandler handler,                IoServiceConfig config) {            this.channel = channel;            this.handler = handler;            this.config = config;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -