⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagingservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*****************************************************************
 JADE - Java Agent DEvelopment Framework is a framework to develop
 multi-agent systems in compliance with the FIPA specifications.
 Copyright (C) 2000 CSELT S.p.A.
 
 GNU Lesser General Public License
 
 This library is free software; you can redistribute it and/or
 modify it under the terms of the GNU Lesser General Public
 License as published by the Free Software Foundation,
 version 2.1 of the License.
 
 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.
 
 You should have received a copy of the GNU Lesser General Public
 License along with this library; if not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA  02111-1307, USA.
 *****************************************************************/

package jade.core.messaging;

//#MIDP_EXCLUDE_FILE

import java.io.FileWriter;
import java.io.PrintWriter;
import java.io.IOException;

import java.util.Date;

import jade.core.HorizontalCommand;
import jade.core.VerticalCommand;
import jade.core.GenericCommand;
import jade.core.Service;
import jade.core.BaseService;
import jade.core.ServiceException;
import jade.core.Sink;
import jade.core.Filter;
import jade.core.Node;

import jade.core.AgentContainer;
import jade.core.MainContainer;
import jade.core.CaseInsensitiveString;
import jade.core.Agent;
import jade.core.AID;
import jade.core.ContainerID;
import jade.core.Profile;
import jade.core.Specifier;
import jade.core.ProfileException;
import jade.core.IMTPException;
import jade.core.NotFoundException;

import jade.domain.FIPANames;
import jade.domain.FIPAAgentManagement.InternalError;
import jade.domain.FIPAAgentManagement.Envelope;
import jade.domain.FIPAAgentManagement.Property;
import jade.domain.FIPAAgentManagement.ReceivedObject;

import jade.security.JADESecurityException;

import jade.lang.acl.ACLMessage;
import jade.lang.acl.ACLCodec;
import jade.lang.acl.LEAPACLCodec;
import jade.lang.acl.StringACLCodec;

import jade.mtp.MTP;
import jade.mtp.MTPDescriptor;
import jade.mtp.MTPException;
import jade.mtp.InChannel;
import jade.mtp.TransportAddress;

import jade.util.leap.Iterator;
import jade.util.leap.Map;
import jade.util.leap.HashMap;
import jade.util.leap.List;
import jade.util.Logger;
import jade.util.HashCache;


/**
 *
 * The JADE service to manage the message passing subsystem installed
 * on the platform.
 *
 * @author Giovanni Rimassa - FRAMeTech s.r.l.
 * @author Nicolas Lhuillier - Motorola Labs
 * @author Jerome Picault - Motorola Labs
 */
public class MessagingService extends BaseService implements MessageManager.Channel {
	// Service name; the same MessagingSlice.NAME value is what getName() returns.
	public static final String NAME = MessagingSlice.NAME;
	
	// Profile parameter holding the capacity of the slice cache built in init().
	public static final String CACHE_SIZE = "jade_core_messaging_MessagingService_cachesize";
	// Capacity used by init() when CACHE_SIZE is missing or cannot be parsed as an int.
	public static final int CACHE_SIZE_DEFAULT = 100;
	
	// Boolean profile property whose value init() hands to the RoutingTable
	// constructor (false when the property is absent).
	public static final String ATTACH_PLATFORM_INFO = "jade_core_messaging_MessagingService_attachplatforminfo";
	// NOTE(review): the two keys below are not used in the part of the file
	// visible here; presumably they name properties attached to messages when
	// ATTACH_PLATFORM_INFO is enabled -- confirm against RoutingTable.
	// The spelling "identifer" is part of the runtime value, so it must not be
	// "corrected" without checking every producer and consumer of these keys.
	public static final String PLATFORM_IDENTIFIER = "x-sender-platform-identifer";
	public static final String MTP_IDENTIFIER = "x-sender-mtp-identifer";
	
	// The profile passed to this object (assigned in init() and again in boot())
	private Profile myProfile;
	
	// A flag indicating whether or not we must accept foreign agents
	// (read from Profile.ACCEPT_FOREIGN_AGENTS in init(), false when absent)
	private boolean acceptForeignAgents = false;
	
	// The ID of the Platform this service belongs to, as reported by the container
	private String platformID;
	
	// The concrete agent container, providing access to LADT, etc.
	private AgentContainer myContainer;
	
	// The local slice for this service, returned by getLocalSlice()
	private final ServiceComponent localSlice = new ServiceComponent();
	
	// The command sink, source side
	private final CommandSourceSink senderSink = new CommandSourceSink();
	
	// The command sink, target side (shutdown() feeds UNINSTALL_MTP commands to it)
	private final CommandTargetSink receiverSink = new CommandTargetSink();
	
	// The filter for outgoing commands related to ACL encoding
	// (init() stores an OutgoingEncodingFilter here)
	private Filter encOutFilter;
	
	// The filter for incoming commands related to ACL encoding
	// (init() stores an IncomingEncodingFilter here)
	private Filter encInFilter;
	
	// The cached AID -> MessagingSlice associations (a HashCache created in init())
	private Map cachedSlices; 
	
	// The routing table mapping MTP addresses to their hosting slice
	private RoutingTable routes;
	
	// Size passed to the HashMap constructor of the codec table below
	private final static int EXPECTED_ACLENCODINGS_SIZE = 3;
	// The table of the locally installed ACL message encodings;
	// boot() registers each codec under its lower-cased name
	private final Map messageEncodings = new HashMap(EXPECTED_ACLENCODINGS_SIZE);
	
	// The address "fipa-mts://<platformID>/acc" built in init(),
	// to be used in inter-platform dispatching
	private String accID;
	
	// The component managing asynchronous message delivery and retries
	private MessageManager myMessageManager;
	
	
	/**
	 * A <code>NotFoundException</code> specialization. Judging by its name
	 * it reports an ACL encoding for which no codec is known; the places
	 * where it is thrown are outside this part of the file.
	 */
	public static class UnknownACLEncodingException extends NotFoundException {
		/**
		 * Package-visible constructor.
		 * @param message The detail text, forwarded unchanged to the
		 * superclass constructor.
		 */
		UnknownACLEncodingException(String message) {
			super(message);
		}
	} // End of UnknownACLEncodingException class
	
	
	// Names of the seven MessagingSlice commands grouped under this service.
	// NOTE(review): presumably returned by an "owned commands" accessor
	// further down the file -- the consumer is not visible here.
	private static final String[] OWNED_COMMANDS = {
		MessagingSlice.SEND_MESSAGE,
		MessagingSlice.NOTIFY_FAILURE,
		MessagingSlice.INSTALL_MTP,
		MessagingSlice.UNINSTALL_MTP,
		MessagingSlice.NEW_MTP,
		MessagingSlice.DEAD_MTP,
		MessagingSlice.SET_PLATFORM_ADDRESSES
	};
	
	/**
	 * Creates an unconfigured service instance. All real set-up is
	 * performed later by <code>init()</code> and <code>boot()</code>.
	 */
	public MessagingService() {
		super();
	}
	
	
	/**
	 * Performs the passive initialization step of the service. This
	 * method is called <b>before</b> activating the service. Its role
	 * should be simply the one of a constructor, setting up the
	 * internal data as needed.
	 * Service implementations should not use the Service Manager and
	 * Service Finder facilities from within this method. A
	 * distributed initialization protocol, if needed, should be
	 * executed within the <code>boot()</code> method.
	 * @param ac The agent container this service is activated on.
	 * @param p The configuration profile for this service.
	 * @throws ProfileException If the given profile is not valid.
	 */
	public void init(AgentContainer ac, Profile p) throws ProfileException {
		super.init(ac, p);
		myContainer = ac;
		myProfile = p;
		
		// Slice cache: the capacity comes from the profile; a missing or
		// malformed value makes parseInt() fail and leaves the default in place.
		int cacheCapacity = CACHE_SIZE_DEFAULT;
		try {
			String configuredCapacity = myProfile.getParameter(CACHE_SIZE, null);
			cacheCapacity = Integer.parseInt(configuredCapacity);
		}
		catch (Exception e) {
			// Keep default
		}
		cachedSlices = new HashCache(cacheCapacity);
		
		// Routing table, told whether platform info must be attached
		boolean attachPlatformInfo = myProfile.getBooleanProperty(ATTACH_PLATFORM_INFO, false);
		routes = new RoutingTable(attachPlatformInfo);
		
		// Foreign agents are accepted only when the profile says so
		acceptForeignAgents = myProfile.getBooleanProperty(Profile.ACCEPT_FOREIGN_AGENTS, false);
		
		// Platform identity and the ACC address derived from it
		platformID = myContainer.getPlatformID();
		accID = "fipa-mts://" + platformID + "/acc";
		
		// Encoding filters: both share the codec table that boot() fills later
		encOutFilter = new OutgoingEncodingFilter(messageEncodings, myContainer, this);
		encInFilter = new IncomingEncodingFilter(messageEncodings, this);
		
		// Delivery manager, obtained from the MessageManager factory method
		myMessageManager = MessageManager.instance(p);
	}
	
	/**
	 * Performs the active initialization step of a kernel-level
	 * service: Activates the ACL codecs and MTPs as specified in the given
	 * <code>Profile</code> instance.
	 *
	 * @param myProfile The <code>Profile</code> instance containing
	 * the list of ACL codecs and MTPs to activate on this node.
	 * @throws ServiceException If a problem occurs during service
	 * initialization.
	 */
	public void boot(Profile myProfile) throws ServiceException {
		// NOTE: every checked exception raised below, ServiceException
		// included, is caught, logged and printed here; none of them is
		// propagated to the caller despite the throws clause.
		this.myProfile = myProfile;
		
		// Writer for the "MTPs-<container>.txt" file. It is declared outside
		// the try block so that the finally clause can always release it,
		// even when an MTP installation fails after the file was opened.
		PrintWriter f = null;
		
		try {
			// Activate the default ACL String codec anyway
			ACLCodec stringCodec = new StringACLCodec();
			messageEncodings.put(stringCodec.getName().toLowerCase(), stringCodec);
			
			// Activate the efficient encoding for intra-platform encoding
			ACLCodec efficientCodec = new LEAPACLCodec();
			messageEncodings.put(efficientCodec.getName().toLowerCase(), efficientCodec);
			
			// Codecs listed in the profile: instantiate each one reflectively
			// and register it under its lower-cased name
			List l = myProfile.getSpecifiers(Profile.ACLCODECS);
			Iterator codecs = l.iterator();
			while (codecs.hasNext()) {
				Specifier spec = (Specifier) codecs.next();
				String className = spec.getClassName();
				try {
					Class c = Class.forName(className);
					ACLCodec codec = (ACLCodec) c.newInstance();
					messageEncodings.put(codec.getName().toLowerCase(), codec);
					if (myLogger.isLoggable(Logger.CONFIG)) {
						myLogger.log(Logger.CONFIG,"Installed "+ codec.getName()+ " ACLCodec implemented by " + className + "\n");
					}
					
					// FIXME: notify the AMS of the new Codec to update the APDescription.
				}
				catch (ClassNotFoundException cnfe) {
					throw new jade.lang.acl.ACLCodec.CodecException("ERROR: The class " +className +" for the ACLCodec not found.", cnfe);
				}
				catch (InstantiationException ie) {
					throw new jade.lang.acl.ACLCodec.CodecException("The class " + className + " raised InstantiationException (see NestedException)", ie);
				}
				catch (IllegalAccessException iae) {
					throw new jade.lang.acl.ACLCodec.CodecException("The class " + className  + " raised IllegalAccessException (see nested exception)", iae);
				}
			}
			
			// MTPs listed in the profile: install each one through the local
			// slice and record its first address
			l = myProfile.getSpecifiers(Profile.MTPS);
			StringBuffer sb = null;
			
			Iterator mtps = l.iterator();
			while (mtps.hasNext()) {
				Specifier spec = (Specifier) mtps.next();
				String className = spec.getClassName();
				
				// The optional first argument is the address URL; an empty
				// string is treated like a missing argument
				String addressURL = null;
				Object[] args = spec.getArgs();
				if (args != null && args.length > 0) {
					addressURL = args[0].toString();
					if (addressURL.equals("")) {
						addressURL = null;
					}
				}
				
				MessagingSlice s = (MessagingSlice) getSlice(getLocalNode().getName());
				MTPDescriptor mtp = s.installMTP(addressURL, className);
				String[] mtpAddrs = mtp.getAddresses();
				
				// The address file is created lazily, only once the first
				// MTP has been installed successfully
				if (f == null) {
					String fileName = myProfile.getParameter(Profile.FILE_DIR, "") + "MTPs-" + myContainer.getID().getName() + ".txt";
					f = new PrintWriter(new FileWriter(fileName));
					sb = new StringBuffer("MTP addresses:");
				}
				f.println(mtpAddrs[0]);
				sb.append("\n");
				sb.append(mtpAddrs[0]);
			}
			
			// Log the summary only when every MTP was installed
			if (f != null) {
				myLogger.log(Logger.INFO, sb.toString());
			}
		}
		catch (ProfileException pe1) {
			if (myLogger.isLoggable(Logger.SEVERE)) {
				myLogger.log(Logger.SEVERE,"Error reading MTPs/Codecs");
			}
			pe1.printStackTrace();
		}
		catch (ServiceException se) {
			if (myLogger.isLoggable(Logger.SEVERE)) {
				myLogger.log(Logger.SEVERE,"Error installing local MTPs");
			}
			se.printStackTrace();
		}
		catch (jade.lang.acl.ACLCodec.CodecException ce) {
			if (myLogger.isLoggable(Logger.SEVERE)) {
				myLogger.log(Logger.SEVERE,"Error installing ACL Codec");
			}
			ce.printStackTrace();
		}
		catch (MTPException me) {
			if (myLogger.isLoggable(Logger.SEVERE)) {
				myLogger.log(Logger.SEVERE,"Error installing MTP");
			}
			me.printStackTrace();
		}
		catch (IOException ioe) {
			if (myLogger.isLoggable(Logger.SEVERE)) {
				myLogger.log(Logger.SEVERE,"Error writing platform address");
			}
			ioe.printStackTrace();
		}
		catch (IMTPException imtpe) {
			// Should never happen as this is a local call
			imtpe.printStackTrace();
		}
		finally {
			// Always flush and release the address file; before this fix a
			// failure while installing a later MTP left the writer open and
			// the addresses already collected possibly unwritten.
			if (f != null) {
				f.close();
			}
		}
	}
	
	// kindly provided by David Bernstein, 15/6/2005
	public void shutdown() {
		// Uninstalls the MTP behind every address currently in the routing
		// table. A failure on one address is logged and does not prevent
		// the remaining addresses from being processed.
		
		// clone addresses (externally because leap list doesn't
		// implement Cloneable) so don't get concurrent modification
		// exception on the list as the MTPs are being uninstalled
		List platformAddresses = new jade.util.leap.ArrayList();
		Iterator routeIterator = routes.getAddresses();
		while (routeIterator.hasNext()) {
			platformAddresses.add(routeIterator.next());
		}
		
		// for each platform address, uninstall the MTP it represents
		routeIterator = platformAddresses.iterator();
		while (routeIterator.hasNext()) {
			String route = (String) routeIterator.next();
			
			// A fresh command per address. The previous code re-used one
			// command and removed the parameter only after a successful
			// consume(); when consume() threw, the failed address stayed
			// in the command as a stale first parameter for all later
			// iterations.
			GenericCommand cmd = new GenericCommand(MessagingSlice.UNINSTALL_MTP, getName(), null);
			cmd.addParam(route);
			try {
				receiverSink.consume(cmd);
				if (myLogger.isLoggable(Logger.FINER)) {
					myLogger.log(Logger.FINER,"uninstalled MTP "+route);
				}
			}
			catch (Exception e) {
				if (myLogger.isLoggable(Logger.SEVERE)) {
					myLogger.log(Logger.SEVERE,"Exception uninstalling MTP "+route+". "+e);
				}
			}
		}
	}
	
	
	
	/**
	 * Retrieve the name of this service, that can be used to look up
	 * its slices in the Service Finder.
	 * @return The name of this service.
	 * @see jade.core.ServiceFinder
	 */
	public String getName() {
		// The service is registered under the name shared with its slice interface
		final String serviceName = MessagingSlice.NAME;
		return serviceName;
	}
	
	/**
	 * Retrieve the interface through which the different service
	 * slices will communicate, that is, the service <i>Horizontal
	 * Interface</i>.
	 * @return A <code>Class</code> object, representing the interface
	 * that is implemented by the slices of this service.
	 */
	public Class getHorizontalInterface() {
		// The interface name is derived from the service name by appending "Slice"
		final String sliceInterfaceName = MessagingSlice.NAME + "Slice";
		
		Class sliceInterface = null;
		try {
			sliceInterface = Class.forName(sliceInterfaceName);
		}
		catch (ClassNotFoundException cnfe) {
			// The class cannot be loaded: the result stays null
		}
		return sliceInterface;
	}
	
	/**
	 * Retrieve the locally installed slice of this service.
	 */
	public Service.Slice getLocalSlice() {
		// Always the single ServiceComponent created together with this service
		return this.localSlice;
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -