⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_sock_list.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
 *   established.  This is the only time a socket is created and later closed. * *   Because this is so different from the process listener, there is a  *   separate establish_connection routine. * *   Why choose the lower rank to establish the connection?  Because the *   first round of connections is from the master, at rank 0.  Additional *   connections as part of the initial distribution tree are also from low *   to high rank.  This reduces the number of connections that are made. */P4VOID thread_listener( void ){    struct slave_listener_msg msg;    int type;    int connection_fd;    int from, lport, to_pid, to;/*    int do_conn;    P4BOOL rc = P4_FALSE;    struct proc_info *from_pi; */    fd_set read_fds;    int    nfds, nfds_in;    int    msglen;  p4_dprintfl(70,"TL: thread listener starting\n");  while(1)  {      p4_dprintfl(70, "TL: thread listener starting select on fd=%d port=%d\n",		  p4_global->listener_fd,p4_global->listener_port);            FD_ZERO(&read_fds);      FD_SET(p4_global->listener_fd, &read_fds);      /* This version only works with single-slaves */       if (listener_info->num > 1) {	  p4_error( "Threaded listener does not support multiple slaves", 		    listener_info->num );      }      FD_SET(listener_info->slave_fd[0], &read_fds);      nfds_in = p4_global->listener_fd;      if (listener_info->slave_fd[0] > nfds_in) nfds_in = listener_info->slave_fd[0];       nfds_in++;      SYSCALL_P4(nfds, select(nfds_in, &read_fds, 0, 0, 0));      if (nfds < 0)	  p4_error("listener select", nfds);      if (nfds == 0) {	  p4_dprintfl(70, "TL: select timeout\n");	  continue;      }      /* Process remote connection requests first */      if (FD_ISSET(p4_global->listener_fd, &read_fds)) {	  /* Accept connection, get message */	  p4_dprintfl( 70, "TL: starting accept\n" );	  connection_fd = net_accept(p4_global->listener_fd);	  p4_dprintfl(70, 		  "TL: thread listener accepted on %d, got connection_fd=%d\n",		      p4_global->listener_fd, connection_fd);	  	  if ((msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg),10)) == PRECV_EOF)	  {	      p4_dprintf("TL: thread listener detected EOF on fd=%d\n",			 connection_fd);	      p4_error("thread listener detected EOF",-1);	  }	  if (msglen != sizeof(msg)) {	      p4_dprintf("TL: message was wrong size (%d)\n", msglen );	      close(connection_fd);	  }	  type = p4_n_to_i(msg.type);	  switch (type) 	  {	  case IGNORE_THIS:	      p4_dprintfl(70, "TL: got IGNORE_THIS\n");	      break;	  case CONNECTION_REQUEST:	      from   = p4_n_to_i(msg.from);	      to_pid = p4_n_to_i(msg.to_pid);	      to     = p4_n_to_i(msg.to);	      lport  = p4_n_to_i(msg.lport);	      if (lport != -1) {		  /* Message from non-threaded listener! */	      }	      p4_dprintfl(70, 	      "TL: got connection_request: from=%d lport=%d to_pid=%d to=%d\n",			  from, lport, to_pid, to);	      if (p4_local->conntab[from].type == CONN_REMOTE_NON_EST) {		  /* Establish the connection */		  /* p4_local->conntab[from].type = CONN_REMOTE_OPENING; */		  p4_dprintfl(70, "TL: connection now opening for %d\n", 			      from );		  		  if (p4_local->my_id < from) 		  {		      int new_connection_fd;		      p4_dprintfl(90,"TL: myid < from, myid = %d, from = %d\n",				  p4_local->my_id,from);		      /* Create a connection back to "from".  We could use		       the same socket, but using the same request code 		       is easier */		      new_connection_fd = request_connection(from);		      if (new_connection_fd < 0) {			  p4_error( "Could not create new connection", 				    new_connection_fd );		      }		      close( connection_fd );		      connection_fd = new_connection_fd;		      /* We now have the socket for the connection */		  }		  /* This is the new socket.  Just keep it. */		  p4_local->conntab[from].port = connection_fd;		  p4_local->conntab[from].same_data_rep =		      same_data_representation(p4_local->my_id,from);		  /* Requires write ordering in the thread */		  p4_local->conntab[from].type = CONN_REMOTE_EST;		  		  /* Send dummy message to P */		  p4_dprintfl(70,"TL: sending dummy msg on fd=%d\n",			      listener_info->slave_fd[0]);		  net_send(listener_info->slave_fd[0], &msg, sizeof(msg), 			   P4_FALSE);		  p4_dprintfl(70,"TL: sent dummy msg on fd=%d\n",			      listener_info->slave_fd[0]);	      }	      else {		  /* If the connection is in any other state, we've already		     connected it and have nothing to do (connections passed) 		   */		  close( connection_fd );	      }	      break;	  default:	      p4_dprintf("TL: invalid type %d in process_connect_request\n", 			 type);	      break;	  }      }      if (FD_ISSET(listener_info->slave_fd[0], &read_fds)) {	  p4_dprintfl( 70, "TL: connection request from slave\n" );	  /* Read this message */	  net_recv( listener_info->slave_fd[0], &msg, sizeof(msg) );	  from   = p4_n_to_i(msg.from);	  to_pid = p4_n_to_i(msg.to_pid);	  to     = p4_n_to_i(msg.to);	  lport  = p4_n_to_i(msg.lport);	  /* We may have established this connection while the slave	     was sending this request */	  if (p4_local->conntab[to].type == CONN_REMOTE_EST) {	      /* Nothing to do - we've already sent the est message */	      continue;	  }	  else {	      /* Send request connection */	      p4_dprintfl( 70, "TL: Slave requests a connection to %d\n", to );	      connection_fd = request_connection( to );	      if (connection_fd < 0) {		  p4_error( "Unable to get connection fd", connection_fd );	      }	      p4_dprintfl( 70, "TL: connection ready on fd=%d\n", 			   connection_fd );	      /* Send request ready */	      if (p4_local->my_id < to) {		  /* This is the new socket.  Just keep it. */		  p4_local->conntab[to].port = connection_fd;		  p4_local->conntab[to].same_data_rep =		      same_data_representation(p4_local->my_id,to);		  /* Requires write ordering in the thread */		  p4_local->conntab[to].type = CONN_REMOTE_EST;	      		  /* Send dummy message to P */		  p4_dprintfl(70,"TL: sending dummy msg on fd=%d\n",			      listener_info->slave_fd[0]);		  net_send(listener_info->slave_fd[0], &msg, sizeof(msg), 			   P4_FALSE);		  p4_dprintfl(70,"TL: sent dummy msg on fd=%d\n",			      listener_info->slave_fd[0]);	      }	      else {		  /* Otherwise, we need to wait for the connection to come		     from the other end. This connection is no longer needed */		  close( connection_fd );	      }	  }      }  }}/*  * This routine should only be called when the connection is not  * established.  It is called ONLY by the process, which is waiting for the * listener thread to complete. */int establish_connection(int dest_id){    int                       myid = p4_get_my_id();    struct slave_listener_msg msg;    struct proc_info          *dest_pi;    p4_dprintfl( 80, 	     "TL: Sending request to listener to open connection with %d\n", 		 dest_id );    /*      * Send message to local listener requesting connection to dest_id     * (process listener code uses p4_global->dest_id[myid] = dest_id/-1     * to lock/unlock around the request.  We don't need to do that.     */    dest_pi    = get_proc_info(dest_id);    msg.type   = p4_i_to_n(CONNECTION_REQUEST);    msg.from   = p4_i_to_n(myid);    msg.lport  = p4_i_to_n(-1);    msg.to     = p4_i_to_n(dest_id);    msg.to_pid = p4_i_to_n(dest_pi->unix_id);    net_send( p4_local->listener_fd, &msg, sizeof(msg), P4_FALSE );    /* Wait for thread to complete the request */    while (p4_local->conntab[dest_id].type == CONN_REMOTE_NON_EST) {        p4_dprintfl( 80, "TL: Waiting for message from listener thread\n" );        net_recv( p4_local->listener_fd, &msg, sizeof(msg) );        /* Just discard message */    }    p4_dprintfl(70, "TL :Connection established\n");    return (P4_TRUE);}/*  * Send a connection request from one listener to another listener.  Return  * the socket created for the request. */int request_connection(dest_id)int dest_id;{    struct proc_info *my_pi, *dest_pi;    char *my_host, *dest_host;    int my_id;    struct slave_listener_msg msg;    int dest_listener_con_fd;    int my_listener, dest_listener;    int num_tries;    /* Get some initial information */    my_id	  = p4_get_my_id();    my_pi	  = get_proc_info(my_id);    my_host	  = my_pi->host_name;    my_listener	  = my_pi->port;    dest_pi	  = get_proc_info(dest_id);    dest_host	  = dest_pi->host_name;    dest_listener = dest_pi->port;    p4_dprintfl(70, "TL :request_connection: my_id=%d my_host=%s my_listener=%d dest_id=%d dest_host=%s dest_listener=%d\n",	    my_id, my_host, my_listener, dest_id, dest_host, dest_listener);    /* Have we already connected?? */    if (p4_local->conntab[dest_id].type != CONN_REMOTE_NON_EST /* &&	p4_local->conntab[dest_id].type != CONN_REMOTE_OPENING */)    {	/* This should never happen */	p4_dprintfl(70,"TL: request_connection %d: already connected!\n", 		    dest_id);	return -2;    }    p4_dprintfl(70, "TL: enter loop to connect to dest listener %s\n",		dest_host);    /* Connect to dest listener */    num_tries = 1;    p4_has_timedout( 0 );    while((dest_listener_con_fd = 	   net_conn_to_listener(dest_host,dest_listener,1)) == -1) {	num_tries++;	if (p4_has_timedout( 1 )) {	    p4_error( "Timeout in establishing connection to remote process", 		      0 );	    }	}    p4_dprintfl(70,  "TL: conn_to_proc_contd: connected after %d tries, dest_listener_con_fd=%d\n",		num_tries, dest_listener_con_fd);    /* Construct a connection request message */    msg.type   = p4_i_to_n(CONNECTION_REQUEST);    msg.from   = p4_i_to_n(my_id);    msg.lport  = p4_i_to_n(-1);    msg.to     = p4_i_to_n(dest_id);    msg.to_pid = p4_i_to_n(dest_pi->unix_id);    /* Send it to dest_id's listener */    p4_dprintfl(70, "TL: request_connection: sending CONNECTION_REQUEST to %d on fd=%d size=%d\n",		dest_id,dest_listener_con_fd,sizeof(msg));    net_send(dest_listener_con_fd, &msg, sizeof(msg), P4_FALSE);    p4_dprintfl(70, 	"TL: request_connection: sent CONNECTION_REQUEST to dest_listener\n");    return dest_listener_con_fd;}#endif/* This is net_recv, except simplified for short messages and with an    explicit timeout */int net_recv_timeout(int fd, P4VOID *in_buf, int size, int secs){    int recvd = 0;    int n;    int read_counter = 0;    int block_counter = 0;    int eof_counter = 0;    char *buf = (char *)in_buf;    time_t start_time, cur_time;    start_time = time( (time_t) 0 );    p4_dprintfl( 99, "Beginning net_recv_timeout of %d on fd %d\n", size, fd );    while (recvd < size)    {	read_counter++;	SYSCALL_P4(n, read(fd, buf + recvd, size - recvd));	cur_time = time( (time_t) 0 );	if (cur_time - start_time >= secs) {	    if (n > 0) recvd += n;	    return recvd;	}	if (n == 0)		/* maybe EOF, maybe not */#if defined(P4SYSV) && !defined(NONBLOCKING_READ_WORKS)	{	    int n1 = 0;	    struct timeval tv;	    fd_set read_fds;	    int rc;	    char tempbuf[1];	    eof_counter++;	    tv.tv_sec = 5;	    tv.tv_usec = 0;	    FD_ZERO(&read_fds);	    FD_SET(fd, &read_fds);	    SYSCALL_P4(n1, select(fd+1, &read_fds, 0, 0, &tv));	    if (n1 == 1  &&  FD_ISSET(fd, &read_fds))	    {		rc = recv(fd, tempbuf, 1, MSG_PEEK);		if (rc == -1)		{		    /* -1 indicates ewouldblock (eagain) (check errno) */		    p4_error("net_recv_timeout recv:  got -1", -1);		}		if (rc == 0)	/* if eof */		{		    /* eof; a process has closed its socket; may have died */		    p4_error("net_recv_timeout recv:  EOF on socket", read_counter);		}		else		    continue;	    }	    sleep(1);	    if (eof_counter < 5)		continue;	    else		p4_error("net_recv_timeout read:  probable EOF on socket", read_counter);	}#else	{	    /* Except on SYSV, n == 0 is EOF */	    /* Should we ignore EOFs during shutdown? */	    p4_error("net_recv_timeout read:  probable EOF on socket", read_counter);        }#endif	if (n < 0)	{	    /* EAGAIN is really POSIX, so we check for either EAGAIN 	       or EWOULDBLOCK.  Note some systems set EAGAIN == EWOULDBLOCK	     */	    /* Solaris 2.5 occasionally sets n == -1 and errno == 0 (!!).	       since n == -1 and errno == 0 is invalid (i.e., a bug in read),	       it should be safe to treat it as EAGAIN and to try the	       read once more 	     */	    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == 0)	    {		struct timeval tv;		fd_set         read_fds;		int            n1;		block_counter++;		/* Wait here for more data for no more than the timeout		   period */		tv.tv_sec = secs - (cur_time - start_time);		tv.tv_usec = 0;		FD_ZERO(&read_fds);		FD_SET(fd, &read_fds);		SYSCALL_P4(n1, select(fd+1, &read_fds, 0, 0, &tv));		continue;	    }	    else {		/* A closed socket can cause this to happen. */		printf( "net_recv_timeout failed for fd = %d\n", fd );		p4_error("net_recv_timeout read, errno = ", errno);	    }	}	recvd += n;    }    p4_dprintfl( 99, 		"Ending net_recv_timeout of %d on fd %d (eof_c = %d, block = %d)\n", 		 size, fd, eof_counter, block_counter );    return (recvd);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -