⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiclient.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
字号:
/*  * Copyright (c) 2000 by Matt Welsh and The Regents of the University of  * California. All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice and the following * two paragraphs appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * Author: Matt Welsh <mdw@cs.berkeley.edu> *  *//*  * This program creates a number of outgoing socket connections and  * writes bursts of packets to each socket. After each burst a second  * burst of messages is received, and the next burst is sent.  * Using the -q option makes the client only open a connection, and not  * send any data; this is used to measure the overhead of idle connections  * to a server. */import java.net.*;import java.io.*;import java.util.*;import seda.sandStorm.api.*;import seda.sandStorm.core.*;import seda.sandStorm.lib.aSocket.*;import seda.sandStorm.main.*;import seda.util.*;public class MultiClient {  private static final boolean DEBUG = false;  private static final int PORTNUM = 5721;  private static final boolean OVERALL_REPORT = true;  private static final boolean HISTOGRAM_REPORT = true;  private static String REMOTE_HOST;  private static int NUM_CONNECTIONS;  private static int SEND_MESSAGE_SIZE;  private static int RECV_MESSAGE_SIZE;  private static int SEND_BURST_SIZE;  private static int RECV_BURST_SIZE;  private static boolean QUIET;  // Time in ms between reporting measurements  private static final int BENCH_DELAY = 5000;  // Time in ms to delay before starting   private static final int SLEEP_TIME = 5000;  // Number of bench samples before we exit; if zero, run forever  private static final int NUMBER_RUNS = 10;  private static BufferElement sendbuf;  private static QueueIF comp_q;  private static Vector connections;  private static ssTimer timer;  private static int num_connections = 0;  private static long total_connect_time = 0;  private static int num_completions = 0;  private static int num_bytes_read = 0;  private static long total_response_time = 0;  private static long max_response_time = 0;  private static Hashtable resp_time_hist;  private static final int RESP_HIST_BUCKETSIZE = 10;  private static int numBenchRuns = 0;  public MultiClient() {    connections = new Vector();    resp_time_hist = new Hashtable();    timer = new ssTimer();  }  private void printStats(long time_elapsed) {    long num_comps, num_bts, total_resp, max_resp;    if (OVERALL_REPORT) {      num_comps = num_completions; num_completions = 0;      num_bts = num_bytes_read; num_bytes_read = 0;      total_resp = total_response_time; total_response_time = 0;      max_resp = max_response_time; max_response_time = 0;      double sec = time_elapsed * 1.0e-3;      double comps_per_sec = (double)num_comps*1.0 / sec;      double bytes_per_sec = (double)num_bts*1.0 / sec;      double mbit_per_sec = (bytes_per_sec * 8.0) / (1024.0 * 1024.0);      double avg_resp_time = (double)total_resp*1.0 / num_comps;      System.err.println("Overall rate:\t"+MDWUtil.format(comps_per_sec)+" completions/sec");      System.err.println("Bandwidth:\t"+MDWUtil.format(bytes_per_sec)+" bytes/sec ("+MDWUtil.format(mbit_per_sec)+" Mbit/sec)");      System.err.println("Response Time:\t"+MDWUtil.format(avg_resp_time)+" ms, max "+MDWUtil.format(max_resp)+" ms\n");    }    if (HISTOGRAM_REPORT) {      Enumeration e = resp_time_hist.keys();      while (e.hasMoreElements()) {	Integer bucket = (Integer)e.nextElement();	int time = bucket.intValue() * RESP_HIST_BUCKETSIZE;	int val = ((Integer)resp_time_hist.get(bucket)).intValue();	System.err.println("RT "+time+" ms "+val+" count");      }      resp_time_hist.clear();      System.err.println("\n");    }    numBenchRuns++;    if (numBenchRuns == NUMBER_RUNS) {      if (OVERALL_REPORT) {	System.err.println("Fairness report:");	int totalSent = 0, totalReceived = 0;     	double avgSent, avgReceived;	for (int i = 0; i < connections.size(); i++) {	  ConnState cs = (ConnState)connections.elementAt(i);	  totalSent += cs.num_sent;	  totalReceived += cs.num_received;	}	avgSent = totalSent / connections.size();	avgReceived = totalReceived / connections.size();	System.err.println("Bursts sent: "+totalSent+" total, "+avgSent+" average");	System.err.println("Bursts received: "+totalReceived+" total, "+avgReceived+" average");	for (int i = 0; i < connections.size(); i++) {	  ConnState cs = (ConnState)connections.elementAt(i);	  System.err.println("Client "+i+" "+cs.num_sent+" sent, "+cs.num_received+" received");	}      }      System.err.println("Benchmark Finished.\n");      for (int i = 0; i < connections.size(); i++) {	ConnState cs = (ConnState)connections.elementAt(i);	cs.close();      }      System.exit(0);    }  }  class ConnState implements QueueElementIF {    ATcpConnection conn;    int num_sent, num_received;    int cur_offset, cur_length_target;    long burst_sent_time;    ConnState(ATcpConnection conn) {      this.conn = conn;      this.num_sent = 0;      this.num_received = 0;      this.cur_offset = 0;      this.cur_length_target = RECV_MESSAGE_SIZE*RECV_BURST_SIZE;      connections.addElement(this);    }    boolean readDone(int size) {      if (DEBUG) System.err.println("Adding "+size+" bytes to "+this.cur_offset);      num_bytes_read += size;      this.cur_offset += size;      if (this.cur_offset == this.cur_length_target) {	num_received++;	num_completions++;	// Record response time	long t2 = System.currentTimeMillis();	long resptime = (t2 - burst_sent_time);	if (resptime > max_response_time) max_response_time = resptime;	total_response_time += resptime;	if (HISTOGRAM_REPORT) {	  Integer rt = new Integer((int)(resptime / RESP_HIST_BUCKETSIZE));	  Integer val = (Integer)resp_time_hist.remove(rt);	  if (val == null) {	    resp_time_hist.put(rt, new Integer(1));	  } else {	    val = new Integer(val.intValue() + 1);	    resp_time_hist.put(rt, val);	  }	}   	return true;      } else {	return false;      }    }    void reset() {      this.cur_offset = 0;      this.cur_length_target = RECV_MESSAGE_SIZE*RECV_BURST_SIZE;    }    void sendBurst() {      burst_sent_time = System.currentTimeMillis();      for (int i = 0; i < SEND_BURST_SIZE; i++) {	if (!conn.enqueue_lossy(sendbuf)) {	  System.err.println("WARNING: Could not enqueue message onto conn "+conn);	}      }      num_sent++;    }    void close() {      try {	conn.close(comp_q);      } catch (SinkClosedException sce) {	System.err.println("WARNING: Connection "+conn+" already closed");      }    }  }  private void doIt() {    Random rand = new Random();    comp_q = new FiniteQueue();    byte sendmsg[] = new byte[SEND_MESSAGE_SIZE];    for (int i = 0; i < sendmsg.length; i++) {      sendmsg[i] = (byte)(i & 0xff);    }    sendbuf = new BufferElement(sendmsg);    QueueElementIF fetched[];    long time_connect_requested, t1, t2;    // Sleep before starting    long t = (Math.abs(rand.nextInt()) % 5000) + 5000;    MDWUtil.sleep(t);    time_connect_requested = System.currentTimeMillis();    for (int i = 0; i < NUM_CONNECTIONS; i++) {      try {	ATcpClientSocket sock = new ATcpClientSocket(REMOTE_HOST, PORTNUM, comp_q);      } catch (UnknownHostException e) {	System.err.println("Got UnknownHostException: "+e);	return;      }      // Pause between connection requests      t = (Math.abs(rand.nextInt()) % 20) + 20;      MDWUtil.sleep(t);    }    t1 = System.currentTimeMillis();    while (true) {      while ((fetched = comp_q.blocking_dequeue_all(-1)) == null) ;      for (int i = 0; i < fetched.length; i++) {	if (fetched[i] instanceof ATcpConnection) {	  // New connection	  System.err.println("Established connection "+num_connections);	  ATcpConnection conn = (ATcpConnection)fetched[i];	  num_connections++;	  if (num_connections == NUM_CONNECTIONS) {	    long time_connect_done = System.currentTimeMillis();	    total_connect_time = time_connect_done - time_connect_requested;	    double avg_connect_time = total_connect_time / NUM_CONNECTIONS;	    System.err.println("All connections received, avg conn time "+MDWUtil.format(avg_connect_time)+" ms");	  }	  conn.startReader(comp_q);	  ConnState cs = new ConnState(conn);	  conn.userTag = cs;	  if (!QUIET) {	    // Start the burst after SLEEP_TIME	    t = (Math.abs(rand.nextInt()) % SLEEP_TIME) + SLEEP_TIME;	    System.err.println("Pausing connection for "+t+" ms");	    timer.registerEvent(t, cs, comp_q);	  }	} else if (fetched[i] instanceof ConnState) {	  // Connection is ready for first burst	  ConnState cs = (ConnState)fetched[i];	  System.err.println("Starting connection");	  cs.sendBurst();	} else if (fetched[i] instanceof ATcpInPacket) {	  // Incoming packet	  ATcpInPacket pkt = (ATcpInPacket)fetched[i];	  ATcpConnection conn = pkt.getConnection();	  ConnState cs = (ConnState)conn.userTag;	  if (cs.readDone(pkt.size())) {	    cs.sendBurst();	    cs.reset();	  }	} else if (fetched[i] instanceof SinkClosedEvent) {	  num_connections--;	  SinkClosedEvent sce = (SinkClosedEvent)fetched[i];	  ATcpConnection conn = (ATcpConnection)sce.sink;	  ConnState cs = (ConnState)conn.userTag;	  connections.removeElement(cs);	  conn.userTag = null;	} else {	  System.err.println("Got unknown comp_q event: "+fetched[i].toString());	}	// Print stats	t2 = System.currentTimeMillis();	if ((t2 - t1) >= BENCH_DELAY) {	  printStats((t2-t1));	  t1 = t2;	}      }    }  }  public static void main(String args[]) {    try {      if ((args.length < 6) || (args.length > 7)) {	System.err.println("Usage: java MultiClient [-q] <remote host> <num clients> <send message size> <send burst size> <recv message size> <recv burst size>");	System.err.println("\t-q\tDo not send any messages (be quiet)");	System.exit(-1);      }      int n = 0;      if (args[0].equals("-q")) {	QUIET = true;	n++;      } else {	QUIET = false;      }      REMOTE_HOST = args[n+0];      NUM_CONNECTIONS = Integer.valueOf(args[n+1]).intValue();      SEND_MESSAGE_SIZE = Integer.valueOf(args[n+2]).intValue();      SEND_BURST_SIZE = Integer.valueOf(args[n+3]).intValue();      RECV_MESSAGE_SIZE = Integer.valueOf(args[n+4]).intValue();      RECV_BURST_SIZE = Integer.valueOf(args[n+5]).intValue();            if (!QUIET) {	System.out.println("Starting benchmark: num connections "+NUM_CONNECTIONS+", send message size "+SEND_MESSAGE_SIZE+", send burst "+SEND_BURST_SIZE+", recv message size "+RECV_MESSAGE_SIZE+", recv burst "+RECV_BURST_SIZE);      } else {	System.err.println("Starting benchmark: num connections "+NUM_CONNECTIONS+", quiet run");      }      MultiClient mc = new MultiClient();      mc.doIt();    } catch (Exception e) {      System.out.println("MultiClient: Got exception: "+e.getMessage());      e.printStackTrace();    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -