📄 replicationmeednode.java
字号:
if (msg.getSourceNode() == parent) { Iterator<Contact> it = parent.getContacts(); boolean cntLeft = false; while (it.hasNext()) { Node n = it.next().getDest(); if (!set.contains(n)) { cntLeft = true; break; } } // Message which was just sent has all nodes which we are // connected to on its forbidden list. We don't have to // keep this message. if (!cntLeft) { repParent.freeCapacity(msg); repParent.mIDs.remove(msg.getId()); return false; } } // We create copy of sent message ProtocolStackMessage m = new ProtocolStackMessage(msg); m.createForbiddenNodes(); m.forbiddenNodes().addAll(msg.forbiddenNodes()); m.forbiddenNodes().add(ct.getDest()); m.setProtocolID(PROTO_RMEED_ID); repParent.storeInRepBuffer(m); // And we don't change repSize or primaryQueueOccupied return false; } // It is MEED message. if (sizeSent < msg.getLength()) { return false; } // It was delivered to the final destination. Not much more to do... if (ct.getDest() == msg.getDestNode()) { return false; } ProtocolStackMessage m; // We check if we have replica of this message (it is // possible!). // If yes, we just add forbidden node, and reset replica counter // to // 0. m = repParent.mIDs.get(msg.getId()); if (m != null) { TreeSet<Node> set = m.forbiddenNodes(); if (set == null) { m.createForbiddenNodes(); set = m.forbiddenNodes(); } set.add(ct.getDest()); Double exp = (Double) m.getData(messageExpiryTime); Double pred = (Double) msg.getData(MSG_PRED_DELAY); assert pred != null; double nExp = repParent.expiryTime(pred); if (nExp > exp) { m.storeData(messageExpiryTime, new Double(nExp)); } return false; } if (msg.getSourceNode() != parent && !repParent.replicateOnAlternPath && !repParent.replicateOnBacktracking) { return false; } // We create a copy, mark it is a replica, mark the Node that // just received the message as "forbidden" and add it into // replicaBuffer. m = new ProtocolStackMessage(msg); m.createForbiddenNodes(); m.forbiddenNodes().add(ct.getDest()); m.setProtocolID(PROTO_RMEED_ID); boolean doReplica = false; // We are the source of this message, and we want to create // replicas! if (msg.getSourceNode() == parent && repParent.replicateInSource) { doReplica = true; } if (!doReplica && repParent.replicateOnBacktracking) { Node lastN = (Node) msg.getData(MSG_LAST_NODE); // This Message went back! if (lastN == ct.getDest()) doReplica = true; } if (!doReplica && repParent.replicateOnAlternPath) { MEEDDijkstra routingR, routingOrg; routingOrg = parent.topologyAgent.routingCache(msg); if (routingOrg == null) { return false; } ArrayList<Contact> route = routingOrg.routeTo(m.getDestNode()); if (route == null) { return false; } TreeSet<Node> set = new TreeSet<Node>(); Iterator<Contact> it = route.iterator(); while (it.hasNext()) { Node nxt = it.next().getDest(); if (nxt != m.getDestNode()) set.add(nxt); } if (set.size() < 1) { return false; } routingR = parent.topologyAgent.getTreeCache(set); if (routingR == null) { return false; } double costR, costO; costR = routingR.getCost(m.getDestNode()); costO = routingOrg.getCost(msg.getDestNode()); if (costR >= Double.MAX_VALUE || costO >= Double.MAX_VALUE || (costR - costO) >= costO * repParent.orgAltRepRatio) { return false; } if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint( "Creating replica of " + msg + "; Org path: " + routingOrg.routeTo(m.getDestNode()) + " [" + costO + "]; Alternative: " + routingR.routeTo(m.getDestNode()) + "[" + costR + "]"); doReplica = true; } if (!doReplica) { return false; } // Not enough buffer space if (!parent.canAllocateCapacity(m, handlerID)) { return false; } Double pred = (Double) msg.getData(MSG_PRED_DELAY); assert pred != null; m.storeData(messageExpiryTime, new Double(repParent.expiryTime(pred.doubleValue()))); parent.allocateCapacity(m, handlerID); repParent.storeInRepBuffer(m); return false; } /** Should we accept the specified message for routing? */ public boolean acceptMessage(ProtocolStackMessage msg, Contact source, int size) { if (msg.isRoutingMessage() || size < msg.getLength()) { // Routing or incomplete message - we don't care. return false; } if (msg.getProtocolID() != PROTO_RMEED_ID) { // It is not replica of the message. return false; } assert msg.getDestNode() != parent : "acceptMessage called for data message for this node in " + parent; // If we are using any method which can produce multiple replicas // of the same message: if (repParent.multipleSourceReplicas || repParent.replicateOnAlternPath || repParent.replicateOnBacktracking) { // We should check if we don't have this message already in the // replicas buffer. We don't have to do this for first-copies - // they have higher priority anyway, and will be dealed with in // messageSent. ProtocolStackMessage m = repParent.mIDs.get(msg.getId()); if (m != null) { m.createForbiddenNodes(); m.forbiddenNodes().addAll(msg.forbiddenNodes()); return false; } } // We don't have this message yet! // We have to check buffer space. if (!parent.canAllocateCapacity(msg, handlerID)) { return false; } parent.allocateCapacity(msg, handlerID); repParent.storeInRepBuffer(msg); // Notify the meed routing agent that a new message is available // It will handle sending new updates out to other nodes repParent.finalRepDestinationAgent.notifyNewMessage(msg); repParent.routingRepAgent.notifyNewMessage(msg); return false; } public int canAllocate(ProtocolStackMessage forMsg, int hID) { ListIterator<ProtocolStackMessage> it = repParent.replicationBuffer .listIterator(repParent.replicationBuffer.size()); int canFree = 0; double f; if (hID == handlerID) { f = forMsg.getCreationTime(); } else { // Protocol with higher priority - we will remove all the // messages if we have to! f = Double.MAX_VALUE; } // We have to do this twice. There is possibility, that we can't // remove a replica - it is counted in repSize, but it is not in our // buffer because it's beeing sent now. // Also, we remove only messages with greater counter value! Double exp; double curTime = parent.getNetwork().getCurrentTime(); while (it.hasPrevious()) { ProtocolStackMessage m = it.previous(); if (m.getCreationTime() < f) { canFree += m.getLength(); } else { exp = (Double) m.getData(messageExpiryTime); if (exp != null && exp.doubleValue() < curTime) { canFree += m.getLength(); } else { break; } } } return canFree; } public int allocate(ProtocolStackMessage forMsg, int hID) { double f; if (hID == handlerID) { // The same protocol f = forMsg.getCreationTime(); } else { // Protocol with higher priority - we will remove all the // messages if we have to! f = Double.MAX_VALUE; } ListIterator<ProtocolStackMessage> it = repParent.replicationBuffer .listIterator(repParent.replicationBuffer.size()); int toFree, canFree = 0; toFree = forMsg.getLength() - parent.availableCapacity(); Double exp; while (canFree < toFree && it.hasPrevious()) { ProtocolStackMessage m = it.previous(); exp = (Double) m.getData(messageExpiryTime); if (exp != null && exp.doubleValue() < parent.getNetwork().getCurrentTime()) { if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint("DROPPED replica of " + m + " - it has expired at: " + exp); canFree += m.getLength(); repParent.mIDs.remove(m.getId()); it.remove(); } else if (m.getCreationTime() < f) { if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint("DROPPED replica of " + m + " to free buffer space for " + forMsg); canFree += m.getLength(); repParent.mIDs.remove(m.getId()); it.remove(); } else { assert false : "canAllocate should be called first to check if it is possible!"; } } return canFree; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -