📄 stage.java
字号:
/* Copyright (c) 2003, The Regents of the University of California, through Lawrence Berkeley National Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy). All rights reserved.*/package gov.lbl.dsd.sea;import gov.lbl.dsd.sea.event.ExceptionEvent;import java.util.Date;import java.util.Timer;import java.util.TimerTask;import EDU.oswego.cs.dl.util.concurrent.Executor;import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;/** * A stage has an input queue into which another stage puts events; A stage has * one or more threads, which take events from the input queue and hand them to * the stage's event handler which processes the events in a non-blocking * asynchronous manner. * <p> * Note on exception handling: If a {@link RuntimeException}is raised within * {@link EventHandler#handle(Object)}of this stage's event handler, then this * stage's ExceptionHandler is called, which should handle the exception in an * appropriate application specific manner. If no ExceptionHandler is defined * (because it is <code>null</code>) the exception is simply rethrown. * * @author whoschek@lbl.gov * @author $Author: gegles $ * @version $Revision: 1.18 $, $Date: 2004/09/16 16:57:15 $ */public class Stage { private final String name; // the name of this stage private final EventHandler handler; // user provided object handling events private final ExceptionHandler exceptionHandler; // called when EventHandler.handle throw an exception private Timer timer; // queues and delivers future events for enqueue(Event, Date) private Executor executor; // thread handling policy private boolean isStarted = false; private ExecutorFactory executorFactory; // used to create a new executor when start is called private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(Stage.class); /** * Creates a new stage with the given name, event handler, executor factory and exceptionHandler. */ public Stage(String stageName, EventHandler handler, ExecutorFactory executorFactory, ExceptionHandler exceptionHandler) { this.name = stageName; this.executorFactory = executorFactory; this.exceptionHandler = exceptionHandler; this.executor = null; // do not create an executor until start is being called this.timer = null; // do not start timer thread unless it is needed via enqueue(Event, Date) this.handler = handler; handler.setStage(this); } /** * Returns the name of this stage. */ public String getName() { return this.name; } /** * Returns a string representation of the receiver. */ public String toString() { return this.getName(); } /** * Initializes the stage and its event handler; Must be called before enqueuing events (otherwise enqueues are ignored); * This method can be called at any time but only has an effect only if the stage is stopped. * * @return <code>this</code> (for convenient call chaining only) */ public Stage start() { synchronized (this) { if (this.isStarted) { log.warn("start: improper but mostly harmless state"); return this; } this.executor = this.executorFactory.createExecutor(); this.isStarted = true; } if (log.isDebugEnabled()) log.debug(getName() + ": now initializing handler..."); handler.onStart(); if (log.isDebugEnabled()) log.debug(getName() + ": sucessfullly initialized handler."); return this; } /** * Cleans up this stage and its event handler, as well as related threads; * Once a stage has been destroyed it can be initialized again by using start; * This method can be called at any time but only has an effect if the stage is started. */ public void stop() { synchronized (this) { if (!this.isStarted) { log.warn("stop: improper but mostly harmless state"); return; } this.isStarted = false; if (log.isDebugEnabled()) log.debug(getName() + ": now shutting down..."); if (this.timer != null) { this.timer.cancel(); // throws away scheduled future events } if (this.executor instanceof QueuedExecutor) { ((QueuedExecutor) this.executor).shutdownAfterProcessingCurrentlyQueuedTasks(); //((QueuedExecutor) this.executor).shutdownNow(); } else if (this.executor instanceof PooledExecutor) { boolean interruptAll = false; PooledExecutor pooledExec = (PooledExecutor) this.executor; //pooledExec.shutdownNow(); pooledExec.shutdownAfterProcessingCurrentlyQueuedTasks(); if (interruptAll) pooledExec.interruptAll(); try { log.debug(getName() + ": awaiting termination..."); ((PooledExecutor) this.executor).awaitTerminationAfterShutdown(); log.debug(getName() + ": terminated with success."); } catch (InterruptedException e) {} } this.executor = null; } this.handler.onStop(); if (log.isDebugEnabled()) log.debug(getName() + ": successfully shut down."); } /** * Enqueues the given event onto this stage. */ public void enqueue(Object event) { try { this.enqueueInterruptable(event); } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * Schedules the given event to be enqueued onto this stage at the given (future) time. */ public void enqueue(final Object event, Date date) { if (log.isTraceEnabled()) log.trace(getName() + ": scheduling event to be queued at future time=" + date + ", now=" + new Date() + ", event=" + event); this.getTimer().schedule( new TimerTask() { // anonymous inner class public void run() { if (log.isTraceEnabled()) log.trace(getName() + ": timer triggered; now queueing event=" + event); enqueue(event); } } , date); } /** * Enqueues the given event onto this stage. * <p> * On an exception, we also log the stack trace of the thread enqueueing * the event, which is VERY helpful for debugging purposes, because one can * actually see the stack trace of the methods and thread that enqueued the * event causing problems. */ protected void enqueueInterruptable(final Object event) throws InterruptedException { synchronized (this) { if (!this.isStarted) { log.warn(getName() + ": enqueue: improper but mostly harmless state"); return; } } // save stack trace of current thread for potential later use (TRICK) final RuntimeException stackTraceOfEnqueueingThread = new RuntimeException("Failure in EventHandler.handle(event)"); this.executor.execute( new Runnable() { public void run() { if (log.isTraceEnabled()) log.trace("ENTER event handling [" + getName() + "], event=" + event); try { handler.handle(event); } catch (RuntimeException e) { log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread); log.error("Oopsla: ", e instanceof ExceptionEvent ? e : new ExceptionEvent(e, event, Stage.this)); if (exceptionHandler == null) { throw e; } else { exceptionHandler.onException(e, event, Stage.this); } } catch (Error e) { log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread); log.error("Oopsla: ", new ExceptionEvent(e, event, Stage.this)); throw e; } finally { if (log.isTraceEnabled()) log.trace("EXIT event handling [" + getName() + "], event=" + event); } } } ); } protected synchronized Timer getTimer() { if (this.timer == null) this.timer = new Timer(); return this.timer; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -