⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 quartzschedulerthread.java

📁 Quartz is a full-featured, open source job scheduling system that can be integrated with, or used al
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                    }

                    if (trigger != null) {

                        now = System.currentTimeMillis();
                        long triggerTime = trigger.getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 0) {
	                        synchronized(sigLock) {
		                        try {
		                        	// we could have blocked a long while
		                        	// on 'synchronize', so we must recompute
		                        	now = System.currentTimeMillis();
		                            timeUntilTrigger = triggerTime - now;
		                            if(timeUntilTrigger > 1)
		                            	sigLock.wait(timeUntilTrigger);
		                        } catch (InterruptedException ignore) {
		                        }
	                        }		                        
	                        if (isScheduleChanged()) {
	                        	if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
	                        		// above call does a clearSignaledSchedulingChange()
	                        		try {
		                                qsRsrcs.getJobStore().releaseAcquiredTrigger(
		                                        ctxt, trigger);
		                            } catch (JobPersistenceException jpe) {
		                                qs.notifySchedulerListenersError(
		                                        "An error occured while releasing trigger '"
		                                                + trigger.getFullName() + "'",
		                                        jpe);
		                                // db connection must have failed... keep
		                                // retrying until it's up...
		                                releaseTriggerRetryLoop(trigger);
		                            } catch (RuntimeException e) {
		                                getLog().error(
		                                    "releaseTriggerRetryLoop: RuntimeException "
		                                    +e.getMessage(), e);
		                                // db connection must have failed... keep
		                                // retrying until it's up...
		                                releaseTriggerRetryLoop(trigger);
		                            }
		                            trigger = null;
		                            break;
	                        	}
	                        }
	                        now = System.currentTimeMillis();
	                        timeUntilTrigger = triggerTime - now;
                        }
                        if(trigger == null)
                        	continue;
                        
                        // set trigger to 'executing'
                        TriggerFiredBundle bndle = null;

                        boolean goAhead = true;
                        synchronized(sigLock) {
                        	goAhead = !halted;
                        }
                        if(goAhead) {
                            try {
                                bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
                                        trigger);
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occured while firing trigger '"
                                                + trigger.getFullName() + "'", se);
                            } catch (RuntimeException e) {
                                getLog().error(
                                    "RuntimeException while firing trigger " +
                                    trigger.getFullName(), e);
                                // db connection must have failed... keep
                                // retrying until it's up...
                                releaseTriggerRetryLoop(trigger);
                            }
                        }
                        
                        // it's possible to get 'null' if the trigger was paused,
                        // blocked, or other similar occurrences that prevent it being
                        // fired at this time...  or if the scheduler was shutdown (halted)
                        if (bndle == null) {
                            try {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
                                        trigger);
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occured while releasing trigger '"
                                                + trigger.getFullName() + "'", se);
                                // db connection must have failed... keep retrying
                                // until it's up...
                                releaseTriggerRetryLoop(trigger);
                            }
                            continue;
                        }

                        // TODO: improvements:
                        //
                        // 2- make sure we can get a job runshell before firing trigger, or
                        //   don't let that throw an exception (right now it never does,
                        //   but the signature says it can).
                        // 3- acquire more triggers at a time (based on num threads available?)


                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
                            shell.initialize(qs, bndle);
                        } catch (SchedulerException se) {
                            try {
                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                            } catch (SchedulerException se2) {
                                qs.notifySchedulerListenersError(
                                        "An error occured while placing job's triggers in error state '"
                                                + trigger.getFullName() + "'", se2);
                                // db connection must have failed... keep retrying
                                // until it's up...
                                errorTriggerRetryLoop(bndle);
                            }
                            continue;
                        }

                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            try {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
                            } catch (SchedulerException se2) {
                                qs.notifySchedulerListenersError(
                                        "An error occured while placing job's triggers in error state '"
                                                + trigger.getFullName() + "'", se2);
                                // db connection must have failed... keep retrying
                                // until it's up...
                                releaseTriggerRetryLoop(trigger);
                            }
                        }

                        continue;
                    }
                } else { // if(availTreadCount > 0)
                    continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                	try {
						sigLock.wait(timeUntilContinue);
					} catch (InterruptedException ignore) {
					}
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occured in main trigger firing loop.", re);
            }
        } // loop...

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

    /**
     * Decides whether a signaled schedule change justifies giving up the
     * trigger that has already been acquired and going back to the job store
     * for a new one.
     *
     * <p>Rationale: being signaled tells us the schedule changed. The signaled
     * next fire time may be known (non-zero) or unknown (zero); when unknown
     * it is assumed to be earlier than the acquired trigger. Even when it is
     * earlier, abandoning the acquired trigger only pays off if there is
     * enough time left before that trigger fires to cover the cost of the
     * release/re-acquire round trip; otherwise the job store would just be
     * thrashed. The job store cannot tell us that cost, so an arbitrary guess
     * is used: 80 ms for stores that support persistence, 7 ms otherwise.
     *
     * <p>Runs while holding {@code sigLock} and always clears the signaled
     * scheduling change before returning.
     *
     * @param oldTime fire time, in epoch milliseconds, of the trigger that is
     *                currently acquired
     * @return {@code true} if the signaled time counts as earlier and enough
     *         time remains before {@code oldTime} to make switching worthwhile
     */
    private boolean isCandidateNewTimeEarlierWithinReason(long oldTime) {
        synchronized (sigLock) {
            // 0 means "new time unknown" and is treated as earlier
            boolean worthSwitching =
                    getSignaledNextFireTime() == 0
                            || getSignaledNextFireTime() < oldTime;

            if (worthSwitching) {
                // earlier, yes - but is it earlier by a useful margin?
                long remaining = oldTime - System.currentTimeMillis();
                long minimumMargin =
                        qsRsrcs.getJobStore().supportsPersistence() ? 80L : 7L;
                worthSwitching = remaining >= minimumMargin;
            }

            clearSignaledSchedulingChange();

            return worthSwitching;
        }
    }

	/**
	 * Repeatedly asks the job store to complete the fired job with
	 * {@code Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR} until the call
	 * succeeds or this thread is halted.
	 *
	 * <p>Each attempt is preceded by a sleep of
	 * {@code getDbFailureRetryInterval()}; the loop is meant for the case
	 * where the job store's backing connection is down and has to come back.
	 * Scheduler listeners are notified of a persistence failure only on every
	 * fourth failed attempt so they are not flooded. A "connection restored"
	 * message is logged only when an attempt actually succeeded.
	 *
	 * @param bndle bundle describing the fired trigger and its job detail
	 */
	public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
		int retryCount = 0;
		// Tracks real success; the retry counter alone cannot distinguish
		// "succeeded" from "never attempted" (halted on entry / interrupted).
		boolean completed = false;

		while (!completed && !halted) {
			try {
				// wait before retrying - the db connection must be failed
				Thread.sleep(getDbFailureRetryInterval());
				retryCount++;
				qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
						bndle.getTrigger(), bndle.getJobDetail(),
						Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
				completed = true;
			} catch (JobPersistenceException jpe) {
				// throttle listener notifications to every fourth failure
				if (retryCount % 4 == 0) {
					qs.notifySchedulerListenersError(
							"An error occurred while placing job's triggers in error state '"
									+ bndle.getTrigger().getFullName() + "'", jpe);
				}
			} catch (RuntimeException e) {
				getLog().error("errorTriggerRetryLoop: RuntimeException " + e.getMessage(), e);
			} catch (InterruptedException e) {
				// Deliberately not re-interrupting: with the interrupt flag
				// set, the next Thread.sleep() would throw immediately and
				// this loop would spin instead of backing off.
				getLog().error("errorTriggerRetryLoop: InterruptedException " + e.getMessage(), e);
			}
		}

		if (completed) {
			getLog().info("errorTriggerRetryLoop: connection restored.");
		}
	}
    
    /**
     * Repeatedly asks the job store to release the given acquired trigger
     * until the call succeeds or this thread is halted.
     *
     * <p>Each attempt is preceded by a sleep of
     * {@code getDbFailureRetryInterval()}; the loop is meant for the case
     * where the job store's backing connection is down and has to come back.
     * Scheduler listeners are notified of a persistence failure only on every
     * fourth failed attempt so they are not flooded. A "connection restored"
     * message is logged only when an attempt actually succeeded.
     *
     * @param trigger the acquired trigger that must be handed back to the
     *                job store
     */
    public void releaseTriggerRetryLoop(Trigger trigger) {
        int retryCount = 0;
        // Tracks real success; the retry counter alone cannot distinguish
        // "succeeded" from "never attempted" (halted on entry / interrupted).
        boolean released = false;

        while (!released && !halted) {
            try {
                // wait before retrying - the db connection must be failed
                Thread.sleep(getDbFailureRetryInterval());
                retryCount++;
                qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
                released = true;
            } catch (JobPersistenceException jpe) {
                // throttle listener notifications to every fourth failure
                if (retryCount % 4 == 0) {
                    qs.notifySchedulerListenersError(
                            "An error occured while releasing trigger '"
                                    + trigger.getFullName() + "'", jpe);
                }
            } catch (RuntimeException e) {
                getLog().error("releaseTriggerRetryLoop: RuntimeException " + e.getMessage(), e);
            } catch (InterruptedException e) {
                // Deliberately not re-interrupting: with the interrupt flag
                // set, the next Thread.sleep() would throw immediately and
                // this loop would spin instead of backing off.
                getLog().error("releaseTriggerRetryLoop: InterruptedException " + e.getMessage(), e);
            }
        }

        if (released) {
            getLog().info("releaseTriggerRetryLoop: connection restored.");
        }
    }
    
    /**
     * Gives access to the logger this thread writes its messages to.
     *
     * @return the value of this class's {@code log} field
     */
    public Log getLog() {
        return this.log;
    }

} // end of QuartzSchedulerThread

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -