broadcasteventprocessor.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 710 行 · 第 1/2 页
JAVA
710 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Jan 31: Cleaned up some unused imports.//// Original code base Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.threshd;import java.net.InetAddress;import java.util.ArrayList;import java.util.Enumeration;import java.util.Iterator;import java.util.List;import java.util.ListIterator;import java.util.Map;import org.apache.log4j.Category;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.config.ThreshdConfigFactory;import org.opennms.netmgt.EventConstants;import org.opennms.netmgt.eventd.EventIpcManagerFactory;import org.opennms.netmgt.eventd.EventListener;import org.opennms.netmgt.scheduler.Scheduler;import org.opennms.netmgt.xml.event.Event;import org.opennms.netmgt.xml.event.Parm;import org.opennms.netmgt.xml.event.Parms;import org.opennms.netmgt.xml.event.Value;/** * * @author <a href="mailto:mike@opennms.org">Mike Davidson </a> * @author <a href="http://www.opennms.org/">OpenNMS </a> */final class BroadcastEventProcessor implements EventListener { /** * The map of service names to service thresholders. */ private Map m_monitors; /** * The scheduler assocated with this receiver */ private Scheduler m_scheduler; /** * List of ThresholdableService objects. */ private List m_thresholdableServices; /** * This constructor is called to initilize the JMS event receiver. A * connection to the message server is opened and this instance is setup as * the endpoint for broadcast events. When a new event arrives it is * processed and the appropriate action is taken. * * @param thresholdableServices * List of all the ThresholdableService objects scheduled for * thresholding. * */ BroadcastEventProcessor(List thresholdableServices) { Category log = ThreadCategory.getInstance(getClass()); // Set the configuration for this event // receiver. // m_scheduler = Threshd.getInstance().getScheduler(); m_thresholdableServices = thresholdableServices; // Create the jms message selector installMessageSelector(); } /** * Create message selector to set to the subscription */ private void installMessageSelector() { // Create the JMS selector for the ueis this service is interested in // List ueiList = new ArrayList(); // nodeGainedService ueiList.add(EventConstants.NODE_GAINED_SERVICE_EVENT_UEI); // interfaceIndexChanged // NOTE: No longer interested in this event...if Capsd detects // that in interface's index has changed a // 'reinitializePrimarySnmpInterface' event is generated. // ueiList.add(EventConstants.INTERFACE_INDEX_CHANGED_EVENT_UEI); // primarySnmpInterfaceChanged ueiList.add(EventConstants.PRIMARY_SNMP_INTERFACE_CHANGED_EVENT_UEI); // reinitializePrimarySnmpInterface ueiList.add(EventConstants.REINITIALIZE_PRIMARY_SNMP_INTERFACE_EVENT_UEI); // interfaceReparented ueiList.add(EventConstants.INTERFACE_REPARENTED_EVENT_UEI); // nodeDeleted ueiList.add(EventConstants.NODE_DELETED_EVENT_UEI); // duplicateNodeDeleted ueiList.add(EventConstants.DUP_NODE_DELETED_EVENT_UEI); // interfaceDeleted ueiList.add(EventConstants.INTERFACE_DELETED_EVENT_UEI); // serviceDeleted ueiList.add(EventConstants.SERVICE_DELETED_EVENT_UEI); // scheduled outage configuration change ueiList.add(EventConstants.SCHEDOUTAGES_CHANGED_EVENT_UEI); EventIpcManagerFactory.getInstance().getManager().addEventListener(this, ueiList); } /** * </p> * Closes the current connections to the Java Message Queue if they are * still active. This call may be invoked more than once safely and may be * invoked during object finalization. * </p> * */ synchronized void close() { EventIpcManagerFactory.getInstance().getManager().removeEventListener(this); } /** * This method may be invoked by the garbage thresholding. Once invoked it * ensures that the <code>close</code> method is called <em>at least</em> * once during the cycle of this object. * */ protected void finalize() throws Throwable { close(); // ensure it's closed } public String getName() { return "Threshd:BroadcastEventProcessor"; } /** * This method is invoked by the JMS topic session when a new event is * available for processing. Currently only text based messages are * processed by this callback. Each message is examined for its Universal * Event Identifier and the appropriate action is taking based on each UEI. * * @param event * The event message. * */ public void onEvent(Event event) { Category log = ThreadCategory.getInstance(getClass()); // print out the uei // if (log.isDebugEnabled()) { log.debug("received event, uei = " + event.getUei()); } if(event.getUei().equals(EventConstants.SCHEDOUTAGES_CHANGED_EVENT_UEI)) { log.warn("Reloading Threshd config factory"); try { ThreshdConfigFactory.reload(); } catch (Exception e) { e.printStackTrace(); log.error("Failed to reload ThreshdConfigFactory because "+e.getMessage()); } Threshd.getInstance().refreshServicePackages(); } else if(!event.hasNodeid()) { // For all other events, if the event doesn't have a nodeId it can't be processed. log.info("no database node id found, discarding event"); } else if (event.getUei().equals(EventConstants.NODE_GAINED_SERVICE_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else { nodeGainedServiceHandler(event); } } else if (event.getUei().equals(EventConstants.PRIMARY_SNMP_INTERFACE_CHANGED_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else { primarySnmpInterfaceChangedHandler(event); } } else if (event.getUei().equals(EventConstants.REINITIALIZE_PRIMARY_SNMP_INTERFACE_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else { reinitializePrimarySnmpInterfaceHandler(event); } } else if (event.getUei().equals(EventConstants.INTERFACE_REPARENTED_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else { interfaceReparentedHandler(event); } } // NEW NODE OUTAGE EVENTS else if (event.getUei().equals(EventConstants.NODE_DELETED_EVENT_UEI) || event.getUei().equals(EventConstants.DUP_NODE_DELETED_EVENT_UEI)) { nodeDeletedHandler(event); } else if (event.getUei().equals(EventConstants.INTERFACE_DELETED_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else { interfaceDeletedHandler(event); } } else if (event.getUei().equals(EventConstants.SERVICE_DELETED_EVENT_UEI)) { // If there is no interface then it cannot be processed // if (event.getInterface() == null || event.getInterface().length() == 0) { log.info("no interface found, discarding event"); } else if (event.getService() == null || event.getService().length() == 0) { // If there is no service then it cannot be processed // log.info("no service found, discarding event"); } else { serviceDeletedHandler(event); } } } // end onEvent() /** * Process the event. * * This event is generated when a managed node which supports SNMP gains a * new interface. In this situation the ThresholdableService object * representing the primary SNMP interface of the node must be * reinitialized. * * The ThresholdableService object associated with the primary SNMP * interface for the node will be marked for reinitialization. * Reinitializing the ThresholdableService object consists of calling the * ServiceThresholder.release() method followed by the * ServiceThresholder.initialize() method which will refresh attributes such * as the interface key list and number of interfaces (both of which most * likely have changed). * * Reinitialization will take place the next time the ThresholdableService * is popped from an interval queue for thresholding. * * If any errors occur scheduling the service no error is returned. * * @param event * The event to process. */ private void reinitializePrimarySnmpInterfaceHandler(Event event) { Category log = ThreadCategory.getInstance(getClass()); if (event.getInterface() == null) { log.error("reinitializePrimarySnmpInterface event is missing an interface."); return; } // Mark the primary SNMP interface for reinitialization in // order to update any modified attributes associated with // the collectable service.. // // Iterate over the ThresholdableService objects in the // updates map and mark any which have the same interface // address for reinitialization // synchronized (m_thresholdableServices) { Iterator iter = m_thresholdableServices.iterator(); while (iter.hasNext()) { ThresholdableService tSvc = (ThresholdableService) iter.next(); InetAddress addr = (InetAddress) tSvc.getAddress(); if (addr.getHostAddress().equals(event.getInterface())) { synchronized (tSvc) { // Got a match! Retrieve the ThresholderUpdates object // associated // with this ThresholdableService. ThresholderUpdates updates = tSvc.getThresholderUpdates(); // Now set the reinitialization flag updates.markForReinitialization(); if (log.isDebugEnabled()) log.debug("reinitializePrimarySnmpInterfaceHandler: marking " + event.getInterface() + " for reinitialization for service SNMP."); } } } } } /** * Process the event, construct a new ThresholdableService object * representing the node/interface combination, and schedule the interface * for thresholding. * * If any errors occur scheduling the interface no error is returned. * * @param event * The event to process. * */ private void nodeGainedServiceHandler(Event event) { Category log = ThreadCategory.getInstance(getClass()); // Currently only support SNMP data thresholding. // if (!event.getService().equals("SNMP") && !event.getService().equals("SNMPv1") && !event.getService().equals("SNMPv2")) return; // Schedule the new service... // Threshd.getInstance().scheduleInterface((int) event.getNodeid(), event.getInterface(), event.getService(), false); } /** * Process the 'primarySnmpInterfaceChanged' event.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?