⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 platformmanagerimpl.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
		// Avoid concurrent modification exception
		Object[] allServiceKeys = services.keySet().toArray();
		for (int i = 0; i < allServiceKeys.length; i++) {
			String serviceKey = (String) allServiceKeys[i];
			localRemoveSlice(serviceKey, dsc.getName(), propagated);
		}

		// Remove the node
		if (isLocalNode(node)) {
			// If it is the local node it surely hosts a container (the main). Remove it
			myMain.removeLocalContainer(dsc.getContainer());
		} else {
			if (myLogger.isLoggable(Logger.INFO)) {
				myLogger.log(Logger.INFO, "Removing node <" + dsc.getName() + "> from the platform");
			}

			// If the node hosted a container remove it as a remote container
			if (dsc.getContainer() != null) {
				myMain.removeRemoteContainer(dsc.getContainer());
			}
		}

		// Remove the node from the global node list
		nodes.remove(dsc.getName());

		// Stop monitoring (this has no effect if we were not monitoring the dead node)
		Node parent = dsc.getParentNode();
		if (parent != null) {
			// If the dead node had a parent, notify the failure-monitor monitoring the parent
			NodeFailureMonitor failureMonitor = (NodeFailureMonitor) monitors.get(parent.getName());
			if (failureMonitor != null) {
				failureMonitor.removeChild(node);
			}
		}
		monitors.remove(node.getName());

		// Issue a DEAD_NODE vertical command
		if (!propagated) {
			GenericCommand gCmd = new GenericCommand(Service.DEAD_NODE, null, null);
			gCmd.addParam(dsc);
			Object result = myCommandProcessor.processIncoming(gCmd);
			if (result instanceof Throwable) {
				myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_NODE command. Node is " + dsc.getName());
				((Throwable) result).printStackTrace();
			}
		}
	}

	private void broadcastRemoveNode(NodeDescriptor dsc) throws ServiceException {
		// Avoid concurrent modification exception
		Object[] rr = replicas.values().toArray();
		for (int i = 0; i < rr.length; ++i) {
			PlatformManager replica = (PlatformManager) rr[i];
			try {
				replica.removeNode(dsc, true);
			} catch (IMTPException imtpe) {
				// Zombie replica. Will be removed soon. Just ignore it
			}
		}
	}

	public void addSlice(ServiceDescriptor service, NodeDescriptor dsc, boolean propagated) throws IMTPException, ServiceException {
		localAddSlice(service, dsc, propagated);
		if (!propagated) {
			broadcastAddSlice(service, dsc);
		}
	}

	// This may throw IMTPException since IMTPManager.createSliceProxy throws IMTPException
	private void localAddSlice(ServiceDescriptor serviceDsc, NodeDescriptor dsc, boolean propagated) throws IMTPException, ServiceException {
		Service service = serviceDsc.getService();

		String serviceKey = service.getName();
		ServiceEntry e = (ServiceEntry) services.get(serviceKey);

		if (e == null) {
			if (myLogger.isLoggable(Logger.CONFIG)) {
				myLogger.log(Logger.CONFIG, "Adding entry for service <" + serviceKey + ">");
			}

			e = new ServiceEntry(service);
			services.put(serviceKey, e);
		}
		if (myLogger.isLoggable(Logger.CONFIG)) {
			myLogger.log(Logger.CONFIG, "Adding slice for service <" + serviceKey + "> on node <" + dsc.getName() + ">");
		}

		Node node = dsc.getNode();
		Service.Slice slice = null;
		if (service.getHorizontalInterface() != null) {
			// Create a real SliceProxy
			slice = myIMTPManager.createSliceProxy(serviceKey, service.getHorizontalInterface(), node);
		} else {
			// Create a dummy SliceProxy (it will never be used)
			slice = new SliceProxy(service, node);
		}

		String sliceKey = node.getName();
		e.addSlice(sliceKey, slice, node);

		if (isLocalNode(node)) {
			// The service is just started on this main container
			// Register the service-specific behaviour (if any) within the AMS
			Behaviour b = service.getAMSBehaviour();
			if (b != null) {
				myMain.installAMSBehaviour(b);
			}
		}

		if (!propagated) {
			issueNewSliceCommand(serviceKey, sliceKey);
		}
	}

	private void broadcastAddSlice(ServiceDescriptor service, NodeDescriptor dsc) throws ServiceException {
		// Avoid concurrent modification exception
		Object[] rr = replicas.values().toArray();
		for (int i = 0; i < rr.length; ++i) {
			PlatformManager replica = (PlatformManager) rr[i];
			try {
				replica.addSlice(service, dsc, true);
			} catch (IMTPException imtpe) {
				// Zombie replica. Will be removed soon. Just ignore it
			}
		}
	}

	public void removeSlice(String serviceKey, String sliceKey, boolean propagated) throws IMTPException, ServiceException {
		localRemoveSlice(serviceKey, sliceKey, propagated);
		if (!propagated) {
			broadcastRemoveSlice(serviceKey, sliceKey);
		}
	}

	private void localRemoveSlice(String serviceKey, String sliceKey, boolean propagated) throws ServiceException {
		ServiceEntry e = (ServiceEntry) services.get(serviceKey);

		if (e != null) {
			if (e.removeSlice(sliceKey) != null) {
				if (myLogger.isLoggable(Logger.CONFIG)) {
					myLogger.log(Logger.CONFIG, "Removing slice for service <" + serviceKey + "> on node <" + sliceKey + ">");
				}
			}
			// Clear the cache 
			Service svc = e.getService();
			if (svc instanceof BaseService) {
				((BaseService) svc).clearCachedSlice(sliceKey);
			}

			NodeDescriptor dsc = getDescriptor(sliceKey);
			if (dsc != null && isLocalNode(dsc.getNode())) {
				// The service slice was removed on this node
				// Deregister the service-specific behaviour (if any) within the AMS
				Behaviour b = e.getService().getAMSBehaviour();
				if (b != null) {
					myMain.uninstallAMSBehaviour(b);
				}
			}

			if (!propagated) {
				GenericCommand gCmd = new GenericCommand(Service.DEAD_SLICE, serviceKey, null);
				gCmd.addParam(sliceKey);
				Object result = myCommandProcessor.processIncoming(gCmd);
				if (result instanceof Throwable) {
					myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_SLICE command. Service is " + serviceKey + " node is " + sliceKey);
					((Throwable) result).printStackTrace();
				}
			}
		}
	}

	private void broadcastRemoveSlice(String serviceKey, String sliceKey) throws ServiceException {
		// Avoid concurrent modification exception
		Object[] rr = replicas.values().toArray();
		for (int i = 0; i < rr.length; ++i) {
			PlatformManager replica = (PlatformManager) rr[i];
			try {
				replica.removeSlice(serviceKey, sliceKey, true);
			} catch (IMTPException imtpe) {
				// Zombie replica. Will be removed soon. Just ignore it
			}
		}
	}

	public void addReplica(String newAddr, boolean propagated) throws IMTPException, ServiceException {
		PlatformManager newReplica = myIMTPManager.getPlatformManagerProxy(newAddr);
		localAddReplica(newReplica, propagated);
		if (!propagated) {
			broadcastAddReplica(newAddr);
		}
		// Actually add the new replica only after broadcasting
		replicas.put(newReplica.getLocalAddress(), newReplica);
	}

	// This may throw IMTPException since the new replica must be informed about the platform status
	private void localAddReplica(PlatformManager newReplica, boolean propagated) throws IMTPException, ServiceException {
		if (myLogger.isLoggable(Logger.INFO)) {
			myLogger.log(Logger.INFO, "Adding replica <" + newReplica.getLocalAddress() + "> to the platform");
		}

		if (!propagated) {
			// Inform the new replica about existing nodes and their installed services...
			List infos = getAllNodesInfo();

			Iterator it = infos.iterator();
			while (it.hasNext()) {
				NodeInfo info = (NodeInfo) it.next();
				try {
					newReplica.addNode(info.getNodeDescriptor(), info.getServices(), true);
				} catch (JADESecurityException ae) {
					// Should never happen since this is a propagated info
					ae.printStackTrace();
				}
			}

			// Inform the new replica about other replicas
			// Avoid concurrent modification exception
			Object[] rr = replicas.values().toArray();
			for (int i = 0; i < rr.length; ++i) {
				PlatformManager replica = (PlatformManager) rr[i];
				newReplica.addReplica(replica.getLocalAddress(), true);
			}
		}
		
		// Issue a NEW_REPLICA command
		GenericCommand gCmd = new GenericCommand(Service.NEW_REPLICA, null, null);
		gCmd.addParam(newReplica.getLocalAddress());
		Object result = myCommandProcessor.processIncoming(gCmd);
		if (result instanceof Throwable) {
			myLogger.log(Logger.WARNING, "Unexpected error processing NEW_REPLICA command. Replica address is " + newReplica.getLocalAddress());
			((Throwable) result).printStackTrace();
		}
	}

	private void broadcastAddReplica(String newAddr) throws ServiceException {
		// Avoid concurrent modification exception
		Object[] rr = replicas.values().toArray();
		for (int i = 0; i < rr.length; ++i) {
			PlatformManager replica = (PlatformManager) rr[i];
			try {
				replica.addReplica(newAddr, true);
			} catch (IMTPException imtpe) {
				// Zombie replica. Will be removed soon. Just ignore it
			}
		}
	}

	public void removeReplica(String address, boolean propagated) throws IMTPException, ServiceException {
		localRemoveReplica(address, propagated);
		if (!propagated) {
			broadcastRemoveReplica(address);
		}
	}

	private void localRemoveReplica(String address, boolean propagated) throws ServiceException {
		if (myLogger.isLoggable(Logger.INFO)) {
			myLogger.log(Logger.INFO, "Removing replica <" + address + "> from the platform");
		}

		// Remove the old replica
		replicas.remove(address);

		if (!propagated) {
			// Notify first all non-child and non-main nodes.
			Object[] allNodes = nodes.values().toArray();
			for (int i = 0; i < allNodes.length; i++) {
				NodeDescriptor dsc = (NodeDescriptor) allNodes[i];
				if (dsc.getParentNode() == null) {
					Node n = dsc.getNode();
					if (!n.hasPlatformManager()) {
						try {
							n.platformManagerDead(address, getLocalAddress());

						} catch (IMTPException imtpe) {
							// The node daid while no one was monitoring it
							removeTerminatedNode(n);
						}
					}
				}
			}
			// Then notify all child nodes.
			for (int i = 0; i < allNodes.length; i++) {
				NodeDescriptor dsc = (NodeDescriptor) allNodes[i];
				if (dsc.getParentNode() != null) {
					Node n = dsc.getNode();
					try {
						n.platformManagerDead(address, getLocalAddress());
					} catch (IMTPException imtpe) {
						// The node daid while no one was monitoring it
						removeTerminatedNode(n);
					}
				}
			}
		}
		
		// Issue a DEAD_REPLICA command
		GenericCommand gCmd = new GenericCommand(Service.DEAD_REPLICA, null, null);
		gCmd.addParam(address);
		Object result = myCommandProcessor.processIncoming(gCmd);
		if (result instanceof Throwable) {
			myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_REPLICA command. Replica address is " + address);
			((Throwable) result).printStackTrace();
		}
	}

	private void broadcastRemoveReplica(String address) throws ServiceException {
		// Avoid concurrent modification exception
		Object[] rr = replicas.values().toArray();
		for (int i = 0; i < rr.length; ++i) {
			PlatformManager replica = (PlatformManager) rr[i];
			try {
				replica.removeReplica(address, true);
			} 
			catch (IMTPException imtpe) {
				// Zombie replica. Will be removed soon. Just ignore it
			}
		}
	}

	public void adopt(Node n, Node[] children) throws IMTPException {
		String name = n.getName();
		NodeDescriptor dsc = getDescriptor(name);
		if (dsc != null) {
			monitor(dsc);
			myLogger.log(Logger.INFO, "Node <" + n.getName() + "> adopted");
		} else {
			myLogger.log(Logger.WARNING, "NO descriptor found for node <" + n.getName() + "> requesting adoption. Ignore...");
		}
	}

	public void ping() throws IMTPException {
		// Just do nothing
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -