
📄 multiclientthreaded.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
/*
 * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
 * California. All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice and the following
 * two paragraphs appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 * Author: Matt Welsh <mdw@cs.berkeley.edu>
 *
 */

/*
 * This program creates a number of outgoing socket connections
 * (each handled by its own thread), and writes bursts of packets to
 * each socket. After each burst is sent, a burst of reply messages is
 * received, and then the next burst is sent. Using the -q option makes
 * the client only open a connection, and not send any data; this is used
 * to measure the overhead of idle connections to a server.
 */

import java.net.*;
import java.io.*;
import java.util.*;
import seda.util.*;

public class MultiClientThreaded extends Thread {

  private static final boolean DEBUG = false;
  private static final int PORTNUM = 5721;

  private static final boolean OVERALL_REPORT = true;
  private static final boolean VERBOSE_REPORT = false;
  private static final boolean HISTOGRAM_REPORT = true;

  private static String REMOTE_HOST;
  private static int NUM_CONNECTIONS;
  private static int SEND_MESSAGE_SIZE;
  private static int RECV_MESSAGE_SIZE;
  private static int SEND_BURST_SIZE;
  private static int RECV_BURST_SIZE;
  private static boolean QUIET;

  // Time in ms between reporting measurements
  private static final int BENCH_DELAY = 5000;
  // Time in ms to delay before starting
  private static final int SLEEP_TIME = 5000;
  // Number of bench samples before we exit; if zero, run forever
  private static final int NUMBER_RUNS = 20;

  private Socket sock;
  private int tnum;
  private boolean benchthread;

  // Guards all of the shared counters below
  private static Object lock;
  private static int num_connections = 0;
  private static long max_connect_time = 0;
  private static long total_connect_time = 0;
  private static int num_completions = 0;
  // long rather than int so that a busy sample period cannot overflow
  private static long num_bytes = 0;
  private static long max_response_time = 0;
  private static long total_response_time = 0;

  // Histograms mapping bucket index to sample count
  private static Hashtable<Integer, Integer> conn_time_hist, resp_time_hist;
  private static final int CONN_HIST_BUCKETSIZE = 10;
  private static final int RESP_HIST_BUCKETSIZE = 10;

  private static int numBenchRuns = 0;
  private static int clientNumBurstsSent[];
  private static int clientNumBurstsReceived[];

  // Creates a client thread with the given index
  public MultiClientThreaded(int tnum) {
    this.tnum = tnum;
    this.benchthread = false;
  }

  // Creates the reporting (benchmark) thread
  public MultiClientThreaded() {
    this.benchthread = true;
  }

  public void run() {
    if (benchthread) doBenchThread();
    else doClientThread();
  }

  private static void printResults(String msg, int numiters, int message_size, long t1, long t2) {
    double usec = (t2-t1)*1.0e3;
    double usecper = (usec/numiters);
    double megabits = (message_size*numiters*8.0)/(1024*1024);
    double bw = (megabits * 1.0e3)/(t2-t1);

    System.out.println(msg+":");
    System.out.println("\t"+numiters+" "+message_size+"-byte messages sent in "+usec+" usec, or "+usecper+" usec/iter.");
    System.out.println("\t"+bw+" megabits/sec send bandwidth.");
  }

  // Periodically snapshots and resets the shared counters, then prints them
  private void doBenchThread() {
    long t1, t2;
    long num_conns, total_conn, max_conn;
    long num_comps, num_bts, total_resp, max_resp;

    while (true) {
      t1 = System.currentTimeMillis();
      MDWUtil.sleep(BENCH_DELAY);

      if (OVERALL_REPORT) {
        synchronized (lock) {
          t2 = System.currentTimeMillis();
          num_conns = num_connections; num_connections = 0;
          total_conn = total_connect_time; total_connect_time = 0;
          max_conn = max_connect_time; max_connect_time = 0;
          num_comps = num_completions; num_completions = 0;
          num_bts = num_bytes; num_bytes = 0;
          total_resp = total_response_time; total_response_time = 0;
          max_resp = max_response_time; max_response_time = 0;
        }

        double sec = (t2-t1) * 1.0e-3;
        double conns_per_sec = (double)num_conns / sec;
        double comps_per_sec = (double)num_comps / sec;
        double bytes_per_sec = (double)num_bts / sec;
        double mbit_per_sec = (bytes_per_sec * 8.0) / (1024.0 * 1024.0);
        // These are NaN when no samples were recorded in this period
        double avg_conn_time = (double)total_conn / num_conns;
        double avg_resp_time = (double)total_resp / num_comps;

        System.err.println("Connect Rate:\t"+MDWUtil.format(conns_per_sec)+" connections/sec, "+num_conns+" conns");
        System.err.println("Overall rate:\t"+MDWUtil.format(comps_per_sec)+" completions/sec");
        System.err.println("Bandwidth:\t"+MDWUtil.format(bytes_per_sec)+" bytes/sec, "+MDWUtil.format(mbit_per_sec)+" Mbit/sec");
        System.err.println("Connect Time:\t"+MDWUtil.format(avg_conn_time)+" ms, max "+MDWUtil.format(max_conn)+" ms");
        System.err.println("Response Time:\t"+MDWUtil.format(avg_resp_time)+" ms, max "+MDWUtil.format(max_resp)+" ms\n");
      }

      if (HISTOGRAM_REPORT) {
        synchronized (conn_time_hist) {
          Enumeration<Integer> e = conn_time_hist.keys();
          while (e.hasMoreElements()) {
            Integer bucket = e.nextElement();
            int time = bucket.intValue() * CONN_HIST_BUCKETSIZE;
            int val = conn_time_hist.get(bucket).intValue();
            System.err.println("CT "+time+" ms "+val+" count");
          }
          conn_time_hist.clear();
          System.err.println("\n");
        }

        synchronized (resp_time_hist) {
          Enumeration<Integer> e = resp_time_hist.keys();
          while (e.hasMoreElements()) {
            Integer bucket = e.nextElement();
            int time = bucket.intValue() * RESP_HIST_BUCKETSIZE;
            int val = resp_time_hist.get(bucket).intValue();
            System.err.println("RT "+time+" ms "+val+" count");
          }
          resp_time_hist.clear();
          System.err.println("\n");
        }
      }

      numBenchRuns++;
      if (numBenchRuns == NUMBER_RUNS) {
        if (OVERALL_REPORT) {
          System.err.println("Fairness report:");
          synchronized (lock) {
            int totalSent = 0, totalReceived = 0;
            double avgSent, avgReceived;
            for (int i = 0; i < NUM_CONNECTIONS; i++) {
              totalSent += clientNumBurstsSent[i];
              totalReceived += clientNumBurstsReceived[i];
            }
            // Cast first so the averages are not truncated by integer division
            avgSent = (double)totalSent / NUM_CONNECTIONS;
            avgReceived = (double)totalReceived / NUM_CONNECTIONS;
            System.err.println("Bursts sent: "+totalSent+" total, "+avgSent+" average");
            System.err.println("Bursts received: "+totalReceived+" total, "+avgReceived+" average");
            for (int i = 0; i < NUM_CONNECTIONS; i++) {
              System.err.println("Client "+i+" "+clientNumBurstsSent[i]+" sent, "+clientNumBurstsReceived[i]+" received");
            }
          }
        }
        System.err.println("Benchmark Finished.\n");
        System.exit(0);
      }
    }
  }

  // Opens one connection, records the connect time, then idles or sends bursts
  private void doClientThread() {
    long tconn1 = 0, tconn2 = 0;

    try {
      // Sleep for random time before making connection
      Random r = new Random();
      // nextInt(bound) is never negative, unlike Math.abs(nextInt()) % bound
      int s = r.nextInt(SLEEP_TIME) + SLEEP_TIME;
      try {
        System.err.println("Client thread "+tnum+" doing initial sleep "+s+"ms");
        Thread.sleep(s);
      } catch (InterruptedException ie) {
        // Ignore
      }

      tconn1 = System.currentTimeMillis();
      sock = new Socket(REMOTE_HOST, PORTNUM);
      tconn2 = System.currentTimeMillis();

      if (OVERALL_REPORT || HISTOGRAM_REPORT) {
        long conntime = tconn2 - tconn1;
        if (OVERALL_REPORT) {
          synchronized (lock) {
            if (conntime > max_connect_time) max_connect_time = conntime;
            total_connect_time += conntime;
            num_connections++;
          }
        }
        if (HISTOGRAM_REPORT) {
          Integer ct = Integer.valueOf((int)(conntime / CONN_HIST_BUCKETSIZE));
          synchronized (conn_time_hist) {
            Integer val = conn_time_hist.remove(ct);
            if (val == null) {
              conn_time_hist.put(ct, Integer.valueOf(1));
            } else {
              conn_time_hist.put(ct, Integer.valueOf(val.intValue() + 1));
            }
          }
        }
      }

      Thread.yield();

      if (QUIET) {
        System.err.println("Client thread "+tnum+" started");
        // Hold the idle connection open forever
        while (true) {
          try {
            Thread.sleep(100000);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }
      } else {
        // Sleep so all the connections go through
        long t1;
        t1 = System.currentTimeMillis();
        s = r.nextInt(SLEEP_TIME) + SLEEP_TIME;
        while (System.currentTimeMillis() < (t1+s)) {
          try {
            System.err.println("Client thread "+tnum+" doing second sleep "+s+"ms");
            Thread.sleep(s);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }
        System.err.println("Client thread "+tnum+" started");
        doBandwidth();
      }

      sock.close();

    } catch (Exception e) {
      System.err.println("Client thread "+tnum+" got exception: "+e.getMessage());
      e.printStackTrace();
    }
  }

  // Sends bursts of messages and waits for a burst of replies after each one
  private void doBandwidth() throws IOException {
    OutputStream os = sock.getOutputStream();
    InputStream is = sock.getInputStream();
    byte barr[] = new byte[SEND_MESSAGE_SIZE];
    byte barr2[] = new byte[RECV_MESSAGE_SIZE];
    int i = 0;
    long t1, t2;
    long treq1, treq2;

    for (i = 0; i < barr.length; i++) {
      barr[i] = (byte)(i & 0xff);
    }

    System.out.println("Starting client thread "+tnum+": send message size "+SEND_MESSAGE_SIZE+", send burst "+SEND_BURST_SIZE+", recv message size "+RECV_MESSAGE_SIZE+", recv burst "+RECV_BURST_SIZE);

    t1 = System.currentTimeMillis();
    i = 0;
    while (true) {
      if (DEBUG) System.err.println("["+i+"] Sender "+tnum+": Sending message...");
      // Reset on every send, so response time is measured from the last
      // message of the burst
      treq1 = System.currentTimeMillis();
      os.write(barr);
      os.flush();
      Thread.yield();
      i++;

      if ((i % SEND_BURST_SIZE) == 0) {
        synchronized (lock) {
          clientNumBurstsSent[tnum]++;
        }

        if (DEBUG) System.err.println("Receiving burst...");
        int m = 0;
        while (m < RECV_BURST_SIZE) {
          int n = 0;
          while (n < barr2.length) {
            int c = is.read(barr2, n, (barr2.length - n));
            // read() returns -1 once the server closes the connection
            if (c < 0) throw new EOFException("Connection closed by server");
            n += c;
            if (DEBUG) System.err.println("  Received "+c+" bytes, total "+n);
          }
          m++;
          if (DEBUG) System.err.println("  Got msg "+m+" out of "+RECV_BURST_SIZE);
        }
        treq2 = System.currentTimeMillis();

        synchronized (lock) {
          clientNumBurstsReceived[tnum]++;
        }

        if (OVERALL_REPORT || HISTOGRAM_REPORT) {
          long resptime = treq2 - treq1;
          if (OVERALL_REPORT) {
            synchronized (lock) {
              if (resptime > max_response_time) max_response_time = resptime;
              total_response_time += resptime;
              num_completions++;
              num_bytes += (long)RECV_MESSAGE_SIZE * RECV_BURST_SIZE;
            }
          }
          if (HISTOGRAM_REPORT) {
            Integer rt = Integer.valueOf((int)(resptime / RESP_HIST_BUCKETSIZE));
            synchronized (resp_time_hist) {
              Integer val = resp_time_hist.remove(rt);
              if (val == null) {
                resp_time_hist.put(rt, Integer.valueOf(1));
              } else {
                resp_time_hist.put(rt, Integer.valueOf(val.intValue() + 1));
              }
            }
          }
        }

        t2 = System.currentTimeMillis();
        if (VERBOSE_REPORT) {
          printResults("Client thread "+tnum, SEND_BURST_SIZE, SEND_MESSAGE_SIZE, t1, t2);
        }
        t1 = System.currentTimeMillis();
      }
    }
  }

  public static void main(String args[]) {

    if (HISTOGRAM_REPORT) {
      conn_time_hist = new Hashtable<Integer, Integer>();
      resp_time_hist = new Hashtable<Integer, Integer>();
    }

    try {
      int n = 0;
      if ((args.length > 0) && args[0].equals("-q")) {
        QUIET = true;
        n++;
      } else {
        QUIET = false;
      }

      // Exactly six positional arguments must follow the optional -q flag
      if (args.length != n + 6) {
        System.err.println("Usage: java MultiClientThreaded [-q] <remote host> <num clients> <send message size> <send burst size> <recv message size> <recv burst size>");
        System.err.println("\t-q\tDo not send any messages (be quiet)");
        System.exit(-1);
      }

      REMOTE_HOST = args[n+0];
      NUM_CONNECTIONS = Integer.parseInt(args[n+1]);
      SEND_MESSAGE_SIZE = Integer.parseInt(args[n+2]);
      SEND_BURST_SIZE = Integer.parseInt(args[n+3]);
      RECV_MESSAGE_SIZE = Integer.parseInt(args[n+4]);
      RECV_BURST_SIZE = Integer.parseInt(args[n+5]);

      lock = new Object();
      MultiClientThreaded benchthread = null;

      if (OVERALL_REPORT || HISTOGRAM_REPORT) {
        benchthread = new MultiClientThreaded();
        clientNumBurstsSent = new int[NUM_CONNECTIONS];
        clientNumBurstsReceived = new int[NUM_CONNECTIONS];
      }

      if (QUIET) {
        // Quiet mode: open all connections from this thread and hold them idle
        Random r = new Random();
        int s = r.nextInt(5000);
        try {
          System.err.println("Doing initial sleep "+s+"ms");
          Thread.sleep(s);
        } catch (InterruptedException ie) {
          // Ignore
        }

        Socket sarr[] = new Socket[NUM_CONNECTIONS];
        for (int i = 0; i < NUM_CONNECTIONS; i++) {
          sarr[i] = new Socket(REMOTE_HOST, PORTNUM);
          try {
            Thread.sleep(10);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }
        System.err.println("Done creating connections");
        while (true) {
          try {
            Thread.sleep(100000);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

      } else {

        for (int i = 0; i < NUM_CONNECTIONS; i++) {
          MultiClientThreaded mc = new MultiClientThreaded(i);
          mc.start();
        }

        if (OVERALL_REPORT || HISTOGRAM_REPORT) {
          // Wait until all threads have started communicating
          MDWUtil.sleep(20000);
          benchthread.start();
        }
      }

    } catch (Exception e) {
      System.out.println("MultiClientThreaded: Got exception: "+e.getMessage());
      e.printStackTrace();
    }
  }
}
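
The receive loop in doBandwidth() keeps calling read() until a whole message has arrived, because a single read() on a socket stream may return fewer bytes than requested. A return value of -1 means the peer closed the connection, and it has to be treated as an error rather than added to the byte count. The standalone sketch below isolates that pattern so it can be tried without a server. The class name ReadFullyExample and the helper readFully are hypothetical names chosen for illustration; they are not part of the SEDA distribution.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Illustrative only: not part of the original MultiClientThreaded source.
public class ReadFullyExample {

  // Hypothetical helper: fills buf completely and returns the number of
  // bytes read, or throws EOFException if the stream ends first.
  static int readFully(InputStream is, byte[] buf) throws IOException {
    int n = 0;
    while (n < buf.length) {
      // read() may return fewer bytes than requested, so loop
      int c = is.read(buf, n, buf.length - n);
      if (c < 0) {
        throw new EOFException("stream closed after " + n + " of " + buf.length + " bytes");
      }
      n += c;
    }
    return n;
  }

  public static void main(String[] args) throws IOException {
    // An in-memory stream stands in for the socket input stream
    byte[] src = new byte[16];
    for (int i = 0; i < src.length; i++) {
      src[i] = (byte)(i & 0xff);
    }
    byte[] msg = new byte[8];
    int got = readFully(new ByteArrayInputStream(src), msg);
    System.out.println("read " + got + " bytes, last byte " + msg[msg.length - 1]);
  }
}
```

For a real socket, java.io.DataInputStream.readFully offers the same behavior out of the box; the explicit loop is shown here only because it mirrors the structure of the benchmark client.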
