📄 .#meednode.java.1.13
字号:
package implementations;import java.util.ArrayList;import java.util.HashMap;import java.util.Map;import java.util.HashSet;import java.util.Iterator;import java.util.Collection;import java.util.NoSuchElementException;import simulator.Network;import simulator.Node;import simulator.Message;import simulator.Contact;import simulator.Dijkstra;import simulator.Stats;import util.CommandStatus;import util.Verbose;public class MEEDNode extends ProtocolStackNode{ protected int bufferCapacity = 0; protected int msgDataLength = 10; protected float primaryQueueOccupied = 0; private ArrayList<Message> primaryQueue = new ArrayList<Message>(); private MessageIteratorHandler outboundQueues = null; private FinalDestinationEventHandler finalDestinationAgent = null; private MEEDTopologyEventHandler topologyAgent = null; private TopologyRoutingEventHandler routingAgent = null; // This constructor is used only to create default_node instance, // so there is no need to add handlers. public MEEDNode() { super(); } public MEEDNode(MEEDNode org) { super(org); // The first argument is the buffer size bufferCapacity = org.bufferCapacity; outboundQueues = createMessageIterator(); assert outboundQueues != null; finalDestinationAgent = new FinalDestinationEventHandler(this, primaryQueue, outboundQueues); topologyAgent = new MEEDTopologyEventHandler(this, outboundQueues); routingAgent = new TopologyRoutingEventHandler(this, topologyAgent, primaryQueue, outboundQueues); // Add the event handlers in the proper order appendHandler(outboundQueues); appendHandler(finalDestinationAgent); appendHandler(topologyAgent); appendHandler(routingAgent); } public CommandStatus parseCommandPart(ArrayList<String> part, String path) { String param = part.get(0); CommandStatus ok = new CommandStatus(CommandStatus.COMMAND_OK); if (part.size() != 2) return null; if (param.equals("send")) { sendNewMessage(msgDataLength, network.getNode(part.get(1))); return ok; } int val; try { if (param.equals("buffer_size")) { val = Integer.parseInt(part.get(1)); if (val < 1) { return new CommandStatus("Max buffer size can not be < 1!"); } bufferCapacity = val; return ok; } else if (param.equals("msg_size")) { val = Integer.parseInt(part.get(1)); if (val < 1) { return new CommandStatus("Message Size can not be < 1!"); } msgDataLength = val; return ok; } } catch (NumberFormatException e) { return new CommandStatus("Error parsing value of parameter '" + param + "': " + e); } return null; } protected MessageIteratorHandler createMessageIterator() { return new MessageIteratorHandler(this); } public boolean sendNewMessage(int dataLength, Node destNode) { Message msg = new Message(network.getNextMessageId(), network.getCurrentTime(), this, destNode, dataLength); boolean ret = acceptMessage(msg, null, msg.getLength()); network.vprint(Verbose.NOTIFY, "Sending " + msg + " (" + msg.getSourceNode() + " => " + msg.getDestNode() + ") result: " + ret); // TODO - print errors? network.stats().notify(msg, Stats.MSG_CREATED); return ret; } /** Should we accept the specified message for routing? */ protected boolean acceptMessage(Message msg, Contact source, int size) { if (msg.isRoutingMessage()) { // ProtocolStackNode has to deal with protocol messages return super.acceptMessage(msg, source, size); } assert msg.getDestNode() != this : "acceptMessage called for data message for this node in " + this; // System.out.println("acceptMessage in " + this); // We are willing to store any message, as long as we have space in the // buffer: // We do not include the flooding queue in this computation if (msg.getLength() > bufferCapacity - primaryQueueOccupied) return false; // Free messages in the flooding buffer, if required // ~ while ( bufferCapacity - primaryQueueOccupied - // floodingQueueOccupied < msg.size() ) // ~ { // ~ System.out.println( "expiring message " + primaryQueue.get(0) ); // ~ } // Store the message in the primary queue primaryQueueOccupied += msg.getLength(); assert (bufferCapacity - primaryQueueOccupied >= 0); primaryQueue.add(msg); // Notify the meed routing agent that a new message is available // It will handle sending new updates out to other nodes finalDestinationAgent.notifyNewMessage(msg); routingAgent.notifyNewMessage(msg); // ~ System.out.println( this + " added message " + msg ); return true; } protected Contact getContactToNode(Node dest) { Iterator<Contact> it = getContactsToNode(dest); // TODO - MEED can work only with network where two nodes can be // connected with only one contact (or not connected at all). // We return here first contact from the list of available contacts. if (it.hasNext()) return it.next(); return null; } // TODO: Implement epidemic stuff // ~ private HashSet receivedMessages = new HashSet(); /** * Maps nodes to their most recent summary vector. This is used to restart * the epidemic protocol when new messages arrive. */ // TODO: Implement epidemic stuff // ~ private HashMap lastSummaryVectors = new HashMap(); private static class RoutingUpdate { public Node node; public int sequenceNumber; public HashMap<Contact, Float> contactWeights; public RoutingUpdate() { } } /** * This is a simple handler that delivers any messages destined for the * other end of contacts. */ public static class FinalDestinationEventHandler implements ProtocolHandler { private MEEDNode parent; private Collection<Message> buffer; private MessageIteratorHandler outboundQueues; private HashSet<Contact> finishedDestinations = new HashSet<Contact>(); public FinalDestinationEventHandler(MEEDNode node, Collection<Message> buffer, MessageIteratorHandler outQueues) { this.parent = node; this.buffer = buffer; this.outboundQueues = outQueues; } public void contactUp(Contact ct) { assert !finishedDestinations.contains(ct); } public void contactDown(Contact ct) { finishedDestinations.remove(ct); } // TODO - change: localMsg -> accept(Remote or Routing)Message // public boolean handleLocalMessage(Message m, Contact s) public boolean acceptMessage(Message msg, Contact ct, int size) { // System.out.println("acceptMessage in " + this); return false; } // TODO : OLD version: // public void notifyNewMessage(Contact ct, Message m) public void notifyNewMessage(Message m) { Contact c = parent.getContactToNode(m.getDestNode()); // Find out if the destination is one of the contacts that we are // active with if (c != null && finishedDestinations.remove(c)) { // We did remove something: go signal the contact parent.signalIdleContact(c); } } public boolean contactIdle(Contact ct) { // ~ System.out.println( Simulator.time() + " " + this.parent + " // Final Delivery idleContact " + ct ); if (!finishedDestinations.contains(ct)) { // Remember that we already iterated over this destation finishedDestinations.add(ct); // ~ System.out.println( Simulator.time() + " FinalDelivery // idleContact: " + ct ); // Find any messages destined for this node outboundQueues.appendIterator(ct, new FinalDestinationIterator(ct, buffer)); // Directly tell the outboundQueues to handle this event // TODO - check - appendIterator already called // signalIdleContact(ct) // outboundQueues.contactIdle(ct); } return true; } /** * Filterns the iterator to return messages with the specified * destination. */ private class FinalDestinationIterator implements Iterator<Message> { private Contact contact; private Collection<Message> buffer; public FinalDestinationIterator(Contact ct, Collection<Message> buffer) { contact = ct; this.buffer = buffer; } private Message findNext() { // This is *super* inefficient, but it avoids the // ConcurrentModificationExceptions Iterator<Message> i = buffer.iterator(); while (i.hasNext()) { Message m = i.next(); if (m.getDestNode() == contact.getDest()) { return m; } } return null; } public boolean hasNext() { return (findNext() != null); } public Message next() { Message retval = findNext(); assert buffer.contains(retval); if (retval == null) throw new NoSuchElementException(); // Remove the message from the buffer so other contacts do not // try to send it boolean result = buffer.remove(retval); assert result; return retval; } public void remove() { // TODO - But ProtocolStack tries to do this. // It doesn't hurt, as the message was removed in last next() // But Maybe this is better to be left here, and not to remove // messages in next() method? // assert false; // don't do this } } } public interface MEEDTopologyObserver { /** This method is called when any change to the topology state occurs. */ public void topologyChanged(); } /** Distributes MEED topology updates. */ public static class MEEDTopologyEventHandler implements ProtocolHandler { private MessageIteratorHandler outboundQueues = null; private MEEDNode parent = null; private Network network = null; /** Stores the network topology state. */ private DTNTopology topology = new DTNTopology(); private static Message.Header routingSummaryKey = new Message.Header(); private static Message.Header routingUpdateKey = new Message.Header(); /** * Caches the routing state for efficiency. It only changes when * contacts come up or go down. */ private Dijkstra<Node, Contact, Message> lastRouting = null; private int selfSequence = 0; private MEEDComputation metric = new MEEDComputation(); private HashSet<Contact> connectedContacts = new HashSet<Contact>(); public MEEDTopologyEventHandler(MEEDNode node, MessageIteratorHandler outQueues) { this.outboundQueues = outQueues; this.parent = node; this.network = node.getNetwork(); } private ArrayList<MEEDTopologyObserver> observers = new ArrayList<MEEDTopologyObserver>(); public void addObserver(MEEDTopologyObserver observer) { observers.add(observer); } /** * When the topology changes we invalidate our cache and broadcast the * event to any observers. */ private void topologyChanged() { lastRouting = null; for (Iterator i = observers.iterator(); i.hasNext();) { ((MEEDTopologyObserver) i.next()).topologyChanged(); } // When the topology changes, wake up any idle contacts parent.signalAllIdleContacts(); } public Dijkstra<Node, Contact, Message> routingCache() { // If we are getting updates, do not let others get the current // topology, as it is about to change if (isGettingUpdates()) return null; if (lastRouting == null) { lastRouting = new Dijkstra<Node, Contact, Message>(topology, parent, network.getCurrentTime(), null); network.vprint(Verbose.DEBUG1, "MEED " + lastRouting.toString()); } return lastRouting; } private boolean receivedProtocolMessage(Message msg, Contact contact) { // Is this a routing message with a routing summary vector? @SuppressWarnings("unchecked") HashMap<? extends Node, ? extends Integer> routingSummary = (HashMap<? extends Node, ? extends Integer>) msg .getData(routingSummaryKey); if (routingSummary != null) { if (!contact.isUp()) return false; // Get the list of updates for the other node ArrayList updates = topology.getRoutingUpdates(routingSummary); if (topology.needRoutingUpdates(routingSummary)) { network .vprint(Verbose.DEBUG1, "MEED " + this.parent + " needs updates from " + msg.getSourceNode()); // We need a routing update: record that the routing // state is changing assert !gettingUpdates.contains(contact); gettingUpdates.add(contact); } // TODO: Include info about buffer space? This is actually // more complex than it seems, but maybe a // naive version will still improve things? if (updates.size() > 0) { // Package this up in an update message // base size is 4 bytes (number of updates) int size = 4; for (Iterator i = updates.iterator(); i.hasNext();) { size += 12; // add the "node id, sequence number, // length // of contact information" size += 8 * ((RoutingUpdate) i.next()).contactWeights.size(); // 4 // bytes // other // node // id, // 4 // bytes // contact // weight
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -