📄 jobstorecmt.java
字号:
* <p> * When <code>resumeAll()</code> is called (to un-pause), trigger misfire * instructions WILL be applied. * </p> * * @see #resumeAll(SchedulingContext) * @see #pauseTriggerGroup(SchedulingContext, String) */ public void pauseAll(SchedulingContext ctxt) throws JobPersistenceException { Connection conn = getConnection(); boolean transOwner = false; try { getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); pauseAll(conn, ctxt); } finally { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } /** * <p> * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code> * on every group. * </p> * * <p> * If any <code>Trigger</code> missed one or more fire-times, then the * <code>Trigger</code>'s misfire instruction will be applied. * </p> * * @see #pauseAll(SchedulingContext) */ public void resumeAll(SchedulingContext ctxt) throws JobPersistenceException { Connection conn = getConnection(); boolean transOwner = false; try { getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); resumeAll(conn, ctxt); } finally { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } //--------------------------------------------------------------------------- // trigger firing methods //--------------------------------------------------------------------------- /** * <p> * Get a handle to the next trigger to be fired, and mark it as 'reserved' * by the calling scheduler. * </p> * * @see #releaseAcquiredTrigger(SchedulingContext, Trigger) */ public Trigger acquireNextTrigger(SchedulingContext ctxt) throws JobPersistenceException { Connection conn = null; boolean transOwner = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); Trigger trigger = acquireNextTrigger(conn, ctxt); conn.commit(); return trigger; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException( "Error acquiring next firable trigger: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } } /** * <p> * Inform the <code>JobStore</code> that the scheduler no longer plans to * fire the given <code>Trigger</code>, that it had previously acquired * (reserved). * </p> */ public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) throws JobPersistenceException { Connection conn = null; boolean transOwner = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); releaseAcquiredTrigger(conn, ctxt, trigger); conn.commit(); } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException( "Error releasing acquired trigger: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } } /** * <p> * Inform the <code>JobStore</code> that the scheduler is now firing the * given <code>Trigger</code> (executing its associated <code>Job</code>), * that it had previously acquired (reserved). * </p> * * @return null if the trigger or it's job or calendar no longer exist, or * if the trigger was not successfully put into the 'executing' * state. */ public TriggerFiredBundle triggerFired(SchedulingContext ctxt, Trigger trigger) throws JobPersistenceException { Connection conn = null; boolean transOwner = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); TriggerFiredBundle tfb = null; JobPersistenceException err = null; try { tfb = triggerFired(conn, ctxt, trigger); } catch (JobPersistenceException jpe) { if (jpe.getErrorCode() != SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) throw jpe; err = jpe; } if (err != null) throw err; conn.commit(); return tfb; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException("TX failure: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } } /** * <p> * Inform the <code>JobStore</code> that the scheduler has completed the * firing of the given <code>Trigger</code> (and the execution its * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code> * in the given <code>JobDetail</code> should be updated if the <code>Job</code> * is stateful. * </p> */ public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger, JobDetail jobDetail, int triggerInstCode) throws JobPersistenceException { Connection conn = null; boolean transOwner = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); triggeredJobComplete(conn, ctxt, trigger, jobDetail, triggerInstCode); conn.commit(); } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException("TX failure: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } } protected boolean doRecoverMisfires() throws JobPersistenceException { Connection conn = null; boolean transOwner = false; boolean moreToDo = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); transOwner = true; try { moreToDo = recoverMisfiredJobs(conn, false); } catch (Exception e) { throw new JobPersistenceException(e.getMessage(), e); } conn.commit(); return moreToDo; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException("TX failure: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); closeConnection(conn); } } } protected boolean doCheckin() throws JobPersistenceException { Connection conn = null; boolean transOwner = false; boolean transStateOwner = false; boolean recovered = false; try { conn = getNonManagedTXConnection(); getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS); transStateOwner = true; List failedRecords = clusterCheckIn(conn); if (failedRecords.size() > 0) { getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS); transOwner = true; clusterRecover(conn, failedRecords); recovered = true; } conn.commit(); } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (Exception e) { rollbackConnection(conn); throw new JobPersistenceException("TX failure: " + e.getMessage(), e); } finally { if (conn != null) { releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); releaseLock(conn, LOCK_STATE_ACCESS, transStateOwner); closeConnection(conn); } } firstCheckIn = false; return recovered; } //--------------------------------------------------------------------------- // private helpers //--------------------------------------------------------------------------- protected Connection getNonManagedTXConnection() throws JobPersistenceException { try { Connection conn = DBConnectionManager.getInstance().getConnection( getNonManagedTXDataSource()); if (conn == null) { throw new SQLException( "Could not get connection from DataSource '" + getNonManagedTXDataSource() + "'"); } if (!isDontSetNonManagedTXConnectionAutoCommitFalse()) conn.setAutoCommit(false); if (isTxIsolationLevelReadCommitted()) conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); return conn; } catch (SQLException sqle) { throw new JobPersistenceException( "Failed to obtain DB connection from data source '" + getNonManagedTXDataSource() + "': " + sqle.toString(), sqle); } catch (Exception e) { throw new JobPersistenceException( "Failed to obtain DB connection from data source '" + getNonManagedTXDataSource() + "': " + e.toString(), e, JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE); } }}// EOF
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -