⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 relayserverclient.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        // May be shorter than lease length. Compute real value from expire
        // time.
        return TimeUtils.toRelativeTimeMillis(expireTime, TimeUtils.timeNow());
    }

    /**
     * Shuts this client handler down. Calling it again once closed is a no-op.
     *
     * <p/>Under this object's lock the handler is flagged closed, the expire
     * time is zeroed, the messenger reference is dropped and the message queue
     * is emptied. After the lock is released the handler is removed from the
     * server and the messenger that was held (if any) is closed.
     */
    public void closeClient() {

        Messenger messengerToClose;

        synchronized (this) {
            if (isClosed) {
                return;
            }

            isClosed = true;

            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info( "Terminating client:" + "\n\tclient=" + clientAddr + "\tnbMessages=" + messageList.size()
                                + "\tmessenger=" + messenger + (messenger == null ? "" : "(c:" + messenger.isClosed() + ")")
                                + "\tlease-left=" + TimeUtils.toRelativeTimeMillis(expireTime, TimeUtils.timeNow()) + "\tt=" + (thread != null));
            }

            // Remember the messenger so that it can be closed once the lock
            // has been released.
            messengerToClose = messenger;

            expireTime = 0;
            messenger = null;

            // The handler is closed for good: discard everything still queued.
            messageList.clear();
        }

        // We can do that out of sync. It avoids nesting locks.
        server.removeClient(clientPeerId, this);
        if (messengerToClose != null) {
            messengerToClose.close();
        }
    }

    /**
     * Removes all queued messages.
     */
    synchronized void flushQueue() {
        messageList.clear();
    }

    /**
     * Offers a messenger to be used for sending to this client.
     *
     * <p/>If the handler is still open, the new messenger replaces the current
     * one and the replaced messenger (if any) is closed. If the handler is
     * already closed, the new messenger itself is closed. In both cases the
     * close happens outside the lock.
     *
     * <p/>When there is something to send (queued messages or a pending
     * out-of-band message) and no thread is currently busy, the idle thread is
     * notified or a new daemon thread is started.
     *
     * @param newMessenger the messenger being offered
     * @return {@code false} if {@code newMessenger} is {@code null} or already
     *         closed; {@code true} in every other case, including when the
     *         handler was closed and the new messenger was closed rather than
     *         kept
     */
    public boolean addMessenger(Messenger newMessenger) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("addMessenger() " + newMessenger);
        }

        // make sure we are being passed a valid messenger
        if (newMessenger == null || newMessenger.isClosed()) {
            return false;
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("messenger (" + messenger + ") thread=" + thread);
        }

        // Unless we change our mind, we'll close the new messenger.
        // If we do not keep it, we must close it. Otherwise
        // the client on the other end will never know what happened.
        // Its connection will be left hanging for a long time.
        Messenger messengerToClose = newMessenger;

        synchronized (this) {
            // Do not use isExpired() here. isExpired() is not supposed to be
            // called synchronized. Also, checking the isClosed flag is good
            // enough. The handler being expired is not a problem; we'll figure
            // it out soon enough and do the right thing.

            if (!isClosed) {
                // use this messenger instead of the old one.

                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                    if (messenger != null) {
                        LOG.info("closing messenger replaced by a new one : " + clientAddr);
                    }
                }

                // Swap messengers; we'll close the old one if there was one.
                messengerToClose = messenger;
                messenger = newMessenger;

                if ((thread == null || thread_idle) && ((!messageList.isEmpty()) || (outOfBandMessage != null))) {

                    // if we do not already have a thread, start one
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("messageList.size() = " + messageList.size() + " client=" + clientAddr);
                    }

                    if (thread != null) {
                        // A thread exists and is flagged idle (presumably
                        // waiting on this monitor): wake it up.
                        notify();
                    } else {
                        // The new thread runs this object; its run() method is
                        // outside the portion of the file visible here.
                        thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr);
                        thread.setDaemon(true);
                        thread.start();
                    }
                }

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("messenger (" + messenger + ") thread=" + thread);
                }
            }
        }

        // Close whichever messenger out of sync.
        // In either case, we claim that we kept the new one.
        if (messengerToClose != null) {
            messengerToClose.close();
        }

        return true;
    }

    /**
     * Tells whether the lease has run out, closing the client if it has.
     *
     * <p/>This method is not synchronized: the expire time is read without
     * holding the lock. {@link #closeClient()} acquires the lock itself.
     *
     * @return {@code true} if the current time is past the expire time, in
     *         which case {@link #closeClient()} has also been invoked
     */
    public boolean isExpired() {
        boolean isExpired = TimeUtils.timeNow() > expireTime;

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("isExpired() = " + isExpired + " client=" + clientAddr);
        }

        if (isExpired) {
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Closing expired client : " + clientAddr);
            }
            closeClient();
        }

        return isExpired;
    }

    /**
     * Renews the client's lease.
     *
     * <p/>The expire time is only recomputed from the lease length when the
     * message queue is empty; while messages are queued it is left untouched,
     * yet the renewal is still reported as successful.
     *
     * @return {@code false} if the handler has been closed, {@code true}
     *         otherwise
     */
    public synchronized boolean renewLease() {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("renewLease() old expireTime = " + expireTime);
        }

        // It is ok to renew a lease past the expiration time, as long
        // as the handler has not been closed yet. So, we do not use
        // isExpired().

        if (isClosed) {
            return false;
        }

        // As long as there are messages to send, the lease is controlled
        // by our ability to send them, not by client renewal.
        if (!messageList.isEmpty()) {
            return true;
        }

        expireTime = TimeUtils.toAbsoluteTimeMillis(leaseLength);

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("renewLease() new expireTime = " + expireTime);
        }

        return true;
    }

    /**
     * Queues a message for delivery to the client.
     *
     * <p/>A regular message is offered to the tail of the message queue; if
     * the queue refuses it, the message is dropped. An out-of-band message is
     * stored in the single out-of-band slot instead, overwriting whatever was
     * there.
     *
     * <p/>If no thread is currently busy, then either the idle thread is
     * notified or a new one is started (when a messenger is available), or the
     * expire time is capped at now plus the stall timeout (when there is no
     * messenger).
     *
     * @param message   the message
     * @param outOfBand if true, the message is stored as the pending
     *                  out-of-band message rather than appended to the queue
     * @throws IOException if this client handler has already been closed
     */
    private void queueMessage(Message message, boolean outOfBand) throws IOException {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("queueMessage (" + messageList.size() + ") client=" + clientAddr);
        }

        synchronized (this) {
            if (isClosed) {
                throw new IOException("Client has been disconnected");
            }

            if (outOfBand) {
                // We have a single oob message pending.
                outOfBandMessage = message;
            } else {
                // We will simply discard the latest msg when the queue is full
                // to avoid penalty of dropping earlier reliable message
                if (!messageList.offer(message)) {
                    // NOTE(review): unlike the other log statements in this
                    // class, this guard does not also test
                    // LOG.isLoggable(Level.WARNING) - confirm whether that is
                    // intentional.
                    if (Logging.SHOW_WARNING) {
                        LOG.warning("Dropping relayed message " + message.toString() + " for peer " + clientPeerId);
                    }
                }
            }

            // check if a new thread needs to be started.
            if ((thread == null) || thread_idle) {
                // Normally, if messenger is null we knew it already:
                // it becomes null only when we detect that it breaks while
                // trying to send. However, let's imagine it's possible that
                // we never had one so far. Be careful that this is not a
                // one-time event; we must not keep renewing the short lease;
                // that would ruin its purpose.

                if (messenger == null) {
                    // Only ever shorten the lease here, never extend it.
                    long newExpireTime = TimeUtils.toAbsoluteTimeMillis(stallTimeout);

                    if (expireTime > newExpireTime) {
                        expireTime = newExpireTime;
                    }

                } else {
                    // Messenger good.
                    // if we do not already have a thread, start one
                    if (thread != null) {
                        // A thread exists and is flagged idle (presumably
                        // waiting on this monitor): wake it up.
                        notify();
                    } else {
                        thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr);
                        thread.setDaemon(true);
                        thread.start();
                    }
                }
            }
        }

        // The two log statements below read messageList, messenger and thread
        // without holding the lock; the values are for diagnostics only.
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("done queueMessage (" + messageList.size() + ") client=" + clientAddr);
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("messenger (" + messenger + ") thread=" + thread);
        }
    }

    /**
     * Returns the address of the client served by this handler.
     *
     * @return the {@code clientAddr} field, as is
     */
    protected EndpointAddress getClientAddress() {
        return clientAddr;
    }

    /**
     * Creates a messenger that queues its messages on this handler.
     *
     * @param srcAddr   source address stamped on every message sent
     * @param destAddr  destination address of the messenger
     * @param outOfBand if true, messages sent through the messenger are
     *                  queued as out-of-band messages
     * @return a new {@link RelayMessenger} bound to this handler
     */
    protected Messenger getMessenger(EndpointAddress srcAddr, EndpointAddress destAddr, boolean outOfBand) {
        return new RelayMessenger(srcAddr, destAddr, this, outOfBand);
    }

    /**
     * Messenger that does not send anything itself: it stamps the source and
     * destination address elements on each message and hands a clone of the
     * message to the owning {@link RelayServerClient} for queueing.
     */
    protected static class RelayMessenger extends BlockingMessenger {
        // Source address element placed on every message sent.
        private final MessageElement srcAddressElement;
        // The client handler that queues the messages.
        private final RelayServerClient handler;
        // Whether messages are queued as out-of-band messages.
        private boolean outOfBand = false;

        // Since we send messages through other messengers that do not necessarily have the
        // same destination service and param (usually none), we have to pass these along explicitly
        // in all cases. If we just build a destination element for the message it will be overwritten
        // by messengers below.
        // NOTE(review): these two fields are assigned in the constructor but
        // never read within this nested class.
        private final String defaultServiceName;
        private final String defaultServiceParam;

        /**
         * Creates a messenger bound to the given client handler.
         *
         * @param srcAddress  source address stamped on every message sent
         * @param destAddress destination address; its service name and
         *                    parameter are recorded as the defaults
         * @param handler     the client handler that queues the messages
         * @param outOfBand   if true, messages are queued as out-of-band
         */
        public RelayMessenger(EndpointAddress srcAddress, EndpointAddress destAddress, RelayServerClient handler, boolean outOfBand) {
            // We do not use self destruction
            super(handler.server.group.getPeerGroupID(), destAddress, false);

            this.defaultServiceName = destAddress.getServiceName();
            this.defaultServiceParam = destAddress.getServiceParameter();
            this.handler = handler;
            this.outOfBand = outOfBand;

            this.srcAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, srcAddress.toString(), null);
        }

        /*
        * The cost of just having a finalize routine is high. The finalizer is
        * a bottleneck and can delay garbage collection all the way to heap
        * exhaustion. Leave this comment as a reminder to future maintainers.
        * Below is the reason why finalize is not needed here.
        *
        * This is never given to application layers directly. No need
        * to close-on-finalize.
        *
        protected void finalize() {
        }
        */

        /**
         * {@inheritDoc}
         *
         * <p/>Always {@code false}: this messenger never reports itself idle.
         */
        @Override
        public boolean isIdleImpl() {
            // We do not use self destruction
            return false;
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Does nothing.
         */
        @Override
        public void closeImpl() {
            // Nothing to do. The underlying connection is not affected.
            // The messenger will be marked closed by the state machine once completely down; that's it.
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Returns the handler's client address.
         */
        @Override
        public EndpointAddress getLogicalDestinationImpl() {
            // Original note: getClientAddress returns a clone of the client's jxta: address.
            // NOTE(review): RelayServerClient.getClientAddress() returns the
            // clientAddr field directly, not a copy - confirm that the
            // "clone" wording above still holds.
            return handler.getClientAddress();
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Send messages. Messages are queued and then processed when there is a transport messenger.
         *
         * <p/>The source and destination address elements are replaced on the
         * message that was passed in; a clone of it is then queued on the
         * handler.
         *
         * @throws IOException if the handler refuses the message because it
         *                     has been closed
         */
        @Override
        public void sendMessageBImpl(Message message, String serviceName, String serviceParam) throws IOException {

            // Set the message with the appropriate src address
            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);

            // load the final destination into the message
            // (getDestAddressToUse is not defined in this class; presumably
            // inherited from BlockingMessenger.)
            EndpointAddress destAddressToUse = getDestAddressToUse(serviceName, serviceParam);

            MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME,
                    destAddressToUse.toString(), null);

            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);

            // simply enqueue the message.
            // We clone it, since we pretend it's been sent synchronously.
            handler.queueMessage(message.clone(), outOfBand);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -