⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcptransport.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    int getRestrictionPort() {        return restrictionPort;    }    TransportBindingMeter getUnicastTransportBindingMeter(PeerID peerID, EndpointAddress destinationAddress) {        if (unicastTransportMeter != null) {            return unicastTransportMeter.getTransportBindingMeter(                    (peerID != null) ? peerID.toString() : TransportMeter.UNKNOWN_PEER, destinationAddress);        } else {            return null;        }    }    void messengerReadyEvent(Messenger newMessenger, EndpointAddress connAddr) {        messengerEventListener.messengerReady(new MessengerEvent(this, newMessenger, connAddr));    }    /**     * Getter for property 'server'.     *     * @return Value for property 'server'.     */    IncomingUnicastServer getServer() {        return unicastServer;    }    /**     * Get a write selector from the cache.     *     * @return A write selector.     * @throws InterruptedException If interrupted while waiting for a selector     *                              to become available.     
*/    Selector getSelector() throws InterruptedException {        synchronized (writeSelectorCache) {            Selector selector = null;            try {                if (!writeSelectorCache.isEmpty()) {                    selector = writeSelectorCache.pop();                }            } catch (EmptyStackException ese) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("No write selector available, waiting for one");                }            }            int attempts = 0;            while (selector == null && attempts < 2) {                writeSelectorCache.wait(connectionTimeOut);                try {                    if (!writeSelectorCache.isEmpty()) {                        selector = writeSelectorCache.pop();                    }                } catch (EmptyStackException ese) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.log(Level.FINE, "Failed to get a write selector available, waiting for one", ese);                    }                }                attempts++;            }            return selector;        }    }    /**     * Return the <code>Selector</code> to the cache     *     * @param selector the selector to put back into the pool     */    void returnSelector(Selector selector) {        synchronized (writeSelectorCache) {            if (extraWriteSelectors > 0) {                // Allow the selector to be discarded.                extraWriteSelectors--;            } else {                writeSelectorCache.push(selector);                // it does not hurt to notify, even if there are no waiters                writeSelectorCache.notify();            }        }    }    /**     * Waits for incoming data on channels and sends it to the appropriate     * messenger object.     
*/    private class MessengerSelectorThread implements Runnable {        /**         * {@inheritDoc}         */        public void run() {            try {                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("MessengerSelectorThread polling started");                }                while (!isClosed) {                    try {                        int selectedKeys = 0;                        // Update channel registerations.                        updateChannelRegisterations();                        try {                            // this can be interrupted through wakeup                            selectedKeys = messengerSelector.select();                        } catch (CancelledKeyException cke) {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.log(Level.FINE, "Key was cancelled", cke);                            }                        }                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine(MessageFormat.format("MessengerSelector has {0} selected keys", selectedKeys));                        }                        if (selectedKeys == 0 && messengerSelector.selectNow() == 0) {                            // We were probably just woken.                            
continue;                        }                        Set<SelectionKey> keySet = messengerSelector.selectedKeys();                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine(MessageFormat.format("KeySet has {0} selected keys", keySet.size()));                        }                        Iterator<SelectionKey> it = keySet.iterator();                        while (it.hasNext()) {                            SelectionKey key = it.next();                            // remove it from the SelectedKeys Set                            it.remove();                            if (key.isValid()) {                                try {                                    if (key.isReadable() && key.channel().isOpen()) {                                        // ensure this channel is not selected again until the thread is done with it                                        // TcpMessenger is expected to reset the interestOps back to OP_READ                                        // Without this, expect multiple threads to execute on the same event, until                                        // the first thread completes reading all data available                                        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);                                        // get the messenger                                        TcpMessenger msgr = (TcpMessenger) key.attachment();                                        // process the data                                        try {                                            executor.execute(msgr);                                        } catch (RejectedExecutionException re) {                                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                                LOG.log(Level.FINE,                                                        MessageFormat.format("Executor rejected task for messenger 
:{0}", msgr.toString()), re);                                            }                                        }                                    }                                } catch (CancelledKeyException cce) {                                    //in case the key was canceled after the selection                                }                            } else {                                // unregister it, no need to keep invalid/closed channels around                                try {                                    key.channel().close();                                } catch (IOException io) {                                    // avoids breaking out of the selector loop                                }                                key.cancel();                                key = null;                            }                        }                    } catch (ClosedSelectorException cse) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("IO Selector closed");                        }                    } catch (InterruptedIOException woken) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.log(Level.FINE, "Thread inturrupted", woken);                        }                    } catch (IOException e1) {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.log(Level.WARNING, "An exception occurred while selecting keys", e1);                        }                    } catch (SecurityException e2) {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.log(Level.WARNING, "A security exception occurred while selecting keys", e2);                        }                    }                }                // XXX 20070205 bondolo What should we do about the channels                 // 
that are still registered with the selector and any pending                 // updates?            } catch (Throwable all) {                if (Logging.SHOW_SEVERE && Logging.SHOW_SEVERE) {                    LOG.log(Level.SEVERE, "Uncaught Throwable", all);                }            } finally {                messengerSelectorThread = null;            }        }    }    /**     * Registers the channel with the Read selector and attaches the messenger to the channel     *     * @param channel   the socket channel.     * @param messenger the messenger to attach to the channel.     */    void register(SocketChannel channel, TcpMessenger messenger) {        regisMap.put(messenger, channel);        messengerSelector.wakeup();    }    /**     * Unregisters the channel with the Read selector     *     * @param channel the socket channel.     */    void unregister(SocketChannel channel) {        unregisMap.add(channel);        messengerSelector.wakeup();    }    /**     * Registers all newly accepted and returned (by TcpMessenger) channels.     * Removes all closing TcpMessengers.     
*/    private synchronized void updateChannelRegisterations() {        if (!regisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine(MessageFormat.format("Registering {0} channels with MessengerSelectorThread", regisMap.size()));        }        if (!regisMap.isEmpty()) {            Iterator<Map.Entry<TcpMessenger, SocketChannel>> eachMsgr = regisMap.entrySet().iterator();            while (eachMsgr.hasNext()) {                Map.Entry<TcpMessenger, SocketChannel> anEntry = eachMsgr.next();                TcpMessenger msgr = anEntry.getKey();                SocketChannel channel = anEntry.getValue();                SelectionKey key = channel.keyFor(messengerSelector);                try {                    if (key == null) {                        key = channel.register(messengerSelector, SelectionKey.OP_READ, msgr);                    }                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);                    if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {                        LOG.finer(MessageFormat.format("Key interestOps on channel {0}, bit set :{1}", channel, key.interestOps()));                    }                } catch (ClosedChannelException e) {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.FINE)) {                        LOG.log(Level.FINE, "Failed to register Channel with messenger selector", e);                    }                    // it's best a new messenger is created when a new messenger is requested                    msgr.close();                } catch (CancelledKeyException e) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);                    }                } catch (IllegalBlockingModeException e) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        
LOG.log(Level.FINE, "Invalid blocking channel mode, closing messenger", e);                    }                    // messenger state is unknown                    msgr.close();                }                // remove it from the table                eachMsgr.remove();            }        }        // Unregister and close channels.        if (!unregisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine(MessageFormat.format("Unregistering {0} channels with MessengerSelectorThread", unregisMap.size()));        }        if (!unregisMap.isEmpty()) {            Iterator<SocketChannel> eachChannel;            synchronized (unregisMap) {                List<SocketChannel> allChannels = new ArrayList<SocketChannel>(unregisMap);                unregisMap.clear();                eachChannel = allChannels.iterator();            }            while (eachChannel.hasNext()) {                SocketChannel aChannel = eachChannel.next();                SelectionKey key = aChannel.keyFor(messengerSelector);                if (null != key) {                    try {                        key.cancel();                    } catch (CancelledKeyException e) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);                        }                    }                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -