⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requesthandler.java

📁 Chord package into p2psim
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
			return;
		}
		logger.debug("Trying to send failure response. Failure reason "
				+ failure);
		Response failureResponse = new Response(Response.REQUEST_FAILED,
				request.getRequestType(), request.getReplyWith());
		failureResponse.setFailureReason(failure);
		failureResponse.setThrowable(t);
		try {
			synchronized (this.out) {
				this.out.writeObject(failureResponse);
				this.out.flush();
				this.out.reset();
			}
			logger.debug("Response send.");
		} catch (IOException e) {
			if (this.connected) {
				logger.debug("Connection seems to be broken down. Could not "
						+ "send failure response. Connection is closed. ", e);
				this.disconnect();
			}
		}
	}

	/**
	 * Invokes methods on {@link #node}.
	 * 
	 * @param methodType
	 *            The type of the method to invoke. See {@link MethodConstants}.
	 * @param parameters
	 *            The parameters to pass to the method.
	 * @return The result of the invoked method. May be <code>null</code> if
	 *         method is void.
	 * @throws Exception
	 */
	Serializable invokeMethod(int methodType, Serializable[] parameters)
			throws Exception {

		String method = MethodConstants.getMethodName(methodType);
		this.waitForMethod(method);
		/* If we got disconnected while waiting */
		if (!this.connected) {
			/* throw an Exception */
			throw new CommunicationException("Connection closed.");
		}
		Serializable result = null;
		logger.debug("Trying to invoke method " + methodType
				+ " with parameters: ");
		for (Serializable parameter : parameters) {
			logger.debug(parameter);
		}
		switch (methodType) {
		case MethodConstants.FIND_SUCCESSOR: {
			Node chordNode = this.node.findSuccessor((ID) parameters[0]);
			result = new RemoteNodeInfo(chordNode.getNodeURL(), chordNode.getNodeID());
			break;
		}
		case MethodConstants.GET_NODE_ID: {
			result = this.node.getNodeID();
			break;
		}
		case MethodConstants.INSERT_ENTRY: {
			this.node.insertEntry((Entry) parameters[0]);
			break;
		}
		case MethodConstants.INSERT_REPLICAS: {
			this.node.insertReplicas((Set<Entry>) parameters[0]);
			break;
		}
		case MethodConstants.LEAVES_NETWORK: {
			RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
			this.node.leavesNetwork(SocketProxy.create(nodeInfo.getNodeURL(),
					this.node.getNodeURL(), nodeInfo.getNodeID()));
			break;
		}
		case MethodConstants.NOTIFY: {
			RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
			List<Node> l = this.node.notify(SocketProxy.create(nodeInfo
					.getNodeURL(), this.node.getNodeURL(), nodeInfo.getNodeID()));
			List<RemoteNodeInfo> nodeInfos = new LinkedList<RemoteNodeInfo>();
			for (Node current : l) {
				nodeInfos.add(new RemoteNodeInfo(current.getNodeURL(),
						current.getNodeID()));
			}
			result = (Serializable) nodeInfos;
			break;
		}
		case MethodConstants.NOTIFY_AND_COPY: {
			RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
			RefsAndEntries refs = this.node.notifyAndCopyEntries(SocketProxy
					.create(nodeInfo.getNodeURL(), this.node.getNodeURL(), nodeInfo
							.getNodeID()));
			List<Node> l = refs.getRefs();
			List<RemoteNodeInfo> nodeInfos = new LinkedList<RemoteNodeInfo>();
			for (Node current : l) {
				nodeInfos.add(new RemoteNodeInfo(current.getNodeURL(),
						current.getNodeID()));
			}
			RemoteRefsAndEntries rRefs = new RemoteRefsAndEntries(refs
					.getEntries(), nodeInfos);
			result = rRefs;
			break;
		}
		case MethodConstants.PING: {
			logger.debug("Invoking ping()");
			this.node.ping();
			logger.debug("ping() invoked.");
			break;
		}
		case MethodConstants.REMOVE_ENTRY: {
			this.node.removeEntry((Entry) parameters[0]);
			break;
		}
		case MethodConstants.REMOVE_REPLICAS: {
			this.node.removeReplicas((ID) parameters[0],
					(Set<Entry>) parameters[1]);
			break;
		}
		case MethodConstants.RETRIEVE_ENTRIES: {
			result = (Serializable) this.node
					.retrieveEntries((ID) parameters[0]);
			break;
		}
		default: {
			logger.warn("Unknown method requested " + method);
			throw new Exception("Unknown method requested " + method);
		}
		}
		logger.debug("Returning result.");
		return result;
	}

	/**
	 * This method is used to block threads that want to make a method call
	 * until the method invocation is permitted by the endpoint. Invocation of a
	 * method depends on the state of the endpoint.
	 * 
	 * @param method
	 *            The name of the method to invoke. TODO: change this to another
	 *            type.
	 */
	private void waitForMethod(String method) {

		logger
				.debug(method
						+ " allowed? "
						+ !(Collections.binarySearch(
								Endpoint.METHODS_ALLOWED_IN_ACCEPT_ENTRIES,
								method) >= 0));
		synchronized (this.waitingThreads) {
			while ((!(this.state == Endpoint.ACCEPT_ENTRIES))
					&& (this.connected)
					&& ((Collections.binarySearch(
							Endpoint.METHODS_ALLOWED_IN_ACCEPT_ENTRIES, method) >= 0))) {

				Thread currentThread = Thread.currentThread();
				boolean debug = logger.isEnabledFor(DEBUG);
				if (debug) {
					logger.debug("HERE!!!" + currentThread
							+ " waiting for permission to " + "execute "
							+ method);
				}
				this.waitingThreads.add(currentThread);
				try {
					this.waitingThreads.wait();
				} catch (InterruptedException e) {
					// do nothing
				}
				if (debug) {
					logger.debug("HERE!!!" + currentThread
							+ " has been notified.");
				}
				this.waitingThreads.remove(currentThread);
			}
		}
		logger.debug("waitForMethod(" + method + ") returns!");
	}

	/**
	 * Disconnect this RequestHandler. Forces the socket, which this
	 * RequestHandler is bound to, to be closed and {@link #run()}to be
	 * stopped.
	 */
	public void disconnect() {

		logger.info("Disconnecting.");
		if (this.connected) {
			/* cause the while loop in run() method to be finished */
			/* and notify all threads waiting for execution of a method */
			synchronized (this.waitingThreads) {
				this.connected = false;
				this.waitingThreads.notifyAll();
			}
			/* release reference to node. */
			this.node = null;
			/* try to close the socket */
			try {
				synchronized (this.out) {
					this.out.close();
					this.out = null;
				}
			} catch (IOException e) {
				/* should not occur */
				/* if closing of socket fails, that does not matter!??? */
				logger.debug("Exception while closing output stream "
						+ this.out);
			}
			try {
				this.in.close();
				this.in = null;
			} catch (IOException e) {
				/* should not occur */
				/* if closing of socket fails, that does not matter!??? */
				logger.debug("Exception while closing input stream" + this.in);
			}
			try {
				logger.info("Closing socket " + this.connection);
				this.connection.close();
				this.connection = null;
				logger.info("Socket closed.");
			} catch (IOException e) {
				/* should not occur */
				/* if closing of socket fails, that does not matter!??? */
				logger.debug("Exception while closing socket "
						+ this.connection);
			}
			this.endpoint.deregister(this);
		}
		logger.debug("Disconnected.");
	}

	/**
	 * Test if this RequestHandler is disconnected
	 * 
	 * @return <code>true</code> if this is still connected to its remote end.
	 */
	public boolean isConnected() {
		return this.connected;
	}

	public void notify(int newState) {
		logger.debug("notify(" + newState + ") called.");
		this.state = newState;
		/* notify all threads waiting for a state change */
		synchronized (this.waitingThreads) {
			logger.debug("HERE!!! Notifying waiting threads. "
					+ this.waitingThreads);
			this.waitingThreads.notifyAll();
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -