⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 responsetimecontrollermulticlass.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
字号:
/*  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of  * California. All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice and the following * two paragraphs appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * Author: Matt Welsh <mdw@cs.berkeley.edu> *  */package seda.sandStorm.internal;import seda.sandStorm.api.*;import seda.sandStorm.api.internal.*;import seda.sandStorm.core.*;import seda.sandStorm.main.*;import seda.util.*;import java.util.*;/** * An implementation of ResponseTimeController that uses a direct  * adjustment of queue thresholds based on the error in the 90th * percentile response time. Allows multiple class SLAs. *  * @author   Matt Welsh */public class ResponseTimeControllerMulticlass extends ResponseTimeControllerDirect {  private static final boolean DEBUG = true;  private static final int MEASUREMENT_SIZE = 100;  private static final long MEASUREMENT_TIME = 1000;  private static final double SMOOTH_CONST = 0.7;  private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9);  private static final double LOW_WATER = -0.1;  private static final double HIGH_WATER = 0.0;  private static final double ADDITIVE_INCREASE = 2.0;  private static final double MULTIPLICATIVE_INCREASE = 1.1;  private static final double MULTIPLICATIVE_DECREASE = 1.2; //2  private static final double MULTIPLICATIVE_DECREASE_LOWPRIO = 10;  private static final double MULTIPLICATIVE_DECREASE_HIPRIO = 1.2;  private static final double INIT_RATE = 100.0;  private static final int INIT_DEPTH = 1;  private static final double MAX_RATE = 5000.0;  private static final double MIN_RATE = 0.05;  private static final double MIN_90th = 1.0e-5;  private static final int LOWCOUNT_THRESH = 20;  private static final boolean SAVE_MAX_RATE = false;  private static final int MAX_CLASSES = 10;  private int NUM_CLASSES;  private String name;  private boolean enabled;  private cinfo carr[];  class cinfo {    int theclass;    double adjtime;    double targetRT;    long measurements[], sortedmeasurements[];    int cur_measurement = 0;    int num_measurements = 0;    double curRate;    int lowCount = 0;    double ninetiethRT;    double err, last_err;    boolean adjust = false;    boolean preempted = false;    double maxRate = -1.0;    boolean last_increased = false;    cinfo(int theclass, double target) {      this.theclass = theclass;      this.targetRT = target;      this.measurements = new long[MEASUREMENT_SIZE];      this.sortedmeasurements = new long[MEASUREMENT_SIZE];      this.curRate = ((MulticlassRateLimitingPredicate)pred).getTargetRate(theclass);      this.adjtime = System.currentTimeMillis();      System.err.println("RTControllerMulticlass: Class "+theclass+" targetRT "+targetRT+", curRate "+this.curRate);    }    void setRate(double newrate) {      curRate = newrate;      if (curRate < MIN_RATE) curRate = MIN_RATE;      if (curRate > MAX_RATE) curRate = MAX_RATE;      if (SAVE_MAX_RATE && maxRate > 0.0 && curRate > maxRate) curRate = maxRate;      ((MulticlassRateLimitingPredicate)pred).setTargetRate(theclass, curRate);    }    void addMeasurement(long time) {      measurements[cur_measurement] = time;      cur_measurement++;       num_measurements++;       if (cur_measurement == MEASUREMENT_SIZE) {	cur_measurement = 0;	adjust = true;      }    }    void record90th(int numsort, long curtime) {      System.arraycopy(measurements, 0, sortedmeasurements, 0, numsort);      Arrays.sort(sortedmeasurements, 0, numsort);      long cur = sortedmeasurements[ (int)(0.9 * (double)numsort) ];      ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)cur * 1.0));       if (ninetiethRT < MIN_90th) ninetiethRT = 0;      if (theclass == 0) stage.getStats().record90thRT(ninetiethRT);      adjtime = curtime;      // Avoid timeout causing us to always read old value      if (cur_measurement == 0) sortedmeasurements[0] = 0L;    }    boolean adjust(long curtime) {      int numsort = MEASUREMENT_SIZE;      if (num_measurements > 0 && (curtime - adjtime) >= MEASUREMENT_TIME) {	adjust = true;	numsort = cur_measurement;	cur_measurement = 0;      }       if (!adjust) return false;      adjust = false;      record90th(numsort, curtime);      if (!enabled) return false;      if (targetRT == -1) return false;      num_measurements = 0;      err = (ninetiethRT - targetRT) / targetRT;      if (err < LOW_WATER) {	// We are below our target - increase our rate only	if (preempted) {	  // Not allowed to increase - preempted by higher priority	  preempted = false;	  return false;	}	setRate(curRate + rateAdd(err));	last_increased = true;	lowCount = 0;      } else if (err > HIGH_WATER) {	// We are above our target - reduce rates of all lower classes	boolean found = false;	for (int c2 = 0; c2 < theclass; c2++) {	  cinfo ci2 = carr[c2];	  ci2.preempted = true;	  if (ci2.curRate > MIN_RATE) {	    found = true;	    ci2.setRate(ci2.curRate / MULTIPLICATIVE_DECREASE_LOWPRIO);	  }	}	if (found) lowCount = 0;	if (!found && ((++lowCount >= LOWCOUNT_THRESH) || (theclass == 0))) {	  // Didn't find anyone else to penalize; adjust ourselves	  setRate(curRate / MULTIPLICATIVE_DECREASE);	  if (last_increased) maxRate = curRate;	  last_increased = false;	} else {	  // Found someone else to penalize or not at LOWCOUNT_THRESH	  setRate(curRate / MULTIPLICATIVE_DECREASE_HIPRIO);	  last_increased = false;	}      } else {	last_increased = false;      }      return true;    }  }  public ResponseTimeControllerMulticlass(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException {    super(mgr, stage);    this.name = stage.getStage().getName();    SandstormConfig config = mgr.getConfig();    // First count number of classes    for (int c = 0; c < MAX_CLASSES; c++) {      double t = config.getDouble("stages."+name+".rtController.multiclass.class"+(c)+"Target");      if (t == -1) {	t = config.getDouble("global.rtController.multiclass.class"+c+"Target");      }      if (t != -1) NUM_CLASSES++;    }    if (NUM_CLASSES == 0) {       NUM_CLASSES = 1;    }    this.pred = new MulticlassRateLimitingPredicate(stage.getStage().getSink(), NUM_CLASSES, INIT_RATE, INIT_DEPTH);    stage.getStage().getSink().setEnqueuePredicate(pred);    this.carr = new cinfo[NUM_CLASSES];    for (int c = 0; c < NUM_CLASSES; c++) {      double t = config.getDouble("stages."+name+".rtController.multiclass.class"+(c)+"Target");      if (t == -1) {	t = config.getDouble("global.rtController.multiclass.class"+c+"Target");      }      this.carr[c] = new cinfo(c, t);    }    System.err.println("RTControllerMulticlass9 <"+name+">: MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", LOW_WATER="+LOW_WATER+", HIGH_WATER="+HIGH_WATER+", ADDITIVE_INCREASE="+ADDITIVE_INCREASE+", MULTIPLCATIVE_DECREASE="+MULTIPLICATIVE_DECREASE);    this.enabled = true;  }  // Additive increase function  private double rateAdd(double err) {    // LOW_WATER gets increase of 0    if (err > -0.5) return 0;    else return ADDITIVE_INCREASE * ((-1.0 * err) + LOW_WATER);  }  public synchronized void adjustThreshold(QueueElementIF fetched[], long procTime) {    long curtime = System.currentTimeMillis();    for (int i = 0; i < fetched.length; i++) {      if (fetched[i] instanceof TimeStampedEvent) {	TimeStampedEvent ev = (TimeStampedEvent)fetched[i];	long time = ev.timestamp;	if (time != 0) {	  int theclass = 0;	  if (ev instanceof ClassQueueElementIF) {	    ClassQueueElementIF cqel = (ClassQueueElementIF)ev;	    theclass = cqel.getRequestClass();	    if (theclass == -1) theclass = 0;	  }	  carr[theclass].addMeasurement(curtime - time);	}      }    }    boolean adjusted_any = false;    for (int c = NUM_CLASSES-1; c >= 0; c--) {      if (carr[c].adjust(curtime)) {	adjusted_any = true;      }    }    if (adjusted_any) {      for (int c = 0; c < NUM_CLASSES; c++) {	if (DEBUG) System.err.println("RTController <"+name+"> class "+c+": ninetiethRT "+MDWUtil.format(carr[c].ninetiethRT)+" target "+MDWUtil.format(carr[c].targetRT)+" rate now "+MDWUtil.format(carr[c].curRate));      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -