invoker.java

来自「mysql集群」· Java 代码 · 共 273 行

JAVA
273
字号
/*
 * 	This program is free software; you can redistribute it and/or modify it under the terms of 
 * the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, 
 * or (at your option) any later version. 
 * 
 * 	This program is distributed in the hope that it will be useful, 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
 * See the GNU General Public License for more details. 
 * 	You should have received a copy of the GNU General Public License along with this program; 
 * if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
package com.meidusa.amoeba.util;

import java.util.Arrays;
import java.util.HashMap;

import org.apache.log4j.Logger;

/**
 * The invoker is used to invoke self-contained units of code on an
 * invoking thread. Each invoker is associated with its own thread and
 * that thread is used to invoke all of the units posted to that invoker
 * in the order in which they were posted. The invoker also provides a
 * convenient mechanism for processing the result of an invocation back on
 * the main thread.
 *
 * <p> The invoker is a useful tool for services that need to block and
 * therefore cannot be run on the main thread. For example, an interactive
 * application might provide an invoker on which to run database queries.
 *
 * <p> Bear in mind that each invoker instance runs units on its own
 * thread and care must be taken to ensure that code running on separate
 * invokers properly synchronizes access to shared information. Where
 * possible, complete isolation of the services provided by a particular
 * invoker is desirable.
 */
public class Invoker extends LoopingThread
{
    private static final Logger log = Logger.getLogger(Invoker.class);

    /**
     * The unit encapsulates a unit of executable code that will be run on
     * the invoker thread. It also provides facilities for additional code
     * to be run on the main thread once the primary code has completed on
     * the invoker thread.
     */
    public static abstract class Unit implements Runnable
    {
        /** The time at which this unit was placed on the queue. */
        public long queueStamp;

        /** The default constructor; the unit reports the name "Unknown". */
        public Unit () {}

        /**
         * Creates an invoker unit which will report the supplied name in
         * {@link #toString}.
         */
        public Unit (String name)
        {
            _name = name;
        }

        /**
         * This method is called on the invoker thread and should be used
         * to perform the primary function of the unit. It can return true
         * to cause this unit to be posted to the invoker's result receiver,
         * which leads to {@link #handleResult} being run via {@link #run}
         * (generally to allow the results of the invocation to be acted
         * upon back in the context of the regular world), or false to
         * indicate that no further processing should be performed.
         *
         * @return true if <code>handleResult</code> should subsequently be
         * invoked, false if not.
         */
        public abstract boolean invoke ();

        /**
         * Invocation unit implementations can implement this function to
         * perform any post-unit-invocation processing back on the main
         * thread. It will be invoked if <code>invoke</code> returns true.
         */
        public void handleResult ()
        {
            // do nothing by default
        }

        // we want to be a runnable to make the receiver interface simple,
        // but we'd like for invocation unit implementations to be able to
        // put their result handling code into an aptly named method
        public void run ()
        {
            handleResult();
        }

        /** Returns the name of this unit. */
        @Override
        public String toString ()
        {
            return _name;
        }

        /** The name reported by {@link #toString}. */
        protected String _name = "Unknown";
    }

    /**
     * Creates an invoker that will post results to the supplied result
     * receiver.
     *
     * @param name the name passed on to the {@link LoopingThread} constructor.
     * @param resultReceiver the queue to which units whose
     * <code>invoke</code> returned true are posted.
     */
    public Invoker (String name, RunQueue resultReceiver)
    {
        super(name);
        _receiver = resultReceiver;
    }

    /**
     * Configures the duration after which an invoker unit will be considered
     * "long". When they complete, long units have a warning message
     * logged. The default long threshold is 500 milliseconds.
     *
     * @param longThreshold the new threshold, in milliseconds.
     */
    public void setLongThresholds (long longThreshold)
    {
        _longThreshold = longThreshold;
    }

    /**
     * Posts a unit to this invoker for subsequent invocation on the
     * invoker's thread.
     *
     * @param unit the unit to enqueue; its <code>queueStamp</code> is
     * overwritten with the current time.
     */
    public void postUnit (Unit unit)
    {
        // note the time
        unit.queueStamp = System.currentTimeMillis();
        // and append it to the queue
        _queue.append(unit);
    }

    /**
     * Takes the next unit from the queue and invokes it, posting it to the
     * result receiver when <code>invoke</code> returns true. Any
     * {@link Throwable} raised while processing the unit is logged as a
     * warning rather than propagated.
     */
    public void iterate ()
    {
        // pop the next item off of the queue
        Unit unit = _queue.get();

        // remains 0L when performance tracking is disabled, which is the
        // value the will/didInvokeUnit hooks are documented to receive
        long start = 0L;
        if (PERF_TRACK) {
            // note when we started working on this unit
            start = System.currentTimeMillis();
            synchronized (this) {
                _unitsRun++;
            }
            // record the time spent on the queue as a special unit
            recordMetrics("queue_wait_time", start - unit.queueStamp);
        }

        try {
            willInvokeUnit(unit, start);
            if (unit.invoke()) {
                // if it returned true, we post it to the receiver thread
                // to invoke the result processing
                _receiver.postRunnable(unit);
            }
            didInvokeUnit(unit, start);

        } catch (Throwable t) {
            log.warn("Invocation unit failed [unit=" + unit + "].", t);
        }
    }

    /**
     * Shuts down the invoker thread by queueing up a unit that will cause
     * the thread to exit after all currently queued units are processed.
     */
    public void shutdown ()
    {
        Unit stopper = new Unit() {
            public boolean invoke () {
                _running = false;
                return false;
            }
        };
        // stamp the unit like postUnit() does; otherwise iterate() would
        // record the whole epoch time as this unit's queue wait
        stopper.queueStamp = System.currentTimeMillis();
        _queue.append(stopper);
    }

    /**
     * Called before we process an invoker unit.
     *
     * @param unit the unit about to be invoked.
     * @param start a timestamp recorded immediately before invocation if
     * {@link #PERF_TRACK} is enabled, 0L otherwise.
     */
    protected void willInvokeUnit (Unit unit, long start)
    {
    }

    /**
     * Called after we process an invoker unit (and, if its
     * <code>invoke</code> returned true, after it has been posted to the
     * result receiver). Not called when processing the unit threw.
     *
     * @param unit the unit that was just invoked.
     * @param start a timestamp recorded immediately before invocation if
     * {@link #PERF_TRACK} is enabled, 0L otherwise.
     */
    protected void didInvokeUnit (Unit unit, long start)
    {
        // track some performance metrics
        if (PERF_TRACK) {
            long duration = System.currentTimeMillis() - start;
            Object key = unit.getClass();
            recordMetrics(key, duration);

            // report long runners
            if (duration > _longThreshold) {
                String howLong = (duration >= 10*_longThreshold) ?
                    "Really long" : "Long";
                log.warn(howLong + " invoker unit [unit=" + unit +
                            " (" + key + "), time=" + duration + "ms].");
            }
        }
    }

    /**
     * Adds a duration sample to the profile registered under the supplied
     * key, creating the profile on first use.
     *
     * @param key the profile key (a unit class, or a special string key).
     * @param duration the elapsed time to record, in milliseconds.
     */
    protected void recordMetrics (Object key, long duration)
    {
        UnitProfile prof = _tracker.get(key);
        if (prof == null) {
            _tracker.put(key, prof = new UnitProfile());
        }
        prof.record(duration);
    }

    /** Used to track profile information on invoked units. */
    protected static class UnitProfile
    {
        /** Adds the supplied duration to the total and to the histogram. */
        public void record (long duration) {
            _totalElapsed += duration;
            _histo.addValue((int)duration);
        }

        /** Resets the accumulated total and the histogram. */
        public void clear () {
            _totalElapsed = 0L;
            _histo.clear();
        }

        /**
         * Summarizes the profile as total time, sample count, average and
         * histogram buckets. An empty profile reports an average of 0
         * instead of failing on a division by zero.
         */
        @Override
        public String toString () {
            int count = _histo.size();
            long average = (count == 0) ? 0L : (_totalElapsed/count);
            return _totalElapsed + "ms/" + count + " = " +
                average + "ms avg " +
                Arrays.toString(_histo.getBuckets());
        }

        // track in buckets of 50ms up to 500ms
        protected Histogram _histo = new Histogram(0, 50, 10);
        protected long _totalElapsed;
    }

    /** Returns the size reported by the queue of pending units. */
    public int size ()
    {
        return _queue.size();
    }

    /** The invoker's queue of units to be executed. */
    protected Queue<Unit> _queue = new Queue<Unit>();

    /** The result receiver with which we're working. */
    protected RunQueue _receiver;

    /** Tracks the counts of invocations by unit's class. */
    protected HashMap<Object,UnitProfile> _tracker =
        new HashMap<Object,UnitProfile>();

    /** The total number of invoker units run since the last report. */
    protected int _unitsRun;

    /** The duration of time after which we consider a unit to be delinquent
     * and log a warning. */
    protected long _longThreshold = 500L;

    /** Whether or not to track invoker unit performance. */
    protected static final boolean PERF_TRACK = true;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?