📄 socketproxy.java
字号:
new Serializable[] { nodeInfo });
/* send request */
try {
logger.debug("Trying to send request " + request);
this.send(request);
} catch (CommunicationException ce) {
logger.debug("Connection failed!");
throw ce;
}
/* wait for response */
logger.debug("Waiting for response for request " + request);
Response response = this.waitForResponse(request);
logger.debug("Response " + response + " arrived.");
if (response.isFailureResponse()) {
throw new CommunicationException(response.getFailureReason());
} else {
/* No result here */
return;
}
}
/**
 * Sends a {@code REMOVE_ENTRY} request carrying the given entry to the node
 * this proxy is connected to and waits for the matching response.
 *
 * @param entry
 *            Entry to be removed; passed as the only request argument.
 * @throws CommunicationException
 *             If the socket cannot be made available, the request cannot
 *             be sent, or the remote side answers with a failure response.
 *             In the latter case the throwable reported by the response is
 *             attached as the cause.
 */
public void removeEntry(Entry entry) throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to remove entry " + entry + ".");
    /* prepare request for method removeEntry */
    Request request = this.createRequest(MethodConstants.REMOVE_ENTRY,
            new Serializable[] { entry });
    /* send request */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        /* only logged here; handling is left to the caller */
        logger.debug("Connection failed!");
        throw ce;
    }
    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");
    if (response.isFailureResponse()) {
        /*
         * Keep the throwable delivered with the response so that the
         * remote cause is not lost (same handling as in retrieveEntries).
         */
        throw new CommunicationException(response.getFailureReason(),
                response.getThrowable());
    }
    /* success: this request has no result */
}
/**
 * Sends a {@code REMOVE_REPLICAS} request to the node this proxy is
 * connected to and waits for the matching response.
 *
 * @param sendingNodeID
 *            ID sent as first request argument.
 * @param replicas
 *            Replicas to remove, sent as second request argument. The set
 *            is cast to {@link Serializable}, so the concrete set
 *            implementation must be serializable; otherwise the cast fails
 *            with a {@link ClassCastException}.
 * @throws CommunicationException
 *             If the socket cannot be made available, the request cannot
 *             be sent, or the remote side answers with a failure response.
 *             In the latter case the throwable reported by the response is
 *             attached as the cause.
 */
public void removeReplicas(ID sendingNodeID, Set<Entry> replicas)
        throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to remove replicas " + replicas + ".");
    /* prepare request for method removeReplicas */
    Request request = this.createRequest(MethodConstants.REMOVE_REPLICAS,
            new Serializable[] { sendingNodeID, (Serializable) replicas });
    /* send request */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        /* only logged here; handling is left to the caller */
        logger.debug("Connection failed!");
        throw ce;
    }
    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");
    if (response.isFailureResponse()) {
        /*
         * Keep the throwable delivered with the response so that the
         * remote cause is not lost (same handling as in retrieveEntries).
         */
        throw new CommunicationException(response.getFailureReason(),
                response.getThrowable());
    }
    /* success: this request has no result */
}
/**
 * Sends a {@code RETRIEVE_ENTRIES} request for the given ID to the node
 * this proxy is connected to and returns the result of the response.
 *
 * @param id
 *            ID sent as the only request argument.
 * @return The result object of the response, cast to a set of entries.
 *         May be {@code null} if the response carries no result.
 * @throws CommunicationException
 *             If the socket cannot be made available, the request cannot
 *             be sent, the remote side answers with a failure response, or
 *             the result is not a {@link Set}.
 */
public Set<Entry> retrieveEntries(ID id) throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to retrieve entries for ID " + id);
    /* prepare request for method retrieveEntries */
    Request request = this.createRequest(MethodConstants.RETRIEVE_ENTRIES,
            new Serializable[] { id });
    /* send request */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        /* only logged here; handling is left to the caller */
        logger.debug("Connection failed!");
        throw ce;
    }
    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");
    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason(),
                response.getThrowable());
    }
    try {
        /*
         * Unchecked: because of type erasure only the raw Set type is
         * verified here, not the element type.
         */
        @SuppressWarnings("unchecked")
        Set<Entry> result = (Set<Entry>) response.getResult();
        return result;
    } catch (ClassCastException cce) {
        /* keep the cast failure as cause instead of discarding it */
        throw new CommunicationException(
                "Could not understand result! " + response.getResult(),
                cce);
    }
}
/**
 * This method has to be called first in every method that uses the socket
 * to talk to the node this is the proxy for. It refuses to work once the
 * proxy has been marked as disconnected, lazily creates the
 * {@link #responses} and {@link #waitingThreads} maps, and, if no socket
 * exists yet, opens the socket to {@link #nodeURL}, wraps it in object
 * streams and starts the thread that executes {@link #run()}.
 * <p>
 * The lazy setup exists because this proxy can be serialized; the original
 * documentation states that the socket reference is transient, so it has
 * to be re-established after a transfer.
 * <p>
 * If setting up the connection fails half-way (socket opened, but streams
 * could not be created), the socket is closed again and all connection
 * fields are reset, so that a later call starts from a clean state instead
 * of finding a socket without usable streams.
 *
 * @throws CommunicationException
 *             If the proxy is already disconnected, the host is unknown,
 *             or the socket or its streams cannot be set up. The
 *             underlying exception is attached as the cause.
 */
private synchronized void makeSocketAvailable()
        throws CommunicationException {
    if (this.disconnected) {
        throw new CommunicationException("Connection to remote host "
                + " is broken down. ");
    }
    logger.debug("makeSocketAvailable() called. "
            + "Testing for socket availability");
    if (this.responses == null) {
        this.responses = new HashMap<String, Response>();
    }
    if (this.waitingThreads == null) {
        this.waitingThreads = new HashMap<String, Thread>();
    }
    if (this.mySocket == null) {
        try {
            logger.info("Opening new socket to " + this.nodeURL);
            this.mySocket = new Socket(this.nodeURL.getHost(), this.nodeURL
                    .getPort());
            logger.debug("Socket created: " + this.mySocket);
            this.out = new ObjectOutputStream(this.mySocket
                    .getOutputStream());
            this.in = new ObjectInputStream(this.mySocket.getInputStream());
            /* reader thread; its body is this proxy's run() method */
            Thread t = new Thread(this, "SocketProxy_Thread_"
                    + this.nodeURL);
            t.start();
        } catch (UnknownHostException e) {
            /* thrown by the Socket constructor, so nothing was opened */
            throw new CommunicationException("Unknown host: "
                    + this.nodeURL.getHost(), e);
        } catch (IOException ioe) {
            /* the socket may already be open: do not leak it */
            this.discardHalfOpenConnection();
            throw new CommunicationException("Could not set up IO channel "
                    + "to host " + this.nodeURL.getHost(), ioe);
        }
    }
    logger.debug("makeSocketAvailable() finished. Socket " + this.mySocket);
}

/**
 * Resets the connection fields after a failed connection attempt and
 * closes the socket if one had already been opened.
 */
private void discardHalfOpenConnection() {
    Socket halfOpen = this.mySocket;
    this.out = null;
    this.in = null;
    this.mySocket = null;
    if (halfOpen != null) {
        try {
            /* closing the socket also closes its streams */
            halfOpen.close();
        } catch (IOException closeFailure) {
            logger.warn("Exception during closing of socket " + halfOpen,
                    closeFailure);
        }
    }
}
/**
 * Finalizer hook of this proxy. As implemented it only writes a debug
 * message; it does not close the socket or the streams. Closing the
 * connection is done by {@link #disconnect()}.
 * <p>
 * NOTE(review): earlier documentation stated that finalization ensures
 * the socket is closed. The body does not do that - confirm whether this
 * is intended.
 *
 * @throws Throwable
 *             Declared because the signature of the overridden method
 *             declares it.
 */
protected void finalize() throws Throwable {
logger.debug("Finalization running.");
}
/**
 * Tells this proxy that it is not needed anymore. The proxy is removed
 * from the {@code proxies} map, marked as disconnected, a
 * {@code SHUTDOWN} request is written to the endpoint (if an output stream
 * exists), and output stream, input stream and socket are closed. Each
 * connection field is reset to {@code null} even if closing it fails.
 * Finally {@code connectionBrokenDown()} is invoked.
 * <p>
 * Failures during shutdown are logged and never propagated to the caller.
 */
public synchronized void disconnect() {
    logger.info("Destroying connection to url " + this.nodeURL);
    synchronized (proxies) {
        /*
         * added on 21.03.2006 by sven. See documentation of method
         * createProxyKey(String, String);
         */
        String proxyKey = SocketProxy.createProxyKey(this.urlOfLocalNode, this.nodeURL);
        proxies.remove(proxyKey);
    }
    this.disconnected = true;
    try {
        if (this.out != null) {
            try {
                /*
                 * notify endpoint this is connected to, about shut down of
                 * this proxy
                 */
                logger.debug("Sending shutdown notification to endpoint.");
                Request request = this.createRequest(
                        MethodConstants.SHUTDOWN, new Serializable[0]);
                this.out.writeObject(request);
                /* logged only after the request has really been written */
                logger.debug("Notification send.");
                this.out.close();
                /* log before the reference is cleared below */
                logger.debug("OutputStream " + this.out + " closed.");
            } catch (IOException e) {
                /* should not occur */
                logger.warn("Exception during closing of output stream "
                        + this.out, e);
            } finally {
                /*
                 * Closing the socket below releases the underlying stream
                 * in case writing or closing failed here.
                 */
                this.out = null;
            }
        }
        if (this.in != null) {
            try {
                this.in.close();
                logger.debug("InputStream " + this.in + " closed.");
            } catch (IOException e) {
                /* should not occur */
                logger.warn("Exception during closing of input stream"
                        + this.in, e);
            } finally {
                this.in = null;
            }
        }
        if (this.mySocket != null) {
            try {
                logger.info("Closing socket " + this.mySocket + ".");
                this.mySocket.close();
            } catch (IOException e) {
                /* should not occur */
                logger.warn("Exception during closing of socket "
                        + this.mySocket, e);
            } finally {
                this.mySocket = null;
            }
        }
    } catch (Throwable t) {
        /* best effort: the logger already records the stack trace */
        logger.warn("Exception during disconnection of SocketProxy", t);
    }
    this.connectionBrokenDown();
}
/**
 * Reader loop of this proxy. As long as the proxy is not marked as
 * disconnected, it reads one {@link Response response} after the other
 * from the input stream and hands each to
 * {@code responseReceived(Response)}, from where it can be collected by the
 * method call that issued the corresponding {@link Request request} to the
 * {@link Node node} this is the proxy for.
 * <p>
 * A {@link ClassNotFoundException} is logged and reading continues. An
 * {@link IOException} is logged and reported through
 * {@code connectionBrokenDown()}.
 * NOTE(review): the loop only ends after an I/O failure if
 * {@code connectionBrokenDown()} sets the disconnected flag - confirm.
 */
public void run() {
    while (!this.disconnected) {
        try {
            Object received = this.in.readObject();
            Response incoming = (Response) received;
            logger.debug("Response " + incoming + "received!");
            this.responseReceived(incoming);
        } catch (IOException readFailure) {
            if (this.disconnected) {
                /* expected: the stream was closed on purpose */
                logger.debug(this + ": Connection has been closed!");
            } else {
                logger.warn("Could not read response from stream!",
                        readFailure);
            }
            this.connectionBrokenDown();
        } catch (ClassNotFoundException missingClass) {
            /* should not occur, as all classes must be locally available */
            final String message =
                    "ClassNotFoundException occured during deserialization "
                    + "of response. There is something seriously wrong "
                    + " here! ";
            logger.fatal(message, missingClass);
        }
    }
}
/**
 * Sends a {@code NOTIFY_AND_COPY} request describing the given node to the
 * node this proxy is connected to and converts the answer into local
 * references.
 * <p>
 * Every node info contained in the result is turned into a {@link Node}:
 * if its URL equals the URL of the local node, the node of the local
 * {@link Endpoint} is used; otherwise a node is obtained from
 * {@code create(...)} for the remote URL and ID.
 *
 * @param potentialPredecessor
 *            Node whose URL and ID are sent to the remote node.
 * @return See {@link Node#notifyAndCopyEntries(Node)}. Contains the
 *         converted references and the entries of the result.
 * @throws CommunicationException
 *             If the socket cannot be made available, the request cannot
 *             be sent, the remote side answers with a failure response, or
 *             the result does not have the expected type.
 */
public RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
        throws CommunicationException {
    this.makeSocketAvailable();
    RemoteNodeInfo nodeInfoToSend = new RemoteNodeInfo(
            potentialPredecessor.nodeURL, potentialPredecessor.nodeID);
    /* prepare request for method notifyAndCopyEntries */
    Request request = this.createRequest(MethodConstants.NOTIFY_AND_COPY,
            new Serializable[] { nodeInfoToSend });
    /* send request */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        /* only logged here; handling is left to the caller */
        logger.debug("Connection failed!");
        throw ce;
    }
    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");
    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason(),
                response.getThrowable());
    }
    try {
        RemoteRefsAndEntries result = (RemoteRefsAndEntries) response
                .getResult();
        List<Node> newReferences = new LinkedList<Node>();
        List<RemoteNodeInfo> references = result.getNodeInfos();
        for (RemoteNodeInfo nodeInfo : references) {
            if (nodeInfo.getNodeURL().equals(this.urlOfLocalNode)) {
                /* reference to ourselves: use the local node directly */
                newReferences.add(Endpoint.getEndpoint(
                        this.urlOfLocalNode).getNode());
            } else {
                newReferences.add(create(nodeInfo.getNodeURL(),
                        this.urlOfLocalNode, nodeInfo.getNodeID()));
            }
        }
        return new RefsAndEntries(newReferences, result.getEntries());
    } catch (ClassCastException cce) {
        /* keep the cast failure as cause instead of discarding it */
        throw new CommunicationException(
                "Could not understand result! " + response.getResult(),
                cce);
    }
}
/**
 * Cached text returned by {@link #toString()}. Stays {@code null} until
 * {@link #toString()} is called for the first time and is never rebuilt
 * afterwards by the code shown here.
 */
private String stringRepresentation = null;

/**
 * Returns a description of this proxy naming the local URL, the remote URL,
 * the remote ID and the socket. The text is built on the first call and
 * reused afterwards, so it shows the socket as it was at the first call.
 *
 * @return String representation of this.
 */
public String toString() {
    if (this.stringRepresentation != null) {
        return this.stringRepresentation;
    }
    this.stringRepresentation = new StringBuilder()
            .append("(SocketProxy. Connection from ")
            .append(this.urlOfLocalNode)
            .append(" to ")
            .append(this.nodeURL)
            .append(" with id ")
            .append(this.nodeID)
            .append(". Using socket ")
            .append(this.mySocket)
            .append(")")
            .toString();
    return this.stringRepresentation;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -