📄 relayclient.java
字号:
currentServer = null;
            // return any alternate relay advertisements
            return server.alternateRelayAdv;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Connected to " + server);
        }

        RouteAdvertisement holdAdv = server.relayAdv.getRouteAdv();
        EndpointAddress holdDest = server.logicalAddress;

        // register this relay server
        addActiveRelay(holdDest, holdAdv);

        // maintain the relay server connection
        alternateRelayAdv = maintainRelayConnection(server);

        // unregister this relay server
        removeActiveRelay(holdDest, holdAdv);

        return alternateRelayAdv;
    }

    // FIXME: jice@jxta.org 20030212. This is junk code: that should be a
    // method of RelayServerConnection and at least not refer to currentServer
    // other than to assign the reference.
    /**
     * Keeps the connection to the current relay server alive until it is lost
     * or {@code isRelayConnectDone()} reports that we have been asked to stop.
     *
     * <p>While holding this object's monitor, the method repeatedly waits (at
     * most {@code messengerPollInterval} ms per round), then:
     * <ul>
     *   <li>stops if {@code currentServer} was cleared while waiting;</li>
     *   <li>tries to re-open the messenger if it is closed, giving up on the
     *       server if {@code createMessenger} fails;</li>
     *   <li>renews the lease once a third of the lease length has elapsed (or
     *       immediately on the first round when the server was seeded), giving
     *       up on the server if the renewal is more than four minutes overdue
     *       or {@code sendConnectMessage} fails.</li>
     * </ul>
     * After the loop, if termination was requested and a server is still set,
     * a disconnect message is sent and the connection state is reset.
     *
     * <p>Note that, apart from the null check and the return value, all state
     * is read from and written to {@code currentServer}, not {@code server}
     * (see the FIXME above).
     *
     * @param server the relay server connection to maintain; may be
     *               {@code null}, in which case nothing is done
     * @return {@code null} if {@code server} is {@code null}, otherwise
     *         {@code server.alternateRelayAdv} as it stands when the method
     *         ends (it is cleared on requested termination, presumably because
     *         {@code currentServer} and {@code server} are the same object -
     *         confirm against the caller)
     */
    protected RdvAdvertisement maintainRelayConnection(RelayServerConnection server) {

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("maintainRelayConnection() start " + currentServer);
        }

        if (server == null) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("RelayConnection() failed at start " + currentServer);
            }
            return null;
        }

        synchronized (this) {
            long currentTime = System.currentTimeMillis();
            long renewLeaseAt = 0;
            long waitTimeout = 0;

            // This will be true if we need to do the first lease renewal early
            // (that is at the time of the next connection check).
            // We'll do that if we did not know the relay server's adv (seed).
            // In that case we told the relay server to send us its own
            // adv, else we told it to send us some alternate adv (we have to
            // chose). In the former case, we want to do a lease connect
            // request soon so that the server has an opportunity to send us
            // the alternate adv that we did not get during initial connection.
            boolean earlyRenew = false;

            // currentServer may already have been cleared before we obtained
            // the lock; the loop below copes with that, so the set-up must
            // not dereference it blindly either.
            if (currentServer != null) {
                renewLeaseAt = currentServer.leaseObtainedAt + currentServer.leaseLength / 3;
                earlyRenew = currentServer.seeded;
            }

            while (currentServer != null && !isRelayConnectDone()) {
                // calculate how long to wait
                waitTimeout = renewLeaseAt - currentTime;

                // check that the waitTimeout is not greater than the messengerPollInterval
                // We want to make sure that we poll. Most of the time it cost nothing.
                // Also, if we urgently need to renew our lease we may wait
                // less, but if we fail to get our lease renewed in time, the
                // delay may become negative. In that case we do not want
                // to start spinning madly. The only thing we can do is just
                // wait some arbitrary length of time for the lease to be
                // renewed. (If that gets badly overdue, we should probably
                // give up on that relay server, though).
                // A delay of exactly zero is clamped too: wait(0) would block
                // until notified instead of polling.
                if (waitTimeout > messengerPollInterval || waitTimeout <= 0) {
                    waitTimeout = messengerPollInterval;
                }

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("waitTimeout=" + waitTimeout + " server=" + currentServer);
                }

                try {
                    wait(waitTimeout);
                } catch (InterruptedException e) {
                    // ignore interrupt
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("wait got interrupted early ", e);
                    }
                }

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("wait done, server=" + currentServer);
                }

                // make sure the server did not disconnect while waiting
                if (currentServer == null) {
                    break;
                }

                // get the current time
                currentTime = System.currentTimeMillis();

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("check messenger " + currentServer);
                }

                // check if the messenger is still open
                if (currentServer.messenger.isClosed()) {

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("Server connection broken");
                    }

                    // See if we can re-open, that happens often.
                    // That's a reason to renew the connection,
                    // Not a reason to give up on the server yet.
                    // Note we do not renew the lease. This is a transient
                    // and if the server forgot about us, it will respond
                    // to the connection alone. Otherwise, we'd rather avoid
                    // getting a response, since in some cases http connections
                    // close after each received message.
                    if (!currentServer.createMessenger(currentServer.leaseLength)) {

                        if (LOG.isEnabledFor(Level.DEBUG)) {
                            LOG.debug("Server connection NOT re-established");
                        }

                        // lost connection to relay server
                        currentServer = null;
                        break;
                    }

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("Server connection re-established");
                    }

                    // getMessenger asks for a new lease.
                    // In the meantime, we'll just assume our old lease is
                    // still current and that the messenger breakage was just
                    // a transient.
                    if (!isRelayConnectDone()) {
                        continue;
                    }
                }

                // We've been asked to leave. Be nice and tell the
                // server about it.
                if (isRelayConnectDone()) {
                    break;
                }

                // check if the lease needs to be renewed
                renewLeaseAt = currentServer.leaseObtainedAt + currentServer.leaseLength / 3;
                if (currentTime >= renewLeaseAt || earlyRenew) {

                    earlyRenew = false;

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("renew lease " + currentServer);
                    }

                    // If we do not receive any response to our lease renewals
                    // (that is the response is overdue badly), then we give
                    // up and try another relayServer. We give up after 4 minutes
                    // because if we go as far as 5 we start overshooting other
                    // timeouts such as the local peer becoming a rdv in a sub-group.
                    // This later timeout is usually set to 5 minutes or more.
                    // The renewal request is only sent when we are not
                    // already overdue (short-circuit evaluation).
                    if ((currentTime > renewLeaseAt + 4 * TimeUtils.AMINUTE)
                            || !currentServer.sendConnectMessage(leaseLengthToRequest)) {

                        if (LOG.isEnabledFor(Level.INFO)) {
                            LOG.info("renew lease failed" + currentServer);
                        }

                        if (currentServer.messenger != null) {
                            currentServer.messenger.close();
                        }
                        currentServer.messenger = null;
                        currentServer.peerId = null;
                        currentServer.leaseLength = 0;
                        currentServer.leaseObtainedAt = 0;
                        currentServer.relayAdv = null;
                        currentServer = null;
                        break;
                    }
                }
            }
        }

        if (isRelayConnectDone() && currentServer != null) {
            currentServer.sendDisconnectMessage();
            if (currentServer.messenger != null) {
                currentServer.messenger.close();
            }
            currentServer.messenger = null;
            currentServer.peerId = null;
            currentServer.leaseLength = 0;
            currentServer.leaseObtainedAt = 0;
            currentServer.relayAdv = null;

            // Make sure that we will not suggest an alternate
            // since we're asked to terminate.
            currentServer.alternateRelayAdv = null;

            currentServer = null;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("maintainRelayConnection() terminated " + currentServer);
        }

        return server.alternateRelayAdv;
    }

    protected synchronized void handleResponse(Message message, EndpointAddress dstAddr) {

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("handleResponse " + currentServer);
        }

        // ignore all responses if there is not a current server
        if (currentServer == null) {
            return;
        }

        // get the request, make it lowercase so that case is ignored
        String response = RelayTransport.getString(message, RelayTransport.RESPONSE_ELEMENT);
        if (response == null) {
            return;
        }
        response = response.toLowerCase();

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("response = " + response);
        }

        // check if a relay advertisement was included
        RdvAdvertisement relayAdv = null;

        MessageElement advElement = message.getMessageElement(RelayTransport.RELAY_NS, RelayTransport.RELAY_ADV_ELEMENT);
        if (null != advElement) {
            try {
                Advertisement adv =
AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, advElement.getStream()); if (adv instanceof RdvAdvertisement) { relayAdv = (RdvAdvertisement) adv; } } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not read Relay RdvAdvertisement", e); } } } if (relayAdv != null) { // publish the relay advertisement for future use // NOTE: relayAdvs like padv include routes that do not have // a PID in the dest (because it is redundant) // We do publish it that way, but as soon as it's published, we // go and ADD the destPID in the embedded route because we'll // fetch that route adv for various purposes and it has to be // able to stand alone...this missing PID business is rather ugly. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Got relay adv for : " + relayAdv.getPeerID()); } try { DiscoveryService discovery = group.getDiscoveryService(); if (discovery != null) { discovery.publish(relayAdv, DAY_EXPIRATION, DAY_EXPIRATION); } } catch (IOException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not publish Relay RdvAdvertisement", e); } } } // WATCHOUT: this is not a pid, just the unique string portion. 
String serverPeerId = dstAddr.getServiceParameter(); // only process the request if a client peer id was sent if (serverPeerId == null) { return; } // ignore all responses that are not from the current server if (!serverPeerId.equals(currentServer.peerId)) { return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("serverPeerId = " + serverPeerId); } // Figure out which response it is if (RelayTransport.CONNECTED_RESPONSE.equals(response)) { // Connect Response if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("connected response for " + currentServer); } String responseLeaseString = RelayTransport.getString(message, RelayTransport.LEASE_ELEMENT); long responseLease = 0; if (responseLeaseString != null) { try { responseLease = Long.parseLong(responseLeaseString); } catch (NumberFormatException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("could not parse response lease string", e); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -