⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpbench.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	      //ORANGE: deal with this case if put in timers to avoid infinite loops if all	      // pkts in a volley are droppped	      throw new IOException("expecting pkt: "+seqnum+", but received: "+ret);	    }	  }	}	if (DEBUG) System.err.println("["+i+"] Receiver: Sending message "+seqnum+"...");	putInt(barr, seqnum);	if (nonblocking) {	  for(int k=0; k<PACKET_BURST_SIZE; k++) {	    if (useselect) {	      while ((write_selitem.revents & Selectable.WRITE_READY) == 0) {		write_selset.select(SELECT_TIMEOUT);	      }	      write_selitem.revents = 0;	    }	    DatagramPacket q = new DatagramPacket(barr, 0, barr.length, REMADDR, PORTNUM);	    nbsock.nbSend(q);	  }	} else {	  /* Blocking */	  DatagramPacket q = new DatagramPacket(barr, MESSAGE_SIZE, REMADDR, PORTNUM);	  for(int k=0; k<PACKET_BURST_SIZE; k++) {	    sock.send(q);	  }	}	seqnum++;      } //outer while(true) that loops until get sentinel      t2 = System.currentTimeMillis();      printResults("Pingpong test", received, MESSAGE_SIZE, t1, t2);    }  }    /* what doBandwidth does:   * sender starts off, sends NUM_MESSAGES packets to the receiver, then sends   * a burst of sentinel packets, and then waits for an ack (another    * sentinel packet) from the receiver. the receiver starts listening for    * any packets.    * it's possible for some packets to be floating around from pingpong, so   * it's important to change the size of MESSAGE_SIZE to be something different   * (in main, i do a +1). if received pkt.size == MESSAGE_SIZE, increase our   * counter, if pkt.size == ACK_BAND, then the sender is done sending, so   * fire off a volley of ACK_BAND sized packets back at the sender as an ack.   * any other size is probably some remnant from the ping pong test, so just   * ignore it.   
*/
  /**
   * Runs the one-way bandwidth test over the already-connected socket.
   *
   * <p>Sender: transmits NUM_MESSAGES packets of MESSAGE_SIZE bytes, then
   * PACKET_BURST_SIZE sentinel packets of ACK_BAND bytes, then waits until a
   * packet of length ACK_BAND comes back and reports the elapsed time via
   * printResults.
   *
   * <p>Receiver: counts every packet whose length equals MESSAGE_SIZE until a
   * packet of length ACK_BAND arrives, answers with PACKET_BURST_SIZE packets
   * of ACK_BAND bytes, and reports the number of packets actually counted.
   * Packets of any other length are ignored.
   *
   * @throws IOException if a send, receive or select call fails
   */
  private static void doBandwidth() throws IOException {
    NonblockingDatagramSocket nbsock = null;
    byte barr[] = new byte[MESSAGE_SIZE];
    byte barr2[] = new byte[ACK_BAND];
    int i;
    long t1, t2;

    if (nonblocking) {
      nbsock = (NonblockingDatagramSocket)sock;
    }

    // Fill the payload with a repeating 0..255 byte pattern.
    for (i = 0; i < barr.length; i++) {
      barr[i] = (byte)(i & 0xff);
    }

    System.err.println("Starting bandwidth test: message size "+MESSAGE_SIZE+", num messages "+NUM_MESSAGES);

    if (sending) {
      t1 = System.currentTimeMillis();

      for (i = 0; i < NUM_MESSAGES; i++) {
        DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE, REMADDR, PORTNUM);
        if (DEBUG2) System.err.println("["+i+"] Sender: Sending message...");
        bandwidthSend(nbsock, p);
      }

      if (DEBUG) System.err.println("Sender: Sending sentinel packets....");
      // UDP may drop packets, so a burst of sentinels is sent in the hope
      // that at least one of them reaches the receiver.
      for (i = 0; i < PACKET_BURST_SIZE; i++) {
        DatagramPacket sent = new DatagramPacket(barr2, ACK_BAND, REMADDR, PORTNUM);
        bandwidthSend(nbsock, sent);
      }

      if (DEBUG) System.err.println("Sender: Listening for ack....");
      while (true) {
        DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE);
        bandwidthReceive(nbsock, p);
        // An ACK_BAND-sized packet is the receiver's confirmation, so stop.
        if (p.getLength() == ACK_BAND) break;
      }

      t2 = System.currentTimeMillis();
      printResults("Bandwidth test", NUM_MESSAGES, MESSAGE_SIZE, t1, t2);

    } else {
      /* Receiving */
      int received = 0;  // number of MESSAGE_SIZE packets actually received

      // Note: i still holds barr.length from the fill loop above, so the
      // index printed here is not a message number.
      if (DEBUG2) System.err.println("["+i+"] Receiver: Receiving message...");

      t1 = System.currentTimeMillis();
      while (true) {
        DatagramPacket p = new DatagramPacket(barr, MESSAGE_SIZE);
        bandwidthReceive(nbsock, p);
        if (p.getLength() == ACK_BAND) break;
        else if (p.getLength() == MESSAGE_SIZE) received++;
      }

      if (DEBUG) System.err.println("Receiver: Sending ack...");
      for (int k = 0; k < PACKET_BURST_SIZE; k++) {
        DatagramPacket p = new DatagramPacket(barr2, ACK_BAND, REMADDR, PORTNUM);
        bandwidthSend(nbsock, p);
      }

      t2 = System.currentTimeMillis();
      printResults("Bandwidth test", received, MESSAGE_SIZE, t1, t2);
    }
  }

  /**
   * Sends one packet for the bandwidth test using the configured socket mode.
   * In select mode, first waits until the write select item reports
   * WRITE_READY and then clears its revents.
   *
   * @param nbsock the socket cast to its nonblocking type; only used when
   *               nonblocking is set
   * @param pkt    the packet to send
   * @throws IOException if the select or send call fails
   */
  private static void bandwidthSend(NonblockingDatagramSocket nbsock, DatagramPacket pkt) throws IOException {
    if (nonblocking) {
      if (useselect) {
        while ((write_selitem.revents & Selectable.WRITE_READY) == 0) {
          write_selset.select(SELECT_TIMEOUT);
        }
        write_selitem.revents = 0;
      }
      nbsock.nbSend(pkt);
    } else {
      /* Blocking */
      sock.send(pkt);
    }
  }

  /**
   * Receives one packet for the bandwidth test using the configured socket
   * mode. In select mode, first waits until the read select item reports
   * READ_READY and then clears its revents.
   *
   * @param nbsock the socket cast to its nonblocking type; only used when
   *               nonblocking is set
   * @param pkt    the packet to receive into
   * @throws IOException if the select or receive call fails
   */
  private static void bandwidthReceive(NonblockingDatagramSocket nbsock, DatagramPacket pkt) throws IOException {
    if (nonblocking) {
      if (useselect) {
        while ((read_selitem.revents & Selectable.READ_READY) == 0) {
          read_selset.select(SELECT_TIMEOUT);
        }
        read_selitem.revents = 0;
      }
      pkt.setLength(MESSAGE_SIZE);
      // NOTE(review): the result of nbReceive is ignored, as in the original
      // code; confirm that a call which receives nothing cannot leave the
      // packet length at MESSAGE_SIZE and be counted as a message.
      nbsock.nbReceive(pkt);
    } else {
      /* Blocking */
      sock.receive(pkt);
    }
  }

  /** Prints the command-line synopsis and the supported options to stderr. */
  public static void usage() {
    System.err.println("Usage: java UDPBench [-n] [-s] [send | recv] <remote node> <num messages> <message size>");
    System.err.println("Options:");
    System.err.println("  -n\t\tUse nonblocking sockets");
    System.err.println("  -s\t\tUse select/poll interface (implies -n)");
  }

  /**
   * Entry point: parses the command line, binds and connects the UDP socket
   * (retrying every 100 ms until that succeeds), then runs the ping-pong test
   * followed by the bandwidth test.
   *
   * @param args optional flags -n and/or -s, followed by the mode ("send"
   *             selects the sender, anything else the receiver), the remote
   *             node, the number of messages and the message size; any
   *             further arguments are ignored
   */
  public static void main(String args[]) {
    try {
      sending = false;
      nonblocking = false;
      useselect = false;

      // Consume up to two leading option flags before counting positionals,
      // so that "-n -s send host n size" is accepted and a flag without
      // enough positionals yields the usage text instead of an
      // ArrayIndexOutOfBoundsException.
      int n = 0;
      while (n < 2 && n < args.length) {
        if (args[n].equals("-n")) {
          nonblocking = true;
        } else if (args[n].equals("-s")) {
          useselect = true;
          nonblocking = true;
        } else {
          break;
        }
        n++;
      }

      if (args.length - n < 4) {
        usage();
        System.exit(-1);
      }

      if (args[n].equals("send")) sending = true;
      REMADDR  = InetAddress.getByName(args[n+1]);
      NUM_MESSAGES = Integer.parseInt(args[n+2]);
      MESSAGE_SIZE = Integer.parseInt(args[n+3]);
      if (MESSAGE_SIZE <= ACK_BAND) {
        // Make sure a data packet can never have the sentinel length:
        // doBandwidth treats any ACK_BAND-sized packet as the sentinel, and
        // stray ping-pong packets may still arrive during that test.
        MESSAGE_SIZE = ACK_BAND+1;
      }

      System.err.println("Connecting to "+args[n+1]+":"+PORTNUM+",  at: "+REMADDR);

      boolean connected = false;
      while (!connected) {
        try {
          if (nonblocking) {
            sock = new NonblockingDatagramSocket(PORTNUM);
            ((NonblockingDatagramSocket)sock).connect(REMADDR, PORTNUM);
          } else {
            sock = new DatagramSocket(PORTNUM);
            sock.connect(REMADDR, PORTNUM);
          }

          if (nonblocking && useselect) {
            read_selset = new SelectSet();
            read_selitem = new SelectItem((NonblockingDatagramSocket)sock, (short)Selectable.READ_READY);
            read_selset.add(read_selitem);
            write_selset = new SelectSet();
            write_selitem = new SelectItem((NonblockingDatagramSocket)sock, (short)Selectable.WRITE_READY);
            write_selset.add(write_selitem);
          }
          connected = true;
        } catch (IOException e) {
          if (DEBUG) System.err.println("UDPBench: socket setup failed, retrying: "+e);
          // Release a socket that was bound before the failure; otherwise
          // the next attempt to bind the same port cannot succeed.
          if (sock != null) {
            sock.close();
            sock = null;
          }
        }
        if (!connected) {
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying instead of
            // silently looping forever.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting to set up socket");
          }
        }
      }
      System.err.println("Connected.....");

      try {
        doPingpong();
        // Use a different packet size for the bandwidth test so leftover
        // ping-pong packets are not counted as bandwidth packets.
        MESSAGE_SIZE++;
        doBandwidth();
      } finally {
        // Close the socket even when one of the tests throws.
        sock.close();
      }

    } catch (Exception e) {
      System.err.println("UDPBench: Got exception: "+e.getMessage());
      e.printStackTrace();
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -