📄 tm.c
字号:
/*
*         OpenPBS (Portable Batch System) v2.3 Software License
* 
* Copyright (c) 1999-2000 Veridian Information Solutions, Inc.
* All rights reserved.
* 
* ---------------------------------------------------------------------------
* For a license to use or redistribute the OpenPBS software under conditions
* other than those described below, or to purchase support for this software,
* please contact Veridian Systems, PBS Products Department ("Licensor") at:
* 
*    www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org
*    877 902-4PBS (US toll-free)
* ---------------------------------------------------------------------------
* 
* This license covers use of the OpenPBS v2.3 software (the "Software") at
* your site or location, and, for certain users, redistribution of the
* Software to other sites and locations. Use and redistribution of
* OpenPBS v2.3 in source and binary forms, with or without modification,
* are permitted provided that all of the following conditions are met.
* After December 31, 2001, only conditions 3-6 must be met:
* 
* 1. Commercial and/or non-commercial use of the Software is permitted
*    provided a current software registration is on file at www.OpenPBS.org.
*    If use of this software contributes to a publication, product, or
*    service, proper attribution must be given; see www.OpenPBS.org/credit.html
* 
* 2. Redistribution in any form is only permitted for non-commercial,
*    non-profit purposes. There can be no charge for the Software or any
*    software incorporating the Software. Further, there can be no
*    expectation of revenue generated as a consequence of redistributing
*    the Software.
* 
* 3. Any Redistribution of source code must retain the above copyright notice
*    and the acknowledgment contained in paragraph 6, this list of conditions
*    and the disclaimer contained in paragraph 7.
* 
* 4. Any Redistribution in binary form must reproduce the above copyright
*    notice and the acknowledgment contained in paragraph 6, this list of
*    conditions and the disclaimer contained in paragraph 7 in the
*    documentation and/or other materials provided with the distribution.
* 
* 5. Redistributions in any form must be accompanied by information on how to
*    obtain complete source code for the OpenPBS software and any
*    modifications and/or additions to the OpenPBS software. The source code
*    must either be included in the distribution or be available for no more
*    than the cost of distribution plus a nominal fee, and all modifications
*    and additions to the Software must be freely redistributable by any party
*    (including Licensor) without restriction.
* 
* 6. All advertising materials mentioning features or use of the Software must
*    display the following acknowledgment:
* 
*     "This product includes software developed by NASA Ames Research Center,
*     Lawrence Livermore National Laboratory, and Veridian Information 
*     Solutions, Inc.
*     Visit www.OpenPBS.org for OpenPBS software support,
*     products, and information."
* 
* 7. DISCLAIMER OF WARRANTY
* 
* THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT
* ARE EXPRESSLY DISCLAIMED.
* 
* IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE
* U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* 
* This license will be governed by the laws of the Commonwealth of Virginia,
* without reference to its choice of law rules.
*/

#include <pbs_config.h>   /* the master config generated by configure */

#include	<stdio.h>
#include	<stdlib.h>
#include	<unistd.h>
#include	<limits.h>
#include	<fcntl.h>
#include	<netdb.h>
#include	<string.h>
#include	<errno.h>
#include	<assert.h>
#include	<sys/types.h>
#include	<sys/socket.h>
#include	<sys/time.h>
#include	<netinet/in.h>
#include	<netdb.h>
#include	"dis.h"
#include	"dis_init.h"
#include	"tm.h"
#include	"net_connect.h"

static	char	ident[] = "@(#) $RCSfile: tm.c,v $ $Revision: 2.1.10.6 $";

/*
**	Set up a debug print macro.
**
**	DBPRT saves and restores errno around the printf so that debug
**	output does not disturb the error state seen by the caller.
**	Call sites in this file do not put a semicolon after DBPRT(()),
**	so the macro must stay a plain brace block (not do/while(0)).
*/
#ifdef  DEBUG
#define	DBPRT(x) \
{ \
	int	err = errno; \
	printf x; \
	errno = err; \
}
#define	DOID(x)		static char id[] = x;
#else
#define	DBPRT(x)
#define	DOID(x)
#endif

#ifndef	MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif

/*
**	Allocate some string space to hold the values passed in the
**	environment from MOM.
*/
static	char		*tm_jobid = NULL;
static	int		tm_jobid_len = 0;
static	char		*tm_jobcookie = NULL;
static	int		tm_jobcookie_len = 0;
static	tm_task_id	tm_jobtid = TM_NULL_TASK;
static	tm_node_id	tm_jobndid = TM_ERROR_NODE;
static	int		tm_momport = 0;
static	int		local_conn = -1;	/* socket to local MOM, -1 if closed */
static	int		init_done = 0;		/* set once tm_init() has read the environment */

void	DIS_tcp_funcs();

/*
**	Events are the central focus of this library.  They are tracked
**	in a hash table.  Many of the library calls return events.  They
**	are recorded and as information is received from MOM's, the
**	event is updated and marked so tm_poll() can return it to the user.
*/
#define	EVENT_HASH	128

typedef	struct	event_info {
	tm_event_t		e_event;	/* event number */
	tm_node_id		e_node;		/* destination node */
	int			e_mtype;	/* message type sent */
	void			*e_info;	/* possible returned info */
	struct	event_info	*e_next;	/* link to next event */
	struct	event_info	*e_prev;	/* link to prev event */
} event_info;

static	event_info	*event_hash[EVENT_HASH];
static	int		event_count = 0;	/* number of events currently in event_hash */

/*
**	Find an event number or return a NULL.
**
**	x	event number to look up in event_hash
**
**	Returns the matching event_info, or NULL when the bucket chain
**	holds no entry with that number.
*/
static event_info *
find_event(x)
	tm_event_t	x;
{
	event_info	*ep;

	for (ep = event_hash[x % EVENT_HASH]; ep; ep = ep->e_next) {
		if (ep->e_event == x)
			break;
	}
	return ep;
}

/*
**	Delete an event.
**
**	ep	event to unlink from event_hash and free
**
**	When the last outstanding event is removed the connection to
**	the local MOM is closed and local_conn is reset to -1.
*/
static void
del_event(ep)
	event_info	*ep;
{
	/* unlink event from hash list */
	if (ep->e_prev)
		ep->e_prev->e_next = ep->e_next;
	else
		event_hash[ep->e_event % EVENT_HASH] = ep->e_next;
	if (ep->e_next)
		ep->e_next->e_prev = ep->e_prev;

	/*
	**	Free any memory saved with the event.  This depends
	**	on what type of event it is.
	*/
	switch (ep->e_mtype) {

	case TM_INIT:
	case TM_SPAWN:
	case TM_SIGNAL:
	case TM_OBIT:
	case TM_POSTINFO:
		break;

	case TM_TASKS:
	case TM_GETINFO:
	case TM_RESOURCES:
		free(ep->e_info);
		break;

	default:
		DBPRT(("del_event: unknown event command %d\n", ep->e_mtype))
		break;
	}
	free(ep);

	if (--event_count == 0) {
		/* startcom() may already have closed the socket on a send error */
		if (local_conn >= 0)
			(void)close(local_conn);
		local_conn = -1;
	}
	return;
}

/*
**	Create a new event number.
**
**	Returns a number that is not currently present in event_hash.
**	The counter wraps back to TM_NULL_EVENT+1 before it can reach
**	INT_MAX; the wrap test is made on every pass so the increment
**	can never overflow, even when candidate numbers are in use.
*/
static tm_event_t
new_event()
{
	static	tm_event_t	next_event = TM_NULL_EVENT+1;
	tm_event_t		ret;

	for (;;) {
		if (next_event == INT_MAX)
			next_event = TM_NULL_EVENT+1;
		ret = next_event++;

		if (find_event(ret) == NULL)
			break;		/* this number is not in use */
	}
	return ret;
}

/*
**	Link new event number into the above hash table.
**
**	event	event number (normally obtained from new_event())
**	node	destination node recorded with the event
**	type	message type recorded with the event; del_event() uses
**		it to decide whether info is freed
**	info	pointer saved in e_info
**
**	Allocation failure is only caught by assert(); the function has
**	no way to report it to the caller.
*/
static void
add_event(event, node, type, info)
	tm_event_t	event;
	tm_node_id	node;
	int		type;
	void		*info;
{
	event_info	*ep, **head;

	ep = (event_info *)malloc(sizeof(event_info));
	assert(ep != NULL);

	head = &event_hash[event % EVENT_HASH];
	ep->e_event = event;
	ep->e_node = node;
	ep->e_mtype = type;
	ep->e_info = info;
	ep->e_next = *head;
	ep->e_prev = NULL;
	if (*head)
		(*head)->e_prev = ep;
	*head = ep;

	event_count++;
	return;
}

/*
**	Sessions must be tracked by the library so tm_taskid objects
**	can be resolved into real tasks on real nodes.
**	We will use a hash table.
*/
#define	TASK_HASH	256

typedef	struct	task_info {
	char			*t_jobid;	/* jobid */
	tm_task_id		t_task;		/* task id */
	tm_node_id		t_node;		/* node id */
	struct	task_info	*t_next;	/* link to next task */
} task_info;

static	task_info	*task_hash[TASK_HASH];

/*
**	Find a task table entry for a given task number or return a NULL.
*/
static task_info *
find_task(x)
	tm_task_id	x;
{
	task_info	*tp;

	for (tp = task_hash[x % TASK_HASH]; tp; tp = tp->t_next) {
		if (tp->t_task == x)
			break;
	}
	return tp;
}

/*
**	Create a new task entry and link it into the above hash
**	table.
**
**	jobid	job the task belongs to; must be tm_jobid itself or
**		compare equal to it
**	node	node the task runs on; TM_ERROR_NODE is rejected
**	task	task id to record
**
**	Returns task on success (also when the task is already in the
**	table, in which case the existing entry is left unchanged), or
**	TM_NULL_TASK on a foreign job, a bad node or allocation failure.
*/
static tm_task_id
new_task(jobid, node, task)
	char		*jobid;
	tm_node_id	node;
	tm_task_id	task;
{
	DOID("new_task")
	task_info	*tp, **head;

	DBPRT(("%s: jobid=%s node=%d task=%lu\n",
		id, jobid, node, (unsigned long)task))
	if (jobid != tm_jobid && strcmp(jobid, tm_jobid) != 0) {
		DBPRT(("%s: task job %s not my job %s\n",
			id, jobid, tm_jobid))
		return TM_NULL_TASK;
	}
	if (node == TM_ERROR_NODE) {
		DBPRT(("%s: called with TM_ERROR_NODE\n", id))
		return TM_NULL_TASK;
	}

	if ((tp = find_task(task)) != NULL) {
		DBPRT(("%s: task %lu found with node %d should be %d\n",
			id, (unsigned long)task, tp->t_node, node))
		return task;
	}

	if ((tp = (task_info *)malloc(sizeof(task_info))) == NULL)
		return TM_NULL_TASK;

	head = &task_hash[task % TASK_HASH];
	tp->t_jobid = tm_jobid;
	tp->t_task = task;
	tp->t_node = node;
	tp->t_next = *head;
	*head = tp;

	return task;
}

/*
**	Delete a task.
===
=== right now, this is not used.
===
static void
del_task(x)
	tm_task_id	x;
{
	task_info	*tp, *prev;

	prev = NULL;
	for (tp=task_hash[x % TASK_HASH]; tp; prev=tp, tp=tp->t_next) {
		if (tp->t_task == x)
			break;
	}
	if (tp) {
		if (prev)
			prev->t_next = tp->t_next;
		else
			task_hash[x % TASK_HASH] = tp->t_next;
		tp->t_next = NULL;
		if (tp->t_jobid != tm_jobid)
			free(tp->t_jobid);
		free(tp);
	}
	return;
}
*/

/*
**	The nodes are tracked in an array.
**	tm_nodeinfo() treats a TM_ERROR_NODE entry as the end of the array.
*/
static	tm_node_id	*node_table = NULL;

/*
**	localmom() - make a connection to the local pbs_mom
**
**	The connection will remain open as long as there is an
**	outstanding event.
**
**	Returns the connected socket (also stored in local_conn), or -1
**	when "localhost" cannot be resolved, a socket cannot be created,
**	connect() fails with a non-retryable error, or all five connect
**	attempts fail with a retryable one.
*/
#define PBS_NET_RC_FATAL -1
#define PBS_NET_RC_RETRY -2

static int
localmom()
{
	static	int		have_addr = 0;
	static	struct	in_addr	hostaddr;
	struct	hostent		*hp;
	int			i;
	struct	sockaddr_in	remote;
	int			sock;

	if (local_conn >= 0)
		return local_conn;	/* already have open connection */

	if (have_addr == 0) {
		/* lookup "localhost" and save address */
		if ((hp = gethostbyname("localhost")) == NULL) {
			DBPRT(("tm_init: localhost not found\n"))
			return -1;
		}
		assert(hp->h_length <= sizeof(hostaddr));
		memcpy(&hostaddr, hp->h_addr_list[0], hp->h_length);
		have_addr = 1;
	}

	for (i=0; i<5; i++) {

		/* get socket */

		sock = socket(AF_INET, SOCK_STREAM, 0);
		if (sock < 0)
			return -1;

		/* connect to specified local pbs_mom and port */

		memset(&remote, 0, sizeof(remote));	/* clear padding too */
		remote.sin_addr = hostaddr;
		remote.sin_port = htons((unsigned short)tm_momport);
		remote.sin_family = AF_INET;
		if (connect(sock,(struct sockaddr *)&remote,sizeof(remote))<0) {
			switch (errno) {
			case EINTR:
			case EADDRINUSE:
			case ETIMEDOUT:
			case ECONNREFUSED:
				/* transient failure: wait a second and retry */
				(void)close(sock);
				sleep(1);
				continue;
			default:
				(void)close(sock);
				return -1;
			}
		}
		else {
			local_conn = sock;
			break;
		}
	}

	/*
	**	Every attempt failed with a retryable error: there is no
	**	socket to hand to DIS_tcp_setup(), so report the failure.
	*/
	if (local_conn < 0)
		return -1;

	DIS_tcp_setup(local_conn);

	return (local_conn);
}

/*
**	startcom() - send request header to local pbs_mom.
**	If required, make connection to her.
**
**	com	command code written into the header
**	event	event number written into the header
**
**	The header consists of TM_PROTOCOL, TM_PROTOCOL_VER, the job id,
**	the job cookie, com, event and this task's id, in that order.
**
**	Returns DIS_SUCCESS when the whole header was written, -1 when
**	no connection could be made, or the failing DIS return code; in
**	the last case the connection is closed and local_conn reset.
*/
static int
startcom(com, event)
	int		com;
	tm_event_t	event;
{
	int	ret;

	if (localmom() == -1)
		return -1;

	ret = diswsi(local_conn, TM_PROTOCOL);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswsi(local_conn, TM_PROTOCOL_VER);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswcs(local_conn, tm_jobid, tm_jobid_len);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswcs(local_conn, tm_jobcookie, tm_jobcookie_len);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswsi(local_conn, com);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswsi(local_conn, event);
	if (ret != DIS_SUCCESS)
		goto done;
	ret = diswui(local_conn, tm_jobtid);
	if (ret != DIS_SUCCESS)
		goto done;
	return DIS_SUCCESS;

 done:
	DBPRT(("startcom: send error %s\n", dis_emsg[ret]))
	close(local_conn);
	local_conn = -1;
	return ret;
}

/*
**	Initialize the Task Manager interface.
**
**	Reads PBS_JOBID, PBS_JOBCOOKIE, PBS_NODENUM, PBS_TASKNUM and
**	PBS_MOMPORT from the environment, sends a TM_INIT request to the
**	local pbs_mom and calls tm_poll() for the reply.  roots is saved
**	with the TM_INIT event as its info pointer.
**
**	Returns TM_BADINIT when called a second time, TM_EBADENVIRONMENT
**	when a variable is missing or unusable, TM_ESYSTEM when the
**	request cannot be sent, the tm_poll() error if it fails, and
**	otherwise the error value tm_poll() stored in nerr.
*/
int
tm_init(info, roots)
	void		*info;		/* in, currently unused */
	struct	tm_roots	*roots;	/* out */
{
	tm_event_t		nevent, revent;
	char			*env, *hold;
	int			err;
	int			nerr = 0;

	if (init_done)
		return TM_BADINIT;

	if ((tm_jobid = getenv("PBS_JOBID")) == NULL)
		return TM_EBADENVIRONMENT;
	tm_jobid_len = strlen(tm_jobid);

	if ((tm_jobcookie = getenv("PBS_JOBCOOKIE")) == NULL)
		return TM_EBADENVIRONMENT;
	tm_jobcookie_len = strlen(tm_jobcookie);

	if ((env = getenv("PBS_NODENUM")) == NULL)
		return TM_EBADENVIRONMENT;
	tm_jobndid = (tm_node_id)strtol(env, &hold, 10);
	if (env == hold)
		return TM_EBADENVIRONMENT;	/* no digits were parsed */

	if ((env = getenv("PBS_TASKNUM")) == NULL)
		return TM_EBADENVIRONMENT;
	if ((tm_jobtid = atoi(env)) == 0)
		return TM_EBADENVIRONMENT;

	if ((env = getenv("PBS_MOMPORT")) == NULL)
		return TM_EBADENVIRONMENT;
	if ((tm_momport = atoi(env)) == 0)
		return TM_EBADENVIRONMENT;

	init_done = 1;
	nevent = new_event();

	/*
	 * send the following request:
	 *	header		(tm_init)
	 *	int		node number
	 *	int		task number
	 */
	if (startcom(TM_INIT, nevent) != DIS_SUCCESS)
		return TM_ESYSTEM;
	DIS_tcp_wflush(local_conn);
	add_event(nevent, TM_ERROR_NODE, TM_INIT, (void *)roots);

	if ((err = tm_poll(TM_NULL_EVENT, &revent, 1, &nerr)) != TM_SUCCESS)
		return err;
	return nerr;
}

/*
**	Copy out node info.  No communication with pbs_mom is needed.
**
**	list	out: receives a newly allocated copy of node_table
**		(without the TM_ERROR_NODE terminator); the caller
**		owns the memory
**	nnodes	out: receives the number of entries in *list
**
**	Returns TM_BADINIT before tm_init(), TM_ESYSTEM when there is no
**	node table or the copy cannot be allocated, else TM_SUCCESS.
*/
int
tm_nodeinfo(list, nnodes)
	tm_node_id	**list;
	int		*nnodes;
{
	tm_node_id	*np;
	int		i;
	int		n = 0;

	if (!init_done)
		return TM_BADINIT;
	if (node_table == NULL)
		return TM_ESYSTEM;

	for (np = node_table; *np != TM_ERROR_NODE; np++)
		n++;		/* how many nodes */

	np = (tm_node_id *)calloc(n, sizeof(tm_node_id));
	/* calloc(0, ...) may legitimately return NULL; only n > 0 is a failure */
	if (np == NULL && n > 0)
		return TM_ESYSTEM;

	for (i = 0; i < n; i++)
		np[i] = node_table[i];
	*list = np;
	*nnodes = n;
	return TM_SUCCESS;
}

/*
**	Starts <argv>[0] with environment <envp> at <where>.
*/
int
tm_spawn(argc, argv, envp, where, tid, event)
	int		argc;		/* in  */
	char		**argv;		/* in  */
	char		**envp;		/* in  */
	tm_node_id	where;		/* in  */
	tm_task_id	*tid;		/* out */
	tm_event_t	*event;		/* out */
{
	char		*cp;
	int		i;

	if (!init_done)
		return TM_BADINIT;
	if (argc <= 0 || argv == NULL || argv[0] == NULL || *argv[0] == '\0')
		return TM_ENOTFOUND;

	*event = new_event();
	if (startcom(TM_SPAWN, *event) != DIS_SUCCESS)
		return TM_ENOTCONNECTED;

	if (diswsi(local_conn, where) != DIS_SUCCESS)	/* send where */
		return TM_ENOTCONNECTED;

	if (diswsi(local_conn, argc) != DIS_SUCCESS)	/* send argc */
		return TM_ENOTCONNECTED;

	/* send argv strings across */

	for (i=0; i < argc; i++) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -