📄 socketproxy.java
字号:
throw new CommunicationException(response.getFailureReason());
} else {
/* No result here */
return;
}
}
/**
* @param sendingNodeID
* @param replicas
* @throws CommunicationException
*/
public void removeReplicas(ID sendingNodeID, Set<Entry> replicas)
throws CommunicationException {
this.makeSocketAvailable();
logger.debug("Trying to remove replicas " + replicas + ".");
/* prepare request for method insertEntry */
Request request = this.createRequest(MethodConstants.REMOVE_REPLICAS,
new Serializable[] { sendingNodeID, (Serializable) replicas });
/* send request */
try {
logger.debug("Trying to send request " + request);
this.send(request);
} catch (CommunicationException ce) {
logger.debug("Connection failed!");
throw ce;
}
/* wait for response */
logger.debug("Waiting for response for request " + request);
Response response = this.waitForResponse(request);
logger.debug("Response " + response + " arrived.");
if (response.isFailureResponse()) {
throw new CommunicationException(response.getFailureReason());
} else {
/* No result here */
return;
}
}
public Set<Entry> retrieveEntries(ID id) throws CommunicationException {
this.makeSocketAvailable();
logger.debug("Trying to retrieve entries for ID " + id);
/* prepare request for method findSuccessor */
Request request = this.createRequest(MethodConstants.RETRIEVE_ENTRIES,
new Serializable[] { id });
/* send request */
try {
logger.debug("Trying to send request " + request);
this.send(request);
} catch (CommunicationException ce) {
logger.debug("Connection failed!");
throw ce;
}
/* wait for response */
logger.debug("Waiting for response for request " + request);
Response response = this.waitForResponse(request);
logger.debug("Response " + response + " arrived.");
if (response.isFailureResponse()) {
throw new CommunicationException(response.getFailureReason(),
response.getThrowable());
} else {
try {
Set<Entry> result = (Set<Entry>) response.getResult();
return result;
} catch (ClassCastException cce) {
throw new CommunicationException(
"Could not understand result! " + response.getResult());
}
}
}
/**
* This method has to be called at first in every method that uses the
* socket to connect to the node this is the proxy for. This method
* establishes the connection if not already done. This method has to be
* called as this proxy can be serialized and the reference to the socket is
* transient. So by calling this method after a transfer the connection to
* the node is reestablished. The same applies for {@link #logger}and
* {@link #responses}.
*
* @throws CommunicationException
*/
private void makeSocketAvailable() throws CommunicationException {
if (this.disconnected) {
throw new CommunicationException("Connection from "
+ this.urlOfLocalNode + " to remote host " + this.nodeURL
+ " is broken down. ");
}
logger.debug("makeSocketAvailable() called. "
+ "Testing for socket availability");
if (this.responses == null) {
this.responses = new HashMap<String, Response>();
}
if (this.waitingThreads == null) {
this.waitingThreads = new HashMap<String, WaitingThread>();
}
if (this.mySocket == null) {
try {
logger.info("Opening new socket to " + this.nodeURL);
this.mySocket = new Socket(this.nodeURL.getHost(), this.nodeURL
.getPort());
logger.debug("Socket created: " + this.mySocket);
this.mySocket.setSoTimeout(5000);
this.out = new ObjectOutputStream(this.mySocket
.getOutputStream());
this.in = new ObjectInputStream(this.mySocket.getInputStream());
logger.debug("Sending connection request!");
out.writeObject(new Request(MethodConstants.CONNECT,
"Initial Connection"));
try {
// set time out, in case the other side does not answer!
Response resp = null;
boolean timedOut = false;
try {
logger.debug("Waiting for connection response!");
resp = (Response) in.readObject();
} catch (SocketTimeoutException e) {
logger.info("Connection timed out!");
timedOut = true;
}
this.mySocket.setSoTimeout(0);
if (timedOut) {
throw new CommunicationException(
"Connection to remote host timed out!");
}
if (resp != null
&& resp.getStatus() == Response.REQUEST_SUCCESSFUL) {
Thread t = new Thread(this, "SocketProxy_Thread_"
+ this.nodeURL);
t.start();
} else {
throw new CommunicationException(
"Establishing connection failed!");
}
} catch (ClassNotFoundException e) {
throw new CommunicationException(
"Unexpected result received! " + e.getMessage(), e);
} catch (ClassCastException e) {
throw new CommunicationException(
"Unexpected result received! " + e.getMessage(), e);
}
} catch (UnknownHostException e) {
throw new CommunicationException("Unknown host: "
+ this.nodeURL.getHost());
} catch (IOException ioe) {
throw new CommunicationException("Could not set up IO channel "
+ "to host " + this.nodeURL.getHost(), ioe);
}
}
logger.debug("makeSocketAvailable() finished. Socket " + this.mySocket);
}
/**
* Finalization ensures that the socket is closed if this proxy is not
* needed anymore.
*
* @throws Throwable
*/
protected void finalize() throws Throwable {
logger.debug("Finalization running.");
}
/**
* Tells this proxy that it is not needed anymore.
*/
public void disconnect() {
logger.info("Destroying connection from " + this.urlOfLocalNode
+ " to " + this.nodeURL);
synchronized (proxies) {
/*
* added on 21.03.2006 by sven. See documentation of method
* createProxyKey(String, String);
*/
String proxyKey = SocketProxy.createProxyKey(this.urlOfLocalNode,
this.nodeURL);
Object o = proxies.remove(proxyKey);
}
this.disconnected = true;
try {
if (this.out != null) {
try {
/*
* notify endpoint this is connected to, about shut down of
* this proxy
*/
logger.debug("Sending shutdown notification to endpoint.");
Request request = this.createRequest(
MethodConstants.SHUTDOWN, new Serializable[0]);
logger.debug("Notification send.");
this.out.writeObject(request);
this.out.close();
this.out = null;
logger.debug("OutputStream " + this.out + " closed.");
} catch (IOException e) {
/* should not occur */
logger.debug(this
+ ": Exception during closing of output stream "
+ this.out, e);
}
}
if (this.in != null) {
try {
this.in.close();
logger.debug("InputStream " + this.in + " closed.");
this.in = null;
} catch (IOException e) {
/* should not occur */
logger.debug("Exception during closing of input stream"
+ this.in);
}
}
if (this.mySocket != null) {
try {
logger.info("Closing socket " + this.mySocket + ".");
this.mySocket.close();
} catch (IOException e) {
/* should not occur */
logger.debug("Exception during closing of socket "
+ this.mySocket);
}
this.mySocket = null;
}
} catch (Throwable t) {
logger.warn("Unexpected exception during disconnection of SocketProxy", t);
}
this.connectionBrokenDown();
}
/**
* The run methods waits for incoming
* {@link de.uniba.wiai.lspi.chord.com.socket.Response} made by this proxy
* and puts them into a datastructure from where the can be collected by the
* associated method call that made a
* {@link de.uniba.wiai.lspi.chord.com.socket.Request} to the {@link Node},
* that this is the proxy for.
*/
public void run() {
while (!this.disconnected) {
try {
Response response = (Response) this.in.readObject();
logger.debug("Response " + response + "received!");
this.responseReceived(response);
} catch (ClassNotFoundException cnfe) {
/* should not occur, as all classes must be locally available */
logger
.fatal(
"ClassNotFoundException occured during deserialization "
+ "of response. There is something seriously wrong "
+ " here! ", cnfe);
} catch (IOException e) {
if (!this.disconnected) {
logger.warn("Could not read response from stream!", e);
} else {
logger.debug(this + ": Connection has been closed!");
}
this.connectionBrokenDown();
}
}
}
/**
* @param potentialPredecessor
* @return See {@link Node#notifyAndCopyEntries(Node)}.
* @throws CommunicationException
*/
public RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
throws CommunicationException {
this.makeSocketAvailable();
RemoteNodeInfo nodeInfoToSend = new RemoteNodeInfo(potentialPredecessor
.getNodeURL(), potentialPredecessor.getNodeID());
/* prepare request for method notifyAndCopyEntries */
Request request = this.createRequest(MethodConstants.NOTIFY_AND_COPY,
new Serializable[] { nodeInfoToSend });
/* send request */
try {
logger.debug("Trying to send request " + request);
this.send(request);
} catch (CommunicationException ce) {
logger.debug("Connection failed!");
throw ce;
}
/* wait for response */
logger.debug("Waiting for response for request " + request);
Response response = this.waitForResponse(request);
logger.debug("Response " + response + " arrived.");
if (response.isFailureResponse()) {
throw new CommunicationException(response.getFailureReason(),
response.getThrowable());
} else {
try {
RemoteRefsAndEntries result = (RemoteRefsAndEntries) response
.getResult();
List<Node> newReferences = new LinkedList<Node>();
List<RemoteNodeInfo> references = result.getNodeInfos();
for (RemoteNodeInfo nodeInfo : references) {
if (nodeInfo.getNodeURL().equals(this.urlOfLocalNode)) {
newReferences.add(Endpoint.getEndpoint(
this.urlOfLocalNode).getNode());
} else {
newReferences.add(create(nodeInfo.getNodeURL(),
this.urlOfLocalNode, nodeInfo.getNodeID()));
}
}
return new RefsAndEntries(newReferences, result.getEntries());
} catch (ClassCastException cce) {
throw new CommunicationException(
"Could not understand result! " + response.getResult());
}
}
}
/**
* The string representation of this proxy. Created when {@link #toString()}
* is invoked for the first time.
*/
private String stringRepresentation = null;
/**
* @return String representation of this.
*/
public String toString() {
if (this.nodeID == null || this.mySocket == null) {
return "Unconnected SocketProxy from " + this.urlOfLocalNode + " to " + this.nodeURL;
}
if (this.stringRepresentation == null) {
StringBuilder builder = new StringBuilder();
builder.append("Connection from Node[url=");
builder.append(this.urlOfLocalNode);
builder.append(", socket=");
builder.append(this.mySocket);
builder.append("] to Node[id=");
builder.append(this.nodeID);
builder.append(", url=");
builder.append(this.nodeURL);
builder.append("]");
this.stringRepresentation = builder.toString();
}
return this.stringRepresentation;
}
/**
* Wraps a thread, which is waiting for a response.
*
* @author sven
*
*/
private static class WaitingThread {
private boolean hasBeenWokenUp = false;
private Thread thread;
private WaitingThread(Thread thread) {
this.thread = thread;
}
/**
* Returns <code>true</code> when the thread has been woken up by
* invoking {@link #wakeUp()}
*
* @return
*/
boolean hasBeenWokenUp() {
return this.hasBeenWokenUp;
}
/**
* Wake up the thread that is waiting for a response.
*
*/
void wakeUp() {
this.hasBeenWokenUp = true;
this.thread.interrupt();
}
public String toString() {
return this.thread.toString() + ": Waiting? "
+ !this.hasBeenWokenUp();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -