📄 p4_bm.c
字号:
/*
 * NOTE: the statements from here down to the first closing brace at column 0
 * are the tail of a function whose beginning lies outside this excerpt (the
 * final debug message names it create_bm_processes).  Declarations of
 * slave_idx, slave_pid, end_1, end_2, listener_fd, listener_pid, nslaves,
 * etc. are therefore not visible here.
 */
        p4_dprintfl(10, "created local slave %d\n", slave_idx);
    }   /* closes a block opened before this excerpt */
    if (slave_pid == 0) /* At this point, we are the slave. */
    {
        sprintf(whoami_p4, "bm_slave_%d_%d", slave_idx, (int)getpid());

        p4_free(p4_local); /* Doesn't work for weird memory model. */
        p4_local = alloc_local_slave();

        /* Check for environment variables that redirect stdin */
        mpiexec_reopen_stdin();
# ifdef CAN_DO_SOCKET_MSGS
        if (!(p4_global->local_communication_only))
        {
#ifdef USE_NONBLOCKING_LISTENER_SOCKETS
            /* Set the listener socket to be nonblocking. */
            int rc = p4_make_socket_nonblocking( end_1 );
            if (rc < 0) {
                p4_error("create_bm_processes: set listener nonblocking", rc);
            }
#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */
            /* The slave keeps end_1 and drops the descriptors it does not
               use (end_2 unless a listener thread is in use, and
               listener_fd). */
            p4_local->listener_fd = end_1;
# if !defined(THREAD_LISTENER)
            close(end_2);
# endif
            close(listener_fd);
        }
# endif /* CAN_DO_SOCKET_MSGS */

        /* Hang for a valid proctable.  Note that the master locks the slave
           lock before it starts creating the slave processes, so the
           initial lock is not acquired until *after* the master releases
           the lock.  The lock is released again immediately: it is used
           only as a gate. */
        p4_lock(&p4_global->slave_lock);
        p4_unlock(&p4_global->slave_lock);
#ifdef CAN_DO_SOCKET_MSGS
        /* Wait to install the listener interrupt handler until the
           proctable is valid.  The listener will reissue the interrupt if
           the slave misses because it was waiting on the lock around the
           proctable */
#ifndef THREAD_LISTENER
        SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
#endif
#endif /* CAN_DO_SOCKET_MSGS */

        p4_local->my_id = p4_get_my_id_from_proc();
#if defined(SUN_SOLARIS)
/***** Shyam code, removed by RL
        {
            int no_processors;
            processorid_t bindproc;
            no_processors = sysconf(_SC_NPROCESSORS_ONLN);
            bindproc = (p4_local->my_id) % no_processors;
            if(p_online(bindproc,P_STATUS) != P_ONLINE)
            {
                printf("could not bind slave %d to processor %d",
                       p4_local->my_id, bindproc);
            }
            else
            {
                printf("Bound slave %d to processor %d\n",
                       p4_local->my_id,bindproc);
                processor_bind(P_PID,P_MYID,bindproc, &bindproc);
                printf("previous binding was %d\n",bindproc);
            }
        }
*****/
#endif /* SUN_SOLARIS */
        setup_conntab();
        /* Now that my_id is known, switch to the final process name. */
        sprintf(whoami_p4, "p%d_%d", p4_local->my_id, (int)getpid());
        usc_init();
        init_usclock();
# ifdef TCMP
        tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());
# endif
        /*
         * sync with master twice: once to make sure all slaves have got
         * proctable, and second after the master has synced with the
         * remote processes
         */
        p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
        p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

        p4_dprintfl(20, "local slave starting\n");
        ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE);
        ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
        /* The slave returns 0; the master path below returns nslaves. */
        return (0);
    }
# ifdef CAN_DO_SOCKET_MSGS
    /* slave holds this end */
    close(end_1);
# endif
    /* master installing local slaves */
    install_in_proctable(0, p4_global->listener_port, slave_pid,
                         p4_global->my_host_name,
                         p4_global->my_host_name,
                         slave_idx, P4_MACHINE_TYPE,
                         p4_global->proctable[0].switch_port);
    p4_global->local_slave_count++;
    }   /* closes a block opened before this excerpt */
# endif
    /* On the platforms below, every node with an index above nslaves is
       sent a DIE message. */
# if defined(CM5)
    for (i=nslaves+1; i < CMMD_partition_size(); i++)
        CMMD_send_noblock(i, DIE, &bm_msg, sizeof(struct bm_rm_msg));
# endif
# if defined(NCUBE)
    for (i=nslaves+1; i < ncubesize(); i++)
        nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, DIE, &unused_flag);
# endif
# if defined(SP1_EUI)
    for (i=nslaves+1; i < eui_numtasks; i++)
        mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, DIE);
# endif
# if defined(SP1_EUIH)
    for (i=nslaves+1; i < euih_numtasks; i++)
    {
        len = sizeof(struct bm_rm_msg);
        type = DIE;
        mp_bsend(&bm_msg, &len, &i, &type);
    }
# endif

    /* Done creating slaves. Now fork off the listener */
# if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE) && !defined(SP1_EUI) && !defined(SP1_EUIH)
# if defined(CAN_DO_SOCKET_MSGS) && !defined(NO_LISTENER) && !defined(THREAD_LISTENER)
    if (!(p4_global->local_communication_only))
    {
        /* communication big master <--> listener */
        get_pipe(&end_1, &end_2);
        p4_local->listener_fd = end_1;
        listener_info->slave_fd[0] = end_2;
#ifdef USE_NONBLOCKING_LISTENER_SOCKETS
        /* Set the listener socket to be nonblocking. */
        {
            int rc = p4_make_socket_nonblocking( end_1 );
            if (rc < 0) {
                p4_error("create_bm_processes: set listener nonblocking", rc);
            }
        }
#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */
        /* Now, create the listener */
        listener_pid = fork_p4();
        if (listener_pid < 0)
        {
            p4_error("create_bm_processes listener fork", listener_pid);
        }
        if (listener_pid == 0)
        {
            /* I am the listener */
            listener_info->slave_pid[0] = getppid();
            close(end_1);
            sprintf(whoami_p4, "bm_list_%d", (int)getpid());
            /* Inside listener */
            p4_local = alloc_local_listener();
            {
                /* exec external listener process */
                char *listener_prg = LISTENER_PATHNAME;

                /* An empty LISTENER_PATHNAME skips the exec attempt. */
                if (*listener_prg)
                {
                    /* NOTE(review): sprintf("%d") into 10-byte buffers
                       assumes each value prints in at most 9 characters --
                       confirm, or size the buffers for a full int. */
                    char dbg_c[10], max_c[10], lfd_c[10], sfd_c[10];

                    /* p4_error("external listener not supported", 0); */
                    sprintf(dbg_c, "%d", p4_debug_level);
                    sprintf(max_c, "%d", p4_global->max_connections);
                    sprintf(lfd_c, "%d", listener_info->listening_fd);
                    sprintf(sfd_c, "%d", listener_info->slave_fd[0]);
                    p4_dprintfl(70, "exec %s %s %s %s %s\n",
                                listener_prg, dbg_c, max_c, lfd_c, sfd_c);
                    execlp(listener_prg, listener_prg,
                           dbg_c, max_c, lfd_c, sfd_c, NULL);
                    /* Reached only if execlp returned, i.e. the exec
                       failed; fall through to the built-in listener. */
                    p4_dprintfl(70, "exec failed (errno= %d), using buildin\n", errno);
                }
            }
            listener();
            exit(0);
        }
    }
# endif
    /* Else we're still in the big master */
    sprintf(whoami_p4, "p0_%d", (int)getpid());
# if defined(THREAD_LISTENER)
    /* If there is only one process, then we will not have created a
       listener port or corresponding fd.  In that case, we don't need the
       listener (it just does a while(1) {accept(listener_fd);...}, so
       there isn't anything to do if only a single process is running). */
    if (p4_global->listener_fd >= 0) {
        p4_dprintfl(50,"creating listener thread\n");
        /* pthread_mutex_init( &p4_local->conntab_lock, 0 ); */
        p4_create_thread( trc, thread_listener, 66 );
        p4_dprintfl(50,"created listener thread\n");
    }
    /* else trc = 0; */
    /* NT version put the last arg of CreateThread into listener_pid */
# endif

    /* We need to close the fds from the listener setup, in big master
     * process, slave number 0.
     */
# if defined(CAN_DO_SOCKET_MSGS) && !defined(NO_LISTENER)
    if (!(p4_global->local_communication_only))
    {
# if !defined(THREAD_LISTENER)
        close(listener_fd);
        close(end_2);
# endif
        p4_global->listener_pid = listener_pid;
    }
# endif
# endif

    dump_global(80);
    p4_dprintfl(90, "create_bm_processes: exiting\n");
    return (nslaves);
}

/*
 * procgroup_to_proctable - fill in the host-related fields of
 * p4_global->proctable from a parsed procgroup.
 *
 *   pg  procgroup to read; pg->entries[0] describes the first line.
 *
 * Entry 0 of the proctable gets the host name of the first procgroup line
 * (or p4_global->my_host_name when that line says "local") and group_id 0.
 * Then, starting at proctable index 1, one entry is written per slave of
 * every procgroup entry: host name, group_id (the procgroup entry index)
 * and, when CAN_DO_SOCKET_MSGS is defined, a sockaddr holding the host's
 * address with a placeholder port of 1.  p4_global->num_in_proctable is
 * set to the number of entries written, including entry 0.
 *
 * As a side effect, a first line other than "local" also overwrites
 * p4_global->my_host_name.
 */
P4VOID procgroup_to_proctable( struct p4_procgroup *pg)
{
    int i, j, ptidx;
    struct p4_procgroup_entry *pe;

    if (strcmp(pg->entries[0].host_name,"local") == 0)
    {
        strcpy(p4_global->proctable[0].host_name,p4_global->my_host_name);
    }
    else
    {
        p4_dprintfl(10,"hostname in first line of procgroup is %s\n",
                    pg->entries[0].host_name);
        strcpy(p4_global->my_host_name,pg->entries[0].host_name);
        strcpy(p4_global->proctable[0].host_name,pg->entries[0].host_name);
    }
    get_qualified_hostname(p4_global->proctable[0].host_name,HOSTNAME_LEN);
    p4_dprintfl(10,"hostname for first entry in proctable is %s\n",
                p4_global->proctable[0].host_name);

    p4_global->proctable[0].group_id = 0;
    ptidx = 1;  /* entry 0 was filled in above */
    for (i=0, pe=pg->entries; i < pg->num_entries; i++, pe++)
    {
        for (j=0; j < pe->numslaves_in_group; j++)
        {
            /* Slaves of the first procgroup entry reuse the host name
               already stored in proctable[0]; all others take the name
               from their own procgroup entry. */
            if (i == 0)
                strcpy(p4_global->proctable[ptidx].host_name,
                       p4_global->proctable[0].host_name);
            else
                strcpy(p4_global->proctable[ptidx].host_name,pe->host_name);
            get_qualified_hostname(p4_global->proctable[ptidx].host_name,
                                   HOSTNAME_LEN);
            p4_global->proctable[ptidx].group_id = i;
# ifdef CAN_DO_SOCKET_MSGS
            {
                /* NOTE(review): hp is dereferenced below without a NULL
                   check; this assumes gethostbyname_p4 never returns
                   NULL -- confirm against its definition. */
                struct hostent *hp =
                    gethostbyname_p4(p4_global->proctable[ptidx].host_name);
                struct sockaddr_in *listener_sockaddr =
                    &p4_global->proctable[ptidx].sockaddr;
#ifdef LAZY_GETHOSTBYNAME
                /* Since we just set the *address* above,we can confirm the
                   data next */
                /* NOTE(review): the sockaddr of this same entry is zeroed
                   and refilled right after this call, so anything the call
                   stores there is overwritten -- confirm that is intended. */
                p4_procgroup_setsockaddr( &p4_global->proctable[ptidx] );
#endif
                bzero( (P4VOID*) listener_sockaddr, sizeof(struct sockaddr_in) );
                bcopy((P4VOID *) hp->h_addr,
                      (P4VOID *)&listener_sockaddr->sin_addr,
                      hp->h_length);
                listener_sockaddr->sin_family = hp->h_addrtype;
                /* Set a dummy port so that we can detect that the field
                   has been initialized */
                listener_sockaddr->sin_port = 1;
            }
# endif
            ptidx++;
        }
        /* Updated once per procgroup entry; if pg->num_entries is 0 this
           assignment never runs and num_in_proctable is left unchanged. */
        p4_global->num_in_proctable = ptidx;
    }
}

/*
 * sync_with_remotes - two-phase handshake over the connection table.
 *
 * When CAN_DO_SOCKET_MSGS is defined, obtains a list of nodes from
 * p4_get_cluster_masters and, for every list position from 1 upward,
 * first receives one message and checks that its type is SYNC_MSG, then
 * (in a second pass over the same positions) sends a SYNC_MSG back.
 * List position 0 is skipped (presumably this process itself -- confirm).
 * Without CAN_DO_SOCKET_MSGS the function only emits the debug message.
 *
 * A received message of any other type is reported through p4_error.
 * The return value of net_recv is not examined.
 */
P4VOID sync_with_remotes()
{
    struct bm_rm_msg msg;
    int i, fd, node, num_rms, rm[P4_MAXPROCS];

    p4_dprintfl(90, "sync_with_remotes: starting\n");
# ifdef CAN_DO_SOCKET_MSGS
    p4_get_cluster_masters(&num_rms, rm);

    /* Phase 1: collect a SYNC_MSG from each listed node. */
    for (i = 1; i < num_rms; i++)
    {
        node = rm[i];
        fd = p4_local->conntab[node].port;
        net_recv(fd, &msg, sizeof(msg));
        msg.type = p4_n_to_i(msg.type);   /* convert before comparing */
        if (msg.type != SYNC_MSG)
            p4_error("sync_with_remotes: bad type rcvd\n",msg.type);
    }

    /* Phase 2: answer each of them with a SYNC_MSG.  Only msg.type is
       set; the other fields keep whatever the last receive left there. */
    for (i = 1; i < num_rms; i++)
    {
        node = rm[i];
        fd = p4_local->conntab[node].port;
        msg.type = p4_i_to_n(SYNC_MSG);
        net_send(fd, &msg, sizeof(msg), P4_FALSE);
    }
# endif
}

/*
 * send_proc_table - send the complete proctable to selected processes.
 *
 * When CAN_DO_SOCKET_MSGS is defined, walks proctable indices 1 ..
 * num_in_proctable-1 and picks every entry whose slave_idx field is 0.
 * To each of those, over the fd stored in p4_local->conntab, it sends one
 * PROC_TABLE_ENTRY message per proctable entry (including entry 0),
 * followed by a single PROC_TABLE_END message.  Integer fields are passed
 * through p4_i_to_n before sending; the name fields are copied with
 * strcpy.
 *
 * A negative fd is reported through p4_error.  Without
 * CAN_DO_SOCKET_MSGS the function only emits the debug message.
 */
P4VOID send_proc_table()
{
    int slave_idx, ent;
    int fd;
    struct bm_rm_msg msg;
    struct proc_info *pe;

    p4_dprintfl(90, "send_proc_table: starting\n");
# ifdef CAN_DO_SOCKET_MSGS
    for (slave_idx = 1; slave_idx < p4_global->num_in_proctable; slave_idx++)
    {
        /* Only entries with slave_idx == 0 receive the table. */
        if (p4_global->proctable[slave_idx].slave_idx != 0)
            continue;

        fd = p4_local->conntab[slave_idx].port;
        /* The fd is logged before it is validated, so a negative value
           shows up in the debug output as well. */
        p4_dprintfl(90, "sending proctable to slave %d on %d:\n", slave_idx, fd);
        if (fd < 0)
            p4_error("send_proc_table: rm entry doesn't have valid fd", fd);

        for (ent = 0, pe = p4_global->proctable;
             ent < p4_global->num_in_proctable;
             ent++, pe++)
        {
            msg.type = p4_i_to_n(PROC_TABLE_ENTRY);
            msg.port = p4_i_to_n(pe->port);
            msg.unix_id = p4_i_to_n(pe->unix_id);
            msg.slave_idx = p4_i_to_n(pe->slave_idx);
            msg.group_id = p4_i_to_n(pe->group_id);
            /* NOTE(review): unbounded copies; assumes the msg fields are
               at least as large as the proctable fields -- confirm. */
            strcpy(msg.host_name, pe->host_name);
            strcpy(msg.local_name, pe->local_name);
            strcpy(msg.machine_type,pe->machine_type);
            msg.switch_port = p4_i_to_n(pe->switch_port);
            net_send(fd, &msg, sizeof(msg), P4_FALSE);
            /* The number printed after "slave" is ent+1, i.e. the 1-based
               position of the entry just sent, not the receiver's index. */
            p4_dprintfl(90, "%s sent proctable entry to slave %d: %s \n",
                        p4_global->proctable[0].host_name,
                        ent+1, pe->host_name);
        }

        /* Terminator: only the type changes, so the remaining fields
           still carry the values of the last entry sent. */
        p4_dprintfl(90, " sending end_of_proc_table\n");
        msg.type = p4_i_to_n(PROC_TABLE_END);
        net_send(fd, &msg, sizeof(msg), P4_FALSE);
    }
# endif
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -