⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_bm.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
		p4_dprintfl(10, "created local slave %d\n", slave_idx);	}	if (slave_pid == 0)	/* At this point, we are the slave. */	{	    sprintf(whoami_p4, "bm_slave_%d_%d", slave_idx, (int)getpid());	    p4_free(p4_local);	/* Doesn't work for weird memory model. */	    p4_local = alloc_local_slave();	    /* Check for environment variables that redirect stdin */	    mpiexec_reopen_stdin();#           ifdef CAN_DO_SOCKET_MSGS	    if (!(p4_global->local_communication_only))	    {#ifdef USE_NONBLOCKING_LISTENER_SOCKETS		/* Set the listener socket to be nonblocking.  */		int rc = p4_make_socket_nonblocking( end_1 );		if (rc < 0) {		    p4_error("create_bm_processes: set listener nonblocking",			     rc);		}#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */		p4_local->listener_fd = end_1;#               if !defined(THREAD_LISTENER)		close(end_2);#               endif		close(listener_fd);	    }#           endif /* CAN_DO_SOCKET_MSGS */	    /* hang for a valid proctable.  Note that the master locks the	     slave lock before is starts creating the slave processes, so	     the initial lock is not acquired until *after* the master 	     releases the lock. */	    p4_lock(&p4_global->slave_lock);	    p4_unlock(&p4_global->slave_lock);#ifdef CAN_DO_SOCKET_MSGS	    /* Wait to install the listener interrupt handler until	       the proctable is valid.  The listener will reissue the	       interrupt if the slave misses because it was waiting on the	       lock around the proctable 	    */#ifndef THREAD_LISTENER	    SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);#endif#endif /* CAN_DO_SOCKET_MSGS */	    p4_local->my_id = p4_get_my_id_from_proc();#if defined(SUN_SOLARIS)/*****  Shyam code, removed by RL	    {	      int no_processors;	      processorid_t bindproc;	      no_processors = sysconf(_SC_NPROCESSORS_ONLN);	      bindproc = (p4_local->my_id) % no_processors;	      if(p_online(bindproc,P_STATUS) != P_ONLINE)		 {		   printf("could not bind slave %d to processor %d",		            p4_local->my_id, bindproc);		 }	      else		 {		   printf("Bound slave %d to processor %d\n",			  p4_local->my_id,bindproc);		   processor_bind(P_PID,P_MYID,bindproc, &bindproc);                   printf("previous binding was %d\n",bindproc);		 }	    }*****/#endif /* SUN_SOLARIS */	    setup_conntab();	    sprintf(whoami_p4, "p%d_%d", p4_local->my_id, (int)getpid());	    usc_init();	    init_usclock();#           ifdef TCMP            tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());#           endif	    /* 	       sync with master twice: once to make sure all slaves have 	       got proctable, and second after the master has synced with the 	       remote processes 	    */	    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());	    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());	    p4_dprintfl(20, "local slave starting\n");	    ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE);	    ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");	    return (0);	}#   ifdef CAN_DO_SOCKET_MSGS	/* slave holds this end */	close(end_1);#   endif	/* master installing local slaves */	install_in_proctable(0, p4_global->listener_port, slave_pid,			     p4_global->my_host_name, 			     p4_global->my_host_name, 			     slave_idx, P4_MACHINE_TYPE,			     p4_global->proctable[0].switch_port);	p4_global->local_slave_count++;    }#   endif#   if defined(CM5)    for (i=nslaves+1; i < CMMD_partition_size(); i++)        CMMD_send_noblock(i, DIE, &bm_msg, sizeof(struct bm_rm_msg));#   endif#   if defined(NCUBE)    for (i=nslaves+1; i < ncubesize(); i++)	nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, DIE, &unused_flag);#   endif#   if defined(SP1_EUI)    for (i=nslaves+1; i < eui_numtasks; i++)	mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, DIE);#   endif#   if defined(SP1_EUIH)    for (i=nslaves+1; i < euih_numtasks; i++)    {	len = sizeof(struct bm_rm_msg);	type = DIE;	mp_bsend(&bm_msg, &len, &i, &type);    }#   endif    /* Done creating slaves. Now fork off the listener */#   if !defined(IPSC860)  &&  !defined(CM5)  &&  !defined(NCUBE)  &&  !defined(SP1_EUI) && !defined(SP1_EUIH)#   if defined(CAN_DO_SOCKET_MSGS)  &&  !defined(NO_LISTENER) && !defined(THREAD_LISTENER)    if (!(p4_global->local_communication_only))    {	/* communication big master <--> listener */	get_pipe(&end_1, &end_2);	p4_local->listener_fd = end_1;	listener_info->slave_fd[0] = end_2;#ifdef USE_NONBLOCKING_LISTENER_SOCKETS	/* Set the listener socket to be nonblocking.  */	{	    int rc = p4_make_socket_nonblocking( end_1 );	    if (rc < 0) {		p4_error("create_bm_processes: set listener nonblocking",			 rc);	    }	}#endif /* USE_NONBLOCKING_LISTENER_SOCKETS */	/* Now, create the listener */	listener_pid = fork_p4();	if (listener_pid < 0) {	    p4_error("create_bm_processes listener fork", listener_pid);	}	if (listener_pid == 0)	{	    /* I am the listener */	    listener_info->slave_pid[0] = getppid();	    close(end_1);	    sprintf(whoami_p4, "bm_list_%d", (int)getpid());	    /* Inside listener */	    p4_local = alloc_local_listener();	    {		/* exec external listener process */		char *listener_prg = LISTENER_PATHNAME;		if (*listener_prg)		{		    char dbg_c[10], max_c[10], lfd_c[10], sfd_c[10];		    /* 		    p4_error("external listener not supported", 0);		    */		    sprintf(dbg_c, "%d", p4_debug_level);		    sprintf(max_c, "%d", p4_global->max_connections);		    sprintf(lfd_c, "%d", listener_info->listening_fd);		    sprintf(sfd_c, "%d", listener_info->slave_fd[0]);		    p4_dprintfl(70, "exec %s %s %s %s %s\n",				listener_prg, dbg_c, max_c, lfd_c, sfd_c);		    execlp(listener_prg, listener_prg,			   dbg_c, max_c, lfd_c, sfd_c, NULL);		    p4_dprintfl(70, "exec failed (errno= %d), using buildin\n",				errno);		}	    }	    listener();	    exit(0);	}    }#   endif    /* Else we're still in the big master */    sprintf(whoami_p4, "p0_%d", (int)getpid());#   if defined(THREAD_LISTENER)    /*        If there is only one process, then we will not have created a        listener port or corresponding fd.  In that case, we don't need       the listener (it just does an while(1) {accept(listener_fd);...},       so there isn't anything to do if only a single process is running).     */    if (p4_global->listener_fd >= 0) {	p4_dprintfl(50,"creating listener thread\n");	/* pthread_mutex_init( &p4_local->conntab_lock, 0 ); */	p4_create_thread( trc, thread_listener, 66 );	p4_dprintfl(50,"created listener thread\n");    }    /* else	trc = 0; */    /* NT version put the last arg of CreateThread into listener_pid */#   endif    /* We need to close the fds from the listener setup, in big master     * process, slave number 0. */#   if defined(CAN_DO_SOCKET_MSGS)  &&  !defined(NO_LISTENER)     if (!(p4_global->local_communication_only))    {#       if !defined(THREAD_LISTENER)	close(listener_fd);	close(end_2);#       endif	p4_global->listener_pid = listener_pid;    }#   endif#   endif    dump_global(80);    p4_dprintfl(90, "create_bm_processes: exiting\n");    return (nslaves);}P4VOID procgroup_to_proctable( struct p4_procgroup *pg){    int i, j, ptidx;    struct p4_procgroup_entry *pe;    if (strcmp(pg->entries[0].host_name,"local") == 0)    {	strcpy(p4_global->proctable[0].host_name,p4_global->my_host_name);    }    else    {	p4_dprintfl(10,"hostname in first line of procgroup is %s\n",		    pg->entries[0].host_name);	strcpy(p4_global->my_host_name,pg->entries[0].host_name);	strcpy(p4_global->proctable[0].host_name,pg->entries[0].host_name);    }    get_qualified_hostname(p4_global->proctable[0].host_name,HOSTNAME_LEN);    p4_dprintfl(10,"hostname for first entry in proctable is %s\n",		p4_global->proctable[0].host_name);    p4_global->proctable[0].group_id = 0;    ptidx = 1;    for (i=0, pe=pg->entries; i < pg->num_entries; i++, pe++)    {	for (j=0; j < pe->numslaves_in_group; j++)	{	    if (i == 0)		strcpy(p4_global->proctable[ptidx].host_name,		       p4_global->proctable[0].host_name);	    else		strcpy(p4_global->proctable[ptidx].host_name,pe->host_name);	    get_qualified_hostname(p4_global->proctable[ptidx].host_name,				   HOSTNAME_LEN);	    p4_global->proctable[ptidx].group_id = i;#           ifdef CAN_DO_SOCKET_MSGS	    {	    struct hostent *hp = 		gethostbyname_p4(p4_global->proctable[ptidx].host_name);	    struct sockaddr_in *listener_sockaddr = 		&p4_global->proctable[ptidx].sockaddr;#ifdef LAZY_GETHOSTBYNAME	    /* Since we just set the *address* above,we can confirm the	       data next */	    p4_procgroup_setsockaddr( &p4_global->proctable[ptidx] );#endif	    bzero( (P4VOID*) listener_sockaddr, sizeof(struct sockaddr_in) );	    bcopy((P4VOID *) hp->h_addr, 		  (P4VOID *)&listener_sockaddr->sin_addr, 		  hp->h_length);	    listener_sockaddr->sin_family = hp->h_addrtype;	    /* Set a dummy port so that we can detect that the field	       has been initialized */	    listener_sockaddr->sin_port = 1;	    }#           endif	    ptidx++;	}	p4_global->num_in_proctable = ptidx;    }}P4VOID sync_with_remotes(){    struct bm_rm_msg msg;    int i, fd, node, num_rms, rm[P4_MAXPROCS];    p4_dprintfl(90, "sync_with_remotes: starting\n");#   ifdef CAN_DO_SOCKET_MSGS    p4_get_cluster_masters(&num_rms, rm);    for (i = 1; i < num_rms; i++)    {	node = rm[i];	fd = p4_local->conntab[node].port;	net_recv(fd, &msg, sizeof(msg));	msg.type = p4_n_to_i(msg.type);	if (msg.type != SYNC_MSG)	    p4_error("sync_with_remotes: bad type rcvd\n",msg.type);    }    for (i = 1; i < num_rms; i++)    {	node = rm[i];	fd = p4_local->conntab[node].port;	msg.type = p4_i_to_n(SYNC_MSG);	net_send(fd, &msg, sizeof(msg), P4_FALSE);    }#   endif}P4VOID send_proc_table(){    int slave_idx, ent;    int fd;    struct bm_rm_msg msg;    struct proc_info *pe;    p4_dprintfl(90, "send_proc_table: starting\n");#   ifdef CAN_DO_SOCKET_MSGS    for (slave_idx = 1; slave_idx < p4_global->num_in_proctable; slave_idx++)    {	if (p4_global->proctable[slave_idx].slave_idx != 0)	    continue;	fd = p4_local->conntab[slave_idx].port;	p4_dprintfl(90, "sending proctable to slave %d on %d:\n", slave_idx, fd);	if (fd < 0)	    p4_error("send_proc_table: rm entry doesn't have valid fd", fd);	for (ent = 0, pe = p4_global->proctable;	     ent < p4_global->num_in_proctable; ent++, pe++)	{	    msg.type = p4_i_to_n(PROC_TABLE_ENTRY);	    msg.port = p4_i_to_n(pe->port);	    msg.unix_id = p4_i_to_n(pe->unix_id);	    msg.slave_idx = p4_i_to_n(pe->slave_idx);	    msg.group_id = p4_i_to_n(pe->group_id);	    strcpy(msg.host_name, pe->host_name);	    strcpy(msg.local_name, pe->local_name);	    strcpy(msg.machine_type,pe->machine_type);	    msg.switch_port = p4_i_to_n(pe->switch_port);	    net_send(fd, &msg, sizeof(msg), P4_FALSE);	    p4_dprintfl(90, "%s sent proctable entry to slave %d: %s \n",                         p4_global->proctable[0].host_name,                        ent+1, pe->host_name);	}	p4_dprintfl(90, "  sending end_of_proc_table\n");	msg.type = p4_i_to_n(PROC_TABLE_END);	net_send(fd, &msg, sizeof(msg), P4_FALSE);    }#   endif}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -