defaultqueuehandler.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 289 行

JAVA
289
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Jan 31: Cleaned up some unused imports.// // Original code base Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.                                                            //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//       // For more information contact: //      OpenNMS Licensing       <license@opennms.org>//      http://www.opennms.org///      http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.notifd;import java.util.Iterator;import java.util.List;import java.util.SortedMap;import org.apache.log4j.Category;import org.opennms.core.utils.ThreadCategory;import org.opennms.core.utils.TimeConverter;/** * This class is used as a thread for executing notices for events that are * discovered by the notice daemon. The notices are read from an scheduler queue * and the processes are created by the fiber. Each created process is added to * garbage collection list that is periodically polled and culled based upon the * status of the process or how long the process is run. If the process has run * long than allocated it is terminated during collection. *  * @author <a href="mailto:jason@opennms.org">Jason Johns </a> * @author <a href="http://www.opennms.org/>OpenNMS </a> *  */public class DefaultQueueHandler implements NotifdQueueHandler {    /**     * The input queue of runnable commands.     */    private NoticeQueue m_noticeQueue;    /**     * The name of this Fiber     */    private String m_queueID;    /**     * How long to sleep between processing more notices     */    private long m_interval;    /**     * The status of this fiber.     */    private int m_status;    /**     *      */    public DefaultQueueHandler() {        m_status = START_PENDING;    }    /**     *      */    public void setQueueID(String queueID) {        m_queueID = queueID;    }    /**     *      */    public void setNoticeQueue(NoticeQueue noticeQueue) {        m_noticeQueue = noticeQueue;    }    /**     *      */    public void setInterval(String interval) {        m_interval = TimeConverter.convertToMillis(interval);    }    /**     * The main worker of the fiber. This method is executed by the encapsualted     * thread to read commands from the execution queue and to execute those     * commands. If the thread is interrupted or the status changes to     * <code>STOP_PENDING</code> then the method will return as quickly as     * possible.     *      */    public void run() {        synchronized (this) {            m_status = RUNNING;        }        for (;;) {            synchronized (this) {                // if stopped or stop pending then break out                //                if (m_status == STOP_PENDING || m_status == STOPPED)                    break;                // if paused or pause pending then block                //                while (m_status == PAUSE_PENDING || m_status == PAUSED) {                    m_status = PAUSED;                    try {                        wait();                    } catch (InterruptedException ex) {                        // exit                        break;                    }                }                // if resume pending then change to running                //                if (m_status == RESUME_PENDING)                    m_status = RUNNING;            }            processQueue();            synchronized (this) {                // wait for the next iteration                try {                    wait(m_interval);                } catch (InterruptedException ex) {                    // exit                    break;                }            }        } // end infinite loop        synchronized (this) {            m_status = STOPPED;        }    } // end run    /**     *      */    public void processQueue() {        Category log = ThreadCategory.getInstance(getClass());        try {            Long now = new Long(System.currentTimeMillis());            SortedMap readyNotices = m_noticeQueue.headMap(now);            Iterator i = readyNotices.values().iterator();            while (i.hasNext()) {                Object o = i.next();                if (o instanceof NotificationTask) {                    NotificationTask task = (NotificationTask) o;                    task.start();                } else if (o instanceof List) {                    List list = (List) o;                    for (int j = 0; j < list.size(); j++) {                        NotificationTask task = (NotificationTask) list.get(j);                        task.start();                    }                }            }            readyNotices.clear();            log.debug("current state of tree: ");            log.debug("\n" + m_noticeQueue);        } catch (Exception e) {            log.error(e.getMessage(), e);        }    }    /**     * Starts the fiber. If the fiber has already been run or is currently     * running then an exception is generated. The status of the fiber is     * updated to <code>STARTING</code> and will transisition to <code>     * RUNNING</code>     * when the fiber finishes initializing and begins processing the     * encapsulaed queue.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber is stopped or has never run.     *      */    public synchronized void start() {        m_status = STARTING;        Thread thread = new Thread(this, m_queueID);        thread.start();    }    /**     * Stops a currently running fiber. If the fiber has already been stopped     * then the command is silently ignored. If the fiber was never started then     * an exception is generated.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber was never started.     *      */    public synchronized void stop() {        if (m_status != STOPPED)            m_status = STOP_PENDING;        notifyAll();    }    /**     * Pauses a currently running fiber. If the fiber was not in a running or     * resuming state then the command is silently discarded. If the fiber is     * not running or has terminated then an exception is generated.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber is stopped or has never run.     *      */    public synchronized void pause() {        if (m_status == RUNNING || m_status == RESUME_PENDING) {            m_status = PAUSE_PENDING;            notifyAll();        }    }    /**     * Resumes the fiber if it is paused. If the fiber was not in a paused or     * pause pending state then the request is discarded. If the fiber has not     * been started or has already stopped then an exception is generated.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber is stopped or has never run.     *      */    public synchronized void resume() {        if (m_status == PAUSED || m_status == PAUSE_PENDING) {            m_status = RESUME_PENDING;            notifyAll();        }    }    /**     * Returns the name of this fiber.     *      * @return The name of the fiber.     */    public String getName() {        return m_queueID;    }    /**     * Returns the current status of the pausable fiber.     *      * @return The current status of the fiber.     *      * @see org.opennms.core.fiber.PausableFiber     * @see org.opennms.core.fiber.Fiber     */    public synchronized int getStatus() {        return m_status;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?