⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tm.c

📁 openPBS的开放源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/**         OpenPBS (Portable Batch System) v2.3 Software License* * Copyright (c) 1999-2000 Veridian Information Solutions, Inc.* All rights reserved.* * ---------------------------------------------------------------------------* For a license to use or redistribute the OpenPBS software under conditions* other than those described below, or to purchase support for this software,* please contact Veridian Systems, PBS Products Department ("Licensor") at:* *    www.OpenPBS.org  +1 650 967-4675                  sales@OpenPBS.org*                        877 902-4PBS (US toll-free)* ---------------------------------------------------------------------------* * This license covers use of the OpenPBS v2.3 software (the "Software") at* your site or location, and, for certain users, redistribution of the* Software to other sites and locations.  Use and redistribution of* OpenPBS v2.3 in source and binary forms, with or without modification,* are permitted provided that all of the following conditions are met.* After December 31, 2001, only conditions 3-6 must be met:* * 1. Commercial and/or non-commercial use of the Software is permitted*    provided a current software registration is on file at www.OpenPBS.org.*    If use of this software contributes to a publication, product, or*    service, proper attribution must be given; see www.OpenPBS.org/credit.html* * 2. Redistribution in any form is only permitted for non-commercial,*    non-profit purposes.  There can be no charge for the Software or any*    software incorporating the Software.  Further, there can be no*    expectation of revenue generated as a consequence of redistributing*    the Software.* * 3. Any Redistribution of source code must retain the above copyright notice*    and the acknowledgment contained in paragraph 6, this list of conditions*    and the disclaimer contained in paragraph 7.* * 4. 
Any Redistribution in binary form must reproduce the above copyright*    notice and the acknowledgment contained in paragraph 6, this list of*    conditions and the disclaimer contained in paragraph 7 in the*    documentation and/or other materials provided with the distribution.* * 5. Redistributions in any form must be accompanied by information on how to*    obtain complete source code for the OpenPBS software and any*    modifications and/or additions to the OpenPBS software.  The source code*    must either be included in the distribution or be available for no more*    than the cost of distribution plus a nominal fee, and all modifications*    and additions to the Software must be freely redistributable by any party*    (including Licensor) without restriction.* * 6. All advertising materials mentioning features or use of the Software must*    display the following acknowledgment:* *     "This product includes software developed by NASA Ames Research Center,*     Lawrence Livermore National Laboratory, and Veridian Information *     Solutions, Inc.*     Visit www.OpenPBS.org for OpenPBS software support,*     products, and information."* * 7. DISCLAIMER OF WARRANTY* * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT* ARE EXPRESSLY DISCLAIMED.* * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE* U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* 
* This license will be governed by the laws of the Commonwealth of Virginia,
* without reference to its choice of law rules.
*/
#include <pbs_config.h>   /* the master config generated by configure */

#include	<stdio.h>
#include	<stdlib.h>
#include	<unistd.h>
#include	<limits.h>
#include	<fcntl.h>
#include	<netdb.h>
#include	<string.h>
#include	<errno.h>
#include	<assert.h>
#include	<sys/types.h>
#include	<sys/socket.h>
#include	<sys/time.h>
#include	<netinet/in.h>
#include	<netdb.h>
#include	"dis.h"
#include	"dis_init.h"
#include	"tm.h"
#include	"net_connect.h"

/* Revision-control identification string embedded in the object file. */
static	char	ident[] = "@(#) $RCSfile: tm.c,v $ $Revision: 2.1.10.6 $";

/*
**	Set up a debug print macro.
**
**	DBPRT(x) takes a complete, parenthesised printf() argument list,
**	e.g. DBPRT(("value %d\n", v)).  When DEBUG is defined it prints
**	through printf() and restores errno afterwards so that tracing
**	does not disturb error handling; otherwise it expands to nothing.
**	The DEBUG expansion is a brace block, which is why call sites in
**	this file are written without a trailing semicolon.
**
**	DOID(x) declares a function-local "static char id[]" initialised
**	to x when DEBUG is defined, and expands to nothing otherwise.
*/
#ifdef  DEBUG
#define DBPRT(x) \
{ \
	int	err = errno; \
	printf x; \
	errno = err; \
}
#define	DOID(x)		static char id[] = x;
#else
#define DBPRT(x)
#define DOID(x)
#endif

/* Smaller of two values; both arguments may be evaluated more than once. */
#ifndef	MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif

/*
**	Job context taken from the environment by tm_init().
**	tm_jobid and tm_jobcookie point directly at the strings returned
**	by getenv(); no copy is made.
*/
static	char		*tm_jobid = NULL;		/* value of PBS_JOBID */
static	int		tm_jobid_len = 0;		/* strlen(tm_jobid) */
static	char		*tm_jobcookie = NULL;		/* value of PBS_JOBCOOKIE */
static	int		tm_jobcookie_len = 0;		/* strlen(tm_jobcookie) */
static	tm_task_id	tm_jobtid = TM_NULL_TASK;	/* parsed from PBS_TASKNUM */
static	tm_node_id	tm_jobndid = TM_ERROR_NODE;	/* parsed from PBS_NODENUM */
static	int		tm_momport = 0;			/* parsed from PBS_MOMPORT */
static	int		local_conn = -1;	/* socket to local pbs_mom, -1 if none */
static	int		init_done = 0;		/* set to 1 by tm_init() */

/* Defined outside this file; presumably part of the DIS library. */
void	DIS_tcp_funcs();

/*
**	Events are the central focus of this library.  They are tracked
**	in a hash table.  Many of the library calls return events.  
They**	are recorded and as information is received from MOM's, the**	event is updated and marked so tm_poll() can return it to the user.*/#define	EVENT_HASH	128typedef struct	event_info {	tm_event_t		e_event;	/* event number */	tm_node_id		e_node;		/* destination node */	int			e_mtype;	/* message type sent */	void			*e_info;	/* possible returned info */	struct	event_info	*e_next;	/* link to next event */	struct	event_info	*e_prev;	/* link to prev event */}	event_info;static	event_info	*event_hash[EVENT_HASH];static	int		event_count = 0;/***	Find an event number or return a NULL.*/static event_info *find_event(x)    tm_event_t	x;{	event_info	*ep;	for (ep=event_hash[x % EVENT_HASH]; ep; ep=ep->e_next) {		if (ep->e_event == x)			break;	}	return ep;}/***	Delete an event.*/static voiddel_event(ep)    event_info	*ep;{	/* unlink event from hash list */	if (ep->e_prev)		ep->e_prev->e_next = ep->e_next;	else		event_hash[ep->e_event % EVENT_HASH] = ep->e_next;	if (ep->e_next)		ep->e_next->e_prev = ep->e_prev;	/*	**	Free any memory saved with the event.  This depends	**	on whay type of event it is.	
*/	switch (ep->e_mtype) {	case TM_INIT:	case TM_SPAWN:	case TM_SIGNAL:	case TM_OBIT:	case TM_POSTINFO:		break;	case TM_TASKS:	case TM_GETINFO:	case TM_RESOURCES:		free(ep->e_info);		break;	default:		DBPRT(("del_event: unknown event command %d\n", ep->e_mtype))		break;	}	free(ep);	if (--event_count == 0) {		close(local_conn);		local_conn = -1;	}	return;}/***	Create a new event number.*/static tm_event_tnew_event(){	static	tm_event_t	next_event = TM_NULL_EVENT+1;	event_info		*ep;	tm_event_t		ret;	if (next_event == INT_MAX)		next_event = TM_NULL_EVENT+1;	for (;;) {		ret = next_event++;		for (ep=event_hash[ret % EVENT_HASH]; ep; ep=ep->e_next) {			if (ep->e_event == ret)				break;	/* innter loop: this number is in use */		}		if (ep == NULL)			break;		/* this number is not in use */	}	return ret;}/***	Link new event number into the above hash table.*/static voidadd_event(event, node, type, info)    tm_event_t		event;    tm_node_id		node;    int			type;    void		*info;{	event_info		*ep, **head;	ep = (event_info *)malloc(sizeof(event_info));	assert(ep != NULL);	head = &event_hash[event % EVENT_HASH];	ep->e_event = event;	ep->e_node = node;	ep->e_mtype = type;	ep->e_info = info;	ep->e_next = *head;	ep->e_prev = NULL;	if (*head)		(*head)->e_prev = ep;	*head = ep;	event_count++;	return;}/***	Sessions must be tracked by the library so tm_taskid objects**	can be resolved into real tasks on real nodes.**	We will use a hash table.*/#define	TASK_HASH	256typedef	struct	task_info {	char			*t_jobid;	/* jobid */	tm_task_id		 t_task;	/* task id */	tm_node_id		 t_node;	/* node id */	struct	task_info	*t_next;	/* link to next task */}	task_info;static	task_info	*task_hash[TASK_HASH];/***	Find a task table entry for a given task number or return a NULL.*/static task_info *find_task(x)    tm_task_id	x;{	task_info	*tp;	for (tp=task_hash[x % TASK_HASH]; tp; tp=tp->t_next) {		if (tp->t_task == x)			break;	}	return tp;}/***	Create a new task entry and link it into the above hash**	
table.*/static tm_task_idnew_task(jobid, node, task)    char	*jobid;    tm_node_id	node;    tm_task_id	task;{	DOID("new_task")	task_info		*tp, **head;	DBPRT(("%s: jobid=%s node=%d task=%lu\n",		id, jobid, node, (unsigned long)task))	if (jobid != tm_jobid && strcmp(jobid, tm_jobid) != 0) {		DBPRT(("%s: task job %s not my job %s\n",				id, jobid, tm_jobid))		return TM_NULL_TASK;	}	if (node == TM_ERROR_NODE) {		DBPRT(("%s: called with TM_ERROR_NODE\n", id))		return TM_NULL_TASK;	}	if ((tp = find_task(task)) != NULL) {		DBPRT(("%s: task %lu found with node %d should be %d\n",			id, (unsigned long)task, tp->t_node, node))		return task;	}	if ((tp = (task_info *)malloc(sizeof(task_info))) == NULL)		return TM_NULL_TASK;	head = &task_hash[task % TASK_HASH];	tp->t_jobid = tm_jobid;	tp->t_task  = task;	tp->t_node  = node;	tp->t_next = *head;	*head = tp;	return task;}/***	Delete a task.====== right now, this is not used.===static voiddel_task(x)    tm_task_id	x;{	task_info	*tp, *prev;	prev = NULL;	for (tp=task_hash[x % TASK_HASH]; tp; prev=tp, tp=tp->t_next) {		if (tp->t_task == x)			break;	}	if (tp) {		if (prev)			prev->t_next = tp->t_next;		else			task_hash[x % TASK_HASH] = tp->t_next;		tp->t_next = NULL;		if (tp->t_jobid != tm_jobid)			free(tp->t_jobid);		free(tp);	}	return;}*//***	The nodes are tracked in an array.*/static	tm_node_id	*node_table = NULL;/***	localmom() - make a connection to the local pbs_mom****	The connection will remain open as long as there is an**	outstanding event.*/#define PBS_NET_RC_FATAL -1#define PBS_NET_RC_RETRY -2static intlocalmom(){	static  int	       have_addr = 0;	static  struct in_addr hostaddr;	struct	hostent	*hp;	int		 i;	struct sockaddr_in remote;	int              sock;	if (local_conn >= 0)		return local_conn;	/* already have open connection */	if (have_addr == 0) {		/* lookup "localhost" and save address */		if ((hp = gethostbyname("localhost")) == NULL) {			DBPRT(("tm_init: localhost not found\n"))                	return -1;        	}		
assert(hp->h_length <= sizeof(hostaddr));		memcpy(&hostaddr, hp->h_addr_list[0], hp->h_length);		have_addr = 1;	}	for (i=0; i<5; i++) {		/* get socket */		sock = socket(AF_INET, SOCK_STREAM, 0);		if (sock < 0)			return -1;					/* connect to specified local pbs_mom and port */		remote.sin_addr = hostaddr;		remote.sin_port = htons((unsigned short)tm_momport);		remote.sin_family = AF_INET;		if (connect(sock,(struct sockaddr *)&remote,sizeof(remote))<0) {			switch (errno) {			    case EINTR:			    case EADDRINUSE:			    case ETIMEDOUT:			    case ECONNREFUSED:				(void)close(sock);				sleep(1);				continue;			    default:				(void)close(sock);				return -1;			}		} else {			local_conn = sock;			break;		}	}	DIS_tcp_setup(local_conn);	return (local_conn);}/***	startcom() - send request header to local pbs_mom.**	If required, make connection to her.*/static intstartcom(com, event)     int	com;     tm_event_t	event;{	int     ret;	if (localmom() == -1)		return -1;	ret = diswsi(local_conn, TM_PROTOCOL);	if (ret != DIS_SUCCESS)		goto done;	ret = diswsi(local_conn, TM_PROTOCOL_VER);	if (ret != DIS_SUCCESS)		goto done;	ret = diswcs(local_conn, tm_jobid, tm_jobid_len);	if (ret != DIS_SUCCESS)		goto done;	ret = diswcs(local_conn, tm_jobcookie, tm_jobcookie_len);	if (ret != DIS_SUCCESS)		goto done;	ret = diswsi(local_conn, com);	if (ret != DIS_SUCCESS)		goto done;	ret = diswsi(local_conn, event);	if (ret != DIS_SUCCESS)		goto done;	ret = diswui(local_conn, tm_jobtid);	if (ret != DIS_SUCCESS)		goto done;	return DIS_SUCCESS; done:	DBPRT(("startcom: send error %s\n", dis_emsg[ret]))	close(local_conn);	local_conn = -1;	return ret;}/***	Initialize the Task Manager interface.*/inttm_init(info, roots)    void		*info;		/* in, currently unused */    struct tm_roots	*roots;		/* out */{	tm_event_t		nevent, revent;	char			*env, *hold;	int			err;	int			nerr = 0;	if (init_done)		return TM_BADINIT;	if ((tm_jobid = getenv("PBS_JOBID")) == NULL)		return TM_EBADENVIRONMENT;	tm_jobid_len = 
strlen(tm_jobid);	if ((tm_jobcookie = getenv("PBS_JOBCOOKIE")) == NULL)		return TM_EBADENVIRONMENT;	tm_jobcookie_len = strlen(tm_jobcookie);	if ((env = getenv("PBS_NODENUM")) == NULL)		return TM_EBADENVIRONMENT;	tm_jobndid = (tm_node_id)strtol(env, &hold, 10);	if (env == hold)		return TM_EBADENVIRONMENT;	if ((env = getenv("PBS_TASKNUM")) == NULL)		return TM_EBADENVIRONMENT;	if ((tm_jobtid = atoi(env)) == 0)		return TM_EBADENVIRONMENT;	if ((env = getenv("PBS_MOMPORT")) == NULL)		return TM_EBADENVIRONMENT;	if ((tm_momport = atoi(env)) == 0)		return TM_EBADENVIRONMENT;	init_done = 1;	nevent = new_event();	/*	 * send the following request:	 *	header		(tm_init)	 *	int		node number	 *	int		task number	 */	if (startcom(TM_INIT, nevent) != DIS_SUCCESS)		return TM_ESYSTEM;	DIS_tcp_wflush(local_conn);	add_event(nevent, TM_ERROR_NODE, TM_INIT, (void *)roots);	if ((err = tm_poll(TM_NULL_EVENT, &revent, 1, &nerr)) != TM_SUCCESS)		return err;	return nerr;}/***	Copy out node info.  No communication with pbs_mom is needed.*/inttm_nodeinfo(list, nnodes)    tm_node_id		**list;    int			*nnodes;{	tm_node_id	*np;	int		i;	int		n = 0;	if (!init_done)		return TM_BADINIT;	if (node_table == NULL)		return TM_ESYSTEM;	for (np=node_table; *np != TM_ERROR_NODE; np++)		n++;		/* how many nodes */	np = (tm_node_id *)calloc(n, sizeof(tm_node_id));	for (i=0; i<n; i++)		np[i] = node_table[i];	*list = np;	*nnodes = i;	return TM_SUCCESS;}/***	Starts <argv>[0] with environment <envp> at <where>.*/inttm_spawn(argc, argv, envp, where, tid, event)    int		 argc;		/* in  */    char       **argv;		/* in  */    char       **envp;		/* in  */    tm_node_id	 where;		/* in  */    tm_task_id	*tid;		/* out */    tm_event_t	*event;		/* out */{	char		*cp;	int		i;	if (!init_done)		return TM_BADINIT;	if (argc <= 0 || argv == NULL || argv[0] == NULL || *argv[0] == '\0')		return TM_ENOTFOUND;	*event = new_event();	if (startcom(TM_SPAWN, *event) != DIS_SUCCESS)		return TM_ENOTCONNECTED;	if (diswsi(local_conn, where) != 
DIS_SUCCESS)	/* send where */		return TM_ENOTCONNECTED;	if (diswsi(local_conn, argc) != DIS_SUCCESS)	/* send argc */		return TM_ENOTCONNECTED;	/* send argv strings across */	for (i=0; i < argc; i++) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -