📄 edgepeerrdvservice.java
字号:
rendezvousConnectionMeter.connectionDisconnected(); } } private void sendLeaseRequest(RdvConnection pConn) throws IOException { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending Lease request to " + pConn); } RendezvousConnectionMeter rendezvousConnectionMeter = null; if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousServiceMonitor != null)) { rendezvousConnectionMeter = rendezvousServiceMonitor.getRendezvousConnectionMeter( pConn.getPeerID().toString() ); } Message msg = new Message(); // The request simply includes the local peer advertisement. msg.replaceMessageElement("jxta", new TextDocumentMessageElement(ConnectRequest, getPeerAdvertisementDoc(), null)); pConn.sendMessage(msg, pName, pParam); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousConnectionMeter != null)) { rendezvousConnectionMeter.beginConnection(); } } /** * Description of the Method * * @param msg Description of Parameter */ private void processConnectedReply(Message msg) { // get the Peer Advertisement of the RDV. MessageElement elem = msg.getMessageElement("jxta", ConnectedRdvAdvReply); if (null == elem) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("missing rendezvous peer advertisement"); } return; } long lease; try { MessageElement el = msg.getMessageElement("jxta", ConnectedLeaseReply); if (el == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("missing lease"); } return; } lease = Long.parseLong(el.toString()); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Parse lease failed with ", e); } return; } ID pId; MessageElement el = msg.getMessageElement("jxta", ConnectedPeerReply); if (el == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("missing rdv peer"); } return; } try { pId = IDFactory.fromURI(new URI(el.toString())); } catch (URISyntaxException badID) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Bad RDV peer ID"); } return; } if (lease <= 0) { removeRdv(pId, false); } else { if (rendezVous.containsKey(pId) || ((rendezVous.size() < MAX_RDV_CONNECTIONS) && (rdvService.rpv.getPeerViewElement(pId) != null))) { InputStream is = null; PeerAdvertisement padv = null; try { is = elem.getStream(); padv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(elem.getMimeType(), is); // This is not our own peer adv so we must not keep it longer than // its expiration time. DiscoveryService discovery = group.getDiscoveryService(); if (null != discovery) { // This is not our own peer adv so we must not share it or keep it that long. discovery.publish(padv, lease * 2, 0); } } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("failed to publish Rendezvous Advertisement", e); } } finally { if (null != is) { try { is.close(); } catch (IOException ignored) { ; } } is = null; } if (null == padv) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("missing rendezvous peer advertisement"); } return; } String rdvName = padv.getName(); if (null == padv.getName()) { rdvName = pId.toString(); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RDV Connect Response : peer=" + rdvName + " lease=" + lease + "ms"); } addRdv(padv, lease); } else { LOG.debug("Ignoring lease offer from " + pId); // XXX bondolo 20040423 perhaps we should send a disconnect here. } } } /** * {@inheritDoc} */ public void setChoiceDelay(long delay) { monitorStartAt = TimeUtils.toAbsoluteTimeMillis(delay); } /** * {@inheritDoc} */ public void peerViewEvent(PeerViewEvent event) { int theEventType = event.getType(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + "] Processing " + event); } switch (theEventType) { case PeerViewEvent.ADD: synchronized (this) { try { // There is a new rdv in the peerview. If we are not // connected, it is worth a try, right away. // Use the single timer thread rather than doing it // from this thread which belongs to the invoker. // This removes risks of dealocks and other calamities. // All we have to do is to change the schedule. if (!rendezVous.isEmpty()) { break; } // We do not act upon every single add event. If they // come in storms as they do during boot, it would // make us launch many immediate attempts in parallel, // which causes useless traffic. As long as // choiceDelay is not exhausted we just reschedule // accordingly. Once choiceDelay is exhausted, we // schedule for immediate execution, but only if we // haven't done so in the last ADDEVENT_DELAY. long choiceDelay; if (monitorStartAt == -1) { // The startDate had never been decided. Initialize it now. choiceDelay = maxChoiceDelay; monitorStartAt = TimeUtils.toAbsoluteTimeMillis(choiceDelay); } else { choiceDelay = TimeUtils.toRelativeTimeMillis(monitorStartAt); } if (choiceDelay <= 0) { if (TimeUtils.toRelativeTimeMillis(monitorNotBefore) > 0) { break; } monitorNotBefore = TimeUtils.toAbsoluteTimeMillis(ADDEVENT_DELAY); choiceDelay = 0; } else { monitorStartAt -= ADDEVENT_DELAY; } // Either way, we're allowed to (re) schedule; possibly immediately. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Scheduling rdv monitor in " + choiceDelay + "ms."); } timer.schedule(new MonitorTask(), choiceDelay); } catch (Exception anything) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Event could not be processed", anything); } // Don't do it, then. The likely cause is that this // monitor is being closed. } } break; case PeerViewEvent.REMOVE: case PeerViewEvent.FAIL: PeerViewElement pve = event.getPeerViewElement(); ID failedPVE = pve.getRdvAdvertisement().getPeerID(); RdvConnection pConn = (RdvConnection) rendezVous.get(failedPVE); if (null != pConn) { pConn.setConnected(false); removeRdv(pConn.getPeerID(), false); } break; default: break; } } /** * Connects to a random rendezvous from the peer view. */ private void connectToRandomRdv() { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Periodic rendezvous connect attempt for " + group.getPeerGroupID()); } List currentView = new ArrayList(Arrays.asList(rdvService.rpv.getView().toArray())); Collections.shuffle(currentView); while (!currentView.isEmpty()) { PeerViewElement pve = (PeerViewElement) currentView.remove(0); RdvAdvertisement radv = pve.getRdvAdvertisement(); if (null == radv) { continue; } if( null != getPeerConnection( radv.getPeerID() ) ) { continue; } try { newLeaseRequest(radv); break; } catch (IOException ez) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("rdv connection failed.", ez); } } } } /** * A timer task for monitoring our active rendezvous connections * * <p/>Checks leases, challenges when peer adv has changed, initiates * lease renewals, starts new lease requests. */ private class MonitorTask extends TimerTask { /** * @inheritDoc */ public void run() { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupID() + "] Periodic rendezvous check"); } Iterator eachRendezvous = Arrays.asList(rendezVous.values().toArray()).iterator(); while (eachRendezvous.hasNext()) { RdvConnection pConn = (RdvConnection) eachRendezvous.next(); try { if ( !pConn.isConnected() ) { if (LOG.isEnabledFor(Level.INFO)) { LOG.debug("[" + group.getPeerGroupID() + "] Lease expired. Disconnected from " + pConn); } removeRdv(pConn.getPeerID(), false); continue; } if (pConn.peerAdvertisementHasChanged()) { // Pretend that our lease is expiring, so that we do not rest // until we have proven that we still have an rdv. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupID() + "] Local PeerAdvertisement changed. Challenging " + pConn); } challengeRendezVous(pConn.getPeerID(), CHALLENGE_TIMEOUT); continue; } if (TimeUtils.toRelativeTimeMillis(pConn.getRenewal()) <= 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupID() + "] Attempting lease renewal for " + pConn); } sendLeaseRequest(pConn); } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "[" + group.getPeerGroupID() + "] Failure while checking " + pConn, e); } } } // Not enough Rdvs? Try finding more. if (rendezVous.size() < MAX_RDV_CONNECTIONS) { connectToRandomRdv(); } } catch (Throwable t) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Uncaught throwable in thread :" + Thread.currentThread().getName(), t); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -