⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketproxy.java

📁 Chord package into p2psim
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			throw new CommunicationException(response.getFailureReason());
		} else {
			/* No result here */
			return;
		}

	}

	/**
	 * Sends a {@link MethodConstants#REMOVE_REPLICAS} request with the given
	 * node ID and replicas to the node this is the proxy for and waits for the
	 * response. A successful response carries no result.
	 * 
	 * @param sendingNodeID
	 *            ID that is passed as first parameter of the request.
	 * @param replicas
	 *            Replicas that are passed as second parameter of the request.
	 *            The set is cast to {@link Serializable} for transfer.
	 * @throws CommunicationException
	 *             If the connection cannot be made available, the request
	 *             cannot be sent or the response is a failure response. In the
	 *             last case the throwable of the response is attached as
	 *             cause.
	 */
	public void removeReplicas(ID sendingNodeID, Set<Entry> replicas)
			throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to remove replicas " + replicas + ".");

		/* prepare request for method removeReplicas */
		Request request = this.createRequest(MethodConstants.REMOVE_REPLICAS,
				new Serializable[] { sendingNodeID, (Serializable) replicas });
		/* send request */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			/*
			 * Keep the throwable of the response as cause, as the other
			 * request methods of this proxy do.
			 */
			throw new CommunicationException(response.getFailureReason(),
					response.getThrowable());
		}
		/* No result here */
	}

	/**
	 * Sends a {@link MethodConstants#RETRIEVE_ENTRIES} request for the given
	 * ID to the node this is the proxy for and returns the result of the
	 * response.
	 * 
	 * @param id
	 *            ID that is passed as the only parameter of the request.
	 * @return The result of the response, cast to a set of entries.
	 * @throws CommunicationException
	 *             If the connection cannot be made available, the request
	 *             cannot be sent, the response is a failure response or the
	 *             result of the response is not a {@link Set}.
	 */
	public Set<Entry> retrieveEntries(ID id) throws CommunicationException {
		this.makeSocketAvailable();

		logger.debug("Trying to retrieve entries for ID " + id);

		/* prepare request for method retrieveEntries */
		Request request = this.createRequest(MethodConstants.RETRIEVE_ENTRIES,
				new Serializable[] { id });
		/* send request */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			throw new CommunicationException(response.getFailureReason(),
					response.getThrowable());
		}
		try {
			/*
			 * Due to type erasure only the raw Set type is checked by this
			 * cast; the element type cannot be verified here.
			 */
			@SuppressWarnings("unchecked")
			Set<Entry> result = (Set<Entry>) response.getResult();
			return result;
		} catch (ClassCastException cce) {
			/* attach the cast failure as cause so it is not lost */
			throw new CommunicationException(
					"Could not understand result! " + response.getResult(),
					cce);
		}
	}

	/**
	 * This method has to be called at first in every method that uses the
	 * socket to connect to the node this is the proxy for. It establishes the
	 * connection if that has not been done already. This is necessary as this
	 * proxy can be serialized and the reference to the socket is transient, so
	 * calling this method after a transfer reestablishes the connection to the
	 * node. The same applies for {@link #responses} and
	 * {@link #waitingThreads}, which are created here when they are
	 * <code>null</code>.
	 * <p>
	 * Establishing the connection means: open the socket, create the object
	 * streams, send a {@link MethodConstants#CONNECT} request and wait at most
	 * 5000 ms for the answer. If the answer reports success, a thread
	 * executing {@link #run()} is started. If any of these steps fails, the
	 * socket is closed and the references to socket and streams are reset to
	 * <code>null</code>, so that a later invocation starts with a fresh
	 * connection attempt instead of reusing a half initialised connection.
	 * 
	 * @throws CommunicationException
	 *             If this proxy has been disconnected or the connection could
	 *             not be established.
	 */
	private void makeSocketAvailable() throws CommunicationException {
		if (this.disconnected) {
			throw new CommunicationException("Connection from "
					+ this.urlOfLocalNode + " to remote host " + this.nodeURL
					+ " is broken down. ");
		}

		logger.debug("makeSocketAvailable() called. "
				+ "Testing for socket availability");

		if (this.responses == null) {
			this.responses = new HashMap<String, Response>();
		}
		if (this.waitingThreads == null) {
			this.waitingThreads = new HashMap<String, WaitingThread>();
		}
		if (this.mySocket == null) {
			/* becomes true once the thread reading responses is started */
			boolean established = false;
			try {
				logger.info("Opening new socket to " + this.nodeURL);
				this.mySocket = new Socket(this.nodeURL.getHost(), this.nodeURL
						.getPort());
				logger.debug("Socket created: " + this.mySocket);
				/* limit the time spent waiting for the connection response */
				this.mySocket.setSoTimeout(5000);
				this.out = new ObjectOutputStream(this.mySocket
						.getOutputStream());
				this.in = new ObjectInputStream(this.mySocket.getInputStream());
				logger.debug("Sending connection request!");
				out.writeObject(new Request(MethodConstants.CONNECT,
						"Initial Connection"));
				try {
					// set time out, in case the other side does not answer!
					Response resp = null;
					boolean timedOut = false;
					try {
						logger.debug("Waiting for connection response!");
						resp = (Response) in.readObject();
					} catch (SocketTimeoutException e) {
						logger.info("Connection timed out!");
						timedOut = true;
					}
					/* from now on reads block without time limit */
					this.mySocket.setSoTimeout(0);
					if (timedOut) {
						throw new CommunicationException(
								"Connection to remote host timed out!");
					}
					if (resp != null
							&& resp.getStatus() == Response.REQUEST_SUCCESSFUL) {
						Thread t = new Thread(this, "SocketProxy_Thread_"
								+ this.nodeURL);
						t.start();
						established = true;
					} else {
						throw new CommunicationException(
								"Establishing connection failed!");
					}
				} catch (ClassNotFoundException e) {
					throw new CommunicationException(
							"Unexpected result received! " + e.getMessage(), e);
				} catch (ClassCastException e) {
					throw new CommunicationException(
							"Unexpected result received! " + e.getMessage(), e);
				}
			} catch (UnknownHostException e) {
				throw new CommunicationException("Unknown host: "
						+ this.nodeURL.getHost(), e);
			} catch (IOException ioe) {
				throw new CommunicationException("Could not set up IO channel "
						+ "to host " + this.nodeURL.getHost(), ioe);
			} finally {
				if (!established) {
					/*
					 * Release everything that has been set up so far.
					 * Otherwise the socket would stay open and the next
					 * invocation would skip the connection set up although no
					 * thread is reading responses.
					 */
					Socket failedSocket = this.mySocket;
					this.mySocket = null;
					this.out = null;
					this.in = null;
					if (failedSocket != null) {
						try {
							failedSocket.close();
						} catch (IOException closeFailure) {
							logger.debug("Exception during closing of socket "
									+ failedSocket, closeFailure);
						}
					}
				}
			}
		}
		logger.debug("makeSocketAvailable() finished. Socket " + this.mySocket);
	}

	/**
	 * Logs that finalization of this proxy is running and then invokes the
	 * finalizer of the super class. No socket or stream is closed here; that
	 * is done by {@link #disconnect()}.
	 * 
	 * @throws Throwable
	 *             Whatever the finalizer of the super class throws.
	 */
	protected void finalize() throws Throwable {
		try {
			logger.debug("Finalization running.");
		} finally {
			/* always chain to the super class, even if logging fails */
			super.finalize();
		}
	}

	/**
	 * Tells this proxy that it is not needed anymore. The proxy is removed
	 * from {@link #proxies} and marked as disconnected. Then a
	 * {@link MethodConstants#SHUTDOWN} request is written to the endpoint (if
	 * an output stream exists) and output stream, input stream and socket are
	 * closed. Failures while closing are only logged. Finally
	 * {@link #connectionBrokenDown()} is invoked.
	 */
	public void disconnect() {

		logger.info("Destroying connection from " + this.urlOfLocalNode
				+ " to " + this.nodeURL);

		synchronized (proxies) {
			/*
			 * added on 21.03.2006 by sven. See documentation of method
			 * createProxyKey(String, String);
			 */
			String proxyKey = SocketProxy.createProxyKey(this.urlOfLocalNode,
					this.nodeURL);
			proxies.remove(proxyKey);
		}
		this.disconnected = true;
		try {
			if (this.out != null) {
				try {
					/*
					 * notify endpoint this is connected to, about shut down of
					 * this proxy
					 */
					logger.debug("Sending shutdown notification to endpoint.");
					Request request = this.createRequest(
							MethodConstants.SHUTDOWN, new Serializable[0]);
					this.out.writeObject(request);
					/* log only after the notification has been written */
					logger.debug("Notification send.");
					this.out.close();
					/* log before the reference is cleared, so it is printed */
					logger.debug("OutputStream " + this.out + " closed.");
					this.out = null;
				} catch (IOException e) {
					/* should not occur */
					logger.debug(this
							+ ": Exception during closing of output stream "
							+ this.out, e);
				}
			}
			if (this.in != null) {
				try {
					this.in.close();
					logger.debug("InputStream " + this.in + " closed.");
					this.in = null;
				} catch (IOException e) {
					/* should not occur */
					logger.debug("Exception during closing of input stream"
							+ this.in, e);
				}
			}
			if (this.mySocket != null) {
				try {
					logger.info("Closing socket " + this.mySocket + ".");
					this.mySocket.close();
				} catch (IOException e) {
					/* should not occur */
					logger.debug("Exception during closing of socket "
							+ this.mySocket, e);
				}
				this.mySocket = null;
			}
		} catch (Throwable t) {
			logger.warn("Unexpected exception during disconnection of SocketProxy", t);
		}
		this.connectionBrokenDown();
	}

	/**
	 * Reads {@link de.uniba.wiai.lspi.chord.com.socket.Response} objects from
	 * the input stream of this proxy for as long as the proxy is not
	 * disconnected. Every response read is handed to
	 * {@link #responseReceived(Response)}. If reading fails with an
	 * {@link IOException}, {@link #connectionBrokenDown()} is invoked; a
	 * response whose class cannot be found is only logged.
	 */
	public void run() {
		while (!this.disconnected) {
			try {
				final Response incoming = (Response) this.in.readObject();
				logger.debug("Response " + incoming + "received!");
				this.responseReceived(incoming);
			} catch (ClassNotFoundException missingClass) {
				/* should not occur, as all classes must be locally available */
				final String message = "ClassNotFoundException occured during deserialization "
						+ "of response. There is something seriously wrong "
						+ " here! ";
				logger.fatal(message, missingClass);
			} catch (IOException readFailure) {
				if (this.disconnected) {
					/* the read failed after this proxy was marked disconnected */
					logger.debug(this + ": Connection has been closed!");
				} else {
					logger.warn("Could not read response from stream!",
							readFailure);
				}
				this.connectionBrokenDown();
			}
		}
	}

	/**
	 * Sends a {@link MethodConstants#NOTIFY_AND_COPY} request containing URL
	 * and ID of the given node to the node this is the proxy for and converts
	 * the result of the response. Every node info of the result whose URL
	 * equals the URL of the local node is replaced by the node of the local
	 * endpoint; for every other node info a node is obtained by
	 * {@link #create}.
	 * 
	 * @param potentialPredecessor
	 *            Node whose URL and ID are sent with the request.
	 * @return See {@link Node#notifyAndCopyEntries(Node)}.
	 * @throws CommunicationException
	 *             If the connection cannot be made available, the request
	 *             cannot be sent, the response is a failure response or the
	 *             result of the response has an unexpected type.
	 */
	public RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
			throws CommunicationException {
		this.makeSocketAvailable();

		RemoteNodeInfo nodeInfoToSend = new RemoteNodeInfo(potentialPredecessor
				.getNodeURL(), potentialPredecessor.getNodeID());

		/* prepare request for method notifyAndCopyEntries */
		Request request = this.createRequest(MethodConstants.NOTIFY_AND_COPY,
				new Serializable[] { nodeInfoToSend });
		/* send request */
		try {
			logger.debug("Trying to send request " + request);
			this.send(request);
		} catch (CommunicationException ce) {
			logger.debug("Connection failed!");
			throw ce;
		}
		/* wait for response */
		logger.debug("Waiting for response for request " + request);
		Response response = this.waitForResponse(request);
		logger.debug("Response " + response + " arrived.");
		if (response.isFailureResponse()) {
			throw new CommunicationException(response.getFailureReason(),
					response.getThrowable());
		}
		try {
			RemoteRefsAndEntries result = (RemoteRefsAndEntries) response
					.getResult();
			List<Node> newReferences = new LinkedList<Node>();
			List<RemoteNodeInfo> references = result.getNodeInfos();
			for (RemoteNodeInfo nodeInfo : references) {
				if (nodeInfo.getNodeURL().equals(this.urlOfLocalNode)) {
					/* reference to the local node: no proxy required */
					newReferences.add(Endpoint.getEndpoint(
							this.urlOfLocalNode).getNode());
				} else {
					newReferences.add(create(nodeInfo.getNodeURL(),
							this.urlOfLocalNode, nodeInfo.getNodeID()));
				}
			}
			return new RefsAndEntries(newReferences, result.getEntries());
		} catch (ClassCastException cce) {
			/* attach the cast failure as cause so it is not lost */
			throw new CommunicationException(
					"Could not understand result! " + response.getResult(),
					cce);
		}
	}

	/**
	 * Cached string representation of this proxy. It is <code>null</code>
	 * until {@link #toString()} is invoked for the first time while node ID
	 * and socket are both set.
	 */
	private String stringRepresentation = null;

	/**
	 * Describes this proxy. As long as the node ID or the socket is missing a
	 * short "unconnected" description is built on every invocation. Otherwise
	 * the full description is built once and reused afterwards.
	 * 
	 * @return String representation of this.
	 */
	public String toString() {
		final boolean connected = this.nodeID != null && this.mySocket != null;
		if (!connected) {
			return "Unconnected SocketProxy from " + this.urlOfLocalNode
					+ " to " + this.nodeURL;
		}
		if (this.stringRepresentation == null) {
			this.stringRepresentation = "Connection from Node[url="
					+ this.urlOfLocalNode + ", socket=" + this.mySocket
					+ "] to Node[id=" + this.nodeID + ", url=" + this.nodeURL
					+ "]";
		}
		return this.stringRepresentation;
	}

	/**
	 * Holder for a thread that is waiting for a response, together with a flag
	 * telling whether {@link #wakeUp()} has been invoked for it.
	 * 
	 * @author sven
	 * 
	 */
	private static class WaitingThread {

		/** Set to <code>true</code> by {@link #wakeUp()}; never reset. */
		private boolean hasBeenWokenUp = false;

		/** The wrapped thread, interrupted by {@link #wakeUp()}. */
		private Thread thread;

		/**
		 * @param thread
		 *            The thread to wrap.
		 */
		private WaitingThread(Thread thread) {
			this.thread = thread;
		}

		/**
		 * Tells whether {@link #wakeUp()} has been invoked on this instance.
		 * 
		 * @return <code>true</code> once {@link #wakeUp()} has been invoked,
		 *         <code>false</code> before.
		 */
		boolean hasBeenWokenUp() {
			return this.hasBeenWokenUp;
		}

		/**
		 * Marks this instance as woken up and interrupts the wrapped thread.
		 * The flag is set before the interrupt is issued.
		 */
		void wakeUp() {
			this.hasBeenWokenUp = true;
			this.thread.interrupt();
		}

		/**
		 * @return The string representation of the wrapped thread followed by
		 *         the information whether it is still waiting.
		 */
		public String toString() {
			final boolean stillWaiting = !this.hasBeenWokenUp();
			final String threadDescription = this.thread.toString();
			return threadDescription + ": Waiting? " + stillWaiting;
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -