📄 replicationmeednode.java
字号:
package implementations;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.TreeSet;import java.util.Iterator;import java.util.ListIterator;import simulator.Contact;import simulator.Node;import simulator.Message;import util.CommandStatus;import util.Verbose;public class ReplicationMEEDNode extends MEEDNode{ protected static int PROTO_RMEED_ID = ProtocolStackMessage.getNewProtocolID(); protected static Message.Header messageExpiryTime = new Message.Header(); protected static Message.Header messageRLastBackNode = new Message.Header(); protected ArrayList<ProtocolStackMessage> replicationBuffer = null; protected HashMap<Integer, ProtocolStackMessage> mIDs = null; protected TopologyRoutingEventHandler routingRepAgent = null; protected FinalDestinationEventHandler finalRepDestinationAgent = null; protected double delivToExpiry = 10; protected double orgAltRepRatio = 0.001; protected boolean replicateOnAlternPath = false; protected boolean replicateOnBacktracking = false; protected boolean replicateInSource = true; protected boolean multipleSourceReplicas = false; // protected boolean // This constructor is used only to create default_node instance, // so there is no need to add handlers. public ReplicationMEEDNode() { super(); } public ReplicationMEEDNode(ReplicationMEEDNode org) { super(org); delivToExpiry = org.delivToExpiry; orgAltRepRatio = org.orgAltRepRatio; replicateOnAlternPath = org.replicateOnAlternPath; replicateOnBacktracking = org.replicateOnBacktracking; replicateInSource = org.replicateInSource; multipleSourceReplicas = org.multipleSourceReplicas; } public void initNode() { replicationBuffer = new ArrayList<ProtocolStackMessage>(); mIDs = new HashMap<Integer, ProtocolStackMessage>(); finalDestinationAgent = new FinalDestinationEventHandler(this, primaryQueue); finalRepDestinationAgent = new RepFinalDestinationEventHandler(this, replicationBuffer); topologyAgent = new MEEDTopologyEventHandler(this); routingAgent = new TopologyRoutingEventHandler(this, topologyAgent, finalDestinationAgent, primaryQueue); routingRepAgent = new TopologyRepRoutingEventHandler(this, topologyAgent, finalRepDestinationAgent, replicationBuffer); // Add the event handlers in the proper order appendEventHandler(finalDestinationAgent); appendEventHandler(finalRepDestinationAgent); appendEventHandler(topologyAgent); appendEventHandler(routingAgent); appendBufferHandler(routingRepAgent); if (useACK) { ACKAgent = new ACKHandler(this); appendEventHandler(ACKAgent); appendBufferHandler(ACKAgent); } appendEventHandler(routingRepAgent); } private void storeInRepBuffer(ProtocolStackMessage msg) { assert replicationBuffer != null; if (replicationBuffer.size() < 1) { replicationBuffer.add(msg); mIDs.put(msg.getId(), msg); return; } ListIterator<ProtocolStackMessage> it = replicationBuffer.listIterator(); double f = msg.getCreationTime(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); if (m.getCreationTime() < f) { it.previous(); it.add(msg); mIDs.put(msg.getId(), msg); return; } } mIDs.put(msg.getId(), msg); replicationBuffer.add(msg); } protected boolean allowSend(ProtocolStackMessage msg, Contact ct) { Double exp = (Double) msg.getData(messageExpiryTime); if (exp != null && exp.doubleValue() < network.getCurrentTime()) { return false; } return super.allowSend(msg, ct); } protected double expiryTime(double predictDelivery) { return predictDelivery * delivToExpiry + network.getCurrentTime(); } public CommandStatus parseCommandPart(ArrayList<String> part, String path) { String param = part.get(0); CommandStatus ok = super.parseCommandPart(part, path); if (ok != null) return ok; ok = new CommandStatus(CommandStatus.COMMAND_OK); if (part.size() != 2) return null; try { if (param.equals("deliv_expiry_factor")) { double f = Double.parseDouble(part.get(1)); if (f < 0) { return new CommandStatus("Deliv to expiry factor can not be < 0!"); } delivToExpiry = f; return ok; } else if (param.equals("alt_path_replicas")) { replicateOnAlternPath = Boolean.parseBoolean(part.get(1)); return ok; } else if (param.equals("backtracking_replicas")) { replicateOnBacktracking = Boolean.parseBoolean(part.get(1)); return ok; } else if (param.equals("source_replica")) { replicateInSource = Boolean.parseBoolean(part.get(1)); return ok; } else if (param.equals("multiple_source_replicas")) { multipleSourceReplicas = Boolean.parseBoolean(part.get(1)); return ok; } else if (param.equals("alt_path_ratio")) { orgAltRepRatio = Double.parseDouble(part.get(1)); return ok; } } catch (NumberFormatException e) { return new CommandStatus("Error parsing value of parameter '" + param + "': " + e); } return null; } public static class RepFinalDestinationEventHandler extends FinalDestinationEventHandler { double lastCheck = -1; ReplicationMEEDNode repParent = null; public RepFinalDestinationEventHandler(ReplicationMEEDNode node, Collection<ProtocolStackMessage> buffer) { super(node, buffer); repParent = node; } public boolean contactIdle(Contact ct) { if (lastCheck < parent.getNetwork().getCurrentTime()) { lastCheck = parent.getNetwork().getCurrentTime(); Iterator<ProtocolStackMessage> it = buffer.iterator(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); Double exp = (Double) m.getData(messageExpiryTime); if (exp.doubleValue() < lastCheck) { parent.freeCapacity(m); it.remove(); repParent.mIDs.remove(m.getId()); } } } return super.contactIdle(ct); } } public static class TopologyRepRoutingEventHandler extends TopologyRoutingEventHandler { private static int repID = getHandlerID(); protected ReplicationMEEDNode repParent = null; public TopologyRepRoutingEventHandler(ReplicationMEEDNode node, MEEDTopologyEventHandler topology, FinalDestinationEventHandler finalA, Collection<ProtocolStackMessage> buffer) { super(repID, node, topology, finalA, buffer); repParent = node; } public void removeMessageCopies(Integer id) { ProtocolStackMessage m = repParent.mIDs.get(id); if (m != null) { repParent.mIDs.remove(id); buffer.remove(m); if (parent.getNetwork().vShouldLog(Verbose.INFO)) parent.getNetwork().vprint("Removing RMEED copy of ACKed message " + m); parent.freeCapacity(m); } } public boolean contactIdle(Contact ct) { if (this.parent.getNetwork().vShouldLog(Verbose.DEBUG1)) this.parent.getNetwork().vprint("MEED " + this.parent + " RMEEDRouting idleContact: " + ct); // The MEED topology agent should prevent us from getting // idleContact if it is busy getting updates assert !topology.isGettingUpdates(); // If we are not busy getting updates from anyone, we have not // yet finished with this destination, and source should // produce multiple replicas - send replicas to other nodes if (repParent.replicateInSource && repParent.multipleSourceReplicas && !finishedWithDestinations.contains(ct)) { Iterator<ProtocolStackMessage> i = buffer.iterator(); while (i.hasNext()) { ProtocolStackMessage m = i.next(); if (m.getSourceNode() == parent && !m.forbiddenNodes().contains(ct.getDest())) { ct.sendMessage(m); i.remove(); assert !ct.isIdle(); return true; } } } return super.contactIdle(ct); } public boolean messageSent(ProtocolStackMessage msg, Contact ct, int sizeSent) { if (msg.isRoutingMessage()) { // This agent doesn't handle routing messages return false; } // It is replicated message. if (msg.getProtocolID() == PROTO_RMEED_ID) { // It was not really sent - we should re-add it to the buffer. if (sizeSent < msg.getLength()) { repParent.storeInRepBuffer(msg); return false; } // It was delivered to the final destination. We should remove // its size (it was already removed from replicaBuffer when it // was sent). We also should remove it if the source doesn't // create multiple replicas - there is no need to check if // we should copy replicated message again, as this is the only // case when replicated message can be copied. if (!repParent.multipleSourceReplicas || ct.getDest() == msg.getDestNode()) { repParent.freeCapacity(msg); repParent.mIDs.remove(msg.getId()); return false; } Double exp = (Double) msg.getData(messageExpiryTime); if (exp.doubleValue() < parent.getNetwork().getCurrentTime()) { repParent.freeCapacity(msg); repParent.mIDs.remove(msg.getId()); return false; } // Only source replicates messages if (msg.getSourceNode() != parent) { repParent.freeCapacity(msg); repParent.mIDs.remove(msg.getId()); return false; } TreeSet<Node> set = msg.forbiddenNodes(); if (set == null) { msg.createForbiddenNodes(); set = msg.forbiddenNodes(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -