jobstorecmt.java
来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,470 行 · 第 1/4 页
JAVA
1,470 行
/*
* Copyright 2004-2005 OpenSymphony
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/
/*
* Previously Copyright (c) 2001-2004 James House
*/
package org.quartz.impl.jdbcjobstore;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import org.quartz.Calendar;
import org.quartz.JobDetail;
import org.quartz.JobPersistenceException;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.SchedulerConfigException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.core.SchedulingContext;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.SchedulerSignaler;
import org.quartz.spi.TriggerFiredBundle;
import org.quartz.utils.DBConnectionManager;
/**
* <p>
* <code>JobStoreCMT</code> is meant to be used in an application-server
* environment that provides container-managed-transactions. No commit /
* rollback will be handled by this class.
* </p>
*
* <p>
* If you need commit / rollback, use <code>{@link
* org.quartz.impl.jdbcjobstore.JobStoreTX}</code>
* instead.
* </p>
*
* @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
* @author James House
* @author Srinivas Venkatarangaiah
*
*/
public class JobStoreCMT extends JobStoreSupport {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Data members.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
// Name of the non-managed-TX DataSource. Read and written through
// get/setNonManagedTXDataSource(); initialize() refuses to run while it is null.
protected String nonManagedTxDsName;
// Great name huh?
// When true, asks that setAutoCommit(false) NOT be called on connections
// obtained from the DataSource (see the setter's Javadoc). Defaults to false.
protected boolean dontSetNonManagedTXConnectionAutoCommitFalse = false;
// Exposed through is/setTxIsolationLevelReadCommitted(). Judging by its name it
// requests READ COMMITTED isolation on DB connections; the code that applies
// it is not visible in this section -- TODO confirm. Defaults to false.
protected boolean setTxIsolationLevelReadCommitted = false;
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* Set the name of the non-managed-transaction <code>DataSource</code>.
* </p>
*
* <p>
* <code>initialize</code> throws a <code>SchedulerConfigException</code>
* if this name has not been set, and <code>shutdown</code> passes it to
* <code>DBConnectionManager.shutdown</code>.
* </p>
*
* @param nonManagedTxDsName
* the name of the <code>DataSource</code>.
*/
public void setNonManagedTXDataSource(String nonManagedTxDsName) {
this.nonManagedTxDsName = nonManagedTxDsName;
}
/**
 * <p>
 * Get the name of the non-managed-transaction <code>DataSource</code>, as
 * previously supplied to <code>setNonManagedTXDataSource</code>.
 * </p>
 *
 * @return the configured name, or <code>null</code> if none has been set.
 */
public String getNonManagedTXDataSource() {
    return this.nonManagedTxDsName;
}
/**
 * Report whether the "don't set auto-commit to false" option is switched
 * on.
 *
 * @return the value last given to
 *         <code>setDontSetNonManagedTXConnectionAutoCommitFalse</code>
 *         (<code>false</code> by default).
 */
public boolean isDontSetNonManagedTXConnectionAutoCommitFalse() {
    return this.dontSetNonManagedTXConnectionAutoCommitFalse;
}
/**
 * Don't call <code>setAutoCommit(false)</code> on connections obtained
 * from the DataSource. This can be helpful in a few situations, such as
 * when a driver complains if the call is made while auto-commit is
 * already off.
 *
 * @param b
 *            <code>true</code> to skip the <code>setAutoCommit(false)</code>
 *            call, <code>false</code> to leave the default behaviour.
 */
public void setDontSetNonManagedTXConnectionAutoCommitFalse(boolean b) {
    this.dontSetNonManagedTXConnectionAutoCommitFalse = b;
}
/**
 * Report the current value of the "transaction isolation level read
 * committed" option.
 *
 * @return the value last given to
 *         <code>setTxIsolationLevelReadCommitted</code>
 *         (<code>false</code> by default).
 */
public boolean isTxIsolationLevelReadCommitted() {
    return this.setTxIsolationLevelReadCommitted;
}
/**
 * Switch the "transaction isolation level read committed" option on or
 * off.
 *
 * <p>
 * NOTE(review): as the property name indicates, this is expected to
 * request READ COMMITTED isolation on DB connections. The code that
 * applies the flag lies outside this section - confirm there.
 * </p>
 *
 * @param b
 *            the new value of the option.
 */
public void setTxIsolationLevelReadCommitted(boolean b) {
    this.setTxIsolationLevelReadCommitted = b;
}
/**
 * <p>
 * Validate the CMT-specific configuration, force the use of DB locks and
 * then run the superclass initialization.
 * </p>
 *
 * @param loadHelper
 *            handed on unchanged to <code>super.initialize</code>.
 * @param signaler
 *            handed on unchanged to <code>super.initialize</code>.
 * @throws SchedulerConfigException
 *             if no non-managed-TX <code>DataSource</code> name has been
 *             set, or if thrown by the superclass initialization.
 */
public void initialize(ClassLoadHelper loadHelper,
        SchedulerSignaler signaler) throws SchedulerConfigException {
    // Fail early: nothing below can work without the data source name.
    if (nonManagedTxDsName == null) {
        throw new SchedulerConfigException(
                "Non-ManagedTX DataSource name not set!");
    }

    // *must* use DB locks with CMT...
    setUseDBLocks(true);

    super.initialize(loadHelper, signaler);

    getLog().info("JobStoreCMT initialized.");
}
/**
 * <p>
 * Shut down the superclass first, then ask the
 * <code>DBConnectionManager</code> to shut down the non-managed-TX
 * <code>DataSource</code>.
 * </p>
 *
 * <p>
 * A <code>SQLException</code> from the connection manager is logged as a
 * warning and not propagated.
 * </p>
 */
public void shutdown() {
    super.shutdown();

    try {
        DBConnectionManager connectionManager = DBConnectionManager.getInstance();
        connectionManager.shutdown(getNonManagedTXDataSource());
    } catch (SQLException shutdownFailure) {
        getLog().warn("Database connection shutdown unsuccessful.", shutdownFailure);
    }
}
//---------------------------------------------------------------------------
// JobStoreSupport methods
//---------------------------------------------------------------------------
/**
 * <p>
 * Recover any failed or misfired jobs and clean up the data store as
 * appropriate.
 * </p>
 *
 * <p>
 * The work runs on a non-managed-TX connection while holding
 * <code>LOCK_TRIGGER_ACCESS</code>. It is committed on success and rolled
 * back on any failure. The lock release and the connection close always
 * run, in that order.
 * </p>
 *
 * @throws JobPersistenceException
 *             if jobs could not be recovered; any other exception is
 *             wrapped in a <code>JobPersistenceException</code>.
 */
protected void recoverJobs() throws JobPersistenceException {
    Connection nonManagedConn = null;
    // Becomes true only once this call has actually obtained the lock.
    boolean lockOwner = false;

    try {
        nonManagedConn = getNonManagedTXConnection();

        getLockHandler().obtainLock(nonManagedConn, LOCK_TRIGGER_ACCESS);
        lockOwner = true;
        // (An acquisition of LOCK_JOB_ACCESS used to follow here; it is disabled.)

        recoverJobs(nonManagedConn);
        nonManagedConn.commit();
    } catch (JobPersistenceException persistenceFailure) {
        // Already the right type: undo the work and pass it on untouched.
        rollbackConnection(nonManagedConn);
        throw persistenceFailure;
    } catch (Exception otherFailure) {
        rollbackConnection(nonManagedConn);
        throw new JobPersistenceException("Error recovering jobs: "
                + otherFailure.getMessage(), otherFailure);
    } finally {
        try {
            releaseLock(nonManagedConn, LOCK_TRIGGER_ACCESS, lockOwner);
        } finally {
            // Close even if releasing the lock failed.
            closeConnection(nonManagedConn);
        }
    }
}
/**
 * <p>
 * Run the connection-level
 * <code>cleanVolatileTriggerAndJobs(Connection)</code> helper inside its
 * own non-managed transaction.
 * </p>
 *
 * <p>
 * The work runs on a non-managed-TX connection while holding
 * <code>LOCK_TRIGGER_ACCESS</code>. It is committed on success and rolled
 * back on any failure. The lock release and the connection close always
 * run, in that order.
 * </p>
 *
 * @throws JobPersistenceException
 *             if the helper fails; any other exception is wrapped in a
 *             <code>JobPersistenceException</code>.
 */
protected void cleanVolatileTriggerAndJobs() throws JobPersistenceException {
    Connection nonManagedConn = null;
    // Becomes true only once this call has actually obtained the lock.
    boolean lockOwner = false;

    try {
        nonManagedConn = getNonManagedTXConnection();

        getLockHandler().obtainLock(nonManagedConn, LOCK_TRIGGER_ACCESS);
        lockOwner = true;
        // (An acquisition of LOCK_JOB_ACCESS used to follow here; it is disabled.)

        cleanVolatileTriggerAndJobs(nonManagedConn);
        nonManagedConn.commit();
    } catch (JobPersistenceException persistenceFailure) {
        // Already the right type: undo the work and pass it on untouched.
        rollbackConnection(nonManagedConn);
        throw persistenceFailure;
    } catch (Exception otherFailure) {
        rollbackConnection(nonManagedConn);
        throw new JobPersistenceException("Error cleaning volatile data: "
                + otherFailure.getMessage(), otherFailure);
    } finally {
        try {
            releaseLock(nonManagedConn, LOCK_TRIGGER_ACCESS, lockOwner);
        } finally {
            // Close even if releasing the lock failed.
            closeConnection(nonManagedConn);
        }
    }
}
//---------------------------------------------------------------------------
// job / trigger storage methods
//---------------------------------------------------------------------------
/**
 * <p>
 * Store the given <code>{@link org.quartz.JobDetail}</code> and
 * <code>{@link org.quartz.Trigger}</code>.
 * </p>
 *
 * <p>
 * The job/trigger pairing is validated before a connection is obtained,
 * so an invalid request fails without touching the database or the
 * trigger-access lock. <code>LOCK_TRIGGER_ACCESS</code> is taken only
 * when <code>isLockOnInsert()</code> is true.
 * </p>
 *
 * @param ctxt
 *            the scheduling context, passed through to the storage
 *            helpers.
 * @param newJob
 *            The <code>JobDetail</code> to be stored.
 * @param newTrigger
 *            The <code>Trigger</code> to be stored.
 * @throws ObjectAlreadyExistsException
 *             if a <code>Job</code> with the same name/group already
 *             exists.
 * @throws JobPersistenceException
 *             with error code <code>ERR_CLIENT_ERROR</code> if a
 *             non-volatile trigger is paired with a volatile job, or if
 *             storing fails.
 */
public void storeJobAndTrigger(SchedulingContext ctxt, JobDetail newJob,
        Trigger newTrigger) throws ObjectAlreadyExistsException,
        JobPersistenceException {
    // Client error: detect it before acquiring a connection or lock.
    if (newJob.isVolatile() && !newTrigger.isVolatile()) {
        JobPersistenceException jpe = new JobPersistenceException(
                "Cannot associate non-volatile "
                        + "trigger with a volatile job!");
        jpe.setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
        throw jpe;
    }

    Connection conn = getConnection();
    // Becomes true only once this call has actually obtained the lock.
    boolean transOwner = false;
    try {
        if (isLockOnInsert()) {
            getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            transOwner = true;
        }

        storeJob(conn, ctxt, newJob, false);
        storeTrigger(conn, ctxt, newTrigger, newJob, false,
                Constants.STATE_WAITING, false, false);
    } finally {
        try {
            releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
        } finally {
            // Close even if releasing the lock failed.
            closeConnection(conn);
        }
    }
}
/**
 * <p>
 * Store the given <code>{@link org.quartz.JobDetail}</code>.
 * </p>
 *
 * <p>
 * <code>LOCK_TRIGGER_ACCESS</code> is taken when
 * <code>isLockOnInsert()</code> is true or when an existing job may be
 * replaced. The lock release and the connection close always run, in
 * that order.
 * </p>
 *
 * @param ctxt
 *            the scheduling context, passed through to the storage
 *            helper.
 * @param newJob
 *            The <code>JobDetail</code> to be stored.
 * @param replaceExisting
 *            If <code>true</code>, any <code>Job</code> existing in the
 *            <code>JobStore</code> with the same name and group should
 *            be over-written.
 * @throws ObjectAlreadyExistsException
 *             if a <code>Job</code> with the same name/group already
 *             exists, and replaceExisting is set to false.
 * @throws JobPersistenceException
 *             if storing the job fails.
 */
public void storeJob(SchedulingContext ctxt, JobDetail newJob,
        boolean replaceExisting) throws ObjectAlreadyExistsException,
        JobPersistenceException {
    Connection managedConn = getConnection();
    // Becomes true only once this call has actually obtained the lock.
    boolean lockOwner = false;

    try {
        if (isLockOnInsert() || replaceExisting) {
            getLockHandler().obtainLock(managedConn, LOCK_TRIGGER_ACCESS);
            lockOwner = true;
            // (An acquisition of LOCK_JOB_ACCESS used to follow here; it is disabled.)
        }

        storeJob(managedConn, ctxt, newJob, replaceExisting);
    } finally {
        try {
            releaseLock(managedConn, LOCK_TRIGGER_ACCESS, lockOwner);
        } finally {
            // Close even if releasing the lock failed.
            closeConnection(managedConn);
        }
    }
}
/**
 * <p>
 * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given
 * name, and any <code>{@link org.quartz.Trigger}</code>s that reference
 * it.
 * </p>
 *
 * <p>
 * If removal of the <code>Job</code> results in an empty group, the
 * group should be removed from the <code>JobStore</code>'s list of
 * known group names.
 * </p>
 *
 * <p>
 * The removal always runs while holding <code>LOCK_TRIGGER_ACCESS</code>.
 * The lock release and the connection close always run, in that order.
 * </p>
 *
 * @param ctxt
 *            the scheduling context, passed through to the removal
 *            helper.
 * @param jobName
 *            The name of the <code>Job</code> to be removed.
 * @param groupName
 *            The group name of the <code>Job</code> to be removed.
 * @return <code>true</code> if a <code>Job</code> with the given name and
 *         group was found and removed from the store.
 * @throws JobPersistenceException
 *             if the removal fails.
 */
public boolean removeJob(SchedulingContext ctxt, String jobName,
        String groupName) throws JobPersistenceException {
    Connection managedConn = getConnection();
    // Becomes true only once this call has actually obtained the lock.
    boolean lockOwner = false;

    try {
        getLockHandler().obtainLock(managedConn, LOCK_TRIGGER_ACCESS);
        lockOwner = true;
        // (An acquisition of LOCK_JOB_ACCESS used to follow here; it is disabled.)

        boolean removed = removeJob(managedConn, ctxt, jobName, groupName, true);
        return removed;
    } finally {
        try {
            releaseLock(managedConn, LOCK_TRIGGER_ACCESS, lockOwner);
        } finally {
            // Close even if releasing the lock failed.
            closeConnection(managedConn);
        }
    }
}
/**
* <p>
* Retrieve the <code>{@link org.quartz.JobDetail}</code> for the given
* <code>{@link org.quartz.Job}</code>.
* </p>
*
* @param jobName
* The name of the <code>Job</code> to be retrieved.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?