⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simplescheduler.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.scheduler.simple;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArraySet;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.atomic.AtomicLong;import javax.transaction.Status;import javax.transaction.Synchronization;import javax.transaction.SystemException;import javax.transaction.Transaction;import javax.transaction.TransactionManager;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.Scheduler;/** * A reliable and relatively simple scheduler that uses a database to persist information about  * scheduled tasks. *  * The challange is to achieve high performance in a small memory footprint without loss of reliability * while supporting distributed/clustered configurations. *  * The design is based around three time horizons: "immediate", "near future", and "everything else".  * Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in * an in-memory priority queue. When they execute, they are removed from the database. Near future * jobs are placed in the database and assigned to the current node, however they are not stored in * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they * get loaded into memory. Jobs that are further out in time, are placed in the database without a  * node identifer; when they are ready to be "upgraded" to near-future jobs they are assigned to one * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being  * reassigned to known good nodes.        *  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m ) * */public class SimpleScheduler implements Scheduler, TaskRunner {    private static final Log __log = LogFactory.getLog(SimpleScheduler.class);    /**     * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed     * directly on the todo queue.     */    long _immediateInterval = 30000;    /**     * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current     * node, but will not be placed on the todo queue (the promoter will pick them up).     */    long _nearFutureInterval = 10 * 60 * 1000;    /** 10s of no communication and you are deemed dead. */    long _staleInterval = 10000;    TransactionManager _txm;    ExecutorService _exec;    String _nodeId;    /** Maximum number of jobs in the "near future" / todo queue. */    volatile int _todoLimit = 200;    /** The object that actually handles the jobs. */    volatile JobProcessor _jobProcessor;    private SchedulerThread _todo;    private DatabaseDelegate _db;    /** All the nodes we know about */    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();    /** When we last heard from our nodes. */    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();    private boolean _running;    /** Time for next upgrade. */    private AtomicLong _nextUpgrade = new AtomicLong();    /** Time for next job load */    private AtomicLong _nextScheduleImmediate = new AtomicLong();    private Random _random = new Random();    public SimpleScheduler(String nodeId, DatabaseDelegate del) {        _nodeId = nodeId;        _db = del;        _todo = new SchedulerThread(this);    }    public void setNodeId(String nodeId) {        _nodeId = nodeId;    }    public void setStaleInterval(long staleInterval) {        _staleInterval = staleInterval;    }    public void setImmediateInterval(long immediateInterval) {        _immediateInterval = immediateInterval;    }    public void setNearFutureInterval(long nearFutureInterval) {        _nearFutureInterval = nearFutureInterval;    }    public void setTransactionManager(TransactionManager txm) {        _txm = txm;    }    public void setDatabaseDelegate(DatabaseDelegate dbd) {        _db = dbd;    }    public void setExecutorService(ExecutorService executorService) {        _exec = executorService;    }    public void cancelJob(String jobId) throws ContextException {        // TODO: maybe later, not really necessary.    }    public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {        return _exec.submit(new Callable<T>() {            public T call() throws Exception {                try {                    return execTransaction(transaction);                } catch (Exception e) {                    __log.error("An exception occured while executing an isolated transaction, " +                            "the transaction is going to be abandoned.", e);                    return null;                }            }        });    }    public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {        try {            _txm.begin();        } catch (Exception ex) {            String errmsg = "Internal Error, could not begin transaction.";            throw new ContextException(errmsg, ex);        }        boolean success = false;        try {            T retval = transaction.call();            success = true;            return retval;        } catch (Exception ex) {            throw ex;        } finally {            if (success)                _txm.commit();            else                _txm.rollback();        }    }    public void registerSynchronizer(final Synchronizer synch) throws ContextException {        try {            _txm.getTransaction().registerSynchronization(new Synchronization() {                public void beforeCompletion() {                    synch.beforeCompletion();                }                public void afterCompletion(int status) {                    synch.afterCompletion(status == Status.STATUS_COMMITTED);                }            });        } catch (Exception e) {            throw new ContextException("Unable to register synchronizer.", e);        }    }    public String schedulePersistedJob(final Map<String, Object> jobDetail, Date when) throws ContextException {        long ctime = System.currentTimeMillis();        if (when == null)            when = new Date(ctime);        if (__log.isDebugEnabled())            __log.debug("scheduling " + jobDetail + " for " + when);        boolean immediate = when.getTime() <= ctime + _immediateInterval;        boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;        Job job = new Job(when.getTime(), true, jobDetail);        try {            if (immediate) {                // If we have too many jobs in the queue, we don't allow any new ones                if (_todo.size() > _todoLimit)                    throw new ContextException("The execution queue is backed up... Forcing ContextException");                // Immediate scheduling means we put it in the DB for safe keeping                _db.insertJob(job, _nodeId, true);                // And add it to our todo list .                addTodoOnCommit(job);                __log.debug("scheduled immediate job: " + job.jobId);            } else if (nearfuture) {                // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we                // would get two nodes trying to process the same instance, which causes unsightly rollbacks).                _db.insertJob(job, _nodeId, false);                __log.debug("scheduled near-future job: " + job.jobId);            } else /* far future */{                // Not the near future, we don't assign a node-id, we'll assign it later.                _db.insertJob(job, null, false);                __log.debug("scheduled far-future job: " + job.jobId);            }        } catch (DatabaseException dbe) {            __log.error("Database error.", dbe);            throw new ContextException("Database error.", dbe);        }        return job.jobId;    }    public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail) throws ContextException {        Job job = new Job(System.currentTimeMillis(), transacted, jobDetail);        job.persisted = false;        addTodoOnCommit(job);        return job.toString();    }    public void setJobProcessor(JobProcessor processor) throws ContextException {        _jobProcessor = processor;    }    public void shutdown() {        stop();        _jobProcessor = null;        _txm = null;        _todo = null;    }    public synchronized void start() {        if (_running)            return;        if (_exec == null)            _exec = Executors.newCachedThreadPool();        _todo.clearTasks(UpgradeJobsTask.class);        _todo.clearTasks(LoadImmediateTask.class);        _todo.clearTasks(CheckStaleNodes.class);        _knownNodes.clear();        try {            execTransaction(new Callable<Void>() {                public Void call() throws Exception {                    _knownNodes.addAll(_db.getNodeIds());                    return null;                }            });        } catch (Exception ex) {            __log.error("Error retrieving node list.", ex);            throw new ContextException("Error retrieving node list.", ex);        }        // Pretend we got a heartbeat...        for (String s : _knownNodes)            _lastHeartBeat.put(s, System.currentTimeMillis());        // schedule immediate job loading for now!        _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -