📄 ramjobstore.java
字号:
}
if (tw == null) {
return null;
}
if (tw.trigger.getNextFireTime() == null) {
timeTriggers.remove(tw);
tw = null;
continue;
}
timeTriggers.remove(tw);
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
tw = null;
continue;
}
if(tw.trigger.getNextFireTime().getTime() > noLaterThan) {
timeTriggers.add(tw);
return null;
}
tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
Trigger trig = (Trigger) tw.trigger.clone();
return trig;
}
}
return null;
}
/**
 * <p>
 * Tell the <code>JobStore</code> that the scheduler has decided not to fire
 * the given <code>Trigger</code> after all, although it had acquired
 * (reserved) it earlier.
 * </p>
 *
 * <p>
 * A trigger that is still present in the store and still in the acquired
 * state is put back into the waiting state and re-added to
 * <code>timeTriggers</code>. In every other case the call does nothing.
 * </p>
 */
public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) {
    synchronized (triggerLock) {
        Object stored = triggersByFQN.get(TriggerWrapper.getTriggerNameKey(trigger));
        TriggerWrapper wrapper = (TriggerWrapper) stored;

        // Nothing to release: unknown trigger, or it already left the acquired state.
        if (wrapper == null || wrapper.state != TriggerWrapper.STATE_ACQUIRED) {
            return;
        }

        wrapper.state = TriggerWrapper.STATE_WAITING;
        timeTriggers.add(wrapper);
    }
}
/**
 * <p>
 * Inform the <code>JobStore</code> that the scheduler is now firing the
 * given <code>Trigger</code> (executing its associated <code>Job</code>),
 * that it had previously acquired (reserved).
 * </p>
 *
 * @param ctxt
 *          the scheduling context, passed through to the calendar and job
 *          lookups.
 * @param trigger
 *          the scheduler's copy of the acquired trigger; its
 *          <code>triggered</code> method is invoked by this call.
 * @return the bundle describing this firing, or <code>null</code> if the
 *         trigger is no longer stored, is no longer in the acquired state,
 *         or names a calendar that cannot be retrieved.
 */
public TriggerFiredBundle triggerFired(SchedulingContext ctxt,
        Trigger trigger) {
    synchronized (triggerLock) {
        TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper
                .getTriggerNameKey(trigger));
        // was the trigger deleted since being acquired?
        if (tw == null || tw.trigger == null) {
            return null;
        }
        // was the trigger completed, paused, blocked, etc. since being acquired?
        if (tw.state != TriggerWrapper.STATE_ACQUIRED) {
            return null;
        }

        Calendar cal = null;
        if (tw.trigger.getCalendarName() != null) {
            cal = retrieveCalendar(ctxt, tw.trigger.getCalendarName());
            if (cal == null) {
                return null;
            }
        }

        // Remember the previous fire time before triggered() advances it.
        Date prevFireTime = trigger.getPreviousFireTime();
        // in case trigger was replaced between acquiring and firing
        timeTriggers.remove(tw);
        // call triggered on our copy, and the scheduler's copy
        tw.trigger.triggered(cal);
        trigger.triggered(cal);
        // The stored wrapper goes straight back to the waiting state.
        tw.state = TriggerWrapper.STATE_WAITING;

        TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(ctxt,
                trigger.getJobName(), trigger.getJobGroup()), trigger, cal,
                false, new Date(), trigger.getPreviousFireTime(), prevFireTime,
                trigger.getNextFireTime());

        JobDetail job = bndle.getJobDetail();

        if (job.isStateful()) {
            // A stateful job blocks all of its triggers (including this one)
            // and takes them out of the firing queue.
            ArrayList trigs = getTriggerWrappersForJob(job.getName(), job
                    .getGroup());
            Iterator itr = trigs.iterator();
            while (itr.hasNext()) {
                TriggerWrapper ttw = (TriggerWrapper) itr.next();
                if (ttw.state == TriggerWrapper.STATE_WAITING) {
                    ttw.state = TriggerWrapper.STATE_BLOCKED;
                }
                if (ttw.state == TriggerWrapper.STATE_PAUSED) {
                    ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
                }
                timeTriggers.remove(ttw);
            }
            blockedJobs.add(JobWrapper.getJobNameKey(job));
        } else if (tw.trigger.getNextFireTime() != null) {
            // triggerLock is already held by the enclosing block, so the
            // wrapper can be re-queued without locking again.
            timeTriggers.add(tw);
        }

        return bndle;
    }
}
/**
 * <p>
 * Inform the <code>JobStore</code> that the scheduler has completed the
 * firing of the given <code>Trigger</code> (and the execution of its
 * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
 * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
 * is stateful.
 * </p>
 *
 * @param ctxt
 *          the scheduling context, passed through when a trigger is removed.
 * @param trigger
 *          the trigger whose firing has completed.
 * @param jobDetail
 *          the job detail as seen by the execution; a clone of its data map
 *          replaces the stored job's data map when the stored job is
 *          stateful.
 * @param triggerInstCode
 *          one of the <code>Trigger.INSTRUCTION_*</code> codes handled
 *          below; any other value leaves the stored trigger untouched.
 */
public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger,
        JobDetail jobDetail, int triggerInstCode) {
    synchronized (triggerLock) {
        String jobKey = JobWrapper.getJobNameKey(jobDetail.getName(), jobDetail
                .getGroup());
        JobWrapper jw = (JobWrapper) jobsByFQN.get(jobKey);
        TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper
                .getTriggerNameKey(trigger));

        // It's possible that the job is null if:
        //   1- it was deleted during execution
        //   2- RAMJobStore is being used only for volatile jobs / triggers
        //      from the JDBC job store
        if (jw != null) {
            JobDetail jd = jw.jobDetail;

            if (jd.isStateful()) {
                // Store a clean copy of the data map produced by the execution.
                JobDataMap newData = jobDetail.getJobDataMap();
                if (newData != null) {
                    newData = (JobDataMap) newData.clone();
                    newData.clearDirtyFlag();
                }
                jd.setJobDataMap(newData);

                // The job has finished, so its triggers are no longer blocked.
                blockedJobs.remove(JobWrapper.getJobNameKey(jd));
                ArrayList trigs = getTriggerWrappersForJob(jd.getName(), jd
                        .getGroup());
                Iterator itr = trigs.iterator();
                while (itr.hasNext()) {
                    TriggerWrapper ttw = (TriggerWrapper) itr.next();
                    if (ttw.state == TriggerWrapper.STATE_BLOCKED) {
                        ttw.state = TriggerWrapper.STATE_WAITING;
                        timeTriggers.add(ttw);
                    }
                    if (ttw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) {
                        ttw.state = TriggerWrapper.STATE_PAUSED;
                    }
                }
                signaler.signalSchedulingChange(0L);
            }
        } else { // even if it was deleted, there may be cleanup to do
            blockedJobs.remove(JobWrapper.getJobNameKey(jobDetail));
        }

        // check for trigger deleted during execution...
        if (tw != null) {
            if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) {
                if (trigger.getNextFireTime() == null) {
                    // double check for possible reschedule within job
                    // execution, which would cancel the need to delete...
                    if (tw.getTrigger().getNextFireTime() == null) {
                        removeTrigger(ctxt, trigger.getName(), trigger.getGroup());
                    }
                } else {
                    removeTrigger(ctxt, trigger.getName(), trigger.getGroup());
                    signaler.signalSchedulingChange(0L);
                }
            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) {
                tw.state = TriggerWrapper.STATE_COMPLETE;
                timeTriggers.remove(tw);
                signaler.signalSchedulingChange(0L);
            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) {
                getLog().info("Trigger " + trigger.getFullName() + " set to ERROR state.");
                tw.state = TriggerWrapper.STATE_ERROR;
                // Take the trigger out of the firing queue, exactly as the
                // COMPLETE branch and setAllTriggersOfJobToState do; otherwise
                // a trigger in ERROR state stays queued and can be acquired
                // and fired again.
                timeTriggers.remove(tw);
                signaler.signalSchedulingChange(0L);
            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) {
                getLog().info("All triggers of Job "
                        + trigger.getFullJobName() + " set to ERROR state.");
                setAllTriggersOfJobToState(
                        trigger.getJobName(),
                        trigger.getJobGroup(),
                        TriggerWrapper.STATE_ERROR);
                signaler.signalSchedulingChange(0L);
            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) {
                setAllTriggersOfJobToState(
                        trigger.getJobName(),
                        trigger.getJobGroup(),
                        TriggerWrapper.STATE_COMPLETE);
                signaler.signalSchedulingChange(0L);
            }
        }
    }
}
/**
 * Puts every trigger wrapper returned by
 * <code>getTriggerWrappersForJob(jobName, jobGroup)</code> into the given
 * state. For any state other than <code>TriggerWrapper.STATE_WAITING</code>
 * the wrapper is also removed from <code>timeTriggers</code>.
 *
 * <p>
 * This method takes no lock itself; the caller visible in this file,
 * <code>triggeredJobComplete</code>, invokes it while holding
 * <code>triggerLock</code>.
 * </p>
 */
protected void setAllTriggersOfJobToState(String jobName, String jobGroup, int state) {
    boolean dequeue = (state != TriggerWrapper.STATE_WAITING);

    for (Iterator it = getTriggerWrappersForJob(jobName, jobGroup).iterator(); it.hasNext();) {
        TriggerWrapper wrapper = (TriggerWrapper) it.next();
        wrapper.state = state;
        if (dequeue) {
            timeTriggers.remove(wrapper);
        }
    }
}
/**
 * Builds a diagnostic string: the name of every trigger in
 * <code>triggersByFQN</code>, each followed by "/", then " | ", then the
 * name of every trigger in <code>timeTriggers</code> in iteration order,
 * each followed by "->".
 *
 * <p>
 * The two collections are read in two separate critical sections on
 * <code>triggerLock</code>.
 * </p>
 */
protected String peekTriggers() {
    StringBuffer out = new StringBuffer();

    synchronized (triggerLock) {
        Iterator stored = triggersByFQN.values().iterator();
        while (stored.hasNext()) {
            TriggerWrapper wrapper = (TriggerWrapper) stored.next();
            out.append(wrapper.trigger.getName());
            out.append("/");
        }
    }

    out.append(" | ");

    synchronized (triggerLock) {
        for (Iterator queued = timeTriggers.iterator(); queued.hasNext();) {
            TriggerWrapper wrapper = (TriggerWrapper) queued.next();
            out.append(wrapper.trigger.getName());
            out.append("->");
        }
    }

    return out.toString();
}
/**
 * Returns a new <code>HashSet</code> holding the current contents of
 * <code>pausedTriggerGroups</code>, so the caller never receives the
 * store's own collection.
 *
 * @see org.quartz.spi.JobStore#getPausedTriggerGroups(org.quartz.core.SchedulingContext)
 */
public Set getPausedTriggerGroups(SchedulingContext ctxt) throws JobPersistenceException {
    return new HashSet(pausedTriggerGroups);
}
}
/*******************************************************************************
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* Helper Classes. * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*/
/**
 * Orders <code>TriggerWrapper</code> instances by their wrapped triggers:
 * first by <code>Trigger.compareTo</code>, then by priority (higher priority
 * sorts first), and finally by the trigger's full name.
 */
class TriggerComparator implements Comparator {

    /**
     * Compares two <code>TriggerWrapper</code>s.
     *
     * @param obj1 the first wrapper; must be a <code>TriggerWrapper</code>
     * @param obj2 the second wrapper; must be a <code>TriggerWrapper</code>
     * @return a negative, zero or positive value as <code>obj1</code> sorts
     *         before, equal to, or after <code>obj2</code>
     * @throws ClassCastException if either argument is not a
     *         <code>TriggerWrapper</code>
     */
    public int compare(Object obj1, Object obj2) {
        TriggerWrapper trig1 = (TriggerWrapper) obj1;
        TriggerWrapper trig2 = (TriggerWrapper) obj2;

        int comp = trig1.trigger.compareTo(trig2.trigger);
        if (comp != 0) {
            return comp;
        }

        // Compare explicitly rather than subtracting: the difference of two
        // ints can overflow and flip the sign for extreme priority values.
        int priority1 = trig1.trigger.getPriority();
        int priority2 = trig2.trigger.getPriority();
        if (priority1 != priority2) {
            // higher priority sorts first
            return (priority1 > priority2) ? -1 : 1;
        }

        return trig1.trigger.getFullName().compareTo(trig2.trigger.getFullName());
    }

    /**
     * All <code>TriggerComparator</code> instances are interchangeable, so
     * any two of them are equal.
     */
    public boolean equals(Object obj) {
        return (obj instanceof TriggerComparator);
    }

    /**
     * Kept consistent with {@link #equals(Object)}: every instance returns
     * the same hash code.
     */
    public int hashCode() {
        return TriggerComparator.class.hashCode();
    }
}
/**
 * Pairs a <code>JobDetail</code> with a string key built from its group and
 * name. Equality and hashing are based on the key alone.
 */
class JobWrapper {

    public String key;

    public JobDetail jobDetail;

    /** Wraps the job under the key derived from its group and name. */
    JobWrapper(JobDetail jobDetail) {
        this(jobDetail, getJobNameKey(jobDetail));
    }

    /** Wraps the job under an explicitly supplied key. */
    JobWrapper(JobDetail jobDetail, String key) {
        this.jobDetail = jobDetail;
        this.key = key;
    }

    /** Builds the key for a job from its own group and name. */
    static String getJobNameKey(JobDetail jobDetail) {
        String group = jobDetail.getGroup();
        String name = jobDetail.getName();
        return getJobNameKey(name, group);
    }

    /** Builds the key as group, separator, name. */
    static String getJobNameKey(String jobName, String groupName) {
        return groupName + "_$x$x$_" + jobName;
    }

    /** Two wrappers are equal when their keys are equal. */
    public boolean equals(Object obj) {
        if (!(obj instanceof JobWrapper)) {
            return false;
        }
        JobWrapper other = (JobWrapper) obj;
        return other.key.equals(this.key);
    }

    public int hashCode() {
        return key.hashCode();
    }
}
/**
 * Pairs a <code>Trigger</code> with its string key, the key of the job it
 * belongs to, and a state code. Equality and hashing are based on the
 * trigger key alone.
 */
class TriggerWrapper {

    public static final int STATE_WAITING = 0;

    public static final int STATE_ACQUIRED = 1;

    public static final int STATE_EXECUTING = 2;

    public static final int STATE_COMPLETE = 3;

    public static final int STATE_PAUSED = 4;

    public static final int STATE_BLOCKED = 5;

    public static final int STATE_PAUSED_BLOCKED = 6;

    public static final int STATE_ERROR = 7;

    public String key;

    public String jobKey;

    public Trigger trigger;

    /** One of the <code>STATE_*</code> constants; a new wrapper starts out waiting. */
    public int state = STATE_WAITING;

    /** Wraps the trigger and derives both the trigger key and the job key from it. */
    TriggerWrapper(Trigger trigger) {
        this.trigger = trigger;
        this.key = getTriggerNameKey(trigger);
        String jobName = trigger.getJobName();
        String jobGroup = trigger.getJobGroup();
        this.jobKey = JobWrapper.getJobNameKey(jobName, jobGroup);
    }

    /** Builds the key for a trigger from its own group and name. */
    static String getTriggerNameKey(Trigger trigger) {
        String group = trigger.getGroup();
        String name = trigger.getName();
        return getTriggerNameKey(name, group);
    }

    /** Builds the key as group, separator, name. */
    static String getTriggerNameKey(String triggerName, String groupName) {
        return groupName + "_$x$x$_" + triggerName;
    }

    /** Two wrappers are equal when their trigger keys are equal. */
    public boolean equals(Object obj) {
        if (!(obj instanceof TriggerWrapper)) {
            return false;
        }
        TriggerWrapper other = (TriggerWrapper) obj;
        return other.key.equals(this.key);
    }

    public int hashCode() {
        return key.hashCode();
    }

    /** Returns the wrapped trigger. */
    public Trigger getTrigger() {
        return this.trigger;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -