📄 simplescheduler.java
字号:
// --- continuation of the (synchronized) start() method; its header precedes this chunk ---
// schedule check for stale nodes, make it random so that the nodes don't overlap.
_todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
// do the upgrade sometime (random) in the immediate interval.
_todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
_todo.start();
_running = true;
}

/**
 * Stop the scheduler: halt the task queue and drop all of our own
 * periodic maintenance tasks so they are not re-run on restart.
 * Idempotent — a second call is a no-op.
 */
public synchronized void stop() {
    if (!_running)
        return;
    _todo.stop();
    _todo.clearTasks(UpgradeJobsTask.class);
    _todo.clearTasks(LoadImmediateTask.class);
    _todo.clearTasks(CheckStaleNodes.class);
    _running = false;
}

/**
 * Run a job. NOTE(review): despite the original "current thread" wording,
 * the work is submitted to {@code _exec} and runs on an executor thread.
 * Transacted jobs run inside {@code execTransaction}; on success a persisted
 * job is deleted from the database, and failure to delete (another node beat
 * us to it) forces a rollback via {@link JobNoLongerInDbException}.
 *
 * @param job job to run.
 */
protected void runJob(final Job job) {
    // Snapshot the retry count from the job detail map (0 if never retried).
    final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
            (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));
    _exec.submit(new Callable<Void>() {
        public Void call() throws Exception {
            if (job.transacted) {
                try {
                    execTransaction(new Callable<Void>() {
                        public Void call() throws Exception {
                            _jobProcessor.onScheduledJob(jobInfo);
                            if (job.persisted)
                                // If the row is already gone, another node ran the job;
                                // throw so the whole transaction rolls back.
                                if (!_db.deleteJob(job.jobId, _nodeId))
                                    throw new JobNoLongerInDbException(job.jobId,_nodeId);
                            return null;
                        }
                    });
                } catch (JobNoLongerInDbException jde) {
                    // This may happen if two node try to do the same job... we try to avoid
                    // it the synchronization is a best-effort but not perfect.
                    __log.debug("job no longer in db forced rollback.");
                } catch (JobProcessorException jpe) {
                    // The processor decides whether this failure is retryable.
                    if (jpe.retry) {
                        __log.error("Error while processing transaction, retrying.", jpe);
                        doRetry(job);
                    } else {
                        __log.error("Error while processing transaction, no retry.", jpe);
                    }
                } catch (Exception ex) {
                    __log.error("Error while executing transaction", ex);
                }
            } else {
                // Non-transacted jobs run directly, with no retry/rollback handling.
                _jobProcessor.onScheduledJob(jobInfo);
            }
            return null;
        }
    });
}

/**
 * Defer enqueueing a job until the current transaction commits; if the
 * transaction rolls back the job is silently dropped.
 */
private void addTodoOnCommit(final Job job) {
    registerSynchronizer(new Synchronizer() {
        public void afterCompletion(boolean success) {
            if (success) {
                _todo.enqueue(job);
            }
        }
        public void beforeCompletion() {
        }
    });
}

/**
 * @return true if the calling thread is currently associated with an
 *         active JTA transaction.
 * NOTE(review): the SystemException cause is dropped when wrapping in
 * ContextException — consider chaining it if the constructor allows.
 */
public boolean isTransacted() {
    try {
        Transaction tx = _txm.getTransaction();
        return (tx != null && tx.getStatus() != Status.STATUS_NO_TRANSACTION);
    } catch (SystemException e) {
        throw new ContextException("Internal Error: Could not obtain transaction status.");
    }
}

/**
 * Dispatch a dequeued task: scheduled jobs go through {@link #runJob},
 * internal maintenance tasks are run inline on the calling thread.
 */
public void runTask(Task task) {
    if (task instanceof Job)
        runJob((Job) task);
    if (task instanceof SchedulerTask)
        ((SchedulerTask) task).run();
}

/**
 * Record a heartbeat from another node (our own heartbeats and null ids
 * are ignored), registering the node as "known" if it is new to us.
 */
public void updateHeartBeat(String nodeId) {
    if (nodeId == null)
        return;
    if (_nodeId.equals(nodeId))
        return;
    _lastHeartBeat.put(nodeId, System.currentTimeMillis());
    _knownNodes.add(nodeId);
}

/**
 * Pull jobs assigned to this node that are due within the immediate
 * interval out of the database and put them on the in-memory queue.
 * Fetches in batches of 10 until a short batch signals the end.
 *
 * @return true on success, false on any database error.
 */
boolean doLoadImmediate() {
    __log.debug("LOAD IMMEDIATE started");
    List<Job> jobs;
    try {
        do {
            jobs = execTransaction(new Callable<List<Job>>() {
                public List<Job> call() throws Exception {
                    return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
                }
            });
            for (Job j : jobs) {
                if (__log.isDebugEnabled())
                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
                _todo.enqueue(j);
            }
        } while (jobs.size() == 10); // a full batch means there may be more
        return true;
    } catch (Exception ex) {
        __log.error("Error loading immediate jobs from database.", ex);
        return false;
    } finally {
        __log.debug("LOAD IMMEDIATE complete");
    }
}

/**
 * Assign near-future unowned jobs to cluster nodes, entirely in the
 * database. Nodes are sorted so every node computes the same assignment.
 *
 * @return true on success, false on any database error.
 */
boolean doUpgrade() {
    __log.debug("UPGRADE started");
    final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
    // Don't forget about self.
    knownNodes.add(_nodeId);
    Collections.sort(knownNodes);
    // We're going to try to upgrade near future jobs using the db only.
    // We assume that the distribution of the trailing digits in the
    // scheduled time are uniformly distributed, and use modular division
    // of the time by the number of nodes to create the node assignment.
    // This can be done in a single update statement.
    final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
    try {
        return execTransaction(new Callable<Boolean>() {
            public Boolean call() throws Exception {
                int numNodes = knownNodes.size();
                for (int i = 0; i < numNodes; ++i) {
                    String node = knownNodes.get(i);
                    _db.updateAssignToNode(node, i, numNodes, maxtime);
                }
                return true;
            }
        });
    } catch (Exception ex) {
        __log.error("Database error upgrading jobs.", ex);
        return false;
    } finally {
        __log.debug("UPGRADE complete");
    }
}

/**
 * Re-assign stale node's jobs to self.
 *
 * @param nodeId id of the node presumed dead.
 */
void recoverStaleNode(final String nodeId) {
    __log.debug("recovering stale node " + nodeId);
    try {
        int numrows = execTransaction(new Callable<Integer>() {
            public Integer call() throws Exception {
                return _db.updateReassign(nodeId, _nodeId);
            }
        });
        __log.debug("reassigned " + numrows + " jobs to self. ");
        // We can now forget about this node, if we see it again, it will be
        // "new to us"
        _knownNodes.remove(nodeId);
        _lastHeartBeat.remove(nodeId);
        // Force a load-immediate to catch anything new from the recovered node.
        doLoadImmediate();
    } catch (Exception ex) {
        __log.error("Database error reassigning node.", ex);
    } finally {
        __log.debug("node recovery complete");
    }
}

/**
 * Persist a retry of a failed job, scheduled 2 seconds out, incrementing
 * the "retry" counter stored in the job's detail map. The retry is inserted
 * unassigned (last arg false) — presumably so any node may pick it up;
 * TODO(review): confirm against {@code _db.insertJob}'s contract.
 */
private void doRetry(Job job) throws DatabaseException {
    Calendar retryTime = Calendar.getInstance();
    retryTime.add(Calendar.SECOND, 2);
    job.detail.put("retry", job.detail.get("retry") != null ?
            (((Integer)job.detail.get("retry")) + 1) : 1);
    Job jobRetry = new Job(retryTime.getTime().getTime(), true, job.detail);
    _db.insertJob(jobRetry, _nodeId, false);
}

/** Base class for the scheduler's internal, self-rescheduling maintenance tasks. */
private abstract class SchedulerTask extends Task implements Runnable {
    SchedulerTask(long schedDate) {
        super(schedDate);
    }
}

/**
 * Periodically loads due jobs from the database ({@link #doLoadImmediate}).
 * Reschedules itself at 75% of the immediate interval on success, or after
 * a short 100ms back-off on failure.
 */
private class LoadImmediateTask extends SchedulerTask {
    LoadImmediateTask(long schedDate) {
        super(schedDate);
    }

    public void run() {
        boolean success = false;
        try {
            success = doLoadImmediate();
        } finally {
            // Always reschedule, even if the load threw.
            if (success)
                _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
            else
                _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
        }
    }
}

/**
 * Upgrade jobs from far future to immediate future (basically, assign them to a node).
 *
 * @author mszefler
 */
private class UpgradeJobsTask extends SchedulerTask {
    UpgradeJobsTask(long schedDate) {
        super(schedDate);
    }

    public void run() {
        long ctime = System.currentTimeMillis();
        long ntime = _nextUpgrade.get();
        __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
        // We could be too early, this can happen if upgrade gets delayed due to another
        // node
        if (_nextUpgrade.get() > System.currentTimeMillis()) {
            __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");
            _todo.enqueue(new UpgradeJobsTask(ntime));
            return;
        }
        boolean success = false;
        try {
            success = doUpgrade();
        } finally {
            // Reschedule at half the near-future interval on success, 100ms back-off on failure;
            // publish the next fire time so other instances of this task can skip early fires.
            long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
            _nextUpgrade.set(future);
            _todo.enqueue(new UpgradeJobsTask(future));
            __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
        }
    }
}

/**
 * Check if any of the nodes in our cluster are stale.
 */
private class CheckStaleNodes extends SchedulerTask {
    CheckStaleNodes(long schedDate) {
        super(schedDate);
    }

    public void run() {
        // Reschedule first so a failure below cannot kill the periodic check.
        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
        __log.debug("CHECK STALE NODES started");
        // NOTE(review): recoverStaleNode removes from _knownNodes while we iterate it;
        // safe only if _knownNodes is a concurrent/copy-on-write set — verify its type.
        for (String nodeId : _knownNodes) {
            Long lastSeen = _lastHeartBeat.get(nodeId);
            // A node with no recorded heartbeat, or one silent longer than the
            // stale interval, is presumed dead and its jobs are reclaimed.
            if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
                recoverStaleNode(nodeId);
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -