⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mainreplicationservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*****************************************************************
 JADE - Java Agent DEvelopment Framework is a framework to develop
 multi-agent systems in compliance with the FIPA specifications.
 Copyright (C) 2000 CSELT S.p.A.

 GNU Lesser General Public License

 This library is free software; you can redistribute it and/or
 modify it under the terms of the GNU Lesser General Public
 License as published by the Free Software Foundation,
 version 2.1 of the License.

 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Lesser General Public
 License along with this library; if not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA  02111-1307, USA.
 *****************************************************************/

package jade.core.replication;

//#MIDP_EXCLUDE_FILE

import java.util.Vector;

import jade.core.HorizontalCommand;
import jade.core.PlatformManagerImpl;
import jade.core.VerticalCommand;
import jade.core.GenericCommand;
import jade.core.Service;
import jade.core.BaseService;
import jade.core.ServiceException;
import jade.core.Filter;
import jade.core.Node;
import jade.core.NodeDescriptor;
import jade.core.NodeEventListener;
import jade.core.NodeFailureMonitor;

import jade.core.AgentContainer;
import jade.core.MainContainerImpl;
import jade.core.Profile;
import jade.core.ProfileException;
import jade.core.IMTPException;
import jade.core.NotFoundException;
import jade.core.NameClashException;

import jade.core.AID;
import jade.core.ContainerID;

import jade.domain.AMSEventQueueFeeder;
import jade.domain.FIPAAgentManagement.AMSAgentDescription;

import jade.mtp.MTPDescriptor;

import jade.security.Credentials;
import jade.security.JADEPrincipal;
import jade.security.JADESecurityException;

import jade.util.leap.List;
import jade.util.leap.LinkedList;
import jade.util.leap.Iterator;
import jade.util.InputQueue;
import jade.util.Logger;


/**
 A kernel-level service to manage a ring of Main Containers,
 keeping the various replicas in sync and providing failure
 detection and recovery to make JADE tolerate Main Container
 crashes.

 @author Giovanni Rimassa - FRAMeTech s.r.l.

 */
public class MainReplicationService extends BaseService {
	public static final String NAME = MainReplicationSlice.NAME;
	public static final String SNAPSHOT_ON_FAILURE = "jade_core_replication_MainReplicationService_snapshotonfailure";
	
	private static final boolean EXCLUDE_MYSELF = false;

	private static final boolean INCLUDE_MYSELF = true;

	private static final String[] OWNED_COMMANDS = new String[] {};

	public void init(AgentContainer ac, Profile p) throws ProfileException {
		super.init(ac, p);

		myContainer = ac;

		// Create a local slice
		localSlice = new ServiceComponent();

		// Create the command filters
		outFilter = new CommandOutgoingFilter();
		inFilter = new CommandIncomingFilter();

		snapshotOnFailure = p.getBooleanProperty(SNAPSHOT_ON_FAILURE, false);
	}

	public String getName() {
		return MainReplicationSlice.NAME;
	}

	public Class getHorizontalInterface() {
		try {
			return Class.forName(MainReplicationSlice.NAME + "Slice");
		} catch (ClassNotFoundException cnfe) {
			return null;
		}
	}

	public Service.Slice getLocalSlice() {
		return localSlice;
	}

	public Filter getCommandFilter(boolean direction) {
		if (direction == Filter.OUTGOING) {
			return outFilter;
		} else {
			return inFilter;
		}
	}

	public String[] getOwnedCommands() {
		return OWNED_COMMANDS;
	}

	public void boot(Profile p) throws ServiceException {
		try {
			// Initialize the label of this node
			Service.Slice[] slices = getAllSlices();
			myLabel = slices.length - 1;

			// Temporarily store the slices into an array...
			MainReplicationSlice[] temp = new MainReplicationSlice[slices.length];
			// Besides notifying GADT information, the MainReplication slice that will monitor this newly started
			// slice will also have to issue a NEW_NODE VCommand and a NEW_SLICE VCommands for each local service
			// to allow services to notify service specific information
			NodeDescriptor dsc = myContainer.getNodeDescriptor();
			Vector localServices = myContainer.getServiceManager().getLocalServices();
			String localNodeName = getLocalNode().getName();
			for (int i = 0; i < slices.length; i++) {
				try {
					MainReplicationSlice slice = (MainReplicationSlice) slices[i];
					String sliceName = slice.getNode().getName();
					int label = slice.getLabel();

					temp[label] = slice;

					if (!sliceName.equals(localNodeName)) {
						slice.addReplica(localNodeName, myPlatformManager.getLocalAddress(), myLabel, dsc, localServices);
					}

					if (label == myLabel - 1) {
						localSlice.attachTo(label, slice);
					}
				} catch (IMTPException imtpe) {
					// Ignore it: stale slice...
				}
			}

			// copy all the slices from the temporary array to the slice list
			for (int i = 0; i < temp.length; i++) {
				replicas.add(temp[i]);
			}
			
			if (myLabel > 0) {
				myLogger.log(Logger.INFO, "Main container ring re-arranged: label = "+myLabel+" monitored label = "+localSlice.monitoredLabel);
			}
		} catch (IMTPException imtpe) {
			throw new ServiceException("An error occurred during service startup.", imtpe);
		}

	}

	public void shutdown() {
		if (localSlice != null) {
			localSlice.stopMonitoring();
		}
	}

	
	/**
	 * Inner class CommandOutgoingFilter
	 * Keep tool agents information in synch among replicas 
	 */
	private class CommandOutgoingFilter extends Filter {

		public boolean accept(VerticalCommand cmd) {

			try {
				String name = cmd.getName();

				if (name.equals(jade.core.management.AgentManagementSlice.ADD_TOOL)) {
					handleNewTool(cmd);
				} else if (name.equals(jade.core.management.AgentManagementSlice.REMOVE_TOOL)) {
					handleDeadTool(cmd);
				}
			} catch (IMTPException imtpe) {
				cmd.setReturnValue(imtpe);
			} catch (ServiceException se) {
				cmd.setReturnValue(se);
			}

			// Never veto a command
			return true;
		}

		private void handleNewTool(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			AID tool = (AID) params[0];

			GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_NEWTOOL, MainReplicationSlice.NAME, null);
			hCmd.addParam(tool);

			broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
		}

		private void handleDeadTool(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			AID tool = (AID) params[0];

			GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_DEADTOOL, MainReplicationSlice.NAME, null);
			hCmd.addParam(tool);

			broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
		}
	} // End of CommandOutgoingFilter class

	
	/**
	 * Inner class CommandIncomingFilter
	 * Keep agents and MTPs information in synch among replicas 
	 */
	private class CommandIncomingFilter extends Filter {

		public void postProcess(VerticalCommand cmd) {
 			try {
				String name = cmd.getName();

				if (name.equals(jade.core.management.AgentManagementSlice.INFORM_CREATED)) {
					handleInformCreated(cmd);
				} else if (name.equals(jade.core.management.AgentManagementSlice.INFORM_KILLED)) {
					handleInformKilled(cmd);
				} else if (name.equals(jade.core.management.AgentManagementSlice.INFORM_STATE_CHANGED)) {
					handleInformStateChanged(cmd);
				} else if (name.equals(jade.core.messaging.MessagingSlice.NEW_MTP)) {
					handleNewMTP(cmd);
				} else if (name.equals(jade.core.messaging.MessagingSlice.DEAD_MTP)) {
					handleDeadMTP(cmd);
				}
			} 
			catch (Throwable t) {
				cmd.setReturnValue(t);
			}
		}

		private void handleInformCreated(VerticalCommand cmd) throws IMTPException, NotFoundException, NameClashException, JADESecurityException, ServiceException {
			Object ret = cmd.getReturnValue();
			// Avoid propagating to other slices in case the agent creation failed due to a name-clash
			if (!(ret != null && ret instanceof NameClashException)) {
				Object[] params = cmd.getParams();
	
				AID agentID = (AID) params[0];
				ContainerID cid = (ContainerID) params[1];
	
				GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_BORNAGENT, MainReplicationSlice.NAME, null);
				hCmd.addParam(agentID);
				hCmd.addParam(cid);
				hCmd.setPrincipal(cmd.getPrincipal());
				hCmd.setCredentials(cmd.getCredentials());
	
				broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
			}
		}

		private void handleInformKilled(VerticalCommand cmd) throws IMTPException, NotFoundException, ServiceException {
			Object[] params = cmd.getParams();
			AID agentID = (AID) params[0];

			GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_DEADAGENT, MainReplicationSlice.NAME, null);
			hCmd.addParam(agentID);

			broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
		}

		private void handleInformStateChanged(VerticalCommand cmd) throws IMTPException, NotFoundException, ServiceException {
			Object[] params = cmd.getParams();
			AID agentID = (AID) params[0];
			String newState = (String) params[1];

			if (newState.equals(jade.domain.FIPAAgentManagement.AMSAgentDescription.SUSPENDED)) {
				GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_SUSPENDEDAGENT, MainReplicationSlice.NAME, null);
				hCmd.addParam(agentID);

				broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
			} else if (newState.equals(jade.domain.FIPAAgentManagement.AMSAgentDescription.ACTIVE)) {
				GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_RESUMEDAGENT, MainReplicationSlice.NAME, null);
				hCmd.addParam(agentID);

				broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
			}

		}

		private void handleNewMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor) params[0];
			ContainerID cid = (ContainerID) params[1];

			GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_NEWMTP, MainReplicationSlice.NAME, null);
			hCmd.addParam(mtp);
			hCmd.addParam(cid);

			broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
		}

		private void handleDeadMTP(VerticalCommand cmd) throws IMTPException, ServiceException {
			Object[] params = cmd.getParams();
			MTPDescriptor mtp = (MTPDescriptor) params[0];
			ContainerID cid = (ContainerID) params[1];

			GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_DEADMTP, MainReplicationSlice.NAME, null);
			hCmd.addParam(mtp);
			hCmd.addParam(cid);

			broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
		}
	} // End of CommandIncomingFilter class

	
	/**
	 * Inner class ServiceComponent
	 */
	private class ServiceComponent implements Service.Slice, NodeEventListener {

		public ServiceComponent() {
			myMain = (MainContainerImpl) myContainer.getMain();
			myPlatformManager = (PlatformManagerImpl) myMain.getPlatformManager();
		}

		public void stopMonitoring() {
			if (nodeMonitor != null) {
				if (myLogger.isLoggable(Logger.CONFIG))
					myLogger.log(Logger.CONFIG, "Stop monitoring node <" + nodeMonitor.getNode().getName() + ">");
				nodeMonitor.stop();
			}
		}

		private void attachTo(int label, MainReplicationSlice slice) throws IMTPException, ServiceException {
			// Stop the previous monitor, if any
			stopMonitoring();

			// Store the label of the monitored slice
			monitoredLabel = label;

			// Avoid monitoring yourself
			if (monitoredLabel == myLabel) {
				return;
			}

			// Store the Service Manager address for the monitored slice
			monitoredSvcMgr = slice.getPlatformManagerAddress();

			// Set up a failure monitor on the target slice...
			nodeMonitor = NodeFailureMonitor.getFailureMonitor();
			nodeMonitor.start(slice.getNode(), this);
		}

		// Implementation of the Service.Slice interface

		public Service getService() {
			return MainReplicationService.this;
		}

		public Node getNode() throws ServiceException {
			try {
				return MainReplicationService.this.getLocalNode();
			} catch (IMTPException imtpe) {
				throw new ServiceException("Problem in contacting the IMTP Manager", imtpe);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -