📄 .#meednode.java.1.41
字号:
} else { assert value <= time; snapshot.put(key, time - value); } } return snapshot; } } public static class ACKHandler extends EventHandler implements EventReceiver { protected static int PROTO_ACK_ID = ProtocolStackMessage.getNewProtocolID(); protected static Message.Header ACK_MSG_ID = new Message.Header(); // TODO - Implement Expiration of ACKs protected static Message.Header ACK_EXPIRY_TIME = new Message.Header(); protected static int ACK_EVENT = 0; protected static int myID = getHandlerID(); protected MEEDNode parent; protected int acksSize = 0; protected HashMap<Integer, ProtocolStackMessage> ackIDs = new HashMap<Integer, ProtocolStackMessage>(); protected HashSet<Integer> sawAcksFor = new HashSet<Integer>(); protected ArrayList<ProtocolStackMessage> acks = new ArrayList<ProtocolStackMessage>(); protected HashMap<Integer, ProtocolStackMessage> buffer = new HashMap<Integer, ProtocolStackMessage>(); // Only for statistics protected HashMap<Integer, ProtocolStackMessage> allOrgs = new HashMap<Integer, ProtocolStackMessage>(); protected HashMap<Contact, HashSet<Integer>> acksForContacts = new HashMap<Contact, HashSet<Integer>>(); protected BasicEvent nextACKEvent = null; protected double nextEventTime = Double.MAX_VALUE; public ACKHandler(MEEDNode node) { super(myID); parent = node; } public void receiveEvent(Event ev) { assert ev == nextACKEvent; assert ev.getTime() == parent.getNetwork().getCurrentTime(); nextACKEvent = null; nextEventTime = Double.MAX_VALUE; double nextTime = Double.MAX_VALUE; double curTime = ev.getTime(); Iterator<ProtocolStackMessage> it = buffer.values().iterator(); ArrayList<ProtocolStackMessage> toAccept = new ArrayList<ProtocolStackMessage>(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); double expiry = ((Double) m.getData(MSG_RESEND_TIME)).doubleValue(); if (expiry <= curTime) { toAccept.add(m); it.remove(); parent.freeCapacity(m); } else if (expiry < nextTime) { nextTime = expiry; } } if (toAccept.size() > 0) { it = toAccept.iterator(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); if (parent.getNetwork().vShouldLog(Verbose.INFO)) parent.getNetwork().vprint("RE-INSERTING Message without ACK on time: " + m + " in " + parent); parent.acceptMessage(m, null, m.getLength()); } } assert nextEventTime == Double.MAX_VALUE; assert nextACKEvent == null; if (nextTime < parent.getNetwork().getCurrentTime()) { setupEvent(parent.getNetwork().getCurrentTime()); } else { setupEvent(nextTime); } } protected void setupEvent(double checkTime) { if (checkTime < nextEventTime) { if (nextACKEvent != null) parent.getNetwork().cancelEvent(nextACKEvent); nextEventTime = checkTime; nextACKEvent = new BasicEvent(checkTime, this, ACK_EVENT, null); if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint("NEXT_ACK_CHECK scheduled; Time: " + checkTime); parent.getNetwork().scheduleEvent(nextACKEvent); } } public boolean contactIdle(Contact ct) { HashSet<Integer> acksForC = acksForContacts.get(ct); if (acksForC == null || acksForC.size() < 1) return false; Iterator<Integer> it = acksForC.iterator(); while (it.hasNext()) { Integer id = it.next(); ProtocolStackMessage ack = ackIDs.get(id); it.remove(); if (ack != null) { ct.sendMessage(ack); return true; } } return false; } public void addACKToContacts(ProtocolStackMessage ack) { Iterator<Contact> it = parent.getContacts(); Integer id = (Integer) ack.getData(ACK_MSG_ID); assert id != null; while (it.hasNext()) { Contact ct = it.next(); HashSet<Integer> acks = acksForContacts.get(ct); if (acks == null) { acks = new HashSet<Integer>(); acksForContacts.put(ct, acks); } acks.add(id); } } public void removeACKFromContacts(ProtocolStackMessage ack) { Iterator<Contact> it = parent.getContacts(); Integer id = (Integer) ack.getData(ACK_MSG_ID); assert id != null; while (it.hasNext()) { Contact ct = it.next(); HashSet<Integer> acks = acksForContacts.get(ct); if (acks != null) { acks.remove(id); } } } public boolean acceptMessage(ProtocolStackMessage msg, Contact ct, int size) { if (msg.isRoutingMessage()) { if (msg.getProtocolID() == PROTO_ACK_ID) { Integer id = (Integer) msg.getData(ACK_MSG_ID); assert id != null; if (sawAcksFor.contains(id)) return true; sawAcksFor.add(id); if (msg.getDestNode() == parent) { if (parent.getNetwork().vShouldLog(Verbose.INFO)) parent.getNetwork().vprint("ACK RETURNED: ACK " + msg + " for MessageID: " + id); ProtocolStackMessage m = allOrgs.get(id); assert m != null; parent.getNetwork().stats().notify(m, Stats.MSG_ACKED); assert ackIDs.get(id) == null; parent.removeMessageCopies(id); } else { ProtocolStackMessage m = null; boolean added = false; m = ackIDs.get(id); if (m != null) { // This ACK is newer than the one we had, we want to // broadcast it too! if (m.update(msg)) { removeACKFromContacts(m); addACKToContacts(m); added = true; } } else { // We have to check buffer space. if (!parent.canAllocateCapacity(msg, handlerID)) { return false; } parent.allocateCapacity(msg, handlerID); acksSize += msg.getLength(); acks.add(msg); ackIDs.put(id, msg); addACKToContacts(msg); added = true; } parent.removeMessageCopies(id); if (added) parent.signalAllContactsIdle(); } return true; } } else if (msg.getSourceNode() == parent) { // TODO Well, not TODO actually. This is the place we would like // to start a counter for messages which are sent from this // node, but never leave this node. When this time-counter would // hit some value it would report the message as undelivered due // to timeout. Or something like that. Simulator does not have // anybody to report that to, so I don;t have to write that code // :P } return false; } public boolean hasACKFor(Integer id) { return sawAcksFor.contains(id); } public boolean messageSent(ProtocolStackMessage msg, Contact ct, int sizeSent) { if (sizeSent < msg.getLength()) { // We have to re-add this ack to queue for this contact if (msg.isRoutingMessage() && msg.getProtocolID() == PROTO_ACK_ID) { acksForContacts.get(ct).add(msg.getId()); } return false; } Integer id = new Integer(msg.getId()); if (!msg.isRoutingMessage() && hasACKFor(id)) return true; // Message was not sent directly to its final destination! if (msg.getSourceNode() == parent && msg.getDestNode() != ct.getDest() && msg.getProtocolID() == PROTO_MEED_ID) { // If message goes somewhere and returns to the source node, // it is possible, that we already have the copy. // TODO - maybe we should remove copies when they get back to // source? if (!buffer.containsKey(id)) { Integer retrans = (Integer) msg.getData(MSG_RESEND_CNT); if (parent.maxRetrans < 0 || retrans == null || retrans < parent.maxRetrans) { assert msg.getSourceNode() == parent; // It was just sent by MEED algorithm. It should be // possible // to add it to special buffer without canAllocate ProtocolStackMessage m = new ProtocolStackMessage(msg); m.setProtocolID(PROTO_MEED_ID); // We want per copy delay predictions, so we have to // clear // the field saying that this message already left its // source. if (parent.predDelayPerCopy && msg.getSourceNode() == parent) { m.removeData(MSG_LEFT_SOURCE); } m.storeData(MSG_IS_RETRANS, new Integer(1)); if (retrans == null) { m.storeData(MSG_RESEND_CNT, new Integer(1)); } else { m.storeData(MSG_RESEND_CNT, new Integer(retrans + 1)); } if (parent.orgsUseBuffer) parent.allocateCapacity(m, handlerID); buffer.put(id, m); allOrgs.put(id, m); double exp = ((Double) msg.getData(MSG_RESEND_TIME)).doubleValue(); if (exp < parent.getNetwork().getCurrentTime()) { setupEvent(parent.getNetwork().getCurrentTime()); } else { setupEvent(exp); } } } } return false; } public int canAllocate(ProtocolStackMessage forMsg, int hID) { return acksSize; } public int allocate(ProtocolStackMessage forMsg, int hID) { Iterator<ProtocolStackMessage> it = acks.iterator(); int toAlloc = forMsg.getLength() - parent.availableCapacity(); int allocated = 0; while (it.hasNext() && allocated < toAlloc) { ProtocolStackMessage m = it.next(); Integer i = (Integer) m.getData(ACK_MSG_ID); assert i != null; ackIDs.remove(i); removeACKFromContacts(m); it.remove(); acksSize -= m.getLength(); allocated += m.getLength(); } return allocated; } public void removeMessageCopies(Integer id) { ProtocolStackMessage m = buffer.remove(id); if (m != null) { if (parent.getNetwork().vShouldLog(Verbose.INFO)) parent.getNetwork().vprint("Removing saved copy of ACKed message " + m); if (parent.orgsUseBuffer) parent.freeCapacity(m); } } public void messageReachedFinalDest(ProtocolStackMessage msg, Contact contact, int size) { assert !msg.isRoutingMessage(); if (!parent.sendACK || contact == null || contact.getSource() == msg.getSourceNode()) return; // TODO - what size? int s = 4; ProtocolStackMessage newACK = new ProtocolStackMessage(parent.getNetwork().getNextRoutingMessageId(), parent.getNetwork().getCurrentTime(), parent, msg.getSourceNode(), 0, s, true); newACK.setProtocolID(PROTO_ACK_ID); newACK.storeData(ACK_MSG_ID, new Integer(msg.getId())); if (!parent.acceptMessage(newACK, null, newACK.getLength())) { if (parent.getNetwork().vShouldLog(Verbose.WARN)) parent.getNetwork().vprint( "ACK NOT SENT: " + newACK + " - accept failed in " + parent + ". Message: " + msg + " sent from " + msg.getSourceNode()); } else { if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint( "ACK SENT: " + newACK + " from " + parent + "; Message: " + msg + " sent from " + msg.getSourceNode()); } } } public class MEEDDelivOnlyFilter implements Stats.StatsMsgFilter { public boolean includeMsg(int index, String field, ArrayList<Stats.StatMsgEntry> pos, Stats.StatMsgEntry entry) { if (pos == null) return true; if (field == null || (!field.startsWith("delivered") && !field.equals("status"))) return true; if (entry != null && entry.action != Stats.MSG_DELIVERED) return true; Iterator<Stats.StatMsgEntry> it = pos.iterator(); while (it.hasNext()) { Stats.StatMsgEntry e = it.next(); if (e.action == Stats.MSG_DELIVERED && ((ProtocolStackMessage) e.msg).getProtocolID() == PROTO_MEED_ID) { return true; } } return false; } public Stats.StatsMsgFilter createMsgFilter(ArrayList<String> args) { MEEDDelivOnlyFilter ret = new MEEDDelivOnlyFilter(); // Args[0] = filter // Args[1] = meed_deliv if (args.size() != 2) { throw new IllegalArgumentException( "Illegal 'meed_deliv' filter command! It doesn't accept any parameters!"); } return ret; } } protected static class MEEDDijkstra extends Dijkstra<Node, Contact, ProtocolStackMessage> { MEEDDijkstra(MEEDTopologyEventHandler.DTNTopology graph, Node n, double time, ProtocolStackMessage obj) { super(graph, n, time, obj); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -