/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
 *
 * $Id: BasicEventManager.java,v 1.1 2004/11/26 01:50:41 tanderson Exp $
 */
package org.exolab.jms.events;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.service.BasicService;
import org.exolab.jms.service.ServiceException;
import org.exolab.jms.service.ServiceState;
import org.exolab.jms.common.threads.ThreadPool;
import org.exolab.jms.common.util.OrderedQueue;
import org.exolab.jms.threads.ThreadPoolExistsException;
import org.exolab.jms.threads.ThreadPoolManager;


/**
 * The EventManager manages {@link Event} objects. It has methods to
 * register and unregister events. It also implements the {@link Runnable}
 * interface, which defines the thread responsible for dispatching events.
 * <p>
 * An event is defined to occur at some time in the future, as specified either
 * by an absolute time through {@link #registerEvent} or as relative time
 * through {@link #registerEventRelative}. An event must have an associated
 * event type and may have an attached <code>Serializable</code>,
 * which is used when the EventManager makes a callback to the registered
 * handler when the event fires.
 * <p>
 * The register methods return an event identifier, which can subsequently
 * be used to unregister the event through the {@link #unregisterEvent} method.
 * This is the only means of unregistering an event.
 * <p>
 * If the {@link Event} object is incorrectly specified then the
 * {@link IllegalEventDefinedException} exception is raised.
 * <p>
 * When an event fires the {@link EventManager} is responsible for ensuring
 * that the event handler is notified. If the event handler has since been
 * removed then the EventManager must gracefully abort the delivery and
 * continue processing the next event.
 * <p>
 * Objects of type {@link Event} need to survive subsequent
 * {@link EventManager} restarts, as such they must be persisted, which
 * implies that the {@link EventHandler} needs to also be persisted. The
 * ability to store the {@link EventHandler} as a <code>HandleIfc</code> object
 * which can later be resolved to an object will be required.
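 * <p>
 * A minimal usage sketch, assuming an already constructed {@link Event}
 * named <code>event</code> (how it is built is not shown in this file):
 * <pre>
 *   BasicEventManager manager = BasicEventManager.instance();
 *   // fire roughly five seconds from now
 *   String id = manager.registerEventRelative(event, 5000);
 *   // cancel the event before it fires
 *   manager.unregisterEvent(id);
 * </pre>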
 *
 * @version   $Revision: 1.1 $ $Date: 2004/11/26 01:50:41 $
 * @author    <a href="mailto:wood@intalio.com">Chris Wood</a>
 */
public class BasicEventManager
    extends BasicService
    implements EventManager {

    // The unique name of the thread pool used to dispatch events.
    public static final String NAME = "EventManager";

    // The maximum number of threads in the pool.
    public static final int MAX_THREADS = 5;

    /**
     * Maps ids to events.
     */
    private HashMap _events = new HashMap();

    /**
     * Thread pool manager.
     */
    private transient ThreadPool _pool;

    /**
     * Synchronization lock guarding the event map and the event queue.
     */
    private transient Object _queueSync = new Object();

    /**
     * Event queue.
     */
    private transient OrderedQueue _queue = new OrderedQueue(_queueComparator);

    /**
     * Used to generate unique queue entry ids.
     */
    private transient long _seed;

    /**
     * The name of the EventManager thread in which events execute.
     */
    private static final String EVENT_MANAGER_THREAD_NAME =
        "EventManagerThread";

    /**
     * Singleton instance.
     */
    private static BasicEventManager _instance = null;

    /**
     * The logger
     */
    private static final Log _log = LogFactory.getLog(BasicEventManager.class);


    /**
     * Return the singleton instance of the EventManager
     *
     * @return    EventManager
     */
    // synchronized so that concurrent callers cannot create two instances
    public static synchronized BasicEventManager instance() {
        if (_instance == null)
            _instance = new BasicEventManager();

        return _instance;
    }

    protected BasicEventManager() {
        super(EVENT_MANAGER_THREAD_NAME);
    }

    /**
     * Register an event to be fired once and only once at the specified
     * absolute time. The event object must be Serializable so that it can
     * be persisted and restored across EventManager restarts.
     * <p>
     * If the specified event is ill-defined then the
     * IllegalEventDefinedException exception is thrown.
     * <p>
     * Similarly, if the absolute time has already passed then the exception
     * IllegalEventDefinedException is raised. Note that this implementation
     * does not currently perform either check itself.
     * <p>
     * The method returns a unique event identifier, which can subsequently
     * be used to unregister the event.
     *
     * @param event    information about the event
     * @param absolute the absolute time, in ms, at which the event
     *                 must fire
     * @return String  unique event identifier
     * @exception IllegalEventDefinedException if the event is ill-defined
     */
    public String registerEvent(Event event, long absolute)
        throws IllegalEventDefinedException {
        synchronized (_queueSync) {
            QueueEntry entry = new QueueEntry(event, absolute, generateId());

            // add entry to the queue.
            _queue.add(entry);
            _events.put(entry.id, entry);

            // notify the event thread.
            _queueSync.notifyAll();
            return entry.id;
        }
    }

    /**
     * Register an event to be fired once and only once at a time relative to
     * now. The event object must be Serializable so that it can be persisted
     * and restored across EventManager restarts.
     * <p>
     * If the specified event is ill-defined then the
     * IllegalEventDefinedException exception is thrown.
     * <p>
     * The method returns a unique event identifier, which can subsequently
     * be used to unregister the event.
     *
     * @param event    information about the event
     * @param relative the relative time in ms
     *                 (currently no reference to locale).
     * @return String  unique event identifier
     * @exception IllegalEventDefinedException if the event is ill-defined
     */
    public String registerEventRelative(Event event, long relative)
        throws IllegalEventDefinedException {
        return registerEvent(event, System.currentTimeMillis() + relative);
    }

    /**
     * Unregister the event specified by the event identifier. If the event
     * does not exist then fail silently.
     *
     * @param id unique event identifier.
     */
    public void unregisterEvent(String id) {
        synchronized (_queueSync) {
            // remove from the events list
            Object obj = _events.remove(id);
            if (obj == null)
                return;
            // remove from the queue.
            _queue.remove(obj);
        }
    }

    // implementation of BasicService.run
    public void run() {
        synchronized (_queueSync) {
            QueueEntry entry;
            long currentTime;
            while (getState() != ServiceState.STOPPED) {
                currentTime = System.currentTimeMillis();
                try {
                    entry = (QueueEntry) _queue.firstElement();
                } catch (java.util.NoSuchElementException ex) {
                    // queue is empty.
                    try {
                        _queueSync.wait();
                    } catch (InterruptedException ex1) {
                        break;
                    }
                    continue;
                }

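                if (entry.absolute <= currentTime) {
                    // the event is due: hand it to the thread pool
                    try {
                        getThreadPool().execute(entry);
                    } catch (InterruptedException ex) {
                        _log.warn("Interrupted while dispatching event "
                            + entry.id, ex);
                    }
                    // the entry is removed whether or not it was dispatched
                    _queue.removeFirstElement();
                    _events.remove(entry.id);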
                } else {
                    // wait for either the next event to expire or an element to be
                    // added to the queue.
                    try {
                        _queueSync.wait(entry.absolute - currentTime);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                }
            }
        }
    }

    /**
     * Generate unique queued object identifier.
     */
    private synchronized String generateId() {
        return Long.toString(++_seed);
    }

    public void start() throws ServiceException {
        super.start();
    }

    /**
     * Return a reference to the thread pool. This object is cached
     * for future reference.
     *
     * @return      ThreadPool
     */
    private ThreadPool getThreadPool() {
        if (_pool == null) {
            // At startup Event Mgr is triggered before the Service
            // locator has registered the ThreadPoolMgr, causing
            // an exception. Use the instance variable for now to
            // avoid this problem. jimm
            // _pool = (ThreadPoolMgr)ServiceLocator.locateService(
            //    ServiceConstants.ThreadPoolManager);
            try {
                _pool = ThreadPoolManager.instance().createThreadPool
                    (NAME, MAX_THREADS);
            } catch (ThreadPoolExistsException err) {
                _log.error("Thread pool " + NAME + " already exists");
            }
        }

        return _pool;
    }

    /**
     * Compare queue entries on expiration times
     */
    private static final Comparator _queueComparator =
        new Comparator() {

            public int compare(Object obj1, Object obj2) {
                QueueEntry qe1 = (QueueEntry) obj1;
                QueueEntry qe2 = (QueueEntry) obj2;

                if (qe1.absolute < qe2.absolute)
                    return -1;
                if (qe1.absolute > qe2.absolute)
                    return 1;
                return 0;
            }

            public boolean equals(Object that) {
                return (this == that);
            }
        };

    /**
     * Entry on the task queue.
     */
    class QueueEntry implements Runnable {

        QueueEntry(Event event, long absolute, String id) {
            this.absolute = absolute;
            this.event = event;
            this.id = id;
        }

        private long absolute;
        private Event event;
        private String id;

        public void run() {
            event.getEventListener().handleEvent(event.getEventType(),
                event.getCallbackObject(), System.currentTimeMillis());
        }
    }

}
