⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mainreplicationservice.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
			}
		}

		public VerticalCommand serve(HorizontalCommand cmd) {
			try {
				String cmdName = cmd.getName();
				Object[] params = cmd.getParams();

				if (cmdName.equals(MainReplicationSlice.H_GETLABEL)) {
					Integer i = new Integer(getLabel());
					cmd.setReturnValue(i);
				} else if (cmdName.equals(MainReplicationSlice.H_GETPLATFORMMANAGERADDRESS)) {
					cmd.setReturnValue(getPlatformManagerAddress());
				} else if (cmdName.equals(MainReplicationSlice.H_ADDREPLICA)) {
					String sliceName = (String) params[0];
					String smAddr = (String) params[1];
					int sliceIndex = ((Integer) params[2]).intValue();
					NodeDescriptor dsc = (NodeDescriptor) params[3];
					Vector services = (Vector) params[4]; 
					addReplica(sliceName, smAddr, sliceIndex, dsc, services);
				} else if (cmdName.equals(MainReplicationSlice.H_REMOVEREPLICA)) {
					String smAddr = (String) params[0];
					int sliceIndex = ((Integer) params[1]).intValue();
					removeReplica(smAddr, sliceIndex);
				} else if (cmdName.equals(MainReplicationSlice.H_FILLGADT)) {
					AID[] agents = (AID[]) params[0];
					ContainerID[] containers = (ContainerID[]) params[1];
					fillGADT(agents, containers);
				} else if (cmdName.equals(MainReplicationSlice.H_BORNAGENT)) {
					AID name = (AID) params[0];
					ContainerID cid = (ContainerID) params[1];
					bornAgent(name, cid, cmd.getPrincipal(), cmd.getCredentials());
				} else if (cmdName.equals(MainReplicationSlice.H_DEADAGENT)) {
					AID name = (AID) params[0];
					deadAgent(name);
				} else if (cmdName.equals(MainReplicationSlice.H_SUSPENDEDAGENT)) {
					AID name = (AID) params[0];
					suspendedAgent(name);
				} else if (cmdName.equals(MainReplicationSlice.H_RESUMEDAGENT)) {
					AID name = (AID) params[0];
					resumedAgent(name);
				} else if (cmdName.equals(MainReplicationSlice.H_NEWMTP)) {
					MTPDescriptor mtp = (MTPDescriptor) params[0];
					ContainerID cid = (ContainerID) params[1];
					newMTP(mtp, cid);
				} else if (cmdName.equals(MainReplicationSlice.H_DEADMTP)) {
					MTPDescriptor mtp = (MTPDescriptor) params[0];
					ContainerID cid = (ContainerID) params[1];
					deadMTP(mtp, cid);
				} else if (cmdName.equals(MainReplicationSlice.H_NEWTOOL)) {
					AID tool = (AID) params[0];
					newTool(tool);
				} else if (cmdName.equals(MainReplicationSlice.H_DEADTOOL)) {
					AID tool = (AID) params[0];
					deadTool(tool);
				}
			} catch (Throwable t) {
				cmd.setReturnValue(t);
			}
			return null;
		}

		private int getLabel() throws IMTPException {
			return myLabel;
		}

		private String getPlatformManagerAddress() throws IMTPException {
			return myPlatformManager.getLocalAddress();
		}

		private void addReplica(String sliceName, String smAddr, int sliceIndex, NodeDescriptor dsc, Vector services) throws IMTPException, ServiceException {
			//Get a fresh slice: in this way the address is always right (and old address is overridden!!!)
			MainReplicationSlice slice = (MainReplicationSlice) getFreshSlice(sliceName);
			replicas.add(sliceIndex, slice);
			// If first in line, close the ring by monitoring the newly arrived slice,
			// and start sending data to the new slice...
			if (myLabel == 0) {
				attachTo(sliceIndex, slice);

				// Send all the data about the GADT...
				AID[] names = myMain.agentNames();
				ContainerID[] containers = new ContainerID[names.length];
				for (int i = 0; i < names.length; i++) {
					try {
						containers[i] = myMain.getContainerID(names[i]);
					} catch (NotFoundException nfe) {
						// It should never happen...
						nfe.printStackTrace();
					}
				}

				// FIXME: What about principal and ownership?
				slice.fillGADT(names, containers);

				// Update the status of each suspended agent...
				AMSAgentDescription amsd = new AMSAgentDescription();
				amsd.setState(AMSAgentDescription.SUSPENDED);
				List suspendedAgents = myMain.amsSearch(amsd, -1); // '-1' means 'all the results'

				Iterator it = suspendedAgents.iterator();
				while (it.hasNext()) {
					AMSAgentDescription desc = (AMSAgentDescription) it.next();
					try {
						slice.suspendedAgent(desc.getName());
					} catch (NotFoundException nfe) {
						// It should never happen...
						nfe.printStackTrace();
					}
				}

				// Send the tool list...
				AID[] tools = myMain.agentTools();
				for (int i = 0; i < tools.length; i++) {
					slice.newTool(tools[i]);
				}
				
				// Finally issue a NEW_NODE VCommand and a NEW_SLICE VCommand for each service to allow
				// local services to propagate service specific information to their slices in the new 
				// Main Container node.
				try {
					myPlatformManager.addMainContainerNode(dsc, services);
				}
				catch (JADESecurityException jse) {
					// Should we do something more?
					myLogger.log(Logger.WARNING, "Unauthorized Main Container node "+dsc.getNode().getName(), jse);
				}

			}
			myLogger.log(Logger.INFO, "Main container ring re-arranged: label = "+myLabel+" monitored label = "+monitoredLabel);
		}

		private void removeReplica(String smAddr, int index) throws IMTPException {
			replicas.remove(index);
			adjustLabels(index);
		}

		private void adjustLabels(int index) {
			if (index < myLabel) {
				myLabel--;
				monitoredLabel--;
				if (monitoredLabel == -1) {
					monitoredLabel += replicas.size();
				}
			} else if (myLabel == 0) {
				// Handle the ring wrap-around case...
				monitoredLabel--;
			}
			myLogger.log(Logger.INFO, "Main container ring re-arranged: label = "+myLabel+" monitored label = "+monitoredLabel);
		}

		private void fillGADT(AID[] agents, ContainerID[] containers) throws JADESecurityException {
			for (int i = 0; i < agents.length; i++) {

				try {
					// FIXME: What about principal and ownership?
					myMain.bornAgent(agents[i], containers[i], null, null, true);
					//log("Agent "+agents[i].getName()+" inserted into GADT", 2);
					if (myLogger.isLoggable(Logger.CONFIG))
						myLogger.log(Logger.CONFIG, "Agent " + agents[i].getName() + " inserted into GADT");

				} catch (NotFoundException nfe) {
					// It should never happen...
					nfe.printStackTrace();
				} catch (NameClashException nce) {
					// It should never happen...
					nce.printStackTrace();
				}
			}

		}

		private void bornAgent(AID name, ContainerID cid, JADEPrincipal principal, Credentials credentials) throws NameClashException, NotFoundException {
			// Retrieve the ownership from the credentials
			String ownership = "NONE";
			if (credentials != null) {
				JADEPrincipal ownerPr = credentials.getOwner();
				if (ownerPr != null) {
					ownership = ownerPr.getName();
				}
			}
			try {
				// If the name is already in the GADT, throws NameClashException
				myMain.bornAgent(name, cid, principal, ownership, false);
				//log("Agent "+name.getName()+" inserted into GADT", 2);
				if (myLogger.isLoggable(Logger.CONFIG))
					myLogger.log(Logger.CONFIG, "Agent " + name.getName() + " inserted into GADT");

			} catch (NameClashException nce) {
				try {
					ContainerID oldCid = myMain.getContainerID(name);
					Node n = myMain.getContainerNode(oldCid).getNode();

					// Perform a non-blocking ping to check...
					n.ping(false);

					// Ping succeeded: rethrow the NameClashException
					throw nce;
				} catch (NameClashException nce2) {
					throw nce2; // Let this one through...
				} catch (Exception e) {
					// Ping failed: forcibly replace the dead agent...
					myMain.bornAgent(name, cid, principal, ownership, true);
					//log("Agent "+name.getName()+" inserted into GADT", 2);
					if (myLogger.isLoggable(Logger.CONFIG))
						myLogger.log(Logger.CONFIG, "Agent " + name.getName() + " inserted into GADT");

				}
			}
		}

		private void deadAgent(AID name) throws NotFoundException {
			myMain.deadAgent(name, false);
		}

		private void suspendedAgent(AID name) throws NotFoundException {
			myMain.suspendedAgent(name);
		}

		private void resumedAgent(AID name) throws NotFoundException {
			myMain.resumedAgent(name);
		}

		private void newMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException {
			myMain.newMTP(mtp, cid);
		}

		private void deadMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException {
			myMain.deadMTP(mtp, cid);
		}

		private void newTool(AID tool) throws IMTPException {
			myMain.toolAdded(tool);
		}

		private void deadTool(AID tool) throws IMTPException {
			myMain.toolRemoved(tool);
		}

		public void dumpReplicas() {
			try {
				System.out.println("--- " + getLocalNode().getName() + "[" + myLabel + "] ---");
				System.out.println("--- Monitoring node [" + monitoredLabel + "] ---");
				System.out.println("--- Replica list ---");
				Object[] slices = replicas.toArray();
				for (int i = 0; i < slices.length; i++) {
					MainReplicationSlice slice = (MainReplicationSlice) slices[i];
					System.out.println("----- " + slice.getNode().getName() + "[" + i + "] -----");
				}
				System.out.println("--- End ---");
			} catch (Throwable t) {
				t.printStackTrace();
			}
		}

		private void dumpGADT() {
			AID[] agents = myMain.agentNames();
			System.out.println("--- Agent List ---");
			for (int i = 0; i < agents.length; i++) {
				System.out.println("    Agent: " + agents[i].getLocalName());
			}
			System.out.println("------------------");
		}

		// Implementation of the NodeEventListener interface

		public void nodeAdded(Node n) {
			//log("Start monitoring node <"+n.getName()+">", 2);
			if (myLogger.isLoggable(Logger.CONFIG))
				myLogger.log(Logger.CONFIG, "Start monitoring node <" + n.getName() + ">");

		}

		public void nodeRemoved(Node n) {
			if (myLogger.isLoggable(Logger.CONFIG)) {
				myLogger.log(Logger.CONFIG, "Node <" + n.getName() + "> TERMINATED");
			}

			try {
				replicas.remove(monitoredLabel);
				
				// GC-ADD-18022007-START
				// Possibly the AMS is dead --> Start intercepting platform and MTP events on behalf of the 
				// new AMS if any. 
				AMSEventQueueFeeder feeder = null;
				if (!snapshotOnFailure) {
					feeder = new AMSEventQueueFeeder(new InputQueue(), myContainer.getID());
					myMain.addListener(feeder);
				}
				// GC-ADD-18022007-END
				
				myPlatformManager.removeReplica(monitoredSvcMgr, false);
				myPlatformManager.removeNode(new NodeDescriptor(n), false);

				// Broadcast a 'removeReplica()' method (exclude yourself from bcast)
				GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_REMOVEREPLICA, MainReplicationSlice.NAME, null);
				hCmd.addParam(monitoredSvcMgr);
				hCmd.addParam(new Integer(monitoredLabel));
				broadcastToReplicas(hCmd, EXCLUDE_MYSELF);

				int oldLabel = myLabel;

				adjustLabels(monitoredLabel);

				// -- Attach to the new neighbour slice...
				MainReplicationSlice newSlice = (MainReplicationSlice) replicas.get(monitoredLabel);
				attachTo(monitoredLabel, newSlice);

				// GC-MODIFY-18022007-START
				// Become the new leader if it is the case...
				if ((oldLabel != 0) && (myLabel == 0)) {
					myLogger.log(Logger.INFO, "-- I'm the new leader ---");
					myContainer.becomeLeader(feeder);
				}
				else {
					if (feeder != null) {
						// NO new AMS --> No need for intercepting events anymore
						myMain.removeListener(feeder);
					}
				}
				// GC-MODIFY-18022007-END

			} catch (IMTPException imtpe) {
				imtpe.printStackTrace();
			} catch (ServiceException se) {
				se.printStackTrace();
			}
		}

		public void nodeUnreachable(Node n) {
			myLogger.log(Logger.WARNING, "Main node <" + n.getName() + "> UNREACHABLE");
		}

		public void nodeReachable(Node n) {
			myLogger.log(Logger.INFO, "Main Node <" + n.getName() + "> REACHABLE");
		}

		// The active object monitoring the remote node
		private NodeFailureMonitor nodeMonitor;

		// The integer label of the monitored slice
		private int monitoredLabel;

		private String monitoredSvcMgr;

	} // End of ServiceComponent class

	private AgentContainer myContainer;

	private ServiceComponent localSlice;

	private Filter outFilter;
	private Filter inFilter;

	private int myLabel = -1;
	private final List replicas = new LinkedList();
	private boolean snapshotOnFailure = false;

	// Owned copies of Main Container and Service Manager
	private MainContainerImpl myMain;
	private PlatformManagerImpl myPlatformManager;

	private void broadcastToReplicas(HorizontalCommand cmd, boolean includeSelf) throws IMTPException, ServiceException {
		Object[] slices = replicas.toArray();

		String localNodeName = getLocalNode().getName();
		for (int i = 0; i < slices.length; i++) {
			MainReplicationSlice slice = (MainReplicationSlice) slices[i];

			String sliceName = slice.getNode().getName();
			if (includeSelf || !sliceName.equals(localNodeName)) {
				slice.serve(cmd);
				Object ret = cmd.getReturnValue();
				if (ret instanceof Throwable) {
					// FIXME: This may happen due to the fact that the replica is terminating. E.g. a tool running on 
					// the terminating replica that deregisters from the AMS: the DeadTool event may be processed
					// when the replica is already dead. In these cases we should find a way to hide the exception
					myLogger.log(Logger.SEVERE, "Error propagating H-command " + cmd.getName() + " to slice " + sliceName);
					((Throwable) ret).printStackTrace();
				}
			}
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -