📄 socketproxy.java
字号:
else {
logger.error("There is no result, but we have not been "
+ "disconnected. Something went seriously wrong!");
throw new CommunicationException(
"Did not receive a response!");
}
}
}
return response;
}
/**
 * This method is called by {@link #run()} when it receives a
 * {@link Response}. The response is stored in {@link Map responses} under
 * its in-reply-to identifier and, if a {@link Thread thread} is registered
 * as waiting for that identifier, that thread is woken up.
 *
 * @param response
 *            The response that has been received.
 */
private void responseReceived(Response response) {
    /* All access to the response table is guarded by its own monitor. */
    synchronized (this.responses) {
        logger.debug("No of waiting threads " + this.waitingThreads);
        /* Try to fetch the thread waiting for this response */
        WaitingThread waitingThread = this.waitingThreads.get(response
                .getInReplyTo());
        logger.debug("Response with id " + response.getInReplyTo()
                + " received.");
        /* Save the response, whether or not somebody is waiting for it. */
        this.responses.put(response.getInReplyTo(), response);
        if (waitingThread != null) {
            logger.debug("Waking up thread!");
            waitingThread.wakeUp();
        }
        /*
         * If no thread is registered for this identifier there is nobody
         * to wake up; the response simply stays stored in responses.
         */
    }
}
/**
 * Records that the connection to the remote {@link Node node} has broken
 * down: flags this proxy as disconnected and wakes up every thread that is
 * currently registered as waiting.
 */
private void connectionBrokenDown() {
    /* Without a response table there is nothing to do. */
    if (this.responses != null) {
        /*
         * Every thread using this proxy synchronizes on responses, so the
         * same monitor is taken here.
         */
        synchronized (this.responses) {
            logger.info("Connection broken down!");
            this.disconnected = true;
            /* Wake up each registered waiter in turn. */
            for (WaitingThread waiter : this.waitingThreads.values()) {
                logger.debug("Interrupting waiting thread " + waiter);
                waiter.wakeUp();
            }
        }
    }
}
/**
 * Builds the {@link Request request} for the method identified by
 * <code>methodIdentifier</code>, carrying <code>parameters</code>. The
 * identifier obtained from <code>createIdentifier</code> is handed to the
 * request's constructor (see {@link Request#getReplyWith()}).
 *
 * @param methodIdentifier
 *            The identifier of the method to request.
 * @param parameters
 *            The parameters for the request.
 * @return The {@link Request request} created.
 */
private Request createRequest(int methodIdentifier,
        Serializable[] parameters) {
    /* Only assemble the (potentially large) message if it will be logged. */
    if (logger.isEnabledFor(DEBUG)) {
        String creationMessage = "Creating request for method "
                + MethodConstants.getMethodName(methodIdentifier)
                + " with parameters "
                + java.util.Arrays.deepToString(parameters);
        logger.debug(creationMessage);
    }

    final String replyWith = this.createIdentifier(methodIdentifier);
    final Request created = new Request(methodIdentifier, replyWith);
    created.setParameters(parameters);

    logger.debug("Request " + created + " created.");
    return created;
}
/**
 * Asks the remote node represented by this proxy for the successor of
 * <code>key</code> and waits for the answer.
 *
 * @param key
 *            The ID whose successor is requested.
 * @return The successor of <code>key</code>. If the URL reported by the
 *         remote node equals the URL of the local node, the node obtained
 *         from the local {@link Endpoint} is returned; otherwise the node
 *         obtained from <code>create</code> for the reported URL and ID.
 * @throws CommunicationException
 *             If the request cannot be sent, the remote node answers with
 *             a failure response, or the result is not a
 *             {@link RemoteNodeInfo} (this includes a <code>null</code>
 *             result).
 */
public Node findSuccessor(ID key) throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to find successor for ID " + key);

    /* prepare request for method findSuccessor */
    Request request = this.createRequest(MethodConstants.FIND_SUCCESSOR,
            new Serializable[] { key });

    /* send request; the catch only adds a log line before rethrowing */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        logger.debug("Connection failed!");
        throw ce;
    }

    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");

    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }

    /*
     * Validate the result explicitly instead of relying on a
     * ClassCastException. instanceof is false for null, so a missing
     * result is reported as a CommunicationException rather than
     * surfacing as a NullPointerException below. This should not occur
     * as all nodes should have the same classes!
     */
    Object result = response.getResult();
    if (!(result instanceof RemoteNodeInfo)) {
        String message = "Could not understand result! " + result;
        logger.fatal(message);
        throw new CommunicationException(message);
    }

    RemoteNodeInfo nodeInfo = (RemoteNodeInfo) result;
    if (nodeInfo.getNodeURL().equals(this.urlOfLocalNode)) {
        /* The successor is the local node itself. */
        return Endpoint.getEndpoint(this.urlOfLocalNode).getNode();
    }
    return create(nodeInfo.getNodeURL(), this.urlOfLocalNode,
            nodeInfo.getNodeID());
}
/**
 * Requests the ID of the node represented by this proxy from the remote
 * node and stores it in <code>nodeID</code>. Does nothing if
 * <code>nodeID</code> is already set.
 *
 * @throws CommunicationException
 *             If the request cannot be sent, the remote node answers with
 *             a failure response, or the result cannot be cast to
 *             {@link ID}.
 */
private void initializeNodeID() throws CommunicationException {
    if (this.nodeID != null) {
        /* ID already known, no remote call required. */
        return;
    }

    this.makeSocketAvailable();
    logger.debug("Trying to get node ID ");

    /* prepare request for method getNodeID */
    Request request = this.createRequest(MethodConstants.GET_NODE_ID,
            new Serializable[0]);

    /* send request; the catch only adds a log line before rethrowing */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        logger.debug("Connection failed!");
        throw ce;
    }

    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");

    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }

    try {
        this.nodeID = (ID) response.getResult();
    } catch (ClassCastException e) {
        /*
         * This should not occur as all nodes should have the same
         * classes!
         */
        String message = "Could not understand result! "
                + response.getResult();
        logger.fatal(message);
        /* Keep the original exception as cause so the stack trace survives. */
        throw new CommunicationException(message, e);
    }
}
/**
 * Sends a notify request carrying the URL and ID of
 * <code>potentialPredecessor</code> to the remote node and converts the
 * node descriptions contained in the answer into {@link Node nodes}.
 *
 * @param potentialPredecessor
 *            The node announced to the remote node; its URL and ID are
 *            transmitted.
 * @return List of references for the node invoking this method. See
 *         {@link Node#notify(Node)}. An entry whose URL equals the URL of
 *         the local node is resolved through the local {@link Endpoint};
 *         every other entry through <code>create</code>.
 * @throws CommunicationException
 *             If the request cannot be sent, the remote node answers with
 *             a failure response, or the result is not a list consisting
 *             only of {@link RemoteNodeInfo} elements (this includes a
 *             <code>null</code> result or element).
 */
public List<Node> notify(Node potentialPredecessor)
        throws CommunicationException {
    this.makeSocketAvailable();

    RemoteNodeInfo nodeInfoToSend = new RemoteNodeInfo(
            potentialPredecessor.getNodeURL(),
            potentialPredecessor.getNodeID());
    Request request = this.createRequest(MethodConstants.NOTIFY,
            new Serializable[] { nodeInfoToSend });

    /* send request to remote node; a failure propagates unchanged */
    this.send(request);

    /* wait for response to arrive */
    Response response = this.waitForResponse(request);
    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }

    /*
     * Check the shape of the result explicitly: this avoids the unchecked
     * generic cast and turns a null result or a foreign element into a
     * CommunicationException instead of a runtime exception.
     */
    Object result = response.getResult();
    if (!(result instanceof List)) {
        throw new CommunicationException("Could not understand result! "
                + result);
    }

    List<Node> nodes = new LinkedList<Node>();
    for (Object element : (List<?>) result) {
        if (!(element instanceof RemoteNodeInfo)) {
            throw new CommunicationException(
                    "Could not understand result! " + result);
        }
        RemoteNodeInfo nodeInfo = (RemoteNodeInfo) element;
        if (nodeInfo.getNodeURL().equals(this.urlOfLocalNode)) {
            /* Reference to the local node itself. */
            nodes.add(Endpoint.getEndpoint(this.urlOfLocalNode).getNode());
        } else {
            nodes.add(create(nodeInfo.getNodeURL(), this.urlOfLocalNode,
                    nodeInfo.getNodeID()));
        }
    }
    return nodes;
}
/**
 * Sends a ping request to the remote node and waits for its response.
 * Returns normally if the response is not a failure response.
 *
 * @throws CommunicationException
 *             If the request cannot be sent or the remote node answers
 *             with a failure response.
 */
public void ping() throws CommunicationException {
    this.makeSocketAvailable();

    /*
     * Evaluate the log level once and use it for every debug message that
     * has to build a string.
     */
    boolean debugEnabled = SocketProxy.logger.isEnabledFor(DEBUG);
    if (debugEnabled) {
        logger.debug("Trying to ping remote node " + this.nodeURL);
    }

    /* prepare request for method ping */
    Request request = this.createRequest(MethodConstants.PING,
            new Serializable[0]);

    /* send request; the catch only adds a log line before rethrowing */
    try {
        if (debugEnabled) {
            logger.debug("Trying to send request " + request);
        }
        this.send(request);
    } catch (CommunicationException ce) {
        logger.debug("Connection failed!");
        throw ce;
    }

    /* wait for response */
    if (debugEnabled) {
        logger.debug("Waiting for response for request " + request);
    }
    Response response = this.waitForResponse(request);
    if (debugEnabled) {
        logger.debug("Response " + response + " arrived.");
    }

    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }
    /* No result here: a non-failure response is all that is needed. */
}
/**
 * Sends an insert-entry request for <code>entry</code> to the remote node
 * and waits for the matching response.
 *
 * @param entry
 *            The entry to be inserted at the remote node.
 * @throws CommunicationException
 *             If the request cannot be sent or the remote node answers
 *             with a failure response.
 */
public void insertEntry(Entry entry) throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to insert entry " + entry + ".");

    /* Wrap the entry into an INSERT_ENTRY request. */
    final Serializable[] arguments = new Serializable[] { entry };
    final Request insertRequest = this.createRequest(
            MethodConstants.INSERT_ENTRY, arguments);

    /* Transmit it; a failed send is logged and passed on to the caller. */
    try {
        logger.debug("Trying to send request " + insertRequest);
        this.send(insertRequest);
    } catch (CommunicationException sendFailure) {
        logger.debug("Connection failed!");
        throw sendFailure;
    }

    /* Wait for the answer belonging to this request. */
    logger.debug("Waiting for response for request " + insertRequest);
    final Response reply = this.waitForResponse(insertRequest);
    logger.debug("Response " + reply + " arrived.");

    if (reply.isFailureResponse()) {
        throw new CommunicationException(reply.getFailureReason());
    }
    /* Success carries no result. */
}
/**
 * Sends an insert-replicas request for <code>replicas</code> to the remote
 * node and waits for the matching response.
 *
 * @param replicas
 *            The entries to be stored as replicas at the remote node. If
 *            the given set does not implement {@link Serializable} (for
 *            example a key-set view of a map), a copy of it is sent
 *            instead.
 * @throws CommunicationException
 *             If the request cannot be sent or the remote node answers
 *             with a failure response.
 */
public void insertReplicas(Set<Entry> replicas)
        throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to insert replicas " + replicas + ".");

    /*
     * Set itself is not Serializable, so a blind cast fails with a
     * ClassCastException for non-serializable implementations. Such sets
     * are copied into a HashSet; null and serializable sets are passed on
     * unchanged, exactly as before.
     */
    Serializable payload;
    if (replicas == null || replicas instanceof Serializable) {
        payload = (Serializable) replicas;
    } else {
        payload = new java.util.HashSet<Entry>(replicas);
    }

    /* prepare request for method insertReplicas */
    Request request = this.createRequest(MethodConstants.INSERT_REPLICAS,
            new Serializable[] { payload });

    /* send request; the catch only adds a log line before rethrowing */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        logger.debug("Connection failed!");
        throw ce;
    }

    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");

    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }
    /* No result here */
}
/**
 * Sends a leaves-network request carrying the URL and ID of
 * <code>predecessor</code> to the remote node and waits for the matching
 * response.
 *
 * @param predecessor
 *            The node whose URL and ID are transmitted with the request.
 * @throws CommunicationException
 *             If the request cannot be sent or the remote node answers
 *             with a failure response.
 */
public void leavesNetwork(Node predecessor) throws CommunicationException {
    this.makeSocketAvailable();
    logger.debug("Trying to notify node that " + predecessor
            + " leaves network.");

    /* Only URL and ID of the node are sent, not the node itself. */
    RemoteNodeInfo nodeInfo = new RemoteNodeInfo(predecessor.getNodeURL(),
            predecessor.getNodeID());

    /* prepare request for method leavesNetwork */
    Request request = this.createRequest(MethodConstants.LEAVES_NETWORK,
            new Serializable[] { nodeInfo });

    /* send request; the catch only adds a log line before rethrowing */
    try {
        logger.debug("Trying to send request " + request);
        this.send(request);
    } catch (CommunicationException ce) {
        logger.debug("Connection failed!");
        throw ce;
    }

    /* wait for response */
    logger.debug("Waiting for response for request " + request);
    Response response = this.waitForResponse(request);
    logger.debug("Response " + response + " arrived.");

    if (response.isFailureResponse()) {
        throw new CommunicationException(response.getFailureReason());
    }
    /* No result here */
}
/**
* @param entry
* @throws CommunicationException
*/
public void removeEntry(Entry entry) throws CommunicationException {
this.makeSocketAvailable();
logger.debug("Trying to remove entry " + entry + ".");
/* prepare request for method findSuccessor */
Request request = this.createRequest(MethodConstants.REMOVE_ENTRY,
new Serializable[] { entry });
/* send request */
try {
logger.debug("Trying to send request " + request);
this.send(request);
} catch (CommunicationException ce) {
logger.debug("Connection failed!");
throw ce;
}
/* wait for response */
logger.debug("Waiting for response for request " + request);
Response response = this.waitForResponse(request);
logger.debug("Response " + response + " arrived.");
if (response.isFailureResponse()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -