📄 reliabletest.java
字号:
for (int i = 0; i < args.length; i++) { if (args[i].equals("-quiet")) { IS_QUIET = true; } else if (args[i].equals("-sender")) { IS_SENDER = true; } else if (args[i].equals("-receiver")) { IS_SENDER = false; } else if (args[i].equals("-server")) { IS_SENDER = false; IS_SERVER = true; } else if (args[i].equals("-waitrdv")) { waitRdv = true; } else if (args[i].equals("-delay") && i + 1 < args.length) { String delayStr = args[++i]; try { DELAY = Integer.parseInt(delayStr); } catch (NumberFormatException ex) { System.err.println("Invalid delay: " + delayStr + USAGE); return; } } else if (args[i].equals("-iterations") && i + 1 < args.length) { String iterStr = args[++i]; try { ITERATIONS = Integer.parseInt(iterStr); } catch (NumberFormatException ex) { System.err.println("Invalid iterations: " + iterStr + USAGE); return; } } else if (args[i].equals("-drop") && i + 1 < args.length) { String dropStr = args[++i]; try { DROP_MSG = Integer.parseInt(dropStr); } catch (NumberFormatException ex) { System.err.println("Invalid drop message: " + dropStr + USAGE); return; } } else if (args[i].equals("-bw") && i + 1 < args.length) { String bwStr = args[++i]; try { BW_LIMIT = Integer.parseInt(bwStr); } catch (NumberFormatException ex) { System.err.println("Invalid bw: " + bwStr + USAGE); return; } } else if (args[i].equals("-pl") && i + 1 < args.length) { String plStr = args[++i]; try { PIPE_LEN = Integer.parseInt(plStr); } catch (NumberFormatException ex) { System.err.println("Invalid pl: " + plStr + USAGE); return; } } else if (args[i].equals("-lat") && i + 1 < args.length) { String latStr = args[++i]; try { LATENCY = Integer.parseInt(latStr); } catch (NumberFormatException ex) { System.err.println("Invalid lat: " + latStr + USAGE); return; } } else if (args[i].equals("-minload") && i + 1 < args.length) { String minlStr = args[++i]; try { MIN_LOAD = Integer.parseInt(minlStr); } catch (NumberFormatException ex) { System.err.println("Invalid minload: " + minlStr + USAGE); return; } } else if (args[i].equals("-maxload") && i + 1 < args.length) { String maxlStr = args[++i]; try { MAX_LOAD = Integer.parseInt(maxlStr); } catch (NumberFormatException ex) { System.err.println("Invalid maxload: " + maxlStr + USAGE); return; } } else if (args[i].equals("-password") && i + 1 < args.length) { PASSWORD = args[++i]; } else if (args[i].equals("-principal") && i + 1 < args.length) { PRINCIPAL = args[++i]; } else if (args[i].equals("-name") && i + 1 < args.length) { MSG_PIPE_NAME = args[++i] + "MsgPipe"; ACK_PIPE_NAME = args[i] + "AckPipe"; } else if (args[i].equals("-debug")) { DEBUG = true; } else if (args[i].equals("-adapt")) { ADAPTIVE = true; } else if (args[i].equals("-help")) { System.err.println(USAGE); System.err.println(HELP); return; } } System.out.println( (IS_SENDER ? "Sender" : "Receiver") + "\n--------" + "\n quiet: " + IS_QUIET + "\n delay: " + DELAY + "\n iterations: " + ITERATIONS + "\n drop: " + DROP_MSG + "\n bw: " + BW_LIMIT + "\n pl: " + PIPE_LEN + "\n latency: " + LATENCY + "\n min load: " + MIN_LOAD + "\n max load: " + MAX_LOAD + "\n adaptive: " + ADAPTIVE + "\n debug: " + DEBUG); } public void rendezvousEvent(RendezvousEvent event) { synchronized (rdvConnectLock) { rdvConnectLock.notifyAll(); } } public void test() { if (IS_SENDER) { doSender(); longPause(); longPause(); doSender(); } else { do { doReceiver(); } while (IS_SERVER); } } private PipeAdvertisement createPipeAdv(String pipeName) { PipeAdvertisement padv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); String pipeType = PipeService.UnicastType; String composite = pipeName + pipeType; byte[] seed = Integer.toHexString(composite.hashCode()).getBytes(); PeerGroupID pgID = netPeerGroup.getPeerGroupID(); PipeID pipeId = IDFactory.newPipeID(pgID, seed); padv.setName(pipeName); padv.setPipeID(pipeId); padv.setType(pipeType); return padv; } private void pause() { synchronized (this) { try { wait(DELAY); } catch (InterruptedException e) {} } } private void longPause() { try { synchronized (this) { wait(10 * DELAY); } } catch (InterruptedException e) {} } private void doSender() { try { int sequence = 0; // Create the ros and the outgoing adaptor to go with it before // the outputpipe exists. The pipe will be set when ready. In the // meantime, send() would just fail or block. However, in the sender // we do not start before the pipes are created; we do not need to. // The same trick is more usefull to the receiver. outgoing = new OutgoingPipeAdaptorSync(null); ros = ADAPTIVE ? new ReliableOutputStream(outgoing, new AdaptiveFlowControl()) : new ReliableOutputStream(outgoing, new FixedFlowControl(40)); for (int i = 0; i < ITERATIONS; i++) { // if we do not already have it resolved, retry // to open the output pipe every so often if (outputPipe == null) { pause(); if ((i % 11) == 0) { PipeAdvertisement padv = IS_SENDER ? msgPipeAdv : ackPipeAdv; if (padv == null) { discoverySvc.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", MSG_PIPE_NAME, 10, this); discoverySvc.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", ACK_PIPE_NAME, 10, this); if (DEBUG) { System.out.println("launched remote discovery for " + MSG_PIPE_NAME + " and " + ACK_PIPE_NAME); } // wait for discovery response to come in continue; } if (DEBUG) { System.out.println("re-resolving output pipe " + padv.getName()); } try { pipeSvc.createOutputPipe(padv, this); } catch (IOException ex) { System.err.println(ex.getMessage()); return; } } } // No need to start before we could supply the pipes; messages // would go nowhere and/or the other side would not be able to // resolve the ack pipe. if (outputPipe == null || inputPipe == null) { continue; } } System.out.print("Sending..."); System.out.flush(); for (int i = 0; i <= ITERATIONS; i++) { Message msg = new Message(); if (i == ITERATIONS) { msg.addMessageElement(new StringMessageElement(MESSAGE_TAG, "mclose", null)); } else { String messageId = "m" + Integer.toString(nextMessageId++); msg.addMessageElement(new StringMessageElement(MESSAGE_TAG, messageId, null)); // add a random load element int index = random.nextInt(loadElements.size()); byte[] le = (byte[]) loadElements.get(index); MessageElement elm = new ByteArrayMessageElement(PAYLOAD_TAG, MIME_BINARY, le, null); msg.addMessageElement(elm); } msg.addMessageElement(new StringMessageElement(SENT_AT_TAG, Long.toString(System.currentTimeMillis()), null)); try { sequence = ros.send(msg); // System.out.println(messageId + " " + // msg.getByteLength() + "b " + // sequence + "seq " + // ros.getMaxAck() + "ack"); } catch (Throwable e) { e.printStackTrace(); return; } } System.out.print("closing..."); System.out.flush(); while (ros.getMaxAck() != sequence) { pause(); } ros.close(); inputPipe.close(); outputPipe.close(); msgPipeAdv = null; ackPipeAdv = null; outputPipe = null; inputPipe = null; outgoing = null; incoming = null; ros = null; System.out.println("Done"); } catch (Throwable t) { t.printStackTrace(); } } public void discoveryEvent(DiscoveryEvent event) { Enumeration ae = event.getResponse().getResponses(); while (ae.hasMoreElements()) { String str = (String) ae.nextElement(); // create Advertisement from response Advertisement adv = null; try { adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, new StringReader(str)); } catch (IOException ex) { System.err.println("error parsing discovery response"); System.err.println(ex.getMessage()); continue; } if (adv instanceof PipeAdvertisement) { PipeAdvertisement pipeAdv = (PipeAdvertisement) adv; String pipeName = pipeAdv.getName(); if (MSG_PIPE_NAME.equals(pipeName)) { msgPipeAdv = pipeAdv; if (DEBUG) { System.out.println("discovered msg pipe: " + pipeName); } try { pipeSvc.createOutputPipe(msgPipeAdv, this); if (DEBUG) { System.out.println("opened msg pipe for output"); } } catch (IOException ex) { System.err.println(ex.getMessage()); } } else if (ACK_PIPE_NAME.equals(pipeName)) { ackPipeAdv = pipeAdv; if (DEBUG) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -