📄 relayserver.java
字号:
private RelayServerClient removeClient(String clientPeerId) { RelayServerClient handler; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("removeClient(" + clientPeerId + ")"); } synchronized (relayedClients) { handler = relayedClients.remove(clientPeerId); // check if there are any clients if (relayedClients.size() == 0) { // stop the gcThread if (gcThread != null) { try { gcThread.interrupt(); } catch (SecurityException e) { // ignore this exception if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } } } return handler; } // this is only used by the RelayServerClient when it is closing and needs to remove itself protected void removeClient(String clientPeerId, RelayServerClient handler) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("removeClient(" + clientPeerId + "," + handler + ")"); } synchronized (relayedClients) { RelayServerClient currentHandler = relayedClients.get(clientPeerId); // only remove the client if the current handler matches the passed one if (currentHandler == handler) { relayedClients.remove(clientPeerId); } // check if there are any clients if (relayedClients.size() == 0) { // stop the gcThread Thread temp = gcThread; if (temp != null) { try { temp.interrupt(); } catch (SecurityException e) { // ignore this exception if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } } } } /** * {@inheritDoc} */ public void run() { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Starting lease gc thread"); } try { while (true) { // check if there are any client handlers left synchronized (relayedClients) { if (relayedClients.size() == 0) { break; } } // do the lease gc doClientGC(); // check if there are any client handlers left synchronized (relayedClients) { if (relayedClients.size() == 0) { break; } } // sleep for a while. try { Thread.sleep(stallTimeout); } catch (InterruptedException e) { Thread.interrupted(); } } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { gcThread = null; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("stopping lease gc thread"); } } } // checks for expired client handlers private void doClientGC() { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("start: check for expired client handler. # clients = " + relayedClients.size()); } // get a snapshot of the client handlers RelayServerClient[] handlers; synchronized (relayedClients) { handlers = relayedClients.values().toArray(new RelayServerClient[0]); } // run through the client handlers int i = handlers.length; while (i-- > 0) { try { // simply calling isExpired will cause the handler to check // if it is expired and remove itself if expired handlers[i].isExpired(); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception during client gc", e); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("stop: check for expired client handler. # clients = " + relayedClients.size()); } } private static class RelayServerCache implements PipeMsgListener, Runnable { final static ID pipeID = ID.create( URI.create("urn:jxta:uuid-59616261646162614E50472050325033DEADBEEFDEAFBABAFEEDBABE0000000F04")); final RelayServer server; final PipeAdvertisement pipeAdv; InputPipe inputPipe = null; volatile boolean doRun = false; Thread cacheThread = null; final Map<String, RdvAdvertisement> relayAdvCache = new HashMap<String, RdvAdvertisement>(); final Random rand = new Random(); protected RelayServerCache(RelayServer server) { this.server = server; pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); pipeAdv.setPipeID(pipeID); pipeAdv.setType(PipeService.PropagateType); } private int relayAdvCacheSize() { synchronized (relayAdvCache) { return relayAdvCache.size(); } } protected RdvAdvertisement getRandomCacheAdv() { synchronized (relayAdvCache) { RdvAdvertisement[] items = relayAdvCache.values().toArray(new RdvAdvertisement[0]); if (items.length == 0) { return null; } return items[rand.nextInt(items.length)]; } } private boolean putCacheAdv(String peerId, RdvAdvertisement adv) { if (!server.acl.isAllowed(adv.getPeerID())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Rejected cache entry for : " + peerId); } return false; } synchronized (relayAdvCache) { boolean replaced = (null != relayAdvCache.put(peerId, adv)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine((replaced ? "Updated" : "Created") + " cache entry for : " + peerId); } if (relayAdvCache.size() >= MAX_CACHED_SERVERS) { // New entry and map full. Remove one at random. String[] keys = relayAdvCache.keySet().toArray(new String[0]); relayAdvCache.remove(keys[rand.nextInt(keys.length)]); } return replaced; } } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message message = event.getMessage(); if (message == null) { return; } boolean isResponse = (RelayTransport.getString(message, RelayTransport.RESPONSE_ELEMENT) != null); String peerId = RelayTransport.getString(message, RelayTransport.PEERID_ELEMENT); if (peerId == null || peerId.equals(server.peerId)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("pipeMsgEvent() discarding message no response PID defined, or loopback "); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("pipeMsgEvent() from " + peerId); } MessageElement me = message.getMessageElement(RelayTransport.RELAY_NS, RelayTransport.RELAY_ADV_ELEMENT); if (null == me) { return; } Advertisement adv; try { // XXX bondolo 20041207 Force parsing of MessageElement as // XMLUTF8 rather than the actual mime type associated with the // MessageElement since the advertisement is often incorrectly // stored as a String by older JXTA implementations. adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, me.getStream()); } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building relay advertisement", failed); } return; } catch (NoSuchElementException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not build relay advertisement", failed); } return; } if (!(adv instanceof RdvAdvertisement)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Response does not contain relay advertisement (" + adv.getAdvType() + ")"); } return; } RdvAdvertisement radv = (RdvAdvertisement) adv; if (putCacheAdv(peerId, radv)) { // New entry, we might want to respond. // "someone" should respond; on average, one response // is all we want. And that response obviously should be // unicast. // We achieve an approximation of that by making a computation // that will result in "true" on average on only one peer // of the set, based on our idea of what the set is. // If we know very few other relays compared to what other // relays know, we are more likely to respond than they are. // So this is very approximate. We want to keep it simple // until we have time replace this lazy junk with something // sensible. // If it's a response already, the story stops here ! if (isResponse) { return; } // Here we go: int i = relayAdvCacheSize(); long magic = server.peerId.hashCode() % i; if (rand.nextInt(i) == magic) { // Our number came out. Respond. // See if we have amunition to respond anyway. // Very defensive. I care a lot more not to break anything // at this stage, than to have optimal functionality. RdvAdvertisement myAdv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -