⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobpoller.java

📁 国外的一套开源CRM
💻 JAVA
字号:
/*
 * $Id: JobPoller.java,v 1.6 2004/01/24 22:02:16 ajzeneski Exp $
 *
 * Copyright (c) 2002 The Open For Business Project - www.ofbiz.org
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
 * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
 * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 */
package org.ofbiz.service.job;

import java.util.*;

import org.ofbiz.service.config.ServiceConfigUtil;
import org.ofbiz.base.util.Debug;

/**
 * JobPoller - Polls for persisted jobs to run.
 *
 * @author     <a href="mailto:jaz@ofbiz.org">Andy Zeneski</a>
 * @author     <a href="mailto:">Magnus Rosenquist</a>
 * @version    $Revision: 1.6 $
 * @since      2.0
 */
public class JobPoller implements Runnable {

    public static final String module = JobPoller.class.getName();

    public static final int MIN_THREADS = 1;
    public static final int MAX_THREADS = 15;
    public static final int MAX_JOBS = 3;
    public static final int POLL_WAIT = 20000;
    //public static final long MAX_TTL = 18000000;

    protected Thread thread = null;
    protected LinkedList pool = null;
    protected LinkedList run = null;
    protected JobManager jm = null;

    protected volatile boolean isRunning = false;

    /**
     * Creates a new JobScheduler
     * @param jm JobManager associated with this scheduler
     */
    public JobPoller(JobManager jm) {
        this.jm = jm;
        this.run = new LinkedList();

        // create the thread pool
        this.pool = createThreadPool();

        // re-load crashed jobs
        this.jm.reloadCrashedJobs();

        // start the thread only if polling is enabled
        if (pollEnabled()) {

            // create the poller thread
            thread = new Thread(this, this.toString());
            thread.setDaemon(false);

            // start the poller
            this.isRunning = true;
            thread.start();
        }
    }

    protected JobPoller() {}

    public synchronized void run() {
        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread Running...", module);
        try {
            // wait 30 seconds before the first poll
            wait(30000);
        } catch (InterruptedException e) {
        }
        while (isRunning) {
            try {
                // grab a list of jobs to run.
                Iterator poll = jm.poll();

                while (poll.hasNext()) {
                    Job job = (Job) poll.next();

                    if (job.isValid())
                        queueNow(job);
                }
                wait(pollWaitTime());
            } catch (InterruptedException e) {
                Debug.logError(e, module);
                stop();
            }
        }
        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread ending...", module);
    }

    /**
     * Returns the JobManager
     */
    public JobManager getManager() {
        return jm;
    }

    /**
     * Stops the JobPoller
     */
    public void stop() {
        isRunning = false;
        destroyThreadPool();
    }

    public List getPoolState() {
        List stateList = new ArrayList();
        Iterator i = this.pool.iterator();
        while (i.hasNext()) {
            JobInvoker invoker = (JobInvoker) i.next();
            Map stateMap = new HashMap();
            stateMap.put("threadName", invoker.getName());
            stateMap.put("jobName", invoker.getJobName());
            stateMap.put("serviceName", invoker.getServiceName());
            stateMap.put("runTime", new Long(invoker.getCurrentRuntime()));
            stateMap.put("status", new Integer(invoker.getCurrentStatus()));
            stateList.add(stateMap);
        }
        return stateList;
    }

    /**
     * Stops all threads in the threadPool and clears
     * the pool as final step.
     */
    private void destroyThreadPool() {
        Debug.logInfo("Destroying thread pool...", module);
        Iterator it = pool.iterator();
        while (it.hasNext()) {
            JobInvoker ji = (JobInvoker) it.next();
            ji.stop();
        }
        pool.clear();
    }

    public synchronized void killThread(String threadName) {
        JobInvoker inv = findThread(threadName);
        if (inv != null) {
            inv.kill();
            this.pool.remove(inv);
        }
    }

    private JobInvoker findThread(String threadName) {
        Iterator i = this.pool.iterator();
        while (i.hasNext()) {
            JobInvoker inv = (JobInvoker) i.next();
            if (threadName.equals(inv.getName())) {
                return inv;
            }
        }
        return null;
    }

    /**
     * Returns the next job to run
     */
    public synchronized Job next() {
        if (run.size() > 0)
            return (Job) run.removeFirst();
        return null;
    }

    /**
     * Adds a job to the RUN queue
     */
    public synchronized void queueNow(Job job) {
        run.add(job);
        if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);
        if (run.size() > pool.size() && pool.size() < maxThreads()) {
            int calcSize = (run.size() / jobsPerThread()) - (pool.size());
            int addSize = calcSize > maxThreads() ? maxThreads() : calcSize;

            for (int i = 0; i < addSize; i++) {
                JobInvoker iv = new JobInvoker(this, invokerWaitTime());
                pool.add(iv);
            }
        }
    }

    /**
     * Removes a thread from the pool.
     * @param invoker The invoker to remove.
     */
    public synchronized void removeThread(JobInvoker invoker) {
        pool.remove(invoker);
        invoker.stop();
        if (pool.size() < minThreads()) {
            for (int i = 0; i < minThreads() - pool.size(); i++) {
                JobInvoker iv = new JobInvoker(this, invokerWaitTime());
                pool.add(iv);
            }
        }
    }

    // Creates the invoker pool
    private LinkedList createThreadPool() {
        LinkedList threadPool = new LinkedList();

        while (threadPool.size() < minThreads()) {
            JobInvoker iv = new JobInvoker(this, invokerWaitTime());
            threadPool.add(iv);
        }

        return threadPool;
    }

    private int maxThreads() {
        int max = MAX_THREADS;

        try {
            max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
        }
        return max;
    }

    private int minThreads() {
        int min = MIN_THREADS;

        try {
            min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
        }
        return min;
    }

    private int jobsPerThread() {
        int jobs = MAX_JOBS;

        try {
            jobs = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "jobs"));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
        }
        return jobs;
    }

    private int invokerWaitTime() {
        int wait = JobInvoker.WAIT_TIME;

        try {
            wait = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "wait-millis"));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
        }
        return wait;
    }

    private int pollWaitTime() {
        int poll = POLL_WAIT;

        try {
            poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
        }
        return poll;
    }

    private boolean pollEnabled() {
        String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");

        if (enabled.equalsIgnoreCase("false"))
            return false;

        // also make sure we have a delegator to use for polling
        if (jm.getDelegator() == null) {
            Debug.logWarning("No delegator referenced; not starting job poller.", module);
            return false;
        }

        return true;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -