📄 relayserver.java
字号:
OutputPipe retPipe = null; try { retPipe = pipeService.createOutputPipe(pipeAdv, Collections.singleton(otherPid), 2 * TimeUtils.ASECOND); if (retPipe == null) { return; } // create a new cache message message = new Message(); // String version of unique portion only. Per the protocol. RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId); // Our own adv. RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, myAdv.toString()); // This is a response. New servers: do not respond! Old // servers won't respond anyway. RelayTransport.setString(message, RelayTransport.RESPONSE_ELEMENT, "t"); retPipe.send(message); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Responded"); } } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not send reply on pipe ", e); } } if (retPipe != null) { retPipe.close(); } return; } } } /** * {@inheritDoc} **/ public void run() { try { OutputPipe outputPipe = null; PipeService pipeService = server.group.getPipeService(); while (doRun && inputPipe == null) { try { inputPipe = pipeService.createInputPipe(pipeAdv, this); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not create input pipe, try again"); } } catch (IllegalStateException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pipe Service not ready yet, try again"); } } try { Thread.sleep(TimeUtils.ASECOND); } catch (InterruptedException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait interrupted"); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Created input pipe"); } while (doRun && outputPipe == null) { try { outputPipe = pipeService.createOutputPipe(pipeAdv, 5 * TimeUtils.ASECOND); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not create output pipe, try again"); } } catch (IllegalStateException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pipe Service not ready yet, try again"); } } try { Thread.sleep(TimeUtils.ASECOND); } 
catch (InterruptedException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait interrupted "); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Created output pipe"); } // Wait a little before mcasting our hello. // We depend on the rendezvous infrastructure for it to // work. It's pretty important to get the first one out // so that we may get a response from others. After that // the interval is very long (and its computation an total // nonsense) and so others do not talk much // either. We want to learn at least one other relay early on. // FIXME: jice@jxta.org 20030131 - We realy need to switch to // using peerview. It does all of that correctly. try { Thread.sleep(10 * TimeUtils.ASECOND); } catch (InterruptedException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait interrupted"); } } while (doRun) { RdvAdvertisement adv = server.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName); // Make sure that the version that can be discovered // is consistent. 
try { server.discoveryService.publish(adv); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not publish Relay RdvAdvertisement", e); } } if (adv != null) { // create a new cache message Message message = new Message(); RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId); RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, adv.toString()); try { outputPipe.send(message); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not send message on pipe ", e); } } } long sleepTime = server.minBroadcastInterval + ((server.relayedClients.size() + 1) * 100 / (server.maxClients + 1)) * server.minBroadcastInterval; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("sleepTime=" + sleepTime); } try { Thread.sleep(sleepTime); } catch (InterruptedException e) { Thread.interrupted(); } } outputPipe.close(); if (System.currentTimeMillis() > server.refreshTime) { server.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD; if (server.aclFile.lastModified() > server.aclFileLastModified) { server.aclFileLastModified = server.aclFile.lastModified(); server.acl.refresh(server.aclFile); } } } catch (Throwable all) { if (LOG.isEnabledFor(Level.FATAL)) { LOG.fatal("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { cacheThread = null; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Cache thread quitting."); } } } protected void startCache() { doRun = true; cacheThread = new Thread(server.group.getHomeThreadGroup(), this, "RelayCache Worker Thread for " + server.publicAddress); cacheThread.setDaemon(true); cacheThread.start(); } protected void stopCache() { doRun = false; if (inputPipe != null) { inputPipe.close(); inputPipe = null; } cacheThread.interrupt(); } } /** * Sends a message on an synchronous messenger. 
**/
    static class BGSend extends Thread {

        /** Messenger the message is sent through. */
        Messenger mr;

        /** Message to send. */
        Message ms;

        /** Second argument handed to {@code Messenger.sendMessage}. */
        String sn;

        /** Third argument handed to {@code Messenger.sendMessage}; also named in the failure log. */
        String ps;

        /**
         * Stores the arguments, marks the thread as a daemon and starts it
         * right away; the send itself happens in {@link #run()}.
         *
         * @param mr messenger to send through
         * @param ms message to send
         * @param sn second argument for {@code sendMessage}
         * @param ps third argument for {@code sendMessage}
         **/
        BGSend(Messenger mr, Message ms, String sn, String ps) {
            super("Relay Background Sender");

            this.mr = mr;
            this.ms = ms;
            this.sn = sn;
            this.ps = ps;

            setDaemon(true);
            start();
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Performs the single send. Failures are logged and never
         * propagated out of the thread.
         **/
        public void run() {
            try {
                mr.sendMessage(ms, sn, ps);
            } catch (IOException e) {
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Failed sending response " + ms + " to " + ps, e);
                }
            } catch (Throwable all) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
                }
            }
        }
    }

    /**
     * Builds a route advertisement from the first route advertisement element
     * found in the endpoint service parameters of a peer advertisement.
     *
     * @param adv the peer advertisement to read the route from
     * @return the route, with its destination set to the peer ID of
     *         {@code adv}; {@code null} when there are no endpoint
     *         parameters, no route element, or extraction fails
     **/
    private final static RouteAdvertisement extractRouteAdv(PeerAdvertisement adv) {

        try {
            // Get its EndpointService advertisement
            XMLElement endpointParams = (XMLElement) adv.getServiceParam(PeerGroup.endpointClassID);

            if (endpointParams == null) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("No Endpoint Params");
                }
                return null;
            }

            // get the Route Advertisement element
            Enumeration routeElements = endpointParams.getChildren(RouteAdvertisement.getAdvertisementType());

            if (!routeElements.hasMoreElements()) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("No Route Adv in Peer Adv");
                }
                return null;
            }

            XMLElement routeElement = (XMLElement) routeElements.nextElement();

            // build the new route
            RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(routeElement);

            route.setDestPeerID(adv.getPeerID());
            return route;
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("failed to extract radv", e);
            }
            return null;
        }
    }

    /**
     * Creates an RdvAdvertisement describing the peer of the given peer
     * advertisement: its peer ID, group ID, name and route, plus the given
     * service name.
     *
     * @param padv the peer advertisement to copy the identity and route from
     * @param name the service name to record in the advertisement
     * @return the new advertisement, or {@code null} if it could not be built
     **/
    private final static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String name) {

        RdvAdvertisement rdvAdv;

        try {
            // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the
            // service ID. Unfortunately the current implementation of the
            // PeerView takes a String as a service name and not its ID.
            // Since there is currently only one PeerView per group (all
            // peerviews share the same "service"), this is not a problem, but
            // it will have to be fixed eventually.

            // create a new RdvAdvertisement
            rdvAdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement(RdvAdvertisement.getAdvertisementType());

            rdvAdv.setPeerID(padv.getPeerID());
            rdvAdv.setGroupID(padv.getPeerGroupID());
            rdvAdv.setServiceName(name);
            rdvAdv.setName(padv.getName());

            // Extract the route and insert it into the RdvAdvertisement.
            rdvAdv.setRouteAdv(extractRouteAdv(padv));
        } catch (Exception failure) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Cannot create Local RdvAdvertisement: ", failure);
            }
            return null;
        }

        return rdvAdv;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -