📄 relayserverclient.java
字号:
// May be shorter than lease length. Compute real value from expire // time. return TimeUtils.toRelativeTimeMillis(expireTime, TimeUtils.timeNow()); } public void closeClient() { Messenger messengerToClose; synchronized (this) { if (isClosed) { return; } isClosed = true; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info( "Terminating client:" + "\n\tclient=" + clientAddr + "\tnbMessages=" + messageList.size() + "\tmessenger=" + messenger + (messenger == null ? "" : "(c:" + messenger.isClosed() + ")") + "\tlease-left=" + TimeUtils.toRelativeTimeMillis(expireTime, TimeUtils.timeNow()) + "\tt=" + (thread != null)); } messengerToClose = messenger; expireTime = 0; messenger = null; // remove all queued messages if expired messageList.clear(); } // We can do that out of sync. It avoids nesting locks. server.removeClient(clientPeerId, this); if (messengerToClose != null) { messengerToClose.close(); } } /** * remove all queued messages. */ synchronized void flushQueue() { messageList.clear(); } public boolean addMessenger(Messenger newMessenger) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("addMessenger() " + newMessenger); } // make sure we are being passed a valid messenger if (newMessenger == null || newMessenger.isClosed()) { return false; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("messenger (" + messenger + ") thread=" + thread); } // Unless we change our mind, we'll close the new messenger. // If we do not keep it, we must close it. Otherwise // the client on the other end will never know what happened. // Its connection will be left hanging for a long time. Messenger messengerToClose = newMessenger; synchronized (this) { // Do not use isExpired() here. IsExpired is not supposed to be called // synchronized. Also isClosed() is good enough. The handler being // expired is not a problem; we'll figure it out soon enough and do the // right thing. if (!isClosed) { // use this messenger instead of the old one. if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { if (messenger != null) { LOG.info("closing messenger replaced by a new one : " + clientAddr); } } // Swap messengers; we'll close the old one if there was one. messengerToClose = messenger; messenger = newMessenger; if ((thread == null || thread_idle) && ((!messageList.isEmpty()) || (outOfBandMessage != null))) { // if we do not already have a thread, start one if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("messageList.size() = " + messageList.size() + " client=" + clientAddr); } if (thread != null) { notify(); } else { thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr); thread.setDaemon(true); thread.start(); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("messenger (" + messenger + ") thread=" + thread); } } } // Close whichever messenger out of sync. // In either case, we claim that we kept the new one. if (messengerToClose != null) { messengerToClose.close(); } return true; } public boolean isExpired() { boolean isExpired = TimeUtils.timeNow() > expireTime; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("isExpired() = " + isExpired + " client=" + clientAddr); } if (isExpired) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Closing expired client : " + clientAddr); } closeClient(); } return isExpired; } public synchronized boolean renewLease() { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("renewLease() old expireTime = " + expireTime); } // It is ok to renew a lease past the expiration time, as long // as the handler has not been closed yet. So, we do not use // isExpired(). if (isClosed) { return false; } // As long as there are messages to send, the lease is controlled // by our ability to send them, not by client renewal. if (!messageList.isEmpty()) { return true; } expireTime = TimeUtils.toAbsoluteTimeMillis(leaseLength); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("renewLease() new expireTime = " + expireTime); } return true; } /** * add a message to the tail of the list * * @param message the message * @param outOfBand if true, indicates outbound * @throws IOException if an io error occurs */ private void queueMessage(Message message, boolean outOfBand) throws IOException { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("queueMessage (" + messageList.size() + ") client=" + clientAddr); } synchronized (this) { if (isClosed) { throw new IOException("Client has been disconnected"); } if (outOfBand) { // We have a single oob message pending. outOfBandMessage = message; } else { // We will simply discard the latest msg when the queue is full // to avoid penalty of dropping earlier reliable message if (!messageList.offer(message)) { if (Logging.SHOW_WARNING) { LOG.warning("Dropping relayed message " + message.toString() + " for peer " + clientPeerId); } } } // check if a new thread needs to be started. if ((thread == null) || thread_idle) { // Normally, if messenger is null we knew it already: // it becomes null only when we detect that it breaks while // trying to send. However, let's imagine it's possible that // we never had one so far. Be carefull that this is not a // one-time event; we must not keep renewing the short lease; // that would ruin it's purpose. if (messenger == null) { long newExpireTime = TimeUtils.toAbsoluteTimeMillis(stallTimeout); if (expireTime > newExpireTime) { expireTime = newExpireTime; } } else { // Messenger good. // if we do not already have a thread, start one if (thread != null) { notify(); } else { thread = new Thread(server.group.getHomeThreadGroup(), this, "Draining queue to " + clientAddr); thread.setDaemon(true); thread.start(); } } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("done queueMessage (" + messageList.size() + ") client=" + clientAddr); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("messenger (" + messenger + ") thread=" + thread); } } protected EndpointAddress getClientAddress() { return clientAddr; } protected Messenger getMessenger(EndpointAddress srcAddr, EndpointAddress destAddr, boolean outOfBand) { return new RelayMessenger(srcAddr, destAddr, this, outOfBand); } protected static class RelayMessenger extends BlockingMessenger { private final MessageElement srcAddressElement; private final RelayServerClient handler; private boolean outOfBand = false; // Since we send messages through other messengers that do not necessarily have the // same destination service and param (usually none), we have to pass these along explicitly // in all cases. If we just build a destination element for the message it will be overwritten // by messengers below. private final String defaultServiceName; private final String defaultServiceParam; public RelayMessenger(EndpointAddress srcAddress, EndpointAddress destAddress, RelayServerClient handler, boolean outOfBand) { // We do not use self destruction super(handler.server.group.getPeerGroupID(), destAddress, false); this.defaultServiceName = destAddress.getServiceName(); this.defaultServiceParam = destAddress.getServiceParameter(); this.handler = handler; this.outOfBand = outOfBand; this.srcAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, srcAddress.toString(), null); } /* * The cost of just having a finalize routine is high. The finalizer is * a bottleneck and can delay garbage collection all the way to heap * exhaustion. Leave this comment as a reminder to future maintainers. * Below is the reason why finalize is not needed here. * * This is never given to application layers directly. No need * to close-on-finalize. * protected void finalize() { } */ /** * {@inheritDoc} */ @Override public boolean isIdleImpl() { // We do not use self destruction return false; } /** * {@inheritDoc} */ @Override public void closeImpl() { // Nothing to do. The underlying connection is not affected. // The messenger will be marked closed by the state machine once completely down; that's it. } /** * {@inheritDoc} */ @Override public EndpointAddress getLogicalDestinationImpl() { // getClientAddress returns a clone of the client's jxta: address. return handler.getClientAddress(); } /* * {@inheritDoc} * * <p/>Send messages. Messages are queued and then processed when there is a transport messenger. */ @Override public void sendMessageBImpl(Message message, String serviceName, String serviceParam) throws IOException { // Set the message with the appropriate src address message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement); // load the final destination into the message EndpointAddress destAddressToUse = getDestAddressToUse(serviceName, serviceParam); MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddressToUse.toString(), null); message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement); // simply enqueue the message. // We clone it, since we pretend it's been sent synchronously. handler.queueMessage(message.clone(), outOfBand); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -