📄 requesthandler.java
字号:
return;
}
logger.debug("Trying to send failure response. Failure reason "
+ failure);
Response failureResponse = new Response(Response.REQUEST_FAILED,
request.getRequestType(), request.getReplyWith());
failureResponse.setFailureReason(failure);
failureResponse.setThrowable(t);
try {
synchronized (this.out) {
this.out.writeObject(failureResponse);
this.out.flush();
this.out.reset();
}
logger.debug("Response send.");
} catch (IOException e) {
if (this.connected) {
logger.debug("Connection seems to be broken down. Could not "
+ "send failure response. Connection is closed. ", e);
this.disconnect();
}
}
}
/**
* Invokes methods on {@link #node}.
*
* @param methodType
* The type of the method to invoke. See {@link MethodConstants}.
* @param parameters
* The parameters to pass to the method.
* @return The result of the invoked method. May be <code>null</code> if
* method is void.
* @throws Exception
*/
Serializable invokeMethod(int methodType, Serializable[] parameters)
throws Exception {
String method = MethodConstants.getMethodName(methodType);
this.waitForMethod(method);
/* If we got disconnected while waiting */
if (!this.connected) {
/* throw an Exception */
throw new CommunicationException("Connection closed.");
}
Serializable result = null;
logger.debug("Trying to invoke method " + methodType
+ " with parameters: ");
for (Serializable parameter : parameters) {
logger.debug(parameter);
}
switch (methodType) {
case MethodConstants.FIND_SUCCESSOR: {
Node chordNode = this.node.findSuccessor((ID) parameters[0]);
result = new RemoteNodeInfo(chordNode.getNodeURL(), chordNode.getNodeID());
break;
}
case MethodConstants.GET_NODE_ID: {
result = this.node.getNodeID();
break;
}
case MethodConstants.INSERT_ENTRY: {
this.node.insertEntry((Entry) parameters[0]);
break;
}
case MethodConstants.INSERT_REPLICAS: {
this.node.insertReplicas((Set<Entry>) parameters[0]);
break;
}
case MethodConstants.LEAVES_NETWORK: {
RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
this.node.leavesNetwork(SocketProxy.create(nodeInfo.getNodeURL(),
this.node.getNodeURL(), nodeInfo.getNodeID()));
break;
}
case MethodConstants.NOTIFY: {
RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
List<Node> l = this.node.notify(SocketProxy.create(nodeInfo
.getNodeURL(), this.node.getNodeURL(), nodeInfo.getNodeID()));
List<RemoteNodeInfo> nodeInfos = new LinkedList<RemoteNodeInfo>();
for (Node current : l) {
nodeInfos.add(new RemoteNodeInfo(current.getNodeURL(),
current.getNodeID()));
}
result = (Serializable) nodeInfos;
break;
}
case MethodConstants.NOTIFY_AND_COPY: {
RemoteNodeInfo nodeInfo = (RemoteNodeInfo) parameters[0];
RefsAndEntries refs = this.node.notifyAndCopyEntries(SocketProxy
.create(nodeInfo.getNodeURL(), this.node.getNodeURL(), nodeInfo
.getNodeID()));
List<Node> l = refs.getRefs();
List<RemoteNodeInfo> nodeInfos = new LinkedList<RemoteNodeInfo>();
for (Node current : l) {
nodeInfos.add(new RemoteNodeInfo(current.getNodeURL(),
current.getNodeID()));
}
RemoteRefsAndEntries rRefs = new RemoteRefsAndEntries(refs
.getEntries(), nodeInfos);
result = rRefs;
break;
}
case MethodConstants.PING: {
logger.debug("Invoking ping()");
this.node.ping();
logger.debug("ping() invoked.");
break;
}
case MethodConstants.REMOVE_ENTRY: {
this.node.removeEntry((Entry) parameters[0]);
break;
}
case MethodConstants.REMOVE_REPLICAS: {
this.node.removeReplicas((ID) parameters[0],
(Set<Entry>) parameters[1]);
break;
}
case MethodConstants.RETRIEVE_ENTRIES: {
result = (Serializable) this.node
.retrieveEntries((ID) parameters[0]);
break;
}
default: {
logger.warn("Unknown method requested " + method);
throw new Exception("Unknown method requested " + method);
}
}
logger.debug("Returning result.");
return result;
}
/**
* This method is used to block threads that want to make a method call
* until the method invocation is permitted by the endpoint. Invocation of a
* method depends on the state of the endpoint.
*
* @param method
* The name of the method to invoke. TODO: change this to another
* type.
*/
private void waitForMethod(String method) {
logger
.debug(method
+ " allowed? "
+ !(Collections.binarySearch(
Endpoint.METHODS_ALLOWED_IN_ACCEPT_ENTRIES,
method) >= 0));
synchronized (this.waitingThreads) {
while ((!(this.state == Endpoint.ACCEPT_ENTRIES))
&& (this.connected)
&& ((Collections.binarySearch(
Endpoint.METHODS_ALLOWED_IN_ACCEPT_ENTRIES, method) >= 0))) {
Thread currentThread = Thread.currentThread();
boolean debug = logger.isEnabledFor(DEBUG);
if (debug) {
logger.debug("HERE!!!" + currentThread
+ " waiting for permission to " + "execute "
+ method);
}
this.waitingThreads.add(currentThread);
try {
this.waitingThreads.wait();
} catch (InterruptedException e) {
// do nothing
}
if (debug) {
logger.debug("HERE!!!" + currentThread
+ " has been notified.");
}
this.waitingThreads.remove(currentThread);
}
}
logger.debug("waitForMethod(" + method + ") returns!");
}
/**
* Disconnect this RequestHandler. Forces the socket, which this
* RequestHandler is bound to, to be closed and {@link #run()}to be
* stopped.
*/
public void disconnect() {
logger.info("Disconnecting.");
if (this.connected) {
/* cause the while loop in run() method to be finished */
/* and notify all threads waiting for execution of a method */
synchronized (this.waitingThreads) {
this.connected = false;
this.waitingThreads.notifyAll();
}
/* release reference to node. */
this.node = null;
/* try to close the socket */
try {
synchronized (this.out) {
this.out.close();
this.out = null;
}
} catch (IOException e) {
/* should not occur */
/* if closing of socket fails, that does not matter!??? */
logger.debug("Exception while closing output stream "
+ this.out);
}
try {
this.in.close();
this.in = null;
} catch (IOException e) {
/* should not occur */
/* if closing of socket fails, that does not matter!??? */
logger.debug("Exception while closing input stream" + this.in);
}
try {
logger.info("Closing socket " + this.connection);
this.connection.close();
this.connection = null;
logger.info("Socket closed.");
} catch (IOException e) {
/* should not occur */
/* if closing of socket fails, that does not matter!??? */
logger.debug("Exception while closing socket "
+ this.connection);
}
this.endpoint.deregister(this);
}
logger.debug("Disconnected.");
}
/**
* Test if this RequestHandler is disconnected
*
* @return <code>true</code> if this is still connected to its remote end.
*/
public boolean isConnected() {
return this.connected;
}
public void notify(int newState) {
logger.debug("notify(" + newState + ") called.");
this.state = newState;
/* notify all threads waiting for a state change */
synchronized (this.waitingThreads) {
logger.debug("HERE!!! Notifying waiting threads. "
+ this.waitingThreads);
this.waitingThreads.notifyAll();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -