⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 quartzschedulerthread.java

📁 时间调度相关的开源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                        // timeUntilTrigger, we spin here... don't worry
                        // though, this spinning
                        // doesn't even register 0.2% cpu usage on a pentium 4.
                        long numPauses = (timeUntilTrigger / spinInterval);
                        while (numPauses >= 0) {

                            try {
                                Thread.sleep(spinInterval);
                            } catch (InterruptedException ignore) {
                            }
                            
                            if (isScheduleChanged()) {
                            	if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
                            		// above call does a clearSignaledSchedulingChange()
                            		try {
    	                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
    	                                        ctxt, trigger);
    	                            } catch (JobPersistenceException jpe) {
    	                                qs.notifySchedulerListenersError(
    	                                        "An error occured while releasing trigger '"
    	                                                + trigger.getFullName() + "'",
    	                                        jpe);
    	                                // db connection must have failed... keep
    	                                // retrying until it's up...
    	                                releaseTriggerRetryLoop(trigger);
    	                            } catch (RuntimeException e) {
    	                                getLog().error(
    	                                    "releaseTriggerRetryLoop: RuntimeException "
    	                                    +e.getMessage(), e);
    	                                // db connection must have failed... keep
    	                                // retrying until it's up...
    	                                releaseTriggerRetryLoop(trigger);
    	                            }
    	                            trigger = null;
    	                            break;
                            	}
                            }

                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                            numPauses = (timeUntilTrigger / spinInterval);
                        }

                        if(trigger == null)
                        	continue;
                        
                        // set trigger to 'executing'
                        TriggerFiredBundle bndle = null;

                        synchronized(sigLock) {
                            if(!halted) {
                                try {
                                    bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
                                            trigger);
                                } catch (SchedulerException se) {
                                    qs.notifySchedulerListenersError(
                                            "An error occured while firing trigger '"
                                                    + trigger.getFullName() + "'", se);
                                } catch (RuntimeException e) {
                                    getLog().error(
                                        "RuntimeException while firing trigger " +
                                        trigger.getFullName(), e);
                                    // db connection must have failed... keep
                                    // retrying until it's up...
                                    releaseTriggerRetryLoop(trigger);
                                }
                            }

                            // it's possible to get 'null' if the trigger was paused,
                            // blocked, or other similar occurences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                try {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
                                            trigger);
                                } catch (SchedulerException se) {
                                    qs.notifySchedulerListenersError(
                                            "An error occured while releasing trigger '"
                                                    + trigger.getFullName() + "'", se);
                                    // db connection must have failed... keep retrying
                                    // until it's up...
                                    releaseTriggerRetryLoop(trigger);
                                }
                                continue;
                            }

                            // TODO: improvements:
                            //
                            // 2- make sure we can get a job runshell before firing trigger, or
                            //   don't let that throw an exception (right now it never does,
                            //   but the signature says it can).
                            // 3- acquire more triggers at a time (based on num threads available?)


                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
                                shell.initialize(qs, bndle);
                            } catch (SchedulerException se) {
                                try {
                                    qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                                            trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                                } catch (SchedulerException se2) {
                                    qs.notifySchedulerListenersError(
                                            "An error occured while placing job's triggers in error state '"
                                                    + trigger.getFullName() + "'", se2);
                                    // db connection must have failed... keep retrying
                                    // until it's up...
                                    errorTriggerRetryLoop(bndle);
                                }
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                try {
                                    // this case should never happen, as it is indicative of the
                                    // scheduler being shutdown or a bug in the thread pool or
                                    // a thread pool being used concurrently - which the docs
                                    // say not to do...
                                    getLog().error("ThreadPool.runInThread() return false!");
                                    qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                                            trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                                } catch (SchedulerException se2) {
                                    qs.notifySchedulerListenersError(
                                            "An error occured while placing job's triggers in error state '"
                                                    + trigger.getFullName() + "'", se2);
                                    // db connection must have failed... keep retrying
                                    // until it's up...
                                    releaseTriggerRetryLoop(trigger);
                                }
                            }
                        }

                        continue;
                    }
                } else { // if(availTreadCount > 0)
                    continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
                }

                // this looping may seem a bit silly, but it's the current
                // work-around
                // for a dead-lock that can occur if the Thread.sleep() is replaced
                // with
                // a obj.wait() that gets notified when the signal is set...
                // so to be able to detect the signal change without sleeping the
                // entier
                // getRandomizedIdleWaitTime(), we spin here... don't worry though,
                // the
                // CPU usage of this spinning can't even be measured on a pentium
                // 4.
                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                long spinInterval = 10;
                long numPauses = (timeUntilContinue / spinInterval);
    
                while (numPauses > 0) {
                	if(isScheduleChanged()) 
                		break;
    
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException ignore) {
                    }
    
                    now = System.currentTimeMillis();
                    timeUntilContinue = waitTime - now;
                    numPauses = (timeUntilContinue / spinInterval);
                }
            } catch(RuntimeException re) {
                getLog().error("Runtime error occured in main trigger firing loop.", re);
            }
        } // loop...

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

    private boolean isCandidateNewTimeEarlierWithinReason(long oldTime) {
    	
		// So here's the deal: We know due to being signaled that 'the schedule'
		// has changed.  We may know (if getSignaledNextFireTime() != 0) the
		// new earliest fire time.  We may not (in which case we will assume
		// that the new time is earlier than the trigger we have acquired).
		// In either case, we only want to abandon our acquired trigger and
		// go looking for a new one if "it's worth it".  It's only worth it if
		// the time cost incurred to abandon the trigger and acquire a new one 
		// is less than the time until the currently acquired trigger will fire,
		// otherwise we're just "thrashing" the job store (e.g. database).
		//
		// So the question becomes when is it "worth it"?  This will depend on
		// the job store implementation (and of course the particular database
		// or whatever behind it).  Ideally we would depend on the job store 
		// implementation to tell us the amount of time in which it "thinks"
		// it can abandon the acquired trigger and acquire a new one.  However
		// we have no current facility for having it tell us that, so we make
		// and somewhat educated but arbitrary guess ;-).

    	synchronized(sigLock) {
			
			boolean earlier = false;
			
			if(getSignaledNextFireTime() == 0)
				earlier = true;
			else if(getSignaledNextFireTime() < oldTime )
				earlier = true;
			
			if(earlier) {
				// so the new time is considered earlier, but is it enough earlier?
				long diff = System.currentTimeMillis() - oldTime;
				if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 120L : 10L))
					earlier = false;
			}
			
			clearSignaledSchedulingChange();
			
			return earlier;
        }
	}

	public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
        int retryCount = 0;
        try {
            while (!halted) {
                try {
                    Thread.sleep(getDbFailureRetryInterval()); // retry every N
                    // seconds (the db
                    // connection must
                    // be failed)
                    retryCount++;
                    qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                            bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                    retryCount = 0;
                    break;
                } catch (JobPersistenceException jpe) {
                    if(retryCount % 4 == 0) {
                        qs.notifySchedulerListenersError(
                            "An error occured while releasing trigger '"
                                    + bndle.getTrigger().getFullName() + "'", jpe);
                    }
                } catch (RuntimeException e) {
                    getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
                } catch (InterruptedException e) {
                    getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
                }
            }
        } finally {
            if(retryCount == 0) {
                getLog().info("releaseTriggerRetryLoop: connection restored.");
            }
        }
    }
    
    public void releaseTriggerRetryLoop(Trigger trigger) {
        int retryCount = 0;
        try {
            while (!halted) {
                try {
                    Thread.sleep(getDbFailureRetryInterval()); // retry every N
                    // seconds (the db
                    // connection must
                    // be failed)
                    retryCount++;
                    qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
                    retryCount = 0;
                    break;
                } catch (JobPersistenceException jpe) {
                    if(retryCount % 4 == 0) {
                        qs.notifySchedulerListenersError(
                            "An error occured while releasing trigger '"
                                    + trigger.getFullName() + "'", jpe);
                    }
                } catch (RuntimeException e) {
                    getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
                } catch (InterruptedException e) {
                    getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
                }
            }
        } finally {
            if(retryCount == 0) {
                getLog().info("releaseTriggerRetryLoop: connection restored.");
            }
        }
    }
    
    public Log getLog() {
        return log;
    }

} // end of QuartzSchedulerThread

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -