📄 fifo_byqueue.basl
字号:
// NOTE(review): the statements down to the first closing brace are the tail
// of a function whose header lies above this excerpt (presumably the
// resource check used by sched_main -- confirm). They are reproduced
// unchanged, only re-indented.
    if( (ncpus_req GE 0 AND ncpus_left GE 0) AND
        (ncpus_req GT ncpus_left) ) {
        dt = datetimeGet();
        print(dt);
        msg = "[" + JobIdGet(j) + "]: ";
        print(msg);
        print("# of cpus required: ");
        print(ncpus_req);
        print("> # of cpus left: ");
        print(ncpus_left);
        return(FALSE);
    }
    return(TRUE);
}

// runJob: acts on job j according to the resAvail flag.
//   resAvail EQ TRUE  : try to SYNCRUN the job.
//   resAvail EQ FALSE : update the job's comment ("not enough resources").
//   resAvail EQ -1    : delete the job (request can never be satisfied).
// Returns SUCCESS only if the job was started; FAIL in every other case.
Int runJob(Job j, Int resAvail)
{
    Int retcode;
    String jid;

    switch(resAvail) {
        case TRUE:
        {
            retcode = JobAction(j, SYNCRUN, NULLSTR);
            if( retcode EQ FAIL ) {
                printmsg(j, "SYNCRUN of job failed!");
            } else {
                printmsg(j, "SYNCRUN job.");
                return(SUCCESS);
            }
        }
        case FALSE:
        {
            retcode = comment_update(j, "not enough resources available");
            if( retcode EQ FAIL ) {
                printmsg(j, "MODIFYATTR of job's comment field failed!");
            } else {
                printmsg(j, "cannot run job: resource temporarily unavailable");
            }
        }
        case -1:
        {
            // Fetch the id before deleting: the job handle is not used
            // again after the DELETE action.
            jid = JobIdGet(j);
            retcode = JobAction(j, DELETE, "Resource request unsatisfiable!");
            if( retcode EQ SUCCESS ) {
                printmsg2(jid, "deleted. Resource request unsatisfiable!");
            } else {
                printmsg2(jid, "deletion failed!");
            }
        }
    }
    return(FAIL);
}

// setSchedPeriod: sets the global PRIME_PERIOD to TRUE when the current time
// falls in the PRIMETIME range and to FALSE when it falls in NON_PRIMETIME.
// If neither case matches, PRIME_PERIOD is left as it was.
Void setSchedPeriod()
{
    DateTime current_time;

    current_time = datetimeGet();
    switch(current_time) {
        case in PRIMETIME:
        {
            PRIME_PERIOD = TRUE;
        }
        case in NON_PRIMETIME:
        {
            PRIME_PERIOD = FALSE;
        }
    }
}

// perform_queue_sort: sorts the queues of server s by descending priority
// when queue sorting is enabled for the current scheduling period.
Void perform_queue_sort(Server s)
{
    Set Que queues;

    queues = ServerQueuesGet(s);
    if( (PRIME_PERIOD AND PRIMETIME_SORTQUE) OR
        (!PRIME_PERIOD AND NON_PRIMETIME_SORTQUE) ) {
        Sort(queues, QuePriorityGet, DESC);
    }
}

// JobCputGet: sort key -- the job's requested "cput" resource.
Int JobCputGet( Job job )
{
    Int cput;

    cput = JobIntResReqGet(job, "cput");
    return(cput);
}

// JobWalltimeGet: sort key -- the job's requested "walltime" resource.
Int JobWalltimeGet( Job job )
{
    Int wallt;

    wallt = JobIntResReqGet(job, "walltime");
    return(wallt);
}

// JobMemGet: sort key -- the job's requested "mem" resource.
Size JobMemGet( Job job )
{
    Size mem;

    mem = JobSizeResReqGet(job, "mem");
    return(mem);
}

// sortJobs: sorts the set of jobs according to the named strategy.
// A sortMethod that matches none of the cases below (for example "no_sort")
// has no case here, so no Sort call is made for it.
Void sortJobs(Set Job jobs, String sortMethod)
{
    switch(sortMethod) {
        case "shortest_job_first":
        {
            Sort(jobs, JobCputGet, ASC);
        }
        case "longest_job_first":
        {
            Sort(jobs, JobCputGet, DESC);
        }
        case "smallest_memory_first":
        {
            Sort(jobs, JobMemGet, ASC);
        }
        case "largest_memory_first":
        {
            Sort(jobs, JobMemGet, DESC);
        }
        case "high_priority_first":
        {
            Sort(jobs, JobPriorityGet, DESC);
        }
        case "low_priority_first":
        {
            Sort(jobs, JobPriorityGet, ASC);
        }
        case "large_walltime_first":
        {
            Sort(jobs, JobWalltimeGet, DESC);
        }
        case "short_walltime_first":
        {
            Sort(jobs, JobWalltimeGet, ASC);
        }
    }
}

// perform_job_sort: sorts the jobs of every queue on server s using the
// job sort method configured for the current scheduling period.
Void perform_job_sort(Server s)
{
    Que q;
    Set Que ques;
    Set Job jobs;
    String method;

    // Pick the strategy once; the per-queue loop is the same in both periods.
    if( PRIME_PERIOD ) {
        method = PRIMETIME_SORTJOB_METHOD;
    } else {
        method = NON_PRIMETIME_SORTJOB_METHOD;
    }

    ques = ServerQueuesGet(s);
    foreach( q in ques ) {
        jobs = QueJobsGet(q);
        sortJobs(jobs, method);
    }
}

// satisfy_constraint: decides whether job j (in queue q on server s) may be
// considered for running.
//   nrun   : number of jobs currently counted as running on the server
//   q_nrun : number of jobs currently counted as running in the queue
//   stj    : the starving job to favour, or NOJOB if there is none
// The limits are checked in this order: server max running, server per-group,
// server per-user, queue max running, queue per-group, queue per-user,
// dedicated-time boundary, starving-job preference. A limit whose value is
// LE 0 is treated as "no limit". On the first violated constraint the job's
// comment is updated with the reason and FALSE is returned; TRUE is returned
// only when every constraint passes.
Int satisfy_constraint(Job j, Server s, Que q, Int nrun, Int q_nrun, Job stj)
{
    String euser;
    String egroup;
    Int nrun_per_user;
    Int nrun_per_group;
    Int q_nrun_per_user;
    Int q_nrun_per_group;
    String hostname;
    String qname;
    String buf;

    hostname = ServerInetAddrGet(s);
    qname = QueNameGet(q);
    euser = JobEffectiveUserNameGet(j);
    egroup = JobEffectiveGroupNameGet(j);

    // Server-wide running limit.
    if( MAXRUN GT 0 AND MAXRUN LE nrun ) {
        buf = "running job will exceed max_running of host:" + hostname;
        comment_update(j, buf);
        return(FALSE);
    }

    // Server per-group running limit.
    nrun_per_group = numRunningPerGroup(s, egroup);
    if( MAXRUN_PERGRP GT 0 AND MAXRUN_PERGRP LE nrun_per_group ) {
        buf = "running job will exceed max_group_run of host:" + hostname +
              " under group:" + egroup;
        comment_update(j, buf);
        return(FALSE);
    }

    // Server per-user running limit.
    nrun_per_user = numRunningPerUser(s, euser);
    if( MAXRUN_PERUSER GT 0 AND MAXRUN_PERUSER LE nrun_per_user ) {
        buf = "running job will exceed max_user_run of host:" + hostname +
              " under user:" + euser;
        comment_update(j, buf);
        return(FALSE);
    }

    // Queue-wide running limit.
    if( QUE_MAXRUN GT 0 AND QUE_MAXRUN LE q_nrun ) {
        buf = "running job will exceed max_running of queue@host:" + qname +
              "@" + hostname;
        comment_update(j, buf);
        return(FALSE);
    }

    // Queue per-group running limit.
    q_nrun_per_group = numRunningInQuePerGroup(q, egroup);
    if( QUE_MAXRUN_PERGRP GT 0 AND QUE_MAXRUN_PERGRP LE q_nrun_per_group ) {
        buf = "running job will exceed max_group_run of queue@host:" + qname +
              "@" + hostname + " under group:" + egroup;
        comment_update(j, buf);
        return(FALSE);
    }

    // Queue per-user running limit.
    q_nrun_per_user = numRunningInQuePerUser(q, euser);
    if( QUE_MAXRUN_PERUSER GT 0 AND QUE_MAXRUN_PERUSER LE q_nrun_per_user ) {
        buf = "running job will exceed max_user_run of queue@host:" + qname +
              "@" + hostname + " under user:" + euser;
        comment_update(j, buf);
        return(FALSE);
    }

    // Dedicated-time boundary.
    if( crossDedTimeBoundary(j) ) {
        comment_update(j, "running job will cross dedicated time");
        return(FALSE);
    }

    // Starving-job preference: pass if there is no starving job or this is it.
    if( stj EQ NOJOB OR stj EQ j ) {
        return(TRUE);
    }

    comment_update(j, "not the starving job.");
    return(FALSE);
}

// **********************************************************************
// *                                                                    *
// *  Global assignments.                                               *
// *                                                                    *
// **********************************************************************
DEDQ = "dedicated";
DEDTIME_START = (01|01|2035@00:00:00);
DEDTIME_END = (01|01|2035@00:00:00);

// Specify how many secs a job has been queued before it is considered
// starving.
STARVE_TIME_SECS = 86400; // 24 hrs * (60 mins / 1 hr) * (60 secs / 1 min)

// Specify the start time and end time of the primetime period.
PRIMETIME = ((04:00:00), (17:30:00));

// Specify the start time and end time of the non-primetime period.
NON_PRIMETIME = ((17:30:00), (04:00:00));

// ======================================================================
// =                                                                    =
// =  Scheduling Strategy Specification                                 =
// =  NOTE: Different scheduling strategies can be undertaken depending =
// =        on time period.                                             =
// =                                                                    =
// ======================================================================

// Sorting of queues: TRUE or FALSE (1 or 0) to sort the queues by priority.
PRIMETIME_SORTQUE = TRUE;
NON_PRIMETIME_SORTQUE = TRUE;

// Push of starving job: TRUE or FALSE (1 or 0) to give priority to starving
// jobs (jobs which have been queued for more than STARVE_TIME_SECS). All
// starving jobs will be run first before all other non-starving jobs.
PRIMETIME_HELP_STARVING_JOB = TRUE;
NON_PRIMETIME_HELP_STARVING_JOB = TRUE;

// Sorting of jobs in a queue: specify one of the following job sort
// strategies:
//      "no_sort"
//      "shortest_job_first"
//      "longest_job_first"
//      "smallest_memory_first"
//      "largest_memory_first"
//      "high_priority_first"
//      "low_priority_first"
//      "large_walltime_first"
//      "short_walltime_first"
PRIMETIME_SORTJOB_METHOD = "shortest_job_first";
NON_PRIMETIME_SORTJOB_METHOD = "shortest_job_first";

// Strict fifo: TRUE or FALSE (1 or 0) to run jobs in a strict fifo order. If
// one job cannot run, then move on to the next queue.
// NOTE: If this is set to TRUE, it might be wise to specify "no_sort" for
//       PRIMETIME_SORTJOB_METHOD or NON_PRIMETIME_SORTJOB_METHOD so as to
//       preserve internal ordering of the jobs.
PRIMETIME_STRICT_FIFO = FALSE;
NON_PRIMETIME_STRICT_FIFO = FALSE;

// **********************************************************************
// *                                                                    *
// *  main scheduling code                                              *
// *                                                                    *
// **********************************************************************
// sched_main: one scheduling pass. Determines the scheduling period, sorts
// queues and jobs, loads the server and queue run limits into globals, then
// walks each queue and tries to run every QUEUED job that satisfies the
// constraints and has resources available.
sched_main()
{
    Server local;
    Set Que queues;
    Que q;
    Set Job jobs;
    Job j;
    Job stj;
    Int nrun;
    Int q_nrun;
    Int resAvail;
    Int ranjob;
    DateTime dt;
    String jid;

    setSchedPeriod();

    dt = datetimeGet();
    print(dt);
    switch(PRIME_PERIOD) {
        case TRUE:
        {
            print("Primetime Scheduling started::::::::::::::::::::");
        }
        case FALSE:
        {
            print("Non-Primetime Scheduling started::::::::::::::::::::");
        }
    }

    local = AllServersLocalHostGet();
    perform_queue_sort(local);
    queues = ServerQueuesGet(local);

    // The starving job is looked up before the per-queue job sort.
    stj = findMostStarvedJob(local);
    perform_job_sort(local);

    nrun = numRunning(local);
    MAXRUN = ServerMaxRunJobsGet(local);
    MAXRUN_PERUSER = ServerMaxRunJobsPerUserGet(local);
    MAXRUN_PERGRP = ServerMaxRunJobsPerGroupGet(local);

    // jobs are selected by queue
    foreach( q in queues ) {

        // Skip queues that are not execution queues or not enabled for
        // scheduling.
        if( QueTypeGet(q) NEQ QTYPE_E OR
            QueStateGet(q) NEQ SCHED_ENABLED ) {
            continue;
        }

        jobs = QueJobsGet(q);
        q_nrun = numRunningInQue(q);
        QUE_MAXRUN = QueMaxRunJobsGet(q);
        QUE_MAXRUN_PERUSER = QueMaxRunJobsPerUserGet(q);
        QUE_MAXRUN_PERGRP = QueMaxRunJobsPerGroupGet(q);

        foreach( j in jobs ) {
            ranjob = FALSE;
            // Capture the id up front: runJob may delete the job.
            jid = JobIdGet(j);

            if(JobStateGet(j) EQ QUEUED) {

                if(satisfy_constraint(j, local, q, nrun, q_nrun, stj)) {
                    resAvail = resource_available(local, j);
                    if( runJob(j, resAvail) EQ SUCCESS ) {
                        // Keep the running counts current for later checks.
                        nrun++;
                        q_nrun++;
                        ranjob = TRUE;
                    }
                }

                if( !ranjob ) {
                    if( (PRIME_PERIOD AND PRIMETIME_STRICT_FIFO) OR
                        (!PRIME_PERIOD AND NON_PRIMETIME_STRICT_FIFO) ) {
                        printmsg2(jid, "did not run. Going into next queue since we run strict fifo.");
                        break;
                    }
                } else {
                    if( stj EQ j ) {
                        printmsg2(jid, "Checking next queue since we already run 1 starving job from current queue.");
                        break;
                    }
                }
            } else {
                printmsg2(jid, "not running job since it's not QUEUED");
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -