📄 datagramacceptordelegate.java
字号:
if (nKeys > 0) { processReadySessions(selector.selectedKeys()); } flushSessions(); cancelKeys(); if (selector.keys().isEmpty()) { synchronized (lock) { if (selector.keys().isEmpty() && registerQueue.isEmpty() && cancelQueue.isEmpty()) { worker = null; try { selector.close(); } catch (IOException e) { ExceptionMonitor.getInstance() .exceptionCaught(e); } finally { selector = null; } break; } } } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } } } private void processReadySessions(Set<SelectionKey> keys) { Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); DatagramChannel ch = (DatagramChannel) key.channel(); RegistrationRequest req = (RegistrationRequest) key.attachment(); try { if (key.isReadable()) { readSession(ch, req); } if (key.isWritable()) { for (Object o : getManagedSessions(req.address)) { scheduleFlush((DatagramSessionImpl) o); } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } } } private void readSession(DatagramChannel channel, RegistrationRequest req) throws Exception { ByteBuffer readBuf = ByteBuffer .allocate(((DatagramSessionConfig) req.config .getSessionConfig()).getReceiveBufferSize()); try { SocketAddress remoteAddress = channel.receive(readBuf.buf()); if (remoteAddress != null) { DatagramSessionImpl session = (DatagramSessionImpl) newSession( remoteAddress, req.address); readBuf.flip(); ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit()); newBuf.put(readBuf); newBuf.flip(); session.increaseReadBytes(newBuf.remaining()); session.getFilterChain().fireMessageReceived(session, newBuf); } } finally { readBuf.release(); } } private void flushSessions() { if (flushingSessions.size() == 0) return; for (;;) { DatagramSessionImpl session = flushingSessions.poll(); if (session == null) break; try { flush(session); } catch (IOException e) { session.getFilterChain().fireExceptionCaught(session, e); } } } private void flush(DatagramSessionImpl session) throws IOException { DatagramChannel ch = session.getChannel(); Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); for (;;) { WriteRequest req = writeRequestQueue.peek(); if (req == null) break; ByteBuffer buf = (ByteBuffer) req.getMessage(); if (buf.remaining() == 0) { // pop and fire event writeRequestQueue.poll(); session.increaseWrittenMessages(); buf.reset(); session.getFilterChain().fireMessageSent(session, req); continue; } SelectionKey key = session.getSelectionKey(); if (key == null) { scheduleFlush(session); break; } if (!key.isValid()) { continue; } SocketAddress destination = req.getDestination(); if (destination == null) { destination = session.getRemoteAddress(); } int writtenBytes = ch.send(buf.buf(), destination); if (writtenBytes == 0) { // Kernel buffer is full key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else if (writtenBytes > 0) { key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); // pop and fire event writeRequestQueue.poll(); session.increaseWrittenBytes(writtenBytes); session.increaseWrittenMessages(); buf.reset(); session.getFilterChain().fireMessageSent(session, req); } } } private void registerNew() { if (registerQueue.isEmpty()) return; for (;;) { RegistrationRequest req = registerQueue.poll(); if (req == null) break; DatagramChannel ch = null; try { ch = DatagramChannel.open(); DatagramSessionConfig cfg; if (req.config.getSessionConfig() instanceof DatagramSessionConfig) { cfg = (DatagramSessionConfig) req.config.getSessionConfig(); } else { cfg = getDefaultConfig().getSessionConfig(); } ch.socket().setReuseAddress(cfg.isReuseAddress()); ch.socket().setBroadcast(cfg.isBroadcast()); ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize()); ch.socket().setSendBufferSize(cfg.getSendBufferSize()); if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) { ch.socket().setTrafficClass(cfg.getTrafficClass()); } ch.configureBlocking(false); ch.socket().bind(req.address); if (req.address == null || req.address.getPort() == 0) { req.address = (InetSocketAddress) ch.socket() .getLocalSocketAddress(); } ch.register(selector, SelectionKey.OP_READ, req); channels.put(req.address, ch); getListeners().fireServiceActivated(this, req.address, req.handler, req.config); } catch (Throwable t) { req.exception = t; } finally { synchronized (req) { req.done = true; req.notify(); } if (ch != null && req.exception != null) { try { ch.disconnect(); ch.close(); } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } } } private void cancelKeys() { if (cancelQueue.isEmpty()) return; for (;;) { CancellationRequest request = cancelQueue.poll(); if (request == null) { break; } DatagramChannel ch = channels.remove(request.address); // close the channel try { if (ch == null) { request.exception = new IllegalArgumentException( "Address not bound: " + request.address); } else { SelectionKey key = ch.keyFor(selector); request.registrationRequest = (RegistrationRequest) key .attachment(); key.cancel(); selector.wakeup(); // wake up again to trigger thread death ch.disconnect(); ch.close(); } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } finally { synchronized (request) { request.done = true; request.notify(); } if (request.exception == null) { getListeners().fireServiceDeactivated(this, request.address, request.registrationRequest.handler, request.registrationRequest.config); } } } } public void updateTrafficMask(DatagramSessionImpl session) { // There's no point in changing the traffic mask for sessions originating // from this acceptor since new sessions are created every time data is // received. } private static class RegistrationRequest { private InetSocketAddress address; private final IoHandler handler; private final IoServiceConfig config; private Throwable exception; private boolean done; private RegistrationRequest(SocketAddress address, IoHandler handler, IoServiceConfig config) { this.address = (InetSocketAddress) address; this.handler = handler; this.config = config; } } private static class CancellationRequest { private final SocketAddress address; private boolean done; private RegistrationRequest registrationRequest; private RuntimeException exception; private CancellationRequest(SocketAddress address) { this.address = address; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -