runnableconsumerthreadpool.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 712 行 · 第 1/2 页

JAVA
712
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.// // Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//// For more information contact://      OpenNMS Licensing       <license@opennms.org>//      http://www.opennms.org///      http://www.opennms.com/////// Tab Size = 8////package org.opennms.core.concurrent;import java.util.ArrayList;import java.util.List;import org.apache.log4j.Category;import org.opennms.core.fiber.Fiber;import org.opennms.core.queue.ClosableFifoQueue;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueClosedException;import org.opennms.core.queue.FifoQueueException;import org.opennms.core.queue.FifoQueueImpl;import org.opennms.core.utils.ThreadCategory;public class RunnableConsumerThreadPool extends Object implements Fiber {    /**     * The queue where runnable objects are added.     */    private SizingFifoQueue m_delegateQ;    /**     * The list of running fibers in the pool. The list allows the size of the     * pool to vary.     */    private Fiber[] m_fibers;    /**     * The name of the pool.     */    private String m_poolName;    /**     * The high water mark ratio for the pool.     */    private float m_hiRatio;    /**     * The low water mark ratio for the pool.     */    private float m_loRatio;    /**     * The maximum size for the thread pool.     */    private int m_maxSize;    /**     * The log4j prefix used when starting up a new fiber!     */    private String m_log4jPrefix;    /**     * The pool status     */    private int m_poolStatus;    /**     * The set of listeners to call when a Runnable completes successfully.     */    private List m_completedListeners;    /**     * The thread group that all pool threads belong to.     */    private ThreadGroup m_tGroup;    /**     * The set of listeners to call when a Runnable fails to complete     * successfully.     */    private List m_errorListeners;    /**     * <p>     * This class is used to create a queue that auto adjust the number of     * threads in the pool. Each time an addition or removal of queue elements     * occur the pool will be adjusted. This will cause some synchronization     * overhead, but it should be correct implementation.     * </p>     *      * <p>     * To avoid the condition of toggleing a single thread the lo and hi water     * marks should have a large cushion between them.     * </p>     *      * @author <a href="mailto:weave@oculan.com">Brian Weaver </a>     * @author <a href="http://www.opennms.org/">OpenNMS </a>     *      */    private class SizingFifoQueue extends FifoQueueImpl implements ClosableFifoQueue {        /**         * Determines if the queue is open or closed. If the queue is closed         * then an exception is thrown on queue additions. Also, queue removals         * will cause a queue exception if the queue is empty.         */        private volatile boolean m_isClosed = false;        /**         * Adjust the size of the thread pool based on the ratio of queue         * elements to threads. The thread pool is adjusted by the lo and hi         * water marks which are a ratio of elements to threads.         */        private void adjust() {            int e = size();            synchronized (m_fibers) {                int alive = livingFiberCount();                float ratio = (float) e / (float) (alive <= 0 ? 1 : alive);                // Never stop the last thread!?                //                if (alive > 1 && ratio <= m_loRatio) {                    // IF                    // 1) Fibers greater than one, and...                    // 2) ratio less than low water mark                    //                    Fiber f = null;                    int last = Fiber.START_PENDING;                    for (int x = 0; x < m_fibers.length; x++) {                        if (m_fibers[x] != null) {                            switch (m_fibers[x].getStatus()) {                            case Fiber.RUNNING:                                if (last < Fiber.RUNNING) {                                    f = m_fibers[x];                                    last = f.getStatus();                                }                                break;                            case Fiber.STOP_PENDING:                                if (last < Fiber.STOP_PENDING) {                                    f = null;                                    last = Fiber.STOP_PENDING;                                }                                break;                            }                        }                    }                    if (f != null && f.getStatus() != Fiber.STOP_PENDING) {                        Category log = ThreadCategory.getInstance(this.getClass());                        if (log.isDebugEnabled())                            log.debug("adjust: calling stop on fiber " + f.getName());                        f.stop();                    }                } else if (((alive == 0 && e > 0) || ratio > m_hiRatio) && alive < m_maxSize) {                    // If                    // 1a) Fibers equal to zero and queue not empty, or..                    // 1a) ratio greater than hiRatio, and...                    // 2) Fibers less than max size                    //                    for (int x = 0; x < m_fibers.length; x++) {                        if (m_fibers[x] == null || m_fibers[x].getStatus() == Fiber.STOPPED) {                            Fiber f = new FiberThreadImpl(m_poolName + "-fiber" + x);                            f.start();                            m_fibers[x] = f;                            Category log = ThreadCategory.getInstance(this.getClass());                            if (log.isDebugEnabled())                                log.debug("adjust: started fiber " + f.getName() + " ratio = " + ratio + ", alive = " + alive);                            break;                        }                    }                }            } // synchronized m_fibers        }        /**         * Returns true if the queue is currently open.         *          * @return True if the queue is open.         */        public boolean isOpen() {            return !m_isClosed;        }        /**         * Returns true if the queue is currently closed.         *          * @return True if the queue is closed.         */        public boolean isClosed() {            return m_isClosed;        }        /**         * Closes a currently open queue. When a queue is closed is should still         * allow elements already in the queue to be removed, but new elements         * should not be added.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if an error occurs closing the queue.         */        public void close() throws FifoQueueException {            m_isClosed = true;        }        /**         * Ensures that the queue is open and new elements can be added to the         * queue.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if an error occurs opening the queue.         */        public void open() throws FifoQueueException {            m_isClosed = false;        }        /**         * Inserts a new element into the queue.         *          * @param element         *            The object to append to the queue.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if a queue error occurs.         * @exception java.lang.InterruptedException         *                Thrown if the thread is interrupted.         */        public void add(Object element) throws FifoQueueException, InterruptedException {            if (m_isClosed)                throw new FifoQueueClosedException("Queue Closed");            super.add(element);            adjust();        }        /**         * Inserts a new element into the queue. If the queue has reached an         * implementation limit and the <code>         * timeout</code> expires, then a         * false value is returned to the caller.         *          * @param element         *            The object to append to the queue.         * @param timeout         *            The time to wait on the insertion to succeed.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if a queue error occurs.         * @exception java.lang.InterruptedException         *                Thrown if the thread is interrupted.         *          * @return True if the element was successfully added to the queue         *         before the timeout expired, false otherwise.         */        public boolean add(Object element, long timeout) throws FifoQueueException, InterruptedException {            if (m_isClosed)                throw new FifoQueueClosedException("Queue Closed");            boolean result = super.add(element, timeout);            adjust();            return result;        }        /**         * Removes the oldest element from the queue.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if a queue error occurs.         * @exception java.lang.InterruptedException         *                Thrown if the thread is interrupted.         *          * @return The oldest object in the queue.         */        public Object remove() throws FifoQueueException, InterruptedException {            if (m_isClosed && size() == 0)                throw new FifoQueueClosedException("Queue Closed");            Object result = super.remove();            adjust();            return result;        }        /**         * Removes the next element from the queue if one becomes available         * before the timeout expires. If the timeout expires before an element         * is available then a <code>null</code> reference is returned to the         * caller.         *          * @param timeout         *            The time to wait on an object to be available.         *          * @exception org.opennms.core.queue.FifoQueueException         *                Thrown if a queue error occurs.         * @exception java.lang.InterruptedException         *                Thrown if the thread is interrupted.         *          * @return The oldest object in the queue, or <code>null</code> if one         *         is not available.         */        public Object remove(long timeout) throws FifoQueueException, InterruptedException {            if (m_isClosed && size() == 0)                throw new FifoQueueClosedException("Queue Closed");            Object result = super.remove(timeout);            adjust();            return result;        }    } // end SizingFifoQueue    /**     * This class implements the {@link org.opennms.core.fiber.Fiber Fiber}     * interface on top of a Java {@link java.lang.Thread Thread}instance.     * These fibers are the basic unit of work in the pool structure. Each fiber     * reads from the input queue and calls the run method on the associated     * instance. When finished the fiber invoked the appropriate callback and     * then repeats the process.     *      * @author <a href="mailto:weave@oculan.com">Brian Weaver </a>     * @author <a href="http://www.opennms.org">OpenNMS </a>     *      */    private class FiberThreadImpl extends Object implements Fiber, Runnable {        /**         * The core thread that is running this fiber.         */        private Thread m_delegateThread;        /**         * if set true then the thread should exist as soon as possible.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?