ramjobstore.java

来自「Quartz 是个开源的作业调度框架」· Java 代码 · 共 1,527 行 · 第 1/4 页

JAVA
1,527
字号
                    if (tw.trigger.getNextFireTime() != null)
                            timeTriggers.add(tw);
                    tw = null;
                    continue;
                }

                if(tw.trigger.getNextFireTime().getTime() > noLaterThan) {
                    timeTriggers.add(tw);
                    return null;
                }
                
                tw.state = TriggerWrapper.STATE_ACQUIRED;

                tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
                Trigger trig = (Trigger) tw.trigger.clone();
                return trig;
            }
        }

        return null;
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler no longer plans to
     * fire the given <code>Trigger</code>, that it had previously acquired
     * (reserved).
     * </p>
     *
     * <p>
     * If the store still holds the trigger and its wrapper is in the ACQUIRED
     * state, the wrapper is switched back to WAITING and re-added to
     * <code>timeTriggers</code>. In every other case the call does nothing.
     * </p>
     *
     * @param ctxt
     *          the scheduling context (not consulted by this method)
     * @param trigger
     *          the trigger being released; only its group and name are used,
     *          to look up the store's own wrapper
     */
    public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) {
        synchronized (triggerLock) {
            String triggerKey = TriggerWrapper.getTriggerNameKey(trigger);
            TriggerWrapper wrapper = (TriggerWrapper) triggersByFQN.get(triggerKey);

            // Unknown trigger, or one that is no longer reserved: nothing to undo.
            if (wrapper == null || wrapper.state != TriggerWrapper.STATE_ACQUIRED) {
                return;
            }

            wrapper.state = TriggerWrapper.STATE_WAITING;
            timeTriggers.add(wrapper);
        }
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler is now firing the
     * given <code>Trigger</code> (executing its associated <code>Job</code>),
     * that it had previously acquired (reserved).
     * </p>
     *
     * <p>
     * Both the store's copy and the scheduler's copy of the trigger have
     * <code>triggered(cal)</code> invoked on them, and the store's copy is put
     * back into the WAITING state. For a stateful job, every trigger wrapper
     * of that job is then taken out of <code>timeTriggers</code> (WAITING
     * becomes BLOCKED, PAUSED becomes PAUSED_BLOCKED) and the job is recorded
     * in <code>blockedJobs</code>. For any other job the wrapper is re-queued
     * in <code>timeTriggers</code> when it still has a next fire time.
     * </p>
     *
     * @param ctxt
     *          the scheduling context, handed on to the calendar and job
     *          lookups
     * @param trigger
     *          the scheduler's copy of the trigger that is being fired
     * @return the bundle describing this firing, or <code>null</code> when the
     *         trigger is no longer in the store or its wrapper has meanwhile
     *         moved to the COMPLETE, PAUSED, BLOCKED, PAUSED_BLOCKED or ERROR
     *         state
     */
    public TriggerFiredBundle triggerFired(SchedulingContext ctxt,
            Trigger trigger) {

        synchronized (triggerLock) {
            TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper
                    .getTriggerNameKey(trigger));

            // was the trigger deleted since being acquired?
            if (tw == null || tw.trigger == null) {
                return null;
            }

            // was the trigger completed, paused, blocked (or both), or put
            // into the ERROR state since being acquired? ERROR must be
            // rejected too, otherwise the trigger would be reset to WAITING
            // below and silently re-queued.
            if (tw.state == TriggerWrapper.STATE_COMPLETE
                    || tw.state == TriggerWrapper.STATE_PAUSED
                    || tw.state == TriggerWrapper.STATE_BLOCKED
                    || tw.state == TriggerWrapper.STATE_PAUSED_BLOCKED
                    || tw.state == TriggerWrapper.STATE_ERROR) {
                return null;
            }

            Calendar cal = null;
            if (tw.trigger.getCalendarName() != null) {
                cal = retrieveCalendar(ctxt, tw.trigger.getCalendarName());
            }

            // remember the previous fire time before triggered() advances it
            Date prevFireTime = trigger.getPreviousFireTime();

            // call triggered on our copy, and the scheduler's copy
            tw.trigger.triggered(cal);
            trigger.triggered(cal);
            tw.state = TriggerWrapper.STATE_WAITING;

            TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(ctxt,
                    trigger.getJobName(), trigger.getJobGroup()), trigger, cal,
                    false, new Date(), trigger.getPreviousFireTime(), prevFireTime,
                    trigger.getNextFireTime());

            JobDetail job = bndle.getJobDetail();

            if (job.isStateful()) {
                // block every trigger of the stateful job until it completes
                ArrayList trigs = getTriggerWrappersForJob(job.getName(), job
                        .getGroup());
                Iterator itr = trigs.iterator();
                while (itr.hasNext()) {
                    TriggerWrapper ttw = (TriggerWrapper) itr.next();
                    if (ttw.state == TriggerWrapper.STATE_WAITING) {
                        ttw.state = TriggerWrapper.STATE_BLOCKED;
                    }
                    if (ttw.state == TriggerWrapper.STATE_PAUSED) {
                        ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
                    }
                    timeTriggers.remove(ttw);
                }
                blockedJobs.add(JobWrapper.getJobNameKey(job));
            } else if (tw.trigger.getNextFireTime() != null) {
                // triggerLock is already held here, so no nested lock is needed
                timeTriggers.add(tw);
            }

            return bndle;
        }
    }

    /**
     * <p>
     * Inform the <code>JobStore</code> that the scheduler has completed the
     * firing of the given <code>Trigger</code> (and the execution of its
     * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
     * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
     * is stateful.
     * </p>
     *
     * <p>
     * For a stateful job that is still stored, the job's data map is replaced
     * by the one carried in <code>jobDetail</code>, the job is taken out of
     * <code>blockedJobs</code>, and its triggers are unblocked (BLOCKED
     * becomes WAITING and is re-queued in <code>timeTriggers</code>;
     * PAUSED_BLOCKED becomes PAUSED). Afterwards, if the trigger is still
     * stored, <code>triggerInstCode</code> is applied to it.
     * </p>
     *
     * @param ctxt
     *          the scheduling context, handed on to <code>removeTrigger</code>
     * @param trigger
     *          the scheduler's copy of the trigger that fired
     * @param jobDetail
     *          the job detail as seen by the finished execution
     * @param triggerInstCode
     *          one of the <code>Trigger.INSTRUCTION_*</code> codes handled
     *          below; any other value leaves the trigger untouched
     */
    public void triggeredJobComplete(SchedulingContext ctxt, Trigger trigger,
            JobDetail jobDetail, int triggerInstCode) {

        synchronized (triggerLock) {

            String jobKey = JobWrapper.getJobNameKey(jobDetail.getName(), jobDetail
                    .getGroup());
            JobWrapper jw = (JobWrapper) jobsByFQN.get(jobKey);
            TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper
                    .getTriggerNameKey(trigger));

            // It's possible that the job is null if:
            //   1- it was deleted during execution
            //   2- RAMJobStore is being used only for volatile jobs / triggers
            //      from the JDBC job store
            if (jw != null) {
                JobDetail jd = jw.jobDetail;

                if (jobDetail.isStateful()) {
                    JobDataMap newData = jobDetail.getJobDataMap();
                    if (newData != null) {
                        newData.clearDirtyFlag();
                    }
                    jd.setJobDataMap(newData);
                    blockedJobs.remove(JobWrapper.getJobNameKey(jd));

                    // release the triggers that were blocked while the job ran
                    ArrayList trigs = getTriggerWrappersForJob(jd.getName(), jd
                            .getGroup());
                    Iterator itr = trigs.iterator();
                    while (itr.hasNext()) {
                        TriggerWrapper ttw = (TriggerWrapper) itr.next();
                        if (ttw.state == TriggerWrapper.STATE_BLOCKED) {
                            ttw.state = TriggerWrapper.STATE_WAITING;
                            timeTriggers.add(ttw);
                        }
                        if (ttw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) {
                            ttw.state = TriggerWrapper.STATE_PAUSED;
                        }
                    }
                }
            } else { // even if it was deleted, there may be cleanup to do
                blockedJobs.remove(JobWrapper.getJobNameKey(jobDetail));
            }

            // check for trigger deleted during execution...
            if (tw != null) {
                if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) {

                    if (trigger.getNextFireTime() == null) {
                        // double check for possible reschedule within job
                        // execution, which would cancel the need to delete...
                        if (tw.getTrigger().getNextFireTime() == null) {
                            removeTrigger(ctxt, trigger.getName(), trigger.getGroup());
                        }
                    } else {
                        removeTrigger(ctxt, trigger.getName(), trigger.getGroup());
                    }
                } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) {
                    tw.state = TriggerWrapper.STATE_COMPLETE;
                    timeTriggers.remove(tw);
                } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) {
                    getLog().info("Trigger " + trigger.getFullName() + " set to ERROR state.");
                    tw.state = TriggerWrapper.STATE_ERROR;
                    // Take the trigger out of the firing queue, exactly as the
                    // COMPLETE branch and setAllTriggersOfJobToState() do;
                    // otherwise an ERROR trigger stays queued in timeTriggers.
                    timeTriggers.remove(tw);
                } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) {
                    getLog().info("All triggers of Job "
                            + trigger.getFullJobName() + " set to ERROR state.");
                    setAllTriggersOfJobToState(
                            trigger.getJobName(),
                            trigger.getJobGroup(),
                            TriggerWrapper.STATE_ERROR);
                } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) {
                    setAllTriggersOfJobToState(
                            trigger.getJobName(),
                            trigger.getJobGroup(),
                            TriggerWrapper.STATE_COMPLETE);
                }
            }
        }
    }

    /**
     * <p>
     * Put every trigger wrapper of the named job into the given state. For
     * any state other than WAITING the wrapper is also taken out of
     * <code>timeTriggers</code>.
     * </p>
     *
     * <p>
     * This method takes no lock itself; the calls visible in this class are
     * made while <code>triggerLock</code> is held.
     * </p>
     *
     * @param jobName
     *          name of the job whose triggers are updated
     * @param jobGroup
     *          group of the job whose triggers are updated
     * @param state
     *          one of the <code>TriggerWrapper.STATE_*</code> constants
     */
    protected void setAllTriggersOfJobToState(String jobName, String jobGroup, int state) {
        // Only WAITING wrappers are left in the firing queue.
        boolean dropFromQueue = (state != TriggerWrapper.STATE_WAITING);

        for (Iterator it = getTriggerWrappersForJob(jobName, jobGroup).iterator(); it.hasNext();) {
            TriggerWrapper wrapper = (TriggerWrapper) it.next();
            wrapper.state = state;
            if (dropFromQueue) {
                timeTriggers.remove(wrapper);
            }
        }
    }
    
    /**
     * <p>
     * Build a one-line textual dump of the trigger bookkeeping: the names of
     * all triggers in <code>triggersByFQN</code>, each followed by
     * <code>"/"</code>, then <code>" | "</code>, then the names of the
     * triggers in <code>timeTriggers</code> in iteration order, each followed
     * by <code>"-&gt;"</code>.
     * </p>
     *
     * <p>
     * Both collections are read inside a single <code>triggerLock</code>
     * section so the two halves of the dump describe the same moment.
     * </p>
     *
     * @return the dump string; <code>" | "</code> when both collections are
     *         empty
     */
    protected String peekTriggers() {

        StringBuffer str = new StringBuffer();

        synchronized (triggerLock) {
            // iterate the wrappers directly rather than looking each key up again
            Iterator itr = triggersByFQN.values().iterator();
            while (itr.hasNext()) {
                TriggerWrapper tw = (TriggerWrapper) itr.next();
                str.append(tw.trigger.getName());
                str.append("/");
            }

            str.append(" | ");

            itr = timeTriggers.iterator();
            while (itr.hasNext()) {
                TriggerWrapper tw = (TriggerWrapper) itr.next();
                str.append(tw.trigger.getName());
                str.append("->");
            }
        }

        return str.toString();
    }

    /**
     * Returns a new <code>Set</code> holding the current contents of
     * <code>pausedTriggerGroups</code>; changes the caller makes to the
     * returned set do not affect the store's own collection.
     *
     * @param ctxt
     *          the scheduling context (not consulted by this method)
     * @return a fresh copy of the paused trigger groups
     * @see org.quartz.spi.JobStore#getPausedTriggerGroups(org.quartz.core.SchedulingContext)
     */
    public Set getPausedTriggerGroups(SchedulingContext ctxt) throws JobPersistenceException {
        return new HashSet(pausedTriggerGroups);
    }


}

/*******************************************************************************
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 * 
 * Helper Classes. * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 */

/**
 * Orders <code>TriggerWrapper</code>s by comparing their wrapped triggers
 * with <code>Trigger.compareTo</code>; when that reports a tie, the triggers'
 * full names decide the order.
 *
 * <p>
 * The comparator is stateless, so all instances are considered equal and
 * share one hash code.
 * </p>
 */
class TriggerComparator implements Comparator {

    /**
     * Compares two <code>TriggerWrapper</code>s.
     *
     * @param obj1
     *          first wrapper; must be a <code>TriggerWrapper</code>
     * @param obj2
     *          second wrapper; must be a <code>TriggerWrapper</code>
     * @return the result of <code>Trigger.compareTo</code>, or the comparison
     *         of the full names when that result is zero
     * @throws ClassCastException
     *           if either argument is not a <code>TriggerWrapper</code>
     */
    public int compare(Object obj1, Object obj2) {
        TriggerWrapper trig1 = (TriggerWrapper) obj1;
        TriggerWrapper trig2 = (TriggerWrapper) obj2;

        int comp = trig1.trigger.compareTo(trig2.trigger);
        if (comp != 0) {
            return comp;
        }

        // tie: fall back to the full name so the ordering stays deterministic
        return trig1.trigger.getFullName().compareTo(trig2.trigger.getFullName());
    }

    /**
     * Any two <code>TriggerComparator</code>s are interchangeable.
     *
     * @return <code>true</code> if <code>obj</code> is a
     *         <code>TriggerComparator</code>
     */
    public boolean equals(Object obj) {
        return obj instanceof TriggerComparator;
    }

    /**
     * Constant hash code, so that instances which are <code>equals</code>
     * also hash alike as required by the <code>Object</code> contract.
     */
    public int hashCode() {
        return TriggerComparator.class.getName().hashCode();
    }
}

/**
 * Pairs a <code>JobDetail</code> with the string key under which the store
 * indexes it. Two wrappers are equal when their keys are equal.
 */
class JobWrapper {

    /** Store key: group, the separator <code>_$x$x$_</code>, then name. */
    public String key;

    /** The wrapped job definition. */
    public JobDetail jobDetail;

    /**
     * Wraps a job and derives the key from its group and name.
     */
    JobWrapper(JobDetail jobDetail) {
        this.jobDetail = jobDetail;
        this.key = getJobNameKey(jobDetail);
    }

    /**
     * Wraps a job under an explicitly supplied key.
     */
    JobWrapper(JobDetail jobDetail, String key) {
        this.jobDetail = jobDetail;
        this.key = key;
    }

    /**
     * Builds the store key for a job from its own group and name.
     */
    static String getJobNameKey(JobDetail jobDetail) {
        String group = jobDetail.getGroup();
        String name = jobDetail.getName();
        return group + "_$x$x$_" + name;
    }

    /**
     * Builds the store key for a job from a name and a group. Note the
     * argument order: name first, although the group leads in the key.
     */
    static String getJobNameKey(String jobName, String groupName) {
        return groupName + "_$x$x$_" + jobName;
    }

    /**
     * Key-based equality.
     */
    public boolean equals(Object obj) {
        if (!(obj instanceof JobWrapper)) {
            return false;
        }

        JobWrapper other = (JobWrapper) obj;
        return other.key.equals(this.key);
    }

    /**
     * Hash of the key, matching {@link #equals(Object)}.
     */
    public int hashCode() {
        return key.hashCode();
    }

}

/**
 * Pairs a <code>Trigger</code> with the keys under which the store indexes it
 * and its job, plus the trigger's current state inside the store. Two
 * wrappers are equal when their trigger keys are equal.
 */
class TriggerWrapper {

    // ---- state codes -----------------------------------------------------

    public final static int STATE_WAITING = 0;

    public final static int STATE_ACQUIRED = 1;

    public final static int STATE_EXECUTING = 2;

    public final static int STATE_COMPLETE = 3;

    public final static int STATE_PAUSED = 4;

    public final static int STATE_BLOCKED = 5;

    public final static int STATE_PAUSED_BLOCKED = 6;

    public final static int STATE_ERROR = 7;

    // ---- data ------------------------------------------------------------

    /** Store key of the trigger: group, <code>_$x$x$_</code>, name. */
    public String key;

    /** Store key of the trigger's job, as built by <code>JobWrapper</code>. */
    public String jobKey;

    /** The wrapped trigger. */
    public Trigger trigger;

    /** One of the <code>STATE_*</code> codes; a new wrapper starts WAITING. */
    public int state = STATE_WAITING;

    /**
     * Wraps a trigger and derives the trigger key from its group and name.
     */
    TriggerWrapper(Trigger trigger) {
        this.trigger = trigger;
        this.key = getTriggerNameKey(trigger);
        this.jobKey = JobWrapper.getJobNameKey(trigger.getJobName(),
                trigger.getJobGroup());
    }

    /**
     * Wraps a trigger under an explicitly supplied trigger key; the job key
     * is still derived from the trigger's job name and job group.
     */
    TriggerWrapper(Trigger trigger, String key) {
        this.trigger = trigger;
        this.key = key;
        this.jobKey = JobWrapper.getJobNameKey(trigger.getJobName(),
                trigger.getJobGroup());
    }

    /**
     * Builds the store key for a trigger from its own group and name.
     */
    static String getTriggerNameKey(Trigger trigger) {
        String group = trigger.getGroup();
        String name = trigger.getName();
        return group + "_$x$x$_" + name;
    }

    /**
     * Builds the store key for a trigger from a name and a group. Note the
     * argument order: name first, although the group leads in the key.
     */
    static String getTriggerNameKey(String triggerName, String groupName) {
        return groupName + "_$x$x$_" + triggerName;
    }

    /**
     * Key-based equality.
     */
    public boolean equals(Object obj) {
        if (!(obj instanceof TriggerWrapper)) {
            return false;
        }

        TriggerWrapper other = (TriggerWrapper) obj;
        return other.key.equals(this.key);
    }

    /**
     * Hash of the key, matching {@link #equals(Object)}.
     */
    public int hashCode() {
        return key.hashCode();
    }

    /**
     * @return the wrapped trigger
     */
    public Trigger getTrigger() {
        return this.trigger;
    }

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?