⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketacceptor.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                                try {                                    selector.close();                                } catch (IOException e) {                                    ExceptionMonitor.getInstance()                                            .exceptionCaught(e);                                } finally {                                    selector = null;                                }                                break;                            }                        }                    }                } catch (IOException e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e1) {                        ExceptionMonitor.getInstance().exceptionCaught(e1);                    }                }            }        }        private void processSessions(Set<SelectionKey> keys) throws IOException {            Iterator<SelectionKey> it = keys.iterator();            while (it.hasNext()) {                SelectionKey key = it.next();                it.remove();                if (!key.isAcceptable()) {                    continue;                }                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();                SocketChannel ch = ssc.accept();                if (ch == null) {                    continue;                }                boolean success = false;                try {                    RegistrationRequest req = (RegistrationRequest) key                            .attachment();                    SocketSessionImpl session = new SocketSessionImpl(                            SocketAcceptor.this, nextProcessor(),                            getListeners(), req.config, ch, req.handler,                            req.address);                    getFilterChainBuilder().buildFilterChain(                            session.getFilterChain());                    req.config.getFilterChainBuilder().buildFilterChain(                            session.getFilterChain());                    req.config.getThreadModel().buildFilterChain(                            session.getFilterChain());                    session.getIoProcessor().addNew(session);                    success = true;                } catch (Throwable t) {                    ExceptionMonitor.getInstance().exceptionCaught(t);                } finally {                    if (!success) {                        ch.close();                    }                }            }        }    }    private SocketIoProcessor nextProcessor() {        if (this.processorDistributor == Integer.MAX_VALUE) {            this.processorDistributor = Integer.MAX_VALUE % this.processorCount;        }        return ioProcessors[processorDistributor++ % processorCount];    }    public SocketAcceptorConfig getDefaultConfig() {        return defaultConfig;    }    /**     * Sets the config this acceptor will use by default.     *     * @param defaultConfig the default config.     * @throws NullPointerException if the specified value is <code>null</code>.     */    public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {        if (defaultConfig == null) {            throw new NullPointerException("defaultConfig");        }        this.defaultConfig = defaultConfig;    }    private void registerNew() {        if (registerQueue.isEmpty()) {            return;        }        for (;;) {            RegistrationRequest req = registerQueue.poll();            if (req == null) {                break;            }            ServerSocketChannel ssc = null;            try {                ssc = ServerSocketChannel.open();                ssc.configureBlocking(false);                // Configure the server socket,                SocketAcceptorConfig cfg;                if (req.config instanceof SocketAcceptorConfig) {                    cfg = (SocketAcceptorConfig) req.config;                } else {                    cfg = getDefaultConfig();                }                ssc.socket().setReuseAddress(cfg.isReuseAddress());                ssc.socket().setReceiveBufferSize(                        cfg.getSessionConfig().getReceiveBufferSize());                // and bind.                ssc.socket().bind(req.address, cfg.getBacklog());                if (req.address == null || req.address.getPort() == 0) {                    req.address = (InetSocketAddress) ssc.socket()                            .getLocalSocketAddress();                }                ssc.register(selector, SelectionKey.OP_ACCEPT, req);                channels.put(req.address, ssc);                getListeners().fireServiceActivated(this, req.address,                        req.handler, req.config);            } catch (IOException e) {                req.exception = e;            } finally {                req.done.countDown();                if (ssc != null && req.exception != null) {                    try {                        ssc.close();                    } catch (IOException e) {                        ExceptionMonitor.getInstance().exceptionCaught(e);                    }                }            }        }    }    private void cancelKeys() {        if (cancelQueue.isEmpty()) {            return;        }        for (;;) {            CancellationRequest request = cancelQueue.poll();            if (request == null) {                break;            }            ServerSocketChannel ssc = channels.remove(request.address);            // close the channel            try {                if (ssc == null) {                    request.exception = new IllegalArgumentException(                            "Address not bound: " + request.address);                } else {                    SelectionKey key = ssc.keyFor(selector);                    request.registrationRequest = (RegistrationRequest) key                            .attachment();                    key.cancel();                    selector.wakeup(); // wake up again to trigger thread death                    ssc.close();                }            } catch (IOException e) {                ExceptionMonitor.getInstance().exceptionCaught(e);            } finally {                request.done.countDown();                if (request.exception == null) {                    getListeners().fireServiceDeactivated(this,                            request.address,                            request.registrationRequest.handler,                            request.registrationRequest.config);                }            }        }    }    private static class RegistrationRequest {        private InetSocketAddress address;        private final IoHandler handler;        private final IoServiceConfig config;        private final CountDownLatch done = new CountDownLatch(1);        private volatile IOException exception;        private RegistrationRequest(SocketAddress address, IoHandler handler,                IoServiceConfig config) {            this.address = (InetSocketAddress) address;            this.handler = handler;            this.config = config;        }    }    private static class CancellationRequest {        private final SocketAddress address;        private final CountDownLatch done = new CountDownLatch(1);        private RegistrationRequest registrationRequest;        private volatile RuntimeException exception;        private CancellationRequest(SocketAddress address) {            this.address = address;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -