jobstorecmt.java

来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,470 行 · 第 1/4 页

JAVA
1,470
字号
        boolean transOwner = false;
        try {
            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            transOwner = true;
            //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);

            pauseAll(conn, ctxt);
        } finally {
        	try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
        	} finally {
                closeConnection(conn);
        	}
        }
    }

    /**
     * <p>
     * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
     * on every group.
     * </p>
     * 
     * <p>
     * If any <code>Trigger</code> missed one or more fire-times, then the
     * <code>Trigger</code>'s misfire instruction will be applied.
     * </p>
     * 
     * @see #pauseAll(SchedulingContext)
     */
    public void resumeAll(SchedulingContext ctxt)
            throws JobPersistenceException {
        // No commit or rollback is issued in this method; it only takes the
        // trigger lock, delegates, and cleans up.
        Connection conn = getConnection();
        boolean lockHeld = false;
        try {
            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            lockHeld = true;

            // Delegate to the connection-level overload.
            resumeAll(conn, ctxt);
        } finally {
            // Release the lock first; the nested finally makes sure the
            // connection is closed even when the release itself fails.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, lockHeld);
            } finally {
                closeConnection(conn);
            }
        }
    }

    //---------------------------------------------------------------------------
    // trigger firing methods
    //---------------------------------------------------------------------------

    /**
     * <p>
     * Get a handle to the next trigger to be fired, and mark it as 'reserved'
     * by the calling scheduler.
     * </p>
     * 
     * @see #releaseAcquiredTrigger(SchedulingContext, Trigger)
     */
    public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan)
            throws JobPersistenceException {
        // Runs on a connection from getNonManagedTXConnection() under
        // LOCK_TRIGGER_ACCESS: committed on success, rolled back on failure.
        Connection conn = null;
        boolean lockHeld = false;

        try {
            conn = getNonManagedTXConnection();

            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            lockHeld = true;

            Trigger acquired = acquireNextTrigger(conn, ctxt, noLaterThan);

            conn.commit();
            return acquired;
        } catch (JobPersistenceException e) {
            // Already the right exception type: undo the work and pass it on.
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            // Anything else (e.g. a failure from commit()) is wrapped.
            rollbackConnection(conn);
            String message = "Error acquiring next firable trigger: "
                    + e.getMessage();
            throw new JobPersistenceException(message, e);
        } finally {
            // Lock release first, connection close guaranteed afterwards.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, lockHeld);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler no longer plans to
     * fire the given <code>Trigger</code>, that it had previously acquired
     * (reserved).
     * </p>
     */
    public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger)
            throws JobPersistenceException {
        // Runs on a connection from getNonManagedTXConnection() under
        // LOCK_TRIGGER_ACCESS: committed on success, rolled back on failure.
        Connection conn = null;
        boolean lockHeld = false;

        try {
            conn = getNonManagedTXConnection();

            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            lockHeld = true;

            // Delegate to the connection-level overload, then make it durable.
            releaseAcquiredTrigger(conn, ctxt, trigger);
            conn.commit();
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            // Non-persistence failures are wrapped so callers see one type.
            rollbackConnection(conn);
            String message = "Error releasing acquired trigger: "
                    + e.getMessage();
            throw new JobPersistenceException(message, e);
        } finally {
            // Lock release first, connection close guaranteed afterwards.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, lockHeld);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler is now firing the
     * given <code>Trigger</code> (executing its associated <code>Job</code>),
     * that it had previously acquired (reserved).
     * </p>
     * 
     * @return null if the trigger or its job or calendar no longer exist, or
     *         if the trigger was not successfully put into the 'executing'
     *         state.
     */
    public TriggerFiredBundle triggerFired(SchedulingContext ctxt,
            Trigger trigger) throws JobPersistenceException {
        // Runs on a connection from getNonManagedTXConnection() under
        // LOCK_TRIGGER_ACCESS.
        //
        // A JobPersistenceException carrying
        // ERR_PERSISTENCE_JOB_DOES_NOT_EXIST is treated differently from any
        // other failure: the transaction is still committed, and the
        // exception is rethrown only after the lock has been released and the
        // connection closed. Previously this case was rethrown before the
        // commit, so it was rolled back exactly like every other error and
        // the special-casing had no effect.
        // NOTE(review): assumes the connection-level triggerFired() may have
        // written state that should survive when the job is missing - confirm
        // against that method.
        Connection conn = null;
        boolean transOwner = false;
        TriggerFiredBundle tfb = null;
        JobPersistenceException missingJobError = null;

        try {
            conn = getNonManagedTXConnection();

            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            transOwner = true;

            try {
                tfb = triggerFired(conn, ctxt, trigger);
            } catch (JobPersistenceException jpe) {
                if (jpe.getErrorCode() != SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) {
                    // Any other persistence failure aborts the transaction.
                    throw jpe;
                }
                // Remember the error; it is reported after the commit.
                missingJobError = jpe;
            }

            conn.commit();
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("TX failure: " + e.getMessage(),
                    e);
        } finally {
            // Lock release first, connection close guaranteed afterwards.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
            } finally {
                closeConnection(conn);
            }
        }

        // Deferred until here so the committed transaction is not rolled back
        // by the catch clauses above.
        if (missingJobError != null) {
            throw missingJobError;
        }
        return tfb;
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler has completed the
     * firing of the given <code>Trigger</code> (and the execution of its
     * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
     * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
     * is stateful.
     * </p>
     */
    public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger,
            JobDetail jobDetail, int triggerInstCode)
            throws JobPersistenceException {
        // Runs on a connection from getNonManagedTXConnection() under
        // LOCK_TRIGGER_ACCESS: committed on success, rolled back on failure.
        Connection conn = null;
        boolean lockHeld = false;

        try {
            conn = getNonManagedTXConnection();

            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            lockHeld = true;

            // Delegate to the connection-level overload, then make it durable.
            triggeredJobComplete(conn, ctxt, trigger, jobDetail, triggerInstCode);
            conn.commit();
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            // Non-persistence failures are wrapped so callers see one type.
            rollbackConnection(conn);
            String message = "TX failure: " + e.getMessage();
            throw new JobPersistenceException(message, e);
        } finally {
            // Lock release first, connection close guaranteed afterwards.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, lockHeld);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /**
     * <p>
     * Calls <code>recoverMisfiredJobs(conn, false)</code> on a connection
     * obtained from <code>getNonManagedTXConnection()</code> while holding
     * <code>LOCK_TRIGGER_ACCESS</code>, committing on success and rolling
     * back on failure.
     * </p>
     * 
     * @return the value returned by <code>recoverMisfiredJobs</code>
     * @throws JobPersistenceException
     *           if the recovery call, the lock handling or the commit fails
     */
    protected boolean doRecoverMisfires() throws JobPersistenceException {
        Connection conn = null;
        boolean lockHeld = false;

        try {
            conn = getNonManagedTXConnection();

            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            lockHeld = true;

            final boolean moreToDo;
            try {
                moreToDo = recoverMisfiredJobs(conn, false);
            } catch (Exception e) {
                // Every failure of the recovery call is re-wrapped with the
                // original message and cause.
                throw new JobPersistenceException(e.getMessage(), e);
            }

            conn.commit();
            return moreToDo;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            rollbackConnection(conn);
            String message = "TX failure: " + e.getMessage();
            throw new JobPersistenceException(message, e);
        } finally {
            // Lock release first, connection close guaranteed afterwards.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, lockHeld);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /**
     * <p>
     * Performs a cluster check-in on a connection obtained from
     * <code>getNonManagedTXConnection()</code> and, when failed records are
     * found, runs <code>clusterRecover</code> for them.
     * </p>
     * 
     * <p>
     * <code>LOCK_STATE_ACCESS</code> is taken only on the first check-in or
     * when the unlocked check-in reported failed records;
     * <code>LOCK_TRIGGER_ACCESS</code> is taken only when there is something
     * to recover. The locks are released in the reverse order. On normal
     * completion <code>firstCheckIn</code> is cleared.
     * </p>
     * 
     * @return true if <code>clusterRecover</code> was run
     * @throws JobPersistenceException
     *           if check-in, recovery, locking or the commit fails
     */
    protected boolean doCheckin() throws JobPersistenceException {
        Connection conn = null;

        boolean ownsTriggerLock = false;
        boolean ownsStateLock = false;
        boolean recovered = false;

        try {
            conn = getNonManagedTXConnection();

            // Other than the first time, always check in first to make sure
            // there is work to be done before we acquire the lock (since that
            // is expensive, and is almost never necessary).
            List failedRecords = null;
            if (!firstCheckIn) {
                failedRecords = clusterCheckIn(conn);
            }

            if (firstCheckIn || !failedRecords.isEmpty()) {
                getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
                ownsStateLock = true;

                // Now that we own the lock, make sure we still have work to
                // do. The first time through, we also need to make sure we
                // update/create our state record.
                if (firstCheckIn) {
                    failedRecords = clusterCheckIn(conn);
                } else {
                    failedRecords = findFailedInstances(conn);
                }

                if (!failedRecords.isEmpty()) {
                    getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                    ownsTriggerLock = true;

                    clusterRecover(conn, failedRecords);
                    recovered = true;
                }
            }

            conn.commit();
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (Exception e) {
            rollbackConnection(conn);
            String message = "TX failure: " + e.getMessage();
            throw new JobPersistenceException(message, e);
        } finally {
            // Release in reverse acquisition order; the nesting guarantees
            // each later step runs even if an earlier one throws.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, ownsTriggerLock);
            } finally {
                try {
                    releaseLock(conn, LOCK_STATE_ACCESS, ownsStateLock);
                } finally {
                    closeConnection(conn);
                }
            }
        }

        // Only reached when no exception was thrown above.
        firstCheckIn = false;

        return recovered;
    }

    //---------------------------------------------------------------------------
    // private helpers
    //---------------------------------------------------------------------------


    /**
     * <p>
     * Obtains a connection from the data source named by
     * <code>getNonManagedTXDataSource()</code> and prepares it for use:
     * auto-commit is switched off unless
     * <code>isDontSetNonManagedTXConnectionAutoCommitFalse()</code> is true,
     * and the isolation level is set to
     * <code>TRANSACTION_READ_COMMITTED</code> when
     * <code>isTxIsolationLevelReadCommitted()</code> is true.
     * </p>
     * 
     * @return a non-null connection
     * @throws JobPersistenceException
     *           if no connection could be obtained or set up
     */
    protected Connection getNonManagedTXConnection()
            throws JobPersistenceException {
        try {
            Connection conn = DBConnectionManager.getInstance().getConnection(
                    getNonManagedTXDataSource());

            if (conn == null) {
                throw new SQLException(
                        "Could not get connection from DataSource '"
                                + getNonManagedTXDataSource() + "'");
            }

            try {
                if (!isDontSetNonManagedTXConnectionAutoCommitFalse()) {
                    conn.setAutoCommit(false);
                }

                if (isTxIsolationLevelReadCommitted()) {
                    // The flag asks for READ COMMITTED; the previous code
                    // requested TRANSACTION_READ_UNCOMMITTED here.
                    conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                }
            } catch (SQLException ignored) {
                // Deliberately best-effort: a connection that rejects these
                // settings is still handed to the caller as-is.
            } catch (Exception e) {
                // Unexpected failure: do not leak the connection.
                try {
                    conn.close();
                } catch (Throwable closeFailure) {
                    // Nothing more can be done; the setup failure is reported.
                }
                // This exception is caught by the outer catch (Exception)
                // below and re-wrapped there.
                throw new JobPersistenceException(
                        "Failure setting up connection.", e);
            }

            return conn;
        } catch (SQLException sqle) {
            throw new JobPersistenceException(
                    "Failed to obtain DB connection from data source '"
                            + getNonManagedTXDataSource() + "': "
                            + sqle.toString(), sqle);
        } catch (Exception e) {
            throw new JobPersistenceException(
                    "Failed to obtain DB connection from data source '"
                            + getNonManagedTXDataSource() + "': "
                            + e.toString(), e,
                    JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
        }
    }

}

// EOF

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?