⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagingservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
		private void postMessage(ACLMessage msg, AID receiverID) throws NotFoundException {
			boolean found = myContainer.postMessageToLocalAgent(msg, receiverID);
			if(!found) {
				throw new NotFoundException("Messaging service slice failed to find " + receiverID);
			}
		}
		
		private MTPDescriptor installMTP(String address, String className) throws IMTPException, ServiceException, MTPException {
			
			try {
				// Create the MTP
				Class c = Class.forName(className);
				MTP proto = (MTP)c.newInstance();
				
				InChannel.Dispatcher dispatcher = new InChannel.Dispatcher() {
					public void dispatchMessage(Envelope env, byte[] payload) {
						//log("Message from remote platform received", 2);

						if (myLogger.isLoggable(Logger.FINE))
							myLogger.log(Logger.FINE,"Message from remote platform received");
						
						// To avoid message loops, make sure that the ID of this ACC does
						// not appear in a previous 'received' stamp
						
						ReceivedObject[] stamps = env.getStamps();
						for(int i = 0; i < stamps.length; i++) {
							String id = stamps[i].getBy();
							if(CaseInsensitiveString.equalsIgnoreCase(id, accID)) {
								System.err.println("ERROR: Message loop detected !!!");
								System.err.println("Route is: ");
								for(int j = 0; j < stamps.length; j++)
									System.err.println("[" + j + "]" + stamps[j].getBy());
								System.err.println("Message dispatch aborted.");
								return;
							}
						}
						
						// Put a 'received-object' stamp in the envelope
						ReceivedObject ro = new ReceivedObject();
						ro.setBy(accID);
						ro.setDate(new Date());
						env.setReceived(ro);
						
						Iterator it = env.getAllIntendedReceiver();
						// FIXME: There is a problem if no 'intended-receiver' is present,
						// but this should not happen
						while (it.hasNext()) {
							AID rcv = (AID)it.next();
							GenericMessage msg = new GenericMessage(env,payload);
							String traceId = getTraceId(env);
							if (traceId != null) {
								myLogger.log(Logger.INFO, "MTP In-Channel handling message from the outside for receiver "+rcv.getName()+". TraceID = "+traceId);
								msg.setTraceID(traceId);
							}
							myMessageManager.deliver(msg, rcv, MessagingService.this);
						}
					}
				};
				
				if(address == null) {
					// Let the MTP choose the address
					TransportAddress ta = proto.activate(dispatcher, myProfile);
					address = proto.addrToStr(ta);
				}
				else {
					// Convert the given string into a TransportAddress object and use it
					TransportAddress ta = proto.strToAddr(address);
					proto.activate(dispatcher, ta, myProfile);
				}
				MTPDescriptor result = new MTPDescriptor(proto.getName(), className, new String[] {address}, proto.getSupportedProtocols());
				routes.addLocalMTP(address, proto, result);
				
				String[] pp = result.getSupportedProtocols();
				for (int i = 0; i < pp.length; ++i) {
					//log("Added Route-Via-MTP for protocol "+pp[i], 1);
					if (myLogger.isLoggable(Logger.CONFIG))
						myLogger.log(Logger.CONFIG,"Added Route-Via-MTP for protocol "+pp[i]);
					
				}
				
				String[] addresses = result.getAddresses();
				for(int i = 0; i < addresses.length; i++) {
					myContainer.addAddressToLocalAgents(addresses[i]);
				}
				
				GenericCommand gCmd = new GenericCommand(MessagingSlice.NEW_MTP, MessagingSlice.NAME, null);
				gCmd.addParam(result);
				gCmd.addParam(myContainer.getID());
				submit(gCmd);
				
				return result;
			}
			/*#DOTNET_INCLUDE_BEGIN
			 catch(System.TypeLoadException tle)
			 {
			 ClassNotFoundException cnfe = new ClassNotFoundException(tle.get_Message());
			 throw new MTPException("The class " + className  + " raised IllegalAccessException (see nested exception)", cnfe);
			 }
			 catch(System.TypeInitializationException tie)
			 {
			 InstantiationException ie = new InstantiationException(tie.get_Message());
			 throw new MTPException("The class " + className + " raised InstantiationException (see nested exception)", ie);
			 }
			 #DOTNET_INCLUDE_END*/
			catch(ClassNotFoundException cnfe) 
			{
				throw new MTPException("ERROR: The class " + className + " for the " + address  + " MTP was not found");
			}
			catch(InstantiationException ie) {
				throw new MTPException("The class " + className + " raised InstantiationException (see nested exception)", ie);
			}
			catch(IllegalAccessException iae) {
				throw new MTPException("The class " + className  + " raised IllegalAccessException (see nested exception)", iae);
			}
		}
		
		private void uninstallMTP(String address) throws IMTPException, ServiceException, NotFoundException, MTPException {
			
			RoutingTable.MTPInfo info = routes.removeLocalMTP(address);
			if(info != null) {
				MTP proto = info.getMTP();
				TransportAddress ta = proto.strToAddr(address);
				proto.deactivate(ta);
				MTPDescriptor desc = info.getDescriptor();
				//MTPDescriptor desc = new MTPDescriptor(proto.getName(), proto.getClass().getName(), new String[] {address}, proto.getSupportedProtocols());
				
				String[] addresses = desc.getAddresses();
				for(int i = 0; i < addresses.length; i++) {
					myContainer.removeAddressFromLocalAgents(addresses[i]);
				}
				
				GenericCommand gCmd = new GenericCommand(MessagingSlice.DEAD_MTP, MessagingSlice.NAME, null);
				gCmd.addParam(desc);
				gCmd.addParam(myContainer.getID());
				submit(gCmd);				
			}
			else {
				throw new MTPException("No such address was found on this container: " + address);
			}
		}
		
		private  void newMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException, ServiceException {
			MainContainer impl = myContainer.getMain();
			
			if(impl != null) {
				
				// Update the routing tables of all the other slices
				Service.Slice[] slices = getAllSlices();
				for(int i = 0; i < slices.length; i++) {
					try {
						MessagingSlice slice = (MessagingSlice)slices[i];
						String sliceName = slice.getNode().getName();
						if(!sliceName.equals(cid.getName())) {
							slice.addRoute(mtp, cid.getName());
						}
					}
					catch(Throwable t) {
						// Re-throw allowed exceptions
						if(t instanceof IMTPException) {
							throw (IMTPException)t;
						}
						if(t instanceof ServiceException) {
							throw (ServiceException)t;
						}
						//System.err.println("### addRoute() threw " + t.getClass().getName() + " ###");
						myLogger.log(Logger.WARNING,"### addRoute() threw " + t + " ###");
					}
				}
				impl.newMTP(mtp, cid);
			}
			else {
				// Do nothing for now, but could also route the command to the main slice, thus enabling e.g. AMS replication
			}
		}
		
		private void deadMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException, ServiceException {
			MainContainer impl = myContainer.getMain();
			
			if(impl != null) {
				
				// Update the routing tables of all the other slices
				Service.Slice[] slices = getAllSlices();
				for(int i = 0; i < slices.length; i++) {
					try {
						MessagingSlice slice = (MessagingSlice)slices[i];
						String sliceName = slice.getNode().getName();
						if(!sliceName.equals(cid.getName())) {
							slice.removeRoute(mtp, cid.getName());
						}
					}
					catch(Throwable t) {
						// Re-throw allowed exceptions
						if(t instanceof IMTPException) {
							throw (IMTPException)t;
						}
						if(t instanceof ServiceException) {
							throw (ServiceException)t;
						}
						
						myLogger.log(Logger.WARNING,"### removeRoute() threw " + t + " ###");
					}
				}
				impl.deadMTP(mtp, cid);
			}
			else {
				// Do nothing for now, but could also route the command to the main slice, thus enabling e.g. AMS replication
			}
		}
	} // END of inner class CommandTargetSink
	
	
	/**
	 Inner class for this service: this class receives commands from
	 service <code>Sink</code> and serves them, coordinating with
	 remote parts of this service through the <code>Slice</code>
	 interface (that extends the <code>Service.Slice</code>
	 interface).
	 */
	private class ServiceComponent implements Service.Slice {
		// Implementation of the Service.Slice interface
		public Service getService() {
			return MessagingService.this;
		}
		
		public Node getNode() throws ServiceException {
			try {
				return MessagingService.this.getLocalNode();
			}
			catch(IMTPException imtpe) {
				throw new ServiceException("Problem in contacting the IMTP Manager", imtpe);
			}
		}
		
		public VerticalCommand serve(HorizontalCommand cmd) {
			VerticalCommand result = null;
			try {
				String cmdName = cmd.getName();
				Object[] params = cmd.getParams();
				
				if(cmdName.equals(MessagingSlice.H_DISPATCHLOCALLY)) {
					GenericCommand gCmd = new GenericCommand(MessagingSlice.SEND_MESSAGE, MessagingSlice.NAME, null);
					AID senderAID = (AID)params[0];
					GenericMessage msg = (GenericMessage)params[1];
					AID receiverID = (AID)params[2];
					if (msg.getTraceID() != null) {
						myLogger.log(Logger.INFO, "MessagingService slice received message "+MessageManager.stringify(msg)+" for receiver "+receiverID.getLocalName()+". Trace ID = "+msg.getTraceID());
					}
					gCmd.addParam(senderAID);
					gCmd.addParam(msg);
					gCmd.addParam(receiverID);
					result = gCmd;
				}
				else if(cmdName.equals(MessagingSlice.H_ROUTEOUT)) {
					Envelope env = (Envelope)params[0];
					byte[] payload = (byte[])params[1];
					AID receiverID = (AID)params[2];
					String address = (String)params[3];
					
					routeOut(env, payload, receiverID, address);
				}
				else if(cmdName.equals(MessagingSlice.H_GETAGENTLOCATION)) {
					AID agentID = (AID)params[0];
					
					cmd.setReturnValue(getAgentLocation(agentID));
				}
				else if(cmdName.equals(MessagingSlice.H_INSTALLMTP)) {
					GenericCommand gCmd = new GenericCommand(MessagingSlice.INSTALL_MTP, MessagingSlice.NAME, null);
					String address = (String)params[0];
					String className = (String)params[1];
					gCmd.addParam(address);
					gCmd.addParam(className);
					
					result = gCmd;
				}
				else if(cmdName.equals(MessagingSlice.H_UNINSTALLMTP)) {
					GenericCommand gCmd = new GenericCommand(MessagingSlice.UNINSTALL_MTP, MessagingSlice.NAME, null);
					String address = (String)params[0];
					gCmd.addParam(address);
					
					result = gCmd;
				}
				else if(cmdName.equals(MessagingSlice.H_NEWMTP)) {
					MTPDescriptor mtp = (MTPDescriptor)params[0];
					ContainerID cid = (ContainerID)params[1];
					
					GenericCommand gCmd = new GenericCommand(MessagingSlice.NEW_MTP, MessagingSlice.NAME, null);
					gCmd.addParam(mtp);
					gCmd.addParam(cid);
					
					result = gCmd;
				}
				else if(cmdName.equals(MessagingSlice.H_DEADMTP)) {
					MTPDescriptor mtp = (MTPDescriptor)params[0];
					ContainerID cid = (ContainerID)params[1];
					
					GenericCommand gCmd = new GenericCommand(MessagingSlice.DEAD_MTP, MessagingSlice.NAME, null);
					gCmd.addParam(mtp);
					gCmd.addParam(cid);
					
					result = gCmd;
				}
				else if(cmdName.equals(MessagingSlice.H_ADDROUTE)) {
					MTPDescriptor mtp = (MTPDescriptor)params[0];
					String sliceName = (String)params[1];
					
					addRoute(mtp, sliceName);
				}
				else if(cmdName.equals(MessagingSlice.H_REMOVEROUTE)) {
					MTPDescriptor mtp = (MTPDescriptor)params[0];
					String sliceName = (String)params[1];
					
					removeRoute(mtp, sliceName);
				}
			}
			catch(Throwable t) {
				cmd.setReturnValue(t);
			}
			return result;
		}
		
		// Private methods
		private void routeOut(Envelope env, byte[] payload, AID receiverID, String address) throws IMTPException, MTPException {
			RoutingTable.OutPort out = routes.lookup(address);
			//log("Routing message to "+receiverID.getName()+" towards port "+out, 2);
			if (myLogger.isLoggable(Logger.FINE))
				myLogger.log(Logger.FINE,"Routing message to "+receiverID.getName()+" towards port "+out);
			
			if(out != null)
				out.route(env, payload, receiverID, address);
			else
				throw new MTPException("No suitable route found for address " + address + ".");
		}
		
		private ContainerID getAgentLocation(AID agentID) throws IMTPException, NotFoundException {
			MainContainer impl = myContainer.getMain();
			if(impl != null) {
				return impl.getContainerID(agentID);
			}
			else {
				// Do nothing for now, but could also have a local GADT copy, thus enabling e.g. Main Container replication
				return null;
			}
		}
		
		private void addRoute(MTPDescriptor mtp, String sliceName) throws IMTPException, ServiceException {
			// Be sure the slice is fresh --> bypass the service cache
			MessagingSlice slice = (MessagingSlice)getFreshSlice(sliceName);
			if (routes.addRemoteMTP(mtp, sliceName, slice)) {
				// This is actually a new MTP --> Add the new address to all local agents.
				// NOTE that a notification about a remote MTP can be received more than once in case of fault and successive recovery of the Main Container
				String[] pp = mtp.getSupportedProtocols();
				for (int i = 0; i < pp.length; ++i) {
					if (myLogger.isLoggable(Logger.CONFIG))
						myLogger.log(Logger.CONFIG,"Added Route-Via-Slice("+sliceName+") for protocol "+pp[i]);			
				}
				
				String[] addresses = mtp.getAddresses();
				for(int i = 0; i < addresses.length; i++) {
					myContainer.addAddressToLocalAgents(addresses[i]);
				}
			}
		}
		
		private void removeRoute(MTPDescriptor mtp, String sliceName) throws IMTPException, ServiceException {
			// Don't care about whether or not the slice is fresh. Only the name matters.
			MessagingSlice slice = (MessagingSlice)getSlice(sliceName);
			routes.removeRemoteMTP(mtp, sliceName, slice);
			
			String[] pp = mtp.getSupportedProtocols();
			for (int i = 0; i < pp.length; ++i) {
				if (myLogger.isLoggable(Logger.CONFIG))
					myLogger.log(Logger.CONFIG,"Removed Route-Via-Slice("+sliceName+") for protocol "+pp[i]);
				
			}
			
			String[] addresses = mtp.getAddresses();
			for(int i = 0; i < addresses.length; i++) {
				myContainer.removeAddressFromLocalAgents(addresses[i]);
			}
		}
		
	} // End of ServiceComponent class
	
	
	
	///////////////////////////////////////////////
	// Message delivery
	///////////////////////////////////////////////
	
	// Entry point for the ACL message delivery
	public void deliverNow(GenericMessage msg, AID receiverID) {
		if (msg.getTraceID() != null) {
			myLogger.log(Logger.INFO, msg.getTraceID()+" - Serving message delivery");
		}
		try {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -