runnableconsumerthreadpool.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 712 行 · 第 1/2 页
JAVA
712 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.// // Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//// For more information contact:// OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com/////// Tab Size = 8////package org.opennms.core.concurrent;import java.util.ArrayList;import java.util.List;import org.apache.log4j.Category;import org.opennms.core.fiber.Fiber;import org.opennms.core.queue.ClosableFifoQueue;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueClosedException;import org.opennms.core.queue.FifoQueueException;import org.opennms.core.queue.FifoQueueImpl;import org.opennms.core.utils.ThreadCategory;public class RunnableConsumerThreadPool extends Object implements Fiber { /** * The queue where runnable objects are added. */ private SizingFifoQueue m_delegateQ; /** * The list of running fibers in the pool. 
* Slots that are empty or hold a stopped fiber are reused when a new fiber
     * is started, which allows the size of the pool to vary.
     */
    private Fiber[] m_fibers;

    /**
     * The name of the pool.
     */
    private String m_poolName;

    /**
     * The high water mark ratio for the pool.
     */
    private float m_hiRatio;

    /**
     * The low water mark ratio for the pool.
     */
    private float m_loRatio;

    /**
     * The maximum size for the thread pool.
     */
    private int m_maxSize;

    /**
     * The log4j prefix used when starting up a new fiber!
     */
    private String m_log4jPrefix;

    /**
     * The pool status
     */
    private int m_poolStatus;

    /**
     * The set of listeners to call when a Runnable completes successfully.
     */
    private List m_completedListeners;

    /**
     * The thread group that all pool threads belong to.
     */
    private ThreadGroup m_tGroup;

    /**
     * The set of listeners to call when a Runnable fails to complete
     * successfully.
     */
    private List m_errorListeners;

    /**
     * <p>
     * This class is used to create a queue that auto-adjusts the number of
     * threads in the pool. Each time an addition or removal of queue elements
     * occurs the pool will be adjusted. This will cause some synchronization
     * overhead, but it should be a correct implementation.
     * </p>
     *
     * <p>
     * To avoid the condition of toggling a single thread the lo and hi water
     * marks should have a large cushion between them.
     * </p>
     *
     * @author <a href="mailto:weave@oculan.com">Brian Weaver </a>
     * @author <a href="http://www.opennms.org/">OpenNMS </a>
     *
     */
    private class SizingFifoQueue extends FifoQueueImpl implements ClosableFifoQueue {
        /**
         * Determines if the queue is open or closed. If the queue is closed
         * then an exception is thrown on queue additions. Also, queue removals
         * will cause a queue exception if the queue is empty.
         */
        private volatile boolean m_isClosed = false;

        /**
         * Adjust the size of the thread pool based on the ratio of queue
         * elements to threads. The thread pool is adjusted by the lo and hi
         * water marks which are a ratio of elements to threads.
*/ private void adjust() { int e = size(); synchronized (m_fibers) { int alive = livingFiberCount(); float ratio = (float) e / (float) (alive <= 0 ? 1 : alive); // Never stop the last thread!? // if (alive > 1 && ratio <= m_loRatio) { // IF // 1) Fibers greater than one, and... // 2) ratio less than low water mark // Fiber f = null; int last = Fiber.START_PENDING; for (int x = 0; x < m_fibers.length; x++) { if (m_fibers[x] != null) { switch (m_fibers[x].getStatus()) { case Fiber.RUNNING: if (last < Fiber.RUNNING) { f = m_fibers[x]; last = f.getStatus(); } break; case Fiber.STOP_PENDING: if (last < Fiber.STOP_PENDING) { f = null; last = Fiber.STOP_PENDING; } break; } } } if (f != null && f.getStatus() != Fiber.STOP_PENDING) { Category log = ThreadCategory.getInstance(this.getClass()); if (log.isDebugEnabled()) log.debug("adjust: calling stop on fiber " + f.getName()); f.stop(); } } else if (((alive == 0 && e > 0) || ratio > m_hiRatio) && alive < m_maxSize) { // If // 1a) Fibers equal to zero and queue not empty, or.. // 1a) ratio greater than hiRatio, and... // 2) Fibers less than max size // for (int x = 0; x < m_fibers.length; x++) { if (m_fibers[x] == null || m_fibers[x].getStatus() == Fiber.STOPPED) { Fiber f = new FiberThreadImpl(m_poolName + "-fiber" + x); f.start(); m_fibers[x] = f; Category log = ThreadCategory.getInstance(this.getClass()); if (log.isDebugEnabled()) log.debug("adjust: started fiber " + f.getName() + " ratio = " + ratio + ", alive = " + alive); break; } } } } // synchronized m_fibers } /** * Returns true if the queue is currently open. * * @return True if the queue is open. */ public boolean isOpen() { return !m_isClosed; } /** * Returns true if the queue is currently closed. * * @return True if the queue is closed. */ public boolean isClosed() { return m_isClosed; } /** * Closes a currently open queue. When a queue is closed is should still * allow elements already in the queue to be removed, but new elements * should not be added. 
* * @exception org.opennms.core.queue.FifoQueueException * Thrown if an error occurs closing the queue. */ public void close() throws FifoQueueException { m_isClosed = true; } /** * Ensures that the queue is open and new elements can be added to the * queue. * * @exception org.opennms.core.queue.FifoQueueException * Thrown if an error occurs opening the queue. */ public void open() throws FifoQueueException { m_isClosed = false; } /** * Inserts a new element into the queue. * * @param element * The object to append to the queue. * * @exception org.opennms.core.queue.FifoQueueException * Thrown if a queue error occurs. * @exception java.lang.InterruptedException * Thrown if the thread is interrupted. */ public void add(Object element) throws FifoQueueException, InterruptedException { if (m_isClosed) throw new FifoQueueClosedException("Queue Closed"); super.add(element); adjust(); } /** * Inserts a new element into the queue. If the queue has reached an * implementation limit and the <code> * timeout</code> expires, then a * false value is returned to the caller. * * @param element * The object to append to the queue. * @param timeout * The time to wait on the insertion to succeed. * * @exception org.opennms.core.queue.FifoQueueException * Thrown if a queue error occurs. * @exception java.lang.InterruptedException * Thrown if the thread is interrupted. * * @return True if the element was successfully added to the queue * before the timeout expired, false otherwise. */ public boolean add(Object element, long timeout) throws FifoQueueException, InterruptedException { if (m_isClosed) throw new FifoQueueClosedException("Queue Closed"); boolean result = super.add(element, timeout); adjust(); return result; } /** * Removes the oldest element from the queue. * * @exception org.opennms.core.queue.FifoQueueException * Thrown if a queue error occurs. * @exception java.lang.InterruptedException * Thrown if the thread is interrupted. * * @return The oldest object in the queue. 
*/ public Object remove() throws FifoQueueException, InterruptedException { if (m_isClosed && size() == 0) throw new FifoQueueClosedException("Queue Closed"); Object result = super.remove(); adjust(); return result; } /** * Removes the next element from the queue if one becomes available * before the timeout expires. If the timeout expires before an element * is available then a <code>null</code> reference is returned to the * caller. * * @param timeout * The time to wait on an object to be available. * * @exception org.opennms.core.queue.FifoQueueException * Thrown if a queue error occurs. * @exception java.lang.InterruptedException * Thrown if the thread is interrupted. * * @return The oldest object in the queue, or <code>null</code> if one * is not available. */ public Object remove(long timeout) throws FifoQueueException, InterruptedException { if (m_isClosed && size() == 0) throw new FifoQueueClosedException("Queue Closed"); Object result = super.remove(timeout); adjust(); return result; } } // end SizingFifoQueue /** * This class implements the {@link org.opennms.core.fiber.Fiber Fiber} * interface on top of a Java {@link java.lang.Thread Thread}instance. * These fibers are the basic unit of work in the pool structure. Each fiber * reads from the input queue and calls the run method on the associated * instance. When finished the fiber invoked the appropriate callback and * then repeats the process. * * @author <a href="mailto:weave@oculan.com">Brian Weaver </a> * @author <a href="http://www.opennms.org">OpenNMS </a> * */ private class FiberThreadImpl extends Object implements Fiber, Runnable { /** * The core thread that is running this fiber. */ private Thread m_delegateThread; /** * if set true then the thread should exist as soon as possible.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?