⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sstimer.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
字号:
/*  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of  * California. All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice and the following * two paragraphs appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * Author: Matt Welsh <mdw@cs.berkeley.edu> *  */package seda.sandStorm.core;import seda.sandStorm.api.*;import seda.util.*;/** * The ssTimer class provides a mechanism for registering * timer events that will go off at some future time.  The future time * can be specified in absolute or relative terms.  When the timer goes * off, an element is placed on a queue.  There is no way to unregister * a timer.  Events will be delivered guaranteed, but the time that they * are delivered may slip depending on stuff like how loaded the system * is and all that. * <P> * WARNING: you should use cancelEvent to cancel timers that you no longer * need, otherwise you will waste many, many cycles on unneeded timer * firings.  This was the bottleneck in vSpace and the DDS before we * fixed it.  For example, if you set a timer to go off on a cross-CPU * task to detect failure, then if the task returns successfully, cancel * the timer! * * @author   Matt Welsh and Steve Gribble */public class ssTimer implements Runnable, ProfilableIF {  private static final boolean DEBUG = false;  private ssTimerEvent                 head_event = null;  private ssTimerEvent                 tail_event = null;  private Thread                     thr;  private Object                     sync_o;  private boolean                    die_thread;  private int                        num_events = 0;  public ssTimer() {    sync_o = new Object();    die_thread = false;    thr = new Thread(this, "SandStorm ssTimer thread");    thr.start();  }  public static class ssTimerEvent {    public long            time_millis;    public QueueElementIF  obj;    public SinkIF queue;    public ssTimerEvent      nextE;    public ssTimerEvent      prevE;        public ssTimerEvent(long m, QueueElementIF o, SinkIF q) {      time_millis = m;      obj = o;      queue = q;      nextE = null;      prevE = null;    }    public String toString() {      return "ssTimerEvent<"+hashCode()+">";    }  }  /**   * Object <code>obj</code> will be placed on SinkIF <code>queue</code>   * no earlier than <code>millis</code> milliseconds from now.   *   * @param millis  the number of milliseconds from now when the event will   *                take place   * @param obj     the object that will be placed on the queue   * @param queue   the queue on which the object will be placed   */  public ssTimer.ssTimerEvent registerEvent(long millis, QueueElementIF obj,					  SinkIF queue) {    long time_millis = System.currentTimeMillis() + millis;    ssTimerEvent newTimer = new ssTimerEvent(time_millis, obj, queue);        insertEvent(newTimer);    return newTimer;  }  /**   * Object <code>obj</code> will be placed on SinkIF <code>queue</code>   * no earlier than absolute time <code>the_date</code>.   *   * @param the_date the date when the event will take place - if this date   *                 is in the past, the event will happen right away   * @param obj      the object that will be placed on the queue   * @param queue    the queue on which the object will be placed   */  public ssTimer.ssTimerEvent registerEvent(java.util.Date the_date,					  QueueElementIF obj,					  SinkIF queue) {    ssTimerEvent newTimer = new ssTimerEvent(the_date.getTime(),					 obj, queue);    insertEvent(newTimer);    return newTimer;  }  /**   * Kills off this timer object, dropping all pending events on floor.   */  public void doneWithTimer() {    die_thread = true;    synchronized(sync_o) {      sync_o.notify();    }  }  /**   * How many events yet to fire?   */  public int size() {    return num_events;  }  /**   * Return the profile size of this timer.   */  public int profileSize() {    return size();  }  /**   * Cancels all events.   */  public void cancelAll() {    synchronized(sync_o) {      head_event = tail_event = null;      num_events = 0;    }  }  /**   * Cancels the firing of this timer event.   *   * @param evt   the ssTimer.ssTimerEvent to cancel.  This ssTimerEvent   *              is returned to you when you call registerEvent   */  public void cancelEvent(ssTimerEvent evt) {    if (evt == null)      return;    try {      synchronized(sync_o) {	if (evt == tail_event) {	  // is this only list item?	  if (tail_event == head_event) {	    tail_event = head_event = null;	    num_events--;	    return;	  }	  // not only list item, is at tail, so lop off tail	  tail_event = tail_event.prevE;	  tail_event.nextE = null;	  num_events--;	  return;	} else if (evt == head_event) {	  // not only list item, is at head, so lop off head	  head_event = head_event.nextE;	  head_event.prevE = null;	  num_events--;	  return;	} else {	  // make sure event didn't fire already	  if ((evt.prevE != null) && (evt.nextE != null)) {	    // in middle somewhere	    evt.prevE.nextE = evt.nextE;	    evt.nextE.prevE = evt.prevE;	    num_events--;	    return;	  }	}      }    } finally {      evt.nextE = null;      evt.prevE = null;    }  }  // takes the event, does insertion-sort into ssTimerEvent linked list  private void insertEvent(ssTimerEvent newTimer) {    boolean do_notify = false;    synchronized(sync_o) {      if (head_event == null) {        // list empty	if (DEBUG) System.err.println("ssTimer: Inserting first event, num pending "+num_events+" event "+newTimer);	tail_event = newTimer;	head_event = newTimer;	num_events++;        do_notify = true;      } else if (head_event.time_millis > newTimer.time_millis) {	// insert head	if (DEBUG) System.err.println("ssTimer: Inserting event at head, num pending "+num_events+" event "+newTimer);	newTimer.nextE = head_event;	head_event.prevE = newTimer;	head_event = newTimer;	num_events++;        do_notify = true;      } else if (tail_event.time_millis <= newTimer.time_millis) {        // insert tail	if (DEBUG) System.err.println("ssTimer: Inserting event at tail, num pending "+num_events+" event "+newTimer);	newTimer.prevE = tail_event;	tail_event.nextE = newTimer;	tail_event = newTimer;	num_events++;	// if not insert at head, no notify! :)      } else {        // insert somewhere in middle :(	if (DEBUG) System.err.println("ssTimer: Inserting new event in middle, num pending "+num_events+" event "+newTimer);        ssTimerEvent prevE = tail_event.prevE;        ssTimerEvent curE = tail_event;        boolean gotit = false;        while((prevE != null) && (gotit == false)) {          if (prevE.time_millis <= newTimer.time_millis) {	    prevE.nextE = newTimer;	    curE.prevE = newTimer;	    newTimer.nextE = curE;	    newTimer.prevE = prevE;	    // if not insert at head, no notify! :)            gotit = true;	  }	  curE = prevE;	  prevE = prevE.prevE;        }	num_events++;      }      if (do_notify) {	sync_o.notify();      }    }  }  private void process_head() {    ssTimerEvent fire = null;    long wait_time = -1;    long curTime = System.currentTimeMillis();    if (head_event.time_millis <= curTime) {      // fire off event      fire = head_event;      if (DEBUG) System.err.println("Firing "+fire+" -> "+head_event.nextE+" "+(num_events-1)+" pending");      head_event = head_event.nextE;      if (head_event == null) {	// was only event	tail_event = null;      } else {	// reset back pointer	head_event.prevE = null;      }      if ((head_event == null) && (num_events != 1)) {	System.err.println("ssTimer: Warning: No more events to process, but still have "+(num_events-1)+" pending. This is a bug; please contact <mdw@cs.berkeley.edu>");      }      fire.nextE = null;      fire.prevE = null;      num_events--;    } else {      // sleep till head      if (DEBUG) System.err.println("ssTimer: head is "+(head_event.time_millis - curTime)+" ms in the future");      wait_time = head_event.time_millis - curTime;      if (wait_time != -1) {	try {	  sync_o.wait(wait_time);	} catch (InterruptedException ie) {	  // Ignore	}      }    }    if (fire != null) {      fire.queue.enqueue_lossy(fire.obj);    }   }  public void run() {    synchronized(sync_o) {      while(die_thread == false) {	try {       	  if (head_event != null) {	    process_head();	  } else {	    if (die_thread == true)	      return;	    try {	      sync_o.wait(500);	    } catch (InterruptedException ie) {	    }	  }	} catch (Throwable t) {  	  t.printStackTrace(); 	}      }    }  }  private static class GQEString implements QueueElementIF {    private String ns = null;    private long inj;    public GQEString(String f) {      ns = f;      inj = System.currentTimeMillis();    }    public String toString() {      return ns + " elapsed="+(System.currentTimeMillis() - inj);    }  }  public static void main(String args[]) {    FiniteQueue q = new FiniteQueue();    ssTimer te = new ssTimer();    ssTimer.ssTimerEvent t1, t10, t20, t30, t40, t50, t250, t500, t2500, t1500, t3500, t15000, t8000;    System.out.println("adding 1 millisecond event");    t1 = te.registerEvent(1, new GQEString("1"), q);    System.out.println("adding 10 millisecond event");    t10 = te.registerEvent(10, new GQEString("10"), q);    System.out.println("adding 20 millisecond event");    t20 = te.registerEvent(20, new GQEString("20"), q);    System.out.println("adding 30 millisecond event");    t30 = te.registerEvent(30, new GQEString("30"), q);    System.out.println("adding 40 millisecond event");    t40 = te.registerEvent(40, new GQEString("40"), q);    System.out.println("adding 50 millisecond event");    t50 = te.registerEvent(50, new GQEString("50"), q);    System.out.println("adding 250 millisecond event");    t250 = te.registerEvent(250, new GQEString("250"), q);    System.out.println("adding 500 millisecond event");    t500 = te.registerEvent(500, new GQEString("500"), q);    System.out.println("adding 2500 millisecond event");    t2500 = te.registerEvent(2500, new GQEString("2500"), q);    System.out.println("adding 1500 millisecond event");    t1500 = te.registerEvent(1500, new GQEString("1500"), q);    System.out.println("adding 3500 millisecond event");    t3500 = te.registerEvent(3500, new GQEString("3500"), q);    System.out.println("adding 15000 millisecond event");    t15000 = te.registerEvent(15000, new GQEString("15000"), q);    System.out.println("adding 8000 millisecond event");    t8000 = te.registerEvent(8000, new GQEString("8000"), q);    int num_got = 0;    while (num_got < 13) {      QueueElementIF nextEl[] = q.dequeue_all();      if (nextEl != null) {	num_got += nextEl.length;	System.out.println("got " + nextEl.length + " event" +	  (nextEl.length > 1 ? "s" : ""));	for (int i=0; i<nextEl.length; i++)	  System.out.println("  " + i + ": " + nextEl[i]);	System.out.println("total num got so far is: " + num_got);	System.out.println("num remain is: " + te.size());	if (num_got == 3)	  te.cancelEvent(t2500);      } else {	try {	  Thread.currentThread().sleep(5);	} catch (InterruptedException ie) {	}      }    }    te.doneWithTimer();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -