rendezvousserviceimpl.java

来自「jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,」· Java 代码 · 共 2,201 行 · 第 1/5 页

JAVA
2,201
字号
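	 * Handles a rendezvous advertisement reply: rebuilds the advertisement
	 * carried in the message and reports it to the registered monitor, if any.
	 *
	 * @param  msg  message expected to carry the RdvAdvReply element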
	 */
	private void processRdvAdvReply(Message msg) {
		// Only bother decoding the reply when a monitor is registered to
		// receive the outcome.
		if (monitor != null) {
			Advertisement discovered;
			try {
				// Rebuild the advertisement from the XML stream stored in the
				// RdvAdvReply element of the message.
				discovered = AdvertisementFactory.newAdvertisement(
				        textXml,
				        msg.getElement(RdvAdvReply).getStream());
			} catch (Exception ignored) {
				// Any failure while reading or parsing the element means the
				// reply is unusable; it is dropped silently.
				discovered = null;
			}

			if (discovered != null) {
				monitor.discovered(discovered);
			}
		}
	}


	/**
	 * Publishes the given advertisement and then sends the peer a reply
	 * message carrying the local peer advertisement, the local peer id and
	 * the lease value.
	 *
	 * @param  peer   id of the peer to reply to; used to build the
	 *                destination endpoint address
	 * @param  adv    advertisement to publish (through the discovery service
	 *                and publishInParentGroup) before replying
	 * @param  lease  lease value, sent in the reply as a decimal string
	 * @return        true when the reply was handed to the messenger; false
	 *                when the address could not be built, the endpoint could
	 *                not ping the peer, no messenger was available, or
	 *                building/sending the message failed
	 */
	private boolean sendReply(String peer, Advertisement adv, long lease) {

		// Refresh the cached local peer advertisement; it is the payload of
		// the reply built further down.
		localPeerAdv = group.getPeerAdvertisement();

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("sendReply");
		}
		// Try to get an EndpointMessenger to that peer.
		// First publish the advertisement
		try {
			// This is not our own peer adv so we must not keep it
			// longer than its expiration time.
			group.getDiscoveryService().publish(adv,
			                                    DiscoveryService.PEER,
			                                    DiscoveryService.DEFAULT_EXPIRATION,
			                                    DiscoveryService.DEFAULT_EXPIRATION);
		} catch (Exception e) {
			// Publishing is best effort: the reply is still attempted.
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   publish failed with " + e);
			}
		}

		// Make sure that EndpointService has also the document
		publishInParentGroup(adv);

		EndpointAddress addr;
		try {
			addr = mkAddress(peer, pName, pParam);
		} catch (Exception caught) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug(" malformed peer id " + peer);
			}
			return false;
		}

		// Check that the EndpointService can talk to that peer
		if (!endpoint.ping(addr)) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   cannot get route to peer " + addr);
			}
			return false;
		}

		EndpointMessenger messenger = null;
		try {
			messenger = endpoint.getMessenger(addr);
			if (messenger == null) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("    no messenger");
				}
				return false;
			}
		} catch (Exception e) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   getting messenger failed with ", e);
			}
			return false;
		}

		Message msg = endpoint.newMessage();

		try {
			// The local peer advertisement travels as an XML element; the
			// peer id and the lease travel as plain string elements.
			InputStream ip = localPeerAdv.getDocument(textXml).getStream();

			msg.addElement(msg.newMessageElement(ConnectedRdvAdvReply,
			                                     textXml,
			                                     ip));

			msg.setString(ConnectedPeerReply, localPeerId);
			msg.setString(ConnectedLeaseReply, Long.toString(lease));

			messenger.sendMessage(msg);
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   message sent");
			}
		} catch (Exception e) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  failed with ", e);
			}
			return false;
		}
		return true;
	}


	/**
	 * Hands a message to the endpoint service for propagation.
	 * This is the internal version. See also the public one that adds
	 * ttl/loop control.
	 *
	 * Note: The original msg is not modified and may be reused upon return.
	 * A failure inside the endpoint is not reported to the caller; it is
	 * only written to the debug log.
	 *
	 * @param  msg          is the message to propagate.
	 * @param  serviceName  is the name of the service
	 * @param  queueName    is passed to the endpoint as the third
	 *                      argument of propagate()
	 */
	private void sendToNetwork(Message msg,
	                           String  serviceName,
	                           String  queueName) {
		try {
			endpoint.propagate(msg, serviceName, queueName);
		} catch (Exception failure) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug(failure.toString());
			}
		}
	}


	/**
	 * Sends to all rdv clients.
	 * This is the internal version. See also the public one that adds
	 * ttl/loop control.
	 *
	 * Does nothing unless this peer is a rendezvous with at least one
	 * client. Clients for which isPeerInPropHeader() reports true are
	 * skipped; every other client receives its own clone of the message.
	 * A failure for one client is logged at debug level and does not stop
	 * delivery to the remaining clients.
	 *
	 * Note: The original msg is not modified and may be reused upon return.
	 *
	 * @param  msg           is the message to propagate.
	 * @param  serviceName   is the name of the service
	 * @param  serviceParam  is the parameter of the service
	 */
	private void sendToEachClient(Message msg,
	                              String serviceName,
	                              String serviceParam) {

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("sendToEachClient");
		}
		if ((!isRendezVous) || (clients == null) || (clients.size() == 0)) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  no clients");
			}
			return;
		}

		// Walk from the tail so that the size is read only once. Nothing is
		// removed here; the list may however shrink underneath us, in which
		// case elementAt() fails and the catch block below takes over.
		for (int i = clients.size() - 1; i >= 0; --i) {
			PeerConnection pConn = null;
			try {
				pConn = (PeerConnection) clients.elementAt(i);

				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("to: " + pConn.getPeer());
				}
				// Check if this client has already processed this propagated
				// message.
				if (isPeerInPropHeader(msg, pConn.getPeer())) {
					// Discard.
					continue;
				}

				Message tmpMsg = (Message) msg.clone();
				pConn.sendMessage(tmpMsg, serviceName, serviceParam);
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("   sent to " + pConn.getPeer());
				}
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					// pConn is still null when the lookup itself failed (or
					// the slot held null); dereferencing it here would throw
					// out of the handler and abort the whole loop.
					String target = (pConn == null)
					                ? ("client #" + i)
					                : String.valueOf(pConn.getPeer());
					LOG.debug("   failed sent to " + target, e);
				}
			}
		}
		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("   done");
		}
	}


	/**
	 * Sends to all rendezvous this peer is connected to.
	 * This is the internal version. See also the public one that adds
	 * ttl/loop control.
	 *
	 * Nothing is sent unless this peer is a client with at least one
	 * rendezvous. A rendezvous for which isPeerInPropHeader() reports true
	 * is skipped; every other one receives its own clone of the message.
	 * A failure for one rendezvous is logged at debug level and the loop
	 * carries on with the next one.
	 *
	 * Note: The original msg is not modified and may be reused upon return.
	 *
	 * @param  msg           is the message to propagate.
	 * @param  serviceName   is the name of the service
	 * @param  serviceParam  is the parameter of the service
	 */
	private void sendToEachRendezVous(Message msg,
	                                  String serviceName,
	                                  String serviceParam) {

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("sendToEachRendezVous");
		}

		boolean hasTargets = isClient
		                     && (rendezVous != null)
		                     && (rendezVous.size() != 0);
		if (!hasTargets) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  no rendezvous");
			}
			return;
		}

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("    Sending to all rendezvous");
		}

		// Visit the entries from last to first; the size is read once, up
		// front.
		int idx = rendezVous.size();
		while (--idx >= 0) {
			try {
				PeerConnection target =
				    (PeerConnection) rendezVous.elementAt(idx);

				// Only forward to a rendezvous that has not already
				// processed this propagated message.
				if (!isPeerInPropHeader(msg, target.getPeer())) {
					Message copy = (Message) msg.clone();
					target.sendMessage(copy, serviceName, serviceParam);
				}
			} catch (Exception failure) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("   failed with " + failure);
				}
			}
		}

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("     done");
		}
	}


	/**
	 * Sends a ping request carrying the local peer id to the given peer.
	 * Nothing is reported to the caller: a missing messenger or any failure
	 * while resolving the messenger or sending is only written to the
	 * debug log.
	 *
	 * @param  peer  id of the peer to ping; used to build the destination
	 *               endpoint address
	 */
	private void ping(String peer) {

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("ping " + peer);
		}

		EndpointMessenger channel;
		try {
			channel = endpoint.getMessenger(mkAddress(peer, pName, pParam));
		} catch (Exception lookupFailure) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   getting messenger failed with " + lookupFailure);
			}
			return;
		}

		if (channel == null) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("    no messenger");
			}
			return;
		}

		Message request = endpoint.newMessage();
		try {
			// The request identifies the sender by its peer id.
			request.setString(PingRequest, localPeerId);
			channel.sendMessage(request);
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("   message sent");
			}
		} catch (Exception sendFailure) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  failed with " + sendFailure);
			}
		}
	}


	/**
	 * Drops stale client connections: every entry of the client list that
	 * is no longer connected, or whose lease lies before the current time,
	 * is removed from the list and closed.
	 *
	 * A failure while handling one entry is logged at debug level and the
	 * scan continues with the next entry.
	 */
	private synchronized void gcClients() {

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("GC CLIENTS starts");
		}
		if ((clients == null) || (clients.size() == 0)) {
			// Nothing to do.
			return;
		}

		for (int i = 0; i < clients.size(); ++i) {

			try {
				PeerConnection pConn = (PeerConnection) clients.elementAt(i);
				long time = System.currentTimeMillis();

				if ((!pConn.isConnected()) || (pConn.getLease() < time)) {
					// This client has dropped out, or its lease is over.
					// Remove it.

					if (LOG.isEnabledFor(Priority.DEBUG)) {
						LOG.debug("GC CLIENT: drop " + pConn.getPeer());
						LOG.debug("        lease= " + pConn.getLease());
						LOG.debug(" current time= " + time);
						LOG.debug("    connected= " + pConn.isConnected());
					}

					clients.removeElementAt(i);
					// The following entries moved down by one. Step back
					// before close() so that a failing close() cannot make
					// the loop skip the entry now sitting at this index.
					--i;
					pConn.close();
				}
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("gcClient failed " + e);
				}
			}
		}
		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("GC CLIENTS done");
		}
	}


	/**
	 * Drops stale rendezvous connections: every entry of the rendezvous
	 * list that is no longer connected, or whose lease lies before the
	 * current time, is removed from the list, closed and recorded in
	 * removedRendezVous.
	 *
	 * A failure while handling one entry is logged at debug level and the
	 * scan continues with the next entry.
	 */
	private synchronized void gcRdvs() {

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("GC Rendezvous starts");
		}
		if (rendezVous == null) {
			return;
		}

		for (int i = 0; i < rendezVous.size(); ++i) {

			try {
				PeerConnection pConn = (PeerConnection) rendezVous.elementAt(i);
				long time = System.currentTimeMillis();

				if ((!pConn.isConnected()) || (pConn.getLease() < time)) {
					// This rendezvous has dropped out, or its lease is over.
					// Remove it.

					if (LOG.isEnabledFor(Priority.DEBUG)) {
						LOG.debug("GC RDV: drop " + pConn.getPeer());
						LOG.debug("        lease= " + pConn.getLease());
						LOG.debug(" current time= " + time);
						LOG.debug("    connected= " + pConn.isConnected());
					}

					rendezVous.removeElementAt(i);
					// The following entries moved down by one. Step back
					// before close() so that a failing close() cannot make
					// the loop skip the entry now sitting at this index.
					--i;
					try {
						pConn.close();
					} finally {
						// Record the removal even when close() fails; the
						// connection has already left the active list.
						removedRendezVous.add(pConn);
					}
				}
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("gcRdvs failed " + e);
				}
			}
		}
		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("GC RENDEZVOUS done");
		}
	}

	/**
	 * Runs both cleanup passes in order: the client list first
	 * (gcClients), then the rendezvous list (gcRdvs).
	 */
	private void gc() {
		gcClients();
		gcRdvs();
	}


	/**
	 * Looks up the client connection whose peer equals the given one and
	 * calls connect() on it. Only the first matching entry is touched.
	 * Does nothing when there is no client list or no entry matches.
	 *
	 * @param  peer  peer to look for among the client connections
	 */
	private synchronized void clientIsConnected(String peer) {

		if (clients == null) {
			return;
		}

		int idx = 0;
		while (idx < clients.size()) {
			try {
				PeerConnection candidate =
				    (PeerConnection) clients.elementAt(idx);
				if (candidate.getPeer().equals(peer)) {
					candidate.connect();
					return;
				}
			} catch (Exception ignored) {
				// Best effort: a failing entry is skipped and the search
				// moves on to the next one.
			}
			++idx;
		}
	}


	/**
	 * Handles a ping reply: reads the PingReply string from the message
	 * and passes it to clientIsConnected().
	 *
	 * @param  msg  message carrying the PingReply string
	 */
	private void processPingReply(Message msg) {
		clientIsConnected(msg.getString(PingReply));
	}


	/**
	 *Description of the Method
	 *
	 * @param  msg  Description of Parameter
	 */
	private void processPingRequest(Message msg) {

		EndpointMesseng

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?