📄 platformmanagerimpl.java
字号:
// Avoid concurrent modification exception
Object[] allServiceKeys = services.keySet().toArray();
for (int i = 0; i < allServiceKeys.length; i++) {
String serviceKey = (String) allServiceKeys[i];
localRemoveSlice(serviceKey, dsc.getName(), propagated);
}
// Remove the node
if (isLocalNode(node)) {
// If it is the local node it surely hosts a container (the main). Remove it
myMain.removeLocalContainer(dsc.getContainer());
} else {
if (myLogger.isLoggable(Logger.INFO)) {
myLogger.log(Logger.INFO, "Removing node <" + dsc.getName() + "> from the platform");
}
// If the node hosted a container remove it as a remote container
if (dsc.getContainer() != null) {
myMain.removeRemoteContainer(dsc.getContainer());
}
}
// Remove the node from the global node list
nodes.remove(dsc.getName());
// Stop monitoring (this has no effect if we were not monitoring the dead node)
Node parent = dsc.getParentNode();
if (parent != null) {
// If the dead node had a parent, notify the failure-monitor monitoring the parent
NodeFailureMonitor failureMonitor = (NodeFailureMonitor) monitors.get(parent.getName());
if (failureMonitor != null) {
failureMonitor.removeChild(node);
}
}
monitors.remove(node.getName());
// Issue a DEAD_NODE vertical command
if (!propagated) {
GenericCommand gCmd = new GenericCommand(Service.DEAD_NODE, null, null);
gCmd.addParam(dsc);
Object result = myCommandProcessor.processIncoming(gCmd);
if (result instanceof Throwable) {
myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_NODE command. Node is " + dsc.getName());
((Throwable) result).printStackTrace();
}
}
}
/**
 * Forwards the removal of a node to every replica currently registered in
 * the <code>replicas</code> table, flagging the request as propagated.
 *
 * @param dsc the descriptor of the node being removed
 * @throws ServiceException if a replica reports a service-level failure
 */
private void broadcastRemoveNode(NodeDescriptor dsc) throws ServiceException {
    // Work on a snapshot of the replica table to avoid a concurrent modification exception
    Object[] snapshot = replicas.values().toArray();
    int idx = 0;
    while (idx < snapshot.length) {
        PlatformManager peer = (PlatformManager) snapshot[idx++];
        try {
            peer.removeNode(dsc, true);
        } catch (IMTPException unreachable) {
            // Zombie replica: it will be removed soon, so the failure is deliberately ignored
        }
    }
}
/**
 * Adds a service slice: the slice is always registered locally and, when the
 * request did not come in as a propagated one, it is then broadcast to the replicas.
 *
 * @param service    descriptor of the service the slice belongs to
 * @param dsc        descriptor of the node hosting the slice
 * @param propagated <code>true</code> when the call must not be re-broadcast
 *                   (the broadcast methods of this class pass <code>true</code>)
 * @throws IMTPException    if the local registration fails at the transport level
 * @throws ServiceException if the local registration or the broadcast fails
 */
public void addSlice(ServiceDescriptor service, NodeDescriptor dsc, boolean propagated) throws IMTPException, ServiceException {
    localAddSlice(service, dsc, propagated);
    if (propagated) {
        // Already a propagated request: do not bounce it to the replicas again
        return;
    }
    broadcastAddSlice(service, dsc);
}
/**
 * Registers, in the local service table, a slice of the given service for the
 * given node. The service entry is created on first use. When the node is the
 * local one, the service-specific AMS behaviour (if any) is installed, and when
 * the request is not a propagated one a new-slice command is issued.
 *
 * This may throw IMTPException since IMTPManager.createSliceProxy throws IMTPException.
 *
 * @param serviceDsc descriptor wrapping the service whose slice is added
 * @param dsc        descriptor of the node hosting the slice
 * @param propagated <code>true</code> to skip issuing the new-slice command
 */
private void localAddSlice(ServiceDescriptor serviceDsc, NodeDescriptor dsc, boolean propagated) throws IMTPException, ServiceException {
    Service service = serviceDsc.getService();
    String serviceKey = service.getName();

    // Look up the entry for this service, creating and storing it when missing
    ServiceEntry entry = (ServiceEntry) services.get(serviceKey);
    if (entry == null) {
        if (myLogger.isLoggable(Logger.CONFIG)) {
            myLogger.log(Logger.CONFIG, "Adding entry for service <" + serviceKey + ">");
        }
        entry = new ServiceEntry(service);
        services.put(serviceKey, entry);
    }

    if (myLogger.isLoggable(Logger.CONFIG)) {
        myLogger.log(Logger.CONFIG, "Adding slice for service <" + serviceKey + "> on node <" + dsc.getName() + ">");
    }

    Node node = dsc.getNode();
    Service.Slice slice;
    if (service.getHorizontalInterface() == null) {
        // No horizontal interface: create a dummy SliceProxy (it will never be used)
        slice = new SliceProxy(service, node);
    } else {
        // Create a real SliceProxy through the IMTP manager
        slice = myIMTPManager.createSliceProxy(serviceKey, service.getHorizontalInterface(), node);
    }

    // Slices are keyed by the name of the node hosting them
    String sliceKey = node.getName();
    entry.addSlice(sliceKey, slice, node);

    if (isLocalNode(node)) {
        // The service is just started on this main container:
        // register the service-specific behaviour (if any) within the AMS
        Behaviour amsBehaviour = service.getAMSBehaviour();
        if (amsBehaviour != null) {
            myMain.installAMSBehaviour(amsBehaviour);
        }
    }

    if (!propagated) {
        issueNewSliceCommand(serviceKey, sliceKey);
    }
}
/**
 * Forwards the addition of a service slice to every replica currently
 * registered in the <code>replicas</code> table, flagging the request as propagated.
 *
 * @param service descriptor of the service the slice belongs to
 * @param dsc     descriptor of the node hosting the slice
 * @throws ServiceException if a replica reports a service-level failure
 */
private void broadcastAddSlice(ServiceDescriptor service, NodeDescriptor dsc) throws ServiceException {
    // Work on a snapshot of the replica table to avoid a concurrent modification exception
    Object[] snapshot = replicas.values().toArray();
    int idx = 0;
    while (idx < snapshot.length) {
        PlatformManager peer = (PlatformManager) snapshot[idx++];
        try {
            peer.addSlice(service, dsc, true);
        } catch (IMTPException unreachable) {
            // Zombie replica: it will be removed soon, so the failure is deliberately ignored
        }
    }
}
/**
 * Removes a service slice: the slice is always removed locally and, when the
 * request did not come in as a propagated one, the removal is then broadcast
 * to the replicas.
 *
 * @param serviceKey name of the service the slice belongs to
 * @param sliceKey   key of the slice to remove
 * @param propagated <code>true</code> when the call must not be re-broadcast
 *                   (the broadcast methods of this class pass <code>true</code>)
 */
public void removeSlice(String serviceKey, String sliceKey, boolean propagated) throws IMTPException, ServiceException {
    localRemoveSlice(serviceKey, sliceKey, propagated);
    if (propagated) {
        // Already a propagated request: do not bounce it to the replicas again
        return;
    }
    broadcastRemoveSlice(serviceKey, sliceKey);
}
/**
 * Removes a slice from the local service table. Nothing happens when the
 * service is unknown. Otherwise the slice is dropped from its entry, the
 * slice cached by a <code>BaseService</code> is cleared, the service-specific
 * AMS behaviour is uninstalled when the slice key resolves to the local node,
 * and, for non-propagated requests, a DEAD_SLICE command is processed.
 *
 * @param serviceKey name of the service the slice belongs to
 * @param sliceKey   key of the slice to remove (also used to look up a node descriptor)
 * @param propagated <code>true</code> to skip the DEAD_SLICE command
 */
private void localRemoveSlice(String serviceKey, String sliceKey, boolean propagated) throws ServiceException {
    ServiceEntry entry = (ServiceEntry) services.get(serviceKey);
    if (entry == null) {
        // No entry for this service: nothing to remove
        return;
    }

    boolean sliceWasPresent = entry.removeSlice(sliceKey) != null;
    if (sliceWasPresent && myLogger.isLoggable(Logger.CONFIG)) {
        myLogger.log(Logger.CONFIG, "Removing slice for service <" + serviceKey + "> on node <" + sliceKey + ">");
    }

    // Clear the cache
    Service svc = entry.getService();
    if (svc instanceof BaseService) {
        ((BaseService) svc).clearCachedSlice(sliceKey);
    }

    NodeDescriptor dsc = getDescriptor(sliceKey);
    if (dsc != null && isLocalNode(dsc.getNode())) {
        // The service slice was removed on this node:
        // deregister the service-specific behaviour (if any) within the AMS
        Behaviour amsBehaviour = entry.getService().getAMSBehaviour();
        if (amsBehaviour != null) {
            myMain.uninstallAMSBehaviour(amsBehaviour);
        }
    }

    if (propagated) {
        return;
    }

    // Issue a DEAD_SLICE command carrying the slice key
    GenericCommand deadSliceCmd = new GenericCommand(Service.DEAD_SLICE, serviceKey, null);
    deadSliceCmd.addParam(sliceKey);
    Object outcome = myCommandProcessor.processIncoming(deadSliceCmd);
    if (outcome instanceof Throwable) {
        myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_SLICE command. Service is " + serviceKey + " node is " + sliceKey);
        ((Throwable) outcome).printStackTrace();
    }
}
/**
 * Forwards the removal of a service slice to every replica currently
 * registered in the <code>replicas</code> table, flagging the request as propagated.
 *
 * @param serviceKey name of the service the slice belongs to
 * @param sliceKey   key of the slice being removed
 * @throws ServiceException if a replica reports a service-level failure
 */
private void broadcastRemoveSlice(String serviceKey, String sliceKey) throws ServiceException {
    // Work on a snapshot of the replica table to avoid a concurrent modification exception
    Object[] snapshot = replicas.values().toArray();
    int idx = 0;
    while (idx < snapshot.length) {
        PlatformManager peer = (PlatformManager) snapshot[idx++];
        try {
            peer.removeSlice(serviceKey, sliceKey, true);
        } catch (IMTPException unreachable) {
            // Zombie replica: it will be removed soon, so the failure is deliberately ignored
        }
    }
}
/**
 * Adds a new replica identified by its address. A proxy for the replica is
 * obtained from the IMTP manager, the local bookkeeping is performed and,
 * for non-propagated requests, the addition is broadcast to the replicas
 * already known. Only afterwards is the proxy stored in the
 * <code>replicas</code> table, so the broadcast does not reach the new replica itself.
 *
 * @param newAddr    address of the replica to add
 * @param propagated <code>true</code> when the call must not be re-broadcast
 */
public void addReplica(String newAddr, boolean propagated) throws IMTPException, ServiceException {
    PlatformManager proxy = myIMTPManager.getPlatformManagerProxy(newAddr);

    localAddReplica(proxy, propagated);
    if (!propagated) {
        broadcastAddReplica(newAddr);
    }

    // Actually add the new replica only after broadcasting
    String replicaKey = proxy.getLocalAddress();
    replicas.put(replicaKey, proxy);
}
/**
 * Performs the local part of adding a replica. For non-propagated requests
 * the new replica is first told about every node (with its services) returned
 * by <code>getAllNodesInfo()</code> and about every replica already in the
 * <code>replicas</code> table. In all cases a NEW_REPLICA command carrying the
 * replica address is then processed.
 *
 * This may throw IMTPException since the new replica must be informed about the platform status.
 *
 * @param newReplica the platform manager being added as a replica
 * @param propagated <code>true</code> to skip informing the new replica
 */
private void localAddReplica(PlatformManager newReplica, boolean propagated) throws IMTPException, ServiceException {
    if (myLogger.isLoggable(Logger.INFO)) {
        myLogger.log(Logger.INFO, "Adding replica <" + newReplica.getLocalAddress() + "> to the platform");
    }

    if (!propagated) {
        // Inform the new replica about existing nodes and their installed services...
        for (Iterator nodeInfos = getAllNodesInfo().iterator(); nodeInfos.hasNext(); ) {
            NodeInfo info = (NodeInfo) nodeInfos.next();
            try {
                newReplica.addNode(info.getNodeDescriptor(), info.getServices(), true);
            } catch (JADESecurityException ae) {
                // Should never happen since this is a propagated info
                ae.printStackTrace();
            }
        }

        // ...and about the other replicas. Work on a snapshot of the
        // replica table to avoid a concurrent modification exception
        Object[] knownReplicas = replicas.values().toArray();
        int idx = 0;
        while (idx < knownReplicas.length) {
            PlatformManager known = (PlatformManager) knownReplicas[idx++];
            newReplica.addReplica(known.getLocalAddress(), true);
        }
    }

    // Issue a NEW_REPLICA command
    GenericCommand newReplicaCmd = new GenericCommand(Service.NEW_REPLICA, null, null);
    newReplicaCmd.addParam(newReplica.getLocalAddress());
    Object outcome = myCommandProcessor.processIncoming(newReplicaCmd);
    if (outcome instanceof Throwable) {
        myLogger.log(Logger.WARNING, "Unexpected error processing NEW_REPLICA command. Replica address is " + newReplica.getLocalAddress());
        ((Throwable) outcome).printStackTrace();
    }
}
/**
 * Forwards the addition of a replica to every replica currently registered
 * in the <code>replicas</code> table, flagging the request as propagated.
 *
 * @param newAddr address of the replica being added
 * @throws ServiceException if a replica reports a service-level failure
 */
private void broadcastAddReplica(String newAddr) throws ServiceException {
    // Work on a snapshot of the replica table to avoid a concurrent modification exception
    Object[] snapshot = replicas.values().toArray();
    int idx = 0;
    while (idx < snapshot.length) {
        PlatformManager peer = (PlatformManager) snapshot[idx++];
        try {
            peer.addReplica(newAddr, true);
        } catch (IMTPException unreachable) {
            // Zombie replica: it will be removed soon, so the failure is deliberately ignored
        }
    }
}
/**
 * Removes a replica: the removal is always performed locally and, when the
 * request did not come in as a propagated one, it is then broadcast to the
 * remaining replicas.
 *
 * @param address    address of the replica to remove
 * @param propagated <code>true</code> when the call must not be re-broadcast
 *                   (the broadcast methods of this class pass <code>true</code>)
 */
public void removeReplica(String address, boolean propagated) throws IMTPException, ServiceException {
    localRemoveReplica(address, propagated);
    if (propagated) {
        // Already a propagated request: do not bounce it to the replicas again
        return;
    }
    broadcastRemoveReplica(address);
}
/**
 * Performs the local part of removing a replica. The replica is dropped from
 * the <code>replicas</code> table; for non-propagated requests every relevant
 * node is told that the platform manager at <code>address</code> is dead
 * (parent-less nodes without a platform manager first, child nodes afterwards);
 * finally a DEAD_REPLICA command carrying the address is processed.
 *
 * @param address    address of the replica that has to be removed
 * @param propagated <code>true</code> to skip notifying the nodes
 * @throws ServiceException if removing a terminated node fails
 */
private void localRemoveReplica(String address, boolean propagated) throws ServiceException {
    if (myLogger.isLoggable(Logger.INFO)) {
        myLogger.log(Logger.INFO, "Removing replica <" + address + "> from the platform");
    }

    // Remove the old replica
    replicas.remove(address);

    if (!propagated) {
        // Iterate over a snapshot of the node table
        // (removeTerminatedNode() presumably alters it -- confirm against its definition)
        Object[] allNodes = nodes.values().toArray();

        // Notify first all non-child and non-main nodes.
        for (int i = 0; i < allNodes.length; i++) {
            NodeDescriptor dsc = (NodeDescriptor) allNodes[i];
            if (dsc.getParentNode() == null) {
                Node n = dsc.getNode();
                if (!n.hasPlatformManager()) {
                    notifyPlatformManagerDead(n, address);
                }
            }
        }

        // Then notify all child nodes.
        for (int i = 0; i < allNodes.length; i++) {
            NodeDescriptor dsc = (NodeDescriptor) allNodes[i];
            if (dsc.getParentNode() != null) {
                notifyPlatformManagerDead(dsc.getNode(), address);
            }
        }
    }

    // Issue a DEAD_REPLICA command
    GenericCommand gCmd = new GenericCommand(Service.DEAD_REPLICA, null, null);
    gCmd.addParam(address);
    Object result = myCommandProcessor.processIncoming(gCmd);
    if (result instanceof Throwable) {
        myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_REPLICA command. Replica address is " + address);
        ((Throwable) result).printStackTrace();
    }
}

/**
 * Tells a single node that the platform manager at <code>deadAddress</code>
 * is dead, passing the local address along. If the node cannot be reached
 * it is removed as a terminated node.
 *
 * @param n           the node to notify
 * @param deadAddress address of the dead platform manager
 * @throws ServiceException if removing the terminated node fails
 */
private void notifyPlatformManagerDead(Node n, String deadAddress) throws ServiceException {
    try {
        n.platformManagerDead(deadAddress, getLocalAddress());
    } catch (IMTPException imtpe) {
        // The node died while no one was monitoring it
        removeTerminatedNode(n);
    }
}
/**
 * Forwards the removal of a replica to every replica currently registered
 * in the <code>replicas</code> table, flagging the request as propagated.
 *
 * @param address address of the replica being removed
 * @throws ServiceException if a replica reports a service-level failure
 */
private void broadcastRemoveReplica(String address) throws ServiceException {
    // Work on a snapshot of the replica table to avoid a concurrent modification exception
    Object[] snapshot = replicas.values().toArray();
    int idx = 0;
    while (idx < snapshot.length) {
        PlatformManager peer = (PlatformManager) snapshot[idx++];
        try {
            peer.removeReplica(address, true);
        } catch (IMTPException unreachable) {
            // Zombie replica: it will be removed soon, so the failure is deliberately ignored
        }
    }
}
/**
 * Handles an adoption request from a node. When a descriptor is registered
 * under the node's name, monitoring is started on it; otherwise the request
 * is logged and ignored.
 *
 * @param n        the node requesting adoption
 * @param children not used by this implementation
 */
public void adopt(Node n, Node[] children) throws IMTPException {
    String name = n.getName();
    NodeDescriptor dsc = getDescriptor(name);
    if (dsc == null) {
        // Unknown node: just report it
        myLogger.log(Logger.WARNING, "NO descriptor found for node <" + n.getName() + "> requesting adoption. Ignore...");
        return;
    }
    monitor(dsc);
    myLogger.log(Logger.INFO, "Node <" + n.getName() + "> adopted");
}
/**
 * No-op: the method body is intentionally empty.
 * NOTE(review): presumably callers use a successful return (as opposed to an
 * IMTPException raised by the transport) as a liveness check -- confirm against callers.
 */
public void ping() throws IMTPException {
// Just do nothing
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -