📄 ramjobstore.java
字号:
TriggerWrapper tw = null; synchronized (triggerLock) { while (tw == null) { try { tw = (TriggerWrapper) timeTriggers.first(); } catch (java.util.NoSuchElementException nsee) { return null; } if (tw == null) return null; if (tw.trigger.getNextFireTime() == null) { timeTriggers.remove(tw); tw = null; continue; } timeTriggers.remove(tw); if (applyMisfire(tw)) { if (tw.trigger.getNextFireTime() != null) timeTriggers.add(tw); tw = null; continue; } tw.state = TriggerWrapper.STATE_ACQUIRED; tw.trigger.setFireInstanceId(getFiredTriggerRecordId()); Trigger trig = (Trigger) tw.trigger.clone(); return trig; } } return null; } /** * <p> * Inform the <code>JobStore</code> that the scheduler no longer plans to * fire the given <code>Trigger</code>, that it had previously acquired * (reserved). * </p> */ public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) { synchronized (triggerLock) { TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper .getTriggerNameKey(trigger)); if (tw != null && tw.state != TriggerWrapper.STATE_COMPLETE && tw.state != TriggerWrapper.STATE_PAUSED) { tw.state = TriggerWrapper.STATE_WAITING; timeTriggers.add(tw); } } } /** * <p> * Inform the <code>JobStore</code> that the scheduler is now firing the * given <code>Trigger</code> (executing its associated <code>Job</code>), * that it had previously acquired (reserved). * </p> */ public TriggerFiredBundle triggerFired(SchedulingContext ctxt, Trigger trigger) { synchronized (triggerLock) { TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper .getTriggerNameKey(trigger)); // was the trigger deleted since being acquired? if (tw == null || tw.trigger == null) return null; // was the trigger completed since being acquired? if (tw.state == TriggerWrapper.STATE_COMPLETE) return null; // was the trigger paused since being acquired? if (tw.state == TriggerWrapper.STATE_PAUSED) return null; // was the trigger blocked since being acquired? 
if (tw.state == TriggerWrapper.STATE_BLOCKED) return null; Calendar cal = null; if (tw.trigger.getCalendarName() != null) cal = retrieveCalendar(ctxt, tw.trigger.getCalendarName()); Date prevFireTime = trigger.getPreviousFireTime(); // call triggered on our copy, and the scheduler's copy tw.trigger.triggered(cal); trigger.triggered(cal); //tw.state = TriggerWrapper.STATE_EXECUTING; tw.state = TriggerWrapper.STATE_WAITING; TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(ctxt, trigger.getJobName(), trigger.getJobGroup()), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); JobDetail job = bndle.getJobDetail(); if (job.isStateful()) { ArrayList trigs = getTriggerWrappersForJob(job.getName(), job .getGroup()); Iterator itr = trigs.iterator(); while (itr.hasNext()) { TriggerWrapper ttw = (TriggerWrapper) itr.next(); if(ttw.state == TriggerWrapper.STATE_WAITING) ttw.state = TriggerWrapper.STATE_BLOCKED; timeTriggers.remove(ttw); } blockedJobs.add(JobWrapper.getJobNameKey(job)); } else if (tw.trigger.getNextFireTime() != null) { synchronized (triggerLock) { timeTriggers.add(tw); } } return bndle; } } /** * <p> * Inform the <code>JobStore</code> that the scheduler has completed the * firing of the given <code>Trigger</code> (and the execution its * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code> * in the given <code>JobDetail</code> should be updated if the <code>Job</code> * is stateful. 
* </p> */ public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger, JobDetail jobDetail, int triggerInstCode) { synchronized (triggerLock) { String jobKey = JobWrapper.getJobNameKey(jobDetail.getName(), jobDetail .getGroup()); JobWrapper jw = (JobWrapper) jobsByFQN.get(jobKey); TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper .getTriggerNameKey(trigger)); // It's possible that the job is null if: // 1- it was deleted during execution // 2- RAMJobStore is being used only for volatile jobs / triggers // from the JDBC job store if (jw != null) { JobDetail jd = jw.jobDetail; if (jobDetail.isStateful()) { JobDataMap newData = jobDetail.getJobDataMap(); if (newData != null) newData.clearDirtyFlag(); jd.setJobDataMap(newData); blockedJobs.remove(JobWrapper.getJobNameKey(jd)); ArrayList trigs = getTriggerWrappersForJob(jd.getName(), jd .getGroup()); Iterator itr = trigs.iterator(); while (itr.hasNext()) { TriggerWrapper ttw = (TriggerWrapper) itr.next(); if (ttw.state == TriggerWrapper.STATE_BLOCKED) { ttw.state = TriggerWrapper.STATE_WAITING; timeTriggers.add(ttw); } } } } else { // even if it was deleted, there may be cleanup to do blockedJobs.remove(JobWrapper.getJobNameKey(jobDetail)); } // check for trigger deleted during execution... if (tw != null) { if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) { if(trigger.getNextFireTime() == null) { // double check for possible reschedule within job // execution, which would cancel the need to delete... 
if(tw.getTrigger().getNextFireTime() == null) removeTrigger(ctxt, trigger.getName(), trigger.getGroup()); } else removeTrigger(ctxt, trigger.getName(), trigger.getGroup()); } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) { tw.state = TriggerWrapper.STATE_COMPLETE; timeTriggers.remove(tw); } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) { ArrayList tws = getTriggerWrappersForJob(trigger .getJobName(), trigger.getJobGroup()); Iterator itr = tws.iterator(); while (itr.hasNext()) { tw = (TriggerWrapper) itr.next(); tw.state = TriggerWrapper.STATE_COMPLETE; timeTriggers.remove(tw); } } } } } protected String peekTriggers() { StringBuffer str = new StringBuffer(); TriggerWrapper tw = null; synchronized (triggerLock) { Iterator itr = triggersByFQN.keySet().iterator(); while (itr.hasNext()) { tw = (TriggerWrapper) triggersByFQN.get(itr.next()); str.append(tw.trigger.getName()); str.append("/"); } } str.append(" | "); synchronized (triggerLock) { Iterator itr = timeTriggers.iterator(); while (itr.hasNext()) { tw = (TriggerWrapper) itr.next(); str.append(tw.trigger.getName()); str.append("->"); } } return str.toString(); } /** * @see org.quartz.spi.JobStore#getPausedTriggerGroups(org.quartz.core.SchedulingContext) */ public Set getPausedTriggerGroups(SchedulingContext ctxt) throws JobPersistenceException { HashSet set = new HashSet(); set.addAll(pausedTriggerGroups); return set; }}/******************************************************************************* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Helper Classes. 
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

/**
 * Orders <code>TriggerWrapper</code>s by their wrapped triggers'
 * <code>compareTo</code>, falling back to the triggers' full names when that
 * comparison reports a tie.
 */
class TriggerComparator implements Comparator {

    /**
     * @param obj1
     *          first <code>TriggerWrapper</code>
     * @param obj2
     *          second <code>TriggerWrapper</code>
     * @return the trigger comparison result, or the full-name comparison if
     *         the triggers compare as equal
     */
    public int compare(Object obj1, Object obj2) {
        TriggerWrapper trig1 = (TriggerWrapper) obj1;
        TriggerWrapper trig2 = (TriggerWrapper) obj2;

        int comp = trig1.trigger.compareTo(trig2.trigger);
        if (comp != 0) {
            return comp;
        }

        // Tie-break on the full name.
        return trig1.trigger.getFullName().compareTo(
                trig2.trigger.getFullName());
    }

    /** All <code>TriggerComparator</code> instances are interchangeable. */
    public boolean equals(Object obj) {
        return obj instanceof TriggerComparator;
    }

    /**
     * Kept consistent with {@link #equals(Object)}: every instance is equal
     * to every other, so every instance reports the same hash code.
     */
    public int hashCode() {
        return TriggerComparator.class.hashCode();
    }
}

/**
 * Pairs a <code>JobDetail</code> with its lookup key. Equality and hashing
 * are based on the key alone.
 */
class JobWrapper {

    public String key;

    public JobDetail jobDetail;

    /** Wraps the job under the key derived from its group and name. */
    JobWrapper(JobDetail jobDetail) {
        this.jobDetail = jobDetail;
        key = getJobNameKey(jobDetail);
    }

    /** Wraps the job under an explicitly supplied key. */
    JobWrapper(JobDetail jobDetail, String key) {
        this.jobDetail = jobDetail;
        this.key = key;
    }

    /** @return the key for the given job: group, separator, name */
    static String getJobNameKey(JobDetail jobDetail) {
        return getJobNameKey(jobDetail.getName(), jobDetail.getGroup());
    }

    /** @return <code>groupName + "_$x$x$_" + jobName</code> */
    static String getJobNameKey(String jobName, String groupName) {
        return groupName + "_$x$x$_" + jobName;
    }

    public boolean equals(Object obj) {
        if (obj instanceof JobWrapper) {
            JobWrapper jw = (JobWrapper) obj;
            if (jw.key.equals(this.key)) {
                return true;
            }
        }
        return false;
    }

    public int hashCode() {
        return key.hashCode();
    }

}

/**
 * Pairs a <code>Trigger</code> with its lookup key, the key of the job it
 * references, and a state value. Equality and hashing are based on the
 * trigger key alone.
 */
class TriggerWrapper {

    public String key;

    public String jobKey;

    public Trigger trigger;

    /** One of the <code>STATE_*</code> constants; starts as waiting. */
    public int state = STATE_WAITING;

    public final static int STATE_WAITING = 0;

    public final static int STATE_ACQUIRED = 1;

    public final static int STATE_EXECUTING = 2;

    public final static int STATE_COMPLETE = 3;

    public final static int STATE_PAUSED = 4;

    public final static int STATE_BLOCKED = 5;

    /** Wraps the trigger under the key derived from its group and name. */
    TriggerWrapper(Trigger trigger) {
        this.trigger = trigger;
        key = getTriggerNameKey(trigger);
        this.jobKey = JobWrapper.getJobNameKey(trigger.getJobName(),
                trigger.getJobGroup());
    }

    /** Wraps the trigger under an explicitly supplied key. */
    TriggerWrapper(Trigger trigger, String key) {
        this.trigger = trigger;
        this.key = key;
        this.jobKey = JobWrapper.getJobNameKey(trigger.getJobName(),
                trigger.getJobGroup());
    }

    /** @return the key for the given trigger: group, separator, name */
    static String getTriggerNameKey(Trigger trigger) {
        return getTriggerNameKey(trigger.getName(), trigger.getGroup());
    }

    /** @return <code>groupName + "_$x$x$_" + triggerName</code> */
    static String getTriggerNameKey(String triggerName, String groupName) {
        return groupName + "_$x$x$_" + triggerName;
    }

    public boolean equals(Object obj) {
        if (obj instanceof TriggerWrapper) {
            TriggerWrapper tw = (TriggerWrapper) obj;
            if (tw.key.equals(this.key)) {
                return true;
            }
        }
        return false;
    }

    public int hashCode() {
        return key.hashCode();
    }

    public Trigger getTrigger() {
        return this.trigger;
    }

}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -