📄 .#replicationmeednode.java.1.16
字号:
// NOTE(review): the next four lines are the tail of a definition that begins
// before this excerpt; they are kept token-for-token.
public Dijkstra<Node, Contact, ProtocolStackMessage> routing = null;
public HashMap<Node, TreeNode> down = null;
}
}

/**
 * Routing event handler for replicated MEED ("RMEED") messages.
 *
 * <p>Extends {@code TopologyRoutingEventHandler} and additionally keeps copies
 * ("replicas") of messages in the owning {@link ReplicationMEEDNode}'s replica
 * buffer ({@code repParent.replicationBuffer}) and id index
 * ({@code repParent.mIDs}). Replicas are tagged with {@code PROTO_RMEED_ID}
 * and carry a set of "forbidden" nodes that is consulted before a message is
 * offered to a contact.
 */
public static class TopologyRepRoutingEventHandler extends TopologyRoutingEventHandler {
    /** Handler id passed to the superclass constructor; obtained once from {@code getHandlerID()}. */
    private static int repID = getHandlerID();

    /** The owning node, typed as {@link ReplicationMEEDNode} to reach the replica state. */
    protected ReplicationMEEDNode repParent = null;

    /**
     * Creates the handler and remembers {@code node} as the replication parent.
     *
     * @param node     owning node
     * @param topology forwarded to the superclass
     * @param finalA   forwarded to the superclass
     * @param buffer   forwarded to the superclass
     */
    public TopologyRepRoutingEventHandler(ReplicationMEEDNode node, MEEDTopologyEventHandler topology,
            FinalDestinationEventHandler finalA, Collection<ProtocolStackMessage> buffer) {
        super(repID, node, topology, finalA, buffer);
        repParent = node;
    }

    /**
     * Drops the stored replica registered under {@code id}, if there is one:
     * removes it from {@code repParent.mIDs} and from {@code buffer}, then
     * releases its capacity on {@code parent}.
     *
     * @param id message id of the replica to drop
     */
    public void removeMessageCopies(Integer id) {
        ProtocolStackMessage m = repParent.mIDs.get(id);
        if (m == null) {
            return;
        }
        repParent.mIDs.remove(id);
        buffer.remove(m);
        if (parent.getNetwork().vShouldLog(Verbose.INFO)) {
            parent.getNetwork().vprint("Removing RMEED copy of ACKed message " + m);
        }
        parent.freeCapacity(m);
    }

    /**
     * Called when {@code ct} is idle. Sends the first buffered message whose
     * source is this node and whose forbidden set does not contain the
     * contact's destination; otherwise defers to the superclass.
     *
     * @param ct the idle contact
     * @return {@code true} if a message was handed to {@code ct}; otherwise
     *         whatever {@code super.contactIdle(ct)} returns
     */
    public boolean contactIdle(Contact ct) {
        if (this.parent.getNetwork().vShouldLog(Verbose.DEBUG1)) {
            this.parent.getNetwork().vprint("MEED " + this.parent + " RMEEDRouting idleContact: " + ct);
        }

        // The MEED topology agent should prevent us from getting
        // idleContact if it is busy getting updates
        assert !topology.isGettingUpdates();

        // If we are not busy getting updates from anyone, and we have not
        // yet finished with this destination, place an iterator
        if (!finishedWithDestinations.contains(ct)) {
            Iterator<ProtocolStackMessage> i = buffer.iterator();
            while (i.hasNext()) {
                ProtocolStackMessage m = i.next();
                if (m.getSourceNode() != parent) {
                    continue;
                }
                // forbiddenNodes() is null-checked elsewhere in this class, so it is
                // guarded here too; a missing set is treated as "nothing forbidden".
                TreeSet<Node> forbidden = m.forbiddenNodes();
                if (forbidden != null && forbidden.contains(ct.getDest())) {
                    continue;
                }
                ct.sendMessage(m);
                i.remove();
                assert !ct.isIdle();
                return true;
            }
        }

        return super.contactIdle(ct);
    }

    /**
     * Bookkeeping after (part of) {@code msg} was sent over {@code ct}.
     * Routing messages are ignored; replicas ({@code PROTO_RMEED_ID}) and
     * all other messages are handled by separate private helpers.
     *
     * @param msg      the message that was sent
     * @param ct       the contact it was sent over
     * @param sizeSent amount actually sent; compared against {@code msg.getLength()}
     * @return always {@code false}
     */
    public boolean messageSent(ProtocolStackMessage msg, Contact ct, int sizeSent) {
        if (msg.isRoutingMessage()) {
            // This agent doesn't handle routing messages
            return false;
        }

        if (msg.getProtocolID() == PROTO_RMEED_ID) {
            // It is replicated message.
            rmeedReplicaSent(msg, ct, sizeSent);
        } else {
            // It is MEED message.
            rmeedPrimarySent(msg, ct, sizeSent);
        }
        return false;
    }

    /** Releases the capacity held by {@code msg} and removes its id from {@code repParent.mIDs}. */
    private void rmeedForgetReplica(ProtocolStackMessage msg) {
        repParent.freeCapacity(msg);
        repParent.mIDs.remove(msg.getId());
    }

    /** Handles a sent replica: re-buffers it, forgets it, or stores a fresh copy that also forbids {@code ct}'s destination. */
    private void rmeedReplicaSent(ProtocolStackMessage msg, Contact ct, int sizeSent) {
        // It was not really sent - we should re-add it to the buffer.
        if (sizeSent < msg.getLength()) {
            repParent.storeInRepBuffer(msg);
            return;
        }

        // It was delivered to the final destination. We should remove
        // its size (it was already removed from replicaBuffer when it
        // was sent)
        if (repParent.oneReplica || ct.getDest() == msg.getDestNode()) {
            rmeedForgetReplica(msg);
            return;
        }

        // A missing expiry time is treated as "not expired", matching
        // canAllocate() and allocate().
        Double exp = (Double) msg.getData(messageExpiryTime);
        if (exp != null && exp.doubleValue() < parent.getNetwork().getCurrentTime()) {
            rmeedForgetReplica(msg);
            return;
        }

        // Only source replicates messages
        if (msg.getSourceNode() != parent) {
            rmeedForgetReplica(msg);
            return;
        }

        TreeSet<Node> set = msg.forbiddenNodes();
        if (set == null) {
            msg.createForbiddenNodes();
            set = msg.forbiddenNodes();
        }

        // From here on this node is known to be the source.
        boolean cntLeft = false;
        Iterator<Contact> it = parent.getContacts();
        while (it.hasNext()) {
            Node n = it.next().getDest();
            if (!set.contains(n)) {
                cntLeft = true;
                break;
            }
        }

        // Message which was just sent has all nodes which we are
        // connected to on its forbidden list. We don't have to
        // keep this message.
        if (!cntLeft) {
            rmeedForgetReplica(msg);
            return;
        }

        // We create copy of sent message
        ProtocolStackMessage m = new ProtocolStackMessage(msg);
        m.createForbiddenNodes();
        m.forbiddenNodes().addAll(set);
        m.forbiddenNodes().add(ct.getDest());
        m.setProtocolID(PROTO_RMEED_ID);
        repParent.storeInRepBuffer(m);
        // And we don't change repSize or primaryQueueOccupied
    }

    /** Handles a sent non-replica message: updates an existing replica or creates and stores a new one. */
    private void rmeedPrimarySent(ProtocolStackMessage msg, Contact ct, int sizeSent) {
        if (sizeSent < msg.getLength()) {
            return;
        }

        // It was delivered to the final destination. Not much more to do...
        if (ct.getDest() == msg.getDestNode()) {
            return;
        }

        // Only source replicates messages
        if (msg.getSourceNode() != parent) {
            return;
        }

        // We check if we have replica of this message (it is possible!).
        // If yes, we just add forbidden node, and reset replica counter to 0.
        ProtocolStackMessage m = repParent.mIDs.get(msg.getId());
        if (m != null) {
            TreeSet<Node> set = m.forbiddenNodes();
            if (set == null) {
                m.createForbiddenNodes();
                set = m.forbiddenNodes();
            }
            set.add(ct.getDest());

            Double exp = (Double) m.getData(messageExpiryTime);
            Double pred = (Double) msg.getData(MSG_PRED_DELAY);
            assert pred != null;
            double nExp = repParent.expiryTime(pred);
            // Only ever extend the expiry. A replica with no expiry recorded gets
            // the new one (NOTE(review): previously this case threw on unboxing).
            if (exp == null || nExp > exp.doubleValue()) {
                m.storeData(messageExpiryTime, Double.valueOf(nExp));
            }
            return;
        }

        // We create a copy, mark it is a replica, mark the Node that just
        // received the message as "forbidden" and add it into
        // replicaBuffer.
        m = new ProtocolStackMessage(msg);
        m.createForbiddenNodes();
        m.forbiddenNodes().add(ct.getDest());
        m.forbiddenNodes().add(parent);
        m.setProtocolID(PROTO_RMEED_ID);

        // Not enough buffer space
        if (!parent.canAllocateCapacity(m, handlerID)) {
            return;
        }

        Double pred = (Double) msg.getData(MSG_PRED_DELAY);
        assert pred != null;
        m.storeData(messageExpiryTime, Double.valueOf(repParent.expiryTime(pred.doubleValue())));
        parent.allocateCapacity(m, handlerID);
        repParent.storeInRepBuffer(m);
    }

    /**
     * Should we accept the specified message for routing?
     *
     * <p>Only complete, non-routing messages tagged {@code PROTO_RMEED_ID}
     * are considered. Unless {@code repParent.oneReplica} is set, a replica
     * already known under the same id is merged with the incoming one
     * (creation time raised to the later value, forbidden nodes added)
     * instead of being stored twice. Otherwise the message is stored if
     * capacity can be allocated, and both replica agents are notified.
     *
     * @param msg    incoming message
     * @param source contact the message arrived on (not used here)
     * @param size   amount received; compared against {@code msg.getLength()}
     * @return always {@code false}
     */
    public boolean acceptMessage(ProtocolStackMessage msg, Contact source, int size) {
        if (msg.isRoutingMessage() || size < msg.getLength()) {
            // Routing or incomplete message - we don't care.
            return false;
        }

        if (msg.getProtocolID() != PROTO_RMEED_ID) {
            // It is not replica of the message.
            return false;
        }

        assert msg.getDestNode() != parent : "acceptMessage called for data message for this node in "
                + parent;

        if (!repParent.oneReplica) {
            // We should check if we don't have this message already in the
            // replicas buffer. We don't have to do this for first-copies -
            // they have higher priority anyway, and will be dealed with in
            // messageSent.
            ProtocolStackMessage m = repParent.mIDs.get(msg.getId());
            if (m != null) {
                if (m.getCreationTime() < msg.getCreationTime()) {
                    // Set the creation time to higher value
                    m.setCreationTime(msg.getCreationTime());
                }

                // Create the set only when it is missing, as messageSent does.
                // NOTE(review): assumes createForbiddenNodes() may replace an
                // existing set - confirm against ProtocolStackMessage.
                TreeSet<Node> known = m.forbiddenNodes();
                if (known == null) {
                    m.createForbiddenNodes();
                    known = m.forbiddenNodes();
                }
                TreeSet<Node> incoming = msg.forbiddenNodes();
                if (incoming != null) {
                    known.addAll(incoming);
                }
                return false;
            }
        }

        // We don't have this message yet!
        // We have to check buffer space.
        if (!parent.canAllocateCapacity(msg, handlerID)) {
            return false;
        }

        parent.allocateCapacity(msg, handlerID);
        repParent.storeInRepBuffer(msg);

        // Notify the meed routing agent that a new message is available
        // It will handle sending new updates out to other nodes
        repParent.finalRepDestinationAgent.notifyNewMessage(msg);
        repParent.routingRepAgent.notifyNewMessage(msg);

        return false;
    }

    /**
     * Reports how much could be freed from the replica buffer for
     * {@code forMsg} without removing anything.
     *
     * <p>Walks {@code repParent.replicationBuffer} from its last element
     * backwards, summing the length of every replica that is either older
     * (smaller creation time) than the threshold or already expired, and
     * stops at the first replica that is neither. The threshold is
     * {@code forMsg.getCreationTime()} when {@code hID} is this handler's id
     * and {@code Double.MAX_VALUE} otherwise.
     *
     * @param forMsg message that needs space
     * @param hID    id of the handler requesting the space
     * @return total length of the replicas that could be removed
     */
    public int canAllocate(ProtocolStackMessage forMsg, int hID) {
        ListIterator<ProtocolStackMessage> it = repParent.replicationBuffer
                .listIterator(repParent.replicationBuffer.size());
        int canFree = 0;

        double f;
        if (hID == handlerID) {
            f = forMsg.getCreationTime();
        } else {
            // Protocol with higher priority - we will remove all the
            // messages if we have to!
            f = Double.MAX_VALUE;
        }

        // We have to do this twice. There is possibility, that we can't
        // remove a replica - it is counted in repSize, but it is not in our
        // buffer because it's beeing sent now.
        // Also, we remove only messages with greater counter value!
        // NOTE(review): only a single backward pass is visible in this method.
        double curTime = parent.getNetwork().getCurrentTime();
        while (it.hasPrevious()) {
            ProtocolStackMessage m = it.previous();
            if (m.getCreationTime() < f) {
                canFree += m.getLength();
                continue;
            }
            Double exp = (Double) m.getData(messageExpiryTime);
            if (exp != null && exp.doubleValue() < curTime) {
                canFree += m.getLength();
            } else {
                break;
            }
        }

        return canFree;
    }

    /**
     * Removes replicas from the tail of {@code repParent.replicationBuffer}
     * (and from {@code repParent.mIDs}) until at least
     * {@code forMsg.getLength() - parent.availableCapacity()} has been
     * removed or the buffer has been walked completely. Expired replicas and
     * replicas older than the threshold (see {@link #canAllocate}) are
     * eligible; meeting any other replica trips an assertion.
     *
     * <p>This method does not call {@code freeCapacity} itself.
     *
     * @param forMsg message that needs space
     * @param hID    id of the handler requesting the space
     * @return total length of the replicas that were removed
     */
    public int allocate(ProtocolStackMessage forMsg, int hID) {
        double f;
        if (hID == handlerID) {
            // The same protocol
            f = forMsg.getCreationTime();
        } else {
            // Protocol with higher priority - we will remove all the
            // messages if we have to!
            f = Double.MAX_VALUE;
        }

        ListIterator<ProtocolStackMessage> it = repParent.replicationBuffer
                .listIterator(repParent.replicationBuffer.size());
        int canFree = 0;
        int toFree = forMsg.getLength() - parent.availableCapacity();
        double curTime = parent.getNetwork().getCurrentTime();

        while (canFree < toFree && it.hasPrevious()) {
            ProtocolStackMessage m = it.previous();
            Double exp = (Double) m.getData(messageExpiryTime);

            if (exp != null && exp.doubleValue() < curTime) {
                if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) {
                    parent.getNetwork().vprint("DROPPED replica of " + m + " - it has expired at: " + exp);
                }
                canFree += m.getLength();
                repParent.mIDs.remove(m.getId());
                it.remove();
            } else if (m.getCreationTime() < f) {
                if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) {
                    parent.getNetwork().vprint("DROPPED replica of " + m + " to free buffer space for " + forMsg);
                }
                canFree += m.getLength();
                repParent.mIDs.remove(m.getId());
                it.remove();
            } else {
                assert false : "canAllocate should be called first to check if it is possible!";
            }
        }

        return canFree;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -