📄 reliabletest.java
字号:
System.out.println("discovered ack pipe: " + pipeName); } try { inputPipe = pipeSvc.createInputPipe(ackPipeAdv, this); if (DEBUG) { System.out.println("opened ack pipe for input"); } } catch (IOException ex) { System.err.println(ex.getMessage()); } } } } } private void doReceiver() { try { if (msgPipeAdv == null) { msgPipeAdv = createPipeAdv(MSG_PIPE_NAME); discoverySvc.publish(msgPipeAdv); discoverySvc.remotePublish(msgPipeAdv); inputPipe = pipeSvc.createInputPipe(msgPipeAdv, this); } } catch (IOException ex) { fail(ex.getMessage()); } if (DEBUG) { System.out.println("published msg pipe: " + msgPipeAdv.getName()); System.out.println("opened msg pipe for input"); } // We need to give to our input a reference to our output, but, // obviously, we need to create our input before resolving the output // pipe, otherwise we do not known when to start trying to resolve. // In addition, our output must not loose messages while the pipe is // being resolved: these are acks. All the first packets would remain // un-acked, which would make for a very slow start. // Therefore we need a form of output that can be created before // the pipe. This used to be solved by interposing queues and threads. // Now, we create the outgoing adaptor without a pipe. // The pipe will be set whenever it is ready. In the meantime, the // reliable input stream will block or return false if it is used. outgoing = new OutgoingPipeAdaptorSync(null); ris = new ReliableInputStream(outgoing, 0); try { if (ackPipeAdv == null) { ackPipeAdv = createPipeAdv(ACK_PIPE_NAME); discoverySvc.publish(ackPipeAdv); discoverySvc.remotePublish(ackPipeAdv); } pipeSvc.createOutputPipe(ackPipeAdv, this); } catch (IOException ex) { fail(ex.getMessage()); } if (DEBUG) { System.out.println("published ack pipe: " + ackPipeAdv.getName()); System.out.println("opened ack pipe for output"); } System.out.print("Waiting for sender..."); System.out.flush(); while (!ris.hasNextMessage()) { pause(); } System.out.println("Receiving"); for (int i = 0; outputPipe == null;) { pause(); if ((i++ % 11) == 0) { if (DEBUG) { System.out.println("re-resolving output pipe " + ackPipeAdv.getName()); } try { pipeSvc.createOutputPipe(ackPipeAdv, this); } catch (IOException ex) { System.err.println(ex.getMessage()); } } } long startedAt = 0; long bytesTransferred = 0; long nbMsgs = 0; while (true) { Message msg = null; try { msg = ris.nextMessage(true); } catch (IOException ioe) { System.err.println("Failed to obtain next message"); ioe.printStackTrace(); } ++nbMsgs; printMessageForDebug("doReceiver", msg); String msgId = (msg.getMessageElement(MESSAGE_TAG)).toString(); String sentAt = (msg.getMessageElement(SENT_AT_TAG)).toString(); long size = msg.getByteLength(); long now = System.currentTimeMillis(); long sent = now; try { sent = Long.parseLong(sentAt); } catch (NumberFormatException nex) { System.err.println("could not parse msg send time: " + sentAt); continue; } // We start computing averages only after the first message has // been received. long throughput = 0; if (nbMsgs > (2 * ITERATIONS / 3)) { if (startedAt == 0) { startedAt = System.currentTimeMillis(); } else { long totalTime = now - startedAt; if (totalTime <= 0) { // that can't be. Some time has passed. totalTime = 1; } bytesTransferred += size; throughput = (((1000 * 8 * bytesTransferred) / totalTime) / 1024); } } long delay = now - sent; if (msgId.equals("mclose")) { System.out.println( "\nResults" + "\n-------" + "\ntransferred: " + bytesTransferred + " bytes" + "\nthroughput: " + throughput + " kbps" + "\ncongestion events: " + lostToCongestion); try { ris.close(); } catch (IOException ioe1) {} ris = null; outgoing.close(); outgoing = null; outputPipe = null; longPause(); break; } if (!IS_QUIET) { System.out.print(msgId + " " + size + "b " + delay + "ms " + throughput + "kbps avg\r"); System.out.flush(); } } } private void printMessageForDebug(String header, Message msg) { if (DEBUG) { ElementIterator iter = msg.getMessageElements(); System.out.print(header + " " + IS_SENDER + " ("); while (iter.hasNext()) { MessageElement el = iter.next(); System.out.print(el.getElementName()); if (iter.hasNext()) { System.out.print(", "); } } System.out.println(")"); } } public void pipeMsgEvent(PipeMsgEvent inputPipeEvent) { Message msg = inputPipeEvent.getMessage(); if (msg == null) { return; } printMessageForDebug("pipeMsgEvent", msg); if (dropMessage()) { if (DEBUG) { System.out.println("dropped incoming:" + (IS_SENDER ? "ack" : "msg")); } return; } if (BW_LIMIT < Integer.MAX_VALUE) { bwQueueMsg(msg); return; } // We're redirecting either to the ros or to the ris; depending // on whether we're on one side or the other of the reliable stream. // The other stream obj is null. if (ros != null) { ros.recv(msg); } else if (ris != null) { ris.recv(msg); } } public void outputPipeEvent(OutputPipeEvent outputPipeEvent) { String pid = outputPipeEvent.getPipeID(); // this will happen in the sender if (outputPipe == null && pid.equals(msgPipeAdv.getPipeID().toString())) { outputPipe = outputPipeEvent.getOutputPipe(); outgoing.setPipe(outputPipe); // the next line is not needed in this case, // since here we register 'this' as an // PipeMsgListener (Input Pipe Listener) and // 'manually' redirect the ack messages to // ros.recv() from pipeMsgEvent // incoming = new IncomingPipeAdaptor(inputPipe, ros); if (DEBUG) { System.out.println("resolved msg output pipe " + outputPipe.getName()); } } // this will happen in the receiver if (outputPipe == null && pid.equals(ackPipeAdv.getPipeID().toString())) { outputPipe = outputPipeEvent.getOutputPipe(); outgoing.setPipe(outputPipe); // the next line is not needed in this case, // since here we register 'this' as an // PipeMsgListener (Input Pipe Listener) and // 'manually' redirect the ack messages to // ros.recv() from pipeMsgEvent // incoming = new IncomingPipeAdaptor(inputPipe, ris); if (DEBUG) { System.out.println("resolved ack output pipe " + outputPipe.getName()); } } } synchronized boolean dropMessage() { return ((++dropMsgCount) % DROP_MSG) == 0; } private static final String USAGE = "\nUsage: ReliableTest <options>" + "\n" + "options:\n" + " -help outputs some usefull advice" + " -quiet only output a summary (" + IS_QUIET + ")\n" + " -sender whether to run as sender of messages (" + IS_SENDER + ")\n" + " -receiver whether to run as a receiver of messages (" + !IS_SENDER + ")\n" + " -server whether to run as a permanent receiver of messages (" + IS_SERVER + ")\n" + " -waitrdv wait for a rendezvous connection before starting (not)\n" + " -drop drop every Nth messages (on arrival) (" + DROP_MSG + ")\n" + " -bw simulated bw cap in Kbit/s (on arrival) (" + BW_LIMIT + ")\n" + " -pl simulated pipe length in bytes (only with bw) (" + PIPE_LEN + ")\n" + " -lat simulated latency in ms (only with bw) (" + LATENCY + ")\n" + " -minload smallest of the random payload sizes in bytes (" + MIN_LOAD + ")\n" + " -maxload largest of the random payload sizes in bytes (" + MAX_LOAD + ")\n" + " -name base name for the pipes (ReliableTest)\n" + " -debug whether to turn on debugging in the peer (" + DEBUG + ")\n" + " -adapt Use adaptive flow control (do not)\n" + " -iterations number of times to send a message (" + ITERATIONS + ")\n" + " -delay Basic delay unit (" + DELAY + ")\n" + " -principal net.jxta.tls.principal property (" + PRINCIPAL + ")\n" + " -password net.jxta.tls.password property (" + PASSWORD + ")\n"; private static final String HELP = "Some options serve to simulate particular network conditions.\n" + "These conditions are simulated on the destination side of a\n" + "link. As a result, the options given to the receiver will\n" + "control the behaviour of the data channel, while the options\n" + "to the sender will control the behaviour of the ack channel.\n" + "\n" + "These options are the following:\n" + "-bw, as in \"bandwidth\":\n" + "\tcontrols the time it takes for the slowest segment of the path\n" + "\tto go from begining the transmition of one bit to being ready\n" + "\tto transmit the next one. It is expressed in Kbit per second.\n" + "-lat, as in \"latency\":\n" + "\tcontrols the time it takes for a bit to traverse the network in\n" + "\taddition to the time caused by the bandwidth. In real life, the\n" + "\tprincipal contributors to this time would be signal travel time\n" + "\tper the laws of physics, and packet processing time at each\n" + "\tnode along the path. The essential characteristics of latency,\n" + "\tis that over one given packet's latency time, a number of other\n" + "\tpackets can begin traveling as well.\n" + "-pl, as in \"path length\":\n" + "\tcontrols the actual amount of data that can be traveling along\n" + "\tthe network path (in bytes). In real life, this is the\n" + "\tproduct bandwidth * latency + some buffering, but the three\n" + "\tvalues can be chosen differently in order to simulate extreme\n" + "\tconditions. For example, most often pl would be larger than\n" + "\tthe bandwidth*latency product to reflect buffering capacity at\n" + "\tvarious nodes along the path. Under non-congested conditions\n" + "\tthese buffers are used to store at most one pending packet\n" + "\ton each node, which ensures back-to-back link utilization\n" + "\tdespite statistical variations. In that case, buffering has\n" + "\tno noticeable effect on latency. However, buffering space\n" + "\tcan retard congestion when the capacity of a node is exceeded\n" + "\t(and the apparent latency will increase).\n" + "\tIn practice, under simulated conditions on a single host, it\n" + "\tis impossible to keep the network path fully utilized if\n" + "\tpath length is less than twice bandwidth*latency. Any\n" + "\trealistic network has at least one extra buffer at each node.\n" + "\tSetting a path length smaller than the bandwidth*latency\n" + "\tproduct is somewhat artifical. It roughly corresponds to some\n" + "\tnode along the path delaying packets with no buffer to store\n" + "\tthem while they wait. In effect it results in the bandwidth\n" + "\tlowering to match, but makes congestions harder to predict. So\n" + "\tit is usefull for testing congestion recovery mechanisms.\n";}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -