📄 quartzschedulerthread.java
字号:
now = System.currentTimeMillis(); long triggerTime = trigger.getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; long spinInterval = 10; // this looping may seem a bit silly, but it's the // current work-around // for a dead-lock that can occur if the Thread.sleep() // is replaced with // a obj.wait() that gets notified when the signal is // set... // so to be able to detect the signal change without // sleeping the entire // timeUntilTrigger, we spin here... don't worry // though, this spinning // doesn't even register 0.2% cpu usage on a pentium 4. int numPauses = (int) (timeUntilTrigger / spinInterval); while (numPauses >= 0 && !signaled) { try { Thread.sleep(spinInterval); } catch (InterruptedException ignore) { } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; numPauses = (int) (timeUntilTrigger / spinInterval); } if (signaled) { try { qsRsrcs.getJobStore().releaseAcquiredTrigger( ctxt, trigger); } catch (JobPersistenceException jpe) { qs.notifySchedulerListenersError( "An error occured while releasing trigger '" + trigger.getFullName() + "'", jpe); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(trigger); } catch (RuntimeException e) { getLog().error( "releaseTriggerRetryLoop: RuntimeException " +e.getMessage(), e); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(trigger); } signaled = false; continue; } // set trigger to 'executing' TriggerFiredBundle bndle = null; synchronized(pauseLock) { if(!halted) { try { bndle = qsRsrcs.getJobStore().triggerFired(ctxt, trigger); } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occured while firing trigger '" + trigger.getFullName() + "'", se); } catch (RuntimeException e) { getLog().error( "RuntimeException while firing trigger " + trigger.getFullName(), e); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(trigger); } } // it's possible to get 'null' if the trigger was paused, // blocked, or other similar occurances that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { try { qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger); } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occured while releasing trigger '" + trigger.getFullName() + "'", se); // db connection must have failed... keep retrying // until it's up... releaseTriggerRetryLoop(trigger); } continue; } // TODO: improvements: // // 2- make sure we can get a job runshell before firing trigger, or // don't let that throw an exception (right now it never does, // but the signature says it can). // 3- acquire more triggers at a time (based on num threads available?) JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell(); shell.initialize(qs, bndle); } catch (SchedulerException se) { try { qsRsrcs.getJobStore().triggeredJobComplete(ctxt, trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occured while placing job's triggers in error state '" + trigger.getFullName() + "'", se2); // db connection must have failed... keep retrying // until it's up... errorTriggerRetryLoop(bndle); } continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { try { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(ctxt, trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occured while placing job's triggers in error state '" + trigger.getFullName() + "'", se2); // db connection must have failed... keep retrying // until it's up... releaseTriggerRetryLoop(trigger); } } } continue; } } else { // if(availTreadCount > 0) continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract } // this looping may seem a bit silly, but it's the current // work-around // for a dead-lock that can occur if the Thread.sleep() is replaced // with // a obj.wait() that gets notified when the signal is set... // so to be able to detect the signal change without sleeping the // entier // getRandomizedIdleWaitTime(), we spin here... don't worry though, // the // CPU usage of this spinning can't even be measured on a pentium // 4. long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; long spinInterval = 10; int numPauses = (int) (timeUntilContinue / spinInterval); while (numPauses > 0 && !signaled) { try { Thread.sleep(10L); } catch (InterruptedException ignore) { } now = System.currentTimeMillis(); timeUntilContinue = waitTime - now; numPauses = (int) (timeUntilContinue / spinInterval); } } catch(RuntimeException re) { getLog().error("Runtime error occured in main trigger firing loop.", re); } } // loop... // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; } public void errorTriggerRetryLoop(TriggerFiredBundle bndle) { int retryCount = 0; try { while (!halted) { try { Thread.sleep(getDbFailureRetryInterval()); // retry every N // seconds (the db // connection must // be failed) retryCount++; qsRsrcs.getJobStore().triggeredJobComplete(ctxt, bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); retryCount = 0; break; } catch (JobPersistenceException jpe) { if(retryCount % 4 == 0) { qs.notifySchedulerListenersError( "An error occured while releasing trigger '" + bndle.getTrigger().getFullName() + "'", jpe); } } catch (RuntimeException e) { getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e); } catch (InterruptedException e) { getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e); } } } finally { if(retryCount == 0) { getLog().info("releaseTriggerRetryLoop: connection restored."); } } } public void releaseTriggerRetryLoop(Trigger trigger) { int retryCount = 0; try { while (!halted) { try { Thread.sleep(getDbFailureRetryInterval()); // retry every N // seconds (the db // connection must // be failed) retryCount++; qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger); retryCount = 0; break; } catch (JobPersistenceException jpe) { if(retryCount % 4 == 0) { qs.notifySchedulerListenersError( "An error occured while releasing trigger '" + trigger.getFullName() + "'", jpe); } } catch (RuntimeException e) { getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e); } catch (InterruptedException e) { getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e); } } } finally { if(retryCount == 0) { getLog().info("releaseTriggerRetryLoop: connection restored."); } } } public Log getLog() { return log; }} // end of QuartzSchedulerThread
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -