📄 p4_utils.c
字号:
*//* IBM AIX 4.3.3 defined ALIGNMENT in sys/socket.h (!) */#define LOG_ALIGN 6#define P4_MEM_ALIGNMENT (1 << LOG_ALIGN)/* P4_MEM_ALIGNMENT is assumed below to be bigger than sizeof(p4_lock_t) + sizeof(Header *), so do not reduce LOG_ALIGN below 4 */union header{ struct { union header *ptr; /* next block if on free list */ unsigned size; /* size of this block */ } s; char align[P4_MEM_ALIGNMENT]; /* Align to P4_MEM_ALIGNMENT byte boundary */};typedef union header Header;static Header **freep; /* pointer to pointer to start of free list */static p4_lock_t *shmem_lock; /* Pointer to lock */P4VOID xx_init_shmalloc( char *memory, unsigned nbytes )/* memory points to a region of shared memory nbytes long. initialize the data structures needed to manage this memory*/{ int nunits = nbytes >> LOG_ALIGN; Header *region = (Header *) memory; /* Quick check that things are OK */ if (P4_MEM_ALIGNMENT != sizeof(Header) || P4_MEM_ALIGNMENT < (sizeof(Header *) + sizeof(p4_lock_t))) { p4_dprintfl(40,"%d %d\n",sizeof(Header),sizeof(p4_lock_t)); p4_error("xx_init_shmem: alignment is wrong", P4_MEM_ALIGNMENT); } if (!region) p4_error("xx_init_shmem: Passed null pointer", 0); if (nunits < 2) p4_error("xx_init_shmem: Initial region is ridiculously small", (int) nbytes); /* * Shared memory region is structured as follows * * 1) (Header *) freep ... free list pointer 2) (p4_lock_t) shmem_lock ... * space to hold lock 3) padding up to alignment boundary 4) First header * of free list */ freep = (Header **) region; /* Free space pointer in first block */ shmem_lock = (p4_lock_t *) (freep + 1); /* Lock still in first block */ (region + 1)->s.ptr = *freep = region + 1; /* Data in rest */ (region + 1)->s.size = nunits - 1; /* One header consumed already */# ifdef SYSV_IPC shmem_lock->semid = sysv_semid0; shmem_lock->semnum = 0;# else p4_lock_init(shmem_lock); /* Initialize the lock */# endif}char *xx_shmalloc( unsigned nbytes ){ Header *p, *prevp; char *address = (char *) NULL; unsigned nunits; /* Force entire routine to be single threaded */ (P4VOID) p4_lock(shmem_lock); nunits = ((nbytes + sizeof(Header) - 1) >> LOG_ALIGN) + 1; prevp = *freep; for (p = prevp->s.ptr;; prevp = p, p = p->s.ptr) { if (p->s.size >= nunits) { /* Big enuf */ if (p->s.size == nunits) /* exact fit */ prevp->s.ptr = p->s.ptr; else { /* allocate tail end */ p->s.size -= nunits; p += p->s.size; p->s.size = nunits; } *freep = prevp; address = (char *) (p + 1); break; } if (p == *freep) { /* wrapped around the free list ... no fit * found */ address = (char *) NULL; break; } } /* End critical region */ (P4VOID) p4_unlock(shmem_lock); if (address == NULL) p4_dprintf("xx_shmalloc: returning NULL; requested %d bytes\n",nbytes); return address;}P4VOID xx_shfree( char *ap ){ Header *bp, *p; /* Begin critical region */ (P4VOID) p4_lock(shmem_lock); if (!ap) return; /* Do nothing with NULL pointers */ bp = (Header *) ap - 1; /* Point to block header */ for (p = *freep; !(bp > p && bp < p->s.ptr); p = p->s.ptr) if (p >= p->s.ptr && (bp > p || bp < p->s.ptr)) break; /* Freed block at start of end of arena */ if (bp + bp->s.size == p->s.ptr) { /* join to upper neighbour */ bp->s.size += p->s.ptr->s.size; bp->s.ptr = p->s.ptr->s.ptr; } else bp->s.ptr = p->s.ptr; if (p + p->s.size == bp) { /* Join to lower neighbour */ p->s.size += bp->s.size; p->s.ptr = bp->s.ptr; } else p->s.ptr = bp; *freep = p; /* End critical region */ (P4VOID) p4_unlock(shmem_lock);}#endifP4VOID get_pipe(int *end_1, int *end_2){ int p[2];# if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EUIH) p4_dprintf("WARNING: get_pipe: socketpair assumed unavailable on this machine\n"); return;# else if (socketpair(AF_UNIX, SOCK_STREAM, 0, p) < 0) p4_error("get_pipe: socketpair failed ", -1); *end_1 = p[0]; *end_2 = p[1];# endif}P4VOID setup_conntab(){ int i, my_id; p4_dprintfl(60, "setup_conntab: myid=%d, switch_port=%d, app_id=%s\n", p4_local->my_id, p4_global->proctable[p4_local->my_id].switch_port, p4_global->application_id); p4_local->conntab = (struct connection *) p4_malloc(p4_global->num_in_proctable * sizeof(struct connection)); my_id = p4_get_my_id(); for (i = 0; i < p4_global->num_in_proctable; i++) { if (i == my_id) { p4_local->conntab[i].type = CONN_ME;# if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EIUH) p4_local->conntab[i].port = MYNODE();# endif# if defined(TCMP) p4_local->conntab[i].port = i;# endif } else if (in_same_cluster(i, my_id)) { p4_local->conntab[i].type = CONN_LOCAL;# if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EUIH) p4_local->conntab[i].port = MYNODE() + i - p4_local->my_id;# endif# if defined(TCMP) p4_local->conntab[i].port = i - p4_global->low_cluster_id;# endif } else if ((p4_global->proctable[my_id].switch_port != -1) && (p4_global->proctable[i].switch_port != -1) && (p4_global->proctable[my_id].switch_port != p4_global->proctable[i].switch_port)) { p4_local->conntab[i].type = CONN_REMOTE_SWITCH; p4_local->conntab[i].switch_port = p4_global->proctable[i].switch_port; } else { p4_local->conntab[i].type = CONN_REMOTE_NON_EST; p4_local->conntab[i].port = p4_global->proctable[i].port; } } p4_dprintfl(60, "conntab after setup_conntab:\n"); dump_conntab(60);}#ifdef SYSV_IPCP4VOID remove_sysv_ipc( void ){ int i; struct p4_global_data *g = p4_global;# if defined(SUN_SOLARIS) union semun{ int val; struct semid_ds *buf; ushort *array; } arg;# else# if defined(IBM3090) || defined(RS6000) || \ defined(TITAN) || defined(DEC5000) || \ defined(HP) || defined(KSR) int arg;# else# if defined(SEMUN_UNDEFINED) union semun { int val; struct semid_ds *buf; unsigned short int *array; struct seminfo *__buf; } arg;# else union semun arg;# endif# endif#endif /* Setup a default union semun value for semctl calls */# if defined(SUN_SOLARIS) arg.val = 0;# else# if defined(IBM3090) || defined(RS6000) || \ defined(TITAN) || defined(DEC5000) || \ defined(HP) || defined(KSR) arg = 0;# else arg.val = 0;# endif# endif /* ignore -1 return codes below due to multiple processes cleaning up the same sysv stuff; commented out "if" used to make sure that only the cluster master cleaned up in each cluster */ /* if (p4_local != NULL && p4_get_my_cluster_id() != 0) return; */ if (sysv_shmid[0] == -1) return; for (i=0; i < sysv_num_shmids; i++) { /* Unmap the addresses - don't do this until you are done with g, since g is inside of the memory */ /* shmdt( sysv_shmat[i] ); */ /* Remove the ids */ shmctl(sysv_shmid[i],IPC_RMID,(struct shmid_ds *)0); } if (g == NULL) return; if (sysv_semid0 != -1) semctl(sysv_semid0,0,IPC_RMID,arg); /* delete initial set */ for (i=1; i < g->sysv_num_semids; i++) /* delete other sets */ { semctl(g->sysv_semid[i],0,IPC_RMID,arg); }}#endifstatic int n_slaves_left;/* This routine is called if the wait fails to complete quickly */#include <sys/time.h>#ifndef TIMEOUT_VALUE_WAIT #define TIMEOUT_VALUE_WAIT 60#endifP4VOID p4_accept_wait_timeout (int);P4VOID p4_accept_wait_timeout(int sigval){ fprintf( stderr, "Timeout in waiting for processes to exit, %d left. This may be due to a defective\n\rsh program (Some versions of Kerberos rsh have been observed to have this\n\problem).\n\This is not a problem with P4 or MPICH but a problem with the operating\n\environment. For many applications, this problem will only slow down\n\process termination.\n", n_slaves_left);/* Why is p4_error commented out? On some systems (like FreeBSD), we need to to kill the generated rsh processes. *//*p4_error( "Timeout in waiting for processes to exit. This may be due to a defective rsh", 0 ); *//* exit(1); */}#ifdef HAVE_SYS_TYPES_H#include <sys/types.h>#endif#ifdef HAVE_SYS_WAIT_H#include <sys/wait.h>#endifint p4_wait_for_end( void ){ int status; int i, n_forked_slaves, pid;#ifndef THREAD_LISTENER struct slave_listener_msg msg;#endif /* p4_socket_stat is a routine that conditionally prints information about the socket status. -p4sctrl stat=y must be selected */ p4_socket_stat( stdout ); ALOG_LOG(p4_local->my_id,END_USER,0,""); ALOG_OUTPUT; /* System call statistics */#ifdef FOO if (0) { int t, count; p4_timein_hostbyname( &t, &count ); printf( "gethostbyname: (%d) calls in %d seconds\n", count, t ); }#endif# if defined(IPSC860) /* Wait for any pending messages to complete */ for (i=0; i < NUMAVAILS; i++) { struct p4_msg *mptr; mptr = p4_global->avail_buffs[i].buff; while (mptr) { if ((mptr->msg_id != -1) && (!msgdone((long) mptr->msg_id))) msgwait((long) mptr->msg_id); mptr = mptr->link; } }# endif# if defined(MEIKO_CS2) mpsc_fini();# endif /* Question: should we just close all the connections, whether we are the master or the slave *first*, before doing the test on is-master or waiting for the other processes to exit. */ if (p4_get_my_cluster_id() != 0) { /* Local slaves don't need to wait for other processes. However, they do need to cleanly close down any open sockets. This is the same code that is used below by the master. */# if defined(CAN_DO_SOCKET_MSGS) /* Tell all of the established connections that we are going away */ for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { /* Check the socket for any remaining messages, including socket close. Resets connection type to closed if found */ p4_look_for_close( i ); /* If it is still open, send the close message */ if (p4_local->conntab[i].type == CONN_REMOTE_EST) { socket_close_conn( p4_local->conntab[i].port ); /* We could wait for the partner to close; but this should be enough */ p4_local->conntab[i].type = CONN_REMOTE_CLOSED; } } }# endif return (0); } /* Free avail buffers */ free_avail_buffs(); /* Wait for all forked processes except listener to die */ /* Some implementations of RSH can fail to terminate. To work around this, we add a relatively short timeout (note that, at least in MPI programs, by the time we reach this point, everyone should have started to exit. The bug in those rsh version is in NOT using fd_set and the macros for manipluating fd_set in the call to select. These rsh's are assuming that fd's are all <= 31, and silently fail when they are not. */ p4_dprintfl(90, "enter wait_for_end nfpid=%d\n",p4_global->n_forked_pids); SIGNAL_P4(SIGALRM,p4_accept_wait_timeout);#ifndef CRAY { struct itimerval timelimit; struct timeval tval; struct timeval tzero; tval.tv_sec = TIMEOUT_VALUE_WAIT; tval.tv_usec = 0; tzero.tv_sec = 0; tzero.tv_usec = 0; timelimit.it_interval = tzero; /* Only one alarm */ timelimit.it_value = tval; setitimer( ITIMER_REAL, &timelimit, 0 );#else alarm( TIMEOUT_VALUE_WAIT );#endif /* Note that we are now in this routine (ignore some errors, such as failure to write on sockets as we are closing them) */ p4_local->in_wait_for_exit = 1; if (p4_local->listener_fd == (-1)) n_forked_slaves = p4_global->n_forked_pids; else n_forked_slaves = p4_global->n_forked_pids - 1; n_slaves_left = n_forked_slaves; for (i = 0; i < n_forked_slaves; i++) { pid = wait(&status); /* If we got an EINTR, ignore it */ if (pid < 0) { if (errno != EINTR) p4_error("p4_wait_for_end: wait error", pid); p4_dprintfl( 90, "wait returned EINTR\n" ); /* Instead of break, we could restart the wait. We'll take the position that an interrupt should force us to stop waiting on processes */ break; } --n_slaves_left; p4_dprintfl(10, "waited successfully for proc %d, %d left\n", pid, n_slaves_left); }#ifndef CRAY timelimit.it_value = tzero; /* Turn off timer */ setitimer( ITIMER_REAL, &timelimit, 0 ); }#else alarm( 0 );#endif SIGNAL_P4(SIGALRM,SIG_DFL);# if defined(CAN_DO_SOCKET_MSGS) /* Tell all of the established connections that we are going away */ for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { /* Check the socket for any remaining messages, including socket close. Resets connection type to closed if found */ p4_look_for_close( i ); /* If it is still open, send the close message */ if (p4_local->conntab[i].type == CONN_REMOTE_EST) { socket_close_conn( p4_local->conntab[i].port ); /* We could wait for the partner to close; but this should be enough */ p4_local->conntab[i].type = CONN_REMOTE_CLOSED; } } } /* Tell the listener to die and wait for him to do so (only if it is a separate process) */#ifndef THREAD_LISTENER if (p4_local->listener_fd != (-1)) { p4_dprintfl(90, "tell listener to die listpid=%d fd=%d\n", p4_global->listener_pid, p4_local->listener_fd); msg.type = p4_i_to_n(DIE); msg.from = p4_i_to_n(p4_get_my_id()); net_send(p4_local->listener_fd, &msg, sizeof(msg), P4_FALSE); /* Make sure that no further reads are possible for the LISTENER on this FD */ close( p4_local->listener_fd ); /* This wait is potentially an infinite loop. We can fix this with either an alarm (to terminate it) or by using a nonblocking wait call and a loop. */ pid = wait(&status); p4_dprintfl(90, "detected that proc %d died \n", pid); }#endif /* free listener data structures */ # endif clean_execer_port(); if (p4_get_my_id()) p4_dprintfl(20,"process exiting\n"); p4_dprintfl(90, "exit wait_for_end \n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -