📄 simplescheduler.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.scheduler.simple;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArraySet;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.atomic.AtomicLong;import javax.transaction.Status;import javax.transaction.Synchronization;import javax.transaction.SystemException;import javax.transaction.Transaction;import javax.transaction.TransactionManager;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.Scheduler;/** * A reliable and relatively simple scheduler that uses a database to persist information about * scheduled tasks. * * The challange is to achieve high performance in a small memory footprint without loss of reliability * while supporting distributed/clustered configurations. * * The design is based around three time horizons: "immediate", "near future", and "everything else". * Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in * an in-memory priority queue. When they execute, they are removed from the database. Near future * jobs are placed in the database and assigned to the current node, however they are not stored in * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they * get loaded into memory. Jobs that are further out in time, are placed in the database without a * node identifer; when they are ready to be "upgraded" to near-future jobs they are assigned to one * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being * reassigned to known good nodes. * * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m ) * */public class SimpleScheduler implements Scheduler, TaskRunner { private static final Log __log = LogFactory.getLog(SimpleScheduler.class); /** * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed * directly on the todo queue. */ long _immediateInterval = 30000; /** * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current * node, but will not be placed on the todo queue (the promoter will pick them up). */ long _nearFutureInterval = 10 * 60 * 1000; /** 10s of no communication and you are deemed dead. */ long _staleInterval = 10000; TransactionManager _txm; ExecutorService _exec; String _nodeId; /** Maximum number of jobs in the "near future" / todo queue. */ volatile int _todoLimit = 200; /** The object that actually handles the jobs. */ volatile JobProcessor _jobProcessor; private SchedulerThread _todo; private DatabaseDelegate _db; /** All the nodes we know about */ private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>(); /** When we last heard from our nodes. */ private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>(); private boolean _running; /** Time for next upgrade. */ private AtomicLong _nextUpgrade = new AtomicLong(); /** Time for next job load */ private AtomicLong _nextScheduleImmediate = new AtomicLong(); private Random _random = new Random(); public SimpleScheduler(String nodeId, DatabaseDelegate del) { _nodeId = nodeId; _db = del; _todo = new SchedulerThread(this); } public void setNodeId(String nodeId) { _nodeId = nodeId; } public void setStaleInterval(long staleInterval) { _staleInterval = staleInterval; } public void setImmediateInterval(long immediateInterval) { _immediateInterval = immediateInterval; } public void setNearFutureInterval(long nearFutureInterval) { _nearFutureInterval = nearFutureInterval; } public void setTransactionManager(TransactionManager txm) { _txm = txm; } public void setDatabaseDelegate(DatabaseDelegate dbd) { _db = dbd; } public void setExecutorService(ExecutorService executorService) { _exec = executorService; } public void cancelJob(String jobId) throws ContextException { // TODO: maybe later, not really necessary. } public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException { return _exec.submit(new Callable<T>() { public T call() throws Exception { try { return execTransaction(transaction); } catch (Exception e) { __log.error("An exception occured while executing an isolated transaction, " + "the transaction is going to be abandoned.", e); return null; } } }); } public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException { try { _txm.begin(); } catch (Exception ex) { String errmsg = "Internal Error, could not begin transaction."; throw new ContextException(errmsg, ex); } boolean success = false; try { T retval = transaction.call(); success = true; return retval; } catch (Exception ex) { throw ex; } finally { if (success) _txm.commit(); else _txm.rollback(); } } public void registerSynchronizer(final Synchronizer synch) throws ContextException { try { _txm.getTransaction().registerSynchronization(new Synchronization() { public void beforeCompletion() { synch.beforeCompletion(); } public void afterCompletion(int status) { synch.afterCompletion(status == Status.STATUS_COMMITTED); } }); } catch (Exception e) { throw new ContextException("Unable to register synchronizer.", e); } } public String schedulePersistedJob(final Map<String, Object> jobDetail, Date when) throws ContextException { long ctime = System.currentTimeMillis(); if (when == null) when = new Date(ctime); if (__log.isDebugEnabled()) __log.debug("scheduling " + jobDetail + " for " + when); boolean immediate = when.getTime() <= ctime + _immediateInterval; boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; Job job = new Job(when.getTime(), true, jobDetail); try { if (immediate) { // If we have too many jobs in the queue, we don't allow any new ones if (_todo.size() > _todoLimit) throw new ContextException("The execution queue is backed up... Forcing ContextException"); // Immediate scheduling means we put it in the DB for safe keeping _db.insertJob(job, _nodeId, true); // And add it to our todo list . addTodoOnCommit(job); __log.debug("scheduled immediate job: " + job.jobId); } else if (nearfuture) { // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we // would get two nodes trying to process the same instance, which causes unsightly rollbacks). _db.insertJob(job, _nodeId, false); __log.debug("scheduled near-future job: " + job.jobId); } else /* far future */{ // Not the near future, we don't assign a node-id, we'll assign it later. _db.insertJob(job, null, false); __log.debug("scheduled far-future job: " + job.jobId); } } catch (DatabaseException dbe) { __log.error("Database error.", dbe); throw new ContextException("Database error.", dbe); } return job.jobId; } public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail) throws ContextException { Job job = new Job(System.currentTimeMillis(), transacted, jobDetail); job.persisted = false; addTodoOnCommit(job); return job.toString(); } public void setJobProcessor(JobProcessor processor) throws ContextException { _jobProcessor = processor; } public void shutdown() { stop(); _jobProcessor = null; _txm = null; _todo = null; } public synchronized void start() { if (_running) return; if (_exec == null) _exec = Executors.newCachedThreadPool(); _todo.clearTasks(UpgradeJobsTask.class); _todo.clearTasks(LoadImmediateTask.class); _todo.clearTasks(CheckStaleNodes.class); _knownNodes.clear(); try { execTransaction(new Callable<Void>() { public Void call() throws Exception { _knownNodes.addAll(_db.getNodeIds()); return null; } }); } catch (Exception ex) { __log.error("Error retrieving node list.", ex); throw new ContextException("Error retrieving node list.", ex); } // Pretend we got a heartbeat... for (String s : _knownNodes) _lastHeartBeat.put(s, System.currentTimeMillis()); // schedule immediate job loading for now! _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -