📄 datagramconnectordelegate.java
字号:
// Now mask the preferred ops with the mask of the current session int mask = session.getTrafficMask().getInterestOps(); key.interestOps(ops & mask); } } private class Worker implements Runnable { public void run() { Thread.currentThread().setName("DatagramConnector-" + id); for (;;) { try { int nKeys = selector.select(); registerNew(); doUpdateTrafficMask(); if (nKeys > 0) { processReadySessions(selector.selectedKeys()); } flushSessions(); cancelKeys(); if (selector.keys().isEmpty()) { synchronized (lock) { if (selector.keys().isEmpty() && registerQueue.isEmpty() && cancelQueue.isEmpty()) { worker = null; try { selector.close(); } catch (IOException e) { ExceptionMonitor.getInstance() .exceptionCaught(e); } finally { selector = null; } break; } } } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } } } private void processReadySessions(Set<SelectionKey> keys) { Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); DatagramSessionImpl session = (DatagramSessionImpl) key .attachment(); // Let the recycler know that the session is still active. getSessionRecycler(session).recycle(session.getLocalAddress(), session.getRemoteAddress()); if (key.isReadable() && session.getTrafficMask().isReadable()) { readSession(session); } if (key.isWritable() && session.getTrafficMask().isWritable()) { scheduleFlush(session); } } } private IoSessionRecycler getSessionRecycler(IoSession session) { IoServiceConfig config = session.getServiceConfig(); IoSessionRecycler sessionRecycler; if (config instanceof DatagramServiceConfig) { sessionRecycler = ((DatagramServiceConfig) config) .getSessionRecycler(); } else { sessionRecycler = defaultConfig.getSessionRecycler(); } return sessionRecycler; } private void readSession(DatagramSessionImpl session) { ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize()); try { int readBytes = session.getChannel().read(readBuf.buf()); if (readBytes > 0) { readBuf.flip(); ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit()); newBuf.put(readBuf); newBuf.flip(); session.increaseReadBytes(readBytes); session.getFilterChain().fireMessageReceived(session, newBuf); } } catch (IOException e) { session.getFilterChain().fireExceptionCaught(session, e); } finally { readBuf.release(); } } private void flushSessions() { if (flushingSessions.size() == 0) return; for (;;) { DatagramSessionImpl session = flushingSessions.poll(); if (session == null) break; try { flush(session); } catch (IOException e) { session.getFilterChain().fireExceptionCaught(session, e); } } } private void flush(DatagramSessionImpl session) throws IOException { DatagramChannel ch = session.getChannel(); Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); for (;;) { WriteRequest req = writeRequestQueue.peek(); if (req == null) break; ByteBuffer buf = (ByteBuffer) req.getMessage(); if (buf.remaining() == 0) { // pop and fire event writeRequestQueue.poll(); session.increaseWrittenMessages(); buf.reset(); session.getFilterChain().fireMessageSent(session, req); continue; } SelectionKey key = session.getSelectionKey(); if (key == null) { scheduleFlush(session); break; } if (!key.isValid()) { continue; } int writtenBytes = ch.write(buf.buf()); if (writtenBytes == 0) { // Kernel buffer is full key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else if (writtenBytes > 0) { key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); // pop and fire event writeRequestQueue.poll(); session.increaseWrittenBytes(writtenBytes); session.increaseWrittenMessages(); buf.reset(); session.getFilterChain().fireMessageSent(session, req); } } } private void registerNew() { if (registerQueue.isEmpty()) return; for (;;) { RegistrationRequest req = registerQueue.poll(); if (req == null) break; DatagramSessionImpl session = new DatagramSessionImpl(wrapper, this, req.config, req.channel, req.handler, req.channel .socket().getRemoteSocketAddress(), req.channel .socket().getLocalSocketAddress()); // AbstractIoFilterChain will notify the connect future. session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req); boolean success = false; try { SelectionKey key = req.channel.register(selector, SelectionKey.OP_READ, session); session.setSelectionKey(key); buildFilterChain(req, session); getSessionRecycler(session).put(session); // The CONNECT_FUTURE attribute is cleared and notified here. getListeners().fireSessionCreated(session); success = true; } catch (Throwable t) { // The CONNECT_FUTURE attribute is cleared and notified here. session.getFilterChain().fireExceptionCaught(session, t); } finally { if (!success) { try { req.channel.disconnect(); req.channel.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } } } private void buildFilterChain(RegistrationRequest req, IoSession session) throws Exception { getFilterChainBuilder().buildFilterChain(session.getFilterChain()); req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain()); req.config.getThreadModel().buildFilterChain(session.getFilterChain()); } private void cancelKeys() { if (cancelQueue.isEmpty()) return; for (;;) { DatagramSessionImpl session = cancelQueue.poll(); if (session == null) break; else { SelectionKey key = session.getSelectionKey(); DatagramChannel ch = (DatagramChannel) key.channel(); try { ch.disconnect(); ch.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } getListeners().fireSessionDestroyed(session); session.getCloseFuture().setClosed(); key.cancel(); selector.wakeup(); // wake up again to trigger thread death } } } private static class RegistrationRequest extends DefaultConnectFuture { private final DatagramChannel channel; private final IoHandler handler; private final IoServiceConfig config; private RegistrationRequest(DatagramChannel channel, IoHandler handler, IoServiceConfig config) { this.channel = channel; this.handler = handler; this.config = config; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -