📄 epidemicstatemachinenode.java
字号:
}

/**
 * Forwards the "contact is idle" event to the current protocol state.
 */
public void idleContact() {
    state.idleContact();
}

/**
 * Forwards a locally delivered message to the current protocol state.
 *
 * @param m the message to handle
 * @return {@code true} if the current state consumed the message
 */
public boolean handleLocalMessage(ProtocolStackMessage m) {
    return state.handleLocalMessage(m);
}

/**
 * Base class of all protocol states: every event is a no-op by default and
 * local messages are reported as "not handled".
 */
private abstract class AbstractState implements State {
    public AbstractState() {
    }

    /** Logs the state transition when DEBUG4 logging is enabled. */
    public void enterState() {
        if (parent.getNetwork().vShouldLog(Verbose.DEBUG4)) {
            parent.getNetwork().vprint("NEW_STATE: " + this.getClass().getName() + " in " + parent);
        }
    }

    public void idleContact() {
    }

    public void notifyNewMessage() {
    }

    public void sendingAborted(ProtocolStackMessage m) {
    }

    /**
     * Default handler: consumes nothing.
     *
     * @param m the message offered to this state; asserted not to be a
     *          routing message
     * @return always {@code false}
     */
    public boolean handleLocalMessage(ProtocolStackMessage m) {
        assert !m.isRoutingMessage();
        return false;
    }
}

/**
 * Shared message handling for the states that may receive the other node's
 * summary vector.
 */
private abstract class HandleSummary extends AbstractState {
    /**
     * Stores the summary vector carried by {@code m}, if any.
     *
     * @param m the received message
     * @return {@code true} if {@code m} carried a summary vector
     */
    @Override
    public boolean handleLocalMessage(ProtocolStackMessage m) {
        assert (otherSummary == null);

        // Store the summary vector for later (ignoring unchecked warning)
        @SuppressWarnings("unchecked")
        ArrayList<? extends ProtocolStackMessage> summary =
                (ArrayList<? extends ProtocolStackMessage>) m.getData(MSG_SUMMARY_VECTOR_KEY);
        if (summary != null) {
            otherSummary = summary;
            // We did handle this message
            return true;
        }

        // We did not handle this message
        return false;
    }
}

/**
 * State in which this node still has to send its summary vector to the
 * other end of the contact.
 */
private class WaitSendSummary extends HandleSummary {
    public WaitSendSummary() {
        super();
    }

    @Override
    public void enterState() {
        super.enterState();
        // We are sending a summary vector, which means we expect to receive
        // a new request vector: forget our current request
        requestVector = null;
    }

    /**
     * Sends a snapshot of the buffer as summary vector and then waits for
     * the other node's summary.
     */
    @Override
    public void idleContact() {
        // The other node has our most up to date information
        needSendNewMessage = false;

        // This is a newly connected node: send the summary vector.
        // NOTE: This assumes 32-bit message IDs (following Vahdat and Becker):
        // (msg id, msg size, msg timestamp) = 12 bytes per entry,
        // plus a bit of overhead (4 bytes: message length).
        int size = buffer.size() * 12 + 4;
        ProtocolStackMessage msg = new ProtocolStackMessage(
                parent.getNetwork().getNextRoutingMessageId(),
                parent.getNetwork().getCurrentTime(),
                parent, contact.getDest(), 0, size, true);
        msg.setProtocolID(PROTO_EP_ID);

        // This sends the current snapshot of the message queue contents
        // (summary vector)
        msg.storeData(MSG_SUMMARY_VECTOR_KEY, new ArrayList<ProtocolStackMessage>(buffer));

        if (parent.getNetwork().vShouldLog(Verbose.DEBUG5)) {
            parent.getNetwork().vprint("ADV (" + parent + ") to " + contact + " messages: " + buffer);
        }

        contact.sendMessage(msg);
        changeState(new WaitForSummary());
    }
}

/**
 * State in which our summary vector has been sent and we wait for the
 * summary vector of the other node.
 */
private class WaitForSummary extends HandleSummary {
    public WaitForSummary() {
        super();
    }

    @Override
    public void enterState() {
        super.enterState();
        // We already have the summary vector: send out the request vector
        if (otherSummary != null) {
            changeState(new WaitSendRequest());
        }
    }

    /**
     * Accepts the other node's summary vector and moves on to sending the
     * request vector.
     *
     * @param m the received message
     * @return {@code true} if {@code m} carried a summary vector
     */
    @Override
    public boolean handleLocalMessage(ProtocolStackMessage m) {
        // Check if this is a summary vector: the first time through, it
        // *must* be, but it is possible that we are doing this while the
        // other node is sending us messages: we have received a new message
        // and need to inform the other node about it.
        boolean result = super.handleLocalMessage(m);
        if (result) {
            // Next step: send out the request vector
            changeState(new WaitSendRequest());
            // We did handle this message
            return true;
        }
        return false;
    }
}

/**
 * This class exists to share the message handling between the
 * WaitSendRequest and WaitForRequest classes.
 */
private abstract class HandleRequest extends AbstractState {
    /**
     * Stores the request vector carried by {@code m}, if any.
     *
     * @param m the received message
     * @return {@code true} if {@code m} carried a request vector
     */
    @Override
    public boolean handleLocalMessage(ProtocolStackMessage m) {
        @SuppressWarnings("unchecked")
        ArrayList<? extends ProtocolStackMessage> temp =
                (ArrayList<? extends ProtocolStackMessage>) m.getData(MSG_REQUEST_KEY);
        if (temp != null) {
            // We *must* get a request vector before any messages from the
            // other node arrive, and only one per exchange.
            assert requestVector == null;
            requestVector = temp;
            // We did handle this message
            return true;
        }
        return false;
    }
}

/**
 * State in which the other node's summary vector is known and this node
 * still has to send its request vector.
 */
private class WaitSendRequest extends HandleRequest {
    // Create and signal the idle contact
    public WaitSendRequest() {
        super();
    }

    /**
     * Builds the request vector from the other node's summary and sends it.
     *
     * <p>The summary and the local buffer are walked in parallel. A summary
     * entry addressed to this node is requested (at the front of the list)
     * unless it was already received. Any other entry is requested when it
     * is newer than the buffered message it is compared against (or no
     * buffered message is left) and is neither buffered nor previously held;
     * otherwise the buffered message is charged against the remaining
     * capacity first.
     * NOTE(review): this merge presumably relies on both lists being ordered
     * newest-first, and on a negative {@code bufferCapacity()} meaning
     * "unlimited" - confirm against the enclosing class.
     */
    @Override
    public void idleContact() {
        assert (otherSummary != null);

        ArrayList<ProtocolStackMessage> request = new ArrayList<ProtocolStackMessage>();
        int remCap = parent.availableCapacity() + bufferSize;

        Iterator<? extends ProtocolStackMessage> iS = otherSummary.iterator();
        Iterator<? extends ProtocolStackMessage> iB = buffer.iterator();
        ProtocolStackMessage othM = null;
        ProtocolStackMessage bufM = null;

        // Keep going while a summary entry is still pending (othM != null):
        // testing only iS.hasNext() would silently drop the last summary
        // entry whenever a buffered message was accounted for before it.
        while ((othM != null || iS.hasNext())
                && (parent.bufferCapacity() < 0 || remCap > 0)) {
            if (othM == null) {
                othM = iS.next();
            }
            if (bufM == null && iB.hasNext()) {
                bufM = iB.next();
            }

            if (othM.getDestNode() == parent) {
                // Addressed to us: request it first, it needs no buffer space
                if (!parent.receivedMessage(othM)) {
                    request.add(0, othM);
                }
                othM = null;
            } else if (bufM == null || othM.getCreationTime() > bufM.getCreationTime()) {
                // There is no bufM - we used all our current buffer, or the
                // next "other" message is newer than our current newer one.
                if (!bufferContains(othM) && !hadMessages.contains(othM.getId())) {
                    request.add(othM);
                    remCap -= othM.getLength();
                }
                othM = null;
            } else {
                // We use buffer space for our own message
                remCap -= bufM.getLength();
                bufM = null;
            }
        }

        // Package this up in a request message (message ids + total length)
        int size = request.size() * 4 + 4;
        ProtocolStackMessage requestMsg = new ProtocolStackMessage(
                parent.getNetwork().getNextRoutingMessageId(),
                parent.getNetwork().getCurrentTime(),
                parent, contact.getDest(), 0, size, true);
        requestMsg.setProtocolID(PROTO_EP_ID);
        requestMsg.storeData(MSG_REQUEST_KEY, request);

        if (parent.getNetwork().vShouldLog(Verbose.DEBUG5)) {
            parent.getNetwork().vprint(
                    "REQ (" + parent + ") from " + contact + " messages: " + request
                            + "; HAVE messages: " + buffer);
        }

        contact.sendMessage(requestMsg);

        // Wait for the request from the other node
        changeState(new WaitForRequest());
    }
}

/**
 * State in which our request vector has been sent and we wait for the
 * request vector of the other node.
 */
private class WaitForRequest extends HandleRequest {
    public WaitForRequest() {
        super();
    }

    @Override
    public void enterState() {
        super.enterState();
        // We already have the request vector
        if (requestVector != null) {
            changeState(new SendingMessages());
        }
    }

    /**
     * Accepts the other node's request vector and starts sending messages.
     *
     * @param m the received message
     * @return {@code true} if {@code m} carried a request vector
     */
    @Override
    public boolean handleLocalMessage(ProtocolStackMessage m) {
        boolean result = super.handleLocalMessage(m);
        if (result) {
            changeState(new SendingMessages());
            return true;
        }
        return false;
    }
}

/**
 * Shared handling for states that can be interrupted by a fresh summary
 * vector from the other node.
 */
private abstract class HandleSummaryInterruption extends AbstractState {
    /**
     * Restarts the exchange when {@code msg} carries a summary vector.
     *
     * @param msg the received message
     * @return {@code true} if {@code msg} carried a summary vector
     */
    @Override
    public boolean handleLocalMessage(ProtocolStackMessage msg) {
        // See if there is a summary vector
        @SuppressWarnings("unchecked")
        ArrayList<? extends ProtocolStackMessage> summary =
                (ArrayList<? extends ProtocolStackMessage>) msg.getData(MSG_SUMMARY_VECTOR_KEY);
        if (summary != null) {
            // We got a new summary vector!
            otherSummary = summary;
            // Go send a summary vector. Because otherSummary is already
            // set, WaitForSummary will be left immediately for
            // WaitSendRequest once our own summary has been sent.
            changeState(new WaitSendSummary());
            return true;
        }

        // We did not handle this message
        return false;
    }
}

/**
 * State in which the messages requested by the other node are sent, one
 * per idle-contact notification.
 */
private class SendingMessages extends HandleSummaryInterruption {
    // Create and signal the idle contact
    public SendingMessages() {
        super();
    }

    /**
     * Sends the next requested message that is still buffered, or switches
     * to {@link Idle} when nothing is left to send.
     */
    @Override
    public void idleContact() {
        // Go through the request list and find a message to send
        for (Iterator<? extends ProtocolStackMessage> i = requestVector.iterator(); i.hasNext();) {
            ProtocolStackMessage m = i.next();
            ProtocolStackMessage bm = getBufferedMsg(m.getId());
            if (bm != null) {
                // This message is in the buffer, and in the request list:
                // remove it from the request, we don't want to send it again
                i.remove();
                contact.sendMessage(bm);
                return;
            }
        }

        // If we get here, none of the messages in the request list are in
        // our buffer OR the request list is empty: go to the idle state.
        changeState(new Idle());
    }

    /**
     * Puts an aborted message back at the front of the request vector.
     *
     * @param m the message whose transmission was aborted
     */
    @Override
    public void sendingAborted(ProtocolStackMessage m) {
        assert requestVector != null;
        ArrayList<ProtocolStackMessage> al = new ArrayList<ProtocolStackMessage>(requestVector);
        al.add(0, m);
        requestVector = al;
    }
}

/**
 * State in which the current exchange is finished and the contact is idle.
 */
private class Idle extends HandleSummaryInterruption {
    public Idle() {
        super();
    }

    @Override
    public void enterState() {
        super.enterState();
        // Check if we already have new messages to send
        notifyNewMessage();
    }

    /**
     * Re-queues an aborted message and resumes sending, or logs the loss
     * when there is no request vector to put it back into.
     *
     * @param m the message whose transmission was aborted
     */
    @Override
    public void sendingAborted(ProtocolStackMessage m) {
        if (requestVector != null) {
            ArrayList<ProtocolStackMessage> al = new ArrayList<ProtocolStackMessage>(requestVector);
            al.add(0, m);
            requestVector = al;
            changeState(new SendingMessages());
            parent.signalContactIdle(contact);
        } else {
            if (parent.getNetwork().vShouldLog(Verbose.ERR)) {
                parent.getNetwork().vprint("MESSAGE_LOST: " + m + " in " + parent);
            }
        }
    }

    /** Starts a new exchange if new messages are pending for the other node. */
    @Override
    public void notifyNewMessage() {
        if (needSendNewMessage) {
            // As part of the protocol exchange, we need to wait for the
            // summary vector from the other node
            otherSummary = null;
            // We have new messages for the other node
            changeState(new WaitSendSummary());
        }
    }
}
} // closes the class holding the states above (its header precedes this excerpt)
} // closes the next enclosing scope (its header precedes this excerpt)

/** Events every protocol state must be able to react to. */
private interface State {
    public void enterState();

    public void idleContact();

    public void sendingAborted(ProtocolStackMessage m);

    public void notifyNewMessage();

    public boolean handleLocalMessage(ProtocolStackMessage m);
}
} // closes the outermost class (its header precedes this excerpt)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -