📄 chordnode.java
字号:
protected NodeHandle closestPrecedingFinger(Id id) {
cpfFound = false;
temp[2] = this.nodeHandle; //n_id
for (int i = bitsPerKey - 1; (i > -1 && !cpfFound); i--) {
if (finger[i] != null && finger[i].getId().betweenE(this.id, id)) {
temp[2] = finger[i];
cpfFound = true;
}
}
return temp[2];
}
/**
* Finds the successor of a node. If the result if null because the
* information its not contain in the finger table, it generate a request
* message and the the content response message has been store in the
* finger table
*
* @param pos
* position of NodeHandle in the start table
* @return successor NodeHandle of the node successor
*/
protected NodeHandle findSuccessor(int pos) {
if (predecessor != null && start[pos].betweenE(predecessor.getId(), this.id)) {
return this.nodeHandle;
} else {
return findPredecessor(pos);
}
}
/**
* Finds the predecessor of a node. If the result if null because the
* information its not contain in the finger table, it generate a request
* message and the the content response message has been store in the
* finger table
*
* @param pos
* position of NodeHandle in the start table
* @return successor NodeHandle of the node predecessor
*/
protected NodeHandle findPredecessor(int pos) {
if (start[pos].equals(this.id)) {
return this.nodeHandle;
} else if (finger[0] != null && start[pos].betweenE(this.id, finger[0].getId())) {
return finger[0];
} else {
temp[1] = closestPrecedingFinger(start[pos]);
String key = GenericFactory.generateKey();
sendMessage(key,nodeHandle,temp[1],FIND_PRE,REQUEST,new IdMessage(start[pos]));
addMessageListener(key, new FindPredListener(this, pos));
}
return finger[pos];
}
/**
* The node joins in the network
*
* @param bootstrap
* Id of arbitrary node in the network
*/
public void join(NodeHandle bootstrap) {
if (bootstrap.equals(nodeHandle)) {
for (int i = 0; i < bitsPerKey; i++) {
finger[i] = this.nodeHandle;
}
predecessor = this.nodeHandle;
} else {
String key = GenericFactory.generateKey();
predecessor = null;
NodeHandle boots = null;
sendMessage(key,nodeHandle,bootstrap, FIND_SUCC,REQUEST,null);
addMessageListener(key, new FindSuccListener(this));
}
}
/**
* The node leaves the network
*/
public void leave() {
hasLeaved = true;
sendMessage(null,nodeHandle,predecessor,SET_SUCC,REFRESH, new NodeMessage(finger[0]));
sendMessage(null,nodeHandle,finger[0],SET_PRE,REFRESH,new NodeMessage(predecessor));
Logger.log("LEAVING NODE " + id, Logger.EVENT_LOG);
}
/**
* Verify node immediate successor and tell the successor about this,
* moreover generate and update the successor list.
*/
protected void stabilize() {
RouteMessage aMsg = null;
if (hasReceivedSucc)
{
hasReceivedSucc = false;
String key = GenericFactory.generateKey();
sendMessage(key,nodeHandle,finger[0],GET_PRE,REQUEST,null);
addMessageListener(key, new GetPreListener(this));
}
//if the links are been correct then sends the request to successor
if (finger[0] != null && !finger[0].equals(this.nodeHandle)
&& predecessor != null && !finger[0].equals(predecessor)) {
sendMessage(null,nodeHandle,finger[0],SUCC_LIST,REQUEST,null);
}
}
/**
* Verify if the indicated node is the true predecessor
*
* @param nh
* node to check
*/
protected void notify(NodeHandle nh) {
if (predecessor == null || nh.getId().between(predecessor.getId(), this.id)) {
predecessor = nh;
}
}
/**
* This method allows to update a finger table position.
*/
protected void fixFingers() {
if (nullPointers == 0) {
nullPointers++;
}
setFinger(nullPointers, findSuccessor(nullPointers));
nullPointers = (nullPointers + 1) % bitsPerKey;
}
/**
* Send a message to destination node directly.
*
* @param message
* Message to deliver from application.
* @param hint Proposed node handle for the next hop.
* @param mode Message mode (REQUEST or REFRESH)
*/
protected void sendData(RouteMessage message, NodeHandle hint, int mode) {
message.setNextHopHandle(hint);
sendMessage(message);
Results.incTraffic();
}
/**
* Send a message to unknown destination node via routing.
*
* @param message
* Message to deliver from application.
* @param nextHop Proposed next hop.
*/
protected void routingData(RouteMessage message, NodeHandle nextHop) {
//enrutar missatge via key del missatge (el idMessage)
if (predecessor != null && nextHop.getId().betweenE(predecessor.getId(), this.id)) {
//deliver
EndPoint endpoint = (EndPoint) endpoints.get(message
.getApplicationId());
endpoint.scheduleMessage(message, 0);
} else if (finger[0] != null && nextHop.getId().betweenE(this.id, finger[0].getId())) {
sendData(message, finger[0],REFRESH);
} else {
sendData(message,closestPrecedingFinger(nextHop.getId()),REQUEST);
Results.updateHopsMsg(message.getSource().getId(), message.getKey());
}
}
/**
* Sends a RouteMessage to the ring. If nextHop is null, it shows that the
* destination node is unknown and the message will be routed. If nextHop
* is not null, it's the destination NodeHandle and it must be reached
* directly.
* @param appId The applicationId The application identifier of the incoming
* application message.
* @param to The destination NodeHandle.
* @param nextHop The next hop NodeHandle if it is known.
* @param msg The incoming message from above layers.
*/
public void routeData(String appId,NodeHandle to, NodeHandle nextHop, Message msg) {
RouteMessage toSend = null;
try {
toSend = getDataMessage(appId,nodeHandle,to,nextHop,msg);
if (nextHop != null) this.sendData(toSend, nextHop,REFRESH);
else this.routingData(toSend, to);
} catch (InitializationException e) {
Logger.log("[" + id + "] cannot build a RouteMessage for this data [" + msg + "]",Logger.ERROR_LOG);
}
}
/**
* Initiating a Broadcast Message
*
* @param appId Application which sends this broadcast.
* @param to Destination of the broadcast.
* @param nextHop Next hop.
* @param msg Message to be sent into the broadcast.
*/
public void broadcast(String appId,
NodeHandle to, NodeHandle nextHop, Message msg) {
NodeHandle limit;
NodeHandle r;
for (int i = 0; i < bitsPerKey - 1; i++) {
//Skip a redundant finger
if (!finger[i].equals(finger[i + 1])) {
r = finger[i];
limit = finger[i + 1];
RouteMessage aMsg = null;
try {
//String appId,Id from, Id to, Id nextHop
aMsg = getBroadcastMessage(appId,
this.nodeHandle, r, r,new BroadcastMessage(msg, limit));
sendMessage(aMsg);
Results.incTraffic();
} catch (InitializationException e) {
Logger.log("Cannot build a new instance of RouteMessage for BroadcastMessage",
Logger.ERROR_LOG);
}
}
}
//Process the last finger
RouteMessage aMsg = null;
try {
//String appId,Id from, Id to, Id nextHop
aMsg = getBroadcastMessage(appId, this.nodeHandle,
finger[bitsPerKey - 1],
finger[bitsPerKey - 1],
new BroadcastMessage(msg, nodeHandle));
//I'm the limit of the last finger
sendMessage(aMsg);
Results.incTraffic();
} catch (InitializationException e) {
Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
+ e.getMessage(), Logger.ERROR_LOG);
}
}
/**
* Treats the messages and according to the case, executes the generic
* listeners or listeners specialized, forward the messages or send
* responses messages
*
* @param msg
* IMessage to treat
*
*/
public void dispatcher(RouteMessage msg) {
//Response message but not successor list type (key == null)
if (msg.getMode() == REPLY && msg.getKey() != null) {
String key = msg.getKey();
try {
((MessageListener) listeners.get(key)).onMessage(msg);
} catch (NullPointerException e) {
Logger.log("I'm [" + id + "]; not exist listener for key ["
+ msg.getKey() + "]", Logger.ERROR_LOG);
}
removeMessageListener(key);
} else if (msg.getMode() == Globals.ERROR) {
if (msg.getKey() != null) {
String key_fp = msg.getKey();
Logger.log("Node " + this.id + " destroy message key " + key_fp
+ " type " + Globals.typeToString(msg.getType()) + " content "
+ msg.getMessage(), Logger.MSG_LOG);
MessageListener lst = (MessageListener) listeners.get(key_fp);
if (lst != null) {
removeMessageListener(key_fp);
}
}
//Successor lost
if (finger[0] != null && msg.getSource().equals(finger[0])
&& succList.size() > 0) {
//if not exists, succ_list is unchanged
succList.remove(finger[0]);
if (succList.size() > 0) {
finger[0] = (NodeHandle) succList.firstElement();
//send notify
this.sendMessage(msg,null,nodeHandle,finger[0],finger[0],SET_PRE,REFRESH,new NodeMessage(nodeHandle));
}
} else
//if source not exists, succ_list is unchanged
succList.remove(msg.getSource());
} else {
switch (msg.getType()) {
//DATA
case DATA :
dispatchDataMessage(msg,REQUEST,REFRESH);
break;
//CONTROL
//REFRESH
case SET_SUCC :
NodeHandle succ = ((NodeMessage) msg.getMessage()).getNode();
setSucc(succ);
GenericFactory.freeMessage(msg);
break;
case SET_PRE :
setPred(((NodeMessage) msg.getMessage()).getNode());
GenericFactory.freeMessage(msg);
break;
case NOTIFY :
notify(msg.getSource());
GenericFactory.freeMessage(msg);
break;
case BROADCAST :
NodeHandle r, new_limit;
BroadcastMessage bm = (BroadcastMessage) msg.getMessage();
NodeHandle limit = bm.getLimit();
Id limitId = limit.getId();
planet.commonapi.Message info = bm.getInfo();
for (int i = 0; i < bitsPerKey - 1; i++) {
//Skip a redundant finger
if (!finger[i].equals(finger[i + 1])) {
//Forward while within "Limit"
if (finger[i].getId().between(this.id, limitId)) {
r = finger[i];
//New Limit must not exceed Limit
if (finger[i + 1].getId().between(this.id, limitId)) {
new_limit = finger[i + 1];
} else {
new_limit = limit;
}
//no reuse of RouteMessage msg ==> send one
// message to different nodes ==> requires
// different messages
planet.commonapi.RouteMessage aMsg = null;
try {
// String appId,Id from, Id to, Id nextHop
aMsg = getBroadcastMessage(msg
.getApplicationId(), this.nodeHandle, r, r,
new BroadcastMessage(info,new_limit));
sendMessage(aMsg);
Results.incTraffic();
} catch (InitializationException e) {
Logger.log(
"ERROR: Cannot get a RouteMessage of MessagePool\n"
+ e.getMessage(), Logger.ERROR_LOG);
}
}
}
}
Logger.log("Broadcast : Node " + this.id + " info : "
+ info, Logger.EVENT_LOG);
msg.setMessage(info);
Results.decTraffic();
((EndPoint) endpoints.get(msg.getApplicationId())).scheduleMessage(msg, 0);
break;
//REQUEST
case FIND_SUCC :
//source --> join();
NodeHandle fSucc = findSuccessor(msg.getSource());
if (fSucc != null) {
this.sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),msg.getType(),REPLY,new NodeMessage(fSucc));
} else {
String key_fp = GenericFactory.generateKey();
NodeHandle aux = closestPrecedingFinger(msg.getSource().getId());
addMessageListener(key_fp, new FindPredListener(this, msg.getKey()));
this.sendMessage(msg,key_fp,nodeHandle,aux,aux,FIND_PRE,REQUEST,new IdMessage(msg.getSource().getId()));
}
break;
case FIND_PRE :
Id idMesg = ((IdMessage) msg.getMessage()).getNode();
if (finger[0] != null && idMesg.betweenE(this.id, finger[0].getId())) {
//return successor
Id msgId = ((IdMessage) msg.getMessage()).getNode();
try {
sendMessage(msg,msg.getKey(),GenericFactory.buildNodeHandle(msgId,true),
msg.getSource(),msg.getSource(),FIND_PRE,REPLY,new NodeMessage(getSucc()));
} catch (InitializationException e1) {
e1.printStackTrace();
}
} else if (idMesg.equals(getId())) {
try {
sendMessage(msg,msg.getKey(),GenericFactory.buildNodeHandle(idMesg, true),
msg.getSource(),msg.getSource(),FIND_PRE,REPLY,new NodeMessage(nodeHandle));
} catch(InitializationException e) {
e.printStackTrace();
}
} else {
//next node
NodeHandle aux = closestPrecedingFinger(idMesg);
sendMessage(msg,msg.getKey(),msg.getSource(),aux,aux,msg.getType(),msg.getMode(),msg.getMessage());
}
break;
case GET_PRE :
//origen --> stabilize();
sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),GET_PRE,REPLY,new NodeMessage(predecessor));
break;
case SUCC_LIST :
if (msg.getMode() == REQUEST) {
this.sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),SUCC_LIST,REPLY,new SuccListMessage(succList));
} else if (msg.getMode() == REPLY) {
SuccListMessage succs = (SuccListMessage) msg.getMessage();
succList.clear();
succList.add(msg.getSource());
succList.addAll(succs.getSuccs());
cleanSuccList();
GenericFactory.freeMessage(msg);
}
break;
}
}
}
/**
* Given a time fraction, treats the messages and executes the periodicas
* actions, as for example, the stabilization methods
* @param actualStep Actual simulation step.
* @return true if the node needs to continue the stabilization
* process. false if node is just stabilized.
*/
public boolean process(int actualStep) {
clearFingerChanges();
super.process(actualStep);
while (hasMoreMessages()) {
dispatcher(nextMessage());
}
if (getFingerChanges()>0) stabRate = 0;
else stabRate++;
invokeByStepToAllApplications();
return stabRate < realStabilizationRate;
}
/**
* @see planet.commonapi.Node#isAlive() Test if Node is alive
*/
public boolean isAlive() {
return !hasFailed && !hasLeaved;
}
public String toString() {
return "{ChordNode [" + this.id + "]: succ[" + finger[0] + "]: succ2f["
+ finger[1] + "]: succList[" + succList + "]: pred["
+ predecessor + "]}";
}
/**
* Return the local NodeHandle.
*
* @see planet.commonapi.Node#getLocalHandle()
* @return Actual NodeHandle for this Node.
*/
public NodeHandle getLocalHandle() {
return nodeHandle;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -