📄 socketacceptor.java
字号:
try { selector.close(); } catch (IOException e) { ExceptionMonitor.getInstance() .exceptionCaught(e); } finally { selector = null; } break; } } } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } } private void processSessions(Set<SelectionKey> keys) throws IOException { Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (!key.isAcceptable()) { continue; } ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel ch = ssc.accept(); if (ch == null) { continue; } boolean success = false; try { RegistrationRequest req = (RegistrationRequest) key .attachment(); SocketSessionImpl session = new SocketSessionImpl( SocketAcceptor.this, nextProcessor(), getListeners(), req.config, ch, req.handler, req.address); getFilterChainBuilder().buildFilterChain( session.getFilterChain()); req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain()); req.config.getThreadModel().buildFilterChain( session.getFilterChain()); session.getIoProcessor().addNew(session); success = true; } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } finally { if (!success) { ch.close(); } } } } } private SocketIoProcessor nextProcessor() { if (this.processorDistributor == Integer.MAX_VALUE) { this.processorDistributor = Integer.MAX_VALUE % this.processorCount; } return ioProcessors[processorDistributor++ % processorCount]; } public SocketAcceptorConfig getDefaultConfig() { return defaultConfig; } /** * Sets the config this acceptor will use by default. * * @param defaultConfig the default config. * @throws NullPointerException if the specified value is <code>null</code>. */ public void setDefaultConfig(SocketAcceptorConfig defaultConfig) { if (defaultConfig == null) { throw new NullPointerException("defaultConfig"); } this.defaultConfig = defaultConfig; } private void registerNew() { if (registerQueue.isEmpty()) { return; } for (;;) { RegistrationRequest req = registerQueue.poll(); if (req == null) { break; } ServerSocketChannel ssc = null; try { ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // Configure the server socket, SocketAcceptorConfig cfg; if (req.config instanceof SocketAcceptorConfig) { cfg = (SocketAcceptorConfig) req.config; } else { cfg = getDefaultConfig(); } ssc.socket().setReuseAddress(cfg.isReuseAddress()); ssc.socket().setReceiveBufferSize( cfg.getSessionConfig().getReceiveBufferSize()); // and bind. ssc.socket().bind(req.address, cfg.getBacklog()); if (req.address == null || req.address.getPort() == 0) { req.address = (InetSocketAddress) ssc.socket() .getLocalSocketAddress(); } ssc.register(selector, SelectionKey.OP_ACCEPT, req); channels.put(req.address, ssc); getListeners().fireServiceActivated(this, req.address, req.handler, req.config); } catch (IOException e) { req.exception = e; } finally { req.done.countDown(); if (ssc != null && req.exception != null) { try { ssc.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } } } private void cancelKeys() { if (cancelQueue.isEmpty()) { return; } for (;;) { CancellationRequest request = cancelQueue.poll(); if (request == null) { break; } ServerSocketChannel ssc = channels.remove(request.address); // close the channel try { if (ssc == null) { request.exception = new IllegalArgumentException( "Address not bound: " + request.address); } else { SelectionKey key = ssc.keyFor(selector); request.registrationRequest = (RegistrationRequest) key .attachment(); key.cancel(); selector.wakeup(); // wake up again to trigger thread death ssc.close(); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { request.done.countDown(); if (request.exception == null) { getListeners().fireServiceDeactivated(this, request.address, request.registrationRequest.handler, request.registrationRequest.config); } } } } private static class RegistrationRequest { private InetSocketAddress address; private final IoHandler handler; private final IoServiceConfig config; private final CountDownLatch done = new CountDownLatch(1); private volatile IOException exception; private RegistrationRequest(SocketAddress address, IoHandler handler, IoServiceConfig config) { this.address = (InetSocketAddress) address; this.handler = handler; this.config = config; } } private static class CancellationRequest { private final SocketAddress address; private final CountDownLatch done = new CountDownLatch(1); private RegistrationRequest registrationRequest; private volatile RuntimeException exception; private CancellationRequest(SocketAddress address) { this.address = address; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -