📄 tm.c
字号:
cp = argv[i];
		if (diswcs(local_conn, cp, strlen(cp)) != DIS_SUCCESS)
			return TM_ENOTCONNECTED;
	}

	/* send envp strings across */
	if (envp != NULL) {
		for (i=0; (cp = envp[i]) != NULL; i++) {
			if (diswcs(local_conn, cp, strlen(cp)) != DIS_SUCCESS)
				return TM_ENOTCONNECTED;
		}
	}
	if (diswcs(local_conn, "", 0) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;
	DIS_tcp_wflush(local_conn);
	add_event(*event, where, TM_SPAWN, (void *)tid);
	return TM_SUCCESS;
}

/*
**	Sends a <sig> signal to all the process groups in the task
**	signified by the handle, <tid>.
**
**	The request (node of the task, task id, signal number) is written
**	to local_conn and an event is registered for the reply; the new
**	event number is handed back through <event>.
**
**	Returns TM_BADINIT if the library is not initialised,
**	TM_EBADENVIRONMENT if <event> is NULL, TM_ENOTFOUND if <tid> is not
**	a known task, TM_ENOTCONNECTED if any part of the request cannot
**	be written, and TM_SUCCESS otherwise.
*/
int
tm_kill(tid, sig, event)
    tm_task_id	tid;		/* in  */
    int		sig;		/* in  */
    tm_event_t	*event;		/* out */
{
	task_info	*tp;

	if (!init_done)
		return TM_BADINIT;
	/* reject a NULL out-parameter instead of dereferencing it below */
	if (event == NULL)
		return TM_EBADENVIRONMENT;
	if ((tp = find_task(tid)) == NULL)
		return TM_ENOTFOUND;

	*event = new_event();
	if (startcom(TM_SIGNAL, *event) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;
	if (diswsi(local_conn, tp->t_node) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;
	if (diswsi(local_conn, tid) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;
	if (diswsi(local_conn, sig) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;
	DIS_tcp_wflush(local_conn);

	add_event(*event, tp->t_node, TM_SIGNAL, NULL);
	return TM_SUCCESS;
}

/*
**	Returns an event that can be used to learn when a task
**	dies.
**
**	<obitval> is stored with the event; tm_poll() writes the value
**	read from the TM_OBIT reply through it, so it must stay valid
**	until that event has been polled.
**
**	Returns TM_BADINIT if the library is not initialised,
**	TM_EBADENVIRONMENT if <obitval> or <event> is NULL, TM_ENOTFOUND if
**	<tid> is not a known task, TM_ESYSTEM if the request cannot be
**	written, and TM_SUCCESS otherwise.
*/
int
tm_obit(tid, obitval, event)
    tm_task_id	tid;		/* in  */
    int		*obitval;	/* out */
    tm_event_t	*event;		/* out */
{
	task_info	*tp;

	if (!init_done)
		return TM_BADINIT;
	/* both pointers are written through later; refuse NULL up front */
	if (obitval == NULL || event == NULL)
		return TM_EBADENVIRONMENT;
	if ((tp = find_task(tid)) == NULL)
		return TM_ENOTFOUND;

	*event = new_event();
	if (startcom(TM_OBIT, *event) != DIS_SUCCESS)
		return TM_ESYSTEM;
	if (diswsi(local_conn, tp->t_node) != DIS_SUCCESS)
		return TM_ESYSTEM;
	if (diswsi(local_conn, tid) != DIS_SUCCESS)
		return TM_ESYSTEM;
	DIS_tcp_wflush(local_conn);

	add_event(*event, tp->t_node, TM_OBIT, (void *)obitval);
	return TM_SUCCESS;
}

/*
**	Caller-supplied output locations for a TM_TASKS request, kept
**	with the event until the reply is processed.
*/
struct	taskhold {
	tm_task_id	*list;		/* caller's array of task ids */
	int		size;		/* number of slots in list */
	int		*ntasks;	/* where the task count is stored */
};

/*
**	Makes a request for the list of tasks on <node>.
**	If <node> is a valid node number, it returns the event that the
**	list of tasks on <node> is available.
**
**	node      (in)  node to query
**	tid_list  (out) array that tm_poll() fills with task ids
**	list_size (in)  number of slots in tid_list; must be positive
**	ntasks    (out) receives the number of tasks reported
**	event     (out) receives the event number for this request
**
**	Returns TM_BADINIT if the library is not initialised,
**	TM_EBADENVIRONMENT for a NULL pointer argument or a non-positive
**	<list_size>, TM_ESYSTEM if memory cannot be allocated or the
**	request cannot be written, and TM_SUCCESS otherwise.
*/
int
tm_taskinfo(node, tid_list, list_size, ntasks, event)
    tm_node_id	node;		/* in  */
    tm_task_id	*tid_list;	/* out */
    int		list_size;	/* in  */
    int		*ntasks;	/* out */
    tm_event_t	*event;		/* out */
{
	struct taskhold	*thold;

	if (!init_done)
		return TM_BADINIT;
	if (tid_list == NULL || list_size <= 0 ||
	    ntasks == NULL || event == NULL)
		return TM_EBADENVIRONMENT;

	/*
	** Allocate before anything is sent so that an out-of-memory
	** condition is reported to the caller rather than leaving a
	** request on the wire with no event registered for its reply.
	*/
	thold = malloc(sizeof *thold);
	if (thold == NULL)
		return TM_ESYSTEM;

	*event = new_event();
	if (startcom(TM_TASKS, *event) != DIS_SUCCESS ||
	    diswsi(local_conn, node) != DIS_SUCCESS) {
		free(thold);
		return TM_ESYSTEM;
	}
	DIS_tcp_wflush(local_conn);

	thold->list = tid_list;
	thold->size = list_size;
	thold->ntasks = ntasks;
	/* thold now travels with the event as its info pointer */
	add_event(*event, node, TM_TASKS, (void *)thold);
	return TM_SUCCESS;
}

/*
**	Stores in *<node> the job-relative node number that holds or
**	held <tid>.
**
**	Returns TM_BADINIT if the library is not initialised,
**	TM_EBADENVIRONMENT if <node> is NULL, TM_ENOTFOUND if <tid> is not
**	a known task, and TM_SUCCESS otherwise.  *<node> is only written
**	on success.
*/
int
tm_atnode(tid, node)
    tm_task_id	tid;		/* in  */
    tm_node_id	*node;		/* out */
{
	task_info	*tp;

	if (!init_done)
		return TM_BADINIT;
	if (node == NULL)
		return TM_EBADENVIRONMENT;
	if ((tp = find_task(tid)) == NULL)
		return TM_ENOTFOUND;

	*node = tp->t_node;
	return TM_SUCCESS;
}

/*
**	Caller-supplied output buffer for a TM_RESOURCES request, kept
**	with the event until the reply is processed.
*/
struct	reschold {
	char	*resc;		/* caller's buffer for the resource string */
	int	len;		/* size of that buffer in bytes */
};

/*
**	Makes a request for a string specifying the resources
**	available on <node>.  If <node> is a valid node number, it
**	returns the event that the string specifying the resources on
**	<node> is available.
It returns ERROR_EVENT otherwise.*/inttm_rescinfo(node, resource, len, event) tm_node_id node; /* in */ char *resource; /* out */ int len; /* in */ tm_event_t *event; /* out */{ struct reschold *rhold; if (!init_done) return TM_BADINIT; if (resource == NULL || len == 0) return TM_EBADENVIRONMENT; *event = new_event(); if (startcom(TM_RESOURCES, *event) != DIS_SUCCESS) return TM_ESYSTEM; if (diswsi(local_conn, node) != DIS_SUCCESS) return TM_ESYSTEM; DIS_tcp_wflush(local_conn); rhold = (struct reschold *)malloc(sizeof(struct reschold)); assert(rhold != NULL); rhold->resc = resource; rhold->len = len; add_event(*event, node, TM_RESOURCES, (void *)rhold); return TM_SUCCESS;}/*** Posts the first <nbytes> of a copy of *<info> within MOM on** this node, and associated with this task. If <info> is** non-NULL, it returns the event that the effort to post *<info>** is complete. It returns ERROR_EVENT otherwise.*/inttm_publish(name, info, len, event) char *name; /* in */ void *info; /* in */ int len; /* in */ tm_event_t *event; /* out */{ if (!init_done) return TM_BADINIT; *event = new_event(); if (startcom(TM_POSTINFO, *event) != DIS_SUCCESS) return TM_ESYSTEM; if (diswst(local_conn, name) != DIS_SUCCESS) return TM_ESYSTEM; if (diswcs(local_conn, info, len) != DIS_SUCCESS) return TM_ESYSTEM; DIS_tcp_wflush(local_conn); add_event(*event, TM_ERROR_NODE, TM_POSTINFO, NULL); return TM_SUCCESS;}struct infohold { void *info; int len; int *info_len;};/*** Makes a request for a copy of the info posted by <tid>. 
If** <tid> is a valid task, it returns the event that the** string specifying the info posted by <tid> is available.*/inttm_subscribe(tid, name, info, len, info_len, event) tm_task_id tid; /* in */ char *name; /* in */ void *info; /* out */ int len; /* in */ int *info_len; /* out */ tm_event_t *event; /* out */{ task_info *tp; struct infohold *ihold; if (!init_done) return TM_BADINIT; if ((tp = find_task(tid)) == NULL) return TM_ENOTFOUND; *event = new_event(); if (startcom(TM_GETINFO, *event) != DIS_SUCCESS) return TM_ESYSTEM; if (diswsi(local_conn, tp->t_node) != DIS_SUCCESS) return TM_ESYSTEM; if (diswsi(local_conn, tid) != DIS_SUCCESS) return TM_ESYSTEM; if (diswst(local_conn, name) != DIS_SUCCESS) return TM_ESYSTEM; DIS_tcp_wflush(local_conn); ihold = (struct infohold *)malloc(sizeof(struct infohold)); assert(ihold != NULL); ihold->info = info; ihold->len = len; ihold->info_len = info_len; add_event(*event, tp->t_node, TM_GETINFO, (void *)ihold); return TM_SUCCESS;}/*** tm_finalize() - close out task manager interface**** This function should be the last one called. It is illegal to call** any other task manager function following this one. 
**	All outstanding events are released with del_event() and
**	init_done is cleared.
**	NOTE(review): earlier wording said any connection to the task
**	manager (pbs_mom) is closed; nothing in this function closes
**	local_conn -- confirm where that is expected to happen.
**	This call is synchronous.
**
**	Returns TM_BADINIT if the library is not initialised, otherwise
**	TM_SUCCESS.
*/
int
tm_finalize()
{
	event_info	*e;
	int		i = 0;

	if (!init_done)
		return TM_BADINIT;

	/* drain each hash slot in turn; stop early once no events remain */
	while (event_count && (i < EVENT_HASH)) {
		while ((e = event_hash[i]) != NULL) {
			del_event(e);
		}
		++i;	/* check next slot in hash table */
	}
	init_done = 0;
	return TM_SUCCESS;	/* what else */
}

/*
**	tm_notify() - set the signal to be sent on event arrival.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_notify(tm_signal)
    int		tm_signal;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
**	tm_alloc() - make a request for additional resources.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_alloc(resources, event)
    char	*resources;
    tm_event_t	*event;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
**	tm_dealloc() - drop a node from the job.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_dealloc(node, event)
    tm_node_id	node;
    tm_event_t	*event;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
**	tm_create_event() - create a persistent event.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_create_event(event)
    tm_event_t	*event;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
**	tm_destroy_event() - destroy a persistent event.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_destroy_event(event)
    tm_event_t	*event;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
**	tm_register() - link a persistent event with action requests
**	from the task manager.
**
**	Not implemented: returns TM_BADINIT before initialisation and
**	TM_ENOTIMPLEMENTED afterwards.
*/
int
tm_register(what, event)
    tm_whattodo_t	*what;
    tm_event_t		*event;
{
	if (!init_done)
		return TM_BADINIT;
	return TM_ENOTIMPLEMENTED;
}

/*
** Timeout stored in pbs_tcp_timeout when a read should block
** "indefinitely".  2592000 is 30 * 24 * 60 * 60, i.e. 30 days if the
** timeout is counted in seconds (presumably -- confirm against the
** tcp dis routines).
*/
#define	FOREVER	2592000

/*
**	tm_poll - poll to see if an event has been completed.
**
**	Only polling for "any event" is implemented: <poll_event> must be
**	TM_NULL_EVENT, anything else yields TM_ENOTIMPLEMENTED.
**
**	<result_event> is set to the completed event, to TM_NULL_EVENT
**	when no reply is available, or to TM_ERROR_EVENT when the call
**	fails before a reply header has been read.
**
**	If <wait> is non-zero the first read uses the FOREVER timeout,
**	otherwise a timeout of 0.  Once the start of a message has been
**	seen the remainder is always read with the FOREVER timeout.
**
**	*<tm_errno> is set to TM_SUCCESS for a normal reply, or to the
**	error number carried by a TM_ERROR reply.  It is not written when
**	no reply is available.
**
**	The reply payload is decoded according to the type recorded with
**	the event when the request was made, and stored through the
**	pointers the requesting call supplied.
**
**	Returns TM_BADINIT, TM_EBADENVIRONMENT (NULL <result_event> or
**	<tm_errno>), TM_ENOTIMPLEMENTED, TM_ENOTFOUND (no events
**	outstanding), TM_ENOEVENT (reply names an unknown event; the
**	connection is closed), TM_ENOTCONNECTED (no connection, or a
**	protocol/read error; the connection is closed), or TM_SUCCESS.
*/
int
tm_poll(poll_event, result_event, wait, tm_errno)
    tm_event_t	poll_event;
    tm_event_t	*result_event;
    int		wait;
    int		*tm_errno;
{
	DOID("tm_poll")

	int		num, i;
	int		ret, mtype, nnodes;
	int		prot, protver;
	int		obitval;
	int		*obitvalp;
	event_info	*ep = NULL;
	tm_task_id	tid, *tidp;
	tm_event_t	nevent;
	tm_node_id	node;
	char		*jobid;
	char		*info;
	size_t		info_len;
	size_t		copy_len;
	struct tm_roots	*roots;
	struct taskhold	*thold;
	struct infohold	*ihold;
	struct reschold	*rhold;
	extern time_t	pbs_tcp_timeout;

	if (!init_done)
		return TM_BADINIT;
	if (result_event == NULL)
		return TM_EBADENVIRONMENT;
	*result_event = TM_ERROR_EVENT;
	if (poll_event != TM_NULL_EVENT)
		return TM_ENOTIMPLEMENTED;
	if (tm_errno == NULL)
		return TM_EBADENVIRONMENT;

	if (event_count == 0) {
		DBPRT(("%s: no events waiting\n", id))
		return TM_ENOTFOUND;
	}
	if (local_conn < 0) {
		DBPRT(("%s: INTERNAL ERROR %d events but no connection\n",
			id, event_count))
		return TM_ENOTCONNECTED;
	}

	/*
	** Setup tcp dis routines with a wait value appropriate for
	** the value of wait the user set.
	*/
	pbs_tcp_timeout = wait ? FOREVER : 0;
	DIS_tcp_funcs();

	prot = disrsi(local_conn, &ret);
	if (ret == DIS_EOD) {
		/* nothing to read: report "no event" rather than an error */
		*result_event = TM_NULL_EVENT;
		return TM_SUCCESS;
	}
	else if (ret != DIS_SUCCESS) {
		DBPRT(("%s: protocol number dis error %d\n", id, ret))
		goto err;
	}
	if (prot != TM_PROTOCOL) {
		DBPRT(("%s: bad protocol number %d\n", id, prot))
		goto err;
	}

	/*
	** We have seen the start of a message.  Set the timeout value
	** so we wait for the remaining data of a message.
	*/
	pbs_tcp_timeout = FOREVER;
	protver = disrsi(local_conn, &ret);
	if (ret != DIS_SUCCESS) {
		DBPRT(("%s: protocol version dis error %d\n", id, ret))
		goto err;
	}
	if (protver != TM_PROTOCOL_VER) {
		DBPRT(("%s: bad protocol version %d\n", id, protver))
		goto err;
	}

	mtype = disrsi(local_conn, &ret);
	if (ret != DIS_SUCCESS) {
		DBPRT(("%s: mtype dis error %d\n", id, ret))
		goto err;
	}
	nevent = disrsi(local_conn, &ret);
	if (ret != DIS_SUCCESS) {
		DBPRT(("%s: event dis error %d\n", id, ret))
		goto err;
	}

	*result_event = nevent;
	DBPRT(("%s: got event %d return %d\n", id, nevent, mtype))
	if ((ep = find_event(nevent)) == NULL) {
		DBPRT(("%s: No event found for number %d\n", id, nevent));
		(void)close(local_conn);
		local_conn = -1;
		return TM_ENOEVENT;
	}

	if (mtype == TM_ERROR) {	/* problem, read error num */
		*tm_errno = disrsi(local_conn, &ret);
		/* a failed read leaves no trustworthy error number */
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: error number dis error %d\n", id, ret))
			goto err;
		}
		DBPRT(("%s: event %d error %d\n", id, nevent, *tm_errno));
		goto done;
	}

	*tm_errno = TM_SUCCESS;
	switch (ep->e_mtype) {

	/*
	**	auxiliary info (
	**		number of nodes	int;
	**		nodeid[0]	int;
	**		...
	**		nodeid[n-1]	int;
	**		parent jobid	string;
	**		parent nodeid	int;
	**		parent taskid	int;
	**	)
	*/
	case TM_INIT:
		nnodes = disrsi(local_conn, &ret);
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: INIT failed nnodes\n", id))
			goto err;
		}
		/* a negative count would corrupt the allocation size below */
		if (nnodes < 0) {
			DBPRT(("%s: INIT bad nnodes %d\n", id, nnodes))
			goto err;
		}

		/* one extra slot holds the TM_ERROR_NODE terminator */
		node_table = (tm_node_id *)calloc((size_t)nnodes + 1,
				sizeof(tm_node_id));
		if (node_table == NULL) {
			DBPRT(("%s: INIT no memory for %d nodes\n", id, nnodes))
			goto err;
		}
		DBPRT(("%s: INIT nodes %d\n", id, nnodes))
		for (i=0; i<nnodes; i++) {
			node_table[i] = disrsi(local_conn, &ret);
			if (ret != DIS_SUCCESS) {
				DBPRT(("%s: INIT failed nodeid %d\n", id, i))
				goto err;
			}
		}
		node_table[nnodes] = TM_ERROR_NODE;

		jobid = disrst(local_conn, &ret);
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: INIT failed jobid\n", id))
			goto err;
		}
		DBPRT(("%s: INIT daddy jobid %s\n", id, jobid))
		node = disrsi(local_conn, &ret);
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: INIT failed parent nodeid\n", id))
			free(jobid);	/* not handed to anyone yet */
			goto err;
		}
		DBPRT(("%s: INIT daddy node %d\n", id, node))
		tid = disrsi(local_conn, &ret);
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: INIT failed parent taskid\n", id))
			free(jobid);	/* not handed to anyone yet */
			goto err;
		}
		DBPRT(("%s: INIT daddy tid %lu\n", id, (unsigned long)tid))

		roots = (struct tm_roots *)ep->e_info;
		/*
		** NOTE(review): whether new_task() keeps or copies jobid
		** is not visible here, so it is not freed on this path.
		*/
		roots->tm_parent = new_task(jobid, node, tid);
		roots->tm_me = new_task(tm_jobid,
				tm_jobndid,
				tm_jobtid);
		roots->tm_nnodes = nnodes;
		roots->tm_ntasks = 0;		/* TODO */
		roots->tm_taskpoolid = -1;	/* what? */
		roots->tm_tasklist = NULL;	/* TODO */

		break;

	case TM_TASKS:
		thold = (struct taskhold *)ep->e_info;
		tidp = thold->list;
		num = thold->size;
		for (i=0;; i++) {
			tid = disrsi(local_conn, &ret);
			/*
			** Check the read status before looking at the
			** value, so a failed read is not mistaken for
			** the TM_NULL_TASK end-of-list marker.
			*/
			if (ret != DIS_SUCCESS)
				goto err;
			if (tid == TM_NULL_TASK)
				break;
			/* tasks beyond the caller's array are counted only */
			if (i < num) {
				tidp[i] = new_task(tm_jobid,
						ep->e_node, tid);
			}
		}
		if (i < num)
			tidp[i] = TM_NULL_TASK;
		*(thold->ntasks) = i;
		break;

	case TM_SPAWN:
		tid = disrsi(local_conn, &ret);
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: SPAWN failed tid\n", id))
			goto err;
		}
		tidp = (tm_task_id *)ep->e_info;
		*tidp = new_task(tm_jobid, ep->e_node, tid);
		break;

	case TM_SIGNAL:
		break;

	case TM_OBIT:
		obitvalp = (int *)ep->e_info;
		obitval = disrsi(local_conn, &ret);
		/* tolerate a request that was registered without a target */
		if (obitvalp != NULL)
			*obitvalp = obitval;
		if (ret != DIS_SUCCESS) {
			DBPRT(("%s: OBIT failed obitval\n", id))
			goto err;
		}
		break;

	case TM_POSTINFO:
		break;

	case TM_GETINFO:
		ihold = (struct infohold *)ep->e_info;
		/*
		** Read the length into a real size_t; the caller's
		** int must not be written through a size_t pointer.
		*/
		info_len = 0;
		info = disrcs(local_conn, &info_len, &ret);
		if (ihold->info_len != NULL)
			*ihold->info_len = (int)info_len;
		if (ret != DIS_SUCCESS) {
			/*
			** NOTE(review): unlike the other cases this one
			** does not go to err, so the call still returns
			** TM_SUCCESS -- kept as found.
			*/
			DBPRT(("%s: GETINFO failed info\n", id))
			break;
		}
		/* copy no more than the caller's buffer can take */
		copy_len = 0;
		if (ihold->len > 0) {
			copy_len = (size_t)ihold->len;
			if (info_len < copy_len)
				copy_len = info_len;
		}
		if (ihold->info != NULL && info != NULL && copy_len > 0)
			memcpy(ihold->info, info, copy_len);
		free(info);
		break;

	case TM_RESOURCES:
		rhold = (struct reschold *)ep->e_info;
		info = disrst(local_conn, &ret);
		/* NOTE(review): a failed read still returns TM_SUCCESS */
		if (ret != DIS_SUCCESS)
			break;
		/*
		** strncpy() does not terminate the destination when the
		** source fills it, so keep the last byte for the NUL.
		*/
		if (rhold->resc != NULL && rhold->len > 0) {
			strncpy(rhold->resc, info, (size_t)rhold->len - 1);
			rhold->resc[rhold->len - 1] = '\0';
		}
		free(info);
		break;

	default:
		DBPRT(("%s: unknown event command %d\n", id, ep->e_mtype))
		goto err;
	}
done:
	del_event(ep);
	return TM_SUCCESS;

err:
	/* any protocol or read failure drops the event and the connection */
	if (ep)
		del_event(ep);
	(void)close(local_conn);
	local_conn = -1;
	return TM_ENOTCONNECTED;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -