jobstoresupport.java

来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,807 行 · 第 1/5 页

JAVA
1,807
字号
        return misfireThreshold;
    }

    /**
     * Sets the number of milliseconds by which a trigger must have missed its
     * next-fire-time, in order for it to be considered "misfired" and thus
     * have its misfire instruction applied.
     * 
     * @param misfireThreshold
     *          the threshold in milliseconds; must be at least 1
     * @throws IllegalArgumentException
     *           if <code>misfireThreshold</code> is smaller than 1
     */
    public void setMisfireThreshold(long misfireThreshold) {
        if (misfireThreshold < 1) {
            throw new IllegalArgumentException(
                    "Misfirethreshold must be larger than 0");
        }
        this.misfireThreshold = misfireThreshold;
    }

    /**
     * Reports whether connections obtained by this store are left with
     * whatever auto-commit mode they already have.
     * 
     * @return the current value of the "don't set auto-commit false" flag
     */
    public boolean isDontSetAutoCommitFalse() {
        return this.dontSetAutoCommitFalse;
    }

    /**
     * Controls whether <code>setAutoCommit(false)</code> is skipped on
     * connections obtained from the DataSource. Skipping the call can be
     * helpful in a few situations, for example with a driver that complains
     * when auto-commit is switched off while it is already off.
     * 
     * @param b
     *          <code>true</code> to leave the auto-commit mode of obtained
     *          connections untouched
     */
    public void setDontSetAutoCommitFalse(boolean b) {
        this.dontSetAutoCommitFalse = b;
    }

    /**
     * Reports whether the serializable transaction isolation level is
     * requested for the DB connections used by this store.
     * 
     * @return the current value of the isolation-level flag
     */
    public boolean isTxIsolationLevelSerializable() {
        return this.setTxIsolationLevelSequential;
    }

    /**
     * Chooses whether the transaction isolation level of DB connections
     * should be set to serializable.
     * 
     * @param b
     *          <code>true</code> to request the serializable isolation level
     */
    public void setTxIsolationLevelSerializable(boolean b) {
        this.setTxIsolationLevelSequential = b;
    }

    
    /**
     * <p>
     * Records the name of the JDBC driver delegate class. The name is only
     * stored here; it is not validated or loaded by this method.
     * </p>
     * 
     * @param delegateClassName
     *          the delegate class name
     * @throws InvalidConfigurationException
     *           declared by the signature; this implementation never throws it
     */
    public void setDriverDelegateClass(String delegateClassName)
            throws InvalidConfigurationException {
        final String requestedName = delegateClassName;
        this.delegateClassName = requestedName;
    }

    /**
     * <p>
     * Returns the configured name of the JDBC driver delegate class.
     * </p>
     * 
     * @return the delegate class name as last set, unchanged
     */
    public String getDriverDelegateClass() {
        return this.delegateClassName;
    }

    /**
     * <p>
     * Returns the SQL statement configured for selecting and locking a row
     * in the "locks" table.
     * </p>
     * 
     * @return the configured select-with-lock SQL
     */
    public String getSelectWithLockSQL() {
        return this.selectWithLockSQL;
    }

    /**
     * <p>
     * Sets the SQL statement used to select and lock a row in the "locks"
     * table.
     * </p>
     * 
     * @param string
     *          the select-with-lock SQL statement
     * @see StdRowLockSemaphore
     */
    public void setSelectWithLockSQL(String string) {
        this.selectWithLockSQL = string;
    }

    /**
     * Returns the class-load helper held by this store.
     * 
     * @return the current <code>ClassLoadHelper</code>
     */
    protected ClassLoadHelper getClassLoadHelper() {
        return this.classLoadHelper;
    }

    //---------------------------------------------------------------------------
    // interface methods
    //---------------------------------------------------------------------------

    /**
     * Looks up the logger for the runtime class of this instance.
     * 
     * @return the <code>Log</code> obtained from <code>LogFactory</code>
     */
    Log getLog() {
        final Class owner = getClass();
        return LogFactory.getLog(owner);
    }

    /**
     * <p>
     * Called by the QuartzScheduler before the <code>JobStore</code> is
     * used, in order to give it a chance to initialize.
     * </p>
     * 
     * <p>
     * Stores the supplied helper and signaler, selects the lock handler
     * (table-based when DB locks are requested or the store is clustered,
     * otherwise monitor-based) and, when not clustered, removes volatile
     * data.
     * </p>
     * 
     * @param loadHelper
     *          the class-load helper to keep for later use
     * @param signaler
     *          the scheduler signaler to keep for later use
     * @throws SchedulerConfigException
     *           if no DataSource name has been set, or if removing the
     *           volatile data fails
     */
    public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {

        if (dsName == null) {
            throw new SchedulerConfigException("DataSource name not set.");
        }

        this.classLoadHelper = loadHelper;
        this.signaler = signaler;

        if (getUseDBLocks() || isClustered()) {
            getLog().info(
                    "Using db table-based data access locking (synchronization).");
            lockHandler = new StdRowLockSemaphore(getTablePrefix(),
                    getSelectWithLockSQL());
        } else {
            getLog().info(
                    "Using thread monitor-based data access locking (synchronization).");
            lockHandler = new SimpleSemaphore();
        }

        // Volatile data is only cleaned here for a non-clustered store.
        if (isClustered()) {
            return;
        }

        try {
            cleanVolatileTriggerAndJobs();
        } catch (SchedulerException cleanupFailure) {
            throw new SchedulerConfigException(
                    "Failure occured during job recovery.", cleanupFailure);
        }
    }

    /**
     * Starts the background helpers of this store: a cluster manager when
     * clustered, otherwise an immediate job recovery, followed in both cases
     * by the misfire handler.
     * 
     * @throws SchedulerException
     *           a <code>SchedulerConfigException</code> is raised when the
     *           non-clustered job recovery fails
     * @see org.quartz.spi.JobStore#schedulerStarted()
     */
    public void schedulerStarted() throws SchedulerException {

        if (!isClustered()) {
            try {
                recoverJobs();
            } catch (SchedulerException recoveryFailure) {
                throw new SchedulerConfigException(
                        "Failure occured during job recovery.",
                        recoveryFailure);
            }
        } else {
            ClusterManager manager = new ClusterManager(this);
            clusterManagementThread = manager;
            manager.initialize();
        }

        MisfireHandler handler = new MisfireHandler(this);
        misfireHandler = handler;
        handler.initialize();
    }
    
    /**
     * <p>
     * Called by the QuartzScheduler to inform the <code>JobStore</code> that
     * it should free up all of its resources because the scheduler is
     * shutting down.
     * </p>
     * 
     * <p>
     * Stops the cluster manager and the misfire handler if they exist, then
     * asks the connection manager to shut down this store's data source. A
     * <code>SQLException</code> from that last step is logged as a warning
     * and not propagated.
     * </p>
     */
    public void shutdown() {
        if (clusterManagementThread != null) {
            clusterManagementThread.shutdown();
        }

        if (misfireHandler != null) {
            misfireHandler.shutdown();
        }

        try {
            DBConnectionManager connections = DBConnectionManager.getInstance();
            connections.shutdown(getDataSource());
        } catch (SQLException shutdownFailure) {
            getLog().warn("Database connection shutdown unsuccessful.",
                    shutdownFailure);
        }
    }

    /**
     * Reports that this job store is persistent.
     * 
     * @return always <code>true</code>
     */
    public boolean supportsPersistence() {
        return true;
    }

    //---------------------------------------------------------------------------
    // helper methods for subclasses
    //---------------------------------------------------------------------------

    

    /**
     * <p>
     * Obtains a connection from the configured data source and prepares it
     * for use by this store.
     * </p>
     * 
     * <p>
     * Unless <code>isDontSetAutoCommitFalse()</code> is set, auto-commit is
     * switched off; if <code>isTxIsolationLevelSerializable()</code> is set,
     * the isolation level is set to
     * <code>Connection.TRANSACTION_SERIALIZABLE</code>. A
     * <code>SQLException</code> raised by either adjustment is treated as
     * non-fatal: it is logged as a warning and the connection is still
     * returned.
     * </p>
     * 
     * @return a non-null connection
     * @throws JobPersistenceException
     *           if no connection could be obtained, or if preparing the
     *           connection failed with something other than a
     *           <code>SQLException</code> (the connection is closed first)
     */
    protected Connection getConnection() throws JobPersistenceException {
        try {
            Connection conn = DBConnectionManager.getInstance().getConnection(
                    getDataSource());

            if (conn == null) {
                throw new SQLException(
                        "Could not get connection from DataSource '"
                        + getDataSource() + "'");
            }

            try {
                if (!isDontSetAutoCommitFalse()) {
                    conn.setAutoCommit(false);
                }

                if (isTxIsolationLevelSerializable()) {
                    conn.setTransactionIsolation(
                            Connection.TRANSACTION_SERIALIZABLE);
                }
            } catch (SQLException sqle) {
                // Best effort: some drivers reject these calls. Keep using
                // the connection, but leave a trace instead of hiding it.
                getLog().warn(
                        "Failed to override connection auto commit/transaction isolation.",
                        sqle);
            } catch (Exception e) {
                // The connection is unusable for us; do not leak it.
                try {
                    conn.close();
                } catch (Throwable ignored) {
                    // nothing more can be done for this connection
                }
                // NOTE(review): if JobPersistenceException is an Exception
                // subclass, the outer catch (Exception) below re-wraps this
                // one - confirm that the double wrapping is intended.
                throw new JobPersistenceException(
                        "Failure setting up connection.", e);
            }

            return conn;
        } catch (SQLException sqle) {
            throw new JobPersistenceException(
                    "Failed to obtain DB connection from data source '"
                    + getDataSource() + "': " + sqle.toString(), sqle);
        } catch (Exception e) {
            throw new JobPersistenceException(
                    "Failed to obtain DB connection from data source '"
                    + getDataSource() + "': " + e.toString(), e,
                    JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
        }
    }
    
    /**
     * Hands the named lock back to the lock handler, provided the caller
     * asks for it and a connection is available. A <code>LockException</code>
     * is logged as an error and not propagated.
     * 
     * @param conn
     *          the connection the lock is released with; nothing happens
     *          when it is <code>null</code>
     * @param lockName
     *          the name of the lock to release
     * @param doIt
     *          <code>false</code> turns this call into a no-op
     */
    protected void releaseLock(Connection conn, String lockName, boolean doIt) {
        if (!doIt || conn == null) {
            return;
        }

        try {
            getLockHandler().releaseLock(conn, lockName);
        } catch (LockException lockFailure) {
            getLog().error(
                    "Error returning lock: " + lockFailure.getMessage(),
                    lockFailure);
        }
    }
    
    /**
     * <p>
     * Removes all volatile data.
     * </p>
     * 
     * <p>
     * Invoked by <code>initialize</code> when the store is not clustered.
     * Subclasses supply the implementation; presumably it obtains a
     * connection and delegates to
     * <code>cleanVolatileTriggerAndJobs(Connection)</code> - confirm against
     * the concrete subclasses.
     * </p>
     * 
     * @throws JobPersistenceException
     *           if the volatile data could not be removed
     */
    protected abstract void cleanVolatileTriggerAndJobs()
            throws JobPersistenceException;

    /**
     * <p>
     * Removes all volatile data using the given connection: first every
     * volatile trigger, then every volatile job, and finally the volatile
     * fired-trigger entries.
     * </p>
     * 
     * @param conn
     *          the connection all lookups and removals are performed on
     * @throws JobPersistenceException
     *           if any step fails; the original exception is kept as the
     *           cause
     */
    protected void cleanVolatileTriggerAndJobs(Connection conn)
            throws JobPersistenceException {
        try {
            // Both lookups happen before anything is removed.
            Key[] triggerKeys = getDelegate().selectVolatileTriggers(conn);
            Key[] jobKeys = getDelegate().selectVolatileJobs(conn);

            for (int t = 0; t < triggerKeys.length; t++) {
                Key triggerKey = triggerKeys[t];
                removeTrigger(conn, null, triggerKey.getName(),
                        triggerKey.getGroup());
            }
            getLog().info(
                    "Removed " + triggerKeys.length + " Volatile Trigger(s).");

            for (int j = 0; j < jobKeys.length; j++) {
                Key jobKey = jobKeys[j];
                removeJob(conn, null, jobKey.getName(), jobKey.getGroup(),
                        true);
            }
            getLog().info("Removed " + jobKeys.length + " Volatile Job(s).");

            // Finally drop the fired-trigger entries that were volatile.
            getDelegate().deleteVolatileFiredTriggers(conn);

        } catch (Exception failure) {
            throw new JobPersistenceException("Couldn't clean volatile data: "
                    + failure.getMessage(), failure);
        }
    }

    /**
     * <p>
     * Will recover any failed or misfired jobs and clean up the data store as
     * appropriate.
     * </p>
     * 
     * <p>
     * Invoked by <code>schedulerStarted</code> when the store is not
     * clustered. Subclasses supply the implementation; presumably it obtains
     * a connection and delegates to <code>recoverJobs(Connection)</code> -
     * confirm against the concrete subclasses.
     * </p>
     * 
     * @throws JobPersistenceException
     *           if jobs could not be recovered
     */
    protected abstract void recoverJobs() throws JobPersistenceException;

    /**
     * <p>
     * Will recover any failed or misfired jobs and clean up the data store as
     * appropriate.
     * </p>
     * 
     * <p>
     * The steps, in order: trigger states left over as acquired / blocked
     * are reset, waiting triggers older than the misfire time are marked
     * misfired and handed to <code>recoverMisfiredJobs</code>, triggers for
     * jobs that requested recovery are stored again (only if their job still
     * exists), lingering 'complete' triggers are removed, and the
     * fired-trigger entries are deleted.
     * </p>
     * 
     * @param conn
     *          the connection all recovery work is performed on
     * @throws JobPersistenceException
     *           if jobs could not be recovered; the original exception is
     *           kept as the cause
     */
    protected void recoverJobs(Connection conn) throws JobPersistenceException {
        try {
            // update inconsistent job states
            int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

            rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

            getLog().info(
                    "Freed " + rows
                            + " triggers from 'acquired' / 'blocked' state.");

            // clean up misfired jobs
            getDelegate().updateTriggerStateFromOtherStatesBeforeTime(conn,
                    STATE_MISFIRED, STATE_WAITING, STATE_WAITING,
                    getMisfireTime()); // only waiting
            recoverMisfiredJobs(conn, true);

            // recover jobs marked for recovery that were not fully executed
            Trigger[] recoveringJobTriggers = getDelegate()
                    .selectTriggersForRecoveringJobs(conn);
            getLog()
                    .info(
                            "Recovering "
                                    + recoveringJobTriggers.length
                                    + " jobs that were in-progress at the time of the last shut-down.");

            for (int i = 0; i < recoveringJobTriggers.length; ++i) {
                // A recovery trigger is only stored if its job still exists.
                if (jobExists(conn, recoveringJobTriggers[i].getJobName(),
                        recoveringJobTriggers[i].getJobGroup())) {
                    recoveringJobTriggers[i].computeFirstFireTime(null);
                    storeTrigger(conn, null, recoveringJobTriggers[i], null, false,
                            STATE_WAITING, false, true);
                }
            }
            getLog().info("Recovery complete.");

            // remove lingering 'complete' triggers...
            Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
            // The delegate result is treated as possibly null; compute the
            // count once so the loop and the log message agree and the log
            // statement cannot throw a NullPointerException.
            int completeCount = (ct == null) ? 0 : ct.length;
            for (int i = 0; i < completeCount; i++) {
                removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup());
            }
            getLog().info(
                    "Removed " + completeCount
                    + " 'complete' triggers.");

            // clean up any fired trigger entries
            int n = getDelegate().deleteFiredTriggers(conn);
            getLog().info("Removed " + n + " stale fired job entries.");
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't recover jobs: "
                    + e.getMessage(), e);
        }
    }

    protected long getMisfireTime() {
        long misfireTime = System.currentTimeMillis();
        if (getMisfireThreshold() > 0) misfireTime -= getMisfireThreshold();

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?