📄 relayclient.java
字号:
if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("renew lease " + currentServer); } // If we do not receive any response to our lease renewals // (that is the response is overdue badly), then we give // up and try another relayServer. We give up after 4 minutes // because if we go as far as 5 we start overshooting other // timeouts such as the local peer becoming a rdv in a sub-group. // This later timeout is usually set to 5 minutes or more. if ((currentTime > currentServer.leaseObtainedAt + currentServer.leaseLength / 3 + 4 * TimeUtils.AMINUTE) || (!currentServer.sendConnectMessage(leaseLengthToRequest))) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("renew lease failed" + currentServer); } if (currentServer.messenger != null) { currentServer.messenger.close(); } currentServer.messenger = null; currentServer.peerId = null; currentServer.leaseLength = 0; currentServer.leaseObtainedAt = 0; currentServer.relayAdv = null; currentServer = null; break; } } } } if (isRelayConnectDone() && currentServer != null) { currentServer.sendDisconnectMessage(); if (currentServer.messenger != null) { currentServer.messenger.close(); } currentServer.messenger = null; currentServer.peerId = null; currentServer.leaseLength = 0; currentServer.leaseObtainedAt = 0; currentServer.relayAdv = null; // Make sure that we will not suggest an alternate // since we're asked to terminate. 
currentServer.alternateRelayAdv = null; currentServer = null; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("maintainRelayConnection() terminated " + currentServer); } return server.alternateRelayAdv; } protected synchronized void handleResponse(Message message, EndpointAddress dstAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleResponse " + currentServer); } // ignore all responses if there is not a current server if (currentServer == null) { return; } // get the request, make it lowercase so that case is ignored String response = RelayTransport.getString(message, RelayTransport.RESPONSE_ELEMENT); if (response == null) { return; } response = response.toLowerCase(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("response = " + response); } // check if a relay advertisement was included RdvAdvertisement relayAdv = null; MessageElement advElement = message.getMessageElement(RelayTransport.RELAY_NS, RelayTransport.RELAY_ADV_ELEMENT); if (null != advElement) { try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(advElement); Advertisement adv = AdvertisementFactory.newAdvertisement(asDoc); if (adv instanceof RdvAdvertisement) { relayAdv = (RdvAdvertisement) adv; } } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not read Relay RdvAdvertisement", e); } } } // WATCHOUT: this is not a pid, just the unique string portion. 
String serverPeerId = dstAddr.getServiceParameter(); // only process the request if a client peer id was sent if (serverPeerId == null) { return; } // ignore all responses that are not from the current server if (!serverPeerId.equals(currentServer.peerId)) { return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("serverPeerId = " + serverPeerId); } // Figure out which response it is if (RelayTransport.CONNECTED_RESPONSE.equals(response)) { // Connect Response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("connected response for " + currentServer); } String responseLeaseString = RelayTransport.getString(message, RelayTransport.LEASE_ELEMENT); long responseLease = 0; if (responseLeaseString != null) { try { responseLease = Long.parseLong(responseLeaseString); } catch (NumberFormatException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "could not parse response lease string", e); } } } // make sure the lease is valid if (responseLease <= 0) { // invalid lease value return; } // update the lease values currentServer.leaseLength = responseLease; currentServer.leaseObtainedAt = System.currentTimeMillis(); // Since we got the lease, if we requested a queue flush, it's // now done. We never send it with a new messenger creation, but // when the server already has us as a client it does not respond // to connections through messenger creation, so we're sure we // will have to send an explicit connect message before we get // a response. So, we're sure it's done if it was needed. currentServer.flushNeeded = false; if (relayAdv != null) { // Set it only if it is the server's own. Else it got // published. Still set alternateRelayAdv so that we // can return something that could be usefull when this // connection breaks. 
PeerID pidOfAdv = relayAdv.getPeerID(); String pidOfAdvUnique = pidOfAdv.getUniqueValue().toString(); if (currentServer.peerId.equals(pidOfAdvUnique)) { currentServer.relayAdv = relayAdv.getRouteAdv(); // Fix the embedded route adv ! currentServer.relayAdv.setDestPeerID(pidOfAdv); } else { currentServer.alternateRelayAdv = relayAdv; } } notifyAll(); } else if (RelayTransport.DISCONNECTED_RESPONSE.equals(response)) { // Disconnect Response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("disconnected from " + currentServer); } // If our request was denied, the adv that came back is // always an alternate one. currentServer.alternateRelayAdv = relayAdv; if (currentServer.messenger != null) { currentServer.messenger.close(); } currentServer.messenger = null; currentServer.peerId = null; currentServer.leaseLength = 0; currentServer.leaseObtainedAt = 0; currentServer.relayAdv = null; currentServer = null; notifyAll(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("response handled for " + currentServer); } } static class RelayServerConnection { final RelayClient client; Messenger messenger = null; EndpointAddress logicalAddress = null; String peerId = null; long leaseLength = 0; long leaseObtainedAt = 0; // If seeded out of a raw address, we have relayAddress. // relayAdv comes only later. 
public RouteAdvertisement relayAdv = null;
        EndpointAddress relayAddress = null;

        RdvAdvertisement alternateRelayAdv = null;
        boolean seeded = false;
        boolean flushNeeded = true; // true until we know it's been done

        /**
         * Creates a connection seeded from a raw endpoint address.
         * Only the protocol/host part of {@code addr} is kept (service name
         * and parameter are dropped) and the connection is marked as seeded.
         *
         * @param client the owning relay client
         * @param addr   the raw address of the relay server
         */
        protected RelayServerConnection(RelayClient client, EndpointAddress addr) {
            this.client = client;
            relayAddress = new EndpointAddress(addr, null, null);
            seeded = true;
        }

        /**
         * Creates a connection from a known route advertisement.
         *
         * @param client   the owning relay client
         * @param relayAdv the route advertisement of the relay server
         */
        protected RelayServerConnection(RelayClient client, RouteAdvertisement relayAdv) {
            this.client = client;
            this.relayAdv = relayAdv;
        }

        /**
         * Closes any existing messenger and tries to open a new one to the
         * relay server.
         *
         * <p>Candidate addresses come from the destination access point of
         * {@code relayAdv} when an advertisement is known, otherwise from
         * {@code relayAddress}. Each address is tried, in order, against
         * every message transport of the client's endpoint that is a
         * {@code MessageSender}, allows routing and has the same protocol
         * name as the address. The search stops at the first open messenger
         * that is accepted.
         *
         * <p>When {@code relayAdv} is known, a messenger whose logical
         * destination does not resolve to the advertisement's destination
         * peer ID is closed and rejected. Note that {@code relayAddress} is
         * overwritten with the address just tried whenever a transport
         * returned an open messenger, even if that messenger is then
         * rejected.
         *
         * @param leaseLengthToRequest the lease length put in the connect
         *                             request embedded in the destination
         *                             address
         * @return {@code true} if a messenger was obtained, {@code false}
         *         otherwise (including when no address is available)
         */
        protected boolean createMessenger(long leaseLengthToRequest) {

            // make sure the old messenger is closed
            if (messenger != null) {
                messenger.close();
                messenger = null;
            }

            List<String> endpointAddresses = null;

            // check for a relay advertisement
            if (relayAdv != null) {
                AccessPointAdvertisement accessPointAdv = relayAdv.getDest();

                if (accessPointAdv != null) {
                    endpointAddresses = accessPointAdv.getVectorEndpointAddresses();
                }
            } else {
                // silly but if we use getVectorEndpointAddresses, we get
                // strings. It's really simpler to have only one kind of obj
                // in there.
                endpointAddresses = new ArrayList<String>(1);
                endpointAddresses.add(relayAddress.toString());
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("createMessenger to " + endpointAddresses);
            }

            // make sure we found some endpoint addresses to try
            if (endpointAddresses == null) {
                return false;
            }

            // try each endpoint address until one is successful
            for (String s : endpointAddresses) {
                if (s == null) {
                    continue;
                }
                EndpointAddress addr = new EndpointAddress(s);

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("find transport for " + addr);
                }

                // get the list of message transports on this endpoint
                Iterator transports = client.endpoint.getAllMessageTransports();

                while (transports.hasNext() && messenger == null) {
                    MessageTransport transport = (MessageTransport) transports.next();

                    // only try transports that are senders and allow routing
                    if (transport instanceof MessageSender && ((MessageSender) transport).allowsRouting()) {
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine("try transport " + transport);
                        }

                        if (addr.getProtocolName().equals(transport.getProtocolName())) {
                            // NOTE: here we're creating a messenger.
                            // For risk management reason, we refrain from
                            // including the flush request at this time in
                            // this. There is the possibility that the
                            // connection will be repeatedly established
                            // by the transport in our back, and would keep
                            // including the flush request ! Normally this
                            // does not matter because the server should
                            // disregard it when it comes in that way, but
                            // still, let's be defensive. We will still send
                            // the flush in a subsequent explicit message.
                            String reqStr = RelayTransport.createConnectString(leaseLengthToRequest, relayAdv == null, false);

                            // NOTE: this is simulating address mangling by CrossgroupMessenger.
                            // The real service param is after the "/" in the below serviceParam arg.
                            EndpointAddress addrToUse = new EndpointAddress(addr, "EndpointService:" + client.groupName
                                    ,
                                    client.serviceName + "/" + reqStr);

                            messenger = ((MessageSender) transport).getMessenger(addrToUse, null);
                            // A messenger that is already closed is useless.
                            if (messenger != null && messenger.isClosed()) {
                                messenger = null;
                            }
                            if (messenger != null) {
                                logicalAddress = messenger.getLogicalDestinationAddress();
                                // We're using a known adv, which means that
                                // we did not ask to get the adv back.
                                // Make sure that we do not keep going with
                                // an adv for the wrong peer. That can happen.
                                if (relayAdv != null && !addr2pid(logicalAddress).equals(relayAdv.getDestPeerID())) {
                                    // oops, wrong guy !
                                    messenger.close();
                                    messenger = null;
                                    logicalAddress = null;
                                }

                                // In case it was not given, set relayAddress
                                // for toString purposes.
                                relayAddress = addr;
                            }
                        }
                    }
                }

                // Stop at the first usable messenger: the remaining addresses
                // would only be parsed (and the transports re-listed) for
                // nothing, and a malformed one could throw needlessly.
                if (messenger != null) {
                    break;
                }
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("messenger=" + messenger);
            }

            return (messenger != null);
        }

        protected boolean sendConnectMessage(long leaseLengthToRequest) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -