📄 pack_queues.c
字号:
* this queue, looking at those that we previously marked as being * eligable for checkpointing; count # jobs necessary to free enough * resources to run job, and amount freed by doing so. */ ncpus_avail = queue->ncpus_max - queue->ncpus_assn; jobs_to_chkpt = 0; DBPRT(("CHK: %s avail=%d job2ckp=%d\n", queue->qname, ncpus_avail, jobs_to_chkpt)); for (jobptr = queue->jobs; jobptr != NULL; jobptr = jobptr->next) { /* assumption: Running jobs are at top/front of list */ if (jobptr->state != 'R') break; /* don't let an over-usage job push out a non-over-usage job * unless the target job is over its run limit, and they are * from the same oqueue, and they have different owners */ if (priority_job->flags & JFLAGS_CHKPT_OK) { if (jobptr->flags & JFLAGS_CHKPT_OK) { /* both jobs are over usage... */ if (!strcmp(priority_job->oqueue, jobptr->oqueue) && (priority_job->sort_order == jobptr->sort_order) && (jobptr->flags & JFLAGS_RUNLIMIT) && (strcmp(priority_job->owner, jobptr->owner))) { /* and from same oqueue with same sort order * and have different owners, and the target * job is over its runlimit*/ ; /* then proceed; otherwise skip the job */ } else continue; } else continue; } if (jobptr->flags & JFLAGS_CHKPT_OK || jobptr->flags & JFLAGS_RUNLIMIT) { if (!(jobptr->flags & JFLAGS_PRIORITY) && (!(jobptr->flags & JFLAGS_WAITING))) { /* only consider jobs from queues that have used more time * than the priority job has */ if ((priority_job->flags & JFLAGS_PRIORITY) || (priority_job->flags & JFLAGS_WAITING) || (priority_job->sort_order < jobptr->sort_order) || ((priority_job->sort_order == jobptr->sort_order) && (strcmp(priority_job->owner, jobptr->owner)))) { jobs_to_chkpt++; ncpus_avail += jobptr->ncpus; } } } if (ncpus_avail >= priority_job->ncpus) break; } if (jobs_to_chkpt == 0 || ncpus_avail < priority_job->ncpus) continue; if (jobs_to_chkpt < best_job_cnt) { best_job_cnt = jobs_to_chkpt; best_queue = queue; } } if (jobs_to_chkpt == 0 || best_queue == NULL) { /* Hummm, looking only at low-priority jobs didn't turn up anything. * So check how important this job is. If it's not High Priority * then just skip it. Otherwise, try harder... */ if ((priority_job->flags & JFLAGS_PRIORITY) || (priority_job->flags & JFLAGS_WAITING)) { sprintf(log_buffer,"No low-pri jobs to checkpoint, retrying... "); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); } else { return(NULL); } /* * Loop through all running jobs on all queues again, but this time * we are willing to checkpoint ANY non-Express, non-Waiting jobs. */ best_job_cnt = 100000; best_queue = NULL; for (qptr = qlist; qptr != NULL; qptr = qptr->next) { queue = qptr->queue; /* * If this queue is missing its resource info, or if its * STOPPED, etc., skip it. */ if (qptr->queue->rsrcs == NULL || (qptr->queue->flags & QFLAGS_DISABLED) || (qptr->queue->flags & QFLAGS_NODEDOWN) || (qptr->queue->flags & QFLAGS_STOPPED)) continue; /* * Verify that this architecture and/or exechost are * correct for this job. */ if (priority_job->arch != NULL) { if (strcmp(priority_job->arch, qptr->queue->rsrcs->arch)) { sprintf(reason, "%s (%s)", schd_JobMsg[NO_ARCH], priority_job->arch); continue; } } if (priority_job->exechost != NULL) { if (strcmp(priority_job->exechost, qptr->queue->exechost)) { sprintf(reason, "%s (%s)", schd_JobMsg[WAIT_CHKPT_HOST], priority_job->exechost); continue; } } /* * Check if *this* job can run in this queue or not, based on * queue minimum and maximum limits. */ if (!schd_job_fits_queue(priority_job, queue, reason)) continue; /* * If this job has a user access control list, check that this * job can be allowed in it. */ if (queue->useracl && (queue->flags & QFLAGS_USER_ACL)) { if (!schd_useracl_okay(priority_job, queue, reason)) continue; } /* * We found a queue on which this job can run. Now we need to * decide if this is the best queue for this job. Walk the list * of jobs for this queue, looking at those that we previously * marked as being eligable for checkpointing; count # jobs * necessary to free enough resources to run job, and amount * freed by doing so. */ ncpus_avail = queue->ncpus_max - queue->ncpus_assn; jobs_to_chkpt = 0; for (jobptr = queue->jobs; jobptr != NULL; jobptr = jobptr->next) { DBPRT(("CHK: %s avail=%d job2ckp=%d\n", queue->qname, ncpus_avail, jobs_to_chkpt)); if (jobptr->state != 'R') break; if (!(jobptr->flags & JFLAGS_PRIORITY) && !(jobptr->flags & JFLAGS_WAITING)) { if ((priority_job->flags & JFLAGS_PRIORITY) || (priority_job->flags & JFLAGS_WAITING)) { jobs_to_chkpt++; ncpus_avail += jobptr->ncpus; DBPRT(("CHK: job %s %d ncpus\n", jobptr->jobid, jobptr->ncpus)); } else { if ((priority_job->sort_order < jobptr->sort_order) || ((priority_job->sort_order == jobptr->sort_order) && (strcmp(priority_job->owner, jobptr->owner)))) { jobs_to_chkpt++; ncpus_avail += jobptr->ncpus; DBPRT(("CHK: job %s %d ncpus\n", jobptr->jobid, jobptr->ncpus)); } } } if (ncpus_avail >= priority_job->ncpus) break; } if (jobs_to_chkpt == 0 || ncpus_avail < priority_job->ncpus) continue; if (jobs_to_chkpt < best_job_cnt) { best_job_cnt = jobs_to_chkpt; best_queue = queue; } } if (jobs_to_chkpt == 0 || best_queue == NULL) { sprintf(log_buffer,"Found NO jobs to checkpoint for %s", priority_job->jobid); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); return(NULL); } } /* If we reach this point, then we know the best queue to use. So walk * that queue again, stopping the the N jobs necessary to free the * resources we need. */ ncpus_avail = best_queue->ncpus_max - best_queue->ncpus_assn; for (jobptr = best_queue->jobs; jobptr != NULL; jobptr = jobptr->next) { DBPRT(("CHK: Avail: %d of %d needed\n", ncpus_avail, priority_job->ncpus)); if (priority_job->flags & JFLAGS_CHKPT_OK) { if (jobptr->flags & JFLAGS_CHKPT_OK) { /* both jobs are over usage... */ if (!strcmp(priority_job->oqueue, jobptr->oqueue) && (priority_job->sort_order == jobptr->sort_order) && (jobptr->flags & JFLAGS_RUNLIMIT) && (strcmp(priority_job->owner, jobptr->owner))) { /* and from same oqueue with same sort order * and have different owners, and the target * job is over its runlimit*/ ; /* then proceed; otherwise skip the job */ } else continue; } else continue; } if (jobptr->flags & JFLAGS_CHKPT_OK || jobptr->flags & JFLAGS_RUNLIMIT) { if (!(jobptr->flags & JFLAGS_PRIORITY) && !(jobptr->flags & JFLAGS_WAITING)) { if ((priority_job->flags & JFLAGS_PRIORITY) || (priority_job->flags & JFLAGS_WAITING) || (priority_job->sort_order < jobptr->sort_order) || ((priority_job->sort_order == jobptr->sort_order) && (strcmp(priority_job->owner, jobptr->owner)))) { DBPRT(("CHK: %s would free %d cpus\n", jobptr->jobid, jobptr->ncpus)); if (!schd_checkpoint_job(jobptr)) { /* Hummm- checkpoint of this job failed; better skip it and * retry the whole routine again; given that we may have already * checkpointed some jobs, the next time thru hopefully we will * be able to run our high-priority job. If not, then what? */ jobptr->flags &= ~JFLAGS_CHKPT_OK; jobptr->flags |= JFLAGS_CHKPTD; sprintf(log_buffer,"WARNING: checkpoint error for %s, retrying...", jobptr->jobid); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); return(make_room_for_job(priority_job, qlist, reason)); } ncpus_avail += jobptr->ncpus; } } } if (ncpus_avail >= priority_job->ncpus) break; } if (ncpus_avail < priority_job->ncpus) { /* oops, didn't get enough cpus. Must have not been enough * low priority jobs; But since we are in this part of the * routine, we *know* that there are enough jobs, so we need * to make a second pass, this time getting ANY non-priority * or non-waiting jobs. */ for (jobptr = best_queue->jobs;jobptr != NULL;jobptr=jobptr->next) { DBPRT(("CHK2: Avail: %d of %d needed\n", ncpus_avail, priority_job->ncpus)); if (!(jobptr->flags & JFLAGS_PRIORITY) && !(jobptr->flags & JFLAGS_WAITING) && (jobptr->state == 'R')) { if ((priority_job->flags & JFLAGS_PRIORITY) || (priority_job->flags & JFLAGS_WAITING)) { if (!schd_checkpoint_job(jobptr)) { /* Hummm- checkpoint of this job failed; better skip it * and retry the whole routine again; given that we may * have already checkpointed some jobs, the next time * thru hopefully we will be able to run our high-priority * job. If not, then what? */ jobptr->flags &= ~JFLAGS_CHKPT_OK; jobptr->flags |= JFLAGS_CHKPTD; DBPRT(("CHK: chkpt error, retrying...\n")); return(make_room_for_job(priority_job, qlist, reason)); } ncpus_avail += jobptr->ncpus; } else { if ((priority_job->sort_order < jobptr->sort_order) || ((priority_job->sort_order = jobptr->sort_order) && (strcmp(priority_job->owner, jobptr->owner)))) { if (!schd_checkpoint_job(jobptr)) { jobptr->flags &= ~JFLAGS_CHKPT_OK; jobptr->flags |= JFLAGS_CHKPTD; DBPRT(("CHK: chkpt error, retrying...\n")); return(make_room_for_job(priority_job, qlist, reason)); } ncpus_avail += jobptr->ncpus; } } } if (ncpus_avail >= priority_job->ncpus) break; } } /* If we still don't have enough cpus, then we have a real problem. * Bail outta here, and hope real hard that the next iteration things * will have improved. */ if (ncpus_avail < priority_job->ncpus) return(NULL); /* otherwise, we have suspended/checkpointed/ or otherwise cleared space * for this priority job; return a pointer to this queue so we can run * this job */ return(best_queue);}int schd_checkpoint_job(Job *job){ char *id = "checkpoint_job"; int ret = 0, done = 0; if ( job->state == 'Q' ) { sprintf(log_buffer,"WARNING: tried to checkpoint QUEUED job %s; WHY?", job->jobid); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); return (1); } if ( 100.0 * job->time_left / job->walltime <= schd_SUSPEND_THRESHOLD ) { /* Attempt to suspend the job */ ret = pbs_sigjob(connector, job->jobid, "suspend", NULL); if (ret) { if (job_just_exited(ret, job)) return(0); /* try again */ else sprintf(log_buffer, "suspend job %s FAILED (%d); trying checkpoint", job->jobid, ret); } else { sprintf(log_buffer,"suspended job %s", job->jobid); done=1; } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); } if (done) { schd_UNcharge_job(job, job->queue, job->queue->rsrcs); schd_move_job_to(job, schd_SubmitQueue->queue); return(done); } else { /* Attempt to checkpoint the job */ ret = pbs_holdjob(connector, job->jobid, "s", NULL); if (ret) { if (job_just_exited(ret, job)) return(0); /* try again */ else sprintf(log_buffer,"checkpoint job %s FAILED (%d)", job->jobid, ret); } else { /* DEBUG: there's a window of opportunity between the Server * ACK'ing the qhold, and MOM actually completing the the * checkpoint. Bob is investigating this. Until its resolved, * we are disabling the forceable requeue of jobs (if both * suspend and checkpoint fail). */ /* * if (schd_FORCE_REQUEUE) * pbs_rerunjob(connector, job->jobid, NULL); */ sprintf(log_buffer,"checkpointed/requeued job %s", job->jobid); done=1; } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer); } /* If we succeeded in stopping this job, then reduce the usage counters * for this job's resources. */ if (done) { schd_UNcharge_job(job, job->queue, job->queue->rsrcs); /* Move the job from its run queue back to o-queue. */ pbs_movejob(connector, job->jobid, job->oqueue, NULL); schd_move_job_to(job, schd_SubmitQueue->queue); } /* in either case, release the hold on the job... */ pbs_rlsjob(connector, job->jobid, "s", NULL); return(done);}int job_just_exited(int error_code, Job *job){ if (error_code == 15001 /* Unknown Job Id */) { job->state = 'E'; schd_UNcharge_job(job, job->queue, job->queue->rsrcs); return (1); } return (0);}int has_suspended_jobs(Queue *queue, Job *job){ Job *jobptr, *nextjob; int count = 0; char *id = "has_suspended_jobs"; for (jobptr = queue->jobs; jobptr != NULL; jobptr = nextjob) { nextjob = jobptr->next; if (jobptr->flags & JFLAGS_SUSPENDED) { if (!strcmp(jobptr->owner, job->owner)) { if (!strcmp(jobptr->oqueue, job->oqueue)) { count++; } } } } return(count);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -