📄 peerview.java
字号:
} } /** * Refresh the local copy of the peer advertisement and the rendezvous * advertisement. */ private void refreshSelf() { RdvAdvertisement radv; synchronized (this) { PeerAdvertisement newPadv = group.getPeerAdvertisement(); int newModCount = newPadv.getModCount(); if ((lastPeerAdv != newPadv) || (lastModCount != newModCount)) { lastPeerAdv = newPadv; lastModCount = newModCount; // create a new local RdvAdvertisement and set it to self. radv = createRdvAdvertisement(lastPeerAdv, name); if (radv != null) { self.setRdvAdvertisement(radv); } } } } static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String serviceName) { try { // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately // this current implementation of the PeerView takes a String as a service name and not its ID. // Since currently, there is only PeerView per group (all peerviews share the same "service", this // is not a problem, but that will have to be fixed eventually. // create a new RdvAdvertisement RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement( RdvAdvertisement.getAdvertisementType()); rdv.setPeerID(padv.getPeerID()); rdv.setGroupID(padv.getPeerGroupID()); rdv.setServiceName(serviceName); rdv.setName(padv.getName()); RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv); // Insert it into the RdvAdvertisement. rdv.setRouteAdv(ra); return rdv; } catch (Exception ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez); } return null; } } /** * Add a listener for PeerViewEvent * * @param listener An PeerViewListener to process the event. * @return true if successful */ public boolean addListener(PeerViewListener listener) { boolean added = rpvListeners.add(listener); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Registered PeerViewEvent Listener (" + listener.getClass().getName() + ")"); } return added; } /** * Removes a PeerViewEvent Listener previously added with addListener. * * @param listener the PeerViewListener listener remove * @return whether successful or not */ public boolean removeListener(PeerViewListener listener) { boolean removed = rpvListeners.remove(listener); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removed PeerViewEvent Listener (" + listener.getClass().getName() + ")"); } return removed; } /** * Generate a PeerView Event and notify all listeners. * * @param type the Event Type. * @param element The peer having the event. */ private void generateEvent(int type, PeerViewElement element) { PeerViewEvent newevent = new PeerViewEvent(this, type, element); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Calling listeners for " + newevent + " in group " + group.getPeerGroupID()); } for (Object o : Arrays.asList(rpvListeners.toArray())) { PeerViewListener pvl = (PeerViewListener) o; try { pvl.peerViewEvent(newevent); } catch (Throwable ignored) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in PeerViewEvent listener : (" + pvl.getClass().getName() + ")" , ignored); } } } } static PipeAdvertisement makeWirePipeAdvertisement(PeerGroup destGroup, PeerGroup group, String name) { PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); // Create a pipe advertisement for this group. // Generate a well known but unique ID. // FIXME bondolo 20040507 The ID created is really poor, it has only // 2 unique bytes on average. it would be much better to hash something // also, since the the definition of how to use the seed bytes is not // fixed, it's not reliable. PipeID pipeId = IDFactory.newPipeID(destGroup.getPeerGroupID() , (SERVICE_NAME + group.getPeerGroupID().getUniqueValue().toString() + name).getBytes()); adv.setPipeID(pipeId); adv.setType(PipeService.PropagateType); adv.setName(SERVICE_NAME + " pipe for " + group.getPeerGroupID()); return adv; } private synchronized void openWirePipes() { PipeService pipes = group.getPipeService(); if (null == pipes) { scheduleOpenPipes(TimeUtils.ASECOND); // Try again in one second. return; } try { // First, listen to in our own PeerGroup if (null == localGroupWirePipeInputPipe) { localGroupWirePipeInputPipe = pipes.createInputPipe(localGroupWirePipeAdv, new WirePipeListener()); } if (null == localGroupWirePipeOutputPipe) { // Creates the OutputPipe - note that timeout is irrelevant for // propagated pipe. localGroupWirePipeOutputPipe = pipes.createOutputPipe(localGroupWirePipeAdv, 1 * TimeUtils.ASECOND); } if (localGroupWirePipeOutputPipe == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Cannot get OutputPipe for current group"); } } } catch (Exception failed) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("PipeService not ready yet. Trying again in 1 second."); } // Try again in one second. scheduleOpenPipes(TimeUtils.ASECOND); return; } if (advertisingGroup != null) { try { pipes = advertisingGroup.getPipeService(); if (null == pipes) { // Try again in one second. scheduleOpenPipes(TimeUtils.ASECOND); return; } if (null == wirePipeInputPipe) { wirePipeInputPipe = pipes.createInputPipe(advGroupPropPipeAdv, new WirePipeListener()); } if (null == wirePipeOutputPipe) { wirePipeOutputPipe = pipes.createOutputPipe(advGroupPropPipeAdv, 1 * TimeUtils.ASECOND); } if (wirePipeOutputPipe == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Cannot get OutputPipe for current group"); } } } catch (Exception failed) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Could not open pipes in local group. Trying again in 1 second."); } // Try again in one second. scheduleOpenPipes(TimeUtils.ASECOND); return; } } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Propagate Pipes opened."); } } private synchronized void closeWirePipes() { if (localGroupWirePipeInputPipe != null) { localGroupWirePipeInputPipe.close(); localGroupWirePipeInputPipe = null; } if (localGroupWirePipeOutputPipe != null) { localGroupWirePipeOutputPipe.close(); localGroupWirePipeOutputPipe = null; } if (wirePipeInputPipe != null) { wirePipeInputPipe.close(); wirePipeInputPipe = null; } if (wirePipeOutputPipe != null) { wirePipeOutputPipe.close(); wirePipeOutputPipe = null; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Propagate Pipes closed."); } } /** * Adapter class for receiving wire pipe messages */ private class WirePipeListener implements PipeMsgListener { /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message msg = event.getMessage(); boolean failure = (null != msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME)); boolean response = (null != msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Received a PeerView " + (failure ? "failure " : "") + (response ? "response " : "") + "message [" + msg + "] on propagated pipe " + event.getPipeID()); } if (!failure && !response) { // If this is not a failure message then decide if we will respond. // // We play a game that is tuned by the view size so that the expectation of number of responses is equal to // minHappyPeerView. The game is to draw a number between 0 and the pv size. If the result is < minHappyPeerView, // then we win (respond) else we lose (stay silent). The probability of winning is HAPPY_SIZE/viewsize. If each of // the viewsize peers plays the same game, on average HAPPY_SIZE of them win (with a significant variance, but // that is good enough). If viewsize is <= HAPPY_SIZE, then all respond. This is approximate, of course, since // the view size is not always consistent among peers. int viewsize = PeerView.this.localView.size(); if (viewsize > minHappyPeerView) { int randinview = random.nextInt(viewsize); if (randinview >= minHappyPeerView) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ignoring " + msg + " from pipe " + event.getPipeID()); } // We "lose". return; } } // Else, we always win; don't bother playing. } // Fabricate dummy src and dst addrs so that we can call processIncoming. These are // only used for traces. The merit of using the pipeID is that it is recognizable // in these traces. EndpointAddress src = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null); EndpointAddress dest = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null); try { // call the peerview. PeerView.this.processIncomingMessage(msg, src, dest); } catch (Throwable ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed processing " + msg + " from pipe " + event.getPipeID(), ez); } } } } private synchronized void scheduleAdvertisingGroupQuery(long delay) { if (closed) { return; } TimerTask task = new AdvertisingGroupQueryTask(); addTask(task, delay, -1); } /** * Class implementing the query request on the AdvertisingGroup */ private final class AdvertisingGroupQueryTask extends TimerTask { /** * {@inheritDoc} */ @Override public boolean cancel() { boolean res = super.cancel(); return res; } /** * {@inheritDoc} */ @Override public void run() { try { if (closed) { return; } OutputPipe op = wirePipeOutputPipe; if (null != op) { Message msg = makeMessage(self, false, false); op.send(msg); } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LO
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -