📄 p4_bm.c
字号:
#include "p4.h"
#include "p4_sys.h"

/* extern struct fdentry fdtable[MAXFDENTRIES]; */

void p4_peer_msg_handler(char *);

/*
 * bm_start - start-up of the "big master" process when running under BNR.
 *
 * Allocates the p4 global and local structures, registers
 * p4_peer_msg_handler with BNR, obtains this process's group, rank and
 * group size from BNR, chooses a working directory, initializes memory and
 * the environment, installs this process as entry 0 of the proctable and
 * finally waits on a BNR fence.
 *
 * argc is not used here; argv[0] is used to derive a fallback working
 * directory.  Always returns 0.
 */
int bm_start(int *argc, char **argv)
{
    int bm_switch_port;
    char *s, pgmname[P4_MAX_PGM_LEN];
    int rc, myjsize, myrank;
    BNR_Group mygroup;

    setbuf( stdout, NULL );  /* turn off buffering for clients */

    alloc_global();  /* sets p4_global */
    p4_global->local_communication_only = P4_FALSE; /* hard code for mpd */
    p4_local = alloc_local_bm();
    if (p4_local == NULL)
        p4_error("p4_initenv: alloc_local_bm failed\n", 0);

    /* NOTE(review): the BNR return codes are stored in rc but never checked. */
    rc = BNR_Pre_init( p4_peer_msg_handler );   /* specific to mpich-1 */
    rc = BNR_Init( );
    rc = BNR_Get_group( &mygroup );
    rc = BNR_Get_rank( mygroup, &myrank );
    rc = BNR_Get_size( mygroup, &myjsize );

    p4_dprintfl(10,"IGNORING SIGPIPE\n");
    SIGNAL_P4(SIGPIPE,SIG_IGN);

    sprintf(whoami_p4, "p%d_%d", myrank, (int)getpid());
    p4_global->num_in_proctable = myjsize;  /* there really isn't any proctable */
    p4_local->my_id = myrank;
    p4_local->my_job = BNR_Get_group_id( mygroup );  /* default jobid for now */
    setup_conntab();

    /* get fd for talking to the manager from the environment, where he put it
       after acquiring it. */
    rc = BNR_Man_msgs_fd( &(p4_local->parent_man_fd) );

    /* choose a working directory */
    if (strlen(p4_wd) && !chdir(p4_wd)) {
        p4_dprintfl(90, "working directory for %d set to %s\n",
                    p4_local->my_id, p4_wd);
    }
    else {
        /* Fall back to the directory part of the program name, if any. */
        strncpy(pgmname,argv[0],P4_MAX_PGM_LEN);
        pgmname[P4_MAX_PGM_LEN-1] = 0;  /* strncpy may not terminate */
        if ((s = (char *) rindex(pgmname,'/')) != NULL) {
            *s = '\0';  /* chg to directory name only */
            if (chdir(pgmname) != 0) {
                /* Best effort only: keep going in the current directory. */
                p4_dprintfl(90, "could not chdir to %s\n", pgmname);
            }
        }
    }

#   ifdef SYSV_IPC
    sysv_num_shmids = 0;
    sysv_shmid[0] = -1;
    sysv_semid0 = -1;
    sysv_semid0 = init_sysv_semset(0);
#   endif

    MD_initmem(globmemsize);
    if (*bm_outfile) {
        /* Redirect both output streams to the requested file. */
        freopen(bm_outfile, "w", stdout);
        freopen(bm_outfile, "w", stderr);
    }
    MD_initenv();
    bm_switch_port = getswport(p4_global->my_host_name);
    usc_init();
    init_usclock();

    /* big master installing himself */
    install_in_proctable(0, (-1), getpid(), p4_global->my_host_name,
                         p4_global->my_host_name, 0, P4_MACHINE_TYPE,
                         bm_switch_port);

    BNR_Fence( mygroup );  /* to make sure p4 data structures are set before
                              interrupts can occur */
    return (0);
}

/*
 * p4_peer_msg_handler - handle a "connect_to_me" request from a peer.
 *
 * msg is expected to look like
 *     connect_to_me-<rank>-<dotted ip address>-<port>
 * The handler opens a TCP connection to that address/port (up to three
 * attempts, one second apart), makes the socket non-blocking, sets
 * TCP_NODELAY and records the descriptor in p4_local->conntab[rank].
 *
 * Messages with a different prefix, with fewer than four fields, or with a
 * rank outside the table are reported with p4_dprintf and ignored.  A
 * request is also ignored when a connection to that rank is already
 * established, or when this process is itself connecting to that rank and
 * has the higher id.
 */
void p4_peer_msg_handler(char *msg)
{
    char cmd[32];
    char c_torank[32],c_toport[32],c_toipaddr[32];
    int flags,torank,toport,connection_fd,num_tries,rc,connected,optval;
    int tries_made, nfields;
    int myid = p4_get_my_id();
    unsigned int toipaddr;
    struct sockaddr_in sa;

    p4_dprintfl(077,"p4_peer_msg_handler entered with msg :%s:\n",msg );
    if (strncmp(msg,"connect_to_me-",14) != 0)
    {
        p4_dprintf("invalid msg in p4_peer_msg_handler :%s:\n",msg);
        return;
    }

    /* Each destination is 32 bytes, so limit every field to 31 characters
       plus the terminating NUL; the message comes from another process. */
    nfields = sscanf(msg,"%31[^-]-%31[^-]-%31[^-]-%31s",
                     cmd,c_torank,c_toipaddr,c_toport);
    if (nfields != 4)
    {
        p4_dprintf("invalid msg in p4_peer_msg_handler :%s:\n",msg);
        return;
    }
    torank = atoi(c_torank);
    toport = atoi(c_toport);

    /* torank indexes conntab[] below.
       NOTE(review): assumes conntab has num_in_proctable entries -- confirm
       against setup_conntab. */
    if (torank < 0 || torank >= p4_global->num_in_proctable)
    {
        p4_dprintf("p4_peer_msg_handler: rank %d out of range in msg :%s:\n",
                   torank,msg);
        return;
    }

    toipaddr = inet_addr(c_toipaddr);
    bzero((void *)&sa, sizeof(struct sockaddr_in));
    bcopy((void *)&toipaddr, (void *)&sa.sin_addr, sizeof(unsigned int));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(toport);

    if (p4_local->conntab[torank].type == CONN_REMOTE_EST)
    {
        p4_dprintfl(077, "p4_peer_msg_handler: already conn'd to %d\n", torank);
        return;
    }

    if (p4_global->dest_id[myid] == torank)  /* already connecting */
    {
        if (myid > torank)
        {
            p4_dprintfl(077, "p4_peer_msg_handler: already making conn to %d\n",
                        torank);
            return;
        }
    }

    connected = 0;
    num_tries = 3;      /* remaining attempts */
    tries_made = 0;     /* attempts actually performed, for the log below */
    while (!connected && num_tries)
    {
        tries_made++;
        SYSCALL_P4(connection_fd, socket(AF_INET, SOCK_STREAM, 0));
        if (connection_fd < 0)
            p4_error("p4_peer_msg_handler socket", connection_fd);

        SYSCALL_P4(rc, connect(connection_fd, (struct sockaddr *) &sa,
                               sizeof(struct sockaddr_in)));
        if (rc < 0)
        {
            /* A fresh socket is created for every attempt. */
            close(connection_fd);
            p4_dprintfl( 077, "Connect failed; closed socket %d\n",
                         connection_fd );
            if (--num_tries)
            {
                p4_dprintfl(077,"p4_peer_msg_handler: connect to %s failed; will try %d more times \n",c_toipaddr,num_tries);
                sleep(1);
            }
        }
        else
        {
            connected = 1;
            p4_dprintfl(077,"p4_peer_msg_handler: connected to %s\n",c_toipaddr);
        }
    }
    if ( ! connected)
    {
        p4_dprintf("p4_peer_msg_handler: failed connect to %s\n",c_toipaddr);
        p4_error("failed to connnect",-1);
    }

    /* Switch the new socket to non-blocking mode. */
    flags = fcntl(connection_fd, F_GETFL, 0);
    if (flags < 0)
        p4_error("p4_bm fcntl1", flags);
#   if defined(HP)
    flags |= O_NONBLOCK;
#   else
    flags |= O_NDELAY;
    p4_dprintfl(90, "p4_bm: setting ndelay for %d\n",connection_fd);
#   endif
#   if defined(RS6000)
    flags |= O_NONBLOCK;
#   endif
    flags = fcntl(connection_fd, F_SETFL, flags);
    if (flags < 0)
        p4_error("p4_bm fcntl2", flags);

    optval = 1;
    SYSCALL_P4(rc, setsockopt(connection_fd,IPPROTO_TCP,TCP_NODELAY,
                              (char *) &optval,sizeof(optval)));

    /* We're connected, so we can add this connection to the table */
    p4_local->conntab[torank].type = CONN_REMOTE_EST;
    p4_local->conntab[torank].port = connection_fd;
    p4_local->conntab[torank].same_data_rep = P4_TRUE;

    p4_dprintfl(077, "p4_peer_msg_handler: connected after %d tries, connection_fd=%d\n", tries_made, connection_fd);
    p4_dprintfl(077, "marked as established fd=%d torank=%d\n",
                connection_fd, torank);
    p4_dprintfl(077,"p4_peer_msg_handler done\n" );
}

/*
 * p4_startup - start the processes described by a procgroup.
 *
 * Converts pg into the proctable, sets up an anonymous listener socket when
 * socket messages are in use, creates the local processes (and, with
 * CAN_DO_SOCKET_MSGS, the remote ones), sends out the proctable and then
 * synchronizes with local and remote processes.
 *
 * Returns 0 on success and also in a process for which
 * p4_am_i_cluster_master() is false after create_bm_processes; returns -1
 * if create_bm_processes or create_remote_processes fails.
 * NOTE(review): both -1 returns leave slave_lock held -- confirm intended.
 */
int p4_startup(struct p4_procgroup *pg)
{
    int nslaves;
    int listener_port, listener_fd;

    p4_dprintfl(90,"entering p4_startup\n");

    if (p4_global == NULL)
        p4_error("p4 not initialized; perhaps p4_initenv not called",0);

/* On some systems (SGI IRIX 6), process exit sometimes kills all processes
   in the process GROUP.  This code attempts to fix that.
   We DON'T do it if stdin (0) is connected to a terminal, because that
   disconnects the process from the terminal. */
#if defined(HAVE_SETSID) && defined(HAVE_ISATTY) && defined(SET_NEW_PGRP)
    if (!isatty(0)) {
        pid_t rc;
        rc = setsid();
        if (rc < 0) {
            p4_dprintfl( 90, "Could not create new process group\n" );
        }
        else {
            /* pid_t need not be int; cast to match the %d conversion. */
            p4_dprintfl( 80, "Created new process group %d\n", (int) rc );
        }
    }
    else {
        p4_dprintfl( 80, "Did not created new process group because isatty returned true\n" );
    }
#endif

    procgroup_to_proctable(pg);
    if (pg->num_entries > 1)
        p4_global->local_communication_only = P4_FALSE;

#   ifdef CAN_DO_SOCKET_MSGS
    if (!p4_global->local_communication_only)
    {
        net_setup_anon_listener(10, &listener_port, &listener_fd);
        p4_global->listener_port = listener_port;
        p4_global->listener_fd = listener_fd;
        p4_dprintfl(90, "setup listener on port %d fd %d\n",
                    listener_port, listener_fd);
        p4_global->proctable[0].port = listener_port;
#ifndef THREAD_LISTENER
        SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
#endif
    }
    else
        p4_global->listener_fd = -1;
#   endif

    setup_conntab();

    p4_lock(&p4_global->slave_lock);
    if ((nslaves = create_bm_processes(pg)) < 0)
        return (-1);
    if (!p4_am_i_cluster_master())  /* I was forked in create_bm_processes */
        return(0);

#   ifdef CAN_DO_SOCKET_MSGS
    if (create_remote_processes(pg) < 0)
        return (-1);
#   endif

    /* let local slaves use proc table to identify themselves */
    p4_unlock(&p4_global->slave_lock);

    send_proc_table();  /* to remote masters */

#   if defined(IPSC860) || defined(CM5) || defined(NCUBE) \
                        || defined(SP1_EUI) || defined(SP1_EUIH)
    {
        struct bm_rm_msg bm_msg;
        struct p4_procgroup_entry *local_pg;
        int len, to, type;
        int i, unused_flag;

        /* send initial info and proctable to local slaves */
        /* must use p4_i_to_n procs because node slave does not know
           if the msg is forwarded from bm */
        local_pg = &(pg->entries[0]);
        bm_msg.type = p4_i_to_n(INITIAL_INFO);
        bm_msg.numinproctab = p4_i_to_n(p4_global->num_in_proctable);
        bm_msg.numslaves = p4_i_to_n(local_pg->numslaves_in_group);
        bm_msg.debug_level = p4_i_to_n(p4_remote_debug_level);
        bm_msg.memsize = p4_i_to_n(globmemsize);
        bm_msg.logging_flag = p4_i_to_n(logging_flag);
        strcpy(bm_msg.application_id, p4_global->application_id);
        strcpy(bm_msg.version, P4_PATCHLEVEL);
        if (strlen( local_pg->slave_full_pathname ) >= P4_MAX_PGM_LEN) {
            p4_error("Program name is too long, must be less than",
                     P4_MAX_PGM_LEN);
        }
        strcpy(bm_msg.pgm, local_pg->slave_full_pathname);
        strcpy(bm_msg.wdir, p4_wd);

        for (i = 1; i <= nslaves; i++)
        {
            p4_dprintfl(90,"sending initinfo to slave %d of %d\n",i,nslaves);
#           if defined(IPSC860)
            csend((long) INITIAL_INFO, &bm_msg,
                  (long) sizeof(struct bm_rm_msg), (long) i, (long) NODE_PID);
            csend((long) INITIAL_INFO, p4_global->proctable,
                  (long) sizeof(p4_global->proctable), (long) i,
                  (long) NODE_PID);
#           endif
#           if defined(CM5)
            CMMD_send_noblock(i, INITIAL_INFO, &bm_msg,
                              sizeof(struct bm_rm_msg));
            CMMD_send_noblock(i, INITIAL_INFO, p4_global->proctable,
                              sizeof(p4_global->proctable));
#           endif
#           if defined(NCUBE)
            nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO,
                   &unused_flag);
            nwrite(p4_global->proctable, sizeof(p4_global->proctable), i,
                   INITIAL_INFO, &unused_flag);
#           endif
#           if defined(SP1_EUI)
            mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO);
            mpc_bsend(p4_global->proctable, sizeof(p4_global->proctable), i,
                      INITIAL_INFO);
#           endif
#           if defined(SP1_EUIH)
            len = sizeof(struct bm_rm_msg);
            to = i;
            type = INITIAL_INFO;
            mp_bsend(&bm_msg, &len, &to, &type);
            len = sizeof(p4_global->proctable);
            mp_bsend(p4_global->proctable, &len, &to, &type);
#           endif
            p4_dprintfl(90,"sent initinfo to slave %d of %d\n",i,nslaves);
        }
    } /* End of local declarations */
#   endif

    p4_global->low_cluster_id =
        p4_local->my_id - p4_global->proctable[p4_local->my_id].slave_idx;
    p4_global->hi_cluster_id =
        p4_global->low_cluster_id + p4_global->local_slave_count + 1;

    /*
       sync with local slaves thus insuring that they have the proctable
       before syncing with remotes (this keeps remotes from interrupting the
       local processes too early; then re-sync with local slaves (thus
       permitting them to interrupt remotes)
    */
    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

    /*
       NEED A SYNC WITH LOCALS THAT DOES A BARRIER WITH PROCS THAT SHARE
       MEMORY AND MP BARRIER WITH OTHER "LOCAL" PROCESSES
    */

    sync_with_remotes();
    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

    return (0);
}

int create_bm_processes(pg)
struct p4_procgroup *pg;
{
    struct p4_procgroup_entry *local_pg;
    struct listener_data *l = NULL;
    int nslaves, end_1, end_2;
    int slave_pid, listener_pid = -1;
    int slave_idx, listener_fd = -1;
#   if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EUIH)
    /* Message passing systems require additional information */
    struct bm_rm_msg bm_msg;
    int i;
    int port, switch_port, type, len, from, unused_flag;
#endif

#   if defined(THREAD_LISTENER)
    p4_thread_t trc;
#   endif

    p4_dprintfl(90,"entering create_bm_processes\n");
    local_pg = &(pg->entries[0]);

    nslaves = local_pg->numslaves_in_group;
#   if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE) && !defined(SP1_EUI) && !defined(SP1_EUIH)
    if (nslaves > P4_MAX_MSG_QUEUES)
        p4_error("more slaves than msg queues \n", nslaves);
#   endif

/* alloc listener local data since this proc will eventually become listener */
#   if defined(CAN_DO_SOCKET_MSGS) && !defined(NO_LISTENER)
    if (!(p4_global->local_communication_only))
    {
        listener_fd = p4_global->listener_fd;
        listener_info = alloc_listener_info(1);
        l = listener_info;
        get_pipe(&end_1, &end_2); /* used even by thread listener */
        l->slave_fd[0] = end_2;
    }
#   endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -