⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_bm.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
#include "p4.h"
#include "p4_sys.h"

/* extern struct fdentry fdtable[MAXFDENTRIES]; */

void p4_peer_msg_handler(char *);

/*
 * bm_start - bring up p4 for a process that was started through BNR.
 *
 * Allocates the p4 global and local structures, registers
 * p4_peer_msg_handler with BNR, reads this process's group, rank and group
 * size from BNR, ignores SIGPIPE, builds the connection table, selects a
 * working directory, initialises memory/environment/clock state, installs
 * this process in the proctable and finally fences with the rest of the
 * group.
 *
 * argc  - not referenced by this function.
 * argv  - argv[0] supplies the fallback working directory (its directory
 *         part) when p4_wd is empty or chdir(p4_wd) fails.
 *
 * Always returns 0; allocation failure is reported through p4_error().
 */
int bm_start(int *argc, char **argv)
{
    int bm_switch_port;
    char *s, pgmname[P4_MAX_PGM_LEN];
    int rc, myjsize, myrank;
    BNR_Group mygroup;

    setbuf( stdout, NULL );  /* turn off buffering for clients */

    alloc_global();  /* sets p4_global */
    p4_global->local_communication_only = P4_FALSE;  /* hard code for mpd */
    p4_local = alloc_local_bm();
    if (p4_local == NULL)
        p4_error("p4_initenv: alloc_local_bm failed\n", 0);

    /* NOTE(review): rc is overwritten by each BNR call below and never
       inspected - confirm whether BNR failures need handling here. */
    rc = BNR_Pre_init( p4_peer_msg_handler );  /* specific to mpich-1 */
    rc = BNR_Init( );
    rc = BNR_Get_group( &mygroup );
    rc = BNR_Get_rank( mygroup, &myrank );
    rc = BNR_Get_size( mygroup, &myjsize );

    p4_dprintfl(10,"IGNORING SIGPIPE\n");
    SIGNAL_P4(SIGPIPE,SIG_IGN);

    sprintf(whoami_p4, "p%d_%d", myrank, (int)getpid());
    p4_global->num_in_proctable = myjsize; /* there really isn't any proctable  */
    p4_local->my_id         = myrank;
    p4_local->my_job        = BNR_Get_group_id( mygroup );  /* default jobid for now */

    setup_conntab();

    /* get fd for talking to the manager from the environment, where he put it after
       acquiring it.
     */
    rc = BNR_Man_msgs_fd( &(p4_local->parent_man_fd) );

    /* choose a working directory */
    if (strlen(p4_wd) && !chdir(p4_wd))
    {
        p4_dprintfl(90, "working directory for %d set to %s\n", p4_local->my_id, p4_wd);
    }
    else
    {
        /* fall back to the directory part of the program path */
        strncpy(pgmname,argv[0],P4_MAX_PGM_LEN);
        pgmname[P4_MAX_PGM_LEN-1] = 0;   /* strncpy may leave it unterminated */
        if ((s = (char *) rindex(pgmname,'/'))  !=  NULL)
        {
            *s = '\0';  /* chg to directory name only */
            chdir(pgmname);
        }
    }

#   ifdef SYSV_IPC
    sysv_num_shmids = 0;
    sysv_shmid[0]  = -1;
    sysv_semid0    = -1;
    sysv_semid0 = init_sysv_semset(0);
#   endif

    MD_initmem(globmemsize);

    /* redirect both output streams when an output file name was configured */
    if (*bm_outfile)
    {
        freopen(bm_outfile, "w", stdout);
        freopen(bm_outfile, "w", stderr);
    }

    MD_initenv();
    bm_switch_port = getswport(p4_global->my_host_name);
    usc_init();
    init_usclock();

    /* big master installing himself */
    install_in_proctable(0, (-1), getpid(), p4_global->my_host_name,
                         p4_global->my_host_name,
                         0, P4_MACHINE_TYPE, bm_switch_port);

    BNR_Fence( mygroup ); /* to make sure p4 data structures are set before interrupts
                             can occur */

    return (0);
}

/*
 * p4_peer_msg_handler - handle a "connect_to_me" request from a peer.
 *
 * msg has the form "connect_to_me-<rank>-<ip address>-<port>".  The handler
 * opens a TCP connection to that address/port (up to three attempts, one
 * second apart), makes the socket non-blocking, enables TCP_NODELAY and
 * records the descriptor in p4_local->conntab[<rank>] as CONN_REMOTE_EST.
 *
 * The request is ignored when a connection to that rank is already
 * established, or when this process is itself connecting to that rank and
 * has the higher id.  Malformed messages are reported with p4_dprintf() and
 * dropped.  Failure to connect after all attempts is reported through
 * p4_error().
 */
void p4_peer_msg_handler(char *msg)
{
    char cmd[32] /*, tohostname[MAXLINE]*/;
    char c_torank[32],c_toport[32],c_toipaddr[32];
    int flags,torank,toport,connection_fd,num_tries,rc,connected,optval;
    int myid = p4_get_my_id();
    unsigned int toipaddr;
    struct sockaddr_in sa;

    p4_dprintfl(077,"p4_peer_msg_handler entered with msg :%s:\n",msg );

    if (strncmp(msg,"connect_to_me-",14) != 0)
    {
        p4_dprintf("invalid msg in p4_peer_msg_handler :%s:\n",msg);
        return;
    }

    /* Every conversion carries a field width one less than its 32-byte
       destination so an over-long field cannot overrun the stack buffers;
       a field that does not fit makes the following literal '-' fail to
       match, which shows up as a short conversion count. */
    if (sscanf(msg,"%31[^-]-%31[^-]-%31[^-]-%31s",
               cmd,c_torank,c_toipaddr,c_toport) != 4)
    {
        p4_dprintf("invalid msg in p4_peer_msg_handler :%s:\n",msg);
        return;
    }

    torank = atoi(c_torank);
    toport = atoi(c_toport);
    toipaddr = inet_addr(c_toipaddr);

    /* torank indexes conntab[] below, so refuse ranks outside the known
       process range.  NOTE(review): assumes conntab holds at least
       num_in_proctable entries - confirm against setup_conntab(). */
    if (torank < 0 || torank >= p4_global->num_in_proctable)
    {
        p4_dprintf("invalid msg in p4_peer_msg_handler :%s:\n",msg);
        return;
    }

    bzero((void *)&sa, sizeof(struct sockaddr_in));
    bcopy((void *)&toipaddr, (void *)&sa.sin_addr, sizeof(unsigned int));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(toport);

    if (p4_local->conntab[torank].type == CONN_REMOTE_EST)
    {
        p4_dprintfl(077, "p4_peer_msg_handler: already conn'd to %d\n", torank);
        return;
    }

    if (p4_global->dest_id[myid] == torank)  /* already connecting */
    {
        /* both sides are connecting to each other: the higher id backs off */
        if (myid > torank)
        {
            p4_dprintfl(077, "p4_peer_msg_handler: already making conn to %d\n", torank);
            return;
        }
    }

    connected = 0;
    num_tries = 3;
    while (!connected && num_tries) {
        SYSCALL_P4(connection_fd, socket(AF_INET, SOCK_STREAM, 0));
        if (connection_fd < 0)
            p4_error("p4_peer_msg_handler socket", connection_fd);

        SYSCALL_P4(rc, connect(connection_fd, (struct sockaddr *) &sa,
                               sizeof(struct sockaddr_in)));
        if (rc < 0)
        {
            /* a fresh socket is created for every attempt */
            close(connection_fd);
            p4_dprintfl( 077, "Connect failed; closed socket %d\n", connection_fd );
            if (--num_tries)
            {
                p4_dprintfl(077,"p4_peer_msg_handler: connect to %s failed; will try %d more times \n",c_toipaddr,num_tries);
                sleep(1);
            }
        }
        else
        {
            connected = 1;
            p4_dprintfl(077,"p4_peer_msg_handler: connected to %s\n",c_toipaddr);
        }
    }
    if ( ! connected)
    {
        p4_dprintf("p4_peer_msg_handler: failed connect to %s\n",c_toipaddr);
        p4_error("failed to connnect",-1);
    }

    /* switch the socket to non-blocking mode */
    flags = fcntl(connection_fd, F_GETFL, 0);
    if (flags < 0)
        p4_error("p4_bm fcntl1", flags);
#   if defined(HP)
    flags |= O_NONBLOCK;
#   else
    flags |= O_NDELAY;
    p4_dprintfl(90, "p4_bm: setting ndelay for %d\n",connection_fd);
#   endif
#   if defined(RS6000)
    flags |= O_NONBLOCK;
#   endif
    flags = fcntl(connection_fd, F_SETFL, flags);
    if (flags < 0)
        p4_error("p4_bm fcntl2", flags);

    optval = 1;
    SYSCALL_P4(rc, setsockopt(connection_fd,IPPROTO_TCP,TCP_NODELAY,(char *) &optval,sizeof(optval)));

    p4_local->conntab[torank].type = CONN_REMOTE_EST;
    p4_local->conntab[torank].port = connection_fd;
    p4_local->conntab[torank].same_data_rep = P4_TRUE;

/*
    p4_dprintfl(077, "p4_peer_msg_handler: connected after %d tries, connection_fd=%d host = %s\n",
      num_tries, connection_fd, tohostname);
 */
    /* num_tries here is the count of attempts that were still left */
    p4_dprintfl(077, "p4_peer_msg_handler: connected after %d tries, connection_fd=%d\n",
                num_tries, connection_fd);

    /* We're connected, so we can add this connection to the table */
    p4_dprintfl(077, "marked as established fd=%d torank=%d\n",
                connection_fd, torank);
    p4_dprintfl(077,"p4_peer_msg_handler done\n" );
}

/*
 * p4_startup - create the processes described by a procgroup and
 * synchronise with them.
 *
 * Requires p4_global to be set already (otherwise p4_error() is called).
 * Converts pg into the proctable, optionally sets up the anonymous listener
 * socket, creates local (and, with CAN_DO_SOCKET_MSGS, remote) processes,
 * distributes the proctable and then runs the cluster barrier /
 * sync_with_remotes / cluster barrier sequence.
 *
 * pg - procgroup describing the processes to start.
 *
 * Returns 0 on success (also in a process forked by create_bm_processes,
 * which returns early), or -1 if process creation failed.
 */
int p4_startup(struct p4_procgroup *pg)
{
    int nslaves;
    int listener_port, listener_fd;

    p4_dprintfl(90,"entering p4_startup\n");

    if (p4_global == NULL)
        p4_error("p4 not initialized; perhaps p4_initenv not called",0);

/* On some systems (SGI IRIX 6), process exit sometimes kills all processes
   in the process GROUP.  This code attempts to fix that.
   We DON'T do it if stdin (0) is connected to a terminal, because that
   disconnects the process from the terminal.
 */
#if defined(HAVE_SETSID) && defined(HAVE_ISATTY) && defined(SET_NEW_PGRP)
    if (!isatty(0)) {
        pid_t rc;
        rc = setsid();
        if (rc < 0) {
            p4_dprintfl( 90, "Could not create new process group\n" );
        }
        else {
            /* pid_t is not guaranteed to be int; cast to match %d */
            p4_dprintfl( 80, "Created new process group %d\n", (int) rc );
        }
    }
    else {
        p4_dprintfl( 80,
          "Did not created new process group because isatty returned true\n" );
    }
#endif

    procgroup_to_proctable(pg);
    if (pg->num_entries > 1)
        p4_global->local_communication_only = P4_FALSE;

#   ifdef CAN_DO_SOCKET_MSGS
    if (!p4_global->local_communication_only)
    {
        net_setup_anon_listener(10, &listener_port, &listener_fd);
        p4_global->listener_port = listener_port;
        p4_global->listener_fd = listener_fd;
        p4_dprintfl(90, "setup listener on port %d fd %d\n",
                    listener_port, listener_fd);
        p4_global->proctable[0].port = listener_port;
#ifndef THREAD_LISTENER
        SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
#endif
    }
    else
        p4_global->listener_fd = -1;
#   endif

    setup_conntab();

    /* NOTE(review): the early returns between here and p4_unlock() below
       leave slave_lock held - confirm that this is intended. */
    p4_lock(&p4_global->slave_lock);
    if ((nslaves = create_bm_processes(pg)) < 0)
        return (-1);
    if (!p4_am_i_cluster_master())  /* I was forked in create_bm_processes */
        return(0);

#   ifdef CAN_DO_SOCKET_MSGS
    if (create_remote_processes(pg) < 0)
        return (-1);
#   endif

    /* let local slaves use proc table to identify themselves */
    p4_unlock(&p4_global->slave_lock);

    send_proc_table();  /* to remote masters */

#   if defined(IPSC860)  ||  defined(CM5)     ||  defined(NCUBE)  \
                         ||  defined(SP1_EUI) ||  defined(SP1_EUIH)
    {
    struct bm_rm_msg bm_msg;
    struct p4_procgroup_entry *local_pg;
    int len, to, type;
    int i, unused_flag;

    /* send initial info and proctable to local slaves */
    /* must use p4_i_to_n procs because node slave
       does not know if the msg is forwarded from bm */
    local_pg = &(pg->entries[0]);
    bm_msg.type = p4_i_to_n(INITIAL_INFO);
    bm_msg.numinproctab = p4_i_to_n(p4_global->num_in_proctable);
    bm_msg.numslaves = p4_i_to_n(local_pg->numslaves_in_group);
    bm_msg.debug_level = p4_i_to_n(p4_remote_debug_level);
    bm_msg.memsize = p4_i_to_n(globmemsize);
    bm_msg.logging_flag = p4_i_to_n(logging_flag);
    strcpy(bm_msg.application_id, p4_global->application_id);
    strcpy(bm_msg.version, P4_PATCHLEVEL);
    if (strlen( local_pg->slave_full_pathname ) >= P4_MAX_PGM_LEN) {
        p4_error("Program name is too long, must be less than",
                 P4_MAX_PGM_LEN);
    }
    strcpy(bm_msg.pgm, local_pg->slave_full_pathname);
    strcpy(bm_msg.wdir, p4_wd);

    for (i = 1; i <= nslaves; i++)
    {
        p4_dprintfl(90,"sending initinfo to slave %d of %d\n",i,nslaves);
#       if defined(IPSC860)
        csend((long) INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg),
              (long) i, (long) NODE_PID);
        csend((long) INITIAL_INFO, p4_global->proctable,
              (long) sizeof(p4_global->proctable), (long) i, (long) NODE_PID);
#       endif
#       if defined(CM5)
        CMMD_send_noblock(i, INITIAL_INFO, &bm_msg, sizeof(struct bm_rm_msg));
        CMMD_send_noblock(i, INITIAL_INFO, p4_global->proctable, sizeof(p4_global->proctable));
#       endif
#       if defined(NCUBE)
        nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO, &unused_flag);
        nwrite(p4_global->proctable, sizeof(p4_global->proctable), i, INITIAL_INFO, &unused_flag);
#       endif
#       if defined(SP1_EUI)
        mpc_bsend(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO);
        mpc_bsend(p4_global->proctable, sizeof(p4_global->proctable), i, INITIAL_INFO);
#       endif
#       if defined(SP1_EUIH)
        len  = sizeof(struct bm_rm_msg);
        to   = i;
        type = INITIAL_INFO;
        mp_bsend(&bm_msg, &len, &to, &type);
        len  = sizeof(p4_global->proctable);
        mp_bsend(p4_global->proctable, &len, &to, &type);
#       endif
        p4_dprintfl(90,"sent initinfo to slave %d of %d\n",i,nslaves);
    }
    }   /* End of local declarations */
#   endif

    p4_global->low_cluster_id =
        p4_local->my_id - p4_global->proctable[p4_local->my_id].slave_idx;
    p4_global->hi_cluster_id =
        p4_global->low_cluster_id + p4_global->local_slave_count + 1;

    /*
       sync with local slaves thus insuring that they have the proctable before
       syncing with remotes (this keeps remotes from interrupting the local
       processes too early; then re-sync with local slaves (thus permitting them
       to interrupt remotes)
    */
    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

    /*
       NEED A SYNC WITH LOCALS THAT DOES A BARRIER WITH PROCS THAT SHARE
       MEMORY AND MP BARRIER WITH OTHER "LOCAL" PROCESSES
    */
    sync_with_remotes();
    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

    return (0);
}

int create_bm_processes(pg)
struct p4_procgroup *pg;
{
    struct p4_procgroup_entry *local_pg;
    struct listener_data *l = NULL;
    int nslaves, end_1, end_2;
    int slave_pid, listener_pid = -1;
    int slave_idx, listener_fd = -1;
#   if defined(IPSC860)  ||  defined(CM5)  ||  defined(NCUBE)  ||  defined(SP1_EUI) || defined(SP1_EUIH)
    /* Message passing systems require additional information */
    struct bm_rm_msg bm_msg;
    int i;
    int port, switch_port, type, len, from, unused_flag
#endif

#   if defined(THREAD_LISTENER)
    p4_thread_t trc;
#   endif

    p4_dprintfl(90,"entering create_bm_processes\n");
    local_pg = &(pg->entries[0]);

    nslaves = local_pg->numslaves_in_group;
#   if !defined(IPSC860)  &&  !defined(CM5)  &&  !defined(NCUBE)  &&  !defined(SP1_EUI) && !defined(SP1_EUIH)
    if (nslaves > P4_MAX_MSG_QUEUES)
        p4_error("more slaves than msg queues \n", nslaves);
#   endif

/* alloc listener local data since this proc will eventually become listener */
#   if defined(CAN_DO_SOCKET_MSGS)  &&  !defined(NO_LISTENER)
    if (!(p4_global->local_communication_only))
    {
        listener_fd = p4_global->listener_fd;
        listener_info = alloc_listener_info(1);
        l = listener_info;
        get_pipe(&end_1, &end_2); /* used even by thread listener */
        l->slave_fd[0] = end_2;
    }
#   endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -