⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketproxy.java

📁 Chord package into p2psim
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
				else {
					logger.error("There is no result, but we have not been "
							+ "disconnected. Something went seriously wrong!");
					throw new CommunicationException(
							"Did not receive a response!");
				}
			}
		}
		return response;
	}

	/**
	 * Stores an incoming {@link Response} and wakes up the thread waiting for
	 * it, if there is one. This method is called by {@link #run()} when it
	 * receives a response.
	 * <p>
	 * The response is always put into {@link Map responses} under the value of
	 * {@link Response#getInReplyTo()}, whether or not a waiting thread is
	 * registered for that identifier at the moment.
	 * 
	 * @param response
	 *            The response that has been received.
	 */
	private void responseReceived(Response response) {
		/* same monitor that connectionBrokenDown() synchronizes on */
		synchronized (this.responses) {
			/* Try to fetch thread waiting for this response */
			logger.debug("No of waiting threads " + this.waitingThreads);
			WaitingThread waitingThread = this.waitingThreads.get(response
					.getInReplyTo());
			logger.debug("Response with id " + response.getInReplyTo()
					+ " received.");
			/* save response before waking anybody up */
			this.responses.put(response.getInReplyTo(), response);
			/* if there is a thread waiting for this response, wake it up */
			if (waitingThread != null) {
				logger.debug("Waking up thread!");
				waitingThread.wakeUp();
			}
		}
	}

	/**
	 * Marks this proxy as disconnected and wakes up every thread that is
	 * currently registered in <code>waitingThreads</code>. Does nothing if
	 * <code>responses</code> is <code>null</code>.
	 */
	private void connectionBrokenDown() {
		if (this.responses != null) {
			/* responses is the monitor the other methods of this proxy use */
			synchronized (this.responses) {
				logger.info("Connection broken down!");
				this.disconnected = true;
				/* every waiting thread has to notice the broken connection */
				for (WaitingThread waiter : this.waitingThreads.values()) {
					logger.debug("Interrupting waiting thread " + waiter);
					waiter.wakeUp();
				}
			}
		}
	}

	/**
	 * Builds the {@link Request request} for the method identified by
	 * <code>methodIdentifier</code> and attaches <code>parameters</code> to
	 * it. The identifier obtained from <code>createIdentifier</code> is handed
	 * to the request's constructor (see {@link Request#getReplyWith()}).
	 * 
	 * @param methodIdentifier
	 *            The identifier of the method to request.
	 * @param parameters
	 *            The parameters for the request.
	 * @return The newly created {@link Request request}.
	 */
	private Request createRequest(int methodIdentifier,
			Serializable[] parameters) {
		/* only resolve the method name and format the parameters if needed */
		if (logger.isEnabledFor(DEBUG)) {
			String announcement = "Creating request for method "
					+ MethodConstants.getMethodName(methodIdentifier)
					+ " with parameters "
					+ java.util.Arrays.deepToString(parameters);
			logger.debug(announcement);
		}
		Request created = new Request(methodIdentifier, this
				.createIdentifier(methodIdentifier));
		created.setParameters(parameters);
		logger.debug("Request " + created + " created.");
		return created;
	}

	/**
	 * Sends a <code>FIND_SUCCESSOR</code> request for <code>key</code> to the
	 * remote node and converts the answer into a {@link Node}.
	 * <p>
	 * If the returned node URL equals the URL of the local node, the node of
	 * the local {@link Endpoint} is returned; otherwise the node obtained from
	 * <code>create</code> for the returned URL and ID is returned.
	 * 
	 * @param key
	 *            The ID to find the successor for.
	 * @return The successor of <code>key</code>.
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, the
	 *             response is a failure response, or the result is not a
	 *             {@link RemoteNodeInfo}.
	 */
	public Node findSuccessor(ID key) throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to find successor for ID " + key);

		Request lookup = this.createRequest(MethodConstants.FIND_SUCCESSOR,
				new Serializable[] { key });

		/* send; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + lookup);
			this.send(lookup);
		} catch (CommunicationException sendFailure) {
			logger.debug("Connection failed!");
			throw sendFailure;
		}

		/* block until the answer for this request is available */
		logger.debug("Waiting for response for request " + lookup);
		Response answer = this.waitForResponse(lookup);
		logger.debug("Response " + answer + " arrived.");

		if (answer.isFailureResponse()) {
			throw new CommunicationException(answer.getFailureReason());
		}

		try {
			RemoteNodeInfo successorInfo = (RemoteNodeInfo) answer.getResult();
			boolean successorIsLocal = successorInfo.getNodeURL().equals(
					this.urlOfLocalNode);
			if (successorIsLocal) {
				return Endpoint.getEndpoint(this.urlOfLocalNode).getNode();
			}
			return create(successorInfo.getNodeURL(), this.urlOfLocalNode,
					successorInfo.getNodeID());
		} catch (ClassCastException castFailure) {
			/*
			 * This should not occur as all nodes should have the same
			 * classes!
			 */
			String message = "Could not understand result! "
					+ answer.getResult();
			logger.fatal(message);
			throw new CommunicationException(message, castFailure);
		}
	}

	/**
	 * Fetches the ID of the node represented by this proxy from the remote
	 * node and stores it in <code>nodeID</code>. Does nothing if
	 * <code>nodeID</code> is already set.
	 * 
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, the
	 *             response is a failure response, or the result is not an
	 *             {@link ID}.
	 */
	private void initializeNodeID() throws CommunicationException {
		if (this.nodeID != null) {
			/* already initialized */
			return;
		}
		this.makeSocketAvailable();

		logger.debug("Trying to get node ID ");

		/* prepare request for method getNodeID */
		Request request = this.createRequest(MethodConstants.GET_NODE_ID,
				new Serializable[0]);
		/* send request; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			throw new CommunicationException(response.getFailureReason());
		}
		try {
			this.nodeID = (ID) response.getResult();
		} catch (ClassCastException e) {
			/*
			 * This should not occur as all nodes should have the same
			 * classes!
			 */
			String message = "Could not understand result! "
					+ response.getResult();
			logger.fatal(message);
			/* keep the cause, as findSuccessor and notify do */
			throw new CommunicationException(message, e);
		}
	}

	/**
	 * Sends a <code>NOTIFY</code> request carrying the URL and ID of
	 * <code>potentialPredecessor</code> to the remote node and converts the
	 * returned {@link RemoteNodeInfo} list into a list of {@link Node nodes}.
	 * <p>
	 * An entry whose URL equals the URL of the local node is mapped to the
	 * node of the local {@link Endpoint}; every other entry is mapped to the
	 * node obtained from <code>create</code>.
	 * 
	 * @param potentialPredecessor
	 *            The node to announce to the remote node.
	 * @return List of references for the node invoking this method. See
	 *         {@link Node#notify(Node)}.
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, the
	 *             response is a failure response, or the result cannot be
	 *             understood.
	 */
	public List<Node> notify(Node potentialPredecessor)
			throws CommunicationException {
		this.makeSocketAvailable();

		RemoteNodeInfo announcedNode = new RemoteNodeInfo(potentialPredecessor
				.getNodeURL(), potentialPredecessor.getNodeID());
		Request notification = this.createRequest(MethodConstants.NOTIFY,
				new Serializable[] { announcedNode });

		/* a failure while sending propagates to the caller unchanged */
		this.send(notification);

		Response answer = this.waitForResponse(notification);
		if (answer.isFailureResponse()) {
			throw new CommunicationException(answer.getFailureReason());
		}

		try {
			List<RemoteNodeInfo> remoteInfos = (List<RemoteNodeInfo>) answer
					.getResult();
			List<Node> result = new LinkedList<Node>();
			for (RemoteNodeInfo info : remoteInfos) {
				boolean refersToLocalNode = info.getNodeURL().equals(
						this.urlOfLocalNode);
				if (!refersToLocalNode) {
					result.add(create(info.getNodeURL(), this.urlOfLocalNode,
							info.getNodeID()));
				} else {
					result.add(Endpoint.getEndpoint(this.urlOfLocalNode)
							.getNode());
				}
			}
			return result;
		} catch (ClassCastException castFailure) {
			throw new CommunicationException("Could not understand result! "
					+ answer.getResult(), castFailure);
		}
	}

	/**
	 * Sends a <code>PING</code> request to the remote node and waits for the
	 * response. Returns normally if the response is not a failure response.
	 * 
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, or
	 *             the response is a failure response.
	 */
	public void ping() throws CommunicationException {
		this.makeSocketAvailable();

		/* evaluated once; guards most of the debug output below */
		boolean debugOn = SocketProxy.logger.isEnabledFor(DEBUG);
		if (debugOn) {
			logger.debug("Trying to ping remote node " + this.nodeURL);
		}

		/* prepare request for method ping (no parameters) */
		Request pingRequest = this.createRequest(MethodConstants.PING,
				new Serializable[0]);

		/* send; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + pingRequest);
			this.send(pingRequest);
		} catch (CommunicationException sendFailure) {
			logger.debug("Connection failed!");
			throw sendFailure;
		}

		if (debugOn) {
			logger.debug("Waiting for response for request " + pingRequest);
		}
		Response answer = this.waitForResponse(pingRequest);
		if (debugOn) {
			logger.debug("Response " + answer + " arrived.");
		}

		if (answer.isFailureResponse()) {
			throw new CommunicationException(answer.getFailureReason());
		}
	}

	/**
	 * Sends an <code>INSERT_ENTRY</code> request carrying <code>entry</code>
	 * to the remote node and waits for the response. The response carries no
	 * result that is evaluated here.
	 * 
	 * @param entry
	 *            The entry to insert.
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, or
	 *             the response is a failure response.
	 */
	public void insertEntry(Entry entry) throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to insert entry " + entry + ".");

		Request insertion = this.createRequest(MethodConstants.INSERT_ENTRY,
				new Serializable[] { entry });

		/* send; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + insertion);
			this.send(insertion);
		} catch (CommunicationException sendFailure) {
			logger.debug("Connection failed!");
			throw sendFailure;
		}

		/* block until the answer for this request is available */
		logger.debug("Waiting for response for request " + insertion);
		Response answer = this.waitForResponse(insertion);
		logger.debug("Response " + answer + " arrived.");

		if (answer.isFailureResponse()) {
			throw new CommunicationException(answer.getFailureReason());
		}
	}

	/**
	 * Sends an <code>INSERT_REPLICAS</code> request carrying
	 * <code>replicas</code> to the remote node and waits for the response.
	 * The response carries no result that is evaluated here.
	 * <p>
	 * A request parameter must be {@link Serializable}. If the given set is
	 * not (e.g. a key-set view of a map), its elements are copied into a
	 * {@link java.util.HashSet} instead of failing with a
	 * {@link ClassCastException}.
	 * 
	 * @param replicas
	 *            The replicas to insert.
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, or
	 *             the response is a failure response.
	 */
	public void insertReplicas(Set<Entry> replicas)
			throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to insert replicas " + replicas + ".");

		/* pass the set through unchanged whenever the cast is safe */
		Serializable payload;
		if (replicas == null || replicas instanceof Serializable) {
			payload = (Serializable) replicas;
		} else {
			payload = new java.util.HashSet<Entry>(replicas);
		}

		/* prepare request for method insertReplicas */
		Request request = this.createRequest(MethodConstants.INSERT_REPLICAS,
				new Serializable[] { payload });
		/* send request; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			throw new CommunicationException(response.getFailureReason());
		}
		/* No result here */
	}

	/**
	 * Sends a <code>LEAVES_NETWORK</code> request carrying the URL and ID of
	 * <code>predecessor</code> to the remote node and waits for the response.
	 * The response carries no result that is evaluated here.
	 * 
	 * @param predecessor
	 *            The node whose URL and ID are sent with the request.
	 * @throws CommunicationException
	 *             If the request cannot be sent, no response is received, or
	 *             the response is a failure response.
	 */
	public void leavesNetwork(Node predecessor) throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to notify node that " + predecessor
				+ " leaves network.");

		RemoteNodeInfo nodeInfo = new RemoteNodeInfo(predecessor.getNodeURL(),
				predecessor.getNodeID());

		/* prepare request for method leavesNetwork */
		Request request = this.createRequest(MethodConstants.LEAVES_NETWORK,
				new Serializable[] { nodeInfo });
		/* send request; the catch only adds a log line before propagating */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			throw new CommunicationException(response.getFailureReason());
		}
		/* No result here */
	}

	/**
	 * @param entry
	 * @throws CommunicationException
	 */
	public void removeEntry(Entry entry) throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to remove entry " + entry + ".");

		/* prepare request for method findSuccessor */
		Request request = this.createRequest(MethodConstants.REMOVE_ENTRY,
				new Serializable[] { entry });
		/* send request */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -