⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 schedule.c

📁 openPBS的开放源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
    /*     * Sort the list of jobs based on several criteria. The sorting routine     * returns a pointer to the new head of the list created by relinking     * the elements of the linked list, or NULL if an error occurs. Zero     * the original list pointer to reduce confusion -- the same list, in     * different order, now lives on schd_AllJobs.     */    schd_AllJobs = schd_sort_jobs(jobs);    jobs = NULL;    if (schd_AllJobs == NULL) {#ifdef DEBUG	(void)sprintf(log_buffer, 	    ">>> Sorting Failed - Cancel Scheduling Cycle <<<");	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);#endif	DBPRT(("%s\n", log_buffer));	return (1);    }    /*     * Get the queue utilization for each queue. Any jobs on schd_AllJobs     * (set by sort_jobs() above) that belong to the queue will be placed     * on the queue->jobs list.     */    error = schd_get_queue_util();    if (error < 0) {	DBPRT(("get_all_queue_info() failed\n"));	return (1);	/* Bogus queue - don't recycle. */    } else if (error > 0) {	DBPRT(("queue failed sanity check - wait and recycle.\n"));	sleep(WAIT_FOR_QUEUE_SANITY);	return (0);	/* Attempt to recycle scheduler. */    }    /* Dump the list of jobs being scheduled from submit queue. */    if (schd_JOB_DUMPFILE) {	make_job_dump(schd_JOB_DUMPFILE);    }    if (schd_SubmitQueue->queue->jobs && 	!(schd_SubmitQueue->queue->flags & (QFLAGS_DISABLED | QFLAGS_STOPPED)))    {	total_ran = schd_pack_queues(schd_SubmitQueue->queue->jobs,	    schd_BatchQueues, reason);    }/* DEBUG: for Pat Wright at MSIC: print the resouces used each iteration *    if (total_ran > 0) { * */	(void)sprintf(log_buffer, "System resources after scheduling:");	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);	schd_dump_rsrclist(); /*  * DEBUG: continuation of above test for Pat Wright    }  *  */    /* We need to save the Share Usage data peridoically, so that a restart     * of pbs_sched doesn't loose it.     */    if (schd_save_shares())		/* is it time yet? */        schd_share_info("w");		/* yes, so do it   */    (void)sprintf(log_buffer, ">>>  End Scheduling Cycle (ran %d jobs)  <<<",	total_ran);    log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);    DBPRT(("%s\n", log_buffer));    return (1);}static intschedule_restart(Job *joblist){    char   *id = "schedule_restart";    Job    *job, *nextjob;    QueueList *qptr;    int     found, changed;    changed = found = 0;    for (job = joblist; job != NULL; job = nextjob) {	nextjob = job->next;	if (job->state != 'Q')	    continue;	/* 	 * See if the job is queued on one of the batch queues.  If not,	 * go on to the next job.	 */	for (qptr = schd_BatchQueues; qptr != NULL; qptr = qptr->next)	    if (strcmp(qptr->queue->qname, job->qname) == 0)		break;	if (qptr == NULL)	    continue;	found++;	if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) {	    (void)sprintf(log_buffer, "Restart job '%s' on queue '%s'.",		job->jobid, job->qname);	    log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, 		id, log_buffer);	    DBPRT(("%s: %s\n", id, log_buffer));	    schd_comment_job(job, schd_JobMsg[JOB_RESTARTED],		JOB_COMMENT_REQUIRED);	    if (schd_run_job_on(job, job->queue, schd_SCHED_HOST, 		LEAVE_JOB_COMMENT)) 	    {		(void)sprintf(log_buffer,		    "Unable to run job '%s' on queue '%s'.", job->jobid,			job->qname);		log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id,		    log_buffer);	    } else		changed ++;	} else /* (SCHED_RESTART_ACTION == SCHD_RESTART_RESUBMIT) */ {	    /* Move the job back to its originating queue. */	    if (pbs_movejob(connector, job->jobid, job->oqueue, NULL) != 0) {		    (void)sprintf(log_buffer, 			"failed to move %s to queue %s, %d", job->jobid, 			job->oqueue, pbs_errno);		    log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, 			log_buffer);		    DBPRT(("%s: %s\n", id, log_buffer));	    } else {		    (void)sprintf(log_buffer, 			"Requeued job '%s' on queue '%s'.", job->jobid, 			job->oqueue);		    log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, 			id, log_buffer);		    DBPRT(("%s: %s\n", id, log_buffer));		    schd_comment_job(job, schd_JobMsg[JOB_RESUBMITTED],			JOB_COMMENT_REQUIRED);		    changed ++;	    }	}    }    if (found) {	if (schd_SCHED_RESTART_ACTION == SCHD_RESTART_RERUN) {	    (void)sprintf(log_buffer, 		"Re-ran %d jobs (of %d) found queued on run queues.\n", 		    changed, found);	} else {	    (void)sprintf(log_buffer, 		"Moved %d queued jobs (of %d) from run queues back to '%s'.\n", 		    changed, found, schd_SubmitQueue->queue->qname);	}	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);	DBPRT(("%s: %s\n", id, log_buffer));    }    return (changed);}static int make_job_dump (char *dumpfile){    char    *id = "make_job_dump";    FILE    *dump;    QueueList *qptr;    (void)sprintf(log_buffer, "Sorted jobs: %s", schd_JOB_DUMPFILE);    log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);    /*      * Attempt to open the dump file, creating it if necessary.  It should     * be truncated each time this runs, so don't open with append mode.     */    if ((dump = fopen(dumpfile, "w")) == NULL) {	(void)sprintf(log_buffer, "Cannot write to %s: %s\n", dumpfile,	    strerror(errno));	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);	DBPRT(("%s: %s\n", id, log_buffer));	return (-1);    }    /* Head the file with a timestamp. */    fprintf(dump, "%s\n", ctime(&schd_TimeNow));    /* Include the version string compiled into the scheduler binary. */    fprintf(dump, "%s\n", schd_VersionString);    /* And some more useful information about the state of the world. */    fprintf(dump, "Scheduler running on '%s'\n", schd_ThisHost);    fprintf(dump, "Prime-time is ");    if (schd_ENFORCE_PRIME_TIME) {	fprintf(dump, "from %s ", schd_sec2val(schd_PRIME_TIME_START));	fprintf(dump, "to %s.\n", schd_sec2val(schd_PRIME_TIME_END));    } else	fprintf(dump, "not enforced.\n");    fprintf(dump, "\nJOBS LISTED IN ORDER FROM HIGHEST TO LOWEST PRIORITY\n\n");    /* Now dump the jobs queued on the various queues, in order of sort_order. */    qptr = schd_SubmitQueue;    if (qptr->queue->jobs) {	fprintf(dump, "Jobs on submit queue '%s':\n", qptr->queue->qname);	dump_sorted_jobs (dump, qptr->queue->jobs);    }    if (fclose(dump)) {	(void)sprintf(log_buffer, "close(%s): %s\n", dumpfile, strerror(errno));	log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer);	DBPRT(("%s: %s\n", id, log_buffer));	return (-1);    }    return (0);}static int dump_sorted_jobs (FILE *dump, Job *joblist){    Job     *job;    int      njobs, elig_mesg = 0;#define DUMP_PRIORITY		6#define DUMP_JID_LEN		10#define DUMP_STATE_LEN		1#define DUMP_QUEUE_LEN		8#define DUMP_OWNER_LEN		8#define DUMP_NODES_LEN		3#define DUMP_WALLT_LEN		8#define DUMP_ELIGI_LEN		9	/* time plus '*' if wait != eligible */#define DUMP_FLAGS_LEN		10    char     jid[DUMP_JID_LEN + 1];    char     queue[DUMP_QUEUE_LEN + 1];    char     owner[DUMP_OWNER_LEN + 1];    char     wallt[DUMP_WALLT_LEN + 1];    char     eligi[DUMP_ELIGI_LEN + 1];    char     flags[DUMP_FLAGS_LEN + 1];    fprintf(dump, " %*s %*s %*s %*s %*s %*s %*s %*s %*s\n",	-DUMP_PRIORITY,		"Pri",	-DUMP_JID_LEN,		"Job ID", 	-DUMP_STATE_LEN,	"S", 	-DUMP_OWNER_LEN,	"Owner", 	-DUMP_QUEUE_LEN,	"Queue", 	-DUMP_NODES_LEN,	"Nds", 	-DUMP_WALLT_LEN,	"Walltime", 	-DUMP_ELIGI_LEN,	"Eligible", 	-DUMP_FLAGS_LEN,	"Flags");    fprintf(dump, " %*s %*s %c %*s %*s %*s %*s %*s %*s\n",	-DUMP_PRIORITY,		"-----", 	-DUMP_JID_LEN,		"---------", 	'-', 	-DUMP_QUEUE_LEN,	"--------", 	-DUMP_OWNER_LEN,	"--------", 	-DUMP_NODES_LEN,	"---", 	-DUMP_WALLT_LEN,	"--------", 	-DUMP_ELIGI_LEN,	"---------", 	-DUMP_FLAGS_LEN,	"------");    for (njobs = 0, job = joblist; job != NULL; job = job->next) {	njobs++;	strncpy(jid, job->jobid, DUMP_JID_LEN);	strncpy(owner, job->owner, DUMP_OWNER_LEN);	strncpy(queue, job->oqueue, DUMP_QUEUE_LEN);	strcpy(wallt, schd_sec2val(job->walltime));	strcpy(eligi, schd_sec2val(job->eligible));	if (job->time_queued != job->eligible) {	    strcat(eligi, "*");	    elig_mesg ++;	}	flags[0] = '\0';	/* Watch length of 'flags[]' array! */	if (job->flags & JFLAGS_PRIORITY)	    strcat(flags, "High ");	if (job->flags & JFLAGS_PRIORITY)	    strcat(flags, "Wait ");	if (job->flags & JFLAGS_RUNLIMIT)	    strcat(flags, "RUNLIM");	if (job->flags & JFLAGS_CHKPT_OK)	    strcat(flags, "CKPTOK");	if (job->flags & JFLAGS_CHKPTD)	    strcat(flags, "CKPTD");	if (job->flags & JFLAGS_SUSPENDED)	    strcat(flags, "SUSP");	/* Trim off the trailing space if any flags were listed. */	if (flags[0] != '\0')	    flags[strlen(flags) - 1] = '\0';	fprintf(dump, " %*d %*s %c %*s %*s %*d %*s %*s %*s\n",	    -DUMP_PRIORITY,	job->sort_order,	    -DUMP_JID_LEN,	jid,	    job->state,	    -DUMP_OWNER_LEN,	owner,	    -DUMP_QUEUE_LEN,	queue,	    -DUMP_NODES_LEN,	job->ncpus,	    -DUMP_WALLT_LEN,	wallt,	    -DUMP_ELIGI_LEN,	eligi,	    -DUMP_FLAGS_LEN,	flags);    }    fprintf(dump, "    Total: %d job%s\n\n", njobs, (njobs == 1) ? "" : "s");    if (elig_mesg) {	fprintf(dump, "Jobs marked with a ``*'' have an etime different "	    "from their ctime.\n\n");    }    return (njobs);}int schd_get_max_ncpus(void){    char *id = "get_max_ncpus";    Batch_Status *bs, *bsp;    AttrList *attr;    static AttrList alist[] = {{NULL, ATTR_rescmax, "", ""}};    int ret = 0;    /* Query the server for status of the max ncpus attribute */    if ((bs = pbs_statserver(connector, alist, NULL)) == NULL) {	sprintf(log_buffer, "pbs_statserver failed: %d", pbs_errno);	log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, id, log_buffer);	DBPRT(("%s: %s\n", id, log_buffer));	return (ret);    }    /* Process the list of attributes returned by the server. */    for (bsp = bs; bsp != NULL; bsp = bsp->next) {        for (attr = bsp->attribs; attr != NULL; attr = attr->next) {	    if (!strcmp(attr->name, "resources_max")) {	        if (!strcmp(attr->resource, "ncpus")) {		    ret = atoi(attr->value);		    break;		}	    }        }    }    pbs_statfree(bs);    return (ret);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -