📄 relayserver.java
字号:
} } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "request lease string = " + requestedLeaseString + "\treturn relay adv = " + returnRelayAdv + "\n\treturn other relay adv = " + returnOtherRelayAdv + "\tflush queue = " + flushQueue); } if (requestedLeaseString != null) { try { requestedLease = Long.parseLong(requestedLeaseString); } catch (NumberFormatException e) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("could not parse requested lease string"); } } if (requestedLease > maxLeaseDuration) { requestedLease = maxLeaseDuration; } } // process the connect request EndpointAddress clientAddr = new EndpointAddress("jxta", clientPeerId, serviceName, peerId); // If we have a messenger, the clientHandler gets it. // If the client handler did not already exist, it will be // created only if we pass a messenger. We can no-longer create // new clients without an incoming messenger. We used to get one // from the router but no-longer. Now initial lease requests must // come as part of the messenger creation. RelayServerClient handler = addClient(clientPeerId, requestedLease, messenger, flushQueue); if (handler != null) { // the client was added, send a connected response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("added client " + clientPeerId); } // Now get a messenger that goes through the handler and // sends messages out-of-band (and internal perk). // jice@jxta.org - 20021227 all this code is getting ridiculous // it has to be re-organized. Addind the outOfBand feature // to all RelayMessengers just for that is overkill. This // just a temporary patch. The real fix would be to respond // straight with the messenger we have. Unfortunately, sometimes // we have to respond without a messenger in our hands because // sending a message over an explicit connection is the only // way for existing clients to ask for a response when they // reconnect. 
We would need to change the protocol and add an // "initial connection" request type to fix that. messenger = handler.getMessenger(publicAddress, clientAddr, true); responseMessage = RelayTransport.createConnectedMessage(handler.getLeaseRemaining()); // For protocol compatibility reasons, returnRelayAdv realy // means "return your own because I do not know it". // If returnOtherRelayAdv is true, then, we will return one // selected among those we know, for the enlightenment of the // other party. // If neither is true, we'll return no adv at all in order not // to confuse existing clients. RdvAdvertisement relayAdv = null; if (returnRelayAdv) { relayAdv = createRdvAdvertisement(group.getPeerAdvertisement(), serviceName); } else if (returnOtherRelayAdv) { relayAdv = relayServerCache.getRandomCacheAdv(); } if (relayAdv != null) { XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8); MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null); responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement); } } else { // We can't keep the messenger. // the client was not added, send a disconnected response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("could not add client " + clientPeerId); } // We do not get a messenger for ourselves here, so // just get one from the router ourselves, if we have to. // and can. if (messenger == null) { // If we did not get one and manage to obtain one // from the endpoint, we can use it in-line, but // we must close it. (The only case). messenger = endpointService.getMessenger(clientAddr); if (messenger != null) { closeMessenger = true; } } else { // This is the incoming messenger. We cannot use it // synchronously. See, the use of BGSend, below. 
rawMessenger = true; } responseMessage = RelayTransport.createDisconnectedMessage(); // add the relay advertisement of another know relay for the client to try RdvAdvertisement relayAdv = relayServerCache.getRandomCacheAdv(); if (relayAdv != null) { XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8); MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null); responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement); } } } else if (RelayTransport.DISCONNECT_REQUEST.equals(request)) { // Disconnect Request, don't send a response if (clientPeerId != null) { closingHandler = removeClient(clientPeerId); if (closingHandler != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("removed client " + clientPeerId); } } } } else if (RelayTransport.PID_REQUEST.equals(request)) { // Generate a PeerID in the same group as our PeerID. // The group which my peerID stems from is not necessarily // the group where I am running (more likely it is the net peer // group). Rather than guessing, get the group from our own PID. PeerGroupID groupOfMyPid = (PeerGroupID) group.getPeerID().getPeerGroupID(); String pidStr = IDFactory.newPeerID(groupOfMyPid).toString(); responseMessage = RelayTransport.createPIDResponseMessage(pidStr); // If there is a raw incoming messenger, that's what we // use. Else, we won't respond. rawMessenger = true; } // if there is a messenger and a response, send it if (messenger != null && responseMessage != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("sending response to client " + clientPeerId); } // If rawMessenger, then this is the incoming // messenger brought in by messengerReady. In that case, // be carefull. It is synchronous and it could block this // here thread until the message can be sent. Which could // possibly imply that this here method returns...dead lock. 
// See HttpMessageServlet: messengerReady is called by
            // the same thread that subsequently picks up messages from
            // the BCMessenger. So, spawn a thread to reply.
            // FIXME: eventually we should start replacing some listener
            // based code with state machines and event queues.
            if (rawMessenger) {
                // BGSend will *not* close the messenger after use
                // Because incoming messengers do not need to be closed.
                new BGSend(messenger, responseMessage, serviceName, peerId);
            } else {
                try {
                    messenger.sendMessage(responseMessage, serviceName, peerId);
                } catch (IOException e) {
                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                        LOG.log(Level.WARNING, "Could not send response message to " + clientPeerId, e);
                    }
                }
            }
        }

        if (closeMessenger) {
            messenger.close();
        }

        if (closingHandler != null) {
            closingHandler.closeClient();
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("relayedClients.size()=" + relayedClients.size());
        }
    }

    /**
     * Looks up the handler currently registered for a client peer.
     *
     * @param clientPeerId key under which the client is stored in {@code relayedClients}
     * @return the registered handler, or {@code null} if the map has no entry for that id
     */
    private RelayServerClient getClient(String clientPeerId) {
        final RelayServerClient found;

        // Read the map while holding its monitor, the same lock addClient takes.
        synchronized (relayedClients) {
            found = relayedClients.get(clientPeerId);
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("getClient(" + clientPeerId + ") = " + found);
        }

        return found;
    }

    /**
     * Registers a client, or refreshes an already registered one.
     * <p>
     * May be called repeatedly for the same client. A handler is created only
     * when none exists yet, the client limit has not been reached, and an open
     * messenger was supplied. In every successful case the handler's lease is
     * renewed and the supplied messenger (if any) is handed to the handler.
     *
     * @param clientPeerId   id of the client peer; used as the key in {@code relayedClients}
     * @param requestedLease lease value passed to a newly created handler
     * @param messenger      incoming messenger for the client, may be {@code null}
     * @param flushQueue     when {@code true}, the handler's queue is flushed
     * @return the client's handler, or {@code null} when a new client was denied
     *         or {@code renewLease()} reported failure
     */
    private RelayServerClient addClient(String clientPeerId, long requestedLease, Messenger messenger, boolean flushQueue) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("addClient(" + clientPeerId + ")");
        }

        RelayServerClient client;
        boolean created = false;

        synchronized (relayedClients) {
            // An existing registration is reused as-is.
            client = relayedClients.get(clientPeerId);

            if (client == null) {
                // A new client is refused when we are full, or when there is no
                // usable messenger to hand to the new handler.
                boolean denied = (relayedClients.size() >= maxClients)
                        || (messenger == null)
                        || messenger.isClosed();

                if (denied) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine(
                                "new client denied. nb clients: " + relayedClients.size() + "/" + maxClients + ", messenger: "
                                + messenger);
                    }
                } else {
                    client = new RelayServerClient(this, clientPeerId, requestedLease, stallTimeout, clientQueueSize);
                    relayedClients.put(clientPeerId, client);
                    created = true;

                    // First client in the map: start the GC thread unless one
                    // has already been created.
                    if (relayedClients.size() == 1 && gcThread == null) {
                        gcThread = new Thread(group.getHomeThreadGroup(), this, "GC Thread for Relay at " + publicAddress);
                        gcThread.setDaemon(true);
                        gcThread.start();
                    }
                }
            }
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("added = " + (client != null));
        }

        // Either the client was denied, or the handler expired between the map
        // lookup and now (renewLease reports that). In the latter case no
        // replacement handler is created.
        // FIXME: not nice to the client, but rare and not a disaster.
        if (client == null || !client.renewLease()) {
            return null;
        }

        if (flushQueue) {
            client.flushQueue();
        }

        if (messenger == null) {
            return client;
        }

        client.addMessenger(messenger);

        if (created) {
            // Make the router learn the new relay connection as a direct route,
            // so that it answers route queries even if we never otherwise talk
            // to the client. This is done by invoking messengerReady with a new
            // relay messenger, which the router picks up as if it were an
            // incoming messenger. The alternative, calling getMessenger with a
            // hint, sometimes tries an unreachable tcp address first and blocks
            // for a long time.
            EndpointAddress routeAddr = new EndpointAddress(RelayTransport.protocolName, clientPeerId, null, null);
            MessengerEvent readyEvent = new MessengerEvent(this, client.getMessenger(publicAddress, routeAddr, false), null);

            messengerEventListener.messengerReady(readyEvent);
        }

        return client;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -