📄 scheduler.cs
字号:
Thread.Sleep(600000); //sleep for 10 minutes
}
else
{
return;
}
}
else
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//Wait for 10 seconds to avoid cpu overutilization
///''''''''''''''''''''''''''''''''''''''''''''''''
if (KeepRunning)
{
Thread.Sleep(10000); //sleep for 10 seconds
}
else
{
return;
}
//Refresh queue from database every 10 minutes
//if there are no items currently in progress
if ((LastQueueRefresh.AddMinutes(10) <= DateTime.Now || forceReloadSchedule) && FreeThreads == maxThreadCount)
{
refreshQueueSchedule = true;
break;
}
}
}
///''''''''''''''''''''''''''''''''''''''''''''''''
//There are no available threads, all threads are being
//used. Wait 10 seconds until one is available
///''''''''''''''''''''''''''''''''''''''''''''''''
if (KeepRunning)
{
if (!refreshQueueSchedule)
{
SetScheduleStatus(ScheduleStatus.WAITING_FOR_OPEN_THREAD);
Thread.Sleep(10000); //sleep for 10 seconds
}
}
else
{
return;
}
}
catch (Exception exception)
{
Exceptions.Exceptions.ProcessSchedulerException(exception);
///''''''''''''''''''''''''''''''''''''''''''''''''
//sleep for 10 minutes
///''''''''''''''''''''''''''''''''''''''''''''''''
Thread.Sleep(600000);
}
}
}
finally
{
if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.TIMER_METHOD || Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.DISABLED)
{
SetScheduleStatus(ScheduleStatus.STOPPED);
}
else
{
SetScheduleStatus(ScheduleStatus.WAITING_FOR_REQUEST);
}
if (Services.Scheduling.SchedulingProvider.SchedulerMode != SchedulerMode.REQUEST_METHOD || debug)
{
EventLogController eventLog = new EventLogController();
LogInfo eventLogInfo = new LogInfo();
eventLogInfo.LogTypeKey = "SCHEDULER_STOPPED";
eventLog.AddLog(eventLogInfo);
}
}
}
public static void FireEvents (bool asynchronous)
{
///''''''''''''''''''''''''''''''''''''''''''''''''
//This method uses a thread pool to
//call the SchedulerClient methods that need
//to be called.
///''''''''''''''''''''''''''''''''''''''''''''''''
///''''''''''''''''''''''''''''''''''''''''''''''''
//For each item in the queue that there
//is an open thread for, set the object
//in the array to a new ProcessGroup object.
//Pass in the ScheduleItem to the ProcessGroup
//so the ProcessGroup can pass it around for
//logging and notifications.
///''''''''''''''''''''''''''''''''''''''''''''''''
int intScheduleQueueCount = GetScheduleQueueCount();
int numToRun = intScheduleQueueCount;
int numRun = 0;
//If numToRun > FreeThreads Then
// numToRun = FreeThreads
//End If
for (int i = 0; i < intScheduleQueueCount; i++)
{
if (!KeepRunning)
{
return;
}
int processGroup = GetProcessGroup();
ScheduleItem scheduleItem = ((ScheduleItem) scheduleQueue[i]);
if (scheduleItem.NextStart <= DateTime.Now && scheduleItem.Enabled && ! IsInProgress(scheduleItem) && ! HasDependenciesConflict(scheduleItem) && numRun < numToRun)
{
scheduleItem.ProcessGroup = processGroup;
if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.TIMER_METHOD)
{
scheduleItem.ScheduleSource = ScheduleSource.STARTED_FROM_TIMER;
}
else if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.REQUEST_METHOD)
{
scheduleItem.ScheduleSource = ScheduleSource.STARTED_FROM_BEGIN_REQUEST;
}
if (asynchronous)
{
arrProcessGroup[processGroup].AddQueueUserWorkItem(scheduleItem);
}
else
{
arrProcessGroup[processGroup].RunSingleTask(scheduleItem);
}
if (debug)
{
EventLogController eventLog = new EventLogController();
LogInfo eventLogInfo = new LogInfo();
eventLogInfo.AddProperty("EVENT ADDED TO PROCESS GROUP " + scheduleItem.ProcessGroup.ToString(), scheduleItem.TypeFullName);
eventLogInfo.AddProperty("SCHEDULE ID", scheduleItem.ScheduleID.ToString());
eventLogInfo.LogTypeKey = "DEBUG";
eventLog.AddLog(eventLogInfo);
}
numRun ++;
}
else
{
if (debug)
{
bool appended = false;
System.Text.StringBuilder strDebug = new System.Text.StringBuilder("Task not run because ");
if (scheduleItem.NextStart > DateTime.Now)
{
strDebug.Append(" task is scheduled for " + scheduleItem.NextStart.ToString());
appended = true;
}
//If Not (objScheduleItem.NextStart <> Null.NullDate And objScheduleItem.ScheduleSource <> ScheduleSource.STARTED_FROM_EVENT) Then
// If appended Then strDebug.Append(" and")
// strDebug.Append(" task's NextStart <> NullDate and it's wasn't started from an EVENT")
// appended = True
//End If
if (! scheduleItem.Enabled)
{
if (appended)
{
strDebug.Append(" and");
}
strDebug.Append(" task is not enabled");
appended = true;
}
if (IsInProgress(scheduleItem))
{
if (appended)
{
strDebug.Append(" and");
}
strDebug.Append(" task is already in progress");
appended = true;
}
if (HasDependenciesConflict(scheduleItem))
{
if (appended)
{
strDebug.Append(" and");
}
strDebug.Append(" task has conflicting dependency");
appended = true;
}
EventLogController eventLog = new EventLogController();
LogInfo eventLogInfo = new LogInfo();
eventLogInfo.AddProperty("EVENT NOT RUN REASON", strDebug.ToString());
eventLogInfo.AddProperty("SCHEDULE ID", scheduleItem.ScheduleID.ToString());
eventLogInfo.AddProperty("TYPE FULL NAME", scheduleItem.TypeFullName);
eventLogInfo.LogTypeKey = "DEBUG";
eventLog.AddLog(eventLogInfo);
}
}
}
}
static public void LoadQueueFromTimer ()
{
forceReloadSchedule = false;
SchedulingController schedulingController = new SchedulingController();
ArrayList list = schedulingController.GetSchedule();
foreach (ScheduleHistoryItem scheduleHistoryItem in list)
{
if (! IsInQueue(scheduleHistoryItem) && scheduleHistoryItem.TimeLapse != Null.NullInteger &&
scheduleHistoryItem.TimeLapseMeasurement != Null.NullString && scheduleHistoryItem.Enabled)
{
if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.TIMER_METHOD)
{
scheduleHistoryItem.ScheduleSource = ScheduleSource.STARTED_FROM_TIMER;
}
else if (Scheduling.SchedulingProvider.SchedulerMode == SchedulerMode.REQUEST_METHOD)
{
scheduleHistoryItem.ScheduleSource = ScheduleSource.STARTED_FROM_BEGIN_REQUEST;
}
AddToScheduleQueue(scheduleHistoryItem);
}
}
}
static public void LoadQueueFromEvent (EventName eventName)
{
SchedulingController schedulingController = new SchedulingController();
ArrayList list = schedulingController.GetScheduleByEvent(eventName.ToString(), Globals.ServerName);
foreach (ScheduleHistoryItem scheduleHistoryItem in list)
{
if (! IsInQueue(scheduleHistoryItem) && ! IsInProgress(scheduleHistoryItem) &&
! HasDependenciesConflict(scheduleHistoryItem) && scheduleHistoryItem.Enabled)
{
scheduleHistoryItem.ScheduleSource = ScheduleSource.STARTED_FROM_EVENT;
AddToScheduleQueue(scheduleHistoryItem);
}
}
}
private static int GetProcessGroup()
{
//return a random process group
Random r = new Random();
return r.Next(0, numberOfProcessGroups - 1);
}
internal static bool IsInQueue(ScheduleItem scheduleItem)
{
bool returnValue = false;
try
{
queueReadWriteLock.AcquireReaderLock(readTimeout);
try
{
// It is safe for this thread to read from
// the shared resource.
if (GetScheduleQueueCount() > 0)
{
foreach (ScheduleItem currentScheduleItem in ScheduleQueue)
{
if (scheduleItem.ScheduleID == currentScheduleItem.ScheduleID)
{
returnValue = true;
}
}
}
Interlocked.Increment(ref reads);
}
finally
{
// Ensure that the lock is released.
queueReadWriteLock.ReleaseReaderLock();
}
}
catch (ApplicationException)
{
// The reader lock request timed out.
Interlocked.Increment(ref readerTimeouts);
}
return returnValue;
}
private static bool IsInProgress(ScheduleItem scheduleItem)
{
bool returnValue = false;
try
{
inProgressReadWriteLock.AcquireReaderLock(readTimeout);
try
{
// It is safe for this thread to read from
// the shared resource.
if (ScheduleInProgress.Count > 0)
{
foreach (ScheduleItem currentScheduleItem in ScheduleInProgress)
{
if (scheduleItem.ScheduleID == currentScheduleItem.ScheduleID)
{
returnValue = true;
}
}
}
Interlocked.Increment(ref reads);
}
finally
{
// Ensure that the lock is released.
inProgressReadWriteLock.ReleaseReaderLock();
}
}
catch (ApplicationException)
{
// The reader lock request timed out.
Interlocked.Increment(ref readerTimeouts);
}
return returnValue;
}
public static bool HasDependenciesConflict(ScheduleItem scheduleItem)
{
bool returnValue = false;
try
{
inProgressReadWriteLock.AcquireReaderLock(readTimeout);
try
{
// It is safe for this thread to read from
// the shared resource.
if (ScheduleInProgress != null && scheduleItem.ObjectDependencies.Length > 0)
{
foreach(ScheduleItem currentScheduleItem in ScheduleInProgress)
{
if (currentScheduleItem.ObjectDependencies.Length > 0)
{
if (currentScheduleItem.HasObjectDependencies(scheduleItem.ObjectDependencies))
{
returnValue = true;
}
}
}
}
Interlocked.Increment(ref reads);
}
finally
{
// Ensure that the lock is released.
inProgressReadWriteLock.ReleaseReaderLock();
}
}
catch (ApplicationException)
{
// The reader lock request timed out.
Interlocked.Increment(ref readerTimeouts);
}
return returnValue;
}
public static ScheduleHistoryItem AddScheduleHistory(ScheduleHistoryItem scheduleHistoryItem)
{
try
{
SchedulingController schedulingController = new SchedulingController();
int scheduleHistoryID;
scheduleHistoryID = schedulingController.AddScheduleHistory(scheduleHistoryItem);
scheduleHistoryItem.ScheduleHistoryID = scheduleHistoryID;
}
catch (Exception exception)
{
Exceptions.Exceptions.ProcessSchedulerException(exception);
}
return scheduleHistoryItem;
}
public static void UpdateScheduleHistory (ScheduleHistoryItem scheduleHistoryItem)
{
try
{
SchedulingController schedulingController = new SchedulingController();
schedulingController.UpdateScheduleHistory(scheduleHistoryItem);
}
catch (Exception exception)
{
Exceptions.Exceptions.ProcessSchedulerException(exception);
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -