⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagingservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
			if (!msg.hasForeignReceiver()) {
				deliverInLocalPlatfrom(msg, receiverID);
			} 
			else {
				// Dispatch it through the ACC
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID() + " - Activating ACC delivery");
				}
				Iterator addresses = receiverID.getAllAddresses();
				while (addresses.hasNext()) {
					String address = (String) addresses.next();
					try {
						forwardMessage(msg, receiverID, address);
						return;
					} 
					catch (MTPException mtpe) {
						if (myLogger.isLoggable(Logger.WARNING) && !isPersistentDeliveryRetry(msg))
							myLogger.log(Logger.WARNING, "Cannot deliver message to address: " + address + " [" + mtpe.toString() + "]. Trying the next one...");
					}
				}
				notifyFailureToSender(msg, receiverID, new InternalError("No valid address contained within the AID " + receiverID.getName()));
			}
		} 
		catch (NotFoundException nfe) {
			// The receiver does not exist --> Send a FAILURE message
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.WARNING, msg.getTraceID()+" - Receiver does not exist.", nfe);
			}
			notifyFailureToSender(msg, receiverID, new InternalError("Agent not found: " + nfe.getMessage()));
		} 
		catch (IMTPException imtpe) {
			// Can't reach the destination container --> Send a FAILURE message
			myLogger.log(Logger.WARNING, msg.getTraceID()+" - Receiver unreachable.", imtpe);
			notifyFailureToSender(msg, receiverID, new InternalError("Agent unreachable: " + imtpe.getMessage()));
		} 
		catch (ServiceException se) {
			// Service error during delivery --> Send a FAILURE message
			myLogger.log(Logger.WARNING, msg.getTraceID()+" - Service error delivering message.", se);
			notifyFailureToSender(msg, receiverID, new InternalError("Service error: " + se.getMessage()));
		} 
		catch (JADESecurityException jse) {
			// Delivery not authorized--> Send a FAILURE message
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.WARNING, msg.getTraceID()+" - Not authorized.", jse);
			}
			notifyFailureToSender(msg, receiverID, new InternalError("Not authorized: " + jse.getMessage()));
		}
	}
	
	/**
	 * Tells whether the given message carries the user-defined parameter
	 * <code>PersistentDeliveryService.ACL_USERDEF_DUE_DATE</code>.
	 * When the J2ME-excluded section is stripped by the preprocessor, or when
	 * the message has no ACLMessage, the answer is always <code>false</code>.
	 * 
	 * @param msg the message to inspect
	 * @return <code>true</code> if the ACLMessage of <code>msg</code> has the due-date parameter set
	 */
	private boolean isPersistentDeliveryRetry(GenericMessage msg) {
		boolean dueDatePresent = false;
		//#J2ME_EXCLUDE_BEGIN
		ACLMessage aclMessage = msg.getACLMessage();
		if (aclMessage != null) {
			dueDatePresent = aclMessage.getAllUserDefinedParameters().containsKey(PersistentDeliveryService.ACL_USERDEF_DUE_DATE);
		}
		//#J2ME_EXCLUDE_END
		return dueDatePresent;
	}
	
	/**
	 * Delivers a message to a receiver that lives in the local platform.
	 * <ul>
	 * <li>If this container hosts the main container, the receiver location is
	 * looked up directly on it and delivery is attempted until
	 * <code>oneShotDeliver()</code> returns a slice.</li>
	 * <li>Otherwise the slice cached for the receiver (if any) is tried first;
	 * if there is none, or it fails with an IMTPException or NotFoundException,
	 * the stale entry is dropped and <code>deliverUntilOK()</code> takes over.</li>
	 * </ul>
	 * 
	 * @param msg the message to deliver
	 * @param receiverID the AID of the intended receiver
	 */
	void deliverInLocalPlatfrom(GenericMessage msg, AID receiverID) throws IMTPException, ServiceException, NotFoundException, JADESecurityException {
		if (msg.getTraceID() != null) {
			myLogger.log(Logger.INFO, msg.getTraceID() + " - Activating local-platform delivery");
		}
		
		MainContainer main = myContainer.getMain();
		if (main != null) {
			// We are on the main container: query the GADT directly and keep
			// trying until a delivery attempt succeeds
			MessagingSlice deliveredTo;
			do {
				ContainerID location = main.getContainerID(receiverID);
				deliveredTo = oneShotDeliver(location, msg, receiverID);
			} while (deliveredTo == null);
			return;
		}
		
		// Peripheral container: give the cached <AgentID;MessagingSlice> pair a chance first
		MessagingSlice cached = (MessagingSlice) cachedSlices.get(receiverID);
		if (cached != null) {
			try {
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID() + " - Delivering message to cached slice "+cached.getNode().getName());
				}
				cached.dispatchLocally(msg.getSender(), msg, receiverID);
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID() + " - Delivery OK.");
				}
				return;
			} 
			catch (IMTPException imtpe) {
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID() + " - Cached slice for receiver " + receiverID.getName() + " unreachable.");
				}
			} 
			catch (NotFoundException nfe) {
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID() + " - Receiver " + receiverID.getName() + " not found on cached slice container.");
				}
			}
			// The cached entry did not work: forget it
			cachedSlices.remove(receiverID);
		}
		
		// Cache miss, or cache entry no longer valid: go through the main slice
		deliverUntilOK(msg, receiverID);
	}
	
	
	/**
	 * Repeatedly asks the main slice where the receiver lives and attempts a
	 * delivery there, until <code>oneShotDeliver()</code> returns the slice the
	 * message was dispatched to. That slice is then stored in the slice cache
	 * under the receiver AID.
	 * If the location query fails with an IMTPException, it is repeated once
	 * on a fresh main slice; any exception from that second query propagates.
	 * 
	 * @param msg the message to deliver
	 * @param receiverID the AID of the intended receiver
	 */
	private void deliverUntilOK(GenericMessage msg, AID receiverID) throws IMTPException, NotFoundException, ServiceException, JADESecurityException {
		MessagingSlice deliveredTo = null;
		while (deliveredTo == null) {
			MessagingSlice main = (MessagingSlice) getSlice(MAIN_SLICE);
			ContainerID location;
			try {
				location = main.getAgentLocation(receiverID);
			} 
			catch (IMTPException imtpe) {
				// The main slice we hold may be stale: refresh it and ask again
				main = (MessagingSlice) getFreshSlice(MAIN_SLICE);
				location = main.getAgentLocation(receiverID);
			}
			
			deliveredTo = oneShotDeliver(location, msg, receiverID);
		}
		
		// Successful dispatch: remember the slice for the next messages to this receiver
		cachedSlices.put(receiverID, deliveredTo);
	}
	
	/**
	 * Performs a single delivery attempt of a message to the messaging slice
	 * of the given container.
	 * If the first dispatch fails with an IMTPException, a fresh slice for the
	 * same container is retrieved and the dispatch is repeated once.
	 * If the attempt ends with a NotFoundException or a NullPointerException,
	 * the method sleeps 200 ms and returns <code>null</code> so that the
	 * caller can look up the receiver location again and retry.
	 * 
	 * @param cid the container where the receiver is supposed to live
	 * @param msg the message to deliver
	 * @param receiverID the AID of the intended receiver
	 * @return the slice the message was dispatched to, or <code>null</code>
	 * if the attempt failed in a way that the caller should retry
	 */
	private MessagingSlice oneShotDeliver(ContainerID cid, GenericMessage msg, AID receiverID) throws IMTPException, ServiceException, JADESecurityException {
		if (msg.getTraceID() != null) {
			myLogger.log(Logger.FINER, msg.getTraceID()+" - Receiver "+receiverID.getLocalName()+" lives on container "+cid.getName());
		}
		
		MessagingSlice targetSlice = (MessagingSlice) getSlice(cid.getName());
		// The outer try handles the "retry" outcomes (NotFoundException, NullPointerException);
		// the inner try handles the single "refresh the slice and repeat" step on IMTPException.
		try {
			try {
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.INFO, msg.getTraceID()+" - Delivering message to slice "+targetSlice.getNode().getName());
				}
				targetSlice.dispatchLocally(msg.getSender(), msg, receiverID);
			} 
			catch (IMTPException imtpe) {
				// Try to get a newer slice and repeat...
				if (msg.getTraceID() != null) {
					myLogger.log(Logger.FINER, msg.getTraceID()+" - Messaging slice on container "+cid.getName()+" unreachable. Try to get a fresh one.");
				}
				
				targetSlice = (MessagingSlice) getFreshSlice(cid.getName());
				if (msg.getTraceID() != null && (targetSlice != null)) {
					myLogger.log(Logger.FINER, msg.getTraceID()+" - Fresh slice for container "+cid.getName()+" found.");
				}
				
				// An IMTPException thrown here is not caught: it propagates to the caller
				targetSlice.dispatchLocally(msg.getSender(), msg, receiverID);
			}
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, msg.getTraceID()+" - Delivery OK");
			}
			return targetSlice;
		} 
		catch (NotFoundException nfe) {
			// The agent was found in the GADT, but not on the container where it is supposed to 
			// be. Possibly it moved elsewhere in the meanwhile. ==> Try again.
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.FINER, msg.getTraceID()+" - Receiver "+receiverID.getLocalName()+" not found on container "+cid.getName()+". Possibly he moved elsewhere --> Retry");
			}
		} 
		catch (NullPointerException npe) {
			// This is thrown if targetSlice is null: The agent was found in the GADT, 
			// but his container does not exist anymore. Possibly the agent moved elsewhere in 
			// the meanwhile ==> Try again.
			// NOTE(review): this catch also covers any NullPointerException raised inside
			// dispatchLocally() itself, which would then be treated as a retry as well.
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.FINER, msg.getTraceID()+" - Container "+cid.getName()+" for receiver "+receiverID.getLocalName()+" does not exist anymore. Possibly the receiver moved elsewhere --> Retry");
			}
		}
		
		// Wait a bit before enabling next delivery attempt
		// NOTE(review): an InterruptedException is silently discarded here and the
		// interrupt status of the thread is not restored.
		try {Thread.sleep(200);} catch (InterruptedException ie) {}
		return null;
	}
	
	
	/**
	 * Routes a message out of the platform towards the given transport address.
	 * The sender AID is read from the message envelope; if it carries no
	 * address, the addresses of this platform are added to it. When the
	 * message has a trace ID, this is also attached to the envelope as an
	 * <code>ACLMessage.TRACE</code> property before routing.
	 * If the envelope has no sender, an error is logged and the method returns
	 * without routing the message.
	 * 
	 * @param msg the message to route out; it must carry an envelope
	 * @param receiver the AID of the intended receiver
	 * @param address the transport address to route the message to
	 * @throws MTPException if the message has no envelope, or if an
	 * IMTPException occurs while routing the message out
	 */
	private void forwardMessage(GenericMessage msg, AID receiver, String address) throws MTPException {
		Envelope env = msg.getEnvelope();
		if (env == null) {
			// Without an envelope there is no way to route the message: report it as a
			// transport failure instead of letting a NullPointerException escape
			throw new MTPException("Cannot route message to address " + address + ": missing envelope");
		}
		
		AID aid = env.getFrom();
		if (aid == null) {
			if (myLogger.isLoggable(Logger.SEVERE))
				myLogger.log(Logger.SEVERE,"ERROR: null message sender. Aborting message dispatch...");
			return;
		}
		
		// FIXME The message can no longer be updated
		// if has no address set, then adds the addresses of this platform
		if(!aid.getAllAddresses().hasNext())
			addPlatformAddresses(aid);
		
		try {
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, msg.getTraceID() + " - Routing message out to address "+address);
				env.addProperties(new Property(ACLMessage.TRACE, msg.getTraceID()));
			}
			localSlice.routeOut(env, msg.getPayload(), receiver, address);
		}
		catch(IMTPException imtpe) {
			throw new MTPException("Error during message routing", imtpe);
		}
		
	}
	
	
	/**
	 * This method is used internally by the platform in order
	 * to notify the sender of a message that a failure was reported by
	 * the Message Transport Service.
	 * If the message carries no ACLMessage, an attempt is made to decode one
	 * from envelope and payload. If the (possibly decoded) ACLMessage has the
	 * user-defined parameter <code>ACLMessage.IGNORE_FAILURE</code> set to
	 * "true", nothing is notified. Otherwise a
	 * <code>MessagingSlice.NOTIFY_FAILURE</code> command is submitted.
	 * 
	 * @param msg the message whose delivery failed
	 * @param receiver the AID of the receiver the message could not be delivered to
	 * @param ie the error describing the failure
	 */
	public void notifyFailureToSender(GenericMessage msg, AID receiver, InternalError ie) {
		ACLMessage acl = msg.getACLMessage();
		if (acl == null) {
			// ACLMessage can be null in case we get a failure delivering a message coming from an external platform (received by a local MTP).
			// In this case in fact the message is encoded. Try to decode it so that a suitable FAILURE response can be sent back.
			// If the payload is mangled in some way (e.g. encrypted) decoding will fail and no suitable FAILURE response will be sent
			try {
				acl = ((IncomingEncodingFilter) encInFilter).decodeMessage(msg.getEnvelope(), msg.getPayload());
				acl.setEnvelope(msg.getEnvelope());
				msg.setACLMessage(acl);
			}
			catch (Exception e) {
				// Best effort only: report the problem through the service logger and go on
				myLogger.log(Logger.WARNING, "Cannot decode message to prepare a FAILURE notification", e);
			}
		}
		if (acl != null && "true".equals(acl.getUserDefinedParameter(ACLMessage.IGNORE_FAILURE))) {
			// Ignore the failure 
			return;
		}
		
		GenericCommand cmd = new GenericCommand(MessagingSlice.NOTIFY_FAILURE, MessagingSlice.NAME, null);
		cmd.addParam(msg);
		cmd.addParam(receiver);
		cmd.addParam(ie);
		
		try {
			submit(cmd);
		}
		catch(ServiceException se) {
			// It should never happen
			myLogger.log(Logger.WARNING, "Error submitting the NOTIFY_FAILURE command", se);
		}
	}
	
	
	/*
	 * Adds to the given AID every address returned by routes.getAddresses().
	 * Within this section it is invoked by forwardMessage() when the sender 
	 * AID of an outgoing message carries no address at all.
	 **/
	private void addPlatformAddresses(AID id) {
		for (Iterator platformAddresses = routes.getAddresses(); platformAddresses.hasNext(); ) {
			id.addAddresses((String) platformAddresses.next());
		}
	}
	
	/**
	 * Tells whether the agent identified by the given AID is to be considered
	 * as living in this platform.
	 * Package scoped since it is accessed by the OutgoingEncoding filter.
	 * <ul>
	 * <li>When foreign agents are not accepted, only the HAP part of the name
	 * is compared (case insensitively) with the platform ID.</li>
	 * <li>Otherwise an AID with no addresses, or whose addresses are all
	 * platform addresses, is local. In any other case the agent location is
	 * looked up (on the main container, or through the main slice) and the
	 * agent is local only if the lookup succeeds.</li>
	 * </ul>
	 * 
	 * @param id the AID to check
	 * @return <code>true</code> if the agent is considered local to this platform
	 */
	final boolean livesHere(AID id) {
		if (!acceptForeignAgents) {
			// All agents in the platform must have a name with the form
			// <local-name>@<platform-name>
			return CaseInsensitiveString.equalsIgnoreCase(id.getHap(), platformID);
		}
		
		// Look for the first address that does not belong to this platform
		String[] addrs = id.getAddressesArray();
		boolean foreignAddressFound = false;
		for (int k = 0; k < addrs.length && !foreignAddressFound; ++k) {
			foreignAddressFound = !isPlatformAddress(addrs[k]);
		}
		if (!foreignAddressFound) {
			// No addresses at all, or local addresses only
			return true;
		}
		
		// At least one foreign address: the GADT has the final word
		try {
			MainContainer main = myContainer.getMain();
			if (main == null) {
				// Not on the main container: go through the main slice
				MessagingSlice mainSlice = (MessagingSlice) getSlice(MAIN_SLICE);
				try {
					mainSlice.getAgentLocation(id);
				}
				catch (IMTPException imtpe) {
					// Try to get a newer slice and repeat...
					mainSlice = (MessagingSlice) getFreshSlice(MAIN_SLICE);
					mainSlice.getAgentLocation(id);
				}
			}
			else {
				// Directly use the GADT on the main container
				main.getContainerID(id);
			}
			return true;
		}
		catch (NotFoundException nfe) {
			// The agent does not live in the platform
			return false;
		}
		catch (Exception e) {
			// Intra-platform delivery would fail, so try inter-platform
			return false;
		}
	}
	
	/**
	 * Checks whether the given address matches, ignoring case, one of the
	 * addresses returned by <code>routes.getAddresses()</code>.
	 * 
	 * @param addr the address to look for
	 * @return <code>true</code> if a matching address is found
	 */
	private final boolean isPlatformAddress(String addr) {
		boolean matched = false;
		Iterator known = routes.getAddresses();
		// Stop scanning as soon as a match is found
		while (!matched && known.hasNext()) {
			String candidate = (String) known.next();
			matched = CaseInsensitiveString.equalsIgnoreCase(candidate, addr);
		}
		return matched;
	}
	
	/**
	 * Work-around for PJAVA compilation: simply forwards to the superclass
	 * implementation of <code>getFreshSlice()</code>.
	 * 
	 * @param name the name of the slice to retrieve
	 * @return whatever <code>super.getFreshSlice(name)</code> returns
	 * @throws ServiceException if thrown by the superclass implementation
	 */
	protected Service.Slice getFreshSlice(String name) throws ServiceException {
		return super.getFreshSlice(name);
	}
	
	
	// Only for debugging: progressive number appended to generated trace IDs
	private volatile int traceCnt = 0;
	
	/**
	 * Assigns a trace ID to the given message when tracing applies, i.e. when
	 * the logger is enabled at FINE level or the ACLMessage has the
	 * user-defined parameter <code>ACLMessage.TRACE</code> set to "true".
	 * The trace ID has the form
	 * <code>&lt;performative&gt;-&lt;sender-local-name&gt;-&lt;counter&gt;</code>.
	 * Messages without an ACLMessage are left untouched.
	 * 
	 * @param msg the message to be possibly marked for tracing
	 */
	private void checkTracing(GenericMessage msg) {
		ACLMessage acl = msg.getACLMessage();
		if (acl == null) {
			return;
		}
		
		boolean tracingRequired = myLogger.isLoggable(Logger.FINE) || "true".equals(acl.getAllUserDefinedParameters().get(ACLMessage.TRACE));
		if (tracingRequired) {
			String performative = ACLMessage.getPerformative(acl.getPerformative());
			String senderName = msg.getSender().getLocalName();
			msg.setTraceID(performative+"-"+senderName+"-"+traceCnt);
			// NOTE(review): increment of a volatile int is not atomic; acceptable for debugging only
			traceCnt++;
		}
	}
	
	/**
	 * Retrieves the trace ID stored in an envelope, i.e. the value of the
	 * first envelope property whose name equals <code>ACLMessage.TRACE</code>.
	 * 
	 * @param env the envelope to inspect
	 * @return the value of that property, or <code>null</code> if the
	 * envelope has no such property
	 */
	private String getTraceId(Envelope env) {
		for (Iterator props = env.getAllProperties(); props.hasNext(); ) {
			Property current = (Property) props.next();
			boolean isTraceProperty = current.getName().equals(ACLMessage.TRACE);
			if (isTraceProperty) {
				return (String) current.getValue();
			}
		}
		// No trace property in this envelope
		return null;
	}
	
	/**
	 * For debugging purpose.
	 * 
	 * @return the value returned by <code>myMessageManager.getQueueStatus()</code>
	 */
	public String[] getMessageManagerQueueStatus() {
		return myMessageManager.getQueueStatus();
	}
	
	/**
	 * For debugging purpose.
	 * 
	 * @return the value returned by <code>myMessageManager.getThreadPoolStatus()</code>
	 */
	public String[] getMessageManagerThreadPoolStatus() {
		return myMessageManager.getThreadPoolStatus();
	}
	
	/**
	 * For debugging purpose.
	 * 
	 * @return the value returned by <code>myMessageManager.getGlobalInfo()</code>
	 */
	public String getMessageManagerGlobalInfo() {
		return myMessageManager.getGlobalInfo();
	}
	
	/**
	 * For debugging purpose.
	 * 
	 * @return the value returned by <code>myMessageManager.getThreadPool()</code>
	 */
	public Thread[] getMessageManagerThreadPool() {
		return myMessageManager.getThreadPool();
	}
	
	/**
	 * Empties the whole <code>cachedSlices</code> cache (when it exists),
	 * regardless of the slice name, and then delegates to the superclass
	 * implementation.
	 * 
	 * @param name the name of the slice to clear, passed on to the superclass
	 */
	protected void clearCachedSlice(String name) {
		boolean cacheAvailable = (cachedSlices != null);
		if (cacheAvailable) {
			// Every cached entry is dropped, not only those related to the given slice
			cachedSlices.clear();
			myLogger.log(Logger.INFO, "Clearing cache");
		}
		
		super.clearCachedSlice(name);
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -