📄 rendezvousserviceimpl.java
字号:
*/
    public synchronized boolean addPropagateListener(String name, EndpointListener listener) {
        // FIXME: jice@jxta.org - 20040726 - The naming of PropagateListener is inconsistent with that of EndpointListener. It is
        // not a major issue but is ugly since messages are always addressed with the EndpointListener convention. The only way to
        // fix it is to deprecate addPropagateListener in favor of a two argument version and wait for applications to adapt. Only
        // once that transition is over, will we be able to know where the separator has to be.

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Adding listener (" + listener + ") for name= " + name);
        }

        // Take the optimistic stance. Since we're synchronized, replace the current one, and if we find there was one and it's
        // not the same, put things back as they were.
        EndpointListener current = (EndpointListener) propListeners.put(name, listener);

        if ((current != null) && (current != listener)) {
            propListeners.put(name, current);
            return false;
        }

        return true;
    }

    /**
     * {@inheritDoc}
     */
    public boolean addPropagateListener(String serviceName, String serviceParam, EndpointListener listener) {
        // Until the old API is killed, the new API behaves like the old one (so that name
        // collisions are still detected if both APIs are in use).
        return addPropagateListener(serviceName + serviceParam, listener);
    }

    /**
     * Looks up the propagate listener currently stored under the given name.
     *
     * @param str the name the listener was registered under.
     * @return whatever {@code propListeners} holds for {@code str}.
     */
    public synchronized EndpointListener getListener(String str) {
        return (EndpointListener) propListeners.get(str);
    }

    /**
     * {@inheritDoc}
     */
    public synchronized EndpointListener removePropagateListener(String name, EndpointListener listener) {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Removing listener (" + listener + ") for name= " + name);
        }

        // Take the optimistic stance. Since we're synchronized, remove it, and if we find the invoker is cheating. Put it back.
        EndpointListener current = (EndpointListener) propListeners.remove(name);

        if ((current != null) && (current != listener)) {
            propListeners.put(name, current);
            return null;
        }

        return current;
    }

    /**
     * {@inheritDoc}
     */
    public EndpointListener removePropagateListener(String serviceName, String serviceParam, EndpointListener listener) {
        // Until the old API is killed, the new API behaves like the old one (so that name
        // collisions are still detected if both APIs are in use).
        return removePropagateListener(serviceName + serviceParam, listener);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void propagate(Message msg, String serviceName, String serviceParam, int defaultTTL) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.propagate(msg, serviceName, serviceParam, defaultTTL);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void propagate(Enumeration destPeerIDs, Message msg, String serviceName, String serviceParam, int defaultTTL) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.propagate(destPeerIDs, msg, serviceName, serviceParam, defaultTTL);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void walk(Message msg, String serviceName, String serviceParam, int defaultTTL) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.walk(msg, serviceName, serviceParam, defaultTTL);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void walk(Vector destPeerIDs, Message msg, String serviceName, String serviceParam, int defaultTTL) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.walk(destPeerIDs, msg, serviceName, serviceParam, defaultTTL);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Returns an empty vector when there is no peer view; otherwise the
     * rendezvous advertisement of every element of a snapshot of the view.
     */
    public Vector getLocalWalkView() {
        Vector tmp = new Vector();

        PeerView currView = rpv;

        if (null == currView) {
            return tmp;
        }

        // Work on an array copy of the view rather than on the view itself.
        Object[] viewElements = currView.getView().toArray();

        for (int i = 0; i < viewElements.length; i++) {
            PeerViewElement peer = (PeerViewElement) viewElements[i];
            RdvAdvertisement adv = peer.getRdvAdvertisement();

            tmp.add(adv);
        }

        return tmp;
    }

    /**
     * {@inheritDoc}
     *
     * <p/>{@code prunePeer} is not used by this implementation; the call is
     * forwarded to the four argument variant.
     */
    public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int ttl, String prunePeer) throws IOException {
        propagateToNeighbors(msg, serviceName, serviceParam, ttl);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int ttl) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.propagateToNeighbors(msg, serviceName, serviceParam, ttl);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>{@code prunePeer} is not used by this implementation; the call is
     * forwarded to the four argument variant.
     */
    public void propagateInGroup(Message msg, String serviceName, String serviceParam, int ttl, String prunePeer) throws IOException {
        propagateInGroup(msg, serviceName, serviceParam, ttl);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Delegates to the current provider.
     *
     * @throws IOException if no provider is currently set, or if thrown by the provider.
     */
    public void propagateInGroup(Message msg, String serviceName, String serviceParam, int ttl) throws IOException {
        // Read the field once so the null check and the call use the same provider.
        RendezVousServiceProvider currentProvider = provider;

        if (null == currentProvider) {
            throw new IOException("No RDV provider");
        }

        currentProvider.propagateInGroup(msg, serviceName, serviceParam, ttl);
    }

    /**
     * {@inheritDoc}
     */
    public final void addListener(RendezvousListener listener) {
        eventListeners.add(listener);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean removeListener(RendezvousListener listener) {
        return eventListeners.remove(listener);
    }

    /**
     * Creates a rendezvous event and sends it to all registered listeners.
     *
     * <p/>The listeners are copied into an array before the event is
     * dispatched. A Throwable raised by one listener is logged at WARN level
     * and does not prevent the remaining listeners from being called.
     *
     * @param type      the event type passed to the {@code RendezvousEvent} constructor.
     * @param regarding the ID passed to the {@code RendezvousEvent} constructor.
     */
    public final void generateEvent(int type, ID regarding) {
        // Snapshot first, exactly as before, then build the event.
        Object[] listeners = eventListeners.toArray();
        RendezvousEvent event = new RendezvousEvent(getInterface(), type, regarding);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Calling listeners for " + event);
        }

        for (int i = 0; i < listeners.length; i++) {
            RendezvousListener aListener = (RendezvousListener) listeners[i];

            try {
                aListener.rendezvousEvent(event);
            } catch (Throwable failure) {
                // A misbehaving listener must not stop delivery to the others.
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Uncaught Throwable in listener (" + aListener + ")", failure);
                }
            }
        }
    }

    /**
     * Cancels any existing watchdog task and schedules a fresh one on
     * {@code timer}, using {@code rdv_watchdog_interval} as both the initial
     * delay and the period.
     */
    private synchronized void startWatchDogTimer() {
        stopWatchDogTimer();

        autoRdvTask = new RdvWatchdogTask();

        // Now that we have an Auto-switch flag we only use the higher timeout
        // if auto-switch is off.
        // Set a watchdog, so the peer will become rendezvous if, after rdv_watchdog_interval it
        // still has not connected to any rendezvous peer.
        timer.schedule(autoRdvTask, rdv_watchdog_interval, rdv_watchdog_interval);
    }

    /**
     * Cancels the current watchdog task, if there is one, and clears the
     * reference to it.
     */
    private synchronized void stopWatchDogTimer() {
        RdvWatchdogTask tw = autoRdvTask;

        if (tw != null) {
            tw.cancel();
            autoRdvTask = null;
        }
    }

    /**
     * Edge Peer mode connection watchdog.
     */
    private class RdvWatchdogTask extends TimerTask {

        /**
         * {@inheritDoc}
         *
         * <p/>When this peer is not a rendezvous and has no connected
         * rendezvous, it starts acting as one. When it is a rendezvous and
         * the local walk view holds more than
         * {@code DEMOTION_MIN_PEERVIEW_COUNT} elements, it stops being one if
         * it has no clients, or, with probability {@code DEMOTION_FACTOR},
         * if it has fewer than {@code DEMOTION_MIN_CLIENT_COUNT} clients.
         * Any Throwable is caught and logged at ERROR level.
         */
        public synchronized void run() {
            try {
                if (!isRendezVous()) {
                    Enumeration rdvs = getConnectedRendezVous();

                    if (!rdvs.hasMoreElements()) {
                        // This peer has not been able to connect to any rendezvous peer:
                        // become a rendezvous peer.
                        startRendezVous();
                    }
                } else {
                    // Perhaps we can demote ourselves back to an edge
                    int numberOfClients = getConnectedPeerIDs().size();
                    int peerViewSize = getLocalWalkView().size();

                    boolean isManyElementsInPeerView = (peerViewSize > DEMOTION_MIN_PEERVIEW_COUNT);
                    boolean isFewClients = (numberOfClients < DEMOTION_MIN_CLIENT_COUNT);

                    if (isManyElementsInPeerView) {
                        if (numberOfClients == 0) {
                            // Demote ourselves if there are no clients and
                            // there are more than the minimum rendezvous around
                            stopRendezVous();
                        } else if (isFewClients && (RendezVousServiceImpl.random.nextDouble() < DEMOTION_FACTOR)) {
                            // Randomly Demote ourselves if there are few clients and
                            // there are many rendezvous
                            stopRendezVous();
                        }
                    }
                }
            } catch (Throwable all) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Uncaught Throwable in Timer : " + Thread.currentThread().getName(), all);
                }
            }
        }
    }

    /**
     * Checks if a message id has been recorded.
     *
     * @param id message id to look for.
     * @return true if {@code msgIds} contains the id, otherwise false.
     */
    public boolean isMsgIdRecorded(UUID id) {
        boolean found;

        synchronized (msgIds) {
            found = msgIds.contains(id);
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug(id + " = " + found);
        }

        return found;
    }

    /**
     * Records a message id.
     *
     * <p/>While fewer than {@code MAX_MSGIDS} ids are stored the id is
     * appended; after that it overwrites the slot selected by the count of
     * ids recorded so far, modulo {@code MAX_MSGIDS}.
     *
     * @param id message id to record.
     * @return true if the id was added, false if it was already recorded (duplicate).
     */
    public boolean addMsgId(UUID id) {
        synchronized (msgIds) {
            if (isMsgIdRecorded(id)) {
                // Already there. Nothing to do
                return false;
            }

            if (msgIds.size() < MAX_MSGIDS) {
                msgIds.add(id);
            } else {
                // The remainder takes the sign of the dividend, so once the counter has
                // overflowed to a negative value the raw result would be an invalid index.
                int slot = messagesReceived % MAX_MSGIDS;

                if (slot < 0) {
                    slot += MAX_MSGIDS;
                }

                msgIds.set(slot, id);
            }

            messagesReceived++;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Added Message ID : " + id);
        }

        return true;
    }

    /**
     * Creates a new message id.
     *
     * @return the result of {@code UUIDFactory.newSeqUUID()}.
     */
    public UUID createMsgId() {
        return UUIDFactory.newSeqUUID();
    }

    /**
     * Get the current provider. This is for debugging purposes only.
     *
     * @deprecated This is private for debugging and diagnostics only.
     */
    net.jxta.impl.rendezvous.RendezVousServiceProvider getRendezvousProvider() {
        return provider;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -