📄 relayserverclient.java

📁 jxta_src_2.41b: latest JXTA 2.41b source code, from www.jxta.org
        return clientPeerId + "," + (messageList == null ? -1 : messageList.getCurrentInQueue()) + "," + (messenger == null ? "-m" : "+m") + ","
                + (expireTime - System.currentTimeMillis());
    }

    protected int getQueueSize() {
        return messageList.getCurrentInQueue();
    }

    public long getLeaseRemaining() {
        // May be shorter than lease length. Compute real value from expire
        // time.
        return expireTime - System.currentTimeMillis();
    }

    public void closeClient() {

        Messenger messengerToClose = null;

        synchronized (this) {
            if (isClosed) {
                return;
            }

            isClosed = true;

            if (LOG.isEnabledFor(Level.INFO)) {
                LOG.info(
                        "Terminating client:" + "\n\tclient=" + clientAddr + "\tnbMessages=" + messageList.getCurrentInQueue() + "\tmessenger="
                        + messenger + (messenger == null ? "" : "(c:" + messenger.isClosed() + ")") + "\tlease-left="
                        + (expireTime - System.currentTimeMillis()) + "\tt=" + (thread != null));
            }

            messengerToClose = messenger;

            expireTime = 0;
            messenger = null;

            // remove all queued messages if expired
            messageList.clear();
        }

        // We can do that out of sync. It avoids nesting locks.
        server.removeClient(clientPeerId, this);
        if (messengerToClose != null) {
            messengerToClose.close();
        }
    }

    /**
     * remove all queued messages.
     **/
    synchronized void flushQueue() {
        messageList.clear();
    }

    public boolean addMessenger(Messenger newMessenger) {

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("addMessenger() " + newMessenger);
        }

        // make sure we are being passed a valid messenger
        if (newMessenger == null || newMessenger.isClosed()) {
            return false;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("messenger (" + messenger + ") thread=" + thread);
        }

        // Unless we change our mind, we'll close the new messenger.
        // If we do not keep it, we must close it. Otherwise
        // the client on the other end will never know what happened.
        // Its connection will be left hanging for a long time.

        Messenger messengerToClose = newMessenger;

        synchronized (this) {
            // Do not use isExpired() here. IsExpired is not supposed to be called
            // synchronized. Also isClosed() is good enough. The handler being
            // expired is not a problem; we'll figure it out soon enough and do the
            // right thing.

            if (!isClosed) {
                // use this messenger instead of the old one.

                if (LOG.isEnabledFor(Level.INFO)) {
                    if (messenger != null) {
                        LOG.info("closing messenger replaced by a new one : " + clientAddr);
                    }
                }

                // Swap messengers; we'll close the old one if there was one.

                messengerToClose = messenger;
                messenger = newMessenger;

                if ((thread == null || thread_idle) && ((messageList.getCurrentInQueue() > 0) || (outOfBandMessage != null))) {

                    // if we do not already have a thread, start one

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("messageList.getCurrentInQueue() = " + messageList.getCurrentInQueue() + " client=" + clientAddr);
                    }

                    if (thread != null) {
                        notify();
                    } else {
                        thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr);
                        thread.setDaemon(true);
                        thread.start();
                    }
                }

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("messenger (" + messenger + ") thread=" + thread);
                }
            }
        }

        // Close whichever messenger out of sync.
        // In either case, we claim that we kept the new one.

        if (messengerToClose != null) {
            messengerToClose.close();
        }

        return true;
    }

    public boolean isExpired() {

        boolean isExpired = (System.currentTimeMillis() > expireTime);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("isExpired() = " + isExpired + " client=" + clientAddr);
        }

        if (isExpired) {
            if (LOG.isEnabledFor(Level.INFO)) {
                LOG.info("Closing expired client : " + clientAddr);
            }
            closeClient();
        }

        return isExpired;
    }

    public synchronized boolean renewLease() {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("renewLease() old expireTime = " + expireTime);
        }

        // It is ok to renew a lease past the expiration time, as long
        // as the handler has not been closed yet. So, we do not use
        // isExpired().

        if (isClosed) {
            return false;
        }

        // As long as there are messages to send, the lease is controlled
        // by our ability to send them, not by client renewal.

        if (messageList.getCurrentInQueue() > 0) {
            return true;
        }

        expireTime = System.currentTimeMillis() + leaseLength;

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("renewLease() new expireTime = " + expireTime);
        }
        return true;
    }

    /**
     * add a message to the tail of the list
     **/
    private void queueMessage(Message message, boolean outOfBand) throws IOException {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("queueMessage (" + messageList.getCurrentInQueue() + ") client=" + clientAddr);
        }

        synchronized (this) {

            if (isClosed) {
                throw new IOException("Client has been disconnected");
            }

            if (outOfBand) {
                // We have a single oob message pending.
                outOfBandMessage = message;
            } else {
                messageList.push(message);
            }

            // check if a new thread needs to be started.
            if ((thread == null) || thread_idle) {

                // Normally, if messenger is null we knew it already:
                // it becomes null only when we detect that it breaks while
                // trying to send. However, let's imagine it's possible that
                // we never had one so far. Be careful that this is not a
                // one-time event; we must not keep renewing the short lease;
                // that would ruin its purpose.

                if (messenger == null) {

                    long newExpireTime = System.currentTimeMillis() + stallTimeout;

                    if (expireTime > newExpireTime) {
                        expireTime = newExpireTime;
                    }

                } else {

                    // Messenger good.
                    // if we do not already have a thread, start one
                    if (thread != null) {
                        notify();
                    } else {
                        thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr);
                        thread.setDaemon(true);
                        thread.start();
                    }
                }
            }
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("done queueMessage (" + messageList.getCurrentInQueue() + ") client=" + clientAddr);
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("messenger (" + messenger + ") thread=" + thread);
        }
    }

    protected EndpointAddress getClientAddress() {
        return (EndpointAddress) clientAddr.clone();
    }

    protected Messenger getMessenger(EndpointAddress srcAddr, EndpointAddress destAddr, boolean outOfBand) {
        return new RelayMessenger(srcAddr, destAddr, this, outOfBand);
    }

    protected static class RelayMessenger extends BlockingMessenger {
        private final MessageElement srcAddressElement;
        private final RelayServerClient handler;
        private boolean outOfBand = false;

        // Since we send messages through other messengers that do not necessarily have the
        // same destination service and param (usually none), we have to pass these along explicitly
        // in all cases. If we just build a destination element for the message it will be overwritten
        // by messengers below.
        private final String defaultServiceName;
        private final String defaultServiceParam;

        public RelayMessenger(EndpointAddress srcAddress,
                EndpointAddress destAddress,
                RelayServerClient handler,
                boolean outOfBand) {

            // We do not use self destruction
            super(handler.server.group.getPeerGroupID(), destAddress, false);

            this.defaultServiceName = destAddress.getServiceName();
            this.defaultServiceParam = destAddress.getServiceParameter();
            this.handler = handler;
            this.outOfBand = outOfBand;

            this.srcAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, srcAddress.toString(), null);
        }

        /*
         * The cost of just having a finalize routine is high. The finalizer is
         * a bottleneck and can delay garbage collection all the way to heap
         * exhaustion. Leave this comment as a reminder to future maintainers.
         * Below is the reason why finalize is not needed here.
         *
         * This is never given to application layers directly. No need
         * to close-on-finalize.
         *

         protected void finalize() {
         }

         */

        /**
         *   {@inheritDoc}
         **/
        public boolean isIdleImpl() {
            // We do not use self destruction
            return false;
        }

        /**
         *   {@inheritDoc}
         **/
        public void closeImpl() {
            // Nothing to do. The underlying connection is not affected.
            // The messenger will be marked closed by the state machine once completely down; that's it.
        }

        /**
         *   {@inheritDoc}
         **/
        public EndpointAddress getLogicalDestinationImpl() {
            // getClientAddress returns a clone of the client's jxta: address.
            return handler.getClientAddress();
        }

        /*
         *   {@inheritDoc}
         *
         * <p/>Send messages. Messages are queued and then processed when there is a transport messenger.
         */
        public boolean sendMessageBImpl(Message message, String serviceName, String serviceParam)
            throws IOException {

            // Set the message with the appropriate src address
            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);

            // load the final destination into the message
            EndpointAddress destAddressToUse = getDestAddressToUse(serviceName, serviceParam);

            MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddressToUse.toString(),
                    (MessageElement) null);

            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);

            // simply enqueue the message.
            // We clone it, since we pretend it's been sent synchronously.
            handler.queueMessage((Message) message.clone(), outOfBand);

            // The need for a return value is historical (so that many implementations of
            // this method did not have to change - other than the name).
            return true;
        }
    }
}
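
The comments in closeClient() and addMessenger() describe one recurring idea: decide which messenger to discard while holding the lock, then close it after the lock is released, so that close() never runs nested inside the handler's monitor. The sketch below isolates that idea outside JXTA. The names Resource, ConnectionHolder, replace and shutdown are hypothetical and exist only for illustration; they are not part of the JXTA API, and this is a simplified reading of the pattern rather than the original implementation.

```java
/** Hypothetical stand-in for a messenger: anything that can be closed. */
interface Resource {
    void close();
}

/** Hypothetical holder, not part of JXTA. Swaps a resource under the lock, closes outside it. */
class ConnectionHolder {
    private Resource current; // guarded by "this"
    private boolean closed;   // guarded by "this"

    /**
     * Offers a new resource. Returns false only for a null candidate.
     * Otherwise the candidate is either kept (and the old one closed)
     * or, if the holder is already shut down, closed itself.
     */
    boolean replace(Resource candidate) {
        if (candidate == null) {
            return false;
        }

        // Unless we decide to keep it, the candidate is the one we close.
        Resource toClose = candidate;

        synchronized (this) {
            if (!closed) {
                toClose = current; // may be null if there was no previous resource
                current = candidate;
            }
        }

        // Close outside the lock, so close() cannot nest inside our monitor.
        if (toClose != null) {
            toClose.close();
        }
        return true;
    }

    /** Idempotent shutdown: detach the resource under the lock, close it afterwards. */
    void shutdown() {
        Resource toClose;

        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
            toClose = current;
            current = null;
        }

        if (toClose != null) {
            toClose.close();
        }
    }
}
```

As in addMessenger(), a non-null candidate always yields true: the caller can treat the resource as handed over, whether the holder kept it or closed it on the spot.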
