⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simplescheduler.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        // schedule check for stale nodes, make it random so that the nodes don't overlap.        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));        // do the upgrade sometime (random) in the immediate interval.        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));        _todo.start();        _running = true;    }    public synchronized void stop() {        if (!_running)            return;        _todo.stop();        _todo.clearTasks(UpgradeJobsTask.class);        _todo.clearTasks(LoadImmediateTask.class);        _todo.clearTasks(CheckStaleNodes.class);        _running = false;    }    /**     * Run a job in the current thread.     *     * @param job     *            job to run.     */    protected void runJob(final Job job) {        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,                (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));        _exec.submit(new Callable<Void>() {            public Void call() throws Exception {                if (job.transacted) {                    try {                        execTransaction(new Callable<Void>() {                            public Void call() throws Exception {                                _jobProcessor.onScheduledJob(jobInfo);                                if (job.persisted)                                    if (!_db.deleteJob(job.jobId, _nodeId))                                        throw new JobNoLongerInDbException(job.jobId,_nodeId);                                return null;                            }                        });                    } catch (JobNoLongerInDbException jde) {                        // This may happen if two node try to do the same job... we try to avoid                        // it the synchronization is a best-effort but not perfect.                        __log.debug("job no longer in db forced rollback.");                    } catch (JobProcessorException jpe) {                        if (jpe.retry) {                            __log.error("Error while processing transaction, retrying.", jpe);                            doRetry(job);                        } else {                            __log.error("Error while processing transaction, no retry.", jpe);                        }                    } catch (Exception ex) {                        __log.error("Error while executing transaction", ex);                    }                } else {                    _jobProcessor.onScheduledJob(jobInfo);                }                return null;            }        });    }    private void addTodoOnCommit(final Job job) {        registerSynchronizer(new Synchronizer() {            public void afterCompletion(boolean success) {                if (success) {                    _todo.enqueue(job);                }            }            public void beforeCompletion() {            }        });    }    public boolean isTransacted() {        try {            Transaction tx = _txm.getTransaction();            return (tx != null && tx.getStatus() != Status.STATUS_NO_TRANSACTION);        } catch (SystemException e) {            throw new ContextException("Internal Error: Could not obtain transaction status.");        }    }    public void runTask(Task task) {        if (task instanceof Job)            runJob((Job) task);        if (task instanceof SchedulerTask)            ((SchedulerTask) task).run();    }    public void updateHeartBeat(String nodeId) {        if (nodeId == null)            return;        if (_nodeId.equals(nodeId))            return;        _lastHeartBeat.put(nodeId, System.currentTimeMillis());        _knownNodes.add(nodeId);    }    boolean doLoadImmediate() {        __log.debug("LOAD IMMEDIATE started");        List<Job> jobs;        try {            do {                jobs = execTransaction(new Callable<List<Job>>() {                    public List<Job> call() throws Exception {                        return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);                    }                });                for (Job j : jobs) {                    if (__log.isDebugEnabled())                        __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);                    _todo.enqueue(j);                }            } while (jobs.size() == 10);            return true;        } catch (Exception ex) {            __log.error("Error loading immediate jobs from database.", ex);            return false;        } finally {            __log.debug("LOAD IMMEDIATE complete");        }    }    boolean doUpgrade() {        __log.debug("UPGRADE started");        final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);        // Don't forget about self.        knownNodes.add(_nodeId);        Collections.sort(knownNodes);        // We're going to try to upgrade near future jobs using the db only.        // We assume that the distribution of the trailing digits in the        // scheduled time are uniformly distributed, and use modular division        // of the time by the number of nodes to create the node assignment.        // This can be done in a single update statement.        final long maxtime = System.currentTimeMillis() + _nearFutureInterval;        try {            return execTransaction(new Callable<Boolean>() {                public Boolean call() throws Exception {                    int numNodes = knownNodes.size();                    for (int i = 0; i < numNodes; ++i) {                        String node = knownNodes.get(i);                        _db.updateAssignToNode(node, i, numNodes, maxtime);                    }                    return true;                }            });        } catch (Exception ex) {            __log.error("Database error upgrading jobs.", ex);            return false;        } finally {            __log.debug("UPGRADE complete");        }    }    /**     * Re-assign stale node's jobs to self.     * @param nodeId     */    void recoverStaleNode(final String nodeId) {        __log.debug("recovering stale node " + nodeId);        try {            int numrows = execTransaction(new Callable<Integer>() {                public Integer call() throws Exception {                    return _db.updateReassign(nodeId, _nodeId);                }            });            __log.debug("reassigned " + numrows + " jobs to self. ");            // We can now forget about this node, if we see it again, it will be            // "new to us"            _knownNodes.remove(nodeId);            _lastHeartBeat.remove(nodeId);            // Force a load-immediate to catch anything new from the recovered node.            doLoadImmediate();        } catch (Exception ex) {            __log.error("Database error reassigning node.", ex);        } finally {            __log.debug("node recovery complete");        }    }    private void doRetry(Job job) throws DatabaseException {        Calendar retryTime = Calendar.getInstance();        retryTime.add(Calendar.SECOND, 2);        job.detail.put("retry", job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 1);        Job jobRetry = new Job(retryTime.getTime().getTime(), true, job.detail);        _db.insertJob(jobRetry, _nodeId, false);    }    private abstract class SchedulerTask extends Task implements Runnable {        SchedulerTask(long schedDate) {            super(schedDate);        }    }    private class LoadImmediateTask extends SchedulerTask {        LoadImmediateTask(long schedDate) {            super(schedDate);        }        public void run() {            boolean success = false;            try {                success = doLoadImmediate();            } finally {                if (success)                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));                else                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));            }        }    }    /**     * Upgrade jobs from far future to immediate future (basically, assign them to a node).     * @author mszefler     *     */    private class UpgradeJobsTask extends SchedulerTask {        UpgradeJobsTask(long schedDate) {            super(schedDate);        }        public void run() {            long ctime = System.currentTimeMillis();            long ntime = _nextUpgrade.get();            __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);            // We could be too early, this can happen if upgrade gets delayed due to another            // node            if (_nextUpgrade.get() > System.currentTimeMillis()) {                __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");                _todo.enqueue(new UpgradeJobsTask(ntime));                return;            }            boolean success = false;            try {                success = doUpgrade();            } finally {                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);                _nextUpgrade.set(future);                _todo.enqueue(new UpgradeJobsTask(future));                __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");            }        }    }    /**     * Check if any of the nodes in our cluster are stale.     */    private class CheckStaleNodes extends SchedulerTask {        CheckStaleNodes(long schedDate) {            super(schedDate);        }        public void run() {            _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));            __log.debug("CHECK STALE NODES started");            for (String nodeId : _knownNodes) {                Long lastSeen = _lastHeartBeat.get(nodeId);                if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)                    recoverStaleNode(nodeId);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -