⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servicemanagerimpl.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
		try {
			slice = myPlatformManager.findSlice(serviceKey, sliceKey);
		} catch (IMTPException imtpe) {
			if (reconnect()) {
				slice = myPlatformManager.findSlice(serviceKey, sliceKey);
			} else {
				throw imtpe;
			}
		}
		return checkLocal(slice);
	}

	/**
	 * Retrieves all the slices of a given service from the PlatformManager.
	 * If the request fails with an <code>IMTPException</code>, a reconnection
	 * is attempted: when it succeeds the request is issued a second time,
	 * otherwise the original exception is re-thrown.
	 * @param serviceKey The key of the service whose slices are requested.
	 * @return An array with one entry per retrieved slice (each one passed
	 * through <code>checkLocal()</code>), or <code>null</code> if the
	 * PlatformManager returned <code>null</code>.
	 * @throws IMTPException If the PlatformManager cannot be contacted and the
	 * reconnection fails, or if the retried request fails too.
	 * @throws ServiceException Propagated from the PlatformManager or from
	 * <code>checkLocal()</code>.
	 */
	public Service.Slice[] findAllSlices(String serviceKey) throws IMTPException, ServiceException {
		Vector found;
		try {
			found = myPlatformManager.findAllSlices(serviceKey);
		} catch (IMTPException imtpe) {
			if (!reconnect()) {
				// No way to reach a valid PlatformManager: report the original failure
				throw imtpe;
			}
			// A valid PlatformManager is available again: retry once
			found = myPlatformManager.findAllSlices(serviceKey);
		}

		if (found == null) {
			return null;
		}

		int count = found.size();
		Service.Slice[] result = new Service.Slice[count];
		int idx = 0;
		while (idx < count) {
			// Make sure a slice referring to the local node holds the real node
			result[idx] = checkLocal((Service.Slice) found.elementAt(idx));
			++idx;
		}
		return result;
	}

	/////////////////////////////////////////////////
	// Other service installation related methods
	/////////////////////////////////////////////////

	/**
	 * Performs the local part of a service installation: registers the
	 * service filters and sinks with the Command Processor, exports the
	 * local slice through the local node and records the service descriptor
	 * among the local services.
	 * @param svcDsc The descriptor of the service to install.
	 * @throws IMTPException Propagated from the operations invoked on the local node.
	 * @throws ServiceException If the service supplies the very same object
	 * as both incoming and outgoing filter (in that case the already added
	 * outgoing filter is removed before throwing).
	 */
	private void installServiceLocally(ServiceDescriptor svcDsc) throws IMTPException, ServiceException {
		Service service = svcDsc.getService();

		// Outgoing filter (if any)
		Filter outgoing = service.getCommandFilter(Filter.OUTGOING);
		if (outgoing != null) {
			outgoing.setServiceName(service.getName());
			myCommandProcessor.addFilter(outgoing, Filter.OUTGOING);
		}

		// Incoming filter (if any). It must be a different object than the outgoing one
		Filter incoming = service.getCommandFilter(Filter.INCOMING);
		if (incoming != null && incoming == outgoing) {
			// Here the outgoing filter is certainly != null and was already added: undo that
			myCommandProcessor.removeFilter(outgoing, Filter.OUTGOING);
			throw new ServiceException("The same filter object cannot be used as both incoming and outgoing filter.");
		}
		if (incoming != null) {
			incoming.setServiceName(service.getName());
			myCommandProcessor.addFilter(incoming, Filter.INCOMING);
		}

		// Source sink (if any)
		Sink sourceSink = service.getCommandSink(Sink.COMMAND_SOURCE);
		if (sourceSink != null) {
			myCommandProcessor.registerSink(sourceSink, Sink.COMMAND_SOURCE, service.getName());
		}

		// Target sink (if any)
		Sink targetSink = service.getCommandSink(Sink.COMMAND_TARGET);
		if (targetSink != null) {
			myCommandProcessor.registerSink(targetSink, Sink.COMMAND_TARGET, service.getName());
		}

		// Make the local slice reachable through the network
		Service.Slice exported = service.getLocalSlice();
		if (exported != null) {
			localNode.exportSlice(service.getName(), exported);
		}

		// Record the service so that it can be found locally
		localServices.put(service.getName(), svcDsc);

		// Services extending BaseService get a reference to the Command Processor
		if (service instanceof BaseService) {
			((BaseService) service).setCommandProcessor(myCommandProcessor);
		}
	}

	/**
	 * Performs the local part of a service removal. If a service with the
	 * given name is installed locally it is shut down and its filters and
	 * sinks are detached from the Command Processor. In any case the slice
	 * with the given name is unexported from the local node and the name is
	 * removed from the local services.
	 * @param name The name of the service to uninstall.
	 * @throws IMTPException Propagated from the invoked operations.
	 * @throws ServiceException Propagated from the invoked operations.
	 */
	private void uninstallServiceLocally(String name) throws IMTPException, ServiceException {
		ServiceDescriptor descriptor = (ServiceDescriptor) localServices.get(name);
		if (descriptor != null) {
			Service service = descriptor.getService();

			// Stop the service before detaching it
			service.shutdown();

			// Uninstall the service filters
			Filter outgoing = service.getCommandFilter(Filter.OUTGOING);
			if (outgoing != null) {
				myCommandProcessor.removeFilter(outgoing, Filter.OUTGOING);
			}
			Filter incoming = service.getCommandFilter(Filter.INCOMING);
			if (incoming != null) {
				myCommandProcessor.removeFilter(incoming, Filter.INCOMING);
			}

			// Uninstall the service sinks (only those the service actually provides)
			if (service.getCommandSink(Sink.COMMAND_SOURCE) != null) {
				myCommandProcessor.deregisterSink(Sink.COMMAND_SOURCE, service.getName());
			}
			if (service.getCommandSink(Sink.COMMAND_TARGET) != null) {
				myCommandProcessor.deregisterSink(Sink.COMMAND_TARGET, service.getName());
			}
		}

		// Unexport the service slice
		localNode.unexportSlice(name);

		// Forget the service
		localServices.remove(name);
	}

	
	////////////////////////////////////////////////////
	// Main container fault management related methods
	////////////////////////////////////////////////////
	/**
	 * Handles the notification that a PlatformManager is no longer valid.
	 * If the dead PlatformManager is the one this node is currently using,
	 * a DEAD_PLATFORM_MANAGER incoming vertical command is issued. Then, if
	 * the dead and the notifying addresses coincide, the reattachment
	 * procedure is started; otherwise the notifying address is added to, and
	 * the dead address removed from, the known PlatformManager addresses.
	 * @param deadPMAddr The address of the PlatformManager that is no longer valid.
	 * @param notifyingPMAddr The address of the PlatformManager issuing the notification.
	 * @throws IMTPException Propagated from the invoked operations.
	 */
	void platformManagerDead(String deadPMAddr, String notifyingPMAddr) throws IMTPException {
		myLogger.log(Logger.INFO, "PlatformManager at "+deadPMAddr+" no longer valid!");

		boolean currentPMIsDead = deadPMAddr.equals(myPlatformManager.getLocalAddress());
		if (currentPMIsDead) {
			// Issue a DEAD_PLATFORM_MANAGER incoming vertical command
			GenericCommand deadPMCmd = new GenericCommand(Service.DEAD_PLATFORM_MANAGER, null, null);
			deadPMCmd.addParam(myPlatformManager.getLocalAddress());
			Object outcome = myCommandProcessor.processIncoming(deadPMCmd);
			if (outcome instanceof Throwable) {
				myLogger.log(Logger.WARNING, "Unexpected error processing DEAD_PLATFORM_MANAGER command.");
				((Throwable) outcome).printStackTrace();
			}
		}

		if (!deadPMAddr.equals(notifyingPMAddr)) {
			// The notification comes from another PlatformManager: update the known addresses
			addAddress(notifyingPMAddr);
			removeAddress(deadPMAddr);
		}
		else {
			// This is a PlatformManager that recovered from a fault
			reattach(notifyingPMAddr);
		}
	}

	/**
	 * This method implements the platform reattachment procedure that is activated after a fault
	 * and a successive recovery of the Main Container.
	 * This is package scoped since it is called by BaseNode.platformManagerDead()
	 * <p>
	 * The procedure obtains a proxy to the PlatformManager at the given address,
	 * adds the local node (with its local services) to it, adjusts the local
	 * name to the one returned by the PlatformManager, refreshes the
	 * PlatformManager related state and finally issues a REATTACHED incoming
	 * vertical command. Any exception is logged and not propagated; in that
	 * case the current PlatformManager remains marked as invalid.
	 * @param pmAddr The address of the recovered PlatformManager.
	 * @see jade.core.faultRecovery.FaultRecoveryService
	 */
	synchronized void reattach(String pmAddr) {
		// We reattach to the recovered PM either if it is our PM or if our
		// PM is invalid (a previous reattach/reconnect attempt failed).
		// Otherwise we just do nothing
		if (invalidPlatformManager || pmAddr.equals(myPlatformManager.getLocalAddress())) {
			// Stay "invalid" until handlePMRefreshed() confirms the reattachment
			invalidPlatformManager = true;
			try {
				myPlatformManager = myIMTPManager.getPlatformManagerProxy(pmAddr);
				String name = myPlatformManager.addNode(localNodeDescriptor, getLocalServices(), false);
				if (!name.equals(localNodeDescriptor.getName())) {
					myLogger.log(Logger.WARNING, "Container name changed re-attaching to PlatformManager: new name = " + name);
				}
				adjustName(name);

				handlePMRefreshed(pmAddr);

				// Issue a REATTACHED incoming V-Command.
				// Trace it through the logger, like every other message of this class,
				// instead of writing directly to the standard output.
				myLogger.log(Logger.INFO, "Issuing REATTACHED Incoming command");
				GenericCommand gCmd = new GenericCommand(Service.REATTACHED, null, null);
				Object result = myCommandProcessor.processIncoming(gCmd);
				if (result instanceof Throwable) {
					myLogger.log(Logger.SEVERE, "Unexpected error processing REATTACHED command.");
					((Throwable) result).printStackTrace();
				}

				myLogger.log(Logger.INFO, "Re-attached to PlatformManager at " + pmAddr);
			} catch (Exception e) {
				// Reattachment is best-effort: report the problem and leave the PM marked as invalid
				myLogger.log(Logger.SEVERE, "Cannot re-attach to PlatformManager at " + pmAddr + ". " + e);
				e.printStackTrace();
			}
		}
	}

	/**
	 * This method implements the main reconnection procedure that is activated when the main container this container
	 * is connected to crashes and a backup main container becomes the leader.
	 * <p>
	 * Nothing is done if this node is terminating. Otherwise the current
	 * PlatformManager is pinged first; if it does not reply, the known backup
	 * PlatformManagers are tried one after the other until one of them
	 * successfully adopts the local node.
	 * NOTE(review): when every backup fails, <code>myPlatformManager</code> is
	 * left referring to the last backup that was tried.
	 * @return <code>true</code> if the current PlatformManager replies to the
	 * ping or the node was adopted by a backup PlatformManager,
	 * <code>false</code> if the node is terminating or no backup could be reached.
	 * @see jade.core.replication.MainReplicationService
	 */
	private synchronized boolean reconnect() {
		if (terminating) {
			return false;
		}

		// Check if the current PlatformManager is actually down (another thread
		// may have reconnected in the meanwhile)
		try {
			myPlatformManager.ping();
			return true;
		}
		catch (IMTPException imtpe) {
			// The current PlatformManager is actually down --> try to reconnect
			invalidPlatformManager = true;
		}

		for (Iterator addresses = backupManagers.keySet().iterator(); addresses.hasNext(); ) {
			String backupAddr = (String) addresses.next();
			try {
				myPlatformManager = (PlatformManager) backupManagers.get(backupAddr);
				myLogger.log(Logger.INFO, "Reconnecting to PlatformManager at address " + myPlatformManager.getLocalAddress());

				myPlatformManager.adopt(localNode, null);
				// This also removes backupAddr from the backups; we return right after, so the iteration is over
				handlePMRefreshed(backupAddr);

				myLogger.log(Logger.INFO, "Reconnection OK");
				return true;
			}
			catch (Exception e) {
				myLogger.log(Logger.WARNING, "Reconnection failed");
				// Ignore it and try the next address...
			}
		}
		return false;
	}

	/**
	 * Updates the local state after the PlatformManager in use has been
	 * replaced or refreshed: clears the cached Main slice of every local
	 * service extending BaseService, notifies the IMTP manager, removes the
	 * given address from the backup PlatformManagers and marks the current
	 * PlatformManager as valid again.
	 * @param pmAddr The address of the PlatformManager now in use.
	 */
	private void handlePMRefreshed(String pmAddr) {
		// Clear any cached slice of the Main container
		Object[] descriptors = localServices.values().toArray();
		int idx = 0;
		while (idx < descriptors.length) {
			Service service = ((ServiceDescriptor) descriptors[idx]).getService();
			if (service instanceof BaseService) {
				BaseService base = (BaseService) service;
				base.clearCachedSlice(MAIN_SLICE);
			}
			++idx;
		}

		myIMTPManager.reconnected(myPlatformManager);
		// The address now refers to the PlatformManager in use, no longer to a backup
		backupManagers.remove(pmAddr);
		invalidPlatformManager = false;
	}

	/**
	 * Retrieves the values currently held in the local services map
	 * (the descriptors stored at installation time).
	 * @return A newly created <code>Vector</code> holding a snapshot of those values.
	 */
	public Vector getLocalServices() {
		Object[] descriptors = localServices.values().toArray();
		int count = descriptors.length;
		Vector snapshot = new Vector(count);
		int idx = 0;
		while (idx < count) {
			snapshot.addElement(descriptors[idx]);
			++idx;
		}
		return snapshot;
	}

	
	//////////////////////////////////////////////////
	// Private utility methods
	//////////////////////////////////////////////////
	/**
	 * Applies the given name to the local node descriptor, to the local node
	 * and, if the descriptor holds one, to its container identifier.
	 * @param name The name to be set.
	 */
	private void adjustName(String name) {
		localNodeDescriptor.setName(name);
		localNode.setName(name);

		// The descriptor may hold no container identifier
		ContainerID container = localNodeDescriptor.getContainer();
		if (container == null) {
			return;
		}
		container.setName(name);
	}

	/**
	 * Ensures that a slice referring to the local node holds the real local
	 * node object and not a proxy to it.
	 * @param slice The slice to check; may be <code>null</code>.
	 * @return The same slice received as parameter (<code>null</code> if it was <code>null</code>).
	 * @throws ServiceException Declared for the callers; propagated from the invoked operations.
	 */
	private Service.Slice checkLocal(Service.Slice slice) throws ServiceException {
		if (slice == null) {
			return null;
		}

		Node sliceNode = slice.getNode();
		boolean sameName = sliceNode.getName().equals(localNode.getName());
		if (sameName && !sliceNode.equals(localNode)) {
			// Same name but different object: replace it with the real local node
			SliceProxy proxy = (SliceProxy) slice;
			proxy.setNode(localNode);
		}
		return slice;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -