📄 tcptransport.java
字号:
int getRestrictionPort() { return restrictionPort; } TransportBindingMeter getUnicastTransportBindingMeter(PeerID peerID, EndpointAddress destinationAddress) { if (unicastTransportMeter != null) { return unicastTransportMeter.getTransportBindingMeter( (peerID != null) ? peerID.toString() : TransportMeter.UNKNOWN_PEER, destinationAddress); } else { return null; } } void messengerReadyEvent(Messenger newMessenger, EndpointAddress connAddr) { messengerEventListener.messengerReady(new MessengerEvent(this, newMessenger, connAddr)); } /** * Getter for property 'server'. * * @return Value for property 'server'. */ IncomingUnicastServer getServer() { return unicastServer; } /** * Get a write selector from the cache. * * @return A write selector. * @throws InterruptedException If interrupted while waiting for a selector * to become available. */ Selector getSelector() throws InterruptedException { synchronized (writeSelectorCache) { Selector selector = null; try { if (!writeSelectorCache.isEmpty()) { selector = writeSelectorCache.pop(); } } catch (EmptyStackException ese) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No write selector available, waiting for one"); } } int attempts = 0; while (selector == null && attempts < 2) { writeSelectorCache.wait(connectionTimeOut); try { if (!writeSelectorCache.isEmpty()) { selector = writeSelectorCache.pop(); } } catch (EmptyStackException ese) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Failed to get a write selector available, waiting for one", ese); } } attempts++; } return selector; } } /** * Return the <code>Selector</code> to the cache * * @param selector the selector to put back into the pool */ void returnSelector(Selector selector) { synchronized (writeSelectorCache) { if (extraWriteSelectors > 0) { // Allow the selector to be discarded. extraWriteSelectors--; } else { writeSelectorCache.push(selector); // it does not hurt to notify, even if there are no waiters writeSelectorCache.notify(); } } } /** * Waits for incoming data on channels and sends it to the appropriate * messenger object. */ private class MessengerSelectorThread implements Runnable { /** * {@inheritDoc} */ public void run() { try { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("MessengerSelectorThread polling started"); } while (!isClosed) { try { int selectedKeys = 0; // Update channel registerations. updateChannelRegisterations(); try { // this can be interrupted through wakeup selectedKeys = messengerSelector.select(); } catch (CancelledKeyException cke) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Key was cancelled", cke); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(MessageFormat.format("MessengerSelector has {0} selected keys", selectedKeys)); } if (selectedKeys == 0 && messengerSelector.selectNow() == 0) { // We were probably just woken. continue; } Set<SelectionKey> keySet = messengerSelector.selectedKeys(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(MessageFormat.format("KeySet has {0} selected keys", keySet.size())); } Iterator<SelectionKey> it = keySet.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); // remove it from the SelectedKeys Set it.remove(); if (key.isValid()) { try { if (key.isReadable() && key.channel().isOpen()) { // ensure this channel is not selected again until the thread is done with it // TcpMessenger is expected to reset the interestOps back to OP_READ // Without this, expect multiple threads to execute on the same event, until // the first thread completes reading all data available key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); // get the messenger TcpMessenger msgr = (TcpMessenger) key.attachment(); // process the data try { executor.execute(msgr); } catch (RejectedExecutionException re) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, MessageFormat.format("Executor rejected task for messenger :{0}", msgr.toString()), re); } } } } catch (CancelledKeyException cce) { //in case the key was canceled after the selection } } else { // unregister it, no need to keep invalid/closed channels around try { key.channel().close(); } catch (IOException io) { // avoids breaking out of the selector loop } key.cancel(); key = null; } } } catch (ClosedSelectorException cse) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("IO Selector closed"); } } catch (InterruptedIOException woken) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Thread inturrupted", woken); } } catch (IOException e1) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "An exception occurred while selecting keys", e1); } } catch (SecurityException e2) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "A security exception occurred while selecting keys", e2); } } } // XXX 20070205 bondolo What should we do about the channels // that are still registered with the selector and any pending // updates? } catch (Throwable all) { if (Logging.SHOW_SEVERE && Logging.SHOW_SEVERE) { LOG.log(Level.SEVERE, "Uncaught Throwable", all); } } finally { messengerSelectorThread = null; } } } /** * Registers the channel with the Read selector and attaches the messenger to the channel * * @param channel the socket channel. * @param messenger the messenger to attach to the channel. */ void register(SocketChannel channel, TcpMessenger messenger) { regisMap.put(messenger, channel); messengerSelector.wakeup(); } /** * Unregisters the channel with the Read selector * * @param channel the socket channel. */ void unregister(SocketChannel channel) { unregisMap.add(channel); messengerSelector.wakeup(); } /** * Registers all newly accepted and returned (by TcpMessenger) channels. * Removes all closing TcpMessengers. */ private synchronized void updateChannelRegisterations() { if (!regisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(MessageFormat.format("Registering {0} channels with MessengerSelectorThread", regisMap.size())); } if (!regisMap.isEmpty()) { Iterator<Map.Entry<TcpMessenger, SocketChannel>> eachMsgr = regisMap.entrySet().iterator(); while (eachMsgr.hasNext()) { Map.Entry<TcpMessenger, SocketChannel> anEntry = eachMsgr.next(); TcpMessenger msgr = anEntry.getKey(); SocketChannel channel = anEntry.getValue(); SelectionKey key = channel.keyFor(messengerSelector); try { if (key == null) { key = channel.register(messengerSelector, SelectionKey.OP_READ, msgr); } key.interestOps(key.interestOps() | SelectionKey.OP_READ); if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer(MessageFormat.format("Key interestOps on channel {0}, bit set :{1}", channel, key.interestOps())); } } catch (ClosedChannelException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Failed to register Channel with messenger selector", e); } // it's best a new messenger is created when a new messenger is requested msgr.close(); } catch (CancelledKeyException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e); } } catch (IllegalBlockingModeException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Invalid blocking channel mode, closing messenger", e); } // messenger state is unknown msgr.close(); } // remove it from the table eachMsgr.remove(); } } // Unregister and close channels. if (!unregisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(MessageFormat.format("Unregistering {0} channels with MessengerSelectorThread", unregisMap.size())); } if (!unregisMap.isEmpty()) { Iterator<SocketChannel> eachChannel; synchronized (unregisMap) { List<SocketChannel> allChannels = new ArrayList<SocketChannel>(unregisMap); unregisMap.clear(); eachChannel = allChannels.iterator(); } while (eachChannel.hasNext()) { SocketChannel aChannel = eachChannel.next(); SelectionKey key = aChannel.keyFor(messengerSelector); if (null != key) { try { key.cancel(); } catch (CancelledKeyException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e); } } } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -