📄 p4_bm.c
字号:
# ifdef TCMP tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());# endif# if defined(IPSC860) || defined(CM5) || defined(NCUBE) || defined(SP1_EUI) || defined(SP1_EUIH) for (i = 1; i <= nslaves; i++) { p4_dprintfl(90,"doing initial sync with local slave %d\n",i);# if defined(IPSC860) csend((long) SYNC_MSG, &bm_msg, (long) sizeof(struct bm_rm_msg), (long) i, (long) NODE_PID); crecv(INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg));# endif# if defined(CM5) CMMD_send_noblock(i, SYNC_MSG, &bm_msg, sizeof(struct bm_rm_msg)); CMMD_receive(CMMD_ANY_NODE, INITIAL_INFO, (void *) &bm_msg, sizeof(struct bm_rm_msg));# endif# if defined(NCUBE) nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, SYNC_MSG, &unused_flag); from = NCUBE_ANY_NODE; type = INITIAL_INFO; nread(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);# endif# if defined(SP1_EUI) mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, SYNC_MSG); from = ANY_P4TYPE_EUI; type = INITIAL_INFO; mpc_brecv(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);# endif# if defined(SP1_EUIH) len = sizeof(struct bm_rm_msg); type = SYNC_MSG; mp_bsend(&bm_msg, &len, &i, &type); from = ANY_P4TYPE_EUIH; type = INITIAL_INFO; len = sizeof(struct bm_rm_msg); mp_brecv(&bm_msg, &len, &from, &type, &unused_flag);# endif port = p4_n_to_i(bm_msg.port); slave_idx = p4_n_to_i(bm_msg.slave_idx); slave_pid = p4_n_to_i(bm_msg.slave_pid); switch_port = p4_n_to_i(bm_msg.switch_port); /* big master installing local slaves */ install_in_proctable(0, port, slave_pid, bm_msg.host_name, bm_msg.local_name, slave_idx, P4_MACHINE_TYPE, switch_port); p4_global->local_slave_count++; }# else#if defined(SUN_SOLARIS)/***** Shyam code, removed by RL { processorid_t proc = 0; if(p_online(proc,P_STATUS) != P_ONLINE) printf("Could not bind parent to processor 0\n"); else { processor_bind(P_PID,P_MYID,proc, &proc); printf("Bound parent to processor 0 , previous binding was %d\n", proc); } }*****/#endif for (slave_idx = 1; slave_idx <= nslaves; slave_idx++) { p4_dprintfl(20, "creating local slave %d of %d\n",slave_idx,nslaves); slave_pid = fork_p4(); if (slave_pid < 0) p4_error("create_bm_processes fork", slave_pid); else if (slave_pid) p4_dprintfl(10, "created local slave %d\n", slave_idx); if (slave_pid == 0) /* At this point, we are the slave. */ { sprintf(whoami_p4, "bm_slave_%d_%d", slave_idx, (int)getpid()); p4_free(p4_local); /* Doesn't work for weird memory model. */ p4_local = alloc_local_slave();# ifdef CAN_DO_SOCKET_MSGS if (!(p4_global->local_communication_only)) { p4_local->listener_fd = end_1;# if !defined(THREAD_LISTENER) close(end_2);# endif close(listener_fd); }#ifndef THREAD_LISTENER SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);#endif# endif /* hang for a valid proctable */ p4_lock(&p4_global->slave_lock); p4_unlock(&p4_global->slave_lock); p4_local->my_id = p4_get_my_id_from_proc();#if defined(SUN_SOLARIS)/***** Shyam code, removed by RL { int no_processors; processorid_t bindproc; no_processors = sysconf(_SC_NPROCESSORS_ONLN); bindproc = (p4_local->my_id) % no_processors; if(p_online(bindproc,P_STATUS) != P_ONLINE) { printf("could not bind slave %d to processor %d", p4_local->my_id, bindproc); } else { printf("Bound slave %d to processor %d\n", p4_local->my_id,bindproc); processor_bind(P_PID,P_MYID,bindproc, &bindproc); printf("previous binding was %d\n",bindproc); } }*****/#endif setup_conntab(); sprintf(whoami_p4, "p%d_%d", p4_local->my_id, (int)getpid()); usc_init(); init_usclock();# ifdef TCMP tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());# endif /* sync with master twice: once to make sure all slaves have got proctable, and second after the master has synced with the remote processes */ p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids()); p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids()); p4_dprintfl(20, "local slave starting\n"); ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE); ALOG_LOG(p4_local->my_id,BEGIN_USER,0,""); return (0); } /* master installing local slaves */ install_in_proctable(0, p4_global->listener_port, slave_pid, p4_global->my_host_name, p4_global->my_host_name, slave_idx, P4_MACHINE_TYPE, p4_global->proctable[0].switch_port); p4_global->local_slave_count++; }# endif# if defined(CM5) for (i=nslaves+1; i < CMMD_partition_size(); i++) CMMD_send_noblock(i, DIE, &bm_msg, sizeof(struct bm_rm_msg));# endif# if defined(NCUBE) for (i=nslaves+1; i < ncubesize(); i++) nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, DIE, &unused_flag);# endif# if defined(SP1_EUI) for (i=nslaves+1; i < eui_numtasks; i++) mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, DIE);# endif# if defined(SP1_EUIH) for (i=nslaves+1; i < euih_numtasks; i++) { len = sizeof(struct bm_rm_msg); type = DIE; mp_bsend(&bm_msg, &len, &i, &type); }# endif /* Done creating slaves. Now fork off the listener */# if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE) && !defined(SP1_EUI) && !defined(SP1_EUIH)# if defined(CAN_DO_SOCKET_MSGS) && !defined(NO_LISTENER) && !defined(THREAD_LISTENER) if (!(p4_global->local_communication_only)) { listener_pid = fork_p4(); if (listener_pid < 0) p4_error("create_bm_processes listener fork", listener_pid); if (listener_pid == 0) { sprintf(whoami_p4, "bm_list_%d", (int)getpid()); /* Inside listener */ p4_local = alloc_local_listener(); l->listening_fd = listener_fd; l->slave_fd[0] = end_2; close(end_1); { /* exec external listener process */ char *listener_prg = LISTENER_PATHNAME; if (*listener_prg) { char dbg_c[10], max_c[10], lfd_c[10], sfd_c[10]; sprintf(dbg_c, "%d", p4_debug_level); sprintf(max_c, "%d", p4_global->max_connections); sprintf(lfd_c, "%d", l->listening_fd); sprintf(sfd_c, "%d", l->slave_fd[0]); p4_dprintfl(70, "exec %s %s %s %s %s\n", listener_prg, dbg_c, max_c, lfd_c, sfd_c); execlp(listener_prg, listener_prg, dbg_c, max_c, lfd_c, sfd_c, NULL); p4_dprintfl(70, "exec failed (errno= %d), using buildin\n", errno); } } listener(); exit(0); } }# endif /* Else we're still in the big master */ sprintf(whoami_p4, "p0_%d", (int)getpid());# if defined(THREAD_LISTENER) /* If there is only one process, then we will not have created a listener port or corresponding fd. In that case, we don't need the listener (it just does an while(1) {accept(listener_fd);...}, so there isn't anything to do if only a single process is running). */ if (p4_global->listener_fd >= 0) { p4_dprintfl(50,"creating listener thread\n"); /* pthread_mutex_init( &p4_local->conntab_lock, 0 ); */ p4_create_thread( trc, thread_listener, 66 ); p4_dprintfl(50,"created listener thread\n"); } /* else trc = 0; */ /* NT version put the last arg of CreateThread into listener_pid */# endif /* We need to close the fds from the listener setup */# if defined(CAN_DO_SOCKET_MSGS) && !defined(NO_LISTENER) if (!(p4_global->local_communication_only)) { p4_local->listener_fd = end_1;# if !defined(THREAD_LISTENER) close(listener_fd); close(end_2);# endif p4_global->listener_pid = listener_pid; }# endif# endif dump_global(80); p4_dprintfl(90, "create_bm_processes: exiting\n"); return (nslaves);}P4VOID procgroup_to_proctable(pg)struct p4_procgroup *pg;{ int i, j, ptidx; struct p4_procgroup_entry *pe; if (strcmp(pg->entries[0].host_name,"local") == 0) { strcpy(p4_global->proctable[0].host_name,p4_global->my_host_name); } else { p4_dprintfl(10,"hostname in first line of procgroup is %s\n", pg->entries[0].host_name); strcpy(p4_global->my_host_name,pg->entries[0].host_name); strcpy(p4_global->proctable[0].host_name,pg->entries[0].host_name); } get_qualified_hostname(p4_global->proctable[0].host_name, HOSTNAME_LEN); p4_dprintfl(10,"hostname for first entry in proctable is %s\n", p4_global->proctable[0].host_name); p4_global->proctable[0].group_id = 0; ptidx = 1; for (i=0, pe=pg->entries; i < pg->num_entries; i++, pe++) { for (j=0; j < pe->numslaves_in_group; j++) { if (i == 0) strcpy(p4_global->proctable[ptidx].host_name, p4_global->proctable[0].host_name); else strcpy(p4_global->proctable[ptidx].host_name,pe->host_name); get_qualified_hostname(p4_global->proctable[ptidx].host_name, HOSTNAME_LEN); p4_global->proctable[ptidx].group_id = i;# ifdef CAN_DO_SOCKET_MSGS { struct hostent *hp = gethostbyname_p4(p4_global->proctable[ptidx].host_name); struct sockaddr_in *listener_sin = &p4_global->proctable[ptidx].sockaddr; bzero( (P4VOID*) listener_sin, sizeof(struct sockaddr_in) ); bcopy((P4VOID *) hp->h_addr, (P4VOID *)&listener_sin->sin_addr, hp->h_length); listener_sin->sin_family = hp->h_addrtype; /* Set a dummy port so that we can detect that the field has been initialized */ listener_sin->sin_port = 1; }# endif ptidx++; } p4_global->num_in_proctable = ptidx; }}P4VOID sync_with_remotes(){ struct bm_rm_msg msg; int i, fd, node, num_rms, rm[P4_MAXPROCS]; p4_dprintfl(90, "sync_with_remotes: starting\n");# ifdef CAN_DO_SOCKET_MSGS p4_get_cluster_masters(&num_rms, rm); for (i = 1; i < num_rms; i++) { node = rm[i]; fd = p4_local->conntab[node].port; net_recv(fd, &msg, sizeof(msg)); msg.type = p4_n_to_i(msg.type); if (msg.type != SYNC_MSG) p4_error("sync_with_remotes: bad type rcvd\n",msg.type); } for (i = 1; i < num_rms; i++) { node = rm[i]; fd = p4_local->conntab[node].port; msg.type = p4_i_to_n(SYNC_MSG); net_send(fd, &msg, sizeof(msg), P4_FALSE); }# endif}P4VOID send_proc_table(){ int slave_idx, ent; int fd; struct bm_rm_msg msg; struct proc_info *pe; p4_dprintfl(90, "send_proc_table: starting\n");# ifdef CAN_DO_SOCKET_MSGS for (slave_idx = 1; slave_idx < p4_global->num_in_proctable; slave_idx++) { if (p4_global->proctable[slave_idx].slave_idx != 0) continue; fd = p4_local->conntab[slave_idx].port; p4_dprintfl(90, "sending proctable to slave %d on %d:\n", slave_idx, fd); if (fd < 0) p4_error("send_proc_table: rm entry doesn't have valid fd", fd); for (ent = 0, pe = p4_global->proctable; ent < p4_global->num_in_proctable; ent++, pe++) { msg.type = p4_i_to_n(PROC_TABLE_ENTRY); msg.port = p4_i_to_n(pe->port); msg.unix_id = p4_i_to_n(pe->unix_id); msg.slave_idx = p4_i_to_n(pe->slave_idx); msg.group_id = p4_i_to_n(pe->group_id); strcpy(msg.host_name, pe->host_name); strcpy(msg.machine_type,pe->machine_type); msg.switch_port = p4_i_to_n(pe->switch_port); net_send(fd, &msg, sizeof(msg), P4_FALSE); p4_dprintfl(90, "%s sent proctable entry to slave %d: %s \n", p4_global->proctable[0].host_name, ent+1, pe->host_name); } p4_dprintfl(90, " sending end_of_proc_table\n"); msg.type = p4_i_to_n(PROC_TABLE_END); net_send(fd, &msg, sizeof(msg), P4_FALSE); }# endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -