📄 eventipcmanagerdefaultimpl.java
字号:
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Jan 31: Cleaned up some unused imports.// 2002 Oct 24: Changed all references to HashTable to HashMap//// Original code base Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.eventd;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import org.apache.log4j.Category;import org.exolab.castor.xml.MarshalException;import org.exolab.castor.xml.ValidationException;import org.opennms.core.concurrent.RunnableConsumerThreadPool;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueException;import org.opennms.core.queue.FifoQueueImpl;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.config.EventdConfigFactory;import org.opennms.netmgt.xml.event.Event;import org.opennms.netmgt.xml.event.Events;import org.opennms.netmgt.xml.event.Log;/** * An implementation of the EventIpcManager interface that can be used to * communicate between services in the same JVM * * @author <A HREF="mailto:sowmya@opennms.org">Sowmya Nataraj </A> * @author <A HREF="http://www.opennms.org">OpenNMS.org </A> */public class EventIpcManagerDefaultImpl implements EventIpcManager { /** * Hashtable of list of event listeners keyed by event UEI */ private HashMap m_ueiListeners; /** * The list of event listeners interested in all events */ private List m_listeners; /** * Hashtable of event listener threads keyed by the listener's id */ private HashMap m_listenerThreads; /** * The thread pool handling the events */ private RunnableConsumerThreadPool m_eventHandlerPool; /** * The query string to get the next event id from the database sequence */ private String m_getNextEventIdStr; /** * A thread dedicated to each listener. The events meant for each listener * is added to a dedicated queue when the 'sendNow()' is called. The * ListenerThread reads events off of this queue and sends it to the * appropriate listener */ private class ListenerThread extends Object implements Runnable { /** * Listener to which this thread is dedicated */ private EventListener m_listener; /** * Queue from which events for the listener are to be read */ private FifoQueue m_queue; /** * The thread that is running this runnable. */ private Thread m_delegateThread; /** * if set true then the thread should exist as soon as possible. */ private volatile boolean m_shutdown; /** * Constructor */ ListenerThread(EventListener listener, FifoQueue lq) { m_shutdown = false; m_listener = listener; ; m_queue = lq; m_delegateThread = new Thread(this, listener.getName()); } public FifoQueue getQueue() { return m_queue; } /** * The run method preforms the actual work for the runnable. It loops * infinitely until the shutdown flag is set, during which time it * processes queue elements. Each element in the queue should be a * instance of {@link org.opennms.netmgt.xml.event.Event}. After each * event is read, the 'onEvent' method of the listener is invoked. * */ public void run() { Category log = ThreadCategory.getInstance(this.getClass()); if (log.isDebugEnabled()) log.debug("In ListenerThread " + m_listener.getName() + " run"); while (!m_shutdown) { Object obj = null; try { obj = m_queue.remove(500); if (obj == null) continue; } catch (InterruptedException ie) { m_shutdown = true; break; } catch (FifoQueueException fqE) { m_shutdown = true; break; } try { if (obj != null && obj instanceof Event) { Event event = (Event) obj; if (log.isDebugEnabled()) log.debug("run: calling onEvent on " + m_listener.getName() + " for event " + event.getUei()); m_listener.onEvent(event); } } catch (Throwable t) { log.warn("run: an unexpected error occured during ListenerThread " + m_listener.getName() + " run", t); } } } /** * Starts up the thread. */ public void start() { m_shutdown = false; m_delegateThread.start(); } /** * Sets the stop flag in the thread. */ public void stop() { m_shutdown = true; } } /** * Constructor */ public EventIpcManagerDefaultImpl() { m_ueiListeners = new HashMap(); m_listeners = new ArrayList(); m_listenerThreads = new HashMap(); // load the eventd configuration and get the number of threads that // should process the events EventdConfigFactory eFactory = null; try { EventdConfigFactory.reload(); eFactory = EventdConfigFactory.getInstance(); } catch (MarshalException ex) { Category log = ThreadCategory.getInstance(this.getClass()); log.error("Failed to load eventd configuration", ex); throw new UndeclaredEventException(ex); } catch (ValidationException ex) { Category log = ThreadCategory.getInstance(this.getClass()); log.error("Failed to load eventd configuration", ex); throw new UndeclaredEventException(ex); } catch (IOException ex) { Category log = ThreadCategory.getInstance(this.getClass()); log.error("Failed to load eventd configuration", ex); throw new UndeclaredEventException(ex); } // get number of threads int numReceivers = eFactory.getReceivers(); // create handler pool m_eventHandlerPool = new RunnableConsumerThreadPool("EventHandlerPool", 0.6f, 1.0f, numReceivers); // start pool m_eventHandlerPool.start(); // database sequence query string m_getNextEventIdStr = eFactory.getGetNextEventID(); } /** * Called by a service to send an event to other listeners. */ public synchronized void sendNow(Event event) { // create a new event handler for the event and queue it to the // eventhandler thread pool Events events = new Events(); events.addEvent(event); Log eventLog = new Log(); eventLog.setEvents(events); sendNow(eventLog); } /** * Called by a service to send a set of events to other listeners. */ public synchronized void sendNow(Log eventLog) { // create a new event handler for the events and queue it to the // eventhandler thread pool try { m_eventHandlerPool.getRunQueue().add(new EventHandler(eventLog, m_getNextEventIdStr)); } catch (InterruptedException iE) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -