⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagingservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
	
	
	/**
	 * Access the command filter this service needs to perform its
	 * tasks. This filter will be installed within the local command
	 * processing engine.
	 * @param direction One of the two constants
	 * <code>Filter.INCOMING</code> and <code>Filter.OUTGOING</code>,
	 * distinguishing between the two filter chains managed by the
	 * command processor.
	 * @return A <code>Filter</code> object, used by this service to
	 * intercept and process kernel-level commands.
	 */
	public Filter getCommandFilter(boolean direction){
		if (direction == Filter.OUTGOING){
			return encOutFilter;
		} else {
			return encInFilter;
		}
	}
	
	/**
	 * Access the command sink this service uses to handle its own
	 * vertical commands.
	 */
	public Sink getCommandSink(boolean side) {
		if(side == Sink.COMMAND_SOURCE) {
			return senderSink;
		}
		else {
			return receiverSink;
		}
	}
	
	/**
	 * Access the names of the vertical commands this service wants to
	 * handle as their final destination. This set must not overlap
	 * with the owned commands set of any previously installed
	 * service, or an exception will be raised and service
	 * activation will fail.
	 *
	 * @see jade.core.Service#getCommandSink()
	 */
	public String[] getOwnedCommands() {
		return OWNED_COMMANDS;
	}
	
	void notifyLocalMTPs() {
		Iterator it = routes.getLocalMTPs();
		while (it.hasNext()) {
			RoutingTable.MTPInfo info = (RoutingTable.MTPInfo) it.next();
			MTPDescriptor mtp = info.getDescriptor();
			ContainerID cid = myContainer.getID();
			
			try {
				MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
				try {
					mainSlice.newMTP(mtp, cid);
				}
				catch(IMTPException imtpe) {
					mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
					mainSlice.newMTP(mtp, cid);
				}
			}
			catch (Exception e) {
				myLogger.log(Logger.WARNING, "Error notifying local MTP "+mtp.getName()+" to Main Container.", e);
			}
		}
	}
	
	/**
	 * Inner class CommandSourceSink
	 * This inner class handles the messaging commands on the command
	 * issuer side, turning them into horizontal commands and
	 * forwarding them to remote slices when necessary.
	 */
	private class CommandSourceSink implements Sink {
		
		public void consume(VerticalCommand cmd) {
			
			try {
				String name = cmd.getName();
				
				if(name.equals(MessagingSlice.SEND_MESSAGE)) {
					handleSendMessage(cmd);
				}
				else if(name.equals(MessagingSlice.NOTIFY_FAILURE)) {
					handleNotifyFailure(cmd);
				}
				else if(name.equals(MessagingSlice.INSTALL_MTP)) {
					Object result = handleInstallMTP(cmd);
					cmd.setReturnValue(result);
				}
				else if(name.equals(MessagingSlice.UNINSTALL_MTP)) {
					handleUninstallMTP(cmd);
				}
				else if(name.equals(MessagingSlice.NEW_MTP)) {
					handleNewMTP(cmd);
				}
				else if(name.equals(MessagingSlice.DEAD_MTP)) {
					handleDeadMTP(cmd);
				}
				else if(name.equals(MessagingSlice.SET_PLATFORM_ADDRESSES)) {
					handleSetPlatformAddresses(cmd);
				}
			}
			catch(IMTPException imtpe) {
				imtpe.printStackTrace();
			}
			catch(NotFoundException nfe) {
				nfe.printStackTrace();
			}
			catch(ServiceException se) {
				se.printStackTrace();
			}
			catch(MTPException mtpe) {
				mtpe.printStackTrace();
			}
			catch(Throwable t) {
				cmd.setReturnValue(t);
			}
		}
		
		// Vertical command handler methods
		
		private void handleSendMessage(VerticalCommand cmd) {
			Object[] params = cmd.getParams();
			AID sender = (AID)params[0];
			GenericMessage msg = (GenericMessage)params[1];
			AID dest = (AID)params[2];
			// Since message delivery is asynchronous we use the GenericMessage
			// as a temporary holder for the sender principal and credentials
			msg.setSenderPrincipal(cmd.getPrincipal());
			msg.setSenderCredentials(cmd.getCredentials());
			checkTracing(msg);
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, "MessagingService source sink handling message "+MessageManager.stringify(msg)+" for receiver "+dest.getName()+". TraceID = "+msg.getTraceID());
			}
			myMessageManager.deliver(msg, dest, MessagingService.this);
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, msg.getTraceID()+" - Message enqueued to MessageManager.");
			}
		}
		
		private void handleNotifyFailure(VerticalCommand cmd) {
			Object[] params = cmd.getParams();
			GenericMessage msg = (GenericMessage)params[0];
			AID receiver = (AID)params[1];
			InternalError ie = (InternalError)params[2];
			
			// If (the sender is not the AMS)
			// The acl message contained inside the GenericMessage should never
			// be null (it is used to generate the failure message)
			ACLMessage aclmsg = msg.getACLMessage();
			if((aclmsg.getSender()==null) || (aclmsg.getSender().equals(myContainer.getAMS()))) // sanity check to avoid infinite loops
				return;
			
			// Send back a failure message
			final ACLMessage failure = aclmsg.createReply();
			failure.setPerformative(ACLMessage.FAILURE);
			final AID theAMS = myContainer.getAMS();
			failure.setSender(theAMS);
			failure.setLanguage(FIPANames.ContentLanguage.FIPA_SL);
			
			// FIXME: the content is not completely correct, but that should
			// also avoid creating wrong content
			String content = "( (action " + msg.getSender().toString();
			content = content + " (ACLMessage) ) (MTS-error "+receiver+" "+ie.getMessage() + ") )";
			failure.setContent(content);
			
			try {
				GenericCommand command = new GenericCommand(MessagingSlice.SEND_MESSAGE, MessagingSlice.NAME, null);
				command.addParam(theAMS);
				GenericMessage gm = new GenericMessage(failure);
				gm.setAMSFailure(true);
				command.addParam(gm);
				command.addParam((AID)(failure.getAllReceiver().next()));
				// FIXME: We should set the AMS principal and credentials
				
				submit(command);
			}
			catch(ServiceException se) {
				// It should never happen
				se.printStackTrace();
			}
		}
		
		private MTPDescriptor handleInstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
			Object[] params = cmd.getParams();
			String address = (String)params[0];
			ContainerID cid = (ContainerID)params[1];
			String className = (String)params[2];
			
			MessagingSlice targetSlice = (MessagingSlice)getSlice(cid.getName());
			try {
				return targetSlice.installMTP(address, className);
			}
			catch(IMTPException imtpe) {
				targetSlice = (MessagingSlice)getFreshSlice(cid.getName());
				return targetSlice.installMTP(address, className);
			}
		}
		
		private void handleUninstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
			Object[] params = cmd.getParams();
			String address = (String)params[0];
			ContainerID cid = (ContainerID)params[1];
			
			MessagingSlice targetSlice = (MessagingSlice)getSlice(cid.getName());
			try {
				targetSlice.uninstallMTP(address);
			}
			catch(IMTPException imtpe) {
				targetSlice = (MessagingSlice)getFreshSlice(cid.getName());
				targetSlice.uninstallMTP(address);
			}
		}
		
		private void handleNewMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor)params[0];
			ContainerID cid = (ContainerID)params[1];
			
			MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
			try {
				mainSlice.newMTP(mtp, cid);
			}
			catch(IMTPException imtpe) {
				mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
				mainSlice.newMTP(mtp, cid);
			}
		}
		
		private void handleDeadMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor)params[0];
			ContainerID cid = (ContainerID)params[1];
			
			MessagingSlice mainSlice = (MessagingSlice)getSlice(MAIN_SLICE);
			try {
				mainSlice.deadMTP(mtp, cid);
			}
			catch(IMTPException imtpe) {
				mainSlice = (MessagingSlice)getFreshSlice(MAIN_SLICE);
				mainSlice.deadMTP(mtp, cid);
			}
			
		}
		
		private void handleSetPlatformAddresses(VerticalCommand cmd) {
			Object[] params = cmd.getParams();
			AID id = (AID)params[0];
			id.clearAllAddresses();
			addPlatformAddresses(id);
		}
		
	} // END of inner class CommandSourceSink
	
	
	/**
	 * Inner class CommandTargetSink
	 */
	private class CommandTargetSink implements Sink {
		
		public void consume(VerticalCommand cmd) {
			
			try {
				String name = cmd.getName();
				if(name.equals(MessagingSlice.SEND_MESSAGE)) {
					handleSendMessage(cmd);
				}
				else if(name.equals(MessagingSlice.INSTALL_MTP)) {
					Object result = handleInstallMTP(cmd);
					cmd.setReturnValue(result);
				}
				else if(name.equals(MessagingSlice.UNINSTALL_MTP)) {
					handleUninstallMTP(cmd);
				}
				else if(name.equals(MessagingSlice.NEW_MTP)) {
					handleNewMTP(cmd);
				}
				else if(name.equals(MessagingSlice.DEAD_MTP)) {
					handleDeadMTP(cmd);
				}
				else if(name.equals(MessagingSlice.SET_PLATFORM_ADDRESSES)) {
					handleSetPlatformAddresses(cmd);
				}
				else if(name.equals(Service.NEW_SLICE)) {
					handleNewSlice(cmd);
				}
			}
			catch(IMTPException imtpe) {
				cmd.setReturnValue(imtpe);
			}
			catch(NotFoundException nfe) {
				cmd.setReturnValue(nfe);
			}
			catch(ServiceException se) {
				cmd.setReturnValue(se);
			}
			catch(MTPException mtpe) {
				cmd.setReturnValue(mtpe);
			}
		}
		
		private void handleSendMessage(VerticalCommand cmd) throws NotFoundException {
			Object[] params = cmd.getParams();
			AID senderID = (AID)params[0];
			GenericMessage msg = (GenericMessage)params[1];
			AID receiverID = (AID)params[2];
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, msg.getTraceID()+" - MessagingService target sink posting message to receiver "+receiverID.getLocalName());
				
			}
			postMessage(msg.getACLMessage(), receiverID);
			if (msg.getTraceID() != null) {
				myLogger.log(Logger.INFO, msg.getTraceID()+" - Message posted");
				
			}
		}
		
		private MTPDescriptor handleInstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, MTPException {
			Object[] params = cmd.getParams();
			String address = (String)params[0];
			String className = (String)params[1];
			
			return installMTP(address, className);
		}
		
		private void handleUninstallMTP(VerticalCommand cmd) throws IMTPException, ServiceException, NotFoundException, MTPException {
			Object[] params = cmd.getParams();
			String address = (String)params[0];
			
			uninstallMTP(address);
		}
		
		private void handleNewMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor)params[0];
			ContainerID cid = (ContainerID)params[1];
			
			newMTP(mtp, cid);
		}
		
		private void handleDeadMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor)params[0];
			ContainerID cid = (ContainerID)params[1];
			
			deadMTP(mtp, cid);
		}
		
		private void handleSetPlatformAddresses(VerticalCommand cmd) {
			
		}
		
		private void handleNewSlice(VerticalCommand cmd) {
			MainContainer impl = myContainer.getMain();
			if(impl != null) {
				Object[] params = cmd.getParams();
				String newSliceName = (String) params[0];
				try {
					// Be sure to get the new (fresh) slice --> Bypass the service cache
					MessagingSlice newSlice = (MessagingSlice) getFreshSlice(newSliceName);
					
					// Send all possible routes to the new slice
					ContainerID[] cids = impl.containerIDs();
					for(int i = 0; i < cids.length; i++) {
						ContainerID cid = cids[i];
						
						try {
							List mtps = impl.containerMTPs(cid);
							Iterator it = mtps.iterator();
							while(it.hasNext()) {
								MTPDescriptor mtp = (MTPDescriptor)it.next();
								newSlice.addRoute(mtp, cid.getName());
							}
						}
						catch(NotFoundException nfe) {
							// Should never happen
							nfe.printStackTrace();
						}
					}
				}
				catch (ServiceException se) {
					// Should never happen since getSlice() should always work on the Main container
					se.printStackTrace();
				}
				catch (IMTPException imtpe) {
					// Should never happen since the new slice should be always valid at this time
					imtpe.printStackTrace();
				}
			}
		}
		

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -