⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scheduler.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
字号:
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
 */

package org.exolab.jms.scheduler;

import java.util.HashMap;
import java.util.LinkedList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.core.service.BasicService;
import org.exolab.core.service.ServiceException;
import org.exolab.core.threadPool.ThreadPool;
import org.exolab.jms.config.Configuration;
import org.exolab.jms.config.ConfigurationManager;
import org.exolab.jms.config.SchedulerConfiguration;
import org.exolab.jms.threads.ThreadPoolManager;


/**
 * The scheduler is responsible for executing {@link Runnable} objects
 * using a thread pool. Clients can add these objects to the scheduler
 * and the scheduler will, in fifo order, execute them. If there are no
 * threads currently available, the runnable will wait for one to become
 * available.
 * <p>
 * A client can add or remove {@link Runnable} objects.
 *
 * @version     $Revision: 1.9 $ $Date: 2003/08/17 01:32:25 $
 * @author      <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
 * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @author      <a href="mailto:jima@intalio.com">Jim Alateras</a>
 * @see         java.lang.Runnable
 * @see         org.exolab.core.service.BasicService
 * @see         org.exolab.core.threadPool.ThreadPool
 */
public class Scheduler extends BasicService {

    /**
     * This is the thread pool used by the scheduler
     */
    private ThreadPool _threads = null;

    /**
     * The queue of Runnable instances
     */
    private LinkedList _queue = new LinkedList();;

    /**
     * HashMap of Runnable->Integer, representing the number of times a
     * Runnable object exists in the queue
     */
    private HashMap _references = new HashMap();

    /**
     * If true, shuts down the scheduler
     */
    private volatile boolean _stop = false;

    /**
     * This attribute holds the number of threads that the scheduler
     * will use in its pool. It defaults to 6 and has a minimum value
     * of 2
     */
    private int _threadCount = 6;

    /**
     * This is the minimum number of threads that can be used to
     * configure the scheduler. If a lower nmber is specified then
     * it defaults to this value
     */
    private final static int MIN_THREAD_COUNT = 2;

    /**
     * Unique name identifyting this sevice
     */
    private static final String SCHEDULER_NAME = "Scheduler";

    /**
     * This is the singleton instance of this class
     */
    private static Scheduler _instance = null;

    /**
     * The logger
     */
    private static final Log _log = LogFactory.getLog(Scheduler.class);

    /**
     * Creates the singleton instance
     *
     * @throws ServiceException if the scheduler can't be created
     */
    public static Scheduler createInstance() throws ServiceException {
        _instance = new Scheduler();
        return _instance;
    }

    /**
     * Returns the singleton instance
     *
     * @return the singleton instance, or <code>null</code> if it hasn't
     * been initialised
     */
    public static Scheduler instance() {
        return _instance;
    }

    /**
     * Construct an instance of the scheduler. This will read the information
     * from the configuration file and then set up a thread pool.
     *
     * @throws ServiceException if the thread pool can't be initialised
     */
    private Scheduler() throws ServiceException {
        super(SCHEDULER_NAME);

        // access the configuration file.
        Configuration config = ConfigurationManager.getConfig();
        SchedulerConfiguration sched_config =
            config.getSchedulerConfiguration();

        int count = sched_config.getMaxThreads();
        if (count < MIN_THREAD_COUNT) {
            count = MIN_THREAD_COUNT;
        }
        _threadCount = count;

        // create the thread pool
        _threads = ThreadPoolManager.instance().createThreadPool(
            SCHEDULER_NAME, _threadCount);
    }

    /**
     * Add a Runnable object to the scheduler queue.
     * When a thread becomes available, it will be executed.
     *
     * @param       runner              the object to execute
     */
    public void add(Runnable runner) {
        synchronized (_queue) {
            _queue.addLast(runner);
            addReference(runner);
            _queue.notify();
        }
    }

    /**
     * Remove a Runnable object from the scheduler queue.
     *
     * @param       runner              the object to remove
     * @return      boolean             <tt>true</tt> if the object was
     *                                  removed, <tt>false</tt> if it is
     *                                  already running or doesn't exist
     */
    public boolean remove(Runnable runner) {
        boolean result = false;
        synchronized (_queue) {
            result = _queue.remove(runner);
        }
        return result;
    }

    /**
     * Returns if a Runnable object exists in the scheduler queue.
     *
     * @param       runner              the object to remove
     * @return      boolean             <tt>true</tt> if the object exists,
     *                                  <tt>false</tt> if it is already
     *                                  running or doesn't exist
     */
    public boolean contains(Runnable runner) {
        boolean result = false;
        synchronized (_queue) {
            result = (_references.get(runner) != null);
        }
        return result;
    }

    /**
     * Returns true if the scheduler queue is empty
     *
     * @return <tt>true</tt> if the scheduler queue is empty
     */
    public boolean isEmpty() {
        boolean result = false;
        synchronized (_queue) {
            result = _queue.isEmpty();
        }
        return result;
    }

    /**
     * Start the scheduler
     * This can only be terminated by invoking {@link #stop}
     */
    public void run() {
        while (!_stop) {
            Runnable runner = next();
            if (!_stop && runner != null) {
                try {
                    _threads.execute(runner);
                } catch (Exception exception) {
                    _log.error(exception);
                }
            }
        }
    }

    // override BasicService.stop
    public void stop() throws ServiceException {
        // TODO - need a safer way of shutting down threads.
        _threads.stopRequestAllWorkers();
        _stop = true;
        super.stop();
    }

    /**
     * Return the next object in the queue to execute
     * This method blocks until an object becomes available.
     *
     * @return      Runnable            the next object to execute
     */
    protected Runnable next() {
        Runnable result = null;
        synchronized (_queue) {
            while (!_stop && _queue.isEmpty()) {
                try {
                    _queue.wait();
                } catch (InterruptedException ignore) {
                    // do nothing.
                }
            }
            if (!_stop) {
                result = (Runnable) _queue.removeFirst();
                removeReference(result);
            }
        }
        return result;
    }

    /**
     * Increment the reference count to a queued Runnable object,
     * to enable contains() to be as efficient as possible.
     */
    private void addReference(Runnable runner) {
        Integer count = (Integer) _references.get(runner);
        if (count != null) {
            count = new Integer(count.intValue() + 1);
            _references.put(runner, count);
        } else {
            _references.put(runner, new Integer(1));
        }
    }

    /**
     * Decrement the reference count to a queued Runnable object,
     * removing it if no more references remain.
     */
    private void removeReference(Runnable runner) {
        // decrement the no. of references to the Runnable object,
        // removing it when the count reaches 0
        Integer count = (Integer) _references.get(runner);
        if (count.intValue() <= 1) {
            _references.remove(runner);
        } else {
            _references.put(runner, new Integer(count.intValue() - 1));
        }
    }

} //-- Scheduler

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -