📄 p4_rm.c
字号:
bm_msg.port = p4_i_to_n(listener_port); net_send(bm_fd, &bm_msg, sizeof(struct bm_rm_msg), P4_FALSE); rm_host[0] = '\0'; get_qualified_hostname(rm_host,100);#ifdef CAN_DO_SWITCH_MSGS rm_switch_port = getswport(rm_host);#else rm_switch_port = -1;#endif /* Send my info to the bm */ bm_msg.type = p4_i_to_n(REMOTE_MASTER_INFO); bm_msg.slave_idx = p4_i_to_n(0); bm_msg.slave_pid = p4_i_to_n(getpid()); bm_msg.switch_port = p4_i_to_n(rm_switch_port); strncpy(bm_msg.host_name,rm_host,HOSTNAME_LEN); strncpy(bm_msg.local_name,g->my_host_name,HOSTNAME_LEN); strncpy(bm_msg.machine_type,P4_MACHINE_TYPE,sizeof(bm_msg.machine_type)); net_send(bm_fd, &bm_msg, sizeof(struct bm_rm_msg), P4_FALSE); g->local_slave_count = 0;# ifdef TCMP tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());# endif# if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EUIH) for (slave_idx = 1; slave_idx <= nslaves - 1; slave_idx++) {# if defined(IPSC860) crecv(INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg));# endif# if defined(CM5) CMMD_receive(CMMD_ANY_NODE, INITIAL_INFO, (void *) &bm_msg, sizeof(struct bm_rm_msg));# endif# if defined(NCUBE) from = NCUBE_ANY_NODE; type = INITIAL_INFO; nread(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);# endif# if defined(SP1_EUI) from = ANY_P4TYPE_EUI; type = INITIAL_INFO; mpc_brecv(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);# endif# if defined(SP1_EUIH) from = ANY_P4TYPE_EUIH; type = INITIAL_INFO; len = sizeof(struct bm_rm_msg); mpc_brecv(&bm_msg, &len, &from, &type, &unused_flag);# endif net_send(bm_fd, &bm_msg, sizeof(struct bm_rm_msg), P4_FALSE); g->local_slave_count++; }# else for (slave_idx = 1; slave_idx <= nslaves - 1; slave_idx++) { p4_dprintfl(20,"remote master creating local slave %d\n",slave_idx); # if !defined(NO_LISTENER) get_pipe(&end_1, &end_2); listener_info->slave_fd[slave_idx] = end_2;# endif slave_pid = fork_p4();# if !defined(NO_LISTENER) listener_info->slave_pid[slave_idx] = slave_pid;# endif if (slave_pid) p4_dprintfl(10,"remote master created local slave %d\n",slave_idx); if (slave_pid == 0) { /* In the slave process */ sprintf(whoami_p4, "rm_s_%d_%d_%d", rm_num, slave_idx, (int)getpid()); p4_local = alloc_local_slave(); /* Check for environment variables that redirect stdin */ mpiexec_reopen_stdin();# if !defined(NO_LISTENER) {#ifdef USE_NONBLOCKING_LISTENER_SOCKETS int rc = p4_make_socket_nonblocking( end_1 ); if (rc < 0) { p4_error("create_rm_processes: set listener nonblocking", rc); }#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */ p4_local->listener_fd = end_1;# if !defined(THREAD_LISTENER) close(end_2);# endif }#endif close( listener_fd ); /* hang for a valid proctable. The master holds this lock until the slave processes are created, so this lock/unlock ensures that we wait until the proctable is valid. */ p4_lock(&g->slave_lock); p4_unlock(&g->slave_lock); /* Don't enable the interrupt handler until a valid proctable exists. To handle the possibility that an interrupt may be lost, the listener will reissue interrupts if the slave does not respond quickly. */#ifndef THREAD_LISTENER SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);#endif p4_local->my_id = p4_get_my_id_from_proc(); sprintf(whoami_p4, "p%d_%d", p4_get_my_id(), (int)getpid()); setup_conntab(); usc_init(); init_usclock();# ifdef TCMP tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());# endif /* sync with local master twice: once to make sure all slaves have got proctable, and second after the master has synced bm */ p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids()); p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids()); p4_dprintfl(20, "remote process starting\n"); ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE); ALOG_LOG(p4_local->my_id,BEGIN_USER,0,""); return; } /* if slave_pid == 0 */ /* else slave_pid != 0, ie., I am the parent */# if !defined(NO_LISTENER) /* slave holds this end */ close(end_1);# endif /* Send off the slave info to the bm */ bm_msg.type = p4_i_to_n(REMOTE_SLAVE_INFO); bm_msg.slave_idx = p4_i_to_n(slave_idx); bm_msg.slave_pid = p4_i_to_n(slave_pid); bm_msg.switch_port = p4_i_to_n(rm_switch_port); strcpy(bm_msg.machine_type,P4_MACHINE_TYPE); /* strcpy(bm_msg.slave_hosthame, ); */ net_send(bm_fd, &bm_msg, sizeof(struct bm_rm_msg), P4_FALSE); g->local_slave_count++; }#endif /* Send the end message to the bm */ bm_msg.type = p4_i_to_n(REMOTE_SLAVE_INFO_END); net_send(bm_fd, &bm_msg, sizeof(struct bm_rm_msg), P4_FALSE); /* * Done creating slaves. Now fork off the listener .. we've already * created the socket and bound a port to it */ /* setup listener port even in no-listener case, because this process may use it to do direct connections */ g->listener_port = listener_port; g->listener_fd = listener_fd;# if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE) && !defined(SP1_EUI) && !defined(SP1_EUIH) get_pipe(&end_1, &end_2); p4_local->listener_fd = end_1; listener_info->slave_fd[0] = end_2;#ifdef USE_NONBLOCKING_LISTENER_SOCKETS { int rc = p4_make_socket_nonblocking( end_1 ); if (rc < 0) { p4_error("create_rm_processes: set listener nonblocking", rc); } }#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */# if !defined(NO_LISTENER) && !defined(THREAD_LISTENER) listener_pid = fork_p4(); if (listener_pid == 0) { /* Inside listener */ listener_info->slave_pid[0] = getppid(); close(end_1); sprintf(whoami_p4, "rm_l_%d_%d", rm_num, (int)getpid()); p4_dprintfl(70, "inside listener pid %d\n", getpid()); { /* exec external listener process */ char *listener_prg = LISTENER_PATHNAME; if (*listener_prg) { char dbg_c[10], max_c[10], lfd_c[10], sfd_c[10]; /* p4_error("external listener not supported", 0); */ sprintf(dbg_c, "%d", p4_debug_level); sprintf(max_c, "%d", p4_global->max_connections); sprintf(lfd_c, "%d", listener_info->listening_fd); sprintf(sfd_c, "%d", listener_info->slave_fd[0]); p4_dprintfl(70, "exec %s %s %s %s %s\n", listener_prg, dbg_c, max_c, lfd_c, sfd_c); execlp(listener_prg, listener_prg, dbg_c, max_c, lfd_c, sfd_c, NULL); p4_dprintfl(70, "exec failed (errno= %d), using buildin\n", errno); } } listener(); exit(0); }# endif close(listener_fd); close(end_2); /* Else we're still in the remote master */# if defined(THREAD_LISTENER) p4_dprintfl(50,"creating listener thread\n"); /* This is the Windows NT version (HANDLE trc) */ /* pthread_mutex_init( &p4_local->conntab_lock, 0 ); */ p4_create_thread( trc, thread_listener, 66 ); /* NT version put the last arg of CreateThread into listener_pid */ p4_dprintfl(50,"created listener thread\n");# else p4_dprintfl(70, "created listener pid %d\n", listener_pid); g->listener_pid = listener_pid;# endif# endif /* !IPSC860 etc */ rm_flag = 1; /* I am the remote master */}P4VOID receive_proc_table(int bm_fd){ P4BOOL done; struct bm_rm_msg msg; int type; int port, unix_id, slave_idx, group_id; int switch_port; p4_dprintfl(90, "receive_proc_table\n"); for (done = P4_FALSE; !done;) { if (net_recv(bm_fd, &msg, sizeof(msg)) == PRECV_EOF) p4_error("recv_proc_table: got net_send_eof", bm_fd); type = p4_n_to_i(msg.type); switch (type) { case PROC_TABLE_ENTRY: group_id = p4_n_to_i(msg.group_id); port = p4_n_to_i(msg.port); unix_id = p4_n_to_i(msg.unix_id); slave_idx = p4_n_to_i(msg.slave_idx); switch_port = p4_n_to_i(msg.switch_port); p4_dprintfl(90, "got entry gid=%d host=%s port=%d unix_id=%d slave_idx=%d switch_port=%d\n", group_id,msg.host_name,port,unix_id,slave_idx,switch_port); /* remote master loading proctable from big master */ install_in_proctable(group_id, port, unix_id, msg.host_name, msg.local_name, slave_idx, msg.machine_type, switch_port); break; case PROC_TABLE_END: done = P4_TRUE; break; default: p4_dprintf("receive_proc_table: got invalid message type %d\n", type); break; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -