eventqueueprocessor.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 450 行

JAVA
450
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2004 Jan 13: Added this new code for the XML RPC Daemon//// Original code base Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//// For more information contact://      OpenNMS Licensing       <license@opennms.org>//      http://www.opennms.org///      http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.xmlrpcd;import java.util.Enumeration;import org.apache.log4j.Category;import org.opennms.core.fiber.PausableFiber;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueException;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.EventConstants;import org.opennms.netmgt.config.xmlrpcd.XmlrpcServer;import org.opennms.netmgt.xml.event.Event;import org.opennms.netmgt.xml.event.Parm;import org.opennms.netmgt.xml.event.Parms;import org.opennms.netmgt.xml.event.Value;/** * The EventQueueProcessor processes the events recieved by xmlrpcd and sends * notifications to the external XMLRPC server via XMLRPC protocol. *  * @author <A HREF="mailto:jamesz@opennms.com">James Zuo </A> * @author <A HREF="http://www.opennms.org">OpenNMS.org </A> *  */class EventQueueProcessor implements Runnable, PausableFiber {    /**     * The input queue     */    private FifoQueue m_eventQ;    /**     * The max size of the event queue     */    private int m_maxQSize;    /**     * An object used to communicate with exteranl xmlrpc servers     */    private XmlRpcNotifier m_notifier;    /**     * Current status of the fiber     */    private int m_status;    /**     * The thread that is executing the <code>run</code> method on behalf of     * the fiber.     */    private Thread m_worker;    /**     * The constructor     */    EventQueueProcessor(FifoQueue eventQ, XmlrpcServer[] rpcServers, int retries, int elapseTime, boolean verifyServer, String localServer, int maxQSize) {        m_eventQ = eventQ;        m_maxQSize = maxQSize;        m_notifier = new XmlRpcNotifier(rpcServers, retries, elapseTime, verifyServer, localServer);    }    private void processEvent(Event event) {        Category log = ThreadCategory.getInstance(getClass());        String uei = event.getUei();        if (uei == null) {            // should only get registered events            if (log.isDebugEnabled())                log.debug("Event received with null UEI, ignoring event");            return;        }        if (log.isDebugEnabled())            log.debug("About to process event: " + event.getUei());        // get eventid        long eventId = -1;        if (event.hasDbid())            eventId = event.getDbid();        // get node id        long nodeId = -1;        if (event.hasNodeid())            nodeId = event.getNodeid();        String ipAddr = event.getInterface();        String service = event.getService();        String eventTime = event.getTime();        if (log.isDebugEnabled())            log.debug("Event\nuei\t\t" + uei + "\neventid\t\t" + eventId + "\nnodeid\t\t" + nodeId + "\nipaddr\t\t" + ipAddr + "\nservice\t\t" + service + "\neventtime\t" + (eventTime != null ? eventTime : "<null>"));        if (uei.equals(EventConstants.NODE_LOST_SERVICE_EVENT_UEI)) {            if (!m_notifier.sendServiceDownEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.INTERFACE_DOWN_EVENT_UEI)) {            if (!m_notifier.sendInterfaceDownEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.NODE_DOWN_EVENT_UEI)) {            if (!m_notifier.sendNodeDownEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.NODE_UP_EVENT_UEI)) {            if (!m_notifier.sendNodeUpEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.INTERFACE_UP_EVENT_UEI)) {            if (!m_notifier.sendInterfaceUpEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.NODE_REGAINED_SERVICE_EVENT_UEI)) {            if (!m_notifier.sendServiceUpEvent(event)) {                pushBackEvent(event);            }        } else if (uei.equals(EventConstants.XMLRPC_NOTIFICATION_EVENT_UEI)) {            xmlrpcNotificationEventHandler(event);        }    }    /**     * Process xmlrpcNotificationEvent according the status flag to determine to     * send a notifyReceivedEvent, or a notifySuccess, or a notifyFailure     * notification to XMLRPC Server.     */    private void xmlrpcNotificationEventHandler(Event event) {        Category log = ThreadCategory.getInstance(getClass());        long txNo = -1L;        String sourceUei = null;        String notification = null;        int status = -1;        Parms parms = event.getParms();        if (parms != null) {            String parmName = null;            Value parmValue = null;            String parmContent = null;            Enumeration parmEnum = parms.enumerateParm();            while (parmEnum.hasMoreElements()) {                Parm parm = (Parm) parmEnum.nextElement();                parmName = parm.getParmName();                parmValue = parm.getValue();                if (parmValue == null)                    continue;                else                    parmContent = parmValue.getContent();                // get txNo                if (parmName.equals(EventConstants.PARM_TRANSACTION_NO)) {                    String temp = parmContent;                    if (log.isDebugEnabled())                        log.debug("ParmName: " + parmName + " /parmContent: " + parmContent);                    try {                        txNo = Long.valueOf(temp).longValue();                    } catch (NumberFormatException nfe) {                        log.warn("Parameter " + EventConstants.PARM_TRANSACTION_NO + " cannot be non-numberic", nfe);                        txNo = -1L;                    }                } else if (parmName.equals(EventConstants.PARM_SOURCE_EVENT_UEI)) {                    sourceUei = parmContent;                    if (log.isDebugEnabled())                        log.debug("ParmName: " + parmName + " /parmContent: " + parmContent);                } else if (parmName.equals(EventConstants.PARM_SOURCE_EVENT_MESSAGE)) {                    notification = parmContent;                    if (log.isDebugEnabled())                        log.debug("ParmName: " + parmName + " /parmContent: " + parmContent);                } else if (parmName.equals(EventConstants.PARM_SOURCE_EVENT_STATUS)) {                    String temp = parmContent;                    if (log.isDebugEnabled())                        log.debug("ParmName: " + parmName + " /parmContent: " + parmContent);                    try {                        status = Integer.valueOf(temp).intValue();                    } catch (NumberFormatException nfe) {                        log.warn("Parameter " + EventConstants.PARM_SOURCE_EVENT_STATUS + " cannot be non-numberic", nfe);                        status = -1;                    }                }            }        }        boolean validParameters = (txNo != -1L) && (sourceUei != null) && (notification != null) && (status != -1);        if (!validParameters) {            log.error("Invalid parameters.");            return;        }        switch (status) {        case EventConstants.XMLRPC_NOTIFY_RECEIVED:            if (!m_notifier.notifyReceivedEvent(txNo, sourceUei, notification)) {                pushBackEvent(event);            }            break;        case EventConstants.XMLRPC_NOTIFY_SUCCESS:            if (!m_notifier.notifySuccess(txNo, sourceUei, notification)) {                pushBackEvent(event);            }            break;        case EventConstants.XMLRPC_NOTIFY_FAILURE:            if (!m_notifier.notifyFailure(txNo, sourceUei, notification)) {                pushBackEvent(event);            }        }    }    /**     * Push the event back to the event queue if OpenNMS failed to send message     * to the external XMLRPC server, so that it could be send to the server     * again later.     */    private void pushBackEvent(Event event) {        Category log = ThreadCategory.getInstance(getClass());        // push the event back to the event queue        try {            if (m_eventQ.size() < m_maxQSize) {                m_eventQ.add(event);                if (log.isDebugEnabled())                    log.debug("Push the event back to queue.");            }            // re-establish connection to xmlrpc servers            m_notifier.createConnection();        } catch (FifoQueueException e) {            log.error("Failed to push the event back to queue", e);        } catch (InterruptedException e) {            log.error("Failed to push the event back to queue", e);        }    }    /**     * Returns true if the status is ok and the thread should continue running.     * If the status returend is false then the thread should exit.     *      */    private synchronized boolean statusOK() {        Category log = ThreadCategory.getInstance(getClass());        //        // Loop until there is a new client or we are shutdown        //        boolean exitThread = false;        boolean exitCheck = false;        while (!exitCheck) {            //            // check the child thread!            //            if (m_worker.isAlive() == false && m_status != STOP_PENDING) {                log.warn(getName() + " terminated abnormally");                m_status = STOP_PENDING;            }            //            // do normal status checks now            //            if (m_status == STOP_PENDING) {                exitCheck = true;                exitThread = true;                m_status = STOPPED;            } else if (m_status == PAUSE_PENDING) {                pause();            } else if (m_status == RESUME_PENDING) {                resume();            } else if (m_status == PAUSED) {                try {                    wait();                } catch (InterruptedException e) {                    m_status = STOP_PENDING;                }            } else if (m_status == RUNNING) {                exitCheck = true;            }        } // end !exit check        return !exitThread;    } // statusOK    /**     * Starts the current fiber. If the fiber has already been started,     * regardless of it's current state, then an IllegalStateException is     * thrown.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber has already been started.     *      */    public synchronized void start() {        Category log = ThreadCategory.getInstance(getClass());        if (m_worker != null)            throw new IllegalStateException("The fiber is running or has already run");        m_status = STARTING;        m_worker = new Thread(this, getName());        m_worker.start();        if (log.isDebugEnabled())            log.debug(getName() + " started");    }    /**     * Pauses the current fiber.     */    public synchronized void pause() {        if (m_worker == null || m_worker.isAlive() == false)            throw new IllegalStateException("The fiber is not running");        m_status = PAUSED;        notifyAll();    }    /**     * Resumes the currently paused fiber.     */    public synchronized void resume() {        if (m_worker == null || m_worker.isAlive() == false)            throw new IllegalStateException("The fiber is not running");        m_status = RUNNING;        notifyAll();    }    /**     * <p>     * Stops this fiber. If the fiber has never been started then an     * <code>IllegalStateExceptio</code> is generated.     * </p>     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber has never been started.     */    public synchronized void stop() {        if (m_worker == null)            throw new IllegalStateException("The fiber has never run");        m_status = STOP_PENDING;        m_worker.interrupt();        notifyAll();    }    /**     * Returns the name of the fiber.     *      * @return The name of the Fiber.     */    public String getName() {        return "EventQueueProcessor";    }    /**     * Returns the current status of the fiber     *      * @return The status of the Fiber.     */    public synchronized int getStatus() {        if (m_worker != null && !m_worker.isAlive())            m_status = STOPPED;        return m_status;    }    /**     * Reads off of the event queue and depends on the uei of the event of read,     * process the event to send a notification to the external XMLRPC server     * via XMLRPC protocol.     *      */    public void run() {        Category log = ThreadCategory.getInstance(getClass());        synchronized (this) {            m_status = RUNNING;        }        while (statusOK()) {            Object obj = null;            try {                obj = m_eventQ.remove(1000);            } catch (InterruptedException iE) {                log.debug("Caught interrupted exception");                log.debug(iE.getLocalizedMessage(), iE);                obj = null;                m_status = STOP_PENDING;            } catch (FifoQueueException qE) {                log.debug("Caught fifo queue exception");                log.debug(qE.getLocalizedMessage(), qE);                obj = null;                m_status = STOP_PENDING;            }            if (obj != null && statusOK()) {                try {                    processEvent((Event) obj);                } catch (Throwable t) {                    log.error("Unexpected error processing event", t);                }            }        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?