📄 fifo.c
字号:
* sd - connection descriptor to the pbs server * * returns success/failure * */int scheduling_cycle( int sd ){ server_info *sinfo; /* ptr to the server/queue/job/node info */ job_info *jinfo; /* ptr to the job to see if it can run */ int ret = SUCCESS; /* return code from is_ok_to_run_job() */ char log_msg[MAX_LOG_SIZE]; /* used to log an message about job */ char comment[MAX_COMMENT_SIZE]; /* used to update comment of job */ log(PBSEVENT_DEBUG2, PBS_EVENTCLASS_REQUEST, "", "Entering Schedule"); update_cycle_status(); /* create the server / queue / job / node structures */ if( ( sinfo = query_server( sd ) ) == NULL ) { fprintf(stderr, "Problem with creating server data strucutre\n"); return 0; } if( init_scheduling_cycle( sinfo ) == 0 ) { log(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, sinfo -> name, "init_scheduling_cycle failed."); free_server(sinfo, 1); return 0; } /* main scheduling loop */ while( ( jinfo = next_job(sinfo, 0) ) ) { log(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, jinfo -> name, "Considering job to run"); if((ret = is_ok_to_run_job( sd, sinfo, jinfo -> queue, jinfo )) == SUCCESS) run_update_job( sd, sinfo, jinfo -> queue, jinfo ); else { if( jinfo -> can_never_run ) { log(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jinfo -> name, "Job Deleted because it would never run"); pbs_deljob(sd, jinfo -> name, "Job could never run"); } jinfo -> can_not_run = 1; if( translate_job_fail_code( ret, comment, log_msg ) ) { /* if the comment doesn't get changed, its because it hasn't changed. * if the reason for the job has not changed, we do not need to log it */ if( update_job_comment(sd, jinfo, comment) == 0 ) log(PBSEVENT_SCHED, PBS_EVENTCLASS_JOB, jinfo -> name, log_msg); } if( ret != NOT_QUEUED && cstat.strict_fifo ) update_jobs_cant_run( sd, jinfo -> queue -> jobs, jinfo, COMMENT_STRICT_FIFO, START_AFTER_JOB); } } if( cstat.fair_share ) update_last_running(sinfo); free_server(sinfo, 1); /* free server and queues and jobs */ log(PBSEVENT_DEBUG2, PBS_EVENTCLASS_REQUEST, "", "Leaving schedule\n"); return 0;}/* * * update_last_running - update the last_running job array * keep the currently running jobs till next * scheduling cycle * * sinfo - the server of current jobs * * Returns success/failure * */int update_last_running( server_info *sinfo ){ free_pjobs( last_running, last_running_size ); last_running = create_prev_job_info( sinfo -> running_jobs, sinfo -> sc.running ); last_running_size = sinfo -> sc.running; if( last_running == NULL ) return 0; return 1;}/* * * update_starvation - update starvation information in array of jobs * * jobs - job array to update * * returns the most starving job * */job_info *update_starvation( job_info **jobs ){ job_info *jinfo = NULL; /* used as a ptr to the job to update */ int i; if( jobs != NULL ) { for( i = 0; jobs[i] != NULL; i++ ) { if(jobs[i] -> qtime + conf.max_starve < cstat.current_time && jobs[i] -> is_queued && !jobs[i] -> queue -> dedtime_queue ) { jobs[i] -> is_starving = 1; jobs[i] -> sch_priority = cstat.current_time - jobs[i] -> qtime - conf.max_starve; if( jinfo == NULL || jobs[i] -> sch_priority > jinfo -> sch_priority ) jinfo = jobs[i]; } else ; /* job not starving yet */ } } return jinfo;} /* * * run_update_job - run the job and update the job information * * pbs_sd - connection to pbs_server * sinfo - server job is on * qinfo - queue job resides in * jinfo - the job to run * * returns success/failure - see pbs_errno for more info * */int run_update_job( int pbs_sd, server_info *sinfo, queue_info *qinfo, job_info *jinfo){ int ret; /* return code from pbs_runjob() */ node_info *best_node = NULL; /* best node to run job on */ char *best_node_name = NULL; /* name of best node */ char buf[256] = {'\0'}; /* generic buffer - comments & logging*/ char timebuf[128]; /* buffer to hold the time and date */ resource_req *res; /* ptr to the resource of ncpus */ int ncpus; /* numeric amount of resource ncpus */ char *errmsg; /* used for pbs_geterrmsg() */ strftime(timebuf, 128, "started on %a %b %d at %H:%M", localtime(&cstat.current_time)); if( cstat.load_balancing || cstat.load_balancing_rr ) { best_node = find_best_node( jinfo, sinfo -> timesharing_nodes ); if( best_node != NULL ) { best_node_name = best_node -> name; sprintf(buf, "Job run on node %s - %s", best_node_name, timebuf); } } if( best_node == NULL ) sprintf(buf, "Job %s", timebuf); update_job_comment(pbs_sd, jinfo, buf); buf[0] = '\0'; ret = pbs_runjob(pbs_sd, jinfo -> name, best_node_name, NULL); if( ret == 0 ) { /* If a job is 100% efficent, it will raise the load average by 1 per * cpu is uses. Temporarly inflate load average by that value */ if( cstat.load_balancing && best_node != NULL ) { if( ( res = find_resource_req( jinfo -> resreq, "ncpus" ) ) == NULL ) ncpus = 1; else ncpus = res -> amount; best_node -> loadave += ncpus; } if( cstat.help_starving_jobs && jinfo == cstat.starving_job ) jinfo -> sch_priority = 0; log(PBSEVENT_SCHED, PBS_EVENTCLASS_JOB, jinfo -> name, "Job Run"); update_server_on_run( sinfo, qinfo, jinfo ); update_queue_on_run( qinfo, jinfo ); update_job_on_run( pbs_sd, jinfo ); if( cstat.fair_share ) update_usage_on_run( jinfo ); free(sinfo -> running_jobs); sinfo -> running_jobs = job_filter(sinfo -> jobs, sinfo -> sc.total, check_run_job, NULL); free(qinfo -> running_jobs); qinfo -> running_jobs = job_filter(qinfo -> jobs, qinfo -> sc.total, check_run_job, NULL); } else { errmsg = pbs_geterrmsg( pbs_sd ); sprintf(buf, "Not Running - PBS Error: %s", errmsg); update_job_comment(pbs_sd, jinfo, buf); } return ret;}/* * * next_job - find the next job to be run by the scheduler * * sinfo - the server the jobs are on * init - whether or not to initialize * * returns * - the next job to run * - NULL on error or if there are no more jobs to run * */job_info *next_job( server_info *sinfo, int init ){ /* cjobs is an array of the queue's job_info arrays. It is used to cycle * through the jobs returning 1 job from each queue */ static job_info ***cjobs = NULL; /* last_queue is the index into a queue array of the last time * the function was called */ static int last_queue; /* last_job is the index into a job array of the last time * the function was called */ static int last_job; job_info *rjob = NULL; /* the job to return */ int i; if( cstat.round_robin ) { if( init == INITIALIZE ) { if( cjobs != NULL ) { free(cjobs); cjobs = NULL; } /* * cjobs is an array of pointers which will point to each of the job * arrays in each queue */ if( cjobs == NULL ) { if( ( cjobs = (job_info ***) malloc( sizeof( job_info**) * sinfo -> num_queues) ) == NULL ) { perror("Memory Allocation Error"); return NULL; } last_queue = -1; last_job = 0; for( i = 0; i < sinfo -> num_queues; i++ ) cjobs[i] = sinfo -> queues[i] -> jobs; } } /* end initalization */ else { /* find the next queue to run a job out of */ for(i = 0; i < sinfo -> num_queues && rjob == NULL; i++) { /* we have reached the end of the array, cycle back through */ if( last_queue == (sinfo -> num_queues - 1) ) { last_queue = 0; last_job++; } else /* still more queues to go */ last_queue++; if( cjobs[last_queue] != NULL ) { if( cjobs[last_queue][last_job] == NULL ) cjobs[last_queue] = NULL; else { if( cstat.fair_share ) rjob = extract_fairshare( cjobs[last_queue] ); else if( cjobs[last_queue][last_job] -> can_not_run == 0 ) rjob = cjobs[last_queue][last_job]; } } } } } else if( cstat.by_queue ) { if( init == INITIALIZE ) { last_job = -1; last_queue = 0; } else { last_job++; /* check if we have reached the end of a queue and need to find another */ while( last_queue < sinfo -> num_queues && (sinfo -> queues[last_queue] -> jobs == NULL || sinfo -> queues[last_queue] -> jobs[last_job] == NULL || sinfo -> queues[last_queue] -> jobs[last_job] -> can_not_run)) { if( sinfo -> queues[last_queue] -> jobs == NULL || sinfo -> queues[last_queue] -> jobs[last_job] == NULL ) { last_queue++; last_job = 0; } else if( sinfo -> queues[last_queue] -> jobs[last_job] -> can_not_run ) last_job++; } if( last_queue == sinfo -> num_queues || sinfo -> queues[last_queue] == NULL ) rjob = NULL; else { if( cstat.fair_share ) rjob = extract_fairshare( sinfo -> queues[last_queue] -> jobs ); else { if( last_job < sinfo -> queues[last_queue] -> sc.total ) rjob = sinfo -> queues[last_queue] -> jobs[last_job]; } } } } else /* treat the entire system as one large queue */ { if( init == INITIALIZE ) { last_job = -1; } else { if( cstat.fair_share ) rjob = extract_fairshare( sinfo -> jobs ); else { last_job++; while( sinfo -> jobs[last_job] != NULL && sinfo -> jobs[last_job] -> can_not_run) { last_job++; } rjob = sinfo -> jobs[last_job]; } } } return rjob;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -