⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datagramacceptordelegate.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                    if (nKeys > 0) {                        processReadySessions(selector.selectedKeys());                    }                    flushSessions();                    cancelKeys();                    if (selector.keys().isEmpty()) {                        synchronized (lock) {                            if (selector.keys().isEmpty()                                    && registerQueue.isEmpty()                                    && cancelQueue.isEmpty()) {                                worker = null;                                try {                                    selector.close();                                } catch (IOException e) {                                    ExceptionMonitor.getInstance()                                            .exceptionCaught(e);                                } finally {                                    selector = null;                                }                                break;                            }                        }                    }                } catch (IOException e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e1) {                        ExceptionMonitor.getInstance().exceptionCaught(e1);                    }                }            }        }    }    private void processReadySessions(Set<SelectionKey> keys) {        Iterator<SelectionKey> it = keys.iterator();        while (it.hasNext()) {            SelectionKey key = it.next();            it.remove();            DatagramChannel ch = (DatagramChannel) key.channel();            RegistrationRequest req = (RegistrationRequest) key.attachment();            try {                if (key.isReadable()) {                    readSession(ch, req);                }                if (key.isWritable()) {                    for (Object o : getManagedSessions(req.address)) {                        scheduleFlush((DatagramSessionImpl) o);                    }                }            } catch (Throwable t) {                ExceptionMonitor.getInstance().exceptionCaught(t);            }        }    }    private void readSession(DatagramChannel channel, RegistrationRequest req)            throws Exception {        ByteBuffer readBuf = ByteBuffer                .allocate(((DatagramSessionConfig) req.config                        .getSessionConfig()).getReceiveBufferSize());        try {            SocketAddress remoteAddress = channel.receive(readBuf.buf());            if (remoteAddress != null) {                DatagramSessionImpl session = (DatagramSessionImpl) newSession(                        remoteAddress, req.address);                readBuf.flip();                ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());                newBuf.put(readBuf);                newBuf.flip();                session.increaseReadBytes(newBuf.remaining());                session.getFilterChain().fireMessageReceived(session, newBuf);            }        } finally {            readBuf.release();        }    }    private void flushSessions() {        if (flushingSessions.size() == 0)            return;        for (;;) {            DatagramSessionImpl session = flushingSessions.poll();            if (session == null)                break;            try {                flush(session);            } catch (IOException e) {                session.getFilterChain().fireExceptionCaught(session, e);            }        }    }    private void flush(DatagramSessionImpl session) throws IOException {        DatagramChannel ch = session.getChannel();        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();        for (;;) {            WriteRequest req = writeRequestQueue.peek();            if (req == null)                break;            ByteBuffer buf = (ByteBuffer) req.getMessage();            if (buf.remaining() == 0) {                // pop and fire event                writeRequestQueue.poll();                session.increaseWrittenMessages();                buf.reset();                session.getFilterChain().fireMessageSent(session, req);                continue;            }            SelectionKey key = session.getSelectionKey();            if (key == null) {                scheduleFlush(session);                break;            }            if (!key.isValid()) {                continue;            }            SocketAddress destination = req.getDestination();            if (destination == null) {                destination = session.getRemoteAddress();            }            int writtenBytes = ch.send(buf.buf(), destination);            if (writtenBytes == 0) {                // Kernel buffer is full                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);            } else if (writtenBytes > 0) {                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));                // pop and fire event                writeRequestQueue.poll();                session.increaseWrittenBytes(writtenBytes);                session.increaseWrittenMessages();                buf.reset();                session.getFilterChain().fireMessageSent(session, req);            }        }    }    private void registerNew() {        if (registerQueue.isEmpty())            return;        for (;;) {            RegistrationRequest req = registerQueue.poll();            if (req == null)                break;            DatagramChannel ch = null;            try {                ch = DatagramChannel.open();                DatagramSessionConfig cfg;                if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {                    cfg = (DatagramSessionConfig) req.config.getSessionConfig();                } else {                    cfg = getDefaultConfig().getSessionConfig();                }                ch.socket().setReuseAddress(cfg.isReuseAddress());                ch.socket().setBroadcast(cfg.isBroadcast());                ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());                ch.socket().setSendBufferSize(cfg.getSendBufferSize());                if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {                    ch.socket().setTrafficClass(cfg.getTrafficClass());                }                ch.configureBlocking(false);                ch.socket().bind(req.address);                if (req.address == null || req.address.getPort() == 0) {                    req.address = (InetSocketAddress) ch.socket()                            .getLocalSocketAddress();                }                ch.register(selector, SelectionKey.OP_READ, req);                channels.put(req.address, ch);                getListeners().fireServiceActivated(this, req.address,                        req.handler, req.config);            } catch (Throwable t) {                req.exception = t;            } finally {                synchronized (req) {                    req.done = true;                    req.notify();                }                if (ch != null && req.exception != null) {                    try {                        ch.disconnect();                        ch.close();                    } catch (Throwable e) {                        ExceptionMonitor.getInstance().exceptionCaught(e);                    }                }            }        }    }    private void cancelKeys() {        if (cancelQueue.isEmpty())            return;        for (;;) {            CancellationRequest request = cancelQueue.poll();            if (request == null) {                break;            }            DatagramChannel ch = channels.remove(request.address);            // close the channel            try {                if (ch == null) {                    request.exception = new IllegalArgumentException(                            "Address not bound: " + request.address);                } else {                    SelectionKey key = ch.keyFor(selector);                    request.registrationRequest = (RegistrationRequest) key                            .attachment();                    key.cancel();                    selector.wakeup(); // wake up again to trigger thread death                    ch.disconnect();                    ch.close();                }            } catch (Throwable t) {                ExceptionMonitor.getInstance().exceptionCaught(t);            } finally {                synchronized (request) {                    request.done = true;                    request.notify();                }                if (request.exception == null) {                    getListeners().fireServiceDeactivated(this,                            request.address,                            request.registrationRequest.handler,                            request.registrationRequest.config);                }            }        }    }    public void updateTrafficMask(DatagramSessionImpl session) {        // There's no point in changing the traffic mask for sessions originating        // from this acceptor since new sessions are created every time data is        // received.    }    private static class RegistrationRequest {        private InetSocketAddress address;        private final IoHandler handler;        private final IoServiceConfig config;        private Throwable exception;        private boolean done;        private RegistrationRequest(SocketAddress address, IoHandler handler,                IoServiceConfig config) {            this.address = (InetSocketAddress) address;            this.handler = handler;            this.config = config;        }    }    private static class CancellationRequest {        private final SocketAddress address;        private boolean done;        private RegistrationRequest registrationRequest;        private RuntimeException exception;        private CancellationRequest(SocketAddress address) {            this.address = address;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -