📄 schedule.c
字号:
if ((ret = schd_get_queue_limits(qptr->queue)) != 0) { DBPRT(("get_all_queue_info: get_queue_limits for %s failed.\n", qptr->queue->qname)); va_end(ap); return (ret); } /* * Set the queue flags if limits are exceeded. Don't bother * getting a reason string. */ schd_check_queue_limits(qptr->queue, NULL); } count ++; } va_end(ap); return (0);}static intschedule_restart(Job *joblist){ char *id = "schedule_restart"; Job *job, *nextjob; QueueList *qptr; int found, changed; changed = found = 0; for (job = joblist; job != NULL; job = nextjob) { nextjob = job->next; if (job->state != 'Q') continue; /* * See if the job is queued on one of the batch queues. If not, * go on to the next job. */ for (qptr = schd_BatchQueues; qptr != NULL; qptr = qptr->next) if (strcmp(qptr->queue->qname, job->qname) == 0) break; if (qptr == NULL) continue; found++; if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) { (void)sprintf(log_buffer, "Restart job '%s' on queue '%s'.", job->jobid, job->qname); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); schd_comment_job(job, schd_JobMsg[JOB_RESTARTED], JOB_COMMENT_REQUIRED); if (schd_run_job_on(job, job->queue, schd_SCHED_HOST, LEAVE_JOB_COMMENT)) { (void)sprintf(log_buffer, "Unable to run job '%s' on queue '%s'.", job->jobid, job->qname); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); } else changed ++; } else /* (SCHED_RESTART_ACTION == SCHD_RESTART_RESUBMIT) */ { if (schd_TEST_ONLY) { DBPRT(("%s: would have moved %s back to queue %s\n", id, job->jobid, schd_SubmitQueue->queue->qname)); } else { /* Move the job back to its originating queue. */ if (pbs_movejob(connector, job->jobid, job->oqueue, NULL) != 0) { (void)sprintf(log_buffer, "failed to move %s to queue %s, %d", job->jobid, job->oqueue, pbs_errno); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); } else { (void)sprintf(log_buffer, "Requeued job '%s' on queue '%s'.", job->jobid, job->oqueue); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); schd_comment_job(job, schd_JobMsg[JOB_RESUBMITTED], JOB_COMMENT_REQUIRED); changed ++; } } } } if (found) { if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) { (void)sprintf(log_buffer, "Re-ran %d jobs (of %d) found queued on run queues.\n", changed, found); } else { (void)sprintf(log_buffer, "Moved %d queued jobs (of %d) from run queues back to '%s'.\n", changed, found, schd_SubmitQueue->queue->qname); } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); } return (changed);}static Job *reject_unrunnables(Job *jobs){ Job *this, *nextjob; char tmpstr[300]; for (this = jobs; this != NULL; this = nextjob) { nextjob = this->next; if (!schd_job_can_queue(this)) { /* * If this job is at the head of the list, we must deal with * it specially. We need to advance the list pointer forward * so that further scheduling will not be done on the now * bogus job. Advance 'jobs', and make 'nextjob' the 'next' * pointer for the new head of the list. */ if (this == jobs) { jobs = jobs->next; nextjob = jobs ? jobs->next : NULL; } DBPRT(("job %s does not fit on any execution queue - reject\n", this->jobid)); schd_reject_job(this, "Job will not fit on any execution queue.\n" "\n" "Use 'qstat -q' to get execution queue limits.\n"); continue; } /* * Enforce maximum job limits * "Big" jobs are given a maximum walltime limit (WALLT_LARGE_LIMIT) * that differs from "small" jobs. (Job size distinction based on * the size specified by SMALL_JOB_MAX.) We need to reject any job * which violate these limits. * * Special-priority jobs are not affected. */ if (!(this->flags & JFLAGS_PRIORITY) && (schd_SMALL_JOB_MAX > 0)) { if (this->nodes <= schd_SMALL_JOB_MAX) { if (this->walltime > schd_WALLT_SMALL_LIMIT) { if (this == jobs) { jobs = jobs->next; nextjob = jobs ? jobs->next : NULL; } DBPRT(("job %s exceeds Small job walltime limit - reject\n", this->jobid)); sprintf(tmpstr, "Job exceeds maximum walltime limit (%s) policy\n" "\tfor small jobs (1 - %d nodes).\n", schd_sec2val(schd_WALLT_SMALL_LIMIT), schd_SMALL_JOB_MAX); schd_reject_job(this, tmpstr); continue; } } else { if (this->walltime > schd_WALLT_LARGE_LIMIT) { if (this == jobs) { jobs = jobs->next; nextjob = jobs ? jobs->next : NULL; } DBPRT(("job %s exceeds Large job walltime limit - reject\n", this->jobid)); sprintf(tmpstr, "Job exceeds maximum walltime limit (%s) policy\n" "\tfor large jobs (%d+ nodes).\n", schd_sec2val(schd_WALLT_LARGE_LIMIT), schd_SMALL_JOB_MAX+1); schd_reject_job(this, tmpstr); continue; } } } } return (jobs);}static int make_job_dump (char *dumpfile){ char *id = "make_job_dump"; FILE *dump; QueueList *qptr; /* * Attempt to open the dump file, creating it if necessary. It should * be truncated each time this runs, so don't open with append mode. */ if ((dump = fopen(dumpfile, "w")) == NULL) { (void)sprintf(log_buffer, "Cannot write to %s: %s\n", dumpfile, strerror(errno)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); return (-1); } /* Head the file with a timestamp. */ fprintf(dump, "%s\n", ctime(&schd_TimeNow)); /* And some more useful information about the state of the world. */ fprintf(dump, "Scheduler running on '%s'\n", schd_ThisHost); fprintf(dump, "Prime-time is "); if (schd_ENFORCE_PRIME_TIME && schd_TimeNow >= schd_ENFORCE_PRIME_TIME) { fprintf(dump, "from %s ", schd_sec2val(schd_PRIME_TIME_START)); fprintf(dump, "to %s.\n", schd_sec2val(schd_PRIME_TIME_END)); } else fprintf(dump, "not enforced.\n"); fprintf(dump, "\nJOBS LISTED IN ORDER FROM HIGHEST TO LOWEST PRIORITY\n\n"); /* Now dump the jobs queued on the various queues, in order of priority. */ qptr = schd_SubmitQueue; if (qptr->queue->jobs) { fprintf(dump, "Jobs on submit queue '%s':\n", qptr->queue->qname); dump_sorted_jobs (dump, qptr->queue->jobs); } for (qptr = schd_DedQueues; qptr != NULL; qptr = qptr->next) { if (qptr->queue->jobs) { fprintf(dump, "Jobs on dedicated queue '%s':\n", qptr->queue->qname); dump_sorted_jobs (dump, qptr->queue->jobs); } } if (fclose(dump)) { (void)sprintf(log_buffer, "close(%s): %s\n", dumpfile, strerror(errno)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); DBPRT(("%s: %s\n", id, log_buffer)); return (-1); } return (0);}static int dump_sorted_jobs (FILE *dump, Job *joblist){ Job *job; int njobs; int elig_mesg = 0;#define DUMP_JID_LEN 16#define DUMP_STATE_LEN 1#define DUMP_OWNER_LEN 8#define DUMP_NODES_LEN 3#define DUMP_PRIORITY_LEN 4#define DUMP_WALLT_LEN 8#define DUMP_WAITT_LEN 8#define DUMP_ELIGI_LEN 9 /* time plus '*' if wait != eligible */#define DUMP_FLAGS_LEN 18 char jid[DUMP_JID_LEN + 1]; char owner[DUMP_OWNER_LEN + 1]; char wallt[DUMP_WALLT_LEN + 1]; char waitt[DUMP_WAITT_LEN + 1]; char eligi[DUMP_ELIGI_LEN + 1]; char flags[DUMP_FLAGS_LEN + 1]; fprintf(dump, " %*s %*s %*s %*s %*s %*s %*s %*s %*s\n", -DUMP_JID_LEN, "Job ID", -DUMP_STATE_LEN, "S", -DUMP_OWNER_LEN, "Owner", -DUMP_NODES_LEN, "PEs", -DUMP_PRIORITY_LEN, "Pri", -DUMP_WALLT_LEN, "Walltime", -DUMP_WAITT_LEN, "Q'd for", -DUMP_ELIGI_LEN, "Eligible", -DUMP_FLAGS_LEN, "Flags"); fprintf(dump, " %*s %c %*s %*s %*s %*s %*s %*s %*s\n", -DUMP_JID_LEN, "----------------", '-', -DUMP_OWNER_LEN, "--------", -DUMP_NODES_LEN, "---", -DUMP_PRIORITY_LEN, "----", -DUMP_WALLT_LEN, "--------", -DUMP_WAITT_LEN, "--------", -DUMP_ELIGI_LEN, "---------", -DUMP_FLAGS_LEN, "------------------"); for (njobs = 0, job = joblist; job != NULL; job = job->next) { njobs++; strncpy(jid, job->jobid, DUMP_JID_LEN); strncpy(owner, job->owner, DUMP_OWNER_LEN); strcpy(wallt, schd_sec2val(job->walltime)); strcpy(waitt, schd_sec2val(job->time_queued)); strcpy(eligi, schd_sec2val(job->eligible)); if (job->time_queued != job->eligible) { strcat(eligi, "*"); elig_mesg ++; } flags[0] = '\0'; /* Watch length of 'flags[]' array! */ if (job->flags & JFLAGS_INTERACTIVE) strcat(flags, "Int "); /* "Priority" jobs are marked as being waiting, even if they're new. */ if (job->flags & JFLAGS_PRIORITY) strcat(flags, "High "); else if (job->flags & JFLAGS_WAITING) strcat(flags, "Wait "); if (job->flags & JFLAGS_DEDICATED) strcat(flags, "Ded "); /* Trim off the trailing space if any flags were listed. */ if (flags[0] != '\0') flags[strlen(flags) - 1] = '\0';fflush(dump); fprintf(dump, " %*s %c %*s %*d %*d %*s %*s %*s %*s\n", -DUMP_JID_LEN, jid, job->state, -DUMP_OWNER_LEN, owner, -DUMP_NODES_LEN, job->nodes, -DUMP_PRIORITY_LEN, job->priority, -DUMP_WALLT_LEN, wallt, -DUMP_WAITT_LEN, waitt, -DUMP_ELIGI_LEN, eligi, -DUMP_FLAGS_LEN, flags); } fprintf(dump, " Total: %d job%s\n\n", njobs, (njobs == 1) ? "" : "s"); if (elig_mesg) { fprintf(dump, "Jobs marked with a ``*'' have an etime different " "from their ctime.\n\n"); } return (njobs);} /* Fix added by jjones per Dr. Hook to change behavior of jobs being * moved from submit queue to exec queue before run, at request of * ERDC. */void fix_jim(Queue *submit,Queue *jim){ /* there'll be no running jobs on 'jim', but the scheduler */ /* _assumes_ that jobs *only* run out of that queue ... :-P */ jim->running = submit->running; submit->running = 0; /* also, the scheduler would be surprised to find any */ /* resources "assigned" to any queue but 'jim' ... */ jim->nodes_assn = submit->nodes_assn; jim->nodes_rsvd = submit->nodes_rsvd; /* _probably_ unnecessary */ submit->nodes_assn = submit->nodes_rsvd = 0; /* for the "draining" logic, we should probably adjust 'empty_by' */ jim->empty_by = submit->empty_by; return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -