📄 mainreplicationservice.java
字号:
}
}
/**
 * Dispatches an incoming horizontal command to the matching private handler of this slice.
 * <p>
 * The command name is compared against the {@code MainReplicationSlice.H_*} constants and the
 * handler arguments are read positionally from {@code cmd.getParams()}. Only the
 * {@code H_GETLABEL} and {@code H_GETPLATFORMMANAGERADDRESS} branches store a result in the
 * command. Any {@code Throwable} raised while decoding or handling the command is stored as
 * the command's return value instead of being propagated. A name that matches none of the
 * constants is ignored.
 *
 * @param cmd the horizontal command to serve
 * @return always {@code null}
 */
public VerticalCommand serve(HorizontalCommand cmd) {
    try {
        final String commandName = cmd.getName();
        final Object[] args = cmd.getParams();

        if (commandName.equals(MainReplicationSlice.H_GETLABEL)) {
            cmd.setReturnValue(new Integer(getLabel()));
        } else if (commandName.equals(MainReplicationSlice.H_GETPLATFORMMANAGERADDRESS)) {
            cmd.setReturnValue(getPlatformManagerAddress());
        } else if (commandName.equals(MainReplicationSlice.H_ADDREPLICA)) {
            // args: slice name, manager address, slice index, node descriptor, services
            addReplica((String) args[0], (String) args[1], ((Integer) args[2]).intValue(),
                    (NodeDescriptor) args[3], (Vector) args[4]);
        } else if (commandName.equals(MainReplicationSlice.H_REMOVEREPLICA)) {
            // args: manager address, slice index
            removeReplica((String) args[0], ((Integer) args[1]).intValue());
        } else if (commandName.equals(MainReplicationSlice.H_FILLGADT)) {
            // args: agent identifiers, container identifiers
            fillGADT((AID[]) args[0], (ContainerID[]) args[1]);
        } else if (commandName.equals(MainReplicationSlice.H_BORNAGENT)) {
            // args: agent identifier, container identifier; principal and credentials come from the command
            bornAgent((AID) args[0], (ContainerID) args[1], cmd.getPrincipal(), cmd.getCredentials());
        } else if (commandName.equals(MainReplicationSlice.H_DEADAGENT)) {
            deadAgent((AID) args[0]);
        } else if (commandName.equals(MainReplicationSlice.H_SUSPENDEDAGENT)) {
            suspendedAgent((AID) args[0]);
        } else if (commandName.equals(MainReplicationSlice.H_RESUMEDAGENT)) {
            resumedAgent((AID) args[0]);
        } else if (commandName.equals(MainReplicationSlice.H_NEWMTP)) {
            newMTP((MTPDescriptor) args[0], (ContainerID) args[1]);
        } else if (commandName.equals(MainReplicationSlice.H_DEADMTP)) {
            deadMTP((MTPDescriptor) args[0], (ContainerID) args[1]);
        } else if (commandName.equals(MainReplicationSlice.H_NEWTOOL)) {
            newTool((AID) args[0]);
        } else if (commandName.equals(MainReplicationSlice.H_DEADTOOL)) {
            deadTool((AID) args[0]);
        }
    } catch (Throwable failure) {
        // Hand the failure back to the caller through the command itself
        cmd.setReturnValue(failure);
    }
    return null;
}
/**
 * Returns the current value of {@code myLabel}.
 * The method declares {@code IMTPException} but the body shown here never throws it.
 *
 * @return the value of {@code myLabel}
 */
private int getLabel() throws IMTPException {
return myLabel;
}
/**
 * Returns the local address reported by {@code myPlatformManager}.
 *
 * @return the result of {@code myPlatformManager.getLocalAddress()}
 */
private String getPlatformManagerAddress() throws IMTPException {
return myPlatformManager.getLocalAddress();
}
/**
 * Inserts a newly arrived replica slice into the local replica list and, when this node has
 * label 0, attaches to it and sends it the current platform state.
 * <p>
 * On the node with label 0 the following is sent to the new slice, in this order: the agent
 * names with their container identifiers, a notification for each agent the AMS reports as
 * suspended, and the list of tools. Finally the new node is registered with the local
 * platform manager.
 * <p>
 * Lookup failures ({@code NotFoundException}) are reported through {@code myLogger} and do not
 * interrupt the transfer. An agent whose container cannot be found is still sent, with a
 * {@code null} container entry.
 *
 * @param sliceName  name used to obtain a fresh slice for the new replica
 * @param smAddr     not used by this method
 * @param sliceIndex position at which the slice is inserted into {@code replicas}
 * @param dsc        descriptor of the new node, passed to the platform manager
 * @param services   services of the new node, passed to the platform manager
 * @throws IMTPException    if a remote invocation on the new slice fails
 * @throws ServiceException if a service-level error occurs
 */
private void addReplica(String sliceName, String smAddr, int sliceIndex, NodeDescriptor dsc, Vector services) throws IMTPException, ServiceException {
    // Get a fresh slice: in this way the address is always right (and old address is overridden!!!)
    MainReplicationSlice slice = (MainReplicationSlice) getFreshSlice(sliceName);
    replicas.add(sliceIndex, slice);

    // If first in line, close the ring by monitoring the newly arrived slice,
    // and start sending data to the new slice...
    if (myLabel == 0) {
        attachTo(sliceIndex, slice);

        // Send all the data about the GADT...
        AID[] names = myMain.agentNames();
        ContainerID[] containers = new ContainerID[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                containers[i] = myMain.getContainerID(names[i]);
            } catch (NotFoundException nfe) {
                // It should never happen...
                myLogger.log(Logger.WARNING, "Container not found for agent " + names[i].getName() + " while sending the GADT to the new replica", nfe);
            }
        }

        // FIXME: What about principal and ownership?
        slice.fillGADT(names, containers);

        // Update the status of each suspended agent...
        AMSAgentDescription amsd = new AMSAgentDescription();
        amsd.setState(AMSAgentDescription.SUSPENDED);
        List suspendedAgents = myMain.amsSearch(amsd, -1); // '-1' means 'all the results'
        Iterator it = suspendedAgents.iterator();
        while (it.hasNext()) {
            AMSAgentDescription desc = (AMSAgentDescription) it.next();
            try {
                slice.suspendedAgent(desc.getName());
            } catch (NotFoundException nfe) {
                // It should never happen...
                myLogger.log(Logger.WARNING, "Suspended agent " + desc.getName().getName() + " not found on the new replica", nfe);
            }
        }

        // Send the tool list...
        AID[] tools = myMain.agentTools();
        for (int i = 0; i < tools.length; i++) {
            slice.newTool(tools[i]);
        }

        // Finally issue a NEW_NODE VCommand and a NEW_SLICE VCommand for each service to allow
        // local services to propagate service specific information to their slices in the new
        // Main Container node.
        try {
            myPlatformManager.addMainContainerNode(dsc, services);
        } catch (JADESecurityException jse) {
            // Should we do something more?
            myLogger.log(Logger.WARNING, "Unauthorized Main Container node " + dsc.getNode().getName(), jse);
        }
    }

    myLogger.log(Logger.INFO, "Main container ring re-arranged: label = " + myLabel + " monitored label = " + monitoredLabel);
}
/**
 * Drops the replica at the given position from {@code replicas} and then re-aligns the
 * ring labels through {@code adjustLabels}.
 *
 * @param smAddr not used by this method
 * @param index  position of the replica to remove; also passed to {@code adjustLabels}
 */
private void removeReplica(String smAddr, int index) throws IMTPException {
replicas.remove(index);
adjustLabels(index);
}
/**
 * Re-aligns {@code myLabel} and {@code monitoredLabel} after the replica at position
 * {@code index} has been removed from {@code replicas}.
 * <p>
 * When the removed position precedes this node's label, both labels shift down by one and a
 * monitored label that would become -1 wraps to the last position of the replica list.
 * Otherwise, only when this node has label 0, the monitored label shifts down by one.
 * The resulting labels are always logged at INFO level.
 *
 * @param index position of the replica that was removed
 */
private void adjustLabels(int index) {
    final boolean removedBeforeMe = index < myLabel;
    if (removedBeforeMe) {
        --myLabel;
        --monitoredLabel;
        if (monitoredLabel == -1) {
            // Wrap around to the last element of the (already shrunk) replica list
            monitoredLabel = replicas.size() - 1;
        }
    } else if (myLabel == 0) {
        // Handle the ring wrap-around case...
        --monitoredLabel;
    }

    myLogger.log(Logger.INFO, "Main container ring re-arranged: label = " + myLabel + " monitored label = " + monitoredLabel);
}
/**
 * Registers each given agent with {@code myMain}, pairing {@code agents[i]} with
 * {@code containers[i]}.
 * <p>
 * Each agent is registered with {@code null} principal and ownership and with the last
 * {@code bornAgent} flag set to {@code true}. A {@code NotFoundException} or
 * {@code NameClashException} for one agent is logged as a warning through {@code myLogger}
 * and the remaining agents are still processed.
 *
 * @param agents     identifiers of the agents to register
 * @param containers container identifiers, index-aligned with {@code agents}
 * @throws JADESecurityException if raised by {@code myMain.bornAgent}
 */
private void fillGADT(AID[] agents, ContainerID[] containers) throws JADESecurityException {
    for (int i = 0; i < agents.length; i++) {
        try {
            // FIXME: What about principal and ownership?
            myMain.bornAgent(agents[i], containers[i], null, null, true);
            if (myLogger.isLoggable(Logger.CONFIG)) {
                myLogger.log(Logger.CONFIG, "Agent " + agents[i].getName() + " inserted into GADT");
            }
        } catch (NotFoundException nfe) {
            // It should never happen...
            myLogger.log(Logger.WARNING, "Agent " + agents[i].getName() + " not inserted into GADT", nfe);
        } catch (NameClashException nce) {
            // It should never happen...
            myLogger.log(Logger.WARNING, "Agent " + agents[i].getName() + " not inserted into GADT", nce);
        }
    }
}
/**
 * Registers a newly born agent with {@code myMain}, replacing an existing entry with the
 * same name only when the node hosting that entry cannot be pinged.
 * <p>
 * The ownership string is the name of the credentials' owner, or {@code "NONE"} when there
 * are no credentials or no owner.
 *
 * @param name        identifier of the new agent
 * @param cid         identifier of the container hosting the new agent
 * @param principal   principal passed through to {@code myMain.bornAgent}
 * @param credentials credentials from which the ownership is derived; may be {@code null}
 * @throws NameClashException if the name is already registered and the node of the
 *                            registered agent answers the ping
 * @throws NotFoundException  if raised by {@code myMain.bornAgent}
 */
private void bornAgent(AID name, ContainerID cid, JADEPrincipal principal, Credentials credentials) throws NameClashException, NotFoundException {
    // Retrieve the ownership from the credentials
    final JADEPrincipal owner = (credentials == null) ? null : credentials.getOwner();
    final String ownership = (owner == null) ? "NONE" : owner.getName();

    try {
        // First attempt is not forced: a name already present raises NameClashException
        myMain.bornAgent(name, cid, principal, ownership, false);
        if (myLogger.isLoggable(Logger.CONFIG)) {
            myLogger.log(Logger.CONFIG, "Agent " + name.getName() + " inserted into GADT");
        }
    } catch (NameClashException clash) {
        try {
            // Check whether the node that hosts the already registered agent still answers
            final ContainerID previousCid = myMain.getContainerID(name);
            final Node previousNode = myMain.getContainerNode(previousCid).getNode();
            previousNode.ping(false);

            // Ping succeeded: the clash is real, report it
            throw clash;
        } catch (NameClashException stillRegistered) {
            // Let this one through...
            throw stillRegistered;
        } catch (Exception pingFailure) {
            // Lookup or ping failed: forcibly replace the stale entry
            myMain.bornAgent(name, cid, principal, ownership, true);
            if (myLogger.isLoggable(Logger.CONFIG)) {
                myLogger.log(Logger.CONFIG, "Agent " + name.getName() + " inserted into GADT");
            }
        }
    }
}
/**
 * Forwards the death of an agent to {@code myMain}.
 *
 * @param name identifier of the dead agent
 * @throws NotFoundException if raised by {@code myMain.deadAgent}
 */
private void deadAgent(AID name) throws NotFoundException {
// NOTE(review): the meaning of the 'false' flag is defined by myMain.deadAgent -- confirm there
myMain.deadAgent(name, false);
}
/**
 * Forwards the suspension of an agent to {@code myMain}.
 *
 * @param name identifier of the suspended agent
 * @throws NotFoundException if raised by {@code myMain.suspendedAgent}
 */
private void suspendedAgent(AID name) throws NotFoundException {
myMain.suspendedAgent(name);
}
/**
 * Forwards the resumption of an agent to {@code myMain}.
 *
 * @param name identifier of the resumed agent
 * @throws NotFoundException if raised by {@code myMain.resumedAgent}
 */
private void resumedAgent(AID name) throws NotFoundException {
myMain.resumedAgent(name);
}
/**
 * Forwards the activation of an MTP to {@code myMain}.
 *
 * @param mtp descriptor of the new MTP
 * @param cid identifier of the container the MTP is associated with
 */
private void newMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException {
myMain.newMTP(mtp, cid);
}
/**
 * Forwards the deactivation of an MTP to {@code myMain}.
 *
 * @param mtp descriptor of the dead MTP
 * @param cid identifier of the container the MTP is associated with
 */
private void deadMTP(MTPDescriptor mtp, ContainerID cid) throws IMTPException {
myMain.deadMTP(mtp, cid);
}
/**
 * Notifies {@code myMain} that a tool has been added.
 *
 * @param tool identifier of the added tool
 */
private void newTool(AID tool) throws IMTPException {
myMain.toolAdded(tool);
}
/**
 * Notifies {@code myMain} that a tool has been removed.
 *
 * @param tool identifier of the removed tool
 */
private void deadTool(AID tool) throws IMTPException {
myMain.toolRemoved(tool);
}
/**
 * Prints to standard output the local node name with its label, the monitored label and the
 * node name of every slice in a snapshot of the replica list, each with its position.
 * Any {@code Throwable} raised while dumping is caught and its stack trace printed.
 */
public void dumpReplicas() {
    try {
        final String localNodeName = getLocalNode().getName();
        System.out.println("--- " + localNodeName + "[" + myLabel + "] ---");
        System.out.println("--- Monitoring node [" + monitoredLabel + "] ---");
        System.out.println("--- Replica list ---");

        // Work on a snapshot of the replica list
        final Object[] snapshot = replicas.toArray();
        int position = 0;
        while (position < snapshot.length) {
            final MainReplicationSlice replica = (MainReplicationSlice) snapshot[position];
            final String replicaNodeName = replica.getNode().getName();
            System.out.println("----- " + replicaNodeName + "[" + position + "] -----");
            position++;
        }

        System.out.println("--- End ---");
    } catch (Throwable problem) {
        problem.printStackTrace();
    }
}
/**
 * Prints to standard output the local name of every agent returned by
 * {@code myMain.agentNames()}, framed by a header and a footer line.
 */
private void dumpGADT() {
    final AID[] knownAgents = myMain.agentNames();

    System.out.println("--- Agent List ---");
    int idx = 0;
    while (idx < knownAgents.length) {
        final String localName = knownAgents[idx].getLocalName();
        System.out.println(" Agent: " + localName);
        idx++;
    }
    System.out.println("------------------");
}
// Implementation of the NodeEventListener interface
/**
 * Logs, at CONFIG level, that monitoring of the given node has started.
 * Nothing happens when CONFIG logging is disabled.
 *
 * @param n the node being monitored
 */
public void nodeAdded(Node n) {
    if (!myLogger.isLoggable(Logger.CONFIG)) {
        return;
    }
    myLogger.log(Logger.CONFIG, "Start monitoring node <" + n.getName() + ">");
}
/**
 * Handles the termination of the monitored node.
 * <p>
 * In order: removes the slice at {@code monitoredLabel} from {@code replicas}; unless
 * {@code snapshotOnFailure} is set, registers an {@code AMSEventQueueFeeder} as listener on
 * {@code myMain}; removes the replica and the node from the platform manager; broadcasts an
 * {@code H_REMOVEREPLICA} command to the other replicas; re-aligns the ring labels; attaches
 * to the slice now found at {@code monitoredLabel}; and, if this node's label changed from a
 * non-zero value to 0, makes the container become leader. Otherwise the feeder, if any, is
 * unregistered again.
 * <p>
 * {@code IMTPException} and {@code ServiceException} are caught and their stack traces printed.
 *
 * @param n the node that terminated
 */
public void nodeRemoved(Node n) {
if (myLogger.isLoggable(Logger.CONFIG)) {
myLogger.log(Logger.CONFIG, "Node <" + n.getName() + "> TERMINATED");
}
try {
// Drop the slice of the terminated node (the one this slice was monitoring)
replicas.remove(monitoredLabel);
// GC-ADD-18022007-START
// Possibly the AMS is dead --> Start intercepting platform and MTP events on behalf of the
// new AMS if any.
AMSEventQueueFeeder feeder = null;
if (!snapshotOnFailure) {
feeder = new AMSEventQueueFeeder(new InputQueue(), myContainer.getID());
myMain.addListener(feeder);
}
// GC-ADD-18022007-END
myPlatformManager.removeReplica(monitoredSvcMgr, false);
myPlatformManager.removeNode(new NodeDescriptor(n), false);
// Broadcast a 'removeReplica()' method (exclude yourself from bcast)
GenericCommand hCmd = new GenericCommand(MainReplicationSlice.H_REMOVEREPLICA, MainReplicationSlice.NAME, null);
// Parameter order must match what serve() reads for H_REMOVEREPLICA: address first, then index
hCmd.addParam(monitoredSvcMgr);
hCmd.addParam(new Integer(monitoredLabel));
broadcastToReplicas(hCmd, EXCLUDE_MYSELF);
// Remember the label held before re-alignment so that a change to 0 can be detected below
int oldLabel = myLabel;
adjustLabels(monitoredLabel);
// -- Attach to the new neighbour slice...
MainReplicationSlice newSlice = (MainReplicationSlice) replicas.get(monitoredLabel);
attachTo(monitoredLabel, newSlice);
// GC-MODIFY-18022007-START
// Become the new leader if it is the case...
if ((oldLabel != 0) && (myLabel == 0)) {
myLogger.log(Logger.INFO, "-- I'm the new leader ---");
// feeder is null here when snapshotOnFailure is set
myContainer.becomeLeader(feeder);
}
else {
if (feeder != null) {
// NO new AMS --> No need for intercepting events anymore
myMain.removeListener(feeder);
}
}
// GC-MODIFY-18022007-END
} catch (IMTPException imtpe) {
// NOTE(review): when an exception is raised after addListener(feeder), the feeder is not unregistered here
imtpe.printStackTrace();
} catch (ServiceException se) {
se.printStackTrace();
}
}
/**
 * Logs, at WARNING level, that the given node is unreachable. No other action is taken.
 *
 * @param n the node reported as unreachable
 */
public void nodeUnreachable(Node n) {
myLogger.log(Logger.WARNING, "Main node <" + n.getName() + "> UNREACHABLE");
}
/**
 * Logs, at INFO level, that the given node is reachable. No other action is taken.
 *
 * @param n the node reported as reachable
 */
public void nodeReachable(Node n) {
myLogger.log(Logger.INFO, "Main Node <" + n.getName() + "> REACHABLE");
}
// The active object monitoring the remote node
private NodeFailureMonitor nodeMonitor;
// The integer label of the monitored slice; used as an index into the replica list
private int monitoredLabel;
// Value handed to myPlatformManager.removeReplica() and sent as the first H_REMOVEREPLICA
// parameter when the monitored node terminates
// (presumably the platform/service manager address of the monitored replica -- TODO confirm)
private String monitoredSvcMgr;
} // End of ServiceComponent class
// The local container; provides the container ID and is told to become leader on failover
private AgentContainer myContainer;
private ServiceComponent localSlice;
private Filter outFilter;
private Filter inFilter;
// Label of this node in the replica ring; -1 until assigned
private int myLabel = -1;
// Replica slices, indexed by label; elements are cast to MainReplicationSlice when read
private final List replicas = new LinkedList();
// When false, an AMSEventQueueFeeder is registered on myMain while handling a terminated replica
private boolean snapshotOnFailure = false;
// Owned copies of Main Container and Service Manager
private MainContainerImpl myMain;
private PlatformManagerImpl myPlatformManager;
/**
 * Serves the given horizontal command on every slice in a snapshot of the replica list.
 * <p>
 * A slice whose node name equals the local node name is skipped unless {@code includeSelf}
 * is {@code true}. After each call, a {@code Throwable} found as the command's return value
 * is logged at SEVERE level together with its stack trace, and the broadcast continues with
 * the next slice.
 *
 * @param cmd         the command to serve on each replica; the same instance is reused for all slices
 * @param includeSelf whether the slice of the local node is also served
 * @throws IMTPException    if a remote invocation fails
 * @throws ServiceException if a service-level error occurs
 */
private void broadcastToReplicas(HorizontalCommand cmd, boolean includeSelf) throws IMTPException, ServiceException {
    Object[] slices = replicas.toArray();
    String localNodeName = getLocalNode().getName();

    for (int i = 0; i < slices.length; i++) {
        MainReplicationSlice slice = (MainReplicationSlice) slices[i];
        String sliceName = slice.getNode().getName();
        if (!includeSelf && sliceName.equals(localNodeName)) {
            continue;
        }

        slice.serve(cmd);
        Object ret = cmd.getReturnValue();
        if (ret instanceof Throwable) {
            // FIXME: This may happen due to the fact that the replica is terminating. E.g. a tool running on
            // the terminating replica that deregisters from the AMS: the DeadTool event may be processed
            // when the replica is already dead. In these cases we should find a way to hide the exception
            myLogger.log(Logger.SEVERE, "Error propagating H-command " + cmd.getName() + " to slice " + sliceName, (Throwable) ret);
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -