⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sinkproxy.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
字号:
/*  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of  * California. All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice and the following * two paragraphs appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * Author: Matt Welsh <mdw@cs.berkeley.edu> *  */package seda.sandStorm.internal;import seda.sandStorm.api.*;import seda.sandStorm.api.internal.*;import java.util.*;/** * Used as a proxy to observe and measure communication behavior between  * stages. By handing out a SinkProxy instead of a FiniteQueue, it is * possible to gather statistics on event communication between stages. * This is used by StageGraph to construct a graph of the communication * patterns between stages. * * @author Matt Welsh */public class SinkProxy implements SinkIF, ProfilableIF {  private static final boolean DEBUG = false;  private ManagerIF mgr;  private StageWrapperIF toStage;  private StageGraph stageGraph;  public SinkIF thesink;  private Thread client = null;  private Hashtable clientTbl = null;  /**    * Maintains a running sum of the number of elements enqueued onto    * this sink.   */  public int enqueueCount;  /**    * Maintains a running sum of the number of elements successfully    * enqueued onto this sink (that is, not rejected by the enqueue predicate).   */  public int enqueueSuccessCount;  /**    * Used to maintain a timer for statistics gathering.    */  public long timer;  /**   * Create a SinkProxy for the given sink.   *   * @param sink The sink to create a proxy for.   * @param mgr The associated manager.   * @param toStage The stage which this sink pushes events to.   */  public SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage) {    this.thesink = sink;    this.mgr = mgr;    this.stageGraph = mgr.getProfiler().getGraphProfiler();    this.toStage = toStage;    this.enqueueCount = 0;    this.enqueueSuccessCount = 0;    this.timer = 0;  }  /**    * Return the size of the queue.   */  public int size() {    if (thesink == null) return 0;    return thesink.size();  }  public void enqueue(QueueElementIF enqueueMe) throws SinkException {    recordUse();    enqueueCount++;     thesink.enqueue(enqueueMe);    enqueueSuccessCount++;   }  public boolean enqueue_lossy(QueueElementIF enqueueMe) {    recordUse();    enqueueCount++;     boolean pass = thesink.enqueue_lossy(enqueueMe);    if (pass) enqueueSuccessCount++;    return pass;  }  public void enqueue_many(QueueElementIF[] enqueueMe) throws SinkException {    recordUse();    if (enqueueMe != null) {      enqueueCount += enqueueMe.length;    }    thesink.enqueue_many(enqueueMe);    if (enqueueMe != null) {      enqueueSuccessCount += enqueueMe.length;    }  }  /**    * Return the profile size of the queue.   */  public int profileSize() {    return size();  }  public Object enqueue_prepare(QueueElementIF enqueueMe[]) throws SinkException {    recordUse();    if (enqueueMe != null) {      enqueueCount += enqueueMe.length;    }    Object key = thesink.enqueue_prepare(enqueueMe);    if (enqueueMe != null) {      enqueueSuccessCount += enqueueMe.length;    }    return key;  }  public void enqueue_commit(Object key) {    thesink.enqueue_commit(key);  }  public void enqueue_abort(Object key) {    thesink.enqueue_abort(key);  }  public void setEnqueuePredicate(EnqueuePredicateIF pred) {    thesink.setEnqueuePredicate(pred);  }  public EnqueuePredicateIF getEnqueuePredicate() {    return thesink.getEnqueuePredicate();  }  public String toString() {    return "[SinkProxy for toStage="+toStage+"]";  }  private void recordUse() {    if (DEBUG) System.err.println("SinkProxy: Recording use of "+this+" by thread "+Thread.currentThread());    if (client == null) {      client = Thread.currentThread();      StageGraphEdge edge = new StageGraphEdge();      edge.fromStage = stageGraph.getStageFromThread(client);      edge.toStage = toStage;      edge.sink = this;      stageGraph.addEdge(edge);    } else {      Thread t = Thread.currentThread();      if (client != t) {	if (clientTbl == null) clientTbl = new Hashtable();	if (clientTbl.get(t) == null) {	  clientTbl.put(t, t);	  StageGraphEdge edge = new StageGraphEdge();	  edge.fromStage = stageGraph.getStageFromThread(t);	  edge.toStage = toStage;	  edge.sink = this;	  stageGraph.addEdge(edge);	}      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -