jobstoresupport.java
来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,807 行 · 第 1/5 页
JAVA
1,807 行
return misfireThreshold;
}
/**
* The the number of milliseconds by which a trigger must have missed its
* next-fire-time, in order for it to be considered "misfired" and thus
* have its misfire instruction applied.
*
* @param misfireThreshold
*/
public void setMisfireThreshold(long misfireThreshold) {
if (misfireThreshold < 1)
throw new IllegalArgumentException(
"Misfirethreashold must be larger than 0");
this.misfireThreshold = misfireThreshold;
}
public boolean isDontSetAutoCommitFalse() {
return dontSetAutoCommitFalse;
}
/**
* Don't call set autocommit(false) on connections obtained from the
* DataSource. This can be helpfull in a few situations, such as if you
* have a driver that complains if it is called when it is already off.
*
* @param b
*/
public void setDontSetAutoCommitFalse(boolean b) {
dontSetAutoCommitFalse = b;
}
public boolean isTxIsolationLevelSerializable() {
return setTxIsolationLevelSequential;
}
/**
* Set the transaction isolation level of DB connections to sequential.
*
* @param b
*/
public void setTxIsolationLevelSerializable(boolean b) {
setTxIsolationLevelSequential = b;
}
/**
* <p>
* Set the JDBC driver delegate class.
* </p>
*
* @param delegateClassName
* the delegate class name
*/
public void setDriverDelegateClass(String delegateClassName)
throws InvalidConfigurationException {
this.delegateClassName = delegateClassName;
}
/**
* <p>
* Get the JDBC driver delegate class name.
* </p>
*
* @return the delegate class name
*/
public String getDriverDelegateClass() {
return delegateClassName;
}
public String getSelectWithLockSQL() {
return selectWithLockSQL;
}
/**
* <p>
* set the SQL statement to use to select and lock a row in the "locks"
* table.
* </p>
*
* @see StdRowLockSemaphore
*/
public void setSelectWithLockSQL(String string) {
selectWithLockSQL = string;
}
protected ClassLoadHelper getClassLoadHelper() {
return classLoadHelper;
}
//---------------------------------------------------------------------------
// interface methods
//---------------------------------------------------------------------------
Log getLog() {
return LogFactory.getLog(getClass());
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give the it a chance to initialize.
* </p>
*/
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (dsName == null) { throw new SchedulerConfigException(
"DataSource name not set."); }
classLoadHelper = loadHelper;
this.signaler = signaler;
if (!getUseDBLocks() && !isClustered()) {
getLog()
.info(
"Using thread monitor-based data access locking (synchronization).");
lockHandler = new SimpleSemaphore();
} else {
getLog()
.info(
"Using db table-based data access locking (synchronization).");
lockHandler = new StdRowLockSemaphore(getTablePrefix(),
getSelectWithLockSQL());
}
if (!isClustered()) {
try {
cleanVolatileTriggerAndJobs();
} catch (SchedulerException se) {
throw new SchedulerConfigException(
"Failure occured during job recovery.", se);
}
}
}
/**
* @see org.quartz.spi.JobStore#schedulerStarted()
*/
public void schedulerStarted() throws SchedulerException {
if (isClustered()) {
clusterManagementThread = new ClusterManager(this);
clusterManagementThread.initialize();
}
else {
try {
recoverJobs();
} catch (SchedulerException se) {
throw new SchedulerConfigException(
"Failure occured during job recovery.", se);
}
}
misfireHandler = new MisfireHandler(this);
misfireHandler.initialize();
}
/**
* <p>
* Called by the QuartzScheduler to inform the <code>JobStore</code> that
* it should free up all of it's resources because the scheduler is
* shutting down.
* </p>
*/
public void shutdown() {
if (clusterManagementThread != null)
clusterManagementThread.shutdown();
if (misfireHandler != null) misfireHandler.shutdown();
try {
DBConnectionManager.getInstance().shutdown(getDataSource());
} catch (SQLException sqle) {
getLog().warn("Database connection shutdown unsuccessful.", sqle);
}
}
public boolean supportsPersistence() {
return true;
}
//---------------------------------------------------------------------------
// helper methods for subclasses
//---------------------------------------------------------------------------
protected Connection getConnection() throws JobPersistenceException {
try {
Connection conn = DBConnectionManager.getInstance().getConnection(
getDataSource());
if (conn == null) { throw new SQLException(
"Could not get connection from DataSource '"
+ getDataSource() + "'"); }
try {
if (!isDontSetAutoCommitFalse()) conn.setAutoCommit(false);
if(isTxIsolationLevelSerializable())
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
} catch (SQLException ingore) {
} catch (Exception e) {
if(conn != null)
try { conn.close(); } catch(Throwable tt) {}
throw new JobPersistenceException(
"Failure setting up connection.", e);
}
return conn;
} catch (SQLException sqle) {
throw new JobPersistenceException(
"Failed to obtain DB connection from data source '"
+ getDataSource() + "': " + sqle.toString(), sqle);
} catch (Exception e) {
throw new JobPersistenceException(
"Failed to obtain DB connection from data source '"
+ getDataSource() + "': " + e.toString(), e,
JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
}
}
protected void releaseLock(Connection conn, String lockName, boolean doIt) {
if (doIt && conn != null) {
try {
getLockHandler().releaseLock(conn, lockName);
} catch (LockException le) {
getLog().error("Error returning lock: " + le.getMessage(),
le);
}
}
}
/**
* <p>
* Removes all volatile data
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected abstract void cleanVolatileTriggerAndJobs()
throws JobPersistenceException;
/**
* <p>
* Removes all volatile data.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected void cleanVolatileTriggerAndJobs(Connection conn)
throws JobPersistenceException {
try {
// find volatile jobs & triggers...
Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn);
Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);
for (int i = 0; i < volatileTriggers.length; i++) {
removeTrigger(conn, null, volatileTriggers[i].getName(),
volatileTriggers[i].getGroup());
}
getLog().info(
"Removed " + volatileTriggers.length
+ " Volatile Trigger(s).");
for (int i = 0; i < volatileJobs.length; i++) {
removeJob(conn, null, volatileJobs[i].getName(),
volatileJobs[i].getGroup(), true);
}
getLog().info(
"Removed " + volatileJobs.length + " Volatile Job(s).");
// clean up any fired trigger entries
getDelegate().deleteVolatileFiredTriggers(conn);
} catch (Exception e) {
throw new JobPersistenceException("Couldn't clean volatile data: "
+ e.getMessage(), e);
}
}
/**
* <p>
* Will recover any failed or misfired jobs and clean up the data store as
* appropriate.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected abstract void recoverJobs() throws JobPersistenceException;
/**
* <p>
* Will recover any failed or misfired jobs and clean up the data store as
* appropriate.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
// update inconsistent job states
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
getLog().info(
"Freed " + rows
+ " triggers from 'acquired' / 'blocked' state.");
// clean up misfired jobs
getDelegate().updateTriggerStateFromOtherStatesBeforeTime(conn,
STATE_MISFIRED, STATE_WAITING, STATE_WAITING,
getMisfireTime()); // only waiting
recoverMisfiredJobs(conn, true);
// recover jobs marked for recovery that were not fully executed
Trigger[] recoveringJobTriggers = getDelegate()
.selectTriggersForRecoveringJobs(conn);
getLog()
.info(
"Recovering "
+ recoveringJobTriggers.length
+ " jobs that were in-progress at the time of the last shut-down.");
for (int i = 0; i < recoveringJobTriggers.length; ++i) {
if (jobExists(conn, recoveringJobTriggers[i].getJobName(),
recoveringJobTriggers[i].getJobGroup())) {
recoveringJobTriggers[i].computeFirstFireTime(null);
storeTrigger(conn, null, recoveringJobTriggers[i], null, false,
STATE_WAITING, false, true);
}
}
getLog().info("Recovery complete.");
// remove lingering 'complete' triggers...
Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
for(int i=0; ct != null && i < ct.length; i++)
removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup());
getLog().info(
"Removed " + ct.length
+ " 'complete' triggers.");
// clean up any fired trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
getLog().info("Removed " + n + " stale fired job entries.");
} catch (Exception e) {
throw new JobPersistenceException("Couldn't recover jobs: "
+ e.getMessage(), e);
}
}
protected long getMisfireTime() {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) misfireTime -= getMisfireThreshold();
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?