jobstorecmt.java
来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,470 行 · 第 1/4 页
JAVA
1,470 行
boolean transOwner = false;
try {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
transOwner = true;
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
pauseAll(conn, ctxt);
} finally {
try {
releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
} finally {
closeConnection(conn);
}
}
}
/**
 * <p>
 * Un-pause every trigger - the same effect as invoking
 * <code>resumeTriggerGroup(group)</code> once for each group.
 * </p>
 *
 * <p>
 * Any <code>Trigger</code> that missed one or more fire-times has its
 * misfire instruction applied.
 * </p>
 *
 * <p>
 * The work is delegated to <code>resumeAll(Connection, SchedulingContext)</code>
 * on the connection supplied by <code>getConnection()</code>, while
 * <code>LOCK_TRIGGER_ACCESS</code> is held.
 * </p>
 *
 * @see #pauseAll(SchedulingContext)
 */
public void resumeAll(SchedulingContext ctxt)
        throws JobPersistenceException {
    final Connection connection = getConnection();
    boolean lockOwned = false;

    try {
        getLockHandler().obtainLock(connection, LOCK_TRIGGER_ACCESS);
        // Ownership is flagged only after obtainLock has returned normally.
        lockOwned = true;

        resumeAll(connection, ctxt);
    } finally {
        try {
            releaseLock(connection, LOCK_TRIGGER_ACCESS, lockOwned);
        } finally {
            // Nested finally: the connection is closed even if the release throws.
            closeConnection(connection);
        }
    }
}
//---------------------------------------------------------------------------
// trigger firing methods
//---------------------------------------------------------------------------
/**
 * <p>
 * Obtain a handle to the next trigger that is due to fire and mark it as
 * 'reserved' by the calling scheduler.
 * </p>
 *
 * <p>
 * The work is delegated to
 * <code>acquireNextTrigger(Connection, SchedulingContext, long)</code> on a
 * connection from <code>getNonManagedTXConnection()</code> while
 * <code>LOCK_TRIGGER_ACCESS</code> is held. The connection is committed on
 * success; on any exception <code>rollbackConnection</code> is invoked
 * before the exception propagates.
 * </p>
 *
 * @see #releaseAcquiredTrigger(SchedulingContext, Trigger)
 */
public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan)
        throws JobPersistenceException {
    Connection connection = null;
    boolean lockOwned = false;

    try {
        connection = getNonManagedTXConnection();
        getLockHandler().obtainLock(connection, LOCK_TRIGGER_ACCESS);
        lockOwned = true;

        Trigger acquired = acquireNextTrigger(connection, ctxt, noLaterThan);
        connection.commit();
        return acquired;
    } catch (JobPersistenceException jpe) {
        // Already the right type: undo and pass it on untouched.
        rollbackConnection(connection);
        throw jpe;
    } catch (Exception ex) {
        // Anything else is wrapped so callers only see JobPersistenceException.
        rollbackConnection(connection);
        throw new JobPersistenceException(
                "Error acquiring next firable trigger: " + ex.getMessage(),
                ex);
    } finally {
        try {
            releaseLock(connection, LOCK_TRIGGER_ACCESS, lockOwned);
        } finally {
            closeConnection(connection);
        }
    }
}
/**
 * <p>
 * Tell the <code>JobStore</code> that the scheduler no longer intends to
 * fire the given <code>Trigger</code>, which it had previously acquired
 * (reserved).
 * </p>
 *
 * <p>
 * The work is delegated to
 * <code>releaseAcquiredTrigger(Connection, SchedulingContext, Trigger)</code>
 * on a connection from <code>getNonManagedTXConnection()</code> while
 * <code>LOCK_TRIGGER_ACCESS</code> is held. The connection is committed on
 * success; on any exception <code>rollbackConnection</code> is invoked
 * before the exception propagates.
 * </p>
 */
public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger)
        throws JobPersistenceException {
    Connection connection = null;
    boolean lockOwned = false;

    try {
        connection = getNonManagedTXConnection();
        getLockHandler().obtainLock(connection, LOCK_TRIGGER_ACCESS);
        lockOwned = true;

        releaseAcquiredTrigger(connection, ctxt, trigger);
        connection.commit();
    } catch (JobPersistenceException jpe) {
        rollbackConnection(connection);
        throw jpe;
    } catch (Exception ex) {
        rollbackConnection(connection);
        throw new JobPersistenceException(
                "Error releasing acquired trigger: " + ex.getMessage(), ex);
    } finally {
        try {
            releaseLock(connection, LOCK_TRIGGER_ACCESS, lockOwned);
        } finally {
            // Closed last, and regardless of whether the release succeeded.
            closeConnection(connection);
        }
    }
}
/**
* <p>
* Inform the <code>JobStore</code> that the scheduler is now firing the
* given <code>Trigger</code> (executing its associated <code>Job</code>),
* that it had previously acquired (reserved).
* </p>
*
* <p>
* The work is delegated to <code>triggerFired(conn, ctxt, trigger)</code> on
* a connection from <code>getNonManagedTXConnection()</code> while
* <code>LOCK_TRIGGER_ACCESS</code> is held. The connection is committed only
* when the delegate returns normally; on any exception
* <code>rollbackConnection(conn)</code> is called before the exception
* propagates.
* </p>
*
* @return null if the trigger or its job or calendar no longer exist, or
* if the trigger was not successfully put into the 'executing'
* state.
* @throws JobPersistenceException
*           rethrown as-is from the delegate, or wrapping any other
*           exception with the message prefix "TX failure: ".
*/
public TriggerFiredBundle triggerFired(SchedulingContext ctxt,
Trigger trigger) throws JobPersistenceException {
Connection conn = null;
// Set to true only after obtainLock returns; passed to releaseLock in the finally block.
boolean transOwner = false;
try {
conn = getNonManagedTXConnection();
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
transOwner = true;
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
TriggerFiredBundle tfb = null;
JobPersistenceException err = null;
try {
tfb = triggerFired(conn, ctxt, trigger);
} catch (JobPersistenceException jpe) {
// Any error code other than "job does not exist" is rethrown immediately;
// the "job does not exist" case is remembered in err instead.
if (jpe.getErrorCode() != SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST)
throw jpe;
err = jpe;
}
// NOTE(review): err is thrown here, before conn.commit(), so the
// "job does not exist" case also ends up in the rollback path below and
// the error-code check above has no observable effect in this method.
// Confirm whether the commit was meant to happen before this rethrow.
if (err != null) throw err;
conn.commit();
return tfb;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (Exception e) {
rollbackConnection(conn);
throw new JobPersistenceException("TX failure: " + e.getMessage(),
e);
} finally {
// Nested finally: the connection is closed even if releasing the lock throws.
try {
releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
} finally {
closeConnection(conn);
}
}
}
/**
 * <p>
 * Tell the <code>JobStore</code> that the scheduler has finished firing
 * the given <code>Trigger</code> (and executing its associated
 * <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
 * in the given <code>JobDetail</code> should be updated if the
 * <code>Job</code> is stateful.
 * </p>
 *
 * <p>
 * The work is delegated to the <code>Connection</code>-taking overload of
 * <code>triggeredJobComplete</code> on a connection from
 * <code>getNonManagedTXConnection()</code> while
 * <code>LOCK_TRIGGER_ACCESS</code> is held. The connection is committed on
 * success; on any exception <code>rollbackConnection</code> is invoked
 * before the exception propagates.
 * </p>
 */
public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger,
        JobDetail jobDetail, int triggerInstCode)
        throws JobPersistenceException {
    Connection connection = null;
    boolean lockOwned = false;

    try {
        connection = getNonManagedTXConnection();
        getLockHandler().obtainLock(connection, LOCK_TRIGGER_ACCESS);
        lockOwned = true;

        triggeredJobComplete(connection, ctxt, trigger, jobDetail, triggerInstCode);
        connection.commit();
    } catch (JobPersistenceException jpe) {
        rollbackConnection(connection);
        throw jpe;
    } catch (Exception ex) {
        rollbackConnection(connection);
        throw new JobPersistenceException("TX failure: " + ex.getMessage(), ex);
    } finally {
        try {
            releaseLock(connection, LOCK_TRIGGER_ACCESS, lockOwned);
        } finally {
            closeConnection(connection);
        }
    }
}
/**
 * <p>
 * Run <code>recoverMisfiredJobs(conn, false)</code> on a connection from
 * <code>getNonManagedTXConnection()</code> while
 * <code>LOCK_TRIGGER_ACCESS</code> is held, committing on success.
 * </p>
 *
 * <p>
 * Every exception raised by <code>recoverMisfiredJobs</code> is wrapped in a
 * new <code>JobPersistenceException</code> carrying the same message. On any
 * failure <code>rollbackConnection</code> is invoked before the exception
 * propagates.
 * </p>
 *
 * @return the value returned by <code>recoverMisfiredJobs</code>.
 */
protected boolean doRecoverMisfires() throws JobPersistenceException {
    Connection connection = null;
    boolean lockOwned = false;

    try {
        connection = getNonManagedTXConnection();
        getLockHandler().obtainLock(connection, LOCK_TRIGGER_ACCESS);
        lockOwned = true;

        boolean moreToDo;
        try {
            moreToDo = recoverMisfiredJobs(connection, false);
        } catch (Exception cause) {
            // Wrapped unconditionally, including when cause already is a
            // JobPersistenceException.
            throw new JobPersistenceException(cause.getMessage(), cause);
        }

        connection.commit();
        return moreToDo;
    } catch (JobPersistenceException jpe) {
        rollbackConnection(connection);
        throw jpe;
    } catch (Exception ex) {
        rollbackConnection(connection);
        throw new JobPersistenceException("TX failure: " + ex.getMessage(), ex);
    } finally {
        try {
            releaseLock(connection, LOCK_TRIGGER_ACCESS, lockOwned);
        } finally {
            closeConnection(connection);
        }
    }
}
/**
* <p>
* Perform a cluster check-in on a connection from
* <code>getNonManagedTXConnection()</code> and, when the check turns up
* records to act on, run <code>clusterRecover</code> for them.
* </p>
*
* <p>
* Locks are taken in a fixed order: <code>LOCK_STATE_ACCESS</code> first and,
* only if there are records to recover, <code>LOCK_TRIGGER_ACCESS</code>
* second. They are released in the opposite order in the finally block. The
* connection is committed on success; on any exception
* <code>rollbackConnection(conn)</code> is called before the exception
* propagates.
* </p>
*
* <p>
* <code>firstCheckIn</code> is cleared only when the method completes without
* an exception, because the assignment sits after the try statement.
* </p>
*
* @return true if <code>clusterRecover</code> was invoked and returned
* normally, false otherwise.
* @throws JobPersistenceException
*           rethrown as-is, or wrapping any other exception with the
*           message prefix "TX failure: ".
*/
protected boolean doCheckin() throws JobPersistenceException {
Connection conn = null;
// Ownership flags: each is set after the matching obtainLock call and is
// handed to releaseLock in the finally block.
boolean transOwner = false;
boolean transStateOwner = false;
boolean recovered = false;
try {
conn = getNonManagedTXConnection();
// Other than the first time, always check in first to make sure there is
// work to be done before we acquire the lock (since that is expensive,
// and is almost never necessary)
List failedRecords = (firstCheckIn) ? null : clusterCheckIn(conn);
// failedRecords is null only when firstCheckIn is true, in which case the
// short-circuit || skips the size() call.
if (firstCheckIn || (failedRecords.size() > 0)) {
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;
// Now that we own the lock, make sure we still have work to do.
// The first time through, we also need to make sure we update/create our state record
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
transOwner = true;
clusterRecover(conn, failedRecords);
recovered = true;
}
}
conn.commit();
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (Exception e) {
rollbackConnection(conn);
throw new JobPersistenceException("TX failure: " + e.getMessage(),
e);
} finally {
// Nested finally blocks: each later cleanup step still runs if an earlier
// one throws. Trigger lock first, then state lock, then the connection.
try {
releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
} finally {
try {
releaseLock(conn, LOCK_STATE_ACCESS, transStateOwner);
} finally {
closeConnection(conn);
}
}
}
// Reached only when no exception escaped the try statement above.
firstCheckIn = false;
return recovered;
}
//---------------------------------------------------------------------------
// private helpers
//---------------------------------------------------------------------------
/**
 * <p>
 * Obtain a connection from the data source named by
 * <code>getNonManagedTXDataSource()</code> and prepare it for use.
 * </p>
 *
 * <p>
 * Unless <code>isDontSetNonManagedTXConnectionAutoCommitFalse()</code> is
 * true, auto-commit is switched off. When
 * <code>isTxIsolationLevelReadCommitted()</code> is true, the isolation
 * level is set to <code>Connection.TRANSACTION_READ_COMMITTED</code>.
 * A <code>SQLException</code> from either setter is deliberately ignored
 * and the connection is returned with whatever settings it has.
 * </p>
 *
 * @return a non-null connection.
 * @throws JobPersistenceException
 *           if the data source yields no connection, or if obtaining or
 *           preparing the connection fails with anything other than an
 *           ignored <code>SQLException</code>.
 */
protected Connection getNonManagedTXConnection()
        throws JobPersistenceException {
    try {
        Connection conn = DBConnectionManager.getInstance().getConnection(
                getNonManagedTXDataSource());

        if (conn == null) {
            throw new SQLException(
                    "Could not get connection from DataSource '"
                            + getNonManagedTXDataSource() + "'");
        }

        try {
            if (!isDontSetNonManagedTXConnectionAutoCommitFalse()) {
                conn.setAutoCommit(false);
            }

            if (isTxIsolationLevelReadCommitted()) {
                // Previously TRANSACTION_READ_UNCOMMITTED, which allows dirty
                // reads and contradicts the option this flag represents.
                conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            }
        } catch (SQLException ignored) {
            // Best effort: a driver that rejects these settings still yields
            // a usable connection, so the failure is intentionally dropped.
        } catch (Exception e) {
            // conn is known to be non-null here; close it so it is not leaked.
            try {
                conn.close();
            } catch (Throwable tt) {
                // Nothing more can be done; the setup failure is reported below.
            }
            // Note: this exception is itself caught by the outer
            // catch (Exception) below and wrapped once more.
            throw new JobPersistenceException(
                    "Failure setting up connection.", e);
        }

        return conn;
    } catch (SQLException sqle) {
        throw new JobPersistenceException(
                "Failed to obtain DB connection from data source '"
                        + getNonManagedTXDataSource() + "': "
                        + sqle.toString(), sqle);
    } catch (Exception e) {
        throw new JobPersistenceException(
                "Failed to obtain DB connection from data source '"
                        + getNonManagedTXDataSource() + "': "
                        + e.toString(), e,
                JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
    }
}
}
// EOF
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?