rendezvousserviceimpl.java

Part of a collection of JXME-related programs, mainly covering mobile application development and phone-to-computer communication. Building the programs requires Ant.
					// is changed to now include the group id. The following code also allows
					// messages from older versions of this protocol to be processed. It should be
					// removed when the older version is no longer supported (probably the next release).
					// Try to see if we have the old version Propagate message.

					el = message.getElement(RendezVousPropagateMessage.Name);

					// END OF COMPATIBILITY FIX

					if (el == null) {
						// There is really no header. Discard message
						return;
					}
				}
				propHdr = new RendezVousPropagateMessage(el.getStream());

			} catch(Exception neverMind) {
				// No header. Discard the message
				return;
			}
			// A message without a message id cannot be checked for loops.
			// As a safety guard, drop it.
			if (propHdr.getMsgId() == null) {
				// Improper message. Drop it
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("Received a message without msgid. Drop it.");
				}

				return;
			}
			// Get the real destination of the message
			String sName = propHdr.getDestSName();
			String sParam = propHdr.getDestSParam();
			// Check if we have a local listener for this message
			rdv.processReceivedMessage(message,
			                           propHdr.getMsgId(),
			                           sName,
			                           sParam,
			                           srcAddr,
			                           dstAddr);
		}
	}

	protected void processReceivedMessage(Message message,
	                                      String msgId,
	                                      String sName,
	                                      String sParam,
	                                      EndpointAddress srcAddr,
	                                      EndpointAddress dstAddr) {

		if (LOG.isEnabledFor(Priority.DEBUG))
			LOG.debug("processReceivedMessage starts");
		EndpointListener listener = (EndpointListener) listeners.get(sName + sParam);
		if (listener != null) {
			// We have a local listener for this message.
			// Deliver it.
			if (LOG.isEnabledFor(Priority.DEBUG))
				LOG.debug("Local listener is called");
			// Rebuild the address
			EndpointAddress realDst = endpoint.newEndpointAddress(dstAddr.toString());
			realDst.setServiceName(sName);
			realDst.setServiceParameter(sParam);

			// Add the message id into our local cache
			// Since the message is going to be locally delivered to services
			// that may want to repropagate it, we must remember it has already been here.

			if (isMsgIdRecorded(msgId)) {
				// Got that one already. Discard.
				return;
			}
			addMsgId(msgId);
			listener.processIncomingMessage(message, srcAddr, realDst);
		} else {
			// We do not have a local listener. Repropagate it.
			addMsgId(msgId);
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("No local listener - repropagate");
			}
			rePropagateInGroup(message);
		}
	}

	/**
	 * Clients of the rendezvous service can use this to receive raw
	 * propagation rather than crawling query/responses. It takes care of TTL
	 * counting and loop detection.
	 *
	 * @param  name             The name of the listener.
	 * @param  listener         An EndpointListener to process the message.
	 * @exception  IOException  Declared by the signature; in this implementation a
	 *                          failure to register with the endpoint is logged at
	 *                          WARN level rather than thrown.
	 */

	public synchronized void addPropagateListener(String name,
	        EndpointListener listener) throws IOException {
		listeners.put(name, listener);
		try {
			endpoint.addListener(name, listener);
		} catch (Exception ez1) {
			if (LOG.isEnabledFor(Priority.WARN))
				LOG.warn("Cannot connect listener " + name, ez1);
		}
	}


	/**
	 * Removes a listener previously added with addPropagateListener.
	 *
	 * @param  name             The name of the listener.
	 * @param  listener         The EndpointListener to remove. The lookup is done
	 *                          by name only; this argument is not used.
	 * @exception  IOException  If no propagate listener is registered under name.
	 */
	public synchronized void removePropagateListener(String name,
	        EndpointListener listener) throws IOException {

		EndpointListener myListener = (EndpointListener) listeners.get(name);
		if (myListener == null) {
			throw new IOException("No propagate listener named \"" + name + "\"");
		}

		listeners.remove(name);
	}


	/**
	 * Adds a listener for RendezvousEvents.
	 *
	 * @param  listener  A RendezvousListener to process the event.
	 */

	public void addListener(RendezvousListener listener) {

		applisteners.addElement(listener);
	}


	/**
	 * Removes a listener previously added with addListener.
	 *
	 * @param  listener  the RendezvousListener to remove
	 * @return           true if the listener was registered and has been removed
	 */

	public boolean removeListener(RendezvousListener listener) {

		return (applisteners.removeElement(listener));
	}


	public void propagate(Message msg,
	                      String serviceName,
	                      String serviceParam,
	                      int defaultTTL)
	throws IOException {
		if (defaultTTL == 1)
			propagateToNeighbors(msg, serviceName,
			                     serviceParam, defaultTTL, null);
		else
			propagateInGroup(msg, serviceName,
			                 serviceParam, defaultTTL, null);
	}


	/*
	 *  Propagation with TTL.
	 *  The methods that follow provide an abstract propagation that may
	 *  use point-to-point communication through rendezvous or direct
	 *  propagation on the network, or both.
	 *  The caller only specifies which scope is desired (neighbors or the
	 *  whole group). The strategy actually used is not specified at the API
	 *  level, only whether an effort is made at reaching farther, or not.
	 *
	 */
	/**
	 * This API is provided to the Core implementation in order to
	 * propagate a message onto as many peers on the local network
	 * as possible. Typically the message goes to all the peers that
	 * at least one endpoint transport can address without using
	 * the router.
	 *
	 * Only a single HOP at a time is performed. Messages are always
	 * delivered to the destination handler on arrival. This handler
	 * is responsible for repropagating further, if deemed appropriate.
	 *
	 * Loop and TTL control are performed automatically.
	 *
	 * Messages can be propagated via this method for the first time or
	 * can be re-propagated by re-using a message that came in via propagation.
	 * In the latter case, the TTL and loop detection parameters CANNOT be
	 * re-initialized. To "re-propagate" a message with a new TTL
	 * and a blank gateways list, one must generate a completely new message.
	 * This limits the risk of accidental propagation storms, although they
	 * can always be engineered deliberately.
	 *
	 * Note: The original msg is not modified and may be reused upon return.
	 *
	 * @param  msg              is the message to propagate.
	 * @param  serviceName      is the name of the service
	 * @param  serviceParam     is the parameter of the service
	 * @param  defaultTTL       is the default time-to-live (TTL) for the message
	 * @param  prunePeer        is a peer to prune in the propagation.
	 * @exception  IOException  if the message cannot be prepared for propagation
	 */
	public void propagateToNeighbors(Message msg,
	                                 String serviceName,
	                                 String serviceParam,
	                                 int defaultTTL,
	                                 String prunePeer) throws IOException {

		// Do not touch the original.
		Message dupMsg = (Message) msg.clone();

		// If this fails it's fatal. We just let it bail out.
		if (updatePropHeader(dupMsg, defaultTTL, serviceName, serviceParam)) {

			// Now that the message is safe we'll try to propagate it.
			// It may fail silently. Propagation is never guaranteed.
			sendToNetwork(dupMsg, PropSName, PropPName);
		}
	}


	/**
	 * This API is provided to the Core implementation in order to
	 * propagate a message onto as many peers in the group as possible.
	 *
	 * Only a single HOP at a time is performed. Messages are always
	 * delivered to the destination handler on arrival. This handler
	 * is responsible for repropagating further, if deemed appropriate.
	 *
	 * Loop and TTL control are performed automatically.
	 *
	 * Messages can be propagated via this method for the first time or
	 * can be re-propagated by re-using a message that came in via propagation.
	 * In the latter case, the TTL and loop detection parameters CANNOT be
	 * re-initialized. To "re-propagate" a message with a new TTL
	 * and a blank gateways list, one must generate a completely new message.
	 * This limits the risk of accidental propagation storms, although they
	 * can always be engineered deliberately.
	 *
	 * Note: The original msg is not modified and may be reused upon return.
	 *
	 * @param  msg              is the message to propagate.
	 * @param  serviceName      is the name of the service
	 * @param  serviceParam     is the parameter of the service
	 * @param  defaultTTL       is the default time-to-live (TTL) for the message
	 * @param  prunePeer        is a peer to prune in the propagation.
	 * @exception  IOException  if the message cannot be prepared for propagation
	 */

	public void propagateInGroup(Message msg,
	                             String serviceName,
	                             String serviceParam,
	                             int defaultTTL,
	                             String prunePeer) throws IOException {

		// Do not touch the original.
		Message dupMsg = (Message) msg.clone();

		// If this fails it's fatal. We just let it bail out.
		if (updatePropHeader(dupMsg, defaultTTL, serviceName, serviceParam)) {

			// Now that the message is safe we'll try to propagate it.
			// It may fail silently. Propagation is never guaranteed.

			sendToNetwork(dupMsg, PropSName, PropPName);
			sendToEachRendezVous(dupMsg, PropSName, PropPName);
			sendToEachClient(dupMsg, PropSName, PropPName);
		}
	}


	protected void rePropagateInGroup(Message msg) {

		try {
			// Do not touch the original.
			Message dupMsg = (Message) msg.clone();

			// If this fails it's fatal. We just let it bail out.
			if (updatePropHeader(dupMsg, MaxTTL, null, null)) {

				// Now that the message is safe we'll try to propagate it.
				// It may fail silently. Propagation is never guaranteed.

				sendToNetwork(dupMsg, PropSName, PropPName);
				sendToEachRendezVous(dupMsg, PropSName, PropPName);
				sendToEachClient(dupMsg, PropSName, PropPName);
			}
		} catch (Exception ez1) {
			// Not much we can do
			if (LOG.isEnabledFor(Priority.DEBUG))
				LOG.debug("Cannot repropagate the message ", ez1);
		}
	}

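	/**
	 * Check if a particular peer has already received the propagated
	 * message, based on the Propagation header embedded within the message.
	 *
	 * @param  msg   the message whose propagation header is inspected
	 * @param  peer  the peer to look for in the path recorded in the header
	 * @return       true if the peer appears in the header's path; false if it
	 *               does not, or if the header is missing or cannot be read
	 */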

	private boolean isPeerInPropHeader(Message msg,
	                                   String peer) {

		MessageElement elem = msg.getElement(headerName);
		if (elem == null) {
			// XXX-lomax@jxta.or: BACK COMPATIBILITY December 14th 2001
			// We need to also associate a Message Filter with the following name
			// for backward compatibility reasons: the name of the message element
			// is changed to now include the group id. The following code also allows
			// messages from older versions of this protocol to be processed. It should be
			// removed when the older version is no longer supported (probably the next release).
			// Try to see if we have the old version Propagate message.

			elem = msg.getElement(RendezVousPropagateMessage.Name);

			// END OF COMPATIBILITY FIX

			if (elem == null) {
				// There is really no header.
				return false;
			}
		}

		try {
			InputStream ip = elem.getStream();
			if (ip == null) {
				return false;
			}
			RendezVousPropagateMessage propHdr = new RendezVousPropagateMessage(ip);

			return propHdr.hasInPath(peer);
		} catch (Exception ez1) {
			return false;
		}
	}


	/**
	 * Builds an endpoint address of the form jxta://&lt;unique id&gt; for a peer.
	 *
	 * @param  destPeer  the peer ID, as a string
	 * @return           the endpoint address, or null if destPeer is not a
	 *                   valid peer ID
	 */
	private EndpointAddress mkAddress(String destPeer) {
		try {
			PeerID asID = (PeerID) IDFactory.fromURL(IDFactory.jxtaURL(destPeer));
			String asString = "jxta://"
			                  + asID.getUniqueValue().toString();
			EndpointAddress addr = endpoint.newEndpointAddress(asString);
			return addr;
		} catch (Exception e) {
			if (LOG.isEnabledFor(Priority.WARN))
				LOG.warn("Invalid peerID string : " + destPeer, e );
			return null;
		}
	}


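	/**
	 * Builds an endpoint address for a peer and sets its service name and
	 * service parameter.
	 *
	 * @param  destPeer  the peer ID, as a string
	 * @param  serv      the service name to set on the address
	 * @param  parm      the service parameter to set on the address
	 * @return           the endpoint address, or null if destPeer is not a
	 *                   valid peer ID
	 */
	private EndpointAddress mkAddress(String destPeer,
	                                  String serv,
	                                  String parm) {

		EndpointAddress addr = mkAddress(destPeer);
		if (addr == null) {
			// mkAddress(String) has already logged the invalid peer ID.
			return null;
		}
		addr.setServiceName(serv);
		addr.setServiceParameter(parm);
		return addr;
	}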


	/**
	 * Registers a rendezvous peer, or renews its lease if the peer is
	 * already registered.
	 *
	 * @param  peer   the rendezvous peer; the call is ignored if null
	 * @param  lease  the lease duration in milliseconds, relative to the
	 *                current time; a negative value means forever
	 */
	private synchronized void addRdv(String peer, long lease) {

		if (peer == null) {
			return;
		}

		if (!isClient) {
			// This is the first rendezVous
			rendezVous = new Vector();
			isClient = true;
		}

		PeerConnection tmp = new PeerConnection(group, peer, lease);

		// Check if the peer is already registered.
		if (rendezVous.contains(tmp)) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   already registered");
			}
			// Already connected, just upgrade the lease
			try {
				int index = rendezVous.indexOf(tmp);
				if (index == -1) {
					// This should not happen
					if (LOG.isEnabledFor(Priority.DEBUG))
						LOG.debug("addRdv: cannot access PeerConnection for " + peer);
				} else {
					tmp = (PeerConnection) rendezVous.elementAt(index);
					long time = System.currentTimeMillis();
					if (lease < 0) {
						// Forever
						tmp.setLease(lease);
					} else {
						tmp.setLease(time + lease);
					}
					tmp.connect();
					if (LOG.isEnabledFor(Priority.DEBUG)) {
						LOG.debug("Got renewed lease for " + peer);
					}
				}
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG))
					LOG.debug("addRdv has failed [1] ", e);
			}
			return;
		}

		tmp.connect();
		rendezVous.addElement(tmp);
		generateEvent(RendezvousEvent.RDVCONNECT, peer);
