⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pack_queues.c

📁 openPBS的开放源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
  	 * this queue, looking at those that we previously marked as being	 * eligable for checkpointing; count # jobs necessary to free enough	 * resources to run job, and amount freed by doing so.         */		ncpus_avail = queue->ncpus_max - queue->ncpus_assn;	jobs_to_chkpt = 0;	DBPRT(("CHK: %s avail=%d job2ckp=%d\n", queue->qname, ncpus_avail,	    jobs_to_chkpt));    	for (jobptr = queue->jobs; jobptr != NULL; jobptr = jobptr->next) {	    	/* assumption: Running jobs are at top/front of list */	    if (jobptr->state != 'R')		break;	    /* don't let an over-usage job push out a non-over-usage job	     * unless the target job is over its run limit, and they are	     * from the same oqueue, and they have different owners	     */	    if (priority_job->flags & JFLAGS_CHKPT_OK) {	        if (jobptr->flags & JFLAGS_CHKPT_OK) {		    /* both jobs are over usage...  */	 	    if (!strcmp(priority_job->oqueue, jobptr->oqueue) &&		        (priority_job->sort_order == jobptr->sort_order) &&		        (jobptr->flags & JFLAGS_RUNLIMIT) &&		        (strcmp(priority_job->owner, jobptr->owner))) {		        /* and from same oqueue with same sort order		         * and have different owners, and the target 		         * job is over its runlimit*/			     ; /* then proceed; otherwise skip the job */		    } else 		        continue;		} else 		    continue;	    }	    if (jobptr->flags & JFLAGS_CHKPT_OK ||		jobptr->flags & JFLAGS_RUNLIMIT) {		if (!(jobptr->flags & JFLAGS_PRIORITY) &&		   (!(jobptr->flags & JFLAGS_WAITING))) {		    /* only consider jobs from queues that have used more time		     * than the priority job has */		    if ((priority_job->flags & JFLAGS_PRIORITY) ||		        (priority_job->flags & JFLAGS_WAITING)  ||		        (priority_job->sort_order < jobptr->sort_order) ||		        ((priority_job->sort_order == jobptr->sort_order) &&		        (strcmp(priority_job->owner, jobptr->owner)))) {		        jobs_to_chkpt++;		        ncpus_avail += jobptr->ncpus;		    }		}	    }	    if (ncpus_avail >= priority_job->ncpus)		break;	}	if (jobs_to_chkpt == 0 || ncpus_avail < priority_job->ncpus)	    continue;	if (jobs_to_chkpt < best_job_cnt) {	    best_job_cnt = jobs_to_chkpt;	    best_queue = queue;	}    }    if (jobs_to_chkpt == 0 || best_queue == NULL) {        /* Hummm, looking only at low-priority jobs didn't turn up anything.	 * So check how important this job is. If it's not High Priority	 * then just skip it. Otherwise, try harder...	 */	if ((priority_job->flags & JFLAGS_PRIORITY) ||	    (priority_job->flags & JFLAGS_WAITING)) {            sprintf(log_buffer,"No low-pri jobs to checkpoint, retrying... ");            log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);	} else {	    return(NULL);	}        /*         * Loop through all running jobs on all queues again, but this time         * we are willing to checkpoint ANY non-Express, non-Waiting jobs.         */            best_job_cnt = 100000;        best_queue = NULL;            for (qptr = qlist; qptr != NULL; qptr = qptr->next) {            queue = qptr->queue;                /*              * If this queue is missing its resource info, or if its 	     * STOPPED, etc.,  skip it.             */            if (qptr->queue->rsrcs == NULL            ||	       (qptr->queue->flags & QFLAGS_DISABLED) ||	       (qptr->queue->flags & QFLAGS_NODEDOWN) ||	       (qptr->queue->flags & QFLAGS_STOPPED))    	          continue;    	    /* 	     * Verify that this architecture and/or exechost are 	     * correct for this job.	     */            if (priority_job->arch != NULL) {                if (strcmp(priority_job->arch, qptr->queue->rsrcs->arch)) {	            sprintf(reason, "%s (%s)", schd_JobMsg[NO_ARCH],		        priority_job->arch);		    continue;	        }            }            if (priority_job->exechost != NULL) {                if (strcmp(priority_job->exechost, qptr->queue->exechost)) {	            sprintf(reason, "%s (%s)", schd_JobMsg[WAIT_CHKPT_HOST],		        priority_job->exechost);		    continue;	        }            }                /*             * Check if *this* job can run in this queue or not, based on 	     * queue minimum and maximum limits.              */            if (!schd_job_fits_queue(priority_job, queue, reason))    	        continue;                        /*             * If this job has a user access control list, check that this             * job can be allowed in it.             */            if (queue->useracl && (queue->flags & QFLAGS_USER_ACL)) {    	        if (!schd_useracl_okay(priority_job, queue, reason))    	            continue;	    }                /*             * We found a queue on which this job can run. Now we need to	     * decide if this is the best queue for this job. Walk the list	     * of jobs for this queue, looking at those that we previously	     * marked as being eligable for checkpointing; count # jobs	     * necessary to free enough resources to run job, and amount	     * freed by doing so.             */    		    ncpus_avail = queue->ncpus_max - queue->ncpus_assn;	    jobs_to_chkpt = 0;        	    for (jobptr = queue->jobs; jobptr != NULL; jobptr = jobptr->next) {    	        DBPRT(("CHK: %s avail=%d job2ckp=%d\n", queue->qname,		    ncpus_avail, jobs_to_chkpt));	        if (jobptr->state != 'R')		    break;    	        if (!(jobptr->flags & JFLAGS_PRIORITY) &&	            !(jobptr->flags & JFLAGS_WAITING)) {    		    if ((priority_job->flags & JFLAGS_PRIORITY) ||		        (priority_job->flags & JFLAGS_WAITING)) {		    	jobs_to_chkpt++;		    	ncpus_avail += jobptr->ncpus; 		    	DBPRT(("CHK:   job %s %d ncpus\n", jobptr->jobid,			    jobptr->ncpus));		    } else {		        if  ((priority_job->sort_order < jobptr->sort_order) ||		    	    ((priority_job->sort_order == jobptr->sort_order) &&		    	     (strcmp(priority_job->owner, jobptr->owner)))) {		    	    jobs_to_chkpt++;		    	    ncpus_avail += jobptr->ncpus; 		    	    DBPRT(("CHK:   job %s %d ncpus\n", jobptr->jobid,			        jobptr->ncpus));		        }		    }	        }    	        if (ncpus_avail >= priority_job->ncpus)		    break;	    }	    if (jobs_to_chkpt == 0 || ncpus_avail < priority_job->ncpus)	        continue;    	    if (jobs_to_chkpt < best_job_cnt) {	        best_job_cnt = jobs_to_chkpt;	        best_queue = queue;	    }        }            if (jobs_to_chkpt == 0 || best_queue == NULL) {            sprintf(log_buffer,"Found NO jobs to checkpoint for %s",		priority_job->jobid);            log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);            return(NULL);        }    }    /* If we reach this point, then we know the best queue to use. So walk     * that queue again, stopping the the N jobs necessary to free the     * resources we need.     */    ncpus_avail = best_queue->ncpus_max - best_queue->ncpus_assn;    for (jobptr = best_queue->jobs; jobptr != NULL; jobptr = jobptr->next) {   	DBPRT(("CHK: Avail: %d of %d needed\n", ncpus_avail,	    priority_job->ncpus));	if (priority_job->flags & JFLAGS_CHKPT_OK) {	    if (jobptr->flags & JFLAGS_CHKPT_OK) {		/* both jobs are over usage...  */	 	if (!strcmp(priority_job->oqueue, jobptr->oqueue) &&		    (priority_job->sort_order == jobptr->sort_order) &&		    (jobptr->flags & JFLAGS_RUNLIMIT) &&		    (strcmp(priority_job->owner, jobptr->owner))) {		    /* and from same oqueue with same sort order		    * and have different owners, and the target 		    * job is over its runlimit*/			; /* then proceed; otherwise skip the job */		} else 		    continue;	    } else 		continue;	}        if (jobptr->flags & JFLAGS_CHKPT_OK ||            jobptr->flags & JFLAGS_RUNLIMIT) {	  if (!(jobptr->flags & JFLAGS_PRIORITY) &&	      !(jobptr->flags & JFLAGS_WAITING)) {    	    if ((priority_job->flags & JFLAGS_PRIORITY) ||	        (priority_job->flags & JFLAGS_WAITING)  ||	        (priority_job->sort_order < jobptr->sort_order) ||	       ((priority_job->sort_order == jobptr->sort_order) &&		(strcmp(priority_job->owner, jobptr->owner)))) {                DBPRT(("CHK: %s would free %d cpus\n", jobptr->jobid,		    jobptr->ncpus));	        if (!schd_checkpoint_job(jobptr)) {		    /* Hummm- checkpoint of this job failed; better skip it and		     * retry the whole routine again; given that we may have already		     * checkpointed some jobs, the next time thru hopefully we will		     * be able to run our high-priority job. If not, then what?		     */		    jobptr->flags &= ~JFLAGS_CHKPT_OK;		    jobptr->flags |=  JFLAGS_CHKPTD;        	    sprintf(log_buffer,"WARNING: checkpoint error for %s, retrying...",		        jobptr->jobid);	            log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);		    return(make_room_for_job(priority_job, qlist, reason));	        }	        ncpus_avail += jobptr->ncpus;            }        }      }       if (ncpus_avail >= priority_job->ncpus)	  break;    }    if (ncpus_avail < priority_job->ncpus) {	/* oops, didn't get enough cpus. Must have not been enough 	 * low priority jobs; But since we are in this part of the	 * routine, we *know* that there are enough jobs, so we need	 * to make a second pass, this time getting ANY non-priority	 * or non-waiting jobs.	 */        for (jobptr = best_queue->jobs;jobptr != NULL;jobptr=jobptr->next) {   	    DBPRT(("CHK2: Avail: %d of %d needed\n", ncpus_avail,	        priority_job->ncpus));	    if (!(jobptr->flags & JFLAGS_PRIORITY) &&	        !(jobptr->flags & JFLAGS_WAITING)  &&		(jobptr->state == 'R')) {		if ((priority_job->flags & JFLAGS_PRIORITY) ||		    (priority_job->flags & JFLAGS_WAITING)) {	            if (!schd_checkpoint_job(jobptr)) {		        /* Hummm- checkpoint of this job failed; better skip it		         * and retry the whole routine again; given that we may		         * have already checkpointed some jobs, the next time		         * thru hopefully we will be able to run our high-priority		         * job. If not, then what?		         */		        jobptr->flags &= ~JFLAGS_CHKPT_OK;		        jobptr->flags |=  JFLAGS_CHKPTD;		        DBPRT(("CHK:   chkpt error, retrying...\n"));		        return(make_room_for_job(priority_job, qlist, reason));	            }	            ncpus_avail += jobptr->ncpus;		} else {		    if ((priority_job->sort_order < jobptr->sort_order) ||	       	       ((priority_job->sort_order = jobptr->sort_order) &&			(strcmp(priority_job->owner, jobptr->owner)))) {	                if (!schd_checkpoint_job(jobptr)) {		            jobptr->flags &= ~JFLAGS_CHKPT_OK;		            jobptr->flags |=  JFLAGS_CHKPTD;		            DBPRT(("CHK:   chkpt error, retrying...\n"));		            return(make_room_for_job(priority_job, qlist, reason));	                }	                ncpus_avail += jobptr->ncpus;		    }		}	    }            if (ncpus_avail >= priority_job->ncpus)	        break;	}    }    /* If we still don't have enough cpus, then we have a real problem.     * Bail outta here, and hope real hard that the next iteration things     * will have improved.     */    if (ncpus_avail < priority_job->ncpus)	return(NULL);    /* otherwise, we have suspended/checkpointed/ or otherwise cleared space     * for this priority job; return a pointer to this queue so we can run     * this job     */    return(best_queue);}int schd_checkpoint_job(Job *job){    char *id = "checkpoint_job";    int ret = 0, done = 0;    if ( job->state == 'Q' ) {        sprintf(log_buffer,"WARNING: tried to checkpoint QUEUED job %s; WHY?",	   job->jobid);	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);	return (1);    }	    if ( 100.0 * job->time_left / job->walltime <= schd_SUSPEND_THRESHOLD ) {	/* Attempt to suspend the job */        ret = pbs_sigjob(connector, job->jobid, "suspend", NULL);        if (ret) {	    if (job_just_exited(ret, job))	        return(0); /* try again */	    else 	        sprintf(log_buffer,		    "suspend job %s FAILED (%d); trying checkpoint",		    job->jobid, ret);	} else {	    sprintf(log_buffer,"suspended job %s", job->jobid);	    done=1;	}	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);    }        if (done) {        schd_UNcharge_job(job, job->queue, job->queue->rsrcs);	schd_move_job_to(job, schd_SubmitQueue->queue);        return(done);    } else {        /* Attempt to checkpoint the job */        ret = pbs_holdjob(connector, job->jobid, "s", NULL);        if (ret) {	    if (job_just_exited(ret, job))	        return(0); /* try again */	    else	        sprintf(log_buffer,"checkpoint job %s FAILED (%d)",		    job->jobid, ret);	} else {	    /* DEBUG: there's a window of opportunity between the Server	     * ACK'ing the qhold, and MOM actually completing the the 	     * checkpoint. Bob is investigating this. Until its resolved,	     * we are disabling the forceable requeue of jobs (if both	     * suspend and checkpoint fail).	     */	    /*	     * if (schd_FORCE_REQUEUE)             *   pbs_rerunjob(connector, job->jobid, NULL);	     */	    sprintf(log_buffer,"checkpointed/requeued job %s", job->jobid);	    done=1;	}	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,id,log_buffer);    }    /* If we succeeded in stopping this job, then reduce the usage counters     * for this job's resources.     */    if (done) {        schd_UNcharge_job(job, job->queue, job->queue->rsrcs);	/* Move the job from its run queue back to o-queue. */ 	pbs_movejob(connector, job->jobid, job->oqueue, NULL);	schd_move_job_to(job, schd_SubmitQueue->queue);    }    /* in either case, release the hold on the job... */    pbs_rlsjob(connector, job->jobid, "s", NULL);    return(done);}int job_just_exited(int error_code, Job *job){    if (error_code == 15001 /* Unknown Job Id */) {        job->state = 'E';        schd_UNcharge_job(job, job->queue, job->queue->rsrcs);	return (1);    }    return (0);}int has_suspended_jobs(Queue *queue, Job *job){    Job    *jobptr, *nextjob;    int count = 0;    char   *id = "has_suspended_jobs";    for (jobptr = queue->jobs; jobptr != NULL; jobptr = nextjob) {	nextjob = jobptr->next;	if (jobptr->flags & JFLAGS_SUSPENDED) {	    if (!strcmp(jobptr->owner, job->owner)) {	        if (!strcmp(jobptr->oqueue, job->oqueue)) {	            count++;		}	    }	}    }    return(count);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -