📄 epidemicstatemachinenode.java
字号:
package implementations;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.ListIterator;import simulator.Contact;import simulator.Message;import util.Verbose;// TODO - Correct add contact and contact up events to do right things.// (Forget state between ups/downs and buffer messages which are added// while all contacts are down. - But it is MUCH slower!public class EpidemicStateMachineNode extends ProtocolStackNode{ protected static int PROTO_EP_ID = ProtocolStackMessage.getNewProtocolID(); /** This header contains the summary vector. */ protected static final Message.Header MSG_SUMMARY_VECTOR_KEY = new Message.Header(); /** This header contains the requested messages list. */ protected static final Message.Header MSG_REQUEST_KEY = new Message.Header(); protected EpidemicEventHandler epidemicAgent; public EpidemicStateMachineNode() { super(); } public EpidemicStateMachineNode(EpidemicStateMachineNode org) { super(org); } public void initNode() { epidemicAgent = new EpidemicEventHandler(this); appendEventHandler(epidemicAgent); appendBufferHandler(epidemicAgent); } public static class EpidemicEventHandler extends EventHandler { // protected PriorityQueue<ProtocolStackMessage> buffer = new // PriorityQueue<ProtocolStackMessage>(10, // new MsgTimeComparator()); protected ArrayList<ProtocolStackMessage> buffer = new ArrayList<ProtocolStackMessage>(10); protected HashMap<Integer, ProtocolStackMessage> bufferID = new HashMap<Integer, ProtocolStackMessage>(10); protected HashSet<Integer> hadMessages = new HashSet<Integer>(10); protected int bufferSize = 0; /** Maps destination nodes to the state objects for that node. */ protected HashMap<Contact, ContactState> contactStates = new HashMap<Contact, ContactState>(); private static int myID = getHandlerID(); private ProtocolStackNode parent; public EpidemicEventHandler(ProtocolStackNode node) { super(myID); this.parent = node; } /** * Returns true if local buffer contains a message with same ID as * Message which is a parameter to this function. */ public final boolean bufferContains(ProtocolStackMessage m) { return bufferContains(m.getId()); } public final boolean bufferContains(Integer id) { return bufferID.containsKey(id); } public final void bufferAdd(ProtocolStackMessage msg) { if (parent.bufferCapacity() >= 0) { bufferSize += msg.getLength(); assert bufferSize <= parent.bufferCapacity(); } if (buffer.size() < 1) { buffer.add(msg); return; } ListIterator<ProtocolStackMessage> it = buffer.listIterator(); double t = msg.getCreationTime(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); if (m.getCreationTime() <= t) { it.previous(); it.add(msg); return; } } buffer.add(msg); return; } /** Returns Message object from a buffer, which has same ID as given. */ private ProtocolStackMessage getBufferedMsg(int id) { return bufferID.get(new Integer(id)); } public int canAllocate(ProtocolStackMessage forMsg, int hID) { // See if we are permitted to free space in the buffer for the // message: // The buffer is maintained in the oldest to newest order int freedSpace = 0; double olderThan; if (hID == handlerID) { // The same protocol olderThan = forMsg.getCreationTime(); } else { // Protocol with higher priority - we will remove all the // messages if we have to! olderThan = Double.POSITIVE_INFINITY; // We can't say how many buffer space we have to free, // because higher layers might be able to free something before // we // do, but it will happen only in real allocate(). // So we just say how many bytes MAX we are able to free. } ListIterator<ProtocolStackMessage> it = buffer.listIterator(buffer.size()); while (it.hasPrevious()) { ProtocolStackMessage m = it.previous(); // The oldest message is newer or has the same start time than // the new message: quit // The equals is there because we prefer to keep messages we // already have since this results in less changes to buffers, // which means less sending summary vectors if (m.getCreationTime() >= olderThan) { break; } // I am permitted to delete this message freedSpace += m.getLength(); } return freedSpace; } public int allocate(ProtocolStackMessage forMsg, int hID) { double olderThan; if (hID == handlerID) { // The same protocol olderThan = forMsg.getCreationTime(); } else { // Protocol with higher priority - we will remove all the // messages if we have to! olderThan = Double.POSITIVE_INFINITY; } int toFree, canFree = 0; // Here we can say how many more we have to free. toFree = forMsg.getLength() - parent.availableCapacity(); ListIterator<ProtocolStackMessage> it = buffer.listIterator(buffer.size()); // This time actually remove the older messages while (canFree < toFree && it.hasPrevious()) { ProtocolStackMessage m = it.previous(); if (parent.getNetwork().vShouldLog(Verbose.NOTIFY)) parent.getNetwork().vprint("DROPPED epidemic " + m + " to free buffer space for " + forMsg); canFree += m.getLength(); if (parent.bufferCapacity() >= 0) bufferSize -= m.getLength(); it.remove(); // The message we are deleting must be older than the new // message assert m.getCreationTime() < olderThan; bufferID.remove(new Integer(m.getId())); } return canFree; } public boolean contactIdle(Contact ct) { ContactState stateMachine = contactStates.get(ct); assert stateMachine != null; // TODO make state machine.idle return true/false stateMachine.idleContact(); if (ct.isIdle()) return false; return true; } public void addContact(Contact ct) { if (ct.getSource() != parent) return; // Make sure we don't have it already assert !contactStates.containsKey(ct); contactStates.put(ct, new ContactState(ct)); } public void contactUp(Contact ct) { if (ct.getSource() != parent) return; assert contactStates.containsKey(ct); contactStates.get(ct).resetState(); } public void removeContact(Contact ct) { if (ct.getSource() == parent) contactStates.remove(ct); } /** Should we accept the specified message for routing? */ public boolean acceptMessage(ProtocolStackMessage msg, Contact source, int size) { if (size < msg.getLength()) { // Incomplete message - we don't care. return false; } if (msg.getDestNode() == parent) { // Node class will call acceptMessage for Messages with // destination == this only if they are routing messages assert msg.isRoutingMessage(); if (msg.getProtocolID() != PROTO_EP_ID) return false; // ~ System.out.println( Simulator.time() + " " + selfNode() + " // msg " + msg + " contact " + source ); // Delegate protocol messages to contact state machines ContactState stateMachine = contactStates.get(source.getReverseContact()); assert stateMachine != null; boolean ret = stateMachine.handleLocalMessage(msg); return ret; } if (msg.getProtocolID() != PROTO_EP_ID) { if (msg.getProtocolID() != ProtocolStackNode.ProtocolStackMessage.PROTO_NONE) msg = new ProtocolStackMessage(msg); msg.setProtocolID(PROTO_EP_ID); // We have to set it to 0. Maybe other layers set something // else... msg.setProtocolLength(0); } // If we already have the message in the queue, just say that the // message was accepted if (bufferContains(msg)) return true; // We don't have this message yet! // We have to check buffer space. if (!parent.canAllocateCapacity(msg, handlerID)) { return false; } parent.allocateCapacity(msg, handlerID); bufferAdd(msg); bufferID.put(new Integer(msg.getId()), msg); hadMessages.add(msg.getId()); // Tell all contacts that there is a new message for (Iterator<ContactState> i = contactStates.values().iterator(); i.hasNext();) { ContactState stateMachine = i.next(); assert stateMachine != null; stateMachine.notifyNewMessage(msg); } return true; } public boolean messageSent(ProtocolStackMessage msg, Contact ct, int sizeSent) { if (msg.getProtocolID() == PROTO_EP_ID && sizeSent < msg.getLength()) { ContactState stateMachine = contactStates.get(ct); assert stateMachine != null; stateMachine.sendingAborted(msg); } return false; } public void removeMessageCopies(Integer id) { if (!bufferContains(id)) return; int i = id.intValue(); Iterator<ProtocolStackMessage> it = buffer.iterator(); while (it.hasNext()) { ProtocolStackMessage m = it.next(); if (m.getId() == i) { it.remove(); bufferID.remove(id); if (parent.bufferCapacity() >= 0) bufferSize -= m.getLength(); parent.freeCapacity(m); } } } public boolean messageDelivered(ProtocolStackMessage msg, Contact ct, int sizeDeliv) { if (msg.getProtocolID() == PROTO_EP_ID) { if (sizeDeliv < msg.getLength()) { ContactState stateMachine = contactStates.get(ct); assert stateMachine != null; stateMachine.sendingAborted(msg); } else if (!msg.isRoutingMessage() && msg.getDestNode() == ct.getDest()) { parent.removeMessageCopies(msg.getId()); } } return false; } private class ContactState { private Contact contact; private State state; private ArrayList<? extends ProtocolStackMessage> otherSummary; private ArrayList<? extends ProtocolStackMessage> requestVector; private boolean needSendNewMessage = false; public ContactState(Contact c) { contact = c; changeState(new WaitSendSummary()); } public void resetState() { otherSummary = null; requestVector = null; state = new WaitSendSummary(); state.enterState(); } private void changeState(State newState) { assert (state == null || state.getClass() != newState.getClass()); // ~ if ( state != null ) System.out.println( Simulator.time() + // " " // + id() + " state transition from " + state.getClass() + " to // new // state " + newState.getClass() ); state = newState; state.enterState(); parent.signalContactIdle(contact); } public void notifyNewMessage(ProtocolStackMessage msg) { if (otherSummary == null || !otherSummary.contains(msg)) { // ~ System.out.println( id() + " setting flag true because // of // message " + msg ); needSendNewMessage = true; } state.notifyNewMessage(); } public void sendingAborted(ProtocolStackMessage msg) { state.sendingAborted(msg);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -