📄 schedule.c
字号:
/* * Sort the list of jobs based on several criteria. The sorting routine * returns a pointer to the new head of the list created by relinking * the elements of the linked list, or NULL if an error occurs. Zero * the original list pointer to reduce confusion -- the same list, in * different order, now lives on schd_AllJobs. */ schd_AllJobs = schd_sort_jobs(jobs); jobs = NULL; if (schd_AllJobs == NULL) {#ifdef DEBUG (void)sprintf(log_buffer, ">>> Sorting Failed - Cancel Scheduling Cycle <<<"); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);#endif DBPRT(("%s\n", log_buffer)); return (1); } /* * Get the queue utilization for each queue. Any jobs on schd_AllJobs * (set by sort_jobs() above) that belong to the queue will be placed * on the queue->jobs list. */ error = schd_get_queue_util(); if (error < 0) { DBPRT(("get_all_queue_info() failed\n")); return (1); /* Bogus queue - don't recycle. */ } else if (error > 0) { DBPRT(("queue failed sanity check - wait and recycle.\n")); sleep(WAIT_FOR_QUEUE_SANITY); return (0); /* Attempt to recycle scheduler. */ } /* Dump the list of jobs being scheduled from submit queue. */ if (schd_JOB_DUMPFILE) { make_job_dump(schd_JOB_DUMPFILE); } if (schd_SubmitQueue->queue->jobs && !(schd_SubmitQueue->queue->flags & (QFLAGS_DISABLED | QFLAGS_STOPPED))) { total_ran = schd_pack_queues(schd_SubmitQueue->queue->jobs, schd_BatchQueues, reason); }/* DEBUG: for Pat Wright at MSIC: print the resouces used each iteration * if (total_ran > 0) { * */ (void)sprintf(log_buffer, "System resources after scheduling:"); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); schd_dump_rsrclist(); /* * DEBUG: continuation of above test for Pat Wright } * */ /* We need to save the Share Usage data peridoically, so that a restart * of pbs_sched doesn't loose it. */ if (schd_save_shares()) /* is it time yet? */ schd_share_info("w"); /* yes, so do it */ (void)sprintf(log_buffer, ">>> End Scheduling Cycle (ran %d jobs) <<<", total_ran); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s\n", log_buffer)); return (1);}static intschedule_restart(Job *joblist){ char *id = "schedule_restart"; Job *job, *nextjob; QueueList *qptr; int found, changed; changed = found = 0; for (job = joblist; job != NULL; job = nextjob) { nextjob = job->next; if (job->state != 'Q') continue; /* * See if the job is queued on one of the batch queues. If not, * go on to the next job. */ for (qptr = schd_BatchQueues; qptr != NULL; qptr = qptr->next) if (strcmp(qptr->queue->qname, job->qname) == 0) break; if (qptr == NULL) continue; found++; if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) { (void)sprintf(log_buffer, "Restart job '%s' on queue '%s'.", job->jobid, job->qname); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); schd_comment_job(job, schd_JobMsg[JOB_RESTARTED], JOB_COMMENT_REQUIRED); if (schd_run_job_on(job, job->queue, schd_SCHED_HOST, LEAVE_JOB_COMMENT)) { (void)sprintf(log_buffer, "Unable to run job '%s' on queue '%s'.", job->jobid, job->qname); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); } else changed ++; } else /* (SCHED_RESTART_ACTION == SCHD_RESTART_RESUBMIT) */ { /* Move the job back to its originating queue. */ if (pbs_movejob(connector, job->jobid, job->oqueue, NULL) != 0) { (void)sprintf(log_buffer, "failed to move %s to queue %s, %d", job->jobid, job->oqueue, pbs_errno); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); } else { (void)sprintf(log_buffer, "Requeued job '%s' on queue '%s'.", job->jobid, job->oqueue); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); schd_comment_job(job, schd_JobMsg[JOB_RESUBMITTED], JOB_COMMENT_REQUIRED); changed ++; } } } if (found) { if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) { (void)sprintf(log_buffer, "Re-ran %d jobs (of %d) found queued on run queues.\n", changed, found); } else { (void)sprintf(log_buffer, "Moved %d queued jobs (of %d) from run queues back to '%s'.\n", changed, found, schd_SubmitQueue->queue->qname); } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); } return (changed);}static int make_job_dump (char *dumpfile){ char *id = "make_job_dump"; FILE *dump; QueueList *qptr; (void)sprintf(log_buffer, "Sorted jobs: %s", schd_JOB_DUMPFILE); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); /* * Attempt to open the dump file, creating it if necessary. It should * be truncated each time this runs, so don't open with append mode. */ if ((dump = fopen(dumpfile, "w")) == NULL) { (void)sprintf(log_buffer, "Cannot write to %s: %s\n", dumpfile, strerror(errno)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); return (-1); } /* Head the file with a timestamp. */ fprintf(dump, "%s\n", ctime(&schd_TimeNow)); /* Include the version string compiled into the scheduler binary. */ fprintf(dump, "%s\n", schd_VersionString); /* And some more useful information about the state of the world. */ fprintf(dump, "Scheduler running on '%s'\n", schd_ThisHost); fprintf(dump, "Prime-time is "); if (schd_ENFORCE_PRIME_TIME) { fprintf(dump, "from %s ", schd_sec2val(schd_PRIME_TIME_START)); fprintf(dump, "to %s.\n", schd_sec2val(schd_PRIME_TIME_END)); } else fprintf(dump, "not enforced.\n"); fprintf(dump, "\nJOBS LISTED IN ORDER FROM HIGHEST TO LOWEST PRIORITY\n\n"); /* Now dump the jobs queued on the various queues, in order of sort_order. */ qptr = schd_SubmitQueue; if (qptr->queue->jobs) { fprintf(dump, "Jobs on submit queue '%s':\n", qptr->queue->qname); dump_sorted_jobs (dump, qptr->queue->jobs); } if (fclose(dump)) { (void)sprintf(log_buffer, "close(%s): %s\n", dumpfile, strerror(errno)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); return (-1); } return (0);}static int dump_sorted_jobs (FILE *dump, Job *joblist){ Job *job; int njobs, elig_mesg = 0;#define DUMP_PRIORITY 6#define DUMP_JID_LEN 10#define DUMP_STATE_LEN 1#define DUMP_QUEUE_LEN 8#define DUMP_OWNER_LEN 8#define DUMP_NODES_LEN 3#define DUMP_WALLT_LEN 8#define DUMP_ELIGI_LEN 9 /* time plus '*' if wait != eligible */#define DUMP_FLAGS_LEN 10 char jid[DUMP_JID_LEN + 1]; char queue[DUMP_QUEUE_LEN + 1]; char owner[DUMP_OWNER_LEN + 1]; char wallt[DUMP_WALLT_LEN + 1]; char eligi[DUMP_ELIGI_LEN + 1]; char flags[DUMP_FLAGS_LEN + 1]; fprintf(dump, " %*s %*s %*s %*s %*s %*s %*s %*s %*s\n", -DUMP_PRIORITY, "Pri", -DUMP_JID_LEN, "Job ID", -DUMP_STATE_LEN, "S", -DUMP_OWNER_LEN, "Owner", -DUMP_QUEUE_LEN, "Queue", -DUMP_NODES_LEN, "Nds", -DUMP_WALLT_LEN, "Walltime", -DUMP_ELIGI_LEN, "Eligible", -DUMP_FLAGS_LEN, "Flags"); fprintf(dump, " %*s %*s %c %*s %*s %*s %*s %*s %*s\n", -DUMP_PRIORITY, "-----", -DUMP_JID_LEN, "---------", '-', -DUMP_QUEUE_LEN, "--------", -DUMP_OWNER_LEN, "--------", -DUMP_NODES_LEN, "---", -DUMP_WALLT_LEN, "--------", -DUMP_ELIGI_LEN, "---------", -DUMP_FLAGS_LEN, "------"); for (njobs = 0, job = joblist; job != NULL; job = job->next) { njobs++; strncpy(jid, job->jobid, DUMP_JID_LEN); strncpy(owner, job->owner, DUMP_OWNER_LEN); strncpy(queue, job->oqueue, DUMP_QUEUE_LEN); strcpy(wallt, schd_sec2val(job->walltime)); strcpy(eligi, schd_sec2val(job->eligible)); if (job->time_queued != job->eligible) { strcat(eligi, "*"); elig_mesg ++; } flags[0] = '\0'; /* Watch length of 'flags[]' array! */ if (job->flags & JFLAGS_PRIORITY) strcat(flags, "High "); if (job->flags & JFLAGS_PRIORITY) strcat(flags, "Wait "); if (job->flags & JFLAGS_RUNLIMIT) strcat(flags, "RUNLIM"); if (job->flags & JFLAGS_CHKPT_OK) strcat(flags, "CKPTOK"); if (job->flags & JFLAGS_CHKPTD) strcat(flags, "CKPTD"); if (job->flags & JFLAGS_SUSPENDED) strcat(flags, "SUSP"); /* Trim off the trailing space if any flags were listed. */ if (flags[0] != '\0') flags[strlen(flags) - 1] = '\0'; fprintf(dump, " %*d %*s %c %*s %*s %*d %*s %*s %*s\n", -DUMP_PRIORITY, job->sort_order, -DUMP_JID_LEN, jid, job->state, -DUMP_OWNER_LEN, owner, -DUMP_QUEUE_LEN, queue, -DUMP_NODES_LEN, job->ncpus, -DUMP_WALLT_LEN, wallt, -DUMP_ELIGI_LEN, eligi, -DUMP_FLAGS_LEN, flags); } fprintf(dump, " Total: %d job%s\n\n", njobs, (njobs == 1) ? "" : "s"); if (elig_mesg) { fprintf(dump, "Jobs marked with a ``*'' have an etime different " "from their ctime.\n\n"); } return (njobs);}int schd_get_max_ncpus(void){ char *id = "get_max_ncpus"; Batch_Status *bs, *bsp; AttrList *attr; static AttrList alist[] = {{NULL, ATTR_rescmax, "", ""}}; int ret = 0; /* Query the server for status of the max ncpus attribute */ if ((bs = pbs_statserver(connector, alist, NULL)) == NULL) { sprintf(log_buffer, "pbs_statserver failed: %d", pbs_errno); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); return (ret); } /* Process the list of attributes returned by the server. */ for (bsp = bs; bsp != NULL; bsp = bsp->next) { for (attr = bsp->attribs; attr != NULL; attr = attr->next) { if (!strcmp(attr->name, "resources_max")) { if (!strcmp(attr->resource, "ncpus")) { ret = atoi(attr->value); break; } } } } pbs_statfree(bs); return (ret);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -