collectd.java
来自「opennms的相关源码 请大家看看」· Java 代码 · 共 713 行 · 第 1/2 页
JAVA
713 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2004 Dec 27: Changed SQL_RETRIEVE_INTERFACES to omit interfaces that have been// marked as deleted.// 2004 Feb 12: Rebuild the package to ip list mapping while a new discoveried interface// to be scheduled.// 2003 Jan 31: Cleaned up some unused imports.// // Original code base Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. 
//// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///package org.opennms.netmgt.collectd;import java.io.IOException;import java.lang.reflect.UndeclaredThrowableException;import java.net.InetAddress;import java.net.UnknownHostException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.Collections;import java.util.Enumeration;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import java.util.Set;import java.util.TreeMap;import org.apache.log4j.Category;import org.apache.log4j.Priority;import org.exolab.castor.xml.MarshalException;import org.exolab.castor.xml.ValidationException;import org.opennms.core.fiber.PausableFiber;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.config.CollectdConfigFactory;import org.opennms.netmgt.config.DatabaseConnectionFactory;import org.opennms.netmgt.config.PollOutagesConfigFactory;import org.opennms.netmgt.config.collectd.CollectdConfiguration;import org.opennms.netmgt.config.collectd.Collector;import org.opennms.netmgt.scheduler.ReadyRunnable;import org.opennms.netmgt.scheduler.Scheduler;import org.opennms.netmgt.config.collectd.Package;public final class Collectd implements PausableFiber { /** * Log4j category */ private final static String LOG4J_CATEGORY = "OpenNMS.Collectd"; /** * SQL used to retrieve all the interfaces which support a particular * service. */ private final static String SQL_RETRIEVE_INTERFACES = "SELECT DISTINCT ifServices.nodeid,ifServices.ipaddr FROM ifServices, service, ipinterface WHERE ifServices.serviceid = service.serviceid AND service.servicename = ? 
AND ifServices.ipaddr = ipinterface.ipaddr and ifServices.nodeid = ipinterface.nodeid and ipinterface.ismanaged != 'D'"; /** * SQL used to retrieve all the service id's and names from the database. */ private final static String SQL_RETRIEVE_SERVICE_IDS = "SELECT serviceid,servicename FROM service"; /** * Singleton instance of the Collectd class */ private final static Collectd m_singleton = new Collectd(); /** * Holds map of service names to service identifiers */ private final static Map m_serviceIds = new HashMap(); /** * List of all CollectableService objects. */ private List m_collectableServices; /** * Reference to the collection scheduler */ private Scheduler m_scheduler; /** * Status of the Collectd instance. */ private int m_status; /** * Reference to the event processor */ private BroadcastEventProcessor m_receiver; /** * Map of all available ServiceCollector objects indexed by service name */ private static Map m_svcCollectors; /** * Indicates if scheduling of existing interfaces has been completed * */ private boolean m_schedulingCompleted = false; /** * Constructor. */ private Collectd() { m_scheduler = null; m_status = START_PENDING; m_svcCollectors = Collections.synchronizedMap(new TreeMap()); m_collectableServices = Collections.synchronizedList(new LinkedList()); } /** * Responsible for starting the collection daemon. 
*/ public synchronized void init() { // Set the category prefix ThreadCategory.setPrefix(LOG4J_CATEGORY); // get the category logger final Category log = ThreadCategory.getInstance(); if (log.isDebugEnabled()) log.debug("init: Initializing collection daemon"); loadConfigFactory(log); loadscheduledOutagesConfigFactory(log); if (log.isDebugEnabled()) log.debug("init: Loading collectors"); // Collectd configuration // CollectdConfigFactory cCfgFactory = CollectdConfigFactory.getInstance(); CollectdConfiguration config = cCfgFactory.getConfiguration(); instantiateCollectors(log, config); buildServiceIdMap(log); createScheduler(log, config); ReadyRunnable interfaceScheduler = buildSchedule(log); m_scheduler.schedule(interfaceScheduler, 0); createEventProcessor(log); } private void createEventProcessor(final Category log) { // Create an event receiver. The receiver will // receive events, process them, creates network // interfaces, and schedulers them. // try { if (log.isDebugEnabled()) log.debug("init: Creating event broadcast event processor"); m_receiver = new BroadcastEventProcessor(m_collectableServices); } catch (Throwable t) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to initialized the broadcast event receiver", t); throw new UndeclaredThrowableException(t); } } private ReadyRunnable buildSchedule(final Category log) { // Schedule existing interfaces for data collection ReadyRunnable interfaceScheduler = new ReadyRunnable() { public boolean isReady() { return true; } public void run() { // try { scheduleExistingInterfaces(); } catch (SQLException sqlE) { if (log.isEnabledFor(Priority.ERROR)) log.error("start: Failed to schedule existing interfaces", sqlE); } finally { setSchedulingCompleted(true); } } }; return interfaceScheduler; } private void createScheduler(final Category log, CollectdConfiguration config) { // Create a scheduler // try { if (log.isDebugEnabled()) log.debug("init: Creating collectd scheduler"); m_scheduler = new 
Scheduler("Collectd", config.getThreads()); } catch (RuntimeException e) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to create collectd scheduler", e); throw e; } } private void buildServiceIdMap(final Category log) { // Make sure we can connect to the database and load // the services table so we can easily convert from // service name to service id // if (log.isDebugEnabled()) log.debug("start: Testing database connection"); java.sql.Connection ctest = null; ResultSet rs = null; try { DatabaseConnectionFactory.init(); ctest = DatabaseConnectionFactory.getInstance().getConnection(); PreparedStatement loadStmt = ctest.prepareStatement(SQL_RETRIEVE_SERVICE_IDS); // go ahead and load the service table // rs = loadStmt.executeQuery(); while (rs.next()) { Integer id = new Integer(rs.getInt(1)); String name = rs.getString(2); m_serviceIds.put(name, id); } } catch (IOException iE) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("start: IOException getting database connection", iE); throw new UndeclaredThrowableException(iE); } catch (MarshalException mE) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("start: Marshall Exception getting database connection", mE); throw new UndeclaredThrowableException(mE); } catch (ValidationException vE) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("start: Validation Exception getting database connection", vE); throw new UndeclaredThrowableException(vE); } catch (SQLException sqlE) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("start: Error accessing database.", sqlE); throw new UndeclaredThrowableException(sqlE); } catch (ClassNotFoundException cnfE) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("start: Error accessing database.", cnfE); throw new UndeclaredThrowableException(cnfE); } finally { if (rs != null) { try { rs.close(); } catch (Exception e) { if (log.isInfoEnabled()) log.info("start: an error occured closing the result set", e); } } if (ctest != null) { try { ctest.close(); } catch 
(Exception e) { if (log.isInfoEnabled()) log.info("start: an error occured closing the SQL connection", e); } } } } private void instantiateCollectors(final Category log, CollectdConfiguration config) { // Load up an instance of each collector from the config // so that the event processor will have them for // new incomming events to create collectable service objects. // Enumeration eiter = config.enumerateCollector(); while (eiter.hasMoreElements()) { Collector collector = (Collector) eiter.nextElement(); try { if (log.isDebugEnabled()) { log.debug("init: Loading collector " + collector.getService() + ", classname " + collector.getClassName()); } Class cc = Class.forName(collector.getClassName()); ServiceCollector sc = (ServiceCollector) cc.newInstance(); // Attempt to initialize the service collector // Map properties = null; // properties not currently used sc.initialize(properties); m_svcCollectors.put(collector.getService(), sc); } catch (Throwable t) { if (log.isEnabledFor(Priority.WARN)) { log.warn("init: Failed to load collector " + collector.getClassName() + " for service " + collector.getService(), t); } } } } private void loadscheduledOutagesConfigFactory(final Category log) { // Load up the configuration for the scheduled outages. 
// try { PollOutagesConfigFactory.reload(); } catch (MarshalException ex) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to load poll-outage configuration", ex); throw new UndeclaredThrowableException(ex); } catch (ValidationException ex) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to load poll-outage configuration", ex); throw new UndeclaredThrowableException(ex); } catch (IOException ex) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to load poll-outage configuration", ex); throw new UndeclaredThrowableException(ex); } } private void loadConfigFactory(final Category log) { // Load collectd configuration file // try { CollectdConfigFactory.reload(); } catch (MarshalException ex) { if (log.isEnabledFor(Priority.FATAL)) log.fatal("init: Failed to load collectd configuration", ex); throw new UndeclaredThrowableException(ex); } catch (ValidationException ex) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?