⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multibandwidth.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
字号:
/*  * Copyright (c) 2000 by Matt Welsh and The Regents of the University of  * California. All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice and the following * two paragraphs appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * Author: Matt Welsh <mdw@cs.berkeley.edu> *  *//* This is a server which accepts multiple connections from clients running * the 'Bandwidth' or 'MultiClient' clients and reads packets from them,  * sending bursts of a given size. The idea is to demonstrate the  * performance of the aSocket layer with multiple client machines loading it.  */import seda.sandStorm.api.*;import seda.sandStorm.core.*;import seda.sandStorm.lib.aSocket.*;import seda.sandStorm.main.*;import  java.net.*;import  java.io.*;import  java.util.*;public class MultiBandwidth {    private static ATcpServerSocket servsock;  private static QueueIF comp_q = null;  private static Hashtable conn_table;  private static int num_connections = 0;  private static int num_active_connections = 0;  private static final boolean DEBUG = false;  private static final boolean VERBOSE = true;  private static boolean BLOCKING_DEQUEUE = true;  private static boolean FAIRNESS_REPORT = true;  private static final int PORTNUM = 5721;  private static int SEND_MESSAGE_SIZE;  private static int SEND_BURST_SIZE;  private static int RECV_MESSAGE_SIZE;  private static int RECV_BURST_SIZE;  public MultiBandwidth() {    conn_table = new Hashtable();  }  public void setup() throws IOException, UnknownHostException {    comp_q = new FiniteQueue();    servsock = new ATcpServerSocket(PORTNUM, comp_q);  }  class ConnState {    SinkIF sink;    int num_messages;    int cur_offset, cur_length_target;    int burstsSent;    boolean firstRead;    ConnState(SinkIF sink) {      this.sink = sink;      this.num_messages = 0;      this.cur_offset = 0;      this.cur_length_target = RECV_MESSAGE_SIZE*RECV_BURST_SIZE;      this.firstRead = true;    }    boolean readDone(int size) {      if (firstRead) {	firstRead = false;	num_active_connections++;      }      if (DEBUG) System.err.println("Adding "+size+" bytes to "+this.cur_offset);      this.cur_offset += size;      if (DEBUG) System.err.println("Total now "+this.cur_offset+", target "+this.cur_length_target);      if (this.cur_offset == this.cur_length_target) {	if (DEBUG) System.err.println("Reached cur_length_target");        num_messages++;        return true;      } else {        return false;      }    }    void reset() {      this.cur_offset = 0;      this.cur_length_target = RECV_MESSAGE_SIZE*RECV_BURST_SIZE;    }  }  public void doIt() throws SinkClosedException, IOException {    int i;    int n = 0;    int num_received = 0;    byte sendmsg[] = new byte[SEND_MESSAGE_SIZE];    BufferElement sendbuf = new BufferElement(sendmsg);    QueueElementIF fetched[];    long before, after;    long total_bytes = 0;    before = System.currentTimeMillis();    while (true) {            if (DEBUG) System.err.println("Waiting for dequeue...");      if (BLOCKING_DEQUEUE) {        while ((fetched = comp_q.blocking_dequeue_all(-1)) == null) ;      } else {        while ((fetched = comp_q.dequeue_all()) == null) ;      }      //System.err.println("FETCHED "+fetched.length+" NEW EVENTS");      for (i = 0; i < fetched.length; i++) {		if (fetched[i] instanceof ATcpConnection) {	  System.err.println("Got connection "+num_connections);	  ATcpConnection conn = (ATcpConnection)fetched[i];	  num_connections++;	  conn.startReader(comp_q);	  ConnState cs = new ConnState(conn);	  conn_table.put(conn, cs);        } else if (fetched[i] instanceof ATcpInPacket) {          ATcpInPacket pkt = (ATcpInPacket)fetched[i];	  ATcpConnection conn = pkt.getConnection();          //System.err.println("Got packet: "+pkt.size()+" from conn "+conn);	  total_bytes += pkt.size(); 	  n++;	  //if (n % (num_connections * 100) == 0) {	  after = System.currentTimeMillis();	  if ((after - before) >= 5000) {	    printTime(before, after, total_bytes);	    before = System.currentTimeMillis();	    total_bytes = 0;	  }	  ConnState cs = (ConnState)conn_table.get(conn);	  if (cs.readDone(pkt.size())) {	    if (DEBUG) System.err.println("Sending burst");	    for (int j = 0; j < SEND_BURST_SIZE; j++) {              if (!cs.sink.enqueue_lossy(sendbuf)) {		System.err.println("WARNING: Could not enqueue message onto sink "+cs.sink);	      }	      cs.burstsSent++;            }	    cs.reset();	  }	} else if (fetched[i] instanceof SinkDrainedEvent) {	  if (DEBUG) System.err.println("Got SinkDrainedEvent!");	} else if (fetched[i] instanceof SinkClosedEvent) {	  if (DEBUG) System.err.println("Got SinkClosedEvent!");	  System.err.println("Closed connection "+num_connections);	  SinkClosedEvent sce = (SinkClosedEvent)fetched[i];	  conn_table.remove(sce.sink);	  num_connections--;	  num_active_connections--;	} else {	  System.err.println("Sender got unknown comp_q event: "+fetched[i].toString());	}      }    }  }  private static void printTime(long t1, long t2, long msg_size) {    long diff = t2-t1;    double mbits = (msg_size * 8.0)/(1024*1024);    double mbps = mbits / (double)(diff * 1.0e-3);    System.err.println("Completed: "+msg_size + " bytes in " + diff + " msec");    System.err.println("\t"+mbps+" mbps bandwidth");    System.err.println("\t"+num_connections+" connections");    System.err.println("\t"+num_active_connections+" active connections");    // Compute fairness    if (FAIRNESS_REPORT) {      int totalSent = 0, n = 0;      int numBad = 0;      Enumeration e = conn_table.elements();      while (e.hasMoreElements()) {	ConnState cs = (ConnState)e.nextElement();	totalSent += cs.burstsSent;	n++;	if (cs.burstsSent == 0) numBad++;      }      double averageSent = totalSent / n;      double t = 0;      e = conn_table.elements();      while (e.hasMoreElements()) {	ConnState cs = (ConnState)e.nextElement();	t += (cs.burstsSent - averageSent) * (cs.burstsSent - averageSent);	cs.burstsSent = 0;      }      double stddev = Math.sqrt(t / (n - 1));      System.err.println("\t"+averageSent+" avg bursts/conn, stddev "+stddev);      System.err.println("\t"+numBad+" connections with no bursts");    }  }  private static void usage() {    System.err.println("usage: MultiBandwidth <send-msg-size> <send-burst-size> <recv-msg-size> <recv-burst-size>");    System.exit(1);  }  public static void main(String args[]) {    MultiBandwidth np;    boolean sending = false;    if (args.length != 4) usage();    SEND_MESSAGE_SIZE = Integer.decode(args[0]).intValue();    SEND_BURST_SIZE = Integer.decode(args[1]).intValue();    RECV_MESSAGE_SIZE = Integer.decode(args[2]).intValue();    RECV_BURST_SIZE = Integer.decode(args[3]).intValue();    try {      System.err.println("MultiBandwidth: send message size="+SEND_MESSAGE_SIZE+", send burst size="+SEND_BURST_SIZE+", recv message size="+RECV_MESSAGE_SIZE+", recv burst size="+RECV_BURST_SIZE+", rx block="+BLOCKING_DEQUEUE);      np = new MultiBandwidth();      np.setup();      np.doIt();    } catch (Exception e) {      System.err.println("MultiBandwidth.main() got exception: "+e);      e.printStackTrace();    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -