📄 jobstoresupport.java
字号:
* The the number of milliseconds by which a trigger must have missed its * next-fire-time, in order for it to be considered "misfired" and thus * have its misfire instruction applied. * * @param misfireThreshold */ public void setMisfireThreshold(long misfireThreshold) { if (misfireThreshold < 1) throw new IllegalArgumentException( "Misfirethreashold must be larger than 0"); this.misfireThreshold = misfireThreshold; } public boolean isDontSetAutoCommitFalse() { return dontSetAutoCommitFalse; } /** * Don't call set autocommit(false) on connections obtained from the * DataSource. This can be helpfull in a few situations, such as if you * have a driver that complains if it is called when it is already off. * * @param b */ public void setDontSetAutoCommitFalse(boolean b) { dontSetAutoCommitFalse = b; } public boolean isTxIsolationLevelSerializable() { return setTxIsolationLevelSequential; } /** * Set the transaction isolation level of DB connections to sequential. * * @param b */ public void setTxIsolationLevelSerializable(boolean b) { setTxIsolationLevelSequential = b; } /** * <p> * Set the JDBC driver delegate class. * </p> * * @param delegateClassName * the delegate class name */ public void setDriverDelegateClass(String delegateClassName) throws InvalidConfigurationException { try { delegateClass = Thread.currentThread().getContextClassLoader() .loadClass(delegateClassName); } catch (ClassNotFoundException e) { throw new InvalidConfigurationException("Invalid delegate class: " + delegateClassName); } } /** * <p> * Get the JDBC driver delegate class name. * </p> * * @return the delegate class name */ public String getDriverDelegateClass() { return delegateClass.getName(); } public String getSelectWithLockSQL() { return selectWithLockSQL; } /** * <p> * set the SQL statement to use to select and lock a row in the "locks" * table. * </p> * * @see StdRowLockSemaphore */ public void setSelectWithLockSQL(String string) { selectWithLockSQL = string; } protected ClassLoadHelper getClassLoadHelper() { return classLoadHelper; } //--------------------------------------------------------------------------- // interface methods //--------------------------------------------------------------------------- Log getLog() { return LogFactory.getLog(getClass()); } /** * <p> * Called by the QuartzScheduler before the <code>JobStore</code> is * used, in order to give the it a chance to initialize. * </p> */ public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException { if (dsName == null) { throw new SchedulerConfigException( "DataSource name not set."); } classLoadHelper = loadHelper; this.signaler = signaler; if (!getUseDBLocks() && !isClustered()) { getLog() .info( "Using thread monitor-based data access locking (synchronization)."); lockHandler = new SimpleSemaphore(); } else { getLog() .info( "Using db table-based data access locking (synchronization)."); lockHandler = new StdRowLockSemaphore(getTablePrefix(), getSelectWithLockSQL()); } if (!isClustered()) { try { cleanVolatileTriggerAndJobs(); recoverJobs(); } catch (SchedulerException se) { throw new SchedulerConfigException( "Failure occured during job recovery.", se); } } else { clusterManagementThread = new ClusterManager(this); clusterManagementThread.initialize(); } misfireHandler = new MisfireHandler(this); misfireHandler.initialize(); } /** * <p> * Called by the QuartzScheduler to inform the <code>JobStore</code> that * it should free up all of it's resources because the scheduler is * shutting down. * </p> */ public void shutdown() { if (clusterManagementThread != null) clusterManagementThread.shutdown(); if (misfireHandler != null) misfireHandler.shutdown(); try { DBConnectionManager.getInstance().shutdown(getDataSource()); } catch (SQLException sqle) { getLog().warn("Database connection shutdown unsuccessful.", sqle); } } public boolean supportsPersistence() { return true; } //--------------------------------------------------------------------------- // helper methods for subclasses //--------------------------------------------------------------------------- protected Connection getConnection() throws JobPersistenceException { try { Connection conn = DBConnectionManager.getInstance().getConnection( getDataSource()); if (conn == null) { throw new SQLException( "Could not get connection from DataSource '" + getDataSource() + "'"); } try { if (!isDontSetAutoCommitFalse()) conn.setAutoCommit(false); if(isTxIsolationLevelSerializable()) conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); } catch (SQLException ingore) { } return conn; } catch (SQLException sqle) { throw new JobPersistenceException( "Failed to obtain DB connection from data source '" + getDataSource() + "': " + sqle.toString(), sqle); } catch (Exception e) { throw new JobPersistenceException( "Failed to obtain DB connection from data source '" + getDataSource() + "': " + e.toString(), e, JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE); } } protected void releaseLock(Connection conn, String lockName, boolean doIt) { if (doIt && conn != null) { try { getLockHandler().releaseLock(conn, lockName); } catch (LockException le) { getLog().error("Error returning lock: " + le.getMessage(), le); } } } /** * <p> * Removes all volatile data * </p> * * @throws JobPersistenceException * if jobs could not be recovered */ protected abstract void cleanVolatileTriggerAndJobs() throws JobPersistenceException; /** * <p> * Removes all volatile data. * </p> * * @throws JobPersistenceException * if jobs could not be recovered */ protected void cleanVolatileTriggerAndJobs(Connection conn) throws JobPersistenceException { try { // find volatile jobs & triggers... Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn); Key[] volatileJobs = getDelegate().selectVolatileJobs(conn); for (int i = 0; i < volatileTriggers.length; i++) { removeTrigger(conn, null, volatileTriggers[i].getName(), volatileTriggers[i].getGroup()); } getLog().info( "Removed " + volatileTriggers.length + " Volatile Trigger(s)."); for (int i = 0; i < volatileJobs.length; i++) { removeJob(conn, null, volatileJobs[i].getName(), volatileJobs[i].getGroup(), true); } getLog().info( "Removed " + volatileJobs.length + " Volatile Job(s)."); // clean up any fired trigger entries getDelegate().deleteVolatileFiredTriggers(conn); } catch (Exception e) { throw new JobPersistenceException("Couldn't clean volatile data: " + e.getMessage(), e); } } /** * <p> * Will recover any failed or misfired jobs and clean up the data store as * appropriate. * </p> * * @throws JobPersistenceException * if jobs could not be recovered */ protected abstract void recoverJobs() throws JobPersistenceException; /** * <p> * Will recover any failed or misfired jobs and clean up the data store as * appropriate. * </p> * * @throws JobPersistenceException * if jobs could not be recovered */ protected void recoverJobs(Connection conn) throws JobPersistenceException { try { // update inconsistent job states int rows = getDelegate().updateTriggerStatesFromOtherStates(conn, STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED); getLog().info( "Freed " + rows + " triggers from 'acquired' / 'blocked' state."); // clean up misfired jobs getDelegate().updateTriggerStateFromOtherStatesBeforeTime(conn, STATE_MISFIRED, STATE_WAITING, STATE_WAITING, getMisfireTime()); // only waiting recoverMisfiredJobs(conn, true); // recover jobs marked for recovery that were not fully executed Trigger[] recoveringJobTriggers = getDelegate() .selectTriggersForRecoveringJobs(conn); getLog() .info( "Recovering " + recoveringJobTriggers.length + " jobs that were in-progress at the time of the last shut-down."); for (int i = 0; i < recoveringJobTriggers.length; ++i) { if (jobExists(conn, recoveringJobTriggers[i].getJobName(), recoveringJobTriggers[i].getJobGroup())) { recoveringJobTriggers[i].computeFirstFireTime(null); storeTrigger(conn, null, recoveringJobTriggers[i], null, false, STATE_WAITING, false, true); } } getLog().info("Recovery complete."); // remove lingering 'complete' triggers... Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); for(int i=0; ct != null && i < ct.length; i++) removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup()); getLog().info( "Removed " + ct.length + " 'complete' triggers."); // clean up any fired trigger entries int n = getDelegate().deleteFiredTriggers(conn); getLog().info("Removed " + n + " stale fired job entries."); } catch (Exception e) { throw new JobPersistenceException("Couldn't recover jobs: " + e.getMessage(), e); } } protected long getMisfireTime() { long misfireTime = System.currentTimeMillis(); if (getMisfireThreshold() > 0) misfireTime -= getMisfireThreshold(); return misfireTime; } private int lastRecoverCount = 0; protected boolean recoverMisfiredJobs(Connection conn, boolean recovering) throws JobPersistenceException, NoSuchDelegateException, SQLException, ClassNotFoundException, IOException { Key[] misfiredTriggers = getDelegate().selectTriggersInState(conn, STATE_MISFIRED); if (misfiredTriggers.length > 0 && misfiredTriggers.length > getMaxMisfiresToHandleAtATime()) getLog() .info( "Handling " + getMaxMisfiresToHandleAtATime() + " of " + misfiredTriggers.length + " triggers that missed their scheduled fire-time."); else if (misfiredTriggers.length > 0) getLog().info( "Handling " + misfiredTriggers.length + " triggers that missed their scheduled fire-time."); else getLog().debug( "Found 0 triggers that missed their scheduled fire-time."); lastRecoverCount = misfiredTriggers.length;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -