⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketproxy.java

📁 Chord package into p2psim
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/***************************************************************************
 *                                                                         *
 *                             SocketProxy.java                            *
 *                            -------------------                          *
 *   date                 : 12.08.2004                                     *
 *   copyright            : (C) 2004-2008 Distributed and                  *
 *                              Mobile Systems Group                       *
 *                              Lehrstuhl fuer Praktische Informatik       *
 *                              Universitaet Bamberg                       *
 *                              http://www.uni-bamberg.de/pi/              *
 *   email                : sven.kaffille@uni-bamberg.de                   *
 *                          karsten.loesing@uni-bamberg.de                 *
 *                                                                         *
 *                                                                         *
 ***************************************************************************/

/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   A copy of the license can be found in the license.txt file supplied   *
 *   with this software or at: http://www.gnu.org/copyleft/gpl.html        *
 *                                                                         *
 ***************************************************************************/
package de.uniba.wiai.lspi.chord.com.socket;

import static de.uniba.wiai.lspi.util.logging.Logger.LogLevel.DEBUG;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import de.uniba.wiai.lspi.chord.com.CommunicationException;
import de.uniba.wiai.lspi.chord.com.Endpoint;
import de.uniba.wiai.lspi.chord.com.Entry;
import de.uniba.wiai.lspi.chord.com.Node;
import de.uniba.wiai.lspi.chord.com.Proxy;
import de.uniba.wiai.lspi.chord.com.RefsAndEntries;
import de.uniba.wiai.lspi.chord.data.ID;
import de.uniba.wiai.lspi.chord.data.URL;
import de.uniba.wiai.lspi.util.logging.Logger;

/**
 * This is the implementation of {@link Proxy} for the socket protocol. This
 * connects to the {@link SocketEndpoint endpoint} of the node it represents by
 * means of <code>Sockets</code>.
 * 
 * @author sven
 * @version 1.0.5
 */
public final class SocketProxy extends Proxy implements Runnable {

	/**
	 * The logger for instances of this class.
	 */
	private final static Logger logger = Logger.getLogger(SocketProxy.class);

	/**
	 * Map of existing proxies. Key: {@link String}, Value: {@link SocketProxy}.
	 * changed on 21.03.2006 by sven. See documentation of method
	 * {@link #createProxyKey(URL, URL)}
	 * 
	 */
	private static Map<String, SocketProxy> proxies = new HashMap<String, SocketProxy>();

	/**
	 * The {@link URL}of the node that uses this proxy to connect to the node,
	 * which is represented by this proxy.
	 * 
	 */
	private URL urlOfLocalNode = null;

	/**
	 * Counter for requests that have been made by this proxy. Also required to
	 * create unique identifiers for {@link Request requests}.
	 */
	private long requestCounter = -1;

	/**
	 * The socket that provides the connection to the node that this is the
	 * Proxy for. This is transient as a proxy can be transferred over the
	 * network. After transfer this socket has to be restored by reconnecting to
	 * the node.
	 */
	private transient Socket mySocket;

	/**
	 * The {@link ObjectOutputStream}this Proxy writes objects to. This is
	 * transient as a proxy can be transferred over the network. After transfer
	 * this stream has to be restored.
	 */
	private transient ObjectOutputStream out;

	/**
	 * The {@link ObjectInputStream}this Proxy reads objects from. This is
	 * transient as a proxy can be transferred over the network. After transfer
	 * this stream has to be restored.
	 */
	private transient ObjectInputStream in;

	/**
	 * The {@link ObjectInputStream} this Proxy reads objects from. This is
	 * transient as a proxy can be transferred over the network. After transfer
	 * this stream has to be restored.
	 */
	private transient Map<String, Response> responses;

	/**
	 * {@link Map} where threads are put in that are waiting for a repsonse.
	 * Key: identifier of the request (same as for the response). Value: The
	 * Thread itself.
	 */
	private transient Map<String, WaitingThread> waitingThreads;

	/**
	 * This indicates that an exception occured while waiting for responses and
	 * that the connection to the {@link Node node}, that this is the proxy
	 * for, could not be reestablished.
	 */
	private volatile boolean disconnected = false;

	/**
	 * Establishes a connection from <code>urlOfLocalNode</code> to
	 * <code>url</code>. The connection is represented by the returned
	 * <code>SocketProxy</code>.
	 * 
	 * @param url
	 *            The {@link URL} to connect to.
	 * @param urlOfLocalNode
	 *            {@link URL} of local node that establishes the connection.
	 * @return <code>SocketProxy</code> representing the established
	 *         connection.
	 * @throws CommunicationException
	 *             Thrown if establishment of connection to <code>url</code>
	 *             failed.
	 */
	public static SocketProxy create(URL urlOfLocalNode, URL url)
			throws CommunicationException {
		synchronized (proxies) {
			/*
			 * added on 21.03.2006 by sven. See documentation of method
			 * createProxyKey(URL, URL);
			 */
			String proxyKey = SocketProxy.createProxyKey(urlOfLocalNode, url);
			logger.debug("Known proxies " + SocketProxy.proxies.keySet());
			if (proxies.containsKey(proxyKey)) {
				logger.debug("Returning existing proxy for " + url);
				return proxies.get(proxyKey);
			} else {
				logger.debug("Creating new proxy for " + url);
				SocketProxy newProxy = new SocketProxy(url, urlOfLocalNode);
				proxies.put(proxyKey, newProxy);
				return newProxy;
			}
		}
	}

	/**
	 * Closes all outgoing connections to other peers. Allows the local peer to
	 * shutdown cleanly.
	 * 
	 */
	static void shutDownAll() {
		Set<String> keys = proxies.keySet();
		for (String key : keys) {
			proxies.get(key).disconnect();
		}
		proxies.clear();
	}

	/**
	 * Creates a <code>SocketProxy</code> representing the connection from
	 * <code>urlOfLocalNode</code> to <code>url</code>. The connection is
	 * established when the first (remote) invocation with help of the
	 * <code>SocketProxy</code> occurs.
	 * 
	 * @param url
	 *            The {@link URL} of the remote node.
	 * @param urlOfLocalNode
	 *            The {@link URL} of local node.
	 * @param nodeID
	 *            The {@link ID} of the remote node.
	 * @return SocketProxy
	 */
	protected static SocketProxy create(URL url, URL urlOfLocalNode, ID nodeID) {
		synchronized (proxies) {
			/*
			 * added on 21.03.2006 by sven. See documentation of method
			 * createProxyKey(String, String);
			 */
			String proxyKey = SocketProxy.createProxyKey(urlOfLocalNode, url);
			logger.debug("Known proxies " + SocketProxy.proxies.keySet());
			if (proxies.containsKey(proxyKey)) {
				logger.debug("Returning existing proxy for " + url);
				return proxies.get(proxyKey);
			} else {
				logger.debug("Creating new proxy for " + url);
				SocketProxy proxy = new SocketProxy(url, urlOfLocalNode, nodeID);
				proxies.put(proxyKey, proxy);
				return proxy;
			}
		}
	}

	/**
	 * Method that creates a unique key for a SocketProxy to be stored in
	 * {@link #proxies}.
	 * 
	 * This is important for the methods {@link #create(URL, URL)},
	 * {@link #create(URL, URL, ID)}, and {@link #disconnect()}, so that
	 * socket communication also works when it is used within one JVM.
	 * 
	 * Added by sven 21.03.2006, as before SocketProxy were stored in
	 * {@link #proxies} with help of their remote URL as key, so that they were
	 * a kind of singleton for that URL. But the key has to consist of the URL
	 * of the local peer, that uses the proxy, and the remote URL as
	 * SocketProxies must only be (kind of) a singleton per local and remote
	 * URL.
	 * 
	 * @param localURL
	 * @param remoteURL
	 * @return The key to store the SocketProxy
	 */
	private static String createProxyKey(URL localURL, URL remoteURL) {
		return localURL.toString() + "->" + remoteURL.toString();
	}

	/**
	 * Corresponding constructor to factory method {@link #create(URL, URL, ID)}.
	 * 
	 * @see #create(URL, URL, ID)
	 * @param url
	 * @param urlOfLocalNode1
	 * @param nodeID1
	 */
	protected SocketProxy(URL url, URL urlOfLocalNode1, ID nodeID1) {
		super(url);
		if (url == null || urlOfLocalNode1 == null || nodeID1 == null) {
			throw new IllegalArgumentException("null");
		}
		this.urlOfLocalNode = urlOfLocalNode1;
		this.nodeID = nodeID1;
	}

	/**
	 * Corresponding constructor to factory method {@link #create(URL, URL)}.
	 * 
	 * @see #create(URL, URL)
	 * @param url
	 * @param urlOfLocalNode1
	 * @throws CommunicationException
	 */
	private SocketProxy(URL url, URL urlOfLocalNode1)
			throws CommunicationException {
		super(url);
		if (url == null || urlOfLocalNode1 == null) {
			throw new IllegalArgumentException(
					"URLs must not be null!");
		}
		this.urlOfLocalNode = urlOfLocalNode1;
		this.initializeNodeID();
		logger.info("SocketProxy for " + url + " has been created.");
	}

	/**
	 * Private method to send requests over the socket. This method is
	 * synchronized to ensure that no other thread concurrently accesses the
	 * {@link ObjectOutputStream output stream}<code>out</code> while sending
	 * {@link Request request}.
	 * 
	 * @param request
	 *            The {@link Request}to be sent.
	 * @throws CommunicationException
	 *             while writing to {@link ObjectOutputStream output stream}.
	 */
	private synchronized void send(Request request)
			throws CommunicationException {
		try {
			logger.debug("Sending request " + request.getReplyWith());
			this.out.writeObject(request);
			this.out.flush();
			this.out.reset();
		} catch (IOException e) {
			throw new CommunicationException("Could not connect to node "
					+ this.nodeURL, e);
		}
	}

	/**
	 * Private method to create an identifier that enables this to associate a
	 * {@link Response response}with a {@link Request request}made before.
	 * This method is synchronized to protect {@link #requestCounter}from race
	 * conditions.
	 * 
	 * @param methodIdentifier
	 *            Integer identifying the method this method is called from.
	 * @return Unique Identifier for the request.
	 */
	private synchronized String createIdentifier(int methodIdentifier) {
		/* Create unique identifier from */
		StringBuilder uid = new StringBuilder();
		/* Time stamp */
		uid.append(System.currentTimeMillis());
		uid.append("-");
		/* counter and */
		uid.append(this.requestCounter++);
		/* methodIdentifier */
		uid.append("-");
		uid.append(methodIdentifier);
		return uid.toString();
	}

	/**
	 * Called in a method that is delegated to the {@link Node node}, that this
	 * is the proxy for. This method blocks the thread that calls the particular
	 * method until a {@link Response response} is received.
	 * 
	 * @param request
	 * @return The {@link Response} for <code>request</code>.
	 * @throws CommunicationException
	 */
	private Response waitForResponse(Request request)
			throws CommunicationException {

		String responseIdentifier = request.getReplyWith();
		Response response = null;
		logger.debug("Trying to wait for response with identifier "
				+ responseIdentifier + " for method "
				+ MethodConstants.getMethodName(request.getRequestType()));

		synchronized (this.responses) {
			logger.debug("No of responses " + this.responses.size());
			/* Test if we got disconnected while waiting for lock on object */
			if (this.disconnected) {
				throw new CommunicationException("Connection to remote host "
						+ " is broken down. ");
			}
			/*
			 * Test if response is already available (Maybe response arrived
			 * before we reached this point).
			 */
			response = this.responses.remove(responseIdentifier);
			if (response != null) {
				return response;
			}

			/* WAIT FOR RESPONSE */
			/* add current thread to map of threads waiting for a response */
			WaitingThread wt = new WaitingThread(Thread.currentThread());
			this.waitingThreads.put(responseIdentifier, wt);
			while (!wt.hasBeenWokenUp()) {
				try {
					/*
					 * Wait until notified or time out is reached.
					 */
					logger.debug("Waiting for response to arrive.");
					this.responses.wait();
				} catch (InterruptedException e) {
					/*
					 * does not matter as this is intended Thread is interrupted
					 * if response arrives
					 */
				}
			}
			logger.debug("Have been woken up from waiting for response.");

			/* remove thread from map of threads waiting for a response */
			this.waitingThreads.remove(responseIdentifier);
			/* try to get the response if available */
			response = this.responses.remove(responseIdentifier);
			logger.debug("Response for request with identifier "
					+ responseIdentifier + " for method "
					+ MethodConstants.getMethodName(request.getRequestType())
					+ " received.");
			/* if no response availabe */
			if (response == null) {
				logger.debug("No response received.");
				/* we have been disconnected */
				if (this.disconnected) {
					logger.info("Connection to remote host lost.");
					throw new CommunicationException(
							"Connection to remote host " + " is broken down. ");
				}
				/* or time out has elapsed */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -