📄 relayserver.java
字号:
// Need to convert the other party's string pid into // a real pid. PeerID otherPid = null; try { otherPid = (PeerID) IDFactory.fromURI(new URI(ID.URIEncodingName, ID.URNNamespace + ":" + peerId, null)); } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Bad peerid : " + peerId, ex); } } PipeService pipeService = server.group.getPipeService(); if (pipeService == null) { return; // Funny. We're receiving messages, after all. } // FIXME: jice@jxta.org 20030131 - We're making a rather // unorthodox use of the peer-subset feature of propagate // pipes. Basically what this does is to send the message // in unicast so that it is received on the propagate // input pipe of the specified peer. // The correct API, if it existed, would be respond(). OutputPipe retPipe = null; try { retPipe = pipeService.createOutputPipe(pipeAdv, Collections.singleton(otherPid), 2 * TimeUtils.ASECOND); if (retPipe == null) { return; } // create a new cache message message = new Message(); // String version of unique portion only. Per the protocol. RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId); // Our own adv. RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, myAdv.toString()); // This is a response. New servers: do not respond! Old // servers won't respond anyway. RelayTransport.setString(message, RelayTransport.RESPONSE_ELEMENT, "t"); retPipe.send(message); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responded"); } } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not send reply on pipe ", e); } } if (retPipe != null) { retPipe.close(); } } } } /** * {@inheritDoc} */ public void run() { try { OutputPipe outputPipe = null; PipeService pipeService = server.group.getPipeService(); while (doRun && inputPipe == null) { try { inputPipe = pipeService.createInputPipe(pipeAdv, this); } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Could not create input pipe, try again"); } } catch (IllegalStateException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pipe Service not ready yet, try again"); } } try { Thread.sleep(TimeUtils.ASECOND); } catch (InterruptedException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("wait interrupted"); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Created input pipe"); } while (doRun && outputPipe == null) { try { outputPipe = pipeService.createOutputPipe(pipeAdv, 5 * TimeUtils.ASECOND); } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Could not create output pipe, try again"); } } catch (IllegalStateException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pipe Service not ready yet, try again"); } } try { Thread.sleep(TimeUtils.ASECOND); } catch (InterruptedException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("wait interrupted "); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Created output pipe"); } // Wait a little before mcasting our hello. // We depend on the rendezvous infrastructure for it to // work. It's pretty important to get the first one out // so that we may get a response from others. After that // the interval is very long (and its computation an total // nonsense) and so others do not talk much // either. We want to learn at least one other relay early on. // FIXME: jice@jxta.org 20030131 - We realy need to switch to // using peerview. It does all of that correctly. try { Thread.sleep(10 * TimeUtils.ASECOND); } catch (InterruptedException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("wait interrupted"); } } while (doRun) { RdvAdvertisement adv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName); // Make sure that the version that can be discovered // is consistent. try { server.discoveryService.publish(adv); } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not publish Relay RdvAdvertisement", e); } } if (adv != null) { // create a new cache message Message message = new Message(); RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId); RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, adv.toString()); try { outputPipe.send(message); } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not send message on pipe ", e); } } } long sleepTime = server.minBroadcastInterval + ((server.relayedClients.size() + 1) * 100 / (server.maxClients + 1)) * server.minBroadcastInterval; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("sleepTime=" + sleepTime); } try { Thread.sleep(sleepTime); } catch (InterruptedException e) { Thread.interrupted(); } } outputPipe.close(); if (System.currentTimeMillis() > server.refreshTime) { server.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD; if (server.aclFile.lastModified() > server.aclFileLastModified) { server.aclFileLastModified = server.aclFile.lastModified(); server.acl.refresh(server.aclFile); } } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { cacheThread = null; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Cache thread quitting."); } } } protected void startCache() { doRun = true; cacheThread = new Thread(server.group.getHomeThreadGroup(), this, "RelayCache Worker Thread for " + server.publicAddress); cacheThread.setDaemon(true); cacheThread.start(); } protected void stopCache() { doRun = false; if (inputPipe != null) { inputPipe.close(); inputPipe = null; } cacheThread.interrupt(); } } /** * Sends a message on an synchronous messenger. */ static class BGSend extends Thread { Messenger mr; Message ms; String sn; String ps; BGSend(Messenger mr, Message ms, String sn, String ps) { super("Relay Background Sender"); this.mr = mr; this.ms = ms; this.sn = sn; this.ps = ps; setDaemon(true); start(); } /** * {@inheritDoc} */ @Override public void run() { try { mr.sendMessage(ms, sn, ps); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed sending response " + ms + " to " + ps, e); } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } } } private static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String name) { try { // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately // this current implementation of the PeerView takes a String as a service name and not its ID. // Since currently, there is only PeerView per group (all peerviews share the same "service", this // is not a problem, but that will have to be fixed eventually. // create a new RdvAdvertisement RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement( RdvAdvertisement.getAdvertisementType()); rdv.setPeerID(padv.getPeerID()); rdv.setGroupID(padv.getPeerGroupID()); rdv.setServiceName(name); rdv.setName(padv.getName()); RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv); if (null == ra) { // No route available return null; } // Insert it into the RdvAdvertisement. rdv.setRouteAdv(ra); return rdv; } catch (Exception ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez); } return null; } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -