📄 protocolstacknode.java
字号:
package implementations;import java.util.ArrayList;import java.util.Iterator;import java.util.HashSet;import java.util.TreeSet;import java.util.HashMap;import simulator.Contact;import simulator.Message;import simulator.Node;import simulator.Stats;import util.CommandStatus;import util.Verbose;public abstract class ProtocolStackNode extends Node{ private static int protoHandlerID = 0; protected HashSet<Contact> idleContacts = new HashSet<Contact>(); /** * Contains references to each of the event handlers. They are called in * order, until one handles the event. */ private ArrayList<EventHandler> eventHandlers = new ArrayList<EventHandler>(); private ArrayList<EventHandler> bufferHandlers = new ArrayList<EventHandler>(); private HashMap<Integer, ProtocolStackMessage> commonBuffer = new HashMap<Integer, ProtocolStackMessage>(10); private HashMap<Integer, Counter> commonBufferCounts = new HashMap<Integer, Counter>(10); private int bufferCapacity = 0; private int bufferOccupied = 0; private int msgDataLength = 10; private boolean acceptIncompleteMessages = false; public ProtocolStackNode() { super(); } public ProtocolStackNode(ProtocolStackNode org) { super(org); bufferCapacity = org.bufferCapacity; bufferOccupied = 0; } public void setAcceptIncomplete(boolean b) { acceptIncompleteMessages = b; } protected final void clearEventHandlers() { eventHandlers.clear(); } /** Adds a ProtocolHandler to the end of the stack. */ protected final void appendEventHandler(EventHandler handler) { assert !eventHandlers.contains(handler); eventHandlers.add(handler); } /** Adds a ProtocolHandler at the beginning of the stack. */ protected final void insertEventHandler(EventHandler handler) { assert !eventHandlers.contains(handler); eventHandlers.add(0, handler); } protected final void clearBufferHandlers() { bufferHandlers.clear(); } /** Adds a ProtocolHandler to the end of the stack. */ protected final void appendBufferHandler(EventHandler handler) { assert !bufferHandlers.contains(handler); bufferHandlers.add(handler); } /** Adds a ProtocolHandler at the beginning of the stack. */ protected final void insertBufferHandler(EventHandler handler) { assert !bufferHandlers.contains(handler); bufferHandlers.add(0, handler); } public final static int getHandlerID() { return ++protoHandlerID; } public final int bufferCapacity() { return bufferCapacity; } public final int availableCapacity() { if (bufferCapacity < 0) return Integer.MAX_VALUE; return bufferCapacity - bufferOccupied; } public final boolean canAllocateCapacity(ProtocolStackMessage forMsg, int handlerID) { int capacity = forMsg.getLength(); if (bufferCapacity < 0 || bufferCapacity - bufferOccupied >= capacity) return true; Iterator<EventHandler> it = bufferHandlers.iterator(); int allocated = 0; capacity += bufferOccupied - bufferCapacity; while (it.hasNext() && allocated < capacity) { EventHandler handler = it.next(); allocated += handler.canAllocate(forMsg, handlerID); if (handler.handlerID == handlerID) break; } return allocated >= capacity; } public final void allocateCapacity(ProtocolStackMessage forMsg, int handlerID) { if (bufferCapacity < 0) return; int capacity = forMsg.getLength(); if (bufferCapacity - bufferOccupied >= capacity) { bufferOccupied += capacity; assert bufferOccupied <= bufferCapacity; return; } Iterator<EventHandler> it = bufferHandlers.iterator(); int allocated = 0; capacity += bufferOccupied - bufferCapacity; while (it.hasNext() && allocated < capacity) { EventHandler handler = it.next(); int a = handler.allocate(forMsg, handlerID); if (a >= 0) { allocated += a; bufferOccupied -= a; } assert handler.handlerID != handlerID || allocated >= capacity; } // allocateCapacity should be called after canAllocateCapacity assert allocated >= capacity; bufferOccupied += forMsg.getLength(); assert bufferOccupied <= bufferCapacity; return; } public final void freeCapacity(ProtocolStackMessage msg) { if (bufferCapacity >= 0) { bufferOccupied -= msg.getLength(); assert bufferOccupied >= 0; } } public void addBufferedMessage(ProtocolStackMessage msg) { Integer id = msg.getId(); Counter count = commonBufferCounts.get(id); if (count == null) { commonBufferCounts.put(id, new Counter(1)); assert !commonBuffer.containsKey(id); commonBuffer.put(id, msg); } else { count.inc(); } } public void removeBufferedMessage(ProtocolStackMessage msg) { Integer id = new Integer(msg.getId()); assert commonBuffer.containsKey(id); assert commonBufferCounts.containsKey(id); Counter count = commonBufferCounts.get(id); if (count.dec()) { commonBufferCounts.remove(id); commonBuffer.remove(id); } } public void removeMessageCopies(Integer id) { for (EventHandler handler : eventHandlers) { handler.removeMessageCopies(id); } } public ProtocolStackMessage getBufferedMessage(int id) { return commonBuffer.get(id); } public boolean containsBufferedMessage(int id) { return commonBuffer.containsKey(id); } public Iterator<ProtocolStackMessage> getBufferedMessages() { return commonBuffer.values().iterator(); } public Iterator<Integer> getBufferedIDs() { return commonBuffer.keySet().iterator(); } /** * Called when the contact is available for sending messages. It is final * because this node takes control over idleContact handling: If you wish to * mess with it, install a handler. */ public final void contactIdle(Contact ct) { if (!ct.isUp() || !ct.isIdle() || ct.getSource() != this) return; super.contactIdle(ct); contactInternalIdle(ct, true); } protected boolean contactInternalIdle(Contact ct, boolean manageIdleList) { for (EventHandler handler : eventHandlers) { boolean handled = handler.contactIdle(ct); // This idle event handler handled the event, or wanted the queue to // be blocked if (handled) { break; } else if (!ct.isIdle()) { // The handler did nothing: the contact MUST be idle throw new RuntimeException("contactIdle handler '" + handler + "' did not handle the event, but the contact is no longer idle"); } } if (ct.isIdle()) { if (manageIdleList) { // If the contact is still idle, keep track of it for later idleContacts.add(ct); } return false; } return true; } /** Deliver event notifaciotns to all handlers. */ public void contactUp(Contact ct) { super.contactUp(ct); for (EventHandler handler : eventHandlers) { handler.contactUp(ct); } } public void contactDown(Contact ct) { super.contactDown(ct); idleContacts.remove(ct); for (EventHandler handler : eventHandlers) { handler.contactDown(ct); } assert !idleContacts.contains(ct); } public void messageReachedFinalDest(Message msg, Contact contact, int size) { super.messageReachedFinalDest(msg, contact, size); for (EventHandler handler : eventHandlers) { handler.messageReachedFinalDest((ProtocolStackMessage) msg, contact, size); } } /** * Should we accept the specified message? This is called for messages for * other nodes, and for all routing messages. It is not called for data * messages for this node. */ protected boolean acceptMessage(Message m, Contact ct, int size) { boolean ret = false; if (!acceptIncompleteMessages && size < m.getLength()) return false; // This message for sure won't fit if (bufferCapacity >= 0 && m.getLength() > bufferCapacity) return false; ProtocolStackMessage msg = (ProtocolStackMessage) m; for (EventHandler handler : eventHandlers) { if (handler.acceptMessage(msg, ct, size)) ret = true; } return ret; } public void messageSent(Message m, Contact ct, int sizeSent) { ProtocolStackMessage msg = (ProtocolStackMessage) m; for (EventHandler handler : eventHandlers) { boolean stop = handler.messageSent(msg, ct, sizeSent); if (stop) { return; } } } public void messageDelivered(Message m, Contact ct, int sizeDelivered) { ProtocolStackMessage msg = (ProtocolStackMessage) m; for (EventHandler handler : eventHandlers) { boolean stop = handler.messageDelivered(msg, ct, sizeDelivered); if (stop) { return; } } } public boolean addContact(Contact ct) { for (EventHandler handler : eventHandlers) { handler.addContact(ct); } return super.addContact(ct); } public boolean removeContact(Contact ct) { for (EventHandler handler : eventHandlers) { handler.removeContact(ct); } return super.removeContact(ct); } public boolean sendNewMessage(int dataLength, Node destNode) { ProtocolStackMessage msg = new ProtocolStackMessage(network.getNextMessageId(), network.getCurrentTime(), this, destNode, dataLength); boolean ret = acceptMessage(msg, null, msg.getLength()); if (network.vShouldLog(Verbose.NOTIFY)) network.vprint("Sending " + msg + " (" + msg.getSourceNode() + " => " + msg.getDestNode() + ") result: " + ret); if (!ret && network.vShouldLog(Verbose.ERR)) network.vprint("Sending " + msg + " (" + msg.getSourceNode() + " => " + msg.getDestNode() + ") FAILED"); statMsgEntry.setup(msg, network.getCurrentTime(), Stats.MSG_CREATED); network.stats().notify(statMsgEntry); return ret; } public CommandStatus parseCommandPart(ArrayList<String> part, String path) { String param = part.get(0); CommandStatus ok = super.parseCommandPart(part, path); if (ok != null) return ok; ok = new CommandStatus(CommandStatus.COMMAND_OK); if (part.size() != 2) return null; if (param.equals("send")) { sendNewMessage(msgDataLength, network.getNode(part.get(1))); return ok; } int val; try { if (param.equals("buffer_size")) { val = Integer.parseInt(part.get(1)); if (val < -1) { return new CommandStatus("Max buffer size can not be < -1!"); } bufferCapacity = val; if (bufferCapacity >= 0 && bufferCapacity < bufferOccupied) { return new CommandStatus("BufferCapacity can not be < current bufferOccupied!"); } return ok; } else if (param.equals("msg_size")) { val = Integer.parseInt(part.get(1)); if (val < 1) { return new CommandStatus("Message Size can not be < 1!"); } msgDataLength = val; return ok; } } catch (NumberFormatException e) { return new CommandStatus("Error parsing value of parameter '" + param + "': " + e); } return null; } /** * If a contact to the specified destination is currently idle, then this * method will wake it up and cause it to handle traffic. */ public void signalContactIdle(Contact ct) { if (ct.isIdle()) { if (contactInternalIdle(ct, false)) { idleContacts.remove(ct); } } } /** Wakes up all idle contacts. */ public void signalAllContactsIdle() { // ~ System.out.println( Simulator.time() + " signalAllIdleContacts: " + // idleContacts ); Iterator<Contact> i = idleContacts.iterator(); while (i.hasNext()) { Contact c = i.next(); if (c.isIdle()) { if (contactInternalIdle(c, false)) { i.remove(); } } } } /** Wakes up all idle contacts to the given node. */ public void signalAllContactsIdle(ProtocolStackNode toNode) { // ~ System.out.println( Simulator.time() + " signalAllIdleContacts: " + // idleContacts ); Iterator<Contact> i = idleContacts.iterator(); while (i.hasNext()) { Contact c = i.next(); if (c.getDest() == toNode && c.isIdle()) { if (contactInternalIdle(c, false)) { i.remove(); } } } } public static abstract class EventHandler { public int handlerID = 0; public EventHandler(int id) { handlerID = id; } /** * Called when a contact is idle. Event handlers should use this to * inject messages into the contact. It can also be used to block the * queue for some reason. If true is returned, no other handlers will * see this event and the contact can be left idle. However, an * exception will be raised if false is returned and the contact is not * idle. * * @returns true if the event was handled and event propagation should * be blocked, false if event propagation should continue. */ public boolean contactIdle(Contact ct) { return false; } public void contactUp(Contact ct) { } public void contactDown(Contact ct) { } public void addContact(Contact ct) { } public void removeContact(Contact ct) { } /** * Asks if this handler will process the message. The message has just * arrived at the destination of the node and is destined for another * node. If this returns true, the handler accepts responsability for * the message and further processing will not occur. * * @returns true if standard handling should continue, or false if event * propagation should be blocked. * @param size * number of bytes actually delivered (if contacts support * incomplete deliveries it could be < msg.getLength()) * */ public boolean acceptMessage(ProtocolStackMessage msg, Contact ct, int size) { return false; } public boolean messageSent(ProtocolStackMessage msg, Contact ct, int sizeSent) { return false; } public boolean messageDelivered(ProtocolStackMessage msg, Contact ct, int sizeDelivered) { return false; } public int canAllocate(ProtocolStackMessage forMsg, int hID) { return 0; } public int allocate(ProtocolStackMessage forMsg, int hID) { return 0; } public void removeMessageCopies(Integer id) { } public void messageReachedFinalDest(ProtocolStackMessage msg, Contact contact, int size) { } } private static class Counter { public int val = 0; Counter(int i) { val = i; } public void inc() { ++val; } public boolean dec() { if (--val > 0) return false; return true; } } public static class ProtocolStackMessage extends Message { public static final int PROTO_NONE = 0; private static int lastProtocolID = PROTO_NONE; private int protoID = PROTO_NONE; private TreeSet<Node> forbiddenNodes = null; public ProtocolStackMessage(int id, double creationTime, Node source, Node dest, int data) { super(id, creationTime, source, dest, data); } public ProtocolStackMessage(int id, double creationTime, Node source, Node dest, int data, int protocol, boolean isR) { super(id, creationTime, source, dest, data, protocol, isR); } public ProtocolStackMessage(ProtocolStackMessage org) { super(org); } public void setProtocolID(int id) { protoID = id; } public int getProtocolID() { return protoID; } public final static int getNewProtocolID() { return ++lastProtocolID; } public final TreeSet<Node> forbiddenNodes() { return forbiddenNodes; } public final void createForbiddenNodes() { if (forbiddenNodes == null) forbiddenNodes = new TreeSet<Node>(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -