⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobstoresupport.java

📁 Quartz is a full-featured, open source job scheduling system that can be integrated with, or used al
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        throws JobPersistenceException {
        try {
            // find volatile jobs & triggers...
            Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn);
            Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);

            for (int i = 0; i < volatileTriggers.length; i++) {
                removeTrigger(conn, null, volatileTriggers[i].getName(),
                        volatileTriggers[i].getGroup());
            }
            getLog().info(
                    "Removed " + volatileTriggers.length
                            + " Volatile Trigger(s).");

            for (int i = 0; i < volatileJobs.length; i++) {
                removeJob(conn, null, volatileJobs[i].getName(),
                        volatileJobs[i].getGroup(), true);
            }
            getLog().info(
                    "Removed " + volatileJobs.length + " Volatile Job(s).");

            // clean up any fired trigger entries
            getDelegate().deleteVolatileFiredTriggers(conn);

        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't clean volatile data: "
                    + e.getMessage(), e);
        }
    }

    /**
     * Recovers any failed or misfired jobs and cleans up the data store as
     * appropriate.
     * <p>
     * The work is performed by {@link #recoverJobs(Connection)}, which is
     * handed to <code>executeInNonManagedTXLock</code> together with
     * <code>LOCK_TRIGGER_ACCESS</code>.
     * </p>
     *
     * @throws JobPersistenceException if jobs could not be recovered
     */
    protected void recoverJobs() throws JobPersistenceException {
        final VoidTransactionCallback recoveryWork = new VoidTransactionCallback() {
            public void execute(Connection conn) throws JobPersistenceException {
                recoverJobs(conn);
            }
        };

        executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS, recoveryWork);
    }
    
    /**
     * <p>
     * Will recover any failed or misfired jobs and clean up the data store as
     * appropriate, using the supplied connection.
     * </p>
     * <p>
     * The steps run in this order: trigger states left over as
     * 'acquired' / 'blocked' are reset, misfired triggers are handled, the
     * triggers selected for recovering jobs are re-stored (only when their job
     * still exists), triggers in the 'complete' state are removed, and fired
     * trigger entries are deleted.
     * </p>
     *
     * @param conn
     *          the connection on which all recovery statements are executed
     * @throws JobPersistenceException
     *           if jobs could not be recovered
     */
    protected void recoverJobs(Connection conn) throws JobPersistenceException {
        try {
            // update inconsistent job states
            int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

            rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                        STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

            getLog().info(
                    "Freed " + rows
                            + " triggers from 'acquired' / 'blocked' state.");

            // clean up misfired jobs
            recoverMisfiredJobs(conn, true);

            // recover jobs marked for recovery that were not fully executed
            Trigger[] recoveringJobTriggers = getDelegate()
                    .selectTriggersForRecoveringJobs(conn);
            getLog()
                    .info(
                            "Recovering "
                                    + recoveringJobTriggers.length
                                    + " jobs that were in-progress at the time of the last shut-down.");

            for (int i = 0; i < recoveringJobTriggers.length; ++i) {
                // only re-store the trigger when the job it refers to is still present
                if (jobExists(conn, recoveringJobTriggers[i].getJobName(),
                        recoveringJobTriggers[i].getJobGroup())) {
                    recoveringJobTriggers[i].computeFirstFireTime(null);
                    storeTrigger(conn, null, recoveringJobTriggers[i], null, false,
                            STATE_WAITING, false, true);
                }
            }
            getLog().info("Recovery complete.");

            // remove lingering 'complete' triggers...
            Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
            // A null result is treated as "no triggers" for BOTH the removal
            // loop and the log line; dereferencing ct.length unguarded would
            // turn a null result into a wrapped NullPointerException.
            int completeCount = (ct == null) ? 0 : ct.length;
            for (int i = 0; i < completeCount; i++) {
                removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup());
            }
            getLog().info(
                "Removed " + completeCount
                + " 'complete' triggers.");

            // clean up any fired trigger entries
            int n = getDelegate().deleteFiredTriggers(conn);
            getLog().info("Removed " + n + " stale fired job entries.");
        } catch (JobPersistenceException e) {
            // already the declared exception type: propagate without re-wrapping
            throw e;
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't recover jobs: "
                    + e.getMessage(), e);
        }
    }

    /**
     * Computes the misfire time: the current time in milliseconds, reduced by
     * the misfire threshold when that threshold is positive.
     *
     * @return the computed time in milliseconds, never less than zero
     */
    protected long getMisfireTime() {
        final long now = System.currentTimeMillis();
        final long threshold = getMisfireThreshold();

        // a non-positive threshold leaves the current time untouched
        final long cutoff = (threshold > 0) ? now - threshold : now;

        return Math.max(cutoff, 0L);
    }

    /**
     * Helper class for returning the composite result of trying
     * to recover misfired jobs: whether more misfired triggers remain, and how
     * many were processed.
     * <p>
     * Instances are immutable; both values are fixed at construction, so the
     * shared {@link #NO_OP} instance cannot be altered after creation.
     * </p>
     */
    protected static class RecoverMisfiredJobsResult {
        /** Shared result meaning "no more misfired triggers, zero processed". */
        public static final RecoverMisfiredJobsResult NO_OP =
            new RecoverMisfiredJobsResult(false, 0);

        private final boolean _hasMoreMisfiredTriggers;
        private final int _processedMisfiredTriggerCount;

        /**
         * @param hasMoreMisfiredTriggers
         *          whether misfired triggers remain to be processed
         * @param processedMisfiredTriggerCount
         *          the number of misfired triggers reported as processed
         */
        public RecoverMisfiredJobsResult(
            boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount) {
            _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
            _processedMisfiredTriggerCount = processedMisfiredTriggerCount;
        }

        /**
         * @return the <code>hasMoreMisfiredTriggers</code> value given at
         *         construction
         */
        public boolean hasMoreMisfiredTriggers() {
            return _hasMoreMisfiredTriggers;
        }

        /**
         * @return the <code>processedMisfiredTriggerCount</code> value given
         *         at construction
         */
        public int getProcessedMisfiredTriggerCount() {
            return _processedMisfiredTriggerCount;
        }
    }
    
    /**
     * Selects the triggers that have misfired and applies misfire handling to
     * each one that can still be retrieved.
     *
     * @param conn
     *          the connection to operate on
     * @param recovering
     *          when <code>true</code>, no limit is placed on the number of
     *          misfired triggers handled in this call; the flag is also passed
     *          on to the per-trigger update
     * @return {@link RecoverMisfiredJobsResult#NO_OP} when nothing was
     *         selected and no more remain; otherwise a result carrying the
     *         "more remain" flag and the number of selected trigger keys
     * @throws JobPersistenceException
     *           if a trigger could not be retrieved or stored
     * @throws SQLException
     *           if selecting the misfired triggers fails
     */
    protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
        throws JobPersistenceException, SQLException {

        // If recovering, we want to handle all of the misfired
        // triggers right away, so no batch limit (-1) is applied.
        final int batchLimit;
        if (recovering) {
            batchLimit = -1;
        } else {
            batchLimit = getMaxMisfiresToHandleAtATime();
        }

        final List triggerKeys = new ArrayList();

        // We must still look for the MISFIRED state in case triggers were left
        // in this state when upgrading to this version that does not support it.
        final boolean moreRemain =
            getDelegate().selectMisfiredTriggersInStates(
                conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime(),
                batchLimit, triggerKeys);

        final int selectedCount = triggerKeys.size();

        // nothing selected and nothing left behind: there is no work to do
        if (!moreRemain && selectedCount == 0) {
            getLog().debug(
                "Found 0 triggers that missed their scheduled fire-time.");
            return RecoverMisfiredJobsResult.NO_OP;
        }

        if (moreRemain) {
            getLog().info(
                "Handling the first " + selectedCount +
                " triggers that missed their scheduled fire-time.  " +
                "More misfired triggers remain to be processed.");
        } else {
            getLog().info(
                "Handling " + selectedCount +
                " trigger(s) that missed their scheduled fire-time.");
        }

        final Iterator keys = triggerKeys.iterator();
        while (keys.hasNext()) {
            final Key key = (Key) keys.next();
            final Trigger misfired =
                retrieveTrigger(conn, key.getName(), key.getGroup());

            // a key whose trigger can no longer be retrieved is skipped
            if (misfired != null) {
                doUpdateOfMisfiredTrigger(
                    conn, null, misfired, false, STATE_WAITING, recovering);
            }
        }

        return new RecoverMisfiredJobsResult(moreRemain, selectedCount);
    }

    /**
     * Checks whether the named trigger has passed the misfire time and, if so,
     * applies misfire handling to it and stores the outcome.
     *
     * @param conn
     *          the connection to operate on
     * @param ctxt
     *          the scheduling context passed through to the store operations
     * @param triggerName
     *          name of the trigger to check
     * @param groupName
     *          group of the trigger to check
     * @param newStateIfNotComplete
     *          state to store the trigger in when it still has a next fire
     *          time after misfire handling
     * @param forceState
     *          passed through to the trigger store operation
     * @return <code>true</code> if the trigger was treated as misfired and
     *         updated; <code>false</code> if the trigger could not be found,
     *         has no next fire time, or its next fire time is later than
     *         {@link #getMisfireTime()}
     * @throws JobPersistenceException
     *           if the trigger could not be read or updated
     */
    // TODO: probably get rid of this
    protected boolean updateMisfiredTrigger(Connection conn,
            SchedulingContext ctxt, String triggerName, String groupName,
            String newStateIfNotComplete, boolean forceState)
        throws JobPersistenceException {
        try {

            Trigger trig = getDelegate().selectTrigger(conn, triggerName,
                    groupName);

            // A trigger that is missing, or that has no next fire time, has
            // nothing to misfire. Report "not updated" instead of failing with
            // a NullPointerException wrapped as "...: null".
            if (trig == null || trig.getNextFireTime() == null) {
                return false;
            }

            // Use the shared computation so this check cannot drift from the
            // one used when selecting misfired triggers.
            if (trig.getNextFireTime().getTime() > getMisfireTime()) {
                return false;
            }

            doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState, newStateIfNotComplete, false);

            return true;

        } catch (Exception e) {
            throw new JobPersistenceException(
                    "Couldn't update misfired trigger '" + groupName + "."
                            + triggerName + "': " + e.getMessage(), e);
        }
    }

    /**
     * Applies misfire handling to the given trigger and stores the result.
     * <p>
     * The trigger's calendar (if it names one) is retrieved, the signaler is
     * told about the misfire, the trigger updates itself, and the trigger is
     * then stored: as <code>STATE_COMPLETE</code> when it has no next fire
     * time left, otherwise as <code>newStateIfNotComplete</code>.
     * </p>
     *
     * @param conn the connection to operate on
     * @param ctxt the scheduling context passed through to the store operations
     * @param trig the trigger to update
     * @param forceState passed through to the trigger store operation
     * @param newStateIfNotComplete state used when the trigger still has a next fire time
     * @param recovering forwarded to the store operation only when the trigger is complete
     * @throws JobPersistenceException if the calendar or trigger could not be read or stored
     */
    private void doUpdateOfMisfiredTrigger(Connection conn, SchedulingContext ctxt,
            Trigger trig, boolean forceState, String newStateIfNotComplete,
            boolean recovering) throws JobPersistenceException {
        final String calendarName = trig.getCalendarName();
        final Calendar cal =
            (calendarName == null) ? null : retrieveCalendar(conn, ctxt, calendarName);

        // listeners are notified before the trigger recomputes its fire times
        signaler.notifyTriggerListenersMisfired(trig);
        trig.updateAfterMisfire(cal);

        final boolean complete = (trig.getNextFireTime() == null);
        final String stateToStore = complete ? STATE_COMPLETE : newStateIfNotComplete;
        // the recovering flag is only honoured for a completed trigger
        final boolean recoveringFlag = complete && recovering;

        storeTrigger(conn, ctxt, trig, null, true, stateToStore, forceState,
                recoveringFlag);
    }

    /**
     * <p>
     * Stores the given <code>{@link org.quartz.JobDetail}</code> together with
     * the given <code>{@link org.quartz.Trigger}</code>.
     * </p>
     * <p>
     * A volatile job combined with a non-volatile trigger is rejected with a
     * <code>JobPersistenceException</code> carrying
     * <code>SchedulerException.ERR_CLIENT_ERROR</code>.
     * </p>
     *
     * @param ctxt
     *          The scheduling context passed through to the store operations.
     * @param newJob
     *          The <code>JobDetail</code> to be stored.
     * @param newTrigger
     *          The <code>Trigger</code> to be stored.
     * @throws ObjectAlreadyExistsException
     *           if a <code>Job</code> with the same name/group already
     *           exists.
     * @throws JobPersistenceException
     *           if the job/trigger combination is rejected or storing fails.
     */
    public void storeJobAndTrigger(final SchedulingContext ctxt, final JobDetail newJob,
            final Trigger newTrigger)
        throws ObjectAlreadyExistsException, JobPersistenceException {
        final boolean lockOnInsert = isLockOnInsert();

        final VoidTransactionCallback storeBoth = new VoidTransactionCallback() {
            public void execute(Connection conn) throws JobPersistenceException {
                final boolean durableTriggerOnVolatileJob =
                    newJob.isVolatile() && !newTrigger.isVolatile();

                if (durableTriggerOnVolatileJob) {
                    JobPersistenceException mismatch = new JobPersistenceException(
                        "Cannot associate non-volatile trigger with a volatile job!");
                    mismatch.setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
                    throw mismatch;
                }

                // the job goes in first, then the trigger that refers to it
                storeJob(conn, ctxt, newJob, false);
                storeTrigger(conn, ctxt, newTrigger, newJob, false,
                        Constants.STATE_WAITING, false, false);
            }
        };

        // a lock name is only supplied when inserts are configured to lock
        executeInLock(lockOnInsert ? LOCK_TRIGGER_ACCESS : null, storeBoth);
    }
    
    /**
     * <p>
     * Stores the given <code>{@link org.quartz.JobDetail}</code>.
     * </p>
     *
     * @param ctxt
     *          The scheduling context passed through to the store operation.
     * @param newJob
     *          The <code>JobDetail</code> to be stored.
     * @param replaceExisting
     *          If <code>true</code>, any <code>Job</code> existing in the
     *          <code>JobStore</code> with the same name and group should be
     *          over-written.
     * @throws ObjectAlreadyExistsException
     *           if a <code>Job</code> with the same name/group already
     *           exists, and replaceExisting is set to false.
     * @throws JobPersistenceException
     *           if the job could not be stored.
     */
    public void storeJob(final SchedulingContext ctxt, final JobDetail newJob,
        final boolean replaceExisting) throws ObjectAlreadyExistsException, JobPersistenceException {
        // a lock name is supplied when inserts lock, or when a job may be replaced
        final boolean needsLock = isLockOnInsert() || replaceExisting;

        final VoidTransactionCallback storeWork = new VoidTransactionCallback() {
            public void execute(Connection conn) throws JobPersistenceException {
                storeJob(conn, ctxt, newJob, replaceExisting);
            }
        };

        executeInLock(needsLock ? LOCK_TRIGGER_ACCESS : null, storeWork);
    }
    
    /**
     * <p>
     * Inserts the job, or updates it when a job with the same name and group
     * is already stored.
     * </p>
     *
     * @param conn
     *          the connection to operate on
     * @param ctxt
     *          the scheduling context (not read by this method)
     * @param newJob
     *          the job to insert or update
     * @param replaceExisting
     *          whether an already stored job may be updated
     * @throws ObjectAlreadyExistsException
     *           if the job is already stored and <code>replaceExisting</code>
     *           is <code>false</code>
     * @throws JobPersistenceException
     *           if the existence check, insert or update fails
     */
    protected void storeJob(Connection conn, SchedulingContext ctxt,
            JobDetail newJob, boolean replaceExisting)
        throws ObjectAlreadyExistsException, JobPersistenceException {
        if (newJob.isVolatile() && isClustered()) {
            getLog().info(
                "note: volatile jobs are effectively non-volatile in a clustered environment.");
        }

        final boolean alreadyStored =
            jobExists(conn, newJob.getName(), newJob.getGroup());

        // refuse to overwrite unless the caller explicitly allowed it
        if (alreadyStored && !replaceExisting) {
            throw new ObjectAlreadyExistsException(newJob);
        }

        try {
            if (alreadyStored) {
                getDelegate().updateJobDetail(conn, newJob);
            } else {
                getDelegate().insertJobDetail(conn, newJob);
            }
        } catch (IOException ioFailure) {
            throw new JobPersistenceException("Couldn't store job: "
                    + ioFailure.getMessage(), ioFailure);
        } catch (SQLException sqlFailure) {
            throw new JobPersistenceException("Couldn't store job: "
                    + sqlFailure.getMessage(), sqlFailure);
        }
    }

    /**
     * <p>
     * Checks, via the delegate, whether a job with the given name and group
     * exists.
     * </p>
     *
     * @param conn
     *          the connection to operate on
     * @param jobName
     *          name of the job to look for
     * @param groupName
     *          group of the job to look for
     * @return the delegate's answer to the existence check
     * @throws JobPersistenceException
     *           if the delegate's check fails with a <code>SQLException</code>
     */
    protected boolean jobExists(Connection conn, String jobName,
            String groupName) throws JobPersistenceException {
        final boolean exists;
        try {
            exists = getDelegate().jobExists(conn, jobName, groupName);
        } catch (SQLException sqlFailure) {
            final String message = "Couldn't determine job existence (" + groupName + "."
                    + jobName + "): " + sqlFailure.getMessage();
            throw new JobPersistenceException(message, sqlFailure);
        }
        return exists;
    }


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -