rtcmanager.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 781 行 · 第 1/2 页
JAVA
781 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.rtc;import java.io.IOException;import java.lang.reflect.UndeclaredThrowableException;import java.util.Map;import java.util.Timer;import java.util.TimerTask;import org.apache.log4j.Category;import org.exolab.castor.xml.MarshalException;import org.exolab.castor.xml.ValidationException;import org.opennms.core.concurrent.RunnableConsumerThreadPool;import org.opennms.core.fiber.PausableFiber;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.config.RTCConfigFactory;/** * Maintains calculations for categories. * <P> * The RTCManager maintains data required so as to calculate availability for * the different categories configured in categories.xml * </P> * * <P> * The RTC initializes its data from the database when it comes up. It then * subscribes to the Events subsystem to receive events of interest to keep the * data up-to-date * </P> * * <P> * Availability data is sent out to listeners who indicate that they are * listening by sending an RTC 'subscribe' event. The subscribe event has an URL * and user/passwd info. so RTC can post data to the URL * </P> * * <P> * The RTC has two timers(a low threshold and a high threshold) and a counter * that can run upto a configurable max number of events - these are used to * determine when availability information is to be sent out when event streams * are coming in at normal rates. When no events are received, a timer * configured with a user configured time(defaulting to one minute) decides the * interval at which data is sent * </P> * * @author <A HREF="mailto:sowmya@opennms.org">Sowmya Kumaraswamy </A> * @author <A HREF="http://www.opennms.org">OpenNMS.org </A> * * @see org.opennms.netmgt.rtc.RTCConstants * @see org.opennms.netmgt.rtc.DataSender * @see org.opennms.netmgt.rtc.DataManager */public final class RTCManager implements PausableFiber { /** * The log4j category used to log debug messsages and statements. */ private static final String LOG4J_CATEGORY = "OpenNMS.RTCManager"; /** * Singleton instance of this class */ private static final RTCManager m_singleton = new RTCManager(); /** * The id for the low threshold timer task */ private static final String LOWT_TASK = "lowTtask"; /** * The id for the high threshold timer task */ private static final String HIGHT_TASK = "highTtask"; /** * The id for the user refresh timer task */ private static final String USERTIMER = "userTimer"; /** * The initial number of updater threads */ private static final int NUM_UPDATERS = 5; /** * The configurable rolling window read from the properties file */ private static long m_rollingWindow = -1; /** * The RTC timer */ private Timer m_timer; /** * The low threshold timer task */ private TimerTask m_lowTtask; /** * The low threshold refresh interval. The low threshold at which data is * sent out */ private long m_lowThresholdInterval = -1; /** * The high threshold timer task */ private TimerTask m_highTtask; /** * The high threshold refresh interval. The high threshold at which data is * sent out */ private long m_highThresholdInterval = -1; /** * The user refresh timer task */ private TimerTask m_userTask; /** * The user refresh interval. The interval at which data is sent even if no * events are received */ private long m_userRefreshInterval = -1; /** * The counter keeping track of the number of messages */ private int m_counter = -1; /** * The maximum number of events that are received before a resend. Note that * this or the timers going off, whichever occurs first triggers a resend */ private int MAX_EVENTS_BEFORE_RESEND = -1; /** * The events receiver */ private BroadcastEventProcessor m_eventReceiver; /** * The RunnableConsumerThreadPool that runs updaters that interpret and * update the data */ private RunnableConsumerThreadPool m_updaterPool; /** * The DataSender */ private DataSender m_dataSender; /** * manager of the data maintained by the RTC */ private static DataManager m_dataMgr; /** * The current status of this fiber */ private int m_status; /** * The timer scheduled task that runs and informs the RTCManager when the * timer goes off */ private class RTCTimerTask extends TimerTask { /** * The timer id */ private String m_id; /** * Constructor for the timer task * * @param id * the timertask ID */ RTCTimerTask(String id) { m_id = id; } /** * Return the ID * * @return the ID */ public String getID() { return m_id; } /** * Starts the task. When run, simply inform the manager that this has * been called by the timer */ public void run() { timerTaskComplete(this); } } /** * Handles a completed task. * * <P> * If the low threshold or high threshold timers expire, send category data * out and set both timer(task)s to null so they can be reset when the next * event comes in * <P> * * <P> * If the user refresh timer is the one that expired, send category data out * and reset the user timer(task) * <P> * * @param tt * the task that is finishing. */ private synchronized void timerTaskComplete(RTCTimerTask tt) { Category log = ThreadCategory.getInstance(getClass()); if (log.isDebugEnabled()) log.debug("TimerTask \'" + tt.getID() + "\' complete, status: " + m_status); if (tt.getID().equals(LOWT_TASK)) { // cancel user timer boolean ret = m_userTask.cancel(); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + USERTIMER + " cancelled: " + ret); // send out the info and reset both timers if (m_highTtask != null) { ret = m_highTtask.cancel(); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + HIGHT_TASK + " cancelled: " + ret); m_highTtask = null; } if (m_status == RUNNING) { m_dataSender.notifyToSend(); } m_lowTtask = null; m_counter = -1; // reset the user timer m_timer.schedule((m_userTask = new RTCTimerTask(USERTIMER)), 0, m_userRefreshInterval); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + USERTIMER + " scheduled"); } else if (tt.getID().equals(HIGHT_TASK)) { // cancel user timer boolean ret = m_userTask.cancel(); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + USERTIMER + " cancelled: " + ret); // send the category information out reset all timers if (m_lowTtask != null) { ret = m_lowTtask.cancel(); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + LOWT_TASK + " cancelled: " + ret); m_lowTtask = null; } if (m_status == RUNNING) { m_dataSender.notifyToSend(); } m_highTtask = null; m_counter = -1; // reset the user timer m_timer.schedule((m_userTask = new RTCTimerTask(USERTIMER)), 0, m_userRefreshInterval); if (log.isDebugEnabled()) log.debug("timerTaskComplete: " + USERTIMER + " scheduled"); } else if (tt.getID().equals(USERTIMER)) { // send if not pasued if (m_status == RUNNING) { m_dataSender.notifyToSend(); } } } /** * The constructor for the RTCManager * */ public RTCManager() { m_status = START_PENDING; } /** * Check the timer tasks. Reset any of the timer tasks if they need to be * reset (indicated by their being set to null on timer task completion). If * the events counter has exceeded maxEventsBeforeResend, send data out and * reset timers */ public synchronized void checkTimerTasksOnEventReceipt() { Category log = ThreadCategory.getInstance(getClass()); if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: Checking if timer tasks need to be reset or data needs to be sent out"); // cancel user timer boolean ret = m_userTask.cancel(); if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: " + USERTIMER + " cancelled: " + ret); // Check the counter to see if timers need to be started afresh if (m_counter == -1) { m_counter = 0; // // set timers // // set the low threshold timer task if (m_lowTtask == null) { try { m_timer.schedule((m_lowTtask = new RTCTimerTask(LOWT_TASK)), m_lowThresholdInterval); if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: " + LOWT_TASK + " scheduled"); } catch (IllegalStateException isE) { log.error("checkTimerTasksOnEventReceipt: Illegal State adding new RTCTimerTask", isE); } } // set the high threshold timer task only if currently null if (m_highTtask == null) { try { m_timer.schedule((m_highTtask = new RTCTimerTask(HIGHT_TASK)), m_highThresholdInterval); if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: " + HIGHT_TASK + " scheduled"); } catch (IllegalStateException isE) { log.error("checkTimerTasksOnEventReceipt: Illegal State adding new RTCTimerTask", isE); } } } if (MAX_EVENTS_BEFORE_RESEND > 0 && m_counter >= MAX_EVENTS_BEFORE_RESEND) { if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: max events before resend limit reached, resetting timers"); // send the category information out and reset all timers if (m_lowTtask != null) { ret = m_lowTtask.cancel(); if (log.isDebugEnabled()) log.debug("checkTimerTasksOnEventReceipt: " + LOWT_TASK + " cancelled: " + ret); m_lowTtask = null; } if (m_highTtask != null) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?