📄 symphonynode.java
字号:
int size = closed.size();
NodeHandle current = null;
for (int i = 0; i < size; i++)
{
current = (NodeHandle)closed.get(i);
sendMessage(nodeHandle, current, current, CLOSE_NEIGHBOUR_CONNECT, REFRESH, NeighbourMessagePool.getMessage(neighbourSet.getFarthestNeighbours()));
}
}
/**
* Add all NodeHandle in the <b>c</b> Collection to the neighbour set.
* @param c Collection with neighbours.
* @param adviceForClosing Send a CLOSE_NEIGHBOUR message to the removed neighbours
* @return true if any neighbour has been added. false in other case.
*/
public boolean addToNeighbourSet(java.util.Collection c, boolean adviceForClosing) {
if (neighbourSet.addAll(c)) {
modifiedNeighbours = true;
updateOutcommingList();
/*
* We must check if incomming or outcomming connections have neighbours
* because QUERY_CONNECT messages may have been received and accepted
* before SET_INFO messages have been arrived.
*/
closeStaleLongConnections(incommingSet);
closeStaleLongConnections(outcommingSet);
if (adviceForClosing)
sendClosedNeighbour(neighbourSet.getRemovedNeighbours());
fixedNeighbours = false; //require more time to afirm this node is stabilized
return true;
}
fixedNeighbours = (getNeighbourSet().size() - 1) == SymphonyNode.getSuccessorsNumber() * 2;
return false;
}
/**
* Add the <b>neighbour</B> to hte neighbour set.
* @param neighbour Neighbour NodeHandle to be added.
*/
public void addToNeighbourSet(NodeHandle neighbour) {
if (neighbourSet.add(neighbour))
{
modifiedNeighbours = true;
updateOutcommingList();
}
}
/**
* Gets the neightbour set.
* @return The neighbour set.
*/
public Collection getNeighbourSet() {
return neighbourSet.getSortedSet();
}
/**
* Gets the farthest neighbours.
* @return The farthest neighbours.
* @see planet.symphony.SortedKList#getFarthestNeighbours()
*/
public Collection getFarthestNeighbours()
{
return neighbourSet.getFarthestNeighbours();
}
/**
* Test if the NodeHandle <b>o</b> is in the neighbour set.
* @param o The NodeHandle to be test.
* @return true if <b>o</b> is in the set.
*/
public boolean neighbourSetContains(NodeHandle o) {
return neighbourSet.contains(o);
}
/**
* Removes the <b>o</b> from the neighbourSet
* @param o The NodeHandle to be removed.
* @return true if it is removed or false in other case.
*/
public boolean removeNeighbour(NodeHandle o) {
return neighbourSet.remove(o);
}
/**
* Returns the NodeHandle of node successor.
* @return The NodeHandle of node successor.
* @see planet.commonapi.Node#getSucc()
*/
public NodeHandle getSucc() {
NodeHandle succ = (NodeHandle) neighbourSet.getFirstSucc();
if (succ == null) return nodeHandle;
return succ;
}
/**
* Returns the NodeHandle of node predecessor.
* @return The NodeHandle of node predecessor.
* @see planet.commonapi.Node#getPred()
*/
public NodeHandle getPred() {
NodeHandle pred = (NodeHandle) neighbourSet.getFirstPred();
if (pred == null) return nodeHandle;
return pred;
}
/**
* Get the outcomming set of long distance connections.
* @return the outcomming set of long distance connections.
*/
public List getOutcommingSet() {
return this.outcommingSet;
}
/**
* Get the incomming set of long distance connections.
* @return the incomming set of long distance connections.
*/
public List getIncommingSet() {
return this.incommingSet;
}
/**
* This node joins to the overlay network by the bootstrap node with NodeHandle <b>bootstrap</b>.
* @param bootstrap NodeHandle of the bootstrap node.
* @see planet.commonapi.Node#join(planet.commonapi.NodeHandle)
*/
public void join(NodeHandle bootstrap) {
incommingSet.clear();
outcommingSet.clear();
if (id.equals(bootstrap.getId())) {
n = 1;
fixedNeighbours = true;
} else {
n = 0;
sendMessage(nodeHandle, bootstrap, bootstrap, QUERY_JOIN, REFRESH, new JoinMessage(nodeHandle, bootstrap));
fixedNeighbours = false;
}
}
/**
* Leaves the node from the overlay network.
*
* @see planet.commonapi.Node#leave()
*/
public void leave() {
Iterator[] c = {
incommingSet.iterator(),
outcommingSet.iterator(),
neighbourSet.getNeighbourSet().iterator(),
};
int[] msgtype = {
CLOSE_LONG_CONNECT,
CLOSE_LONG_CONNECT,
CLOSE_NEIGHBOUR_CONNECT,
};
for(int i = 0; i < c.length; i++) {
Iterator it = c[i];
while (it.hasNext()) {
NodeHandle current = (NodeHandle) it.next();
sendMessage(nodeHandle, current, current, msgtype[i], REFRESH, null);
}
}
this.fixedNeighbours = true;
this.alive = false;
}
/**
* Returns a RouteMessage to be sent to acquire a new long distance.
* @return a RouteMessage to be sent to acquire a new long distance.
*/
public RouteMessage getNewLongDistance() {
if (requestedNewLongDistance) return null;
requestedNewLongDistance = true; //this flag ensures only one QUERY_CONNECT per step
int retries = 10;
while (true) {
SymphonyId symId = (SymphonyId) id;
double x = symId.getDoubleValue() + Math.exp(Math.log((double) n) * (r.nextDouble() - 1.0));
int i = (int) x;
x -= i;
Id xid = new SymphonyId().setValues(x);
NodeHandle succ = getSucc();
NodeHandle pred = getPred();
if (!xid.between(pred.getId(), id) && !xid.between(id, succ.getId()))
try {
NodeHandle xidNH = GenericFactory.buildNodeHandle(xid, true);
return buildMessage(nodeHandle, xidNH, route(xidNH), QUERY_CONNECT, REFRESH, null);
} catch(InitializationException e) {
Logger.log("[" + id + "] cannot build a NodeHandle for a RouteMessage", Logger.ERROR_LOG);
}
else if (retries > 1) retries--;
else {
//on this case, cannot find a long distance link
return null;
}
}
}
/**
* Evaluates if its network context has changed. In afirmative case, a
* new long distance query is sent.
*/
private void updateOutcommingList() {
double nnew = estimation();
boolean recalc = true;
if ( n!=0 )
{
double ratio = (double) nnew / (double) n;
if ( (ratio >= 0.5) && (ratio <= 2) ) {
recalc = false;
}
}
if (recalc) {
n = nnew;
retriesNewLongDistance = 0;
RouteMessage newLink = getNewLongDistance();
if (newLink == null) return;
sendMessage(newLink);
}
}
/**
* Makes the network size estimation.
* @return The estimated network size.
*/
private double estimation() {
double length = 0.0;
int cnt = 0;
double idDouble = ((SymphonyId) id).getDoubleValue();
double firstPred;
double temp;
estimationNH = (NodeHandle)neighbourSet.getFirstSucc();
if (estimationNH != null)
{
cnt++;
temp = ((SymphonyId)estimationNH.getId()).getDoubleValue();
temp = temp - idDouble;
length += (temp<0.0)?temp+1.0:temp;
}
estimationNH = (NodeHandle)neighbourSet.getFirstPred();
if (estimationNH != null)
{
firstPred = ((SymphonyId)estimationNH.getId()).getDoubleValue();
cnt++;
temp = idDouble - firstPred;
length += (temp<0.0)?temp+1.0:temp;
estimationNH = (NodeHandle)neighbourSet.getSecondPred();
if (estimationNH != null)
{
cnt++;
temp = ((SymphonyId)estimationNH.getId()).getDoubleValue();
temp = firstPred - temp;
length += (temp<0.0)? temp + 1.0 : temp;
}
}
if (cnt == 0) return 1.0;
else return ((double) cnt) / length;
}
/**
* Dispatch any incomming RouteMessage and update the local state or
* is resent to other node.
* @param msg Incomming RouteMessage to be treat.
*/
public void dispatcher(RouteMessage msg) {
/* Broadcast still unavailable */
NodeHandle src = msg.getSource();
NodeHandle dest = msg.getDestination();
switch(msg.getMode()) {
// Mode = REQUEST, Mode = REQUEST
case REQUEST:
case REFRESH:
Logger.log("Uncatched message Type [" + msg.getType() + "]", Logger.ERROR_LOG);
GenericFactory.freeMessage(msg);
break;
// Mode = ERROR
case Globals.ERROR:
switch(msg.getType()) {
case QUERY_CONNECT:
RouteMessage newLink = getNewLongDistance();
if (newLink == null) return;
else sendMessage(newLink);
break;
case ACCEPT_CONNECT:
NodeHandle srcHandle = msg.getSource();
incommingSet.remove(srcHandle);
GenericFactory.freeMessage(msg);
break;
default:
Logger.log("Uncatched message Mode [" + msg.getMode() + "]", Logger.ERROR_LOG);
}
break;
default: Logger.log("Uncatched message Mode [" + msg.getMode() + "]", Logger.ERROR_LOG);
}
}
/**
* This method is invoked to each simulation step to process the internal data,
* and all incomming RouteMessage.
* @param actualStep The current simulation step.
* @return true if the node is stabilized or false in other case.
* @see planet.commonapi.Node#process(int)
*/
public boolean process(int actualStep) {
requestedNewLongDistance = false; //permits only one QUERY_CONNECT by step
super.process(actualStep);
while (hasMoreMessages()) {
//planet.results.LinkStateResults.updateIncoming(this.id);
RouteMessage msg = nextMessage();
try {
behPool.onMessage(msg, this);
GenericFactory.freeMessage(msg);
} catch (planet.commonapi.behaviours.exception.NoSuchBehaviourException e) {
dispatcher(msg);
} catch (planet.commonapi.behaviours.exception.NoBehaviourDispatchedException d) {
dispatcher(msg);
}
}
if (fixedNeighbours)
{
if (statisticStabilizationSteps>0)
statisticStabilizationSteps--;
statisticStabilized = statisticStabilizationSteps==0;
} else
{
statisticStabilized = false;
statisticStabilizationSteps = SymphonyNode.getSuccessorsNumber()*24;
}
invokeByStepToAllApplications();
return !isStabilized();
}
/**
* The isLocalMessage's method is an extension method for commonapi specs. This method
* is used to allow BehavioursPool decide wether the incoming RouteMessage is for the
* local node or for a remote node. Remenber, this decision may only be addressed by
* the underlying overlay protocol. For example, for Symphony's Lookup protocol a node
* is responsible for all RouteMessages whose keys have as an immediate succesor the
* node's id or have as destination the own node.
* @see planet.commonapi.behaviours.BehavioursPool
* @return True if the incoming RouteMessage taken as input its for the local node.
*/
public boolean isLocalMessage(RouteMessage msg) {
NodeHandle recipient = msg.getDestination();
return recipient.equals(this.nodeHandle)
|| (getPred() != null && recipient.getId().betweenE(getPred().getId(), this.id));
}
/**
* This method returns the internal data in a Hashtable, where the key
* is a String with the field name, and the value, the related object.
* @return A Hashtable with all internal information.
* @see planet.generic.commonapi.NodeImpl#getInfo()
*/
public Hashtable getInfo() {
Hashtable info = new Hashtable();
info.put("neighbourSet", neighbourSet.getNeighbourSet());
info.put("estimation", new Double(n));
return info;
}
/**
* @param appId String that includes the application identification.
* @param to Node that must receive this message. If cannot represents
* a real node.
* @param nextHop Node that must receive this message as first hop.
* @param msg Message to be sent.
* If it is null, the message must be routed.
*/
public void routeData(String appId,
NodeHandle to, NodeHandle nextHop, Message msg) {
NodeHandle hop = nextHop;
if (hop == null) {
hop = route(to);
if (hop == null) Logger.log("route return null!", Logger.ERROR_LOG);
}
RouteMessage temp = buildMessage(nodeHandle,to,hop,DATA,REQUEST,msg,GenericFactory.generateKey());
temp.setApplicationId(appId);
sendData(temp);
}
/**
* Shows in the standard out all internal routing information.
* @see planet.commonapi.Node#printNode()
*/
public void printNode() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -