📄 udpbench.java
字号:
//ORANGE: deal with this case if put in timers to avoid infinite loops if all // pkts in a volley are droppped throw new IOException("expecting pkt: "+seqnum+", but received: "+ret); } } } if (DEBUG) System.err.println("["+i+"] Receiver: Sending message "+seqnum+"..."); putInt(barr, seqnum); if (nonblocking) { for(int k=0; k<PACKET_BURST_SIZE; k++) { if (useselect) { while ((write_selitem.revents & Selectable.WRITE_READY) == 0) { write_selset.select(SELECT_TIMEOUT); } write_selitem.revents = 0; } DatagramPacket q = new DatagramPacket(barr, 0, barr.length, REMADDR, PORTNUM); nbsock.nbSend(q); } } else { /* Blocking */ DatagramPacket q = new DatagramPacket(barr, MESSAGE_SIZE, REMADDR, PORTNUM); for(int k=0; k<PACKET_BURST_SIZE; k++) { sock.send(q); } } seqnum++; } //outer while(true) that loops until get sentinel t2 = System.currentTimeMillis(); printResults("Pingpong test", received, MESSAGE_SIZE, t1, t2); } } /* what doBandwidth does: * sender starts off, sends NUM_MESSAGES packets to the receiver, then sends * a burst of sentinel packets, and then waits for an ack (another * sentinel packet) from the receiver. the receiver starts listening for * any packets. * it's possible for some packets to be floating around from pingpong, so * it's important to change the size of MESSAGE_SIZE to be something different * (in main, i do a +1). if received pkt.size == MESSAGE_SIZE, increase our * counter, if pkt.size == ACK_BAND, then the sender is done sending, so * fire off a volley of ACK_BAND sized packets back at the sender as an ack. * any other size is probably some remnant from the ping pong test, so just * ignore it. */ private static void doBandwidth() throws IOException { NonblockingDatagramSocket nbsock = null; byte barr[] = new byte[MESSAGE_SIZE]; byte barr2[] = new byte[ACK_BAND]; int i; long t1, t2; if (nonblocking) { nbsock = (NonblockingDatagramSocket)sock; } for (i = 0; i < barr.length; i++) { barr[i] = (byte)(i & 0xff); } System.err.println("Starting bandwidth test: message size "+MESSAGE_SIZE+", num messages "+NUM_MESSAGES); if (sending) { t1 = System.currentTimeMillis(); for (i = 0; i < NUM_MESSAGES; i++) { DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE, REMADDR, PORTNUM); if (DEBUG2) System.err.println("["+i+"] Sender: Sending message..."); if (nonblocking) { if (useselect) { while ((write_selitem.revents & Selectable.WRITE_READY) == 0) { write_selset.select(SELECT_TIMEOUT); } write_selitem.revents = 0; } nbsock.nbSend(p); } else { /* Blocking */ sock.send(p); } } if (DEBUG) System.err.println("Sender: Sending sentinel packets...."); // hope that PACKET_BURST_SIZE packets will do it for(i=0; i<PACKET_BURST_SIZE; i++) { DatagramPacket sent = new DatagramPacket(barr2, ACK_BAND, REMADDR, PORTNUM); if (nonblocking) { if (useselect) { while ((write_selitem.revents & Selectable.WRITE_READY) == 0) { write_selset.select(SELECT_TIMEOUT); } write_selitem.revents = 0; } nbsock.nbSend(sent); } else { sock.send(sent); } } if (DEBUG) System.err.println("Sender: Listening for ack...."); while(true) { DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE); if (nonblocking) { if (useselect) { while ((read_selitem.revents & Selectable.READ_READY) == 0) { read_selset.select(SELECT_TIMEOUT); } read_selitem.revents = 0; } p.setLength(MESSAGE_SIZE); nbsock.nbReceive(p); } else { /* Blocking */ sock.receive(p); } //got confirmation, so stop if(p.getLength() == ACK_BAND) break; } t2 = System.currentTimeMillis(); printResults("Bandwidth test", NUM_MESSAGES, MESSAGE_SIZE, t1, t2); } else { /* Receiving */ int received = 0; //counter that keeps track of actual # received if (DEBUG2) System.err.println("["+i+"] Receiver: Receiving message..."); t1 = System.currentTimeMillis(); while(true) { DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE); if (nonblocking) { if (useselect) { while ((read_selitem.revents & Selectable.READ_READY) == 0) { read_selset.select(SELECT_TIMEOUT); } read_selitem.revents = 0; } p.setLength(MESSAGE_SIZE); nbsock.nbReceive(p); } else { /* Blocking */ sock.receive(p); } if(p.getLength() == ACK_BAND) break; else if(p.getLength() == MESSAGE_SIZE) received++; } if (DEBUG) System.err.println("Receiver: Sending ack..."); for(int k=0; k<PACKET_BURST_SIZE; k++) { DatagramPacket p = new DatagramPacket(barr2, ACK_BAND, REMADDR, PORTNUM); if (nonblocking) { if (useselect) { while ((write_selitem.revents & Selectable.WRITE_READY) == 0) { write_selset.select(SELECT_TIMEOUT); } write_selitem.revents = 0; } nbsock.nbSend(p); } else { /* Blocking */ sock.send(p); } } t2 = System.currentTimeMillis(); printResults("Bandwidth test", received, MESSAGE_SIZE, t1, t2); } } public static void usage() { System.err.println("Usage: java UDPBench [-n] [-s] [send | recv] <remote node> <num messages> <message size>"); System.err.println("Options:"); System.err.println(" -n\t\tUse nonblocking sockets"); System.err.println(" -s\t\tUse select/poll interface (implies -n)"); } public static void main(String args[]) { ServerSocket servsock = null; NonblockingServerSocket nbservsock = null; try { if ((args.length < 4) || (args.length > 5)) { usage(); System.exit(-1); } sending = false; nonblocking = false; useselect = false; int n; for (n = 0; n < 2; ) { if (args[n].equals("-n")) { nonblocking = true; n++; } else if (args[n].equals("-s")) { useselect = true; nonblocking = true; n++; } else { break; } } if (args[n].equals("send")) sending = true; REMADDR = InetAddress.getByName(args[n+1]); NUM_MESSAGES = Integer.valueOf(args[n+2]).intValue(); MESSAGE_SIZE = Integer.valueOf(args[n+3]).intValue(); if(MESSAGE_SIZE < ACK_BAND) { // to make sure we don't send a sentinel packet by mistake MESSAGE_SIZE = ACK_BAND+1; } System.err.println("Connecting to "+args[n+1]+":"+PORTNUM+", at: "+REMADDR); boolean connected = false; while (!connected) { try { if (nonblocking) { sock = new NonblockingDatagramSocket(PORTNUM); ((NonblockingDatagramSocket)sock).connect(REMADDR, PORTNUM); } else { sock = new DatagramSocket(PORTNUM); sock.connect(REMADDR, PORTNUM); } if (nonblocking && useselect) { read_selset = new SelectSet(); read_selitem = new SelectItem((NonblockingDatagramSocket)sock, (short)Selectable.READ_READY); read_selset.add(read_selitem); write_selset = new SelectSet(); write_selitem = new SelectItem((NonblockingDatagramSocket)sock, (short)Selectable.WRITE_READY); write_selset.add(write_selitem); } connected = true; } catch (IOException e) { } if (!connected) { try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { } } } System.err.println("Connected....."); doPingpong(); MESSAGE_SIZE++; doBandwidth(); sock.close(); } catch (Exception e) { System.err.println("TCPBench: Got exception: "+e.getMessage()); e.printStackTrace(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -