jobstorecmt.java

来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,470 行 · 第 1/4 页

JAVA
1,470
字号
/* 
 * Copyright 2004-2005 OpenSymphony 
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
 * use this file except in compliance with the License. You may obtain a copy 
 * of the License at 
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0 
 *   
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
 * License for the specific language governing permissions and limitations 
 * under the License.
 * 
 */

/*
 * Previously Copyright (c) 2001-2004 James House
 */
package org.quartz.impl.jdbcjobstore;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import org.quartz.Calendar;
import org.quartz.JobDetail;
import org.quartz.JobPersistenceException;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.SchedulerConfigException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.core.SchedulingContext;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.SchedulerSignaler;
import org.quartz.spi.TriggerFiredBundle;
import org.quartz.utils.DBConnectionManager;

/**
 * <p>
 * <code>JobStoreCMT</code> is meant to be used in an application-server
 * environment that provides container-managed-transactions. No commit /
 * rollback will be handled by this class.
 * </p>
 * 
 * <p>
 * If you need commit / rollback, use <code>{@link
 * org.quartz.impl.jdbcjobstore.JobStoreTX}</code>
 * instead.
 * </p>
 * 
 * @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
 * @author James House
 * @author Srinivas Venkatarangaiah
 *  
 */
public class JobStoreCMT extends JobStoreSupport {

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     * 
     * Data members.
     * 
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    /**
     * Name of the <code>DataSource</code> supplied through
     * {@link #setNonManagedTXDataSource(String)}. It must be non-null when
     * <code>initialize</code> runs, and it is the name handed to
     * <code>DBConnectionManager</code> on <code>shutdown</code>.
     */
    protected String nonManagedTxDsName;

    // Great name huh?
    /**
     * Flag exposed through
     * {@link #setDontSetNonManagedTXConnectionAutoCommitFalse(boolean)};
     * defaults to <code>false</code>. NOTE(review): the code that consults
     * this flag is not visible in this part of the file - presumably it
     * suppresses the <code>setAutoCommit(false)</code> call on non-managed
     * connections; confirm against the connection-acquisition code.
     */
    protected boolean dontSetNonManagedTXConnectionAutoCommitFalse = false;

    
    /**
     * Flag exposed through {@link #setTxIsolationLevelReadCommitted(boolean)};
     * defaults to <code>false</code>. NOTE(review): judging by its name it
     * requests the read-committed isolation level on connections, but the
     * code that applies it is not visible in this part of the file.
     */
    protected boolean setTxIsolationLevelReadCommitted = false;
    
    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     * 
     * Interface.
     * 
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    /**
     * <p>
     * Set the name of the <code>DataSource</code> to be used for work that
     * this store performs outside of the container-managed transaction.
     * </p>
     * 
     * <p>
     * The name is mandatory: <code>initialize</code> throws a
     * <code>SchedulerConfigException</code> if it is still <code>null</code>,
     * and <code>shutdown</code> passes it to
     * <code>DBConnectionManager</code> to shut the data source down.
     * </p>
     * 
     * @param nonManagedTxDsName
     *          The name of the non-managed-transaction
     *          <code>DataSource</code>.
     */
    public void setNonManagedTXDataSource(String nonManagedTxDsName) {
        this.nonManagedTxDsName = nonManagedTxDsName;
    }

    /**
     * <p>
     * Get the name of the non-managed-transaction <code>DataSource</code>
     * that was configured for this store.
     * </p>
     * 
     * @return the configured name, or <code>null</code> if none has been
     *         set.
     */
    public String getNonManagedTXDataSource() {
        return this.nonManagedTxDsName;
    }

    /**
     * Report the current value of the "don't set auto-commit to false on
     * non-managed connections" flag.
     * 
     * @return the value last given to
     *         {@link #setDontSetNonManagedTXConnectionAutoCommitFalse(boolean)},
     *         or <code>false</code> if it was never called.
     */
    public boolean isDontSetNonManagedTXConnectionAutoCommitFalse() {
        return this.dontSetNonManagedTXConnectionAutoCommitFalse;
    }

    /**
     * Don't call <code>setAutoCommit(false)</code> on connections obtained
     * from the DataSource. This can be helpful in a few situations, such as
     * when a driver complains if it is called while auto-commit is already
     * off.
     * 
     * @param b
     *          <code>true</code> to skip the <code>setAutoCommit(false)</code>
     *          call, <code>false</code> (the default) to make it.
     */
    public void setDontSetNonManagedTXConnectionAutoCommitFalse(boolean b) {
        this.dontSetNonManagedTXConnectionAutoCommitFalse = b;
    }


    /**
     * Report whether the read-committed isolation flag is switched on.
     * 
     * @return the value last given to
     *         {@link #setTxIsolationLevelReadCommitted(boolean)}, or
     *         <code>false</code> if it was never called.
     */
    public boolean isTxIsolationLevelReadCommitted() {
        return this.setTxIsolationLevelReadCommitted;
    }

    /**
     * Request that the transaction isolation level of DB connections be set
     * to read committed. NOTE(review): the code that applies this flag to a
     * connection is not visible in this part of the file - confirm where it
     * is consulted.
     * 
     * @param b
     *          <code>true</code> to request the read-committed isolation
     *          level, <code>false</code> (the default) to leave the
     *          isolation level alone.
     */
    public void setTxIsolationLevelReadCommitted(boolean b) {
        this.setTxIsolationLevelReadCommitted = b;
    }
    

    /**
     * <p>
     * Validate the CMT-specific configuration, force the use of database
     * locks, and then run the inherited initialization.
     * </p>
     * 
     * @param loadHelper
     *          Passed through unchanged to the superclass initialization.
     * @param signaler
     *          Passed through unchanged to the superclass initialization.
     * @throws SchedulerConfigException
     *           if no non-managed-transaction <code>DataSource</code> name
     *           has been set, or if the superclass initialization fails.
     */
    public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {
        if (nonManagedTxDsName == null) {
            throw new SchedulerConfigException(
                    "Non-ManagedTX DataSource name not set!");
        }

        // *must* use DB locks with CMT...
        setUseDBLocks(true);
        super.initialize(loadHelper, signaler);

        getLog().info("JobStoreCMT initialized.");
    }
    
    /**
     * <p>
     * Run the inherited shutdown and then ask the
     * <code>DBConnectionManager</code> to shut down the
     * non-managed-transaction <code>DataSource</code>.
     * </p>
     * 
     * <p>
     * A failure to shut the data source down is logged as a warning and
     * otherwise ignored, so that shutdown always completes.
     * </p>
     */
    public void shutdown() {
        super.shutdown();

        try {
            DBConnectionManager connectionManager = DBConnectionManager
                    .getInstance();
            connectionManager.shutdown(getNonManagedTXDataSource());
        } catch (SQLException shutdownFailure) {
            getLog().warn("Database connection shutdown unsuccessful.",
                    shutdownFailure);
        }
    }
    

    //---------------------------------------------------------------------------
    // JobStoreSupport methods
    //---------------------------------------------------------------------------

    /**
     * <p>
     * Recover any failed or misfired jobs and clean up the data store as
     * appropriate.
     * </p>
     * 
     * <p>
     * The work runs on a non-managed-transaction connection under the
     * trigger-access lock, and is committed explicitly here. On any failure
     * the connection is rolled back. The lock release and the connection
     * close are always attempted, in that order.
     * </p>
     * 
     * @throws JobPersistenceException
     *           if jobs could not be recovered; any other exception is
     *           wrapped in a <code>JobPersistenceException</code>.
     */
    protected void recoverJobs() throws JobPersistenceException {
        Connection nonManagedConn = null;
        // Becomes true only once the lock request has returned normally.
        boolean lockObtained = false;

        try {
            nonManagedConn = getNonManagedTXConnection();

            getLockHandler().obtainLock(nonManagedConn, LOCK_TRIGGER_ACCESS);
            lockObtained = true;

            recoverJobs(nonManagedConn);

            nonManagedConn.commit();
        } catch (JobPersistenceException persistenceFailure) {
            rollbackConnection(nonManagedConn);
            throw persistenceFailure;
        } catch (Exception unexpected) {
            rollbackConnection(nonManagedConn);
            throw new JobPersistenceException("Error recovering jobs: "
                    + unexpected.getMessage(), unexpected);
        } finally {
            // Nested so the connection is closed even if the release throws.
            try {
                releaseLock(nonManagedConn, LOCK_TRIGGER_ACCESS, lockObtained);
            } finally {
                closeConnection(nonManagedConn);
            }
        }
    }

    /**
     * <p>
     * Clean volatile triggers and jobs out of the data store by delegating
     * to the connection-level <code>cleanVolatileTriggerAndJobs</code>.
     * </p>
     * 
     * <p>
     * The work runs on a non-managed-transaction connection under the
     * trigger-access lock, and is committed explicitly here. On any failure
     * the connection is rolled back. The lock release and the connection
     * close are always attempted, in that order.
     * </p>
     * 
     * @throws JobPersistenceException
     *           if the clean-up fails; any other exception is wrapped in a
     *           <code>JobPersistenceException</code>.
     */
    protected void cleanVolatileTriggerAndJobs() throws JobPersistenceException {
        Connection nonManagedConn = null;
        // Becomes true only once the lock request has returned normally.
        boolean lockObtained = false;

        try {
            nonManagedConn = getNonManagedTXConnection();

            getLockHandler().obtainLock(nonManagedConn, LOCK_TRIGGER_ACCESS);
            lockObtained = true;

            cleanVolatileTriggerAndJobs(nonManagedConn);

            nonManagedConn.commit();
        } catch (JobPersistenceException persistenceFailure) {
            rollbackConnection(nonManagedConn);
            throw persistenceFailure;
        } catch (Exception unexpected) {
            rollbackConnection(nonManagedConn);
            throw new JobPersistenceException("Error cleaning volatile data: "
                    + unexpected.getMessage(), unexpected);
        } finally {
            // Nested so the connection is closed even if the release throws.
            try {
                releaseLock(nonManagedConn, LOCK_TRIGGER_ACCESS, lockObtained);
            } finally {
                closeConnection(nonManagedConn);
            }
        }
    }

    //---------------------------------------------------------------------------
    // job / trigger storage methods
    //---------------------------------------------------------------------------

    /**
     * <p>
     * Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
     * </p>
     * 
     * <p>
     * The combination of a volatile job with a non-volatile trigger is
     * rejected before any connection or lock is acquired, since the check
     * needs neither. No commit or rollback is performed here.
     * </p>
     * 
     * @param ctxt
     *          The scheduling context, passed through to the
     *          connection-level store operations.
     * @param newJob
     *          The <code>JobDetail</code> to be stored.
     * @param newTrigger
     *          The <code>Trigger</code> to be stored.
     * @throws ObjectAlreadyExistsException
     *           if a <code>Job</code> with the same name/group already
     *           exists.
     * @throws JobPersistenceException
     *           with error code
     *           <code>SchedulerException.ERR_CLIENT_ERROR</code> if a
     *           non-volatile trigger is paired with a volatile job, or if
     *           the underlying store operations fail.
     */
    public void storeJobAndTrigger(SchedulingContext ctxt, JobDetail newJob,
            Trigger newTrigger) throws ObjectAlreadyExistsException,
            JobPersistenceException {
        // Pure argument validation: fail fast so a client error never ties
        // up a connection or the trigger-access lock.
        if (newJob.isVolatile() && !newTrigger.isVolatile()) {
            JobPersistenceException jpe = new JobPersistenceException(
                    "Cannot associate non-volatile "
                            + "trigger with a volatile job!");
            jpe.setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
            throw jpe;
        }

        Connection conn = getConnection();
        // Becomes true only once the lock request has returned normally.
        boolean transOwner = false;
        try {
            if (isLockOnInsert()) {
                getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                transOwner = true;
            }

            storeJob(conn, ctxt, newJob, false);
            storeTrigger(conn, ctxt, newTrigger, newJob, false,
                    Constants.STATE_WAITING, false, false);
        } finally {
            // Nested so the connection is closed even if the release throws.
            try {
                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /**
     * <p>
     * Store the given <code>{@link org.quartz.JobDetail}</code>.
     * </p>
     * 
     * <p>
     * The trigger-access lock is taken when lock-on-insert is enabled or
     * when an existing job may be replaced. No commit or rollback is
     * performed here.
     * </p>
     * 
     * @param ctxt
     *          The scheduling context, passed through to the
     *          connection-level store operation.
     * @param newJob
     *          The <code>JobDetail</code> to be stored.
     * @param replaceExisting
     *          If <code>true</code>, any <code>Job</code> existing in the
     *          <code>JobStore</code> with the same name &amp; group should
     *          be over-written.
     * @throws ObjectAlreadyExistsException
     *           if a <code>Job</code> with the same name/group already
     *           exists, and replaceExisting is set to false.
     * @throws JobPersistenceException
     *           if the underlying store operation fails.
     */
    public void storeJob(SchedulingContext ctxt, JobDetail newJob,
            boolean replaceExisting) throws ObjectAlreadyExistsException,
            JobPersistenceException {
        Connection managedConn = getConnection();
        // Becomes true only once the lock request has returned normally.
        boolean lockObtained = false;
        try {
            boolean lockRequired = isLockOnInsert() || replaceExisting;
            if (lockRequired) {
                getLockHandler().obtainLock(managedConn, LOCK_TRIGGER_ACCESS);
                lockObtained = true;
            }

            storeJob(managedConn, ctxt, newJob, replaceExisting);
        } finally {
            // Nested so the connection is closed even if the release throws.
            try {
                releaseLock(managedConn, LOCK_TRIGGER_ACCESS, lockObtained);
            } finally {
                closeConnection(managedConn);
            }
        }
    }

    /**
     * <p>
     * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given
     * name, and any <code>{@link org.quartz.Trigger}</code> s that reference
     * it.
     * </p>
     * 
     * <p>
     * The removal itself is delegated to the connection-level
     * <code>removeJob</code>, and always runs under the trigger-access
     * lock. No commit or rollback is performed here.
     * </p>
     * 
     * @param ctxt
     *          The scheduling context, passed through to the
     *          connection-level remove operation.
     * @param jobName
     *          The name of the <code>Job</code> to be removed.
     * @param groupName
     *          The group name of the <code>Job</code> to be removed.
     * @return <code>true</code> if a <code>Job</code> with the given name
     *         &amp; group was found and removed from the store.
     * @throws JobPersistenceException
     *           if the underlying remove operation fails.
     */
    public boolean removeJob(SchedulingContext ctxt, String jobName,
            String groupName) throws JobPersistenceException {
        Connection managedConn = getConnection();
        // Becomes true only once the lock request has returned normally.
        boolean lockObtained = false;
        try {
            getLockHandler().obtainLock(managedConn, LOCK_TRIGGER_ACCESS);
            lockObtained = true;

            boolean removed = removeJob(managedConn, ctxt, jobName,
                    groupName, true);
            return removed;
        } finally {
            // Nested so the connection is closed even if the release throws.
            try {
                releaseLock(managedConn, LOCK_TRIGGER_ACCESS, lockObtained);
            } finally {
                closeConnection(managedConn);
            }
        }
    }

    /**
     * <p>
     * Retrieve the <code>{@link org.quartz.JobDetail}</code> for the given
     * <code>{@link org.quartz.Job}</code>.
     * </p>
     * 
     * @param jobName
     *          The name of the <code>Job</code> to be retrieved.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?