⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 otherdaemoncomm.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file otherdaemoncomm.c implements the communication fonction between daemons */#include "debug.h"#include "config.h"#include "otherdaemoncomm.h"#include "simple_list.h"#include "flitvec.h"#include "flitvecio.h"#include "utils_socket.h"#include "daemoncom.h"#include "select.h"#define PENDING_NONE 0#define PENDING_TAG  1#define PENDING_ID   2#define DATA 1#define PROTO 2static unsigned char pending; /**< pending state */static LinkedList_t pnd_msgs; /**< list of pending messages (list of *(pkt_header followed by their data))*/static int pending_tag = -1;  /**< current pending tag for non received message */static Communication * peermessages; /**< not the sender based structure but                                        the dynamic asynchronous send/receive */static int commsize; /**< total number of mpi processes *//** initialize list of pending messages * @return void */static void init_pnd_mesgs_list(){  ll_create (&pnd_msgs);  pending = PENDING_NONE;}LinkedList_t * getpending(){  return &pnd_msgs;}void newpendingtag(int tag){  printi ("Request", "entering pending state for tag %d", tag);  pending_tag = tag;  pending = PENDING_TAG;}pkt_header *find_pending(int (*f)(void*, void *), void *k){  pkt_header * pkt;  pkt = (pkt_header *)ll_remove_by_key (&pnd_msgs, f, k);  if ( pkt != NULL )    printi("find_pending","find header %s", format_packet(pkt));  return pkt;}int probing_pnd_mesgs(int tag){  if (ll_find_by_key (&pnd_msgs, cmp_tag, (void *) tag) != NULL)    return 1;  return 0;}int waiting_for_tag(int tag){  if( pending == PENDING_NONE )    return 0;  return pending_tag == tag;}void clearpendingtag(){  pending_tag = 0;  pending = PENDING_NONE;}void debugpndpkt(){  sElement_t * elem;  int i;  elem = pnd_msgs.head;  for ( i = 0; i < pnd_msgs.size; i++ )    {      pkt_header * pkt;      pkt = (pkt_header *)elem->data;      if ( pkt != NULL )	printi("pending", "%d/%d : head pkt = %s", i, pnd_msgs.size, format_packet_long(pkt, 48));      else	printe("%d/%d : NULL pkt",i, pnd_msgs.size);      elem = elem->pNext;    }}void pending_add(pkt_header *pkt){  printi("pendingadd", "header: %s", format_packet(pkt));  printi("pendingadd", "past pending size: %d", pnd_msgs.size);  ll_add_tail(&pnd_msgs, pkt);}/** initialize one Communication structure * @param onecomm : a Communication Structure * @return void */static void initonecomm(Communication * onecomm){  ll_create(&(onecomm->tosend));  onecomm->toreceive = NULL;}void initCommunication(int nbproc){  int i;  peermessages = malloc(nbproc * sizeof(struct Communication));  commsize = nbproc;  for (i = 0 ; i < nbproc ; i++ )    initonecomm(&peermessages[i]);  init_pnd_mesgs_list();}int inline getpeerfdsets(fd_set * writeset, SockInfo * mpipeer, int max){  int i;  int res = max;  for (i = 0 ; i < commsize ; i++ )    {      if( ftp_msgsend_needed(mpipeer, i) )        {          FD_SET(mpipeer[i].fd, writeset); /* add socket to write set */          if(mpipeer[i].fd > res)            res = mpipeer[i].fd;          continue;        }            if (ll_is_empty(&(peermessages[i].tosend))) /* no message to send to peer i */        continue;      if (mpipeer[i].fd < 0) /* no connection to peer i */        continue;      FD_SET(mpipeer[i].fd, writeset); /* add socket to write set */      if(mpipeer[i].fd > res)        res = mpipeer[i].fd;    }  return res;}void addtopeersend(flitvec *fv){  int dst = ntohl(((pkt_header *)fv->flittab[0].data)->iDst);  ll_add_tail(&(peermessages[dst].tosend), fv);}void prependtopeersend(flitvec *fv){  int dst = ntohl(((pkt_header *)fv->flittab[0].data)->iDst);  flitvec *first;  /* first check that prepending now is safe */  if( peermessages[dst].tosend.head != NULL )    {      first = (flitvec *)peermessages[dst].tosend.head->data;      ASSERT( (first->cflitpos == 0) && (first->cflitindex == 0) );    }  ll_add_head(&(peermessages[dst].tosend), fv);}/* send a (or part of a) message to one mpi peer. message is the head of peermessage[rank].tosend *  @param rank : rank of the mpi peer to send to *  @return *            -1 if disconnected *             0 if not ready *             1 if a part but not the total has been sent *             2 if the write is complete */int on_send (int rank, SockInfo * mpipeer){  int n;  pkt_header *hpkt;  hpkt=(pkt_header *)(( (flitvec*)peermessages[rank].tosend.head->data)->flittab[0].data);  /* header of next message */  printi("send", "Send message %s", format_packet( hpkt ) );  ASSERT(hpkt != NULL);  n = sendvec(mpipeer[rank].fd, (flitvec *)( peermessages[rank].tosend.head->data));  printi("send", "Sent %s", n<0?"disconnect":(n==1?"whole flitvec":"part of the flitvec"));  if (n < 0)    {      printi ("send","ERROR writing to socket %d: %s", mpipeer[rank].fd,	      strerror (errno));      return -1;    }  if (n == 1)    {      printi("send", "removing the head of %d", rank);      release_flitvec( (flitvec*) peermessages[rank].tosend.head->data);      ll_remove_head(&(peermessages[rank].tosend)); /* next message to be send */      printi("send", "returning 2");      return 2;    }  printi("send", "returning 1");  return 1;}static void do_disconnect(int rank, SockInfo *mpipeer){  int i;  printi("disconnection","peer %d disconnected, cleaning socket structure", rank);  /* socket information to reset for peer 'rank' */  on_mpipeer_disconnect (rank, mpipeer);  /* emptying list of message in communication buffer */  while(!ll_is_empty(&(peermessages[rank].tosend))) /* while exists a flitvec to send in channel */    {      flitvec * vec = (flitvec *) peermessages[rank].tosend.head->data;      /* free data inside flitvec (by calling oncomplete() function of the flit ) */      for (i = vec->cflitindex ; i < vec->nbflit ; i++ )        vec->flittab[i].oncomplete(&(vec->flittab[i]), vec->parameter );      /* free flitvec itself */      release_flitvec(vec);      ll_remove_head(&(peermessages[rank].tosend));    }  if(!all_is_done())    ftp_on_disconnect(mpipeer, rank);}void writedaemon(int rank, SockInfo * mpipeer){  printi ("fdset", "mpipeer[%d] is write ready in state %d\n", rank, mpipeer[rank].hstate);  if(mpipeer[rank].hstate != CO_DONE)    {      printi("writedaemon","sending reconnection information");      if(ftp_connection_write(mpipeer, rank) < 0)        {          printw("peer %d has broken connection during connection",rank);          do_disconnect(rank, mpipeer);        }    }  else    {      if (on_send (rank, mpipeer) < 0)        {          printw("peer %d has broken connection while sending",rank);          do_disconnect (rank, mpipeer);        }    }  printi("writedaemon", "returning");}void readdaemon(int rank, SockInfo * mpipeer){  int n;  printi ("fdset", "mpipeer[%d] is read ready in state %d\n", rank, mpipeer[rank].hstate);  if( mpipeer[rank].hstate != CO_DONE )    {      if ((ftp_connection_read(mpipeer, rank)) < 0)	{          printi("warning", "peer %d has broken connection during connection",rank);          	  do_disconnect (rank, mpipeer);	  return;	}    }  else    {      pkt_header pkt;      if( peermessages[rank].toreceive == NULL )        {          printi ("Recv", "reading header from rank %d", rank);	  if( _urecv( mpipeer[rank].fd, &pkt, sizeof(pkt_header), 0 ) != sizeof(pkt_header) )	    {	      printi("warning", "peer %d has broken connection", rank);	      do_disconnect(rank, mpipeer);	      return;	    }          printi ("Recv", "creating the reception flitvec from rank %d, based on header : %s", rank, format_packet(&pkt));          peermessages[rank].toreceive = ftp_create_newfv( &pkt );	  if( peermessages[rank].toreceive == NULL )	    {	      printi("ckpt-info","checkpoint information from rank %d", rank);	      return;	    }        }      printi ("Recv", "reading data from rank %d", rank);      n = receivevec( mpipeer[rank].fd, peermessages[rank].toreceive);      if(n == 0)        printi("Recv", "reception flitvec partially read");      if(n == 1)        {          printi("Recv", "reception flitvec completely read");          release_flitvec(peermessages[rank].toreceive);          peermessages[rank].toreceive = NULL;        }      if (n < 0)        {          printw("peer %d has broken connection while receiving",rank);          do_disconnect (rank, mpipeer);	  return;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -