📄 scheduler.cs
字号:
///''''''''''''''''''''''''''''''''''''''''''''''''
//Remove the object in the ScheduleInProgress collection
///''''''''''''''''''''''''''''''''''''''''''''''''
RemoveFromScheduleInProgress(scheduleHistoryItem);
///''''''''''''''''''''''''''''''''''''''''''''''''
//A SchedulerClient is notifying us that their
//process has completed. Decrease our ActiveThreadCount
///''''''''''''''''''''''''''''''''''''''''''''''''
Interlocked.Decrement(ref activeThreadCount);
///''''''''''''''''''''''''''''''''''''''''''''''''
//Update the schedule item object property
//to note the end time and next start
///''''''''''''''''''''''''''''''''''''''''''''''''
scheduleHistoryItem.EndDate = DateTime.Now;
if (scheduleHistoryItem.ScheduleSource == ScheduleSource.STARTED_FROM_EVENT)
{
scheduleHistoryItem.NextStart = Null.NullDate;
}
else
{
if (scheduleHistoryItem.CatchUpEnabled)
{
switch (scheduleHistoryItem.TimeLapseMeasurement)
{
case "s":
scheduleHistoryItem.NextStart = scheduleHistoryItem.NextStart.AddSeconds(scheduleHistoryItem.TimeLapse);
break;
case "m":
scheduleHistoryItem.NextStart = scheduleHistoryItem.NextStart.AddMinutes(scheduleHistoryItem.TimeLapse);
break;
case "h":
scheduleHistoryItem.NextStart = scheduleHistoryItem.NextStart.AddHours(scheduleHistoryItem.TimeLapse);
break;
case "d":
scheduleHistoryItem.NextStart = scheduleHistoryItem.NextStart.AddDays(scheduleHistoryItem.TimeLapse);
break;
}
}
else
{
switch (scheduleHistoryItem.TimeLapseMeasurement)
{
case "s":
scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddSeconds(scheduleHistoryItem.TimeLapse);
break;
case "m":
scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddMinutes(scheduleHistoryItem.TimeLapse);
break;
case "h":
scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddHours(scheduleHistoryItem.TimeLapse);
break;
case "d":
scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddDays(scheduleHistoryItem.TimeLapse);
break;
}
}
}
///''''''''''''''''''''''''''''''''''''''''''''''''
//Update the ScheduleHistory in the database
///''''''''''''''''''''''''''''''''''''''''''''''''
CoreScheduler.UpdateScheduleHistory(scheduleHistoryItem);
LogInfo eventLogInfo = new LogInfo();
if (scheduleHistoryItem.NextStart != Null.NullDate)
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//Put the object back into the ScheduleQueue
//collection with the new NextStart date.
///''''''''''''''''''''''''''''''''''''''''''''''''
scheduleHistoryItem.StartDate = Null.NullDate;
scheduleHistoryItem.EndDate = Null.NullDate;
scheduleHistoryItem.LogNotes = "";
scheduleHistoryItem.ProcessGroup = - 1;
AddToScheduleQueue(scheduleHistoryItem);
}
if (schedulerClient.ScheduleHistoryItem.RetainHistoryNum > 0)
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//Write out the log entry for this event
///''''''''''''''''''''''''''''''''''''''''''''''''
EventLogController eventLog = new EventLogController();
eventLogInfo.AddProperty("TYPE", schedulerClient.GetType().FullName);
eventLogInfo.AddProperty("THREAD ID", Thread.CurrentThread.GetHashCode().ToString());
eventLogInfo.AddProperty("NEXT START", Convert.ToString(scheduleHistoryItem.NextStart));
eventLogInfo.AddProperty("SOURCE", schedulerClient.ScheduleHistoryItem.ScheduleSource.ToString());
eventLogInfo.AddProperty("ACTIVE THREADS", activeThreadCount.ToString());
eventLogInfo.AddProperty("FREE THREADS", FreeThreads.ToString());
eventLogInfo.AddProperty("READER TIMEOUTS", readerTimeouts.ToString());
eventLogInfo.AddProperty("WRITER TIMEOUTS", writerTimeouts.ToString());
eventLogInfo.AddProperty("IN PROGRESS", GetScheduleInProgressCount().ToString());
eventLogInfo.AddProperty("IN QUEUE", GetScheduleQueueCount().ToString());
eventLogInfo.LogTypeKey = "SCHEDULER_EVENT_COMPLETED";
eventLog.AddLog(eventLogInfo);
}
}
catch (Exception exception)
{
Exceptions.Exceptions.ProcessSchedulerException(exception);
}
}
/// <summary>
/// Called when a scheduler client reports that its work failed. Removes the item from the
/// in-progress collection, decrements the active thread count, reports the failure,
/// computes the retry time, updates the schedule history, requeues the item when a retry
/// is due, and optionally writes a SCHEDULER_EVENT_FAILURE log entry.
/// </summary>
/// <param name="schedulerClient">The client whose work failed; its ScheduleHistoryItem is updated in place.</param>
/// <param name="exception">The failure reported by the client. May be null, in which case nothing is reported and no EXCEPTION property is logged.</param>
/// <remarks>
/// Any exception raised while doing this bookkeeping is passed to
/// Exceptions.Exceptions.ProcessSchedulerException and is not rethrown.
/// </remarks>
public static void WorkErrored (SchedulerClient schedulerClient, Exception exception)
{
    try
    {
        ScheduleHistoryItem scheduleHistoryItem = schedulerClient.ScheduleHistoryItem;

        // The client is no longer running: take it out of the in-progress
        // collection and decrement the active thread count.
        RemoveFromScheduleInProgress(scheduleHistoryItem);
        Interlocked.Decrement(ref activeThreadCount);

        // The logging section below already tolerates a null exception, so apply the
        // same guard here; otherwise a null argument could abort the bookkeeping
        // that follows (history update and requeue).
        if (exception != null)
        {
            Exceptions.Exceptions.ProcessSchedulerException(exception);
        }

        // Record the end time and work out when (if ever) the item should run again.
        scheduleHistoryItem.EndDate = DateTime.Now;
        if (scheduleHistoryItem.ScheduleSource == ScheduleSource.STARTED_FROM_EVENT)
        {
            // Event-started items get NextStart = Null.NullDate, which the requeue
            // check below excludes.
            scheduleHistoryItem.NextStart = Null.NullDate;
        }
        else if (scheduleHistoryItem.RetryTimeLapse != Null.NullInteger)
        {
            // Retry is measured from the start of the failed run, in the unit named by
            // RetryTimeLapseMeasurement ("s", "m", "h" or "d"). Any other unit leaves
            // NextStart unchanged.
            switch (scheduleHistoryItem.RetryTimeLapseMeasurement)
            {
                case "s":
                    scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddSeconds(scheduleHistoryItem.RetryTimeLapse);
                    break;
                case "m":
                    scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddMinutes(scheduleHistoryItem.RetryTimeLapse);
                    break;
                case "h":
                    scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddHours(scheduleHistoryItem.RetryTimeLapse);
                    break;
                case "d":
                    scheduleHistoryItem.NextStart = scheduleHistoryItem.StartDate.AddDays(scheduleHistoryItem.RetryTimeLapse);
                    break;
            }
        }

        // Update the schedule history before the run-specific fields are cleared below.
        CoreScheduler.UpdateScheduleHistory(scheduleHistoryItem);

        if (scheduleHistoryItem.NextStart != Null.NullDate && scheduleHistoryItem.RetryTimeLapse != Null.NullInteger)
        {
            // Reset the per-run fields and put the item back into the schedule
            // queue with its new NextStart.
            scheduleHistoryItem.StartDate = Null.NullDate;
            scheduleHistoryItem.EndDate = Null.NullDate;
            scheduleHistoryItem.LogNotes = "";
            scheduleHistoryItem.ProcessGroup = -1;
            AddToScheduleQueue(scheduleHistoryItem);
        }

        if (schedulerClient.ScheduleHistoryItem.RetainHistoryNum > 0)
        {
            // Write the failure entry to the event log.
            EventLogController eventLog = new EventLogController();
            LogInfo eventLogInfo = new LogInfo();
            eventLogInfo.AddProperty("THREAD ID", Thread.CurrentThread.GetHashCode().ToString());
            eventLogInfo.AddProperty("TYPE", schedulerClient.GetType().FullName);
            if (exception != null)
            {
                eventLogInfo.AddProperty("EXCEPTION", exception.Message);
            }
            eventLogInfo.AddProperty("RESCHEDULED FOR", Convert.ToString(scheduleHistoryItem.NextStart));
            eventLogInfo.AddProperty("SOURCE", schedulerClient.ScheduleHistoryItem.ScheduleSource.ToString());
            eventLogInfo.AddProperty("ACTIVE THREADS", activeThreadCount.ToString());
            eventLogInfo.AddProperty("FREE THREADS", FreeThreads.ToString());
            eventLogInfo.AddProperty("READER TIMEOUTS", readerTimeouts.ToString());
            eventLogInfo.AddProperty("WRITER TIMEOUTS", writerTimeouts.ToString());
            eventLogInfo.AddProperty("IN PROGRESS", GetScheduleInProgressCount().ToString());
            eventLogInfo.AddProperty("IN QUEUE", GetScheduleQueueCount().ToString());
            eventLogInfo.LogTypeKey = "SCHEDULER_EVENT_FAILURE";
            eventLog.AddLog(eventLogInfo);
        }
    }
    catch (Exception currentException)
    {
        Exceptions.Exceptions.ProcessSchedulerException(currentException);
    }
}
/// <summary>
/// Purges schedule history by delegating to a new SchedulingController instance.
/// </summary>
public static void PurgeScheduleHistory ()
{
    // No state is kept here; the controller does all of the work.
    new SchedulingController().PurgeScheduleHistory();
}
/// <summary>
/// Logs that a schedule was fired by an event, loads the queue for that event, and keeps
/// firing queued items until the schedule queue is empty.
/// </summary>
/// <param name="eventName">The event whose schedule items are loaded into the queue and run.</param>
/// <remarks>
/// Each pass sleeps after firing: 10 seconds normally, or 10 minutes once either timeout
/// counter exceeds 20. Any exception escaping the loop is passed to
/// Exceptions.Exceptions.ProcessSchedulerException and is not rethrown.
/// </remarks>
public static void RunEventSchedule (EventName eventName)
{
    // Timeout count above which the loop switches to the long pause.
    const int timeoutLimit = 20;
    // 600000 ms = 10 minutes: long pause so repeated timeouts do not fill up the logs.
    const int longPauseMilliseconds = 600000;
    // 10000 ms = 10 seconds: normal pause between passes to avoid CPU overutilization.
    const int shortPauseMilliseconds = 10000;

    try
    {
        // Record that this event triggered a schedule run.
        EventLogController logController = new EventLogController();
        LogInfo firedEntry = new LogInfo();
        firedEntry.AddProperty("EVENT", eventName.ToString());
        firedEntry.LogTypeKey = "SCHEDULE_FIRED_FROM_EVENT";
        logController.AddLog(firedEntry);

        // Load the queue with the schedule items that need to run for this event.
        LoadQueueFromEvent(eventName);

        while (GetScheduleQueueCount() > 0)
        {
            SetScheduleStatus(ScheduleStatus.RUNNING_EVENT_SCHEDULE);

            try
            {
                queueReadWriteLock.AcquireReaderLock(readTimeout);
                try
                {
                    // Reader lock held: re-check the queue and fire whatever is due.
                    if (GetScheduleQueueCount() > 0)
                    {
                        FireEvents(true);
                    }
                    Interlocked.Increment(ref reads);
                }
                finally
                {
                    // Always give the reader lock back, even if firing failed.
                    queueReadWriteLock.ReleaseReaderLock();
                }
            }
            catch (ApplicationException)
            {
                // Treated as a reader-lock timeout. Note that an ApplicationException
                // raised anywhere inside the block above is counted here too.
                Interlocked.Increment(ref readerTimeouts);
            }

            bool backOff = writerTimeouts > timeoutLimit || readerTimeouts > timeoutLimit;
            Thread.Sleep(backOff ? longPauseMilliseconds : shortPauseMilliseconds);

            // Nothing left to run: leave the loop (and with it the method).
            if (GetScheduleQueueCount() == 0)
            {
                break;
            }
        }
    }
    catch (Exception failure)
    {
        Exceptions.Exceptions.ProcessSchedulerException(failure);
    }
}
public static void Start ()
{
try
{
activeThreadCount = 0;
///''''''''''''''''''''''''''''''''''''''''''''''''
//This is where the action begins.
//Loop until KeepRunning = false
///''''''''''''''''''''''''''''''''''''''''''''''''
if (Services.Scheduling.SchedulingProvider.SchedulerMode != SchedulerMode.REQUEST_METHOD || debug)
{
EventLogController eventLog = new EventLogController();
LogInfo eventLogInfo = new LogInfo();
eventLogInfo.LogTypeKey = "SCHEDULER_STARTED";
eventLog.AddLog(eventLogInfo);
}
while (KeepRunning)
{
try
{
if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.TIMER_METHOD)
{
SetScheduleStatus(ScheduleStatus.RUNNING_TIMER_SCHEDULE);
}
else
{
SetScheduleStatus(ScheduleStatus.RUNNING_REQUEST_SCHEDULE);
}
///''''''''''''''''''''''''''''''''''''''''''''''''
//Load the queue to determine which schedule
//items need to be run.
///''''''''''''''''''''''''''''''''''''''''''''''''
LoadQueueFromTimer();
///''''''''''''''''''''''''''''''''''''''''''''''''
//Keep track of when the queue was last refreshed
//so we can perform a refresh periodically
///''''''''''''''''''''''''''''''''''''''''''''''''
DateTime LastQueueRefresh = DateTime.Now;
bool refreshQueueSchedule = false;
///''''''''''''''''''''''''''''''''''''''''''''''''
//We allow for [MaxThreadCount] threads to run
//simultaneously. As long as we have an open thread
//and we don't have to refresh the queue, continue
//to loop.
///''''''''''''''''''''''''''''''''''''''''''''''''
while (FreeThreads > 0 && !refreshQueueSchedule && KeepRunning && !forceReloadSchedule)
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//Fire off the events that need running.
///''''''''''''''''''''''''''''''''''''''''''''''''
try
{
if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.TIMER_METHOD)
{
SetScheduleStatus(ScheduleStatus.RUNNING_TIMER_SCHEDULE);
}
else
{
SetScheduleStatus(ScheduleStatus.RUNNING_REQUEST_SCHEDULE);
}
queueReadWriteLock.AcquireReaderLock(readTimeout);
try
{
// It is safe for this thread to read from
// the shared resource.
if (GetScheduleQueueCount() > 0)
{
FireEvents(true);
}
if (!KeepThreadAlive)
{
return;
}
Interlocked.Increment(ref reads);
}
finally
{
// Ensure that the lock is released.
queueReadWriteLock.ReleaseReaderLock();
}
}
catch (ApplicationException)
{
// The reader lock request timed out.
Interlocked.Increment(ref readerTimeouts);
}
if (writerTimeouts > 20 || readerTimeouts > 20)
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//Some kind of deadlock on a resource.
//Wait for 10 minutes so we don't fill up the logs
///''''''''''''''''''''''''''''''''''''''''''''''''
if (KeepRunning)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -