📄 relayclient.java
字号:
/** * Logic for the relay client * * <ol> * <li>Pick a relay server to try</li> * <li>try getting a messenger to relay server, if can not get messenger, start over</li> * <li>use the messenger to send a connect message</li> * <li> wait for a response, if there is no response or a disconnect response, start over</li> * <li>while still connected * <ol> * <li>renew the lease as needed and keep the messenger connected</li> * <ol></li> * </ol> * * <p/>FIXME 20041102 bondolo The approach used here is really, really * stupid. The calls to <code>connectToRelay()</code> will not return if a * connection to a relay is achieved. This makes continued iteration over * seeds after return incredibly silly. <code>connectToRelay()</code> only * returns when it can <b>NO LONGER CONNECT</b> to the relay. The only * hack I can think of to subvert this is to stop iteration of advs/seeds * if <code>connectToRelay()</code> takes a long time. bizarre. **/ public void run() { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Start relay client thread"); } try { long nextReSeedAt = 0; long nextSeedAt = 0; long nextDiscoveryAt = 0; // run until the service is stopped while ( !closed ) { long nextConnectAttemptAt = Math.min( nextSeedAt, nextDiscoveryAt ); long untilNextConnectAttempt = TimeUtils.toRelativeTimeMillis( nextConnectAttemptAt ); if( untilNextConnectAttempt > 0 ) { try { Thread.sleep(untilNextConnectAttempt); } catch (InterruptedException e) { // ignore interrupted exception if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Thread Interrupted ", e); } continue; } } if( TimeUtils.toRelativeTimeMillis( nextDiscoveryAt ) < 0 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Check discovery for Relay Advertisement"); } DiscoveryService discovery = group.getDiscoveryService(); if( null == discovery ) { continue; } Enumeration advEnum; try { advEnum = discovery.getLocalAdvertisements(DiscoveryService.ADV, RdvAdvertisement.ServiceNameTag, serviceName); } catch (IOException e) { // ignore IOException if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure loading Relay Advertisements", e); } continue; } while (advEnum.hasMoreElements() && !closed) { Object obj = advEnum.nextElement(); if (obj instanceof RdvAdvertisement) { RdvAdvertisement relayAdv = (RdvAdvertisement) obj; // sanity check if (!serviceName.equals(relayAdv.getServiceName())) { continue; } while (relayAdv != null) { relayAdv = connectToRelay(new RelayServerConnection(this, relayAdv)); } } } nextDiscoveryAt = TimeUtils.toAbsoluteTimeMillis( 10 * TimeUtils.ASECOND ); continue; } if( TimeUtils.toRelativeTimeMillis( nextSeedAt ) < 0 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Use the seed relay servers"); } List allSeeds = new ArrayList( seedRelays ); if( TimeUtils.toRelativeTimeMillis( nextReSeedAt ) < 0 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Loading seeds froms seeding URIs"); } seededRelays.clear(); Iterator allSeedingURIs = seedingURIs.iterator(); while(allSeedingURIs.hasNext()) { URI aURI = (URI) allSeedingURIs.next(); try { seededRelays.addAll(Arrays.asList(loadSeeds(aURI))); } catch( IOException failed ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed loading seeding list from : " + aURI ); } } } // We try not to reseed very often. nextReSeedAt = TimeUtils.toAbsoluteTimeMillis( 20 * TimeUtils.AMINUTE ); } allSeeds.addAll( seededRelays ); Collections.shuffle( allSeeds ); Iterator allSeedRelays = allSeeds.iterator(); while ( allSeedRelays.hasNext() && !closed ) { EndpointAddress aSeed = new EndpointAddress( (EndpointAddress) allSeedRelays.next(), serviceName, peerId ); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Attempting relay connect to : " + aSeed ); } RdvAdvertisement relayAdv = connectToRelay(new RelayServerConnection(this, aSeed)); if (relayAdv != null) { // FIXME: jice@jxta.org - 20030206 : we rely on // connectToRelay() to return null when we are supposed // to leave. We should rather check. while (relayAdv != null) { relayAdv = connectToRelay(new RelayServerConnection(this, relayAdv)); } // since there should be at least one Relay Advertisement published, // do not use seed relay servers any more break; } } nextSeedAt = TimeUtils.toAbsoluteTimeMillis( 30 * TimeUtils.ASECOND ); continue; } } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { thread = null; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("stop client thread"); } } } protected boolean isRelayConnectDone() { return (thread == null || Thread.currentThread() != thread); } /** * @param server The relay server to connect to * @return The advertisement of an alternate relay server to try. **/ RdvAdvertisement connectToRelay(RelayServerConnection server) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Connecting to " + server); } RdvAdvertisement alternateRelayAdv = null; // make this the current server currentServer = server; // try getting a messenger to the relay peer if (server.createMessenger(leaseLengthToRequest) == false) { return alternateRelayAdv; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("got messenger " + server); } // check the peerId of the relay peer if (server.logicalAddress != null && "jxta".equals(server.logicalAddress.getProtocolName())) { server.peerId = server.logicalAddress.getProtocolAddress(); } // make sure that the peerId was found. if (server.peerId == null) { if (server.messenger != null) { server.sendDisconnectMessage(); server.messenger.close(); } return alternateRelayAdv; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("got peerId " + server); } synchronized (this) { // wait for a response from the server // There is no real damage other than bandwidth usage in sending // a message on top of the connection request, so we realy do not // wait very long before doing it. long requestTimeoutAt = TimeUtils.toAbsoluteTimeMillis(5 * TimeUtils.ASECOND); while (currentServer != null && currentServer.leaseLength == 0 && !isRelayConnectDone()) { long waitTimeout = requestTimeoutAt - System.currentTimeMillis(); if (waitTimeout <= 0) { // did not receive the response in time ? break; } try { wait(waitTimeout); } catch (InterruptedException e) { // ignore interrupt if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait got interrupted early ", e); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait done"); } } } if (currentServer == null) { return server.alternateRelayAdv; } if (isRelayConnectDone()) { if (currentServer.messenger != null) { currentServer.messenger.close(); } currentServer = null; return server.alternateRelayAdv; } // If we did not get a lease in the first 5 secs, maybe it is because // the server knows us from a previous session. Then it will wait for // a lease renewal message before responding, not just the connection. // Send one and wait another 15. if (currentServer.leaseLength == 0) { currentServer.sendConnectMessage(leaseLengthToRequest); synchronized (this) { // wait for a response from the server long requestTimeoutAt = TimeUtils.toAbsoluteTimeMillis(15 * TimeUtils.ASECOND); while (currentServer != null && currentServer.leaseLength == 0 && !isRelayConnectDone()) { long waitTimeout = requestTimeoutAt - System.currentTimeMillis(); if (waitTimeout <= 0) { // did not receive the response in time ? break; } try { wait(waitTimeout); } catch (InterruptedException e) { // ignore interrupt if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait got interrupted early ", e); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait done"); } } } } // If we had a messenger but are going to give up that relay server because it is // not responsive or rejected us. Make sure that the messenger is closed. if (currentServer == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("did not get connect from " + server); } // return any alternate relay advertisements return server.alternateRelayAdv; } if (currentServer.relayAdv == null || currentServer.leaseLength == 0 || isRelayConnectDone()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("did not get connect from " + server); } if (currentServer.messenger != null) { currentServer.sendDisconnectMessage(); currentServer.messenger.close(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -