⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_sock_list.c

📁 MPICH是MPI标准的重要实现,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
#include "p4.h"
#include "p4_sys.h"

#ifndef THREAD_LISTENER

static P4BOOL process_slave_message(int fd);
static P4BOOL process_connect_request(int fd);

/*
 * listener - main loop of the process (non-thread) listener.
 *
 * Repeatedly waits in select() for the listening socket
 * (listener_info->listening_fd) or the slave descriptor
 * (listener_info->slave_fd[0]) to become readable and hands the ready
 * descriptor to process_connect_request() or process_slave_message().
 * The loop ends once a handler returns a true "done" value; the listening
 * descriptor is then closed and the process terminates with exit(0), so
 * this function never returns to its caller.
 */
P4VOID listener()
{
    struct listener_data *l = listener_info;
    P4BOOL done = P4_FALSE;
    fd_set read_fds;
    int i, nfds, nfds_in, fd;

    p4_dprintfl(70, "enter listener \n");
    dump_listener(70);

    while (!done)
    {
        FD_ZERO(&read_fds);
        FD_SET(l->listening_fd, &read_fds);
        FD_SET(l->slave_fd[0], &read_fds);

        /*
         * select() only examines descriptors 0 .. nfds_in-1, so the first
         * argument must be the highest watched descriptor plus one.
         * Passing an unrelated limit would silently ignore any watched
         * descriptor at or above that limit.
         */
        nfds_in = l->listening_fd;
        if (l->slave_fd[0] > nfds_in)
            nfds_in = l->slave_fd[0];
        nfds_in++;

        SYSCALL_P4(nfds, select(nfds_in, &read_fds, 0, 0, 0));
        if (nfds < 0)
            p4_error("listener select", nfds);
        if (nfds == 0)
            p4_dprintfl(70, "select timeout\n");

        /* Visit the ready descriptors in ascending order, one per pass. */
        fd = 0;
        for (i = 0; i < nfds && !done; i++)
        {
            while (fd < nfds_in)
            {
                if (FD_ISSET(fd, &read_fds) &&
                    (fd == l->listening_fd || fd == l->slave_fd[0]))
                    break;
                fd++;
            }
            p4_dprintfl(70, "got fd=%d listening_fd=%d slave_fd=%d\n",
                        fd, l->listening_fd, l->slave_fd[0]);
            /* We use |= to ensure that after the loop, we haven't lost
               any "done" messages.
               There really are some nasty race conditions here, and all
               this does is cause us to NOT lose a "DIE" message
             */
            if (fd == l->listening_fd)
                done |= process_connect_request(fd);
            else if (fd == l->slave_fd[0])
                done |= process_slave_message(fd);
            fd++;
        }
    }

    close( l->listening_fd );
    p4_dprintfl(70, "exit listener\n");
    exit(0);
}

/*
 * process_connect_request - handle one incoming connection on the
 * listening socket.
 *
 * fd is the listening descriptor.  The connection is accepted, one
 * struct slave_listener_msg is read from it (with a timeout argument of
 * 10), and the message is dispatched on its type:
 *   IGNORE_THIS         - only logged.
 *   CONNECTION_REQUEST  - the target process is signalled with
 *                         LISTENER_ATTN_SIGNAL, the request is forwarded
 *                         on slave_fd[0], and the listener waits for the
 *                         slave's IGNORE_THIS handshake reply.
 *   anything else       - reported as invalid.
 * The accepted descriptor is always closed before returning.
 *
 * Returns P4_FALSE in every case: a connection request never asks the
 * listener to shut down.
 */
static P4BOOL process_connect_request(int fd)
{
    struct slave_listener_msg msg;
    int type, msglen, reply_type;
    int connection_fd, slave_fd;
    int from, lport, to_pid, to;

    p4_dprintfl(70, "processing connect check/request on %d\n", fd);
    connection_fd = net_accept(fd);
    /* sizeof yields size_t; cast so the argument matches the %d format. */
    p4_dprintfl(70, "accepted on connection_fd=%d reading size=%d\n",
                connection_fd, (int) sizeof(msg));

    /* We originally used net_recv here, but there is the chance that a
       bogus message arrives.  In that case, we read, discard, and close
       the connection.  We detect a bogus message by either a timeout
       or invalid message type (we should switch to a session-specific
       message cookie).  Because we need a timeout, we can't use net_recv.
       Since we don't need a very complex receive message, this isn't
       such a bad thing */
    msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg), 10);
    if (msglen == PRECV_EOF || msglen != (int) sizeof(msg))
    {
        close( connection_fd );
        return (P4_FALSE);
    }

    type = p4_n_to_i(msg.type);
    switch (type)
    {
      case IGNORE_THIS:
        p4_dprintfl(70, "got IGNORE_THIS\n");
        break;

      case CONNECTION_REQUEST:
        from = p4_n_to_i(msg.from);
        to_pid = p4_n_to_i(msg.to_pid);
        to = p4_n_to_i(msg.to);
        lport = p4_n_to_i(msg.lport);
        p4_dprintfl(70, "connection_request2: poking slave: from=%d lport=%d to_pid=%d to=%d\n",
                    from, lport, to_pid, to);

        /*
         * to_pid arrives over the network.  kill() treats 0, -1 and other
         * negative values as "a process group" or "every process we may
         * signal", so a bogus request must never reach kill() with such
         * a value.
         */
        if (to_pid <= 0)
        {
            p4_dprintf("Listener: ignoring connection request with invalid to_pid=%d.\n", to_pid);
            break;
        }

        slave_fd = listener_info->slave_fd[0];
        if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1)
        {
            p4_dprintf("Listener: Unable to interrupt client pid=%d.\n", to_pid);
            break;
        }
        net_send(slave_fd, &msg, sizeof(msg), P4_FALSE);

        /* wait for msg from slave indicating it got connected */
        /*
         * do not accept any more connections for slave until it has fully
         * completed this one, i.e. do not want to interrupt it until it has
         * handled this interrupt
         */
        p4_dprintfl(70, "waiting for slave to handle interrupt\n");
        net_recv(slave_fd, &msg, sizeof(msg));

        /* Check that we get a valid message; for now (see p4_sock_conn/
           handle_connection_interrupt) this is just IGNORE_THIS.
           The reply is an incoming message, so it is decoded with
           p4_n_to_i like every other received field in this file. */
        reply_type = p4_n_to_i(msg.type);
        if (reply_type != IGNORE_THIS) {
            p4_dprintf("received incorrect handshake message type=%d\n",
                       reply_type);
            p4_error("slave_listener_msg: broken handshake",
                     reply_type);
        }
        p4_dprintfl(70, "back from slave handling interrupt\n");
        break;

      default:
        p4_dprintf("invalid type %d in process_connect_request\n", type);
        break;
    }

    close(connection_fd);
    return (P4_FALSE);
}

/*
 * process_slave_message - read and act on one message from the slave
 * descriptor.
 *
 * fd is the slave descriptor (listener_info->slave_fd[0] when called from
 * listener()).  An EOF from net_recv, or any message type other than DIE,
 * is reported through p4_error.
 *
 * Returns P4_TRUE when a DIE message was received (the listener should
 * stop), P4_FALSE otherwise.
 */
static P4BOOL process_slave_message(int fd)
{
    struct slave_listener_msg msg;
    int type;
    int from;
    P4BOOL rc = P4_FALSE;
    int status;

    status = net_recv(fd, &msg, sizeof(msg));
    if (status == PRECV_EOF)
    {
        /* NOTE(review): the code below still decodes msg after this call;
           presumably p4_error does not return - confirm. */
        p4_error("slave_listener_msg: got eof on fd=", fd);
    }

    type = p4_n_to_i(msg.type);
    from = p4_n_to_i(msg.from);
    switch (type)
    {
      case DIE:
        p4_dprintfl(70, "received die msg from %d\n", from);
        rc = P4_TRUE;
        break;

      default:
        p4_dprintf("received unknown message type=%d from=%d\n", type, from);
        p4_error("slave_listener_msg: unknown message type", type);
        break;
    }
    return (rc);
}

#else /* def THREAD_LISTENER */
/*
 * The thread listener logic is quite different from the process listener
 * logic.  This takes advantage of the fact that the thread is in the same
 * process.  The algorithm is this:
 *   Let L be the listener thread and P be the "process"/user thread
 *   To connect, P sends a message to its OWN listener, using the pipe
 *   between them (this allows L to use a select to wait for work to do).
 *   P then waits for a message back down the pipe that indicates that the
 *   connection is ready.  It may get messages about other connections
 *   becoming ready while it is waiting.
* *   L selects on the pipe to P and the external connection socket. *   If it gets a request from P, it checks the connection table; if *   the connection has already been made, it ignores the request (since *   the request-ready message is already in the pipe).  Otherwise, it *   creates a new socket and contacts the remote listener. *    *   If the rank of L is LOWER than the rank of the remote L, this is the *   socket that will be used for the connection.  Once the remote listener *   accepts the connection, BOTH listeners (local and remote) transfer the *   socket fd into the connection tables, set the connection to EST, and *   send a message down the pipe to P. * *   If the rank of L is higher than the rank of the remote L, a message is *   sent asking the remote (lower rank) L to establish a connection. *   The socket used for this request is closed when the connection is  *   established.  This is the only time a socket is created and later closed. * *   Because this is so different from the process listener, there is a  *   separate establish_connection routine. * *   Why choose the lower rank to establish the connection?  Because the *   first round of connections is from the master, at rank 0.  Additional *   connections as part of the initial distribution tree are also from low *   to high rank.  This reduces the number of connections that are made. 
*/P4VOID thread_listener(){    struct slave_listener_msg msg;    int type;    int connection_fd;    int from, lport, to_pid, to;/*    int do_conn;    P4BOOL rc = P4_FALSE;    struct proc_info *from_pi; */    fd_set read_fds;    int    nfds, nfds_in;    int    msglen;  p4_dprintfl(70,"TL: thread listener starting\n");  while(1)  {      p4_dprintfl(70, "TL: thread listener starting select on fd=%d port=%d\n",		  p4_global->listener_fd,p4_global->listener_port);            FD_ZERO(&read_fds);      FD_SET(p4_global->listener_fd, &read_fds);      FD_SET(listener_info->slave_fd[0], &read_fds);      nfds_in = p4_global->listener_fd;      if (listener_info->slave_fd[0] > nfds_in) nfds_in = listener_info->slave_fd[0];      nfds_in++;      SYSCALL_P4(nfds, select(nfds_in, &read_fds, 0, 0, 0));      if (nfds < 0)	  p4_error("listener select", nfds);      if (nfds == 0) {	  p4_dprintfl(70, "TL: select timeout\n");	  continue;      }      /* Process remote connection requests first */      if (FD_ISSET(p4_global->listener_fd, &read_fds)) {	  /* Accept connection, get message */	  p4_dprintfl( 70, "TL: starting accept\n" );	  connection_fd = net_accept(p4_global->listener_fd);	  p4_dprintfl(70, 		  "TL: thread listener accepted on %d, got connection_fd=%d\n",		      p4_global->listener_fd, connection_fd);	  	  if ((msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg),10)) == PRECV_EOF)	  {	      p4_dprintf("TL: thread listener detected EOF on fd=%d\n",			 connection_fd);	      p4_error("thread listener detected EOF",-1);	  }	  if (msglen != sizeof(msg)) {	      p4_dprintf("TL: message was wrong size (%d)\n", msglen );	      close(connection_fd);	  }	  type = p4_n_to_i(msg.type);	  switch (type) 	  {	  case IGNORE_THIS:	      p4_dprintfl(70, "TL: got IGNORE_THIS\n");	      break;	  case CONNECTION_REQUEST:	      from   = p4_n_to_i(msg.from);	      to_pid = p4_n_to_i(msg.to_pid);	      to     = p4_n_to_i(msg.to);	      lport  = p4_n_to_i(msg.lport);	      if (lport != -1) {		
  /* Message from non-threaded listener! */	      }	      p4_dprintfl(70, 	      "TL: got connection_request: from=%d lport=%d to_pid=%d to=%d\n",			  from, lport, to_pid, to);	      if (p4_local->conntab[from].type == CONN_REMOTE_NON_EST) {		  /* Establish the connection */		  /* p4_local->conntab[from].type = CONN_REMOTE_OPENING; */		  p4_dprintfl(70, "TL: connection now opening for %d\n", 			      from );		  		  if (p4_local->my_id < from) 		  {		      int new_connection_fd;		      p4_dprintfl(90,"TL: myid < from, myid = %d, from = %d\n",				  p4_local->my_id,from);		      /* Create a connection back to "from".  We could use		       the same socket, but using the same request code 		       is easier */		      new_connection_fd = request_connection(from);		      if (new_connection_fd < 0) {			  p4_error( "Could not create new connection", 				    new_connection_fd );		      }		      close( connection_fd );		      connection_fd = new_connection_fd;		      /* We now have the socket for the connection */		  }		  /* This is the new socket.  Just keep it. */		  p4_local->conntab[from].port = connection_fd;		  p4_local->conntab[from].same_data_rep =

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -