📄 .#meednode.java.1.13
字号:
} Message updateMsg = new Message(network.getNextRoutingMessageId(), network.getCurrentTime(), parent, msg.getSourceNode(), 0, size, true); updateMsg.setIsRoutingMessage(true); // ~ Stats.add( requestMsg ); updateMsg.storeData(routingUpdateKey, updates); ArrayList<Message> arr = new ArrayList<Message>(1); arr.add(updateMsg); // Create a new iterator for this thing outboundQueues.appendIterator(contact.getReverseContact(), arr.iterator()); } network.vprint(Verbose.DEBUG1, "MEED " + this.parent + " got summary vector from " + msg.getSourceNode()); return true; } // Is this a message with updates? ArrayList updates = (ArrayList) msg.getData(routingUpdateKey); if (updates != null) { // Mark that we are finished updating the routing for this // node network.vprint(Verbose.DEBUG1, "MEED " + this.parent + " finished getting updates from " + msg.getSourceNode()); gettingUpdates.remove(contact); // Process the updates boolean didUpdate = false; for (Iterator i = updates.iterator(); i.hasNext();) { // Record if this update actually did something boolean updated = topology.processUpdate((RoutingUpdate) i.next()); network.vprint(Verbose.DEBUG1, "MEED ProcessedUpdate: " + updated); didUpdate = updated || didUpdate; } // All other nodes that are currently connected had the same // topology state as this node. // They need this update as well. Flood it to all the other // contacts if (didUpdate) { for (Iterator<Contact> i = connectedContacts.iterator(); i.hasNext();) { Contact c = i.next(); // If this is a different contact than this message // came // from if (c.getDest() != contact.getSource()) { // TODO: Should the src be this node, or the // original source? Does it matter Message m = new Message(network.getNextRoutingMessageId(), network.getCurrentTime(), parent, c.getDest(), 0, msg.getProtocolLength(), true); m.storeData(routingUpdateKey, updates); ArrayList<Message> arr = new ArrayList<Message>(1); arr.add(m); // Place this message at the head of the queue outboundQueues.insertIterator(c, arr.iterator()); } } } if (!isGettingUpdates()) { // Our routing state is now completely up to date: // Signal that the topology has changed // this will signal all relevant routing agents to do // their thing. We then wake up all the idle contacts network.vprint(Verbose.DEBUG1, "MEED " + this.parent + " - routing state has been updated - signalling all contacts idle."); topologyChanged(); parent.signalAllIdleContacts(); } return true; } // Verify that we did not miss anything assert msg.getData(routingSummaryKey) == null && msg.getData(routingUpdateKey) == null; return false; } /** Process incoming topology messages. */ public boolean acceptMessage(Message msg, Contact contact, int size) { // System.out.println("acceptMessage in " + this); if (size < msg.getLength()) return false; if (msg.isRoutingMessage()) { return receivedProtocolMessage(msg, contact); } // TODO - what to return???? // This protocol layer doesn't handle data messages, so it is // probably safe to say false here, but I will have to check it ;) return false; } /** * Record contact statistics when the link comes up. */ public void contactUp(Contact ct) { // It might be only contact directed FROM this node! if (ct.getSource() != parent) return; assert (!connectedContacts.contains(ct)); metric.contactUp(ct, network.getCurrentTime()); // See if our link state has changed and needs to be updated HashMap<Contact, Float> linkState = metric.contactWeights(network.getCurrentTime()); if (linkState != null) { topologyChanged(); // invalidate routing cache // Put the routing update into an object selfSequence += 1; RoutingUpdate r = new RoutingUpdate(); r.node = this.parent; r.sequenceNumber = selfSequence; r.contactWeights = linkState; boolean updated = topology.processUpdate(r); network.vprint(Verbose.DEBUG1, "MEED ProcessedUpdate (self) for linkState: " + linkState + ": " + updated); // Flood this local topology update to all other nodes for (Iterator<Contact> i = connectedContacts.iterator(); i.hasNext();) { Contact c = i.next(); assert (c != ct); // Package this up in an update message // base size is 16 bytes (number of updates, node id, // sequence number, length, then 8 bytes per link (node id, // contact weight)) int size = 16 + 8 * linkState.size(); ArrayList<RoutingUpdate> temp = new ArrayList<RoutingUpdate>(); temp.add(r); Message updateMsg = new Message(network.getNextRoutingMessageId(), network.getCurrentTime(), parent, c.getDest(), 0, size, true); updateMsg.storeData(routingUpdateKey, temp); ArrayList<Message> arr = new ArrayList<Message>(1); arr.add(updateMsg); // TODO - check if ReverseContact is used correctly here! // Create a new iterator for this thing outboundQueues.appendIterator(c.getReverseContact(), arr.iterator()); } } } /** * Records the contacts that are sending updates to this node. * gettingUpdates contains contacts directed TO this Node (this node is * their destination) */ private HashSet<Contact> gettingUpdates = new HashSet<Contact>(); public boolean isGettingUpdates() { return gettingUpdates.size() != 0; } /** * Exchange link state information when the link first becomes * available. */ public boolean contactIdle(Contact ct) { if (!connectedContacts.contains(ct)) { connectedContacts.add(ct); // Invalidate the routing state now that the link is available topologyChanged(); network.vprint(Verbose.DEBUG1, "MEED " + this.parent + " - routing state invalidated."); // This is a newly connected node: send the routing table // summary HashMap routingSummary = topology.getRoutingSummary(); // 4 bytes per node id, 4 bytes per weight, 4 bytes for the // length int size = routingSummary.size() * 8 + 4; Message msg = new Message(network.getNextRoutingMessageId(), network.getCurrentTime(), parent, ct .getDest(), 0, size, true); msg.storeData(routingSummaryKey, routingSummary); ct.sendMessage(msg); } network.vprint(Verbose.DEBUG1, "MEED " + this.parent + " TopologyIdleContact for " + ct + " returns " + (!isGettingUpdates()) + "; GettingUpdates: " + gettingUpdates); // We force the queue to go idle if we *are* getting updates return (!isGettingUpdates()); } public void contactDown(Contact ct) { connectedContacts.remove(ct); gettingUpdates.remove(ct); metric.contactDown(ct, network.getCurrentTime()); // Contacts going up and down mean that the routing state changes: // invalidate // the cached routing topologyChanged(); } /** * The topology of the current DTN. */ private class DTNTopology implements Dijkstra.Graph<Node, Contact, Message> { public DTNTopology() { } /** Maps contacts to their most recent weights. */ private HashMap<Contact, Float> contactWeights = new HashMap<Contact, Float>(); /** Maps nodes to their most recent sequence numbers. */ private HashMap<Node, Integer> nodeSequenceNumbers = new HashMap<Node, Integer>(); /** Maps nodes to their contacts. */ private HashMap<Node, ArrayList<Contact>> nodeContacts = new HashMap<Node, ArrayList<Contact>>(); /** * Returns the weight for the specified link object. The time and * obj are used so the graph can define weights that depend on the * context. */ public float getWeight(Contact ofContact, float time, Message contextMsg) // public float linkWeight(Graph.Link link, float time, Object // obj) { float weight = 0; // If the specified contact is currently up, then the weight for // local routing is zero if (connectedContacts.contains(ofContact)) weight = 0; else weight = contactWeights.get(ofContact).floatValue(); // ~ System.out.println( Simulator.time() + " Contact " + link + // " weight " + weight ); return weight; } /** * Returns an Iterator over all the Contacts that leave 'fromNode' * Node. */ public Iterator<? extends Contact> getArcsOut(Node fromNode) { ArrayList<Contact> contacts = nodeContacts.get(fromNode); if (contacts != null) return contacts.iterator(); else return (new ArrayList<Contact>()).iterator(); } /** Returns the source node for arc. */ public Node getSource(Contact arc) { return arc.getSource(); } /** Returns the destination node for arc. */ public Node getDest(Contact arc) { return arc.getDest(); } public int compareNodes(Node a, Node b) { return a.compareTo(b); } // TODO - Is is needed? public HashMap<? extends Node, ? extends Integer> getRoutingSummary() { return new HashMap<Node, Integer>( nodeSequenceNumbers ); } public boolean processUpdate(RoutingUpdate update) { return this.processUpdate(update.node, update.sequenceNumber, update.contactWeights); } public boolean processUpdate(Node node, int sequenceNumber, HashMap<Contact, Float> updatedContactWeights) { // Verify that this update is more recent than the previous one if (nodeSequenceNumbers.get(node) != null && nodeSequenceNumbers.get(node).intValue() >= sequenceNumber) { // This update is the same or older than our current state: // ignore it return false; } // Update the sequence number nodeSequenceNumbers.put(node, new Integer(sequenceNumber)); // Remove current contacts, in case the new update has *fewer* // contacts than the // previous update (a contact has expired due to being down for // too long) ArrayList<Contact> contacts = nodeContacts.get(node); nodeContacts.put(node, new ArrayList<Contact>()); if (contacts != null) { for (Iterator<Contact> i = contacts.iterator(); i.hasNext();) { Contact c = i.next(); assert c.getSource() == node; assert contactWeights.containsKey(c); contactWeights.remove(c); } } // Add all the new contacts contactWeights.putAll(updatedContactWeights); ArrayList<Contact> cToAdd = new ArrayList<Contact>(updatedContactWeights.keySet()); nodeContacts.put(node, cToAdd); // Add "invalid" sequence numbers for any nodes which we have // just discovered // This assures that routing will still work, even though we // don't have a sequence number // for those nodes yet Integer invalidSequence = new Integer(-1); for (Iterator<Contact> i = updatedContactWeights.keySet().iterator(); i.hasNext();) { Contact c = i.next(); assert c.getSource() == node; if (!nodeSequenceNumbers.containsKey(c.getDest())) { nodeSequenceNumbers.put(c.getDest(), invalidSequence); } network.vprint(Verbose.DEBUG1, "MEED Update contact: Weight of " + c + " = " + updatedContactWeights.get(c)); // ~ if ( id() == 0 ) System.out.println( Simulator.time() + // " update contact: " + c.src() + " -> " + c.dst() + " = " // + updatedContactWeights.get( c ) ); } // We actually did process an update return true; } /** * Create an array of routing updates that are more accurate than * the ones represented by the routingSummary. */ public ArrayList<RoutingUpdate> getRoutingUpdates(HashMap<? extends Node, ? extends Integer> routingSummary) { ArrayList<RoutingUpdate> updates = new ArrayList<RoutingUpdate>(); for (Iterator<Map.Entry<Node, Integer>> i = nodeSequenceNumbers.entrySet().iterator(); i.hasNext();) { Map.Entry<Node, Integer> entry = i.next(); Node node = entry.getKey(); Integer ourSequence = entry.getValue(); // A -1 indicates that our topology is currently not // complete, and we actually // have no information about that node, but we know it // exists because we have // a contact to it if (ourSequence.intValue() == -1) continue; Integer otherSequence = routingSummary.get(node); if (otherSequence == null || otherSequence.intValue() < ourSequence.intValue()) { // TIf they are missing the information, or if their // information is out of date, // send them the information about this node RoutingUpdate update = new RoutingUpdate(); update.node = node; update.sequenceNumber = ourSequence.intValue(); // Copy the contact weights from our topology state update.contactWeights = new HashMap<Contact, Float>(); for (Iterator j = nodeContacts.get(node).iterator(); j.hasNext();) { Contact c = (Contact) j.next(); update.contactWeights.put(c, contactWeights.get(c)); } updates.add(update); } } return updates; } /** Tests if we need any of the updates in the routing summary. */ // TODO: Can this be combined with the previous test somehow? public boolean needRoutingUpdates(HashMap<? extends Node, ? extends Integer> routingSummary) { for (Iterator i = routingSummary.entrySet().iterator(); i.hasNext();) { Map.Entry entry = (Map.Entry) i.next(); Node node = (Node) entry.getKey(); Integer otherSequence = (Integer) entry.getValue(); Integer ourSequence = nodeSequenceNumbers.get(node); if (ourSequence == null || ourSequence.intValue() < otherSequence.intValue()) { // They have a more recent update that we need return true; } } return false; } } } /** Routes messages based on the topology provided. */ public static class TopologyRoutingEventHandler implements ProtocolHandler, MEEDTopologyObserver { private ProtocolStackNode parent = null; private MessageIteratorHandler outboundQueues = null; private Collection<Message> buffer = null; private MEEDTopologyEventHandler topology = null; public TopologyRoutingEventHandler(ProtocolStackNode node, MEEDTopologyEventHandler topology, Collection<Message> buffer, MessageIteratorHandler outQueues) { this.parent = node; this.buffer = buffer; this.outboundQueues = outQueues; this.topology = topology; topology.addObserver(this); } /** * Records the contacts that have already received all messages we could * have sent to them. This set contains contacts directed FROM this Node * (this node is their source) */ HashSet<Contact> finishedWithDestinations = new HashSet<Contact>(); public boolean contactIdle(Contact ct)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -