⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flitvecio.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file flitvecio.c implements the method to send and receive message based on flitvec */#include <unistd.h>#include <fcntl.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/un.h>#include <sys/time.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <netdb.h>#include <sys/wait.h>#include <sys/uio.h>#include <signal.h>#include "config.h"#include "debug.h"#include "utils_socket.h"#include "flitvec.h"#include "flitvecio.h"#ifndef MAXIOVEC#define MAXIOVEC 512#endifstatic struct iovec iovbuff[MAXIOVEC];static void flitvecio_init(void) __attribute((__constructor__));#ifdef DEBUGextern char *fdtostr(int fd);static void on_sigpipe(int s){  printi("sendvec", "received a SIGPIPE");  signal(SIGPIPE, on_sigpipe);}static void flitvecio_init(void){  printi("sendvec", "SIGPIPE prints and ignore");  signal(SIGPIPE, on_sigpipe);}#elsestatic void flitvecio_init(void){  signal(SIGPIPE, SIG_IGN);}#define fdtostr(fd) 0#endifint sendvec(int fd, flitvec *fv){  int res, i, cfp;  int j = 0;    ASSERT(fv->nbflit < MAXIOVEC);  printi("sendvec", "position of flitindex: %d, %d", fv->cflitindex, fv->cflitpos);  if((fv->cflitindex == 0) && (fv->cflitpos == 0))    printi("sendvec", "sending new fv whose packet header is %s", ((fv->flittab[0].data) && (fv->flittab[0].size == sizeof(pkt_header))) ? format_packet((pkt_header*)fv->flittab[0].data):"malformed or null first flit");  /* filling iovec with current flitvec pos */  i = fv->cflitindex;  cfp = fv->cflitpos;  while(i < fv->nbflit)  {    iovbuff[j].iov_base = (fv->flittab[i].data) + cfp;    iovbuff[j].iov_len = (fv->flittab[i].size) - cfp;    printi("sendvec", "iovbuff[%d].len = %d", j, iovbuff[j].iov_len);    cfp = 0;    j++;    i++;  }  printi("sendvec", "%d iobuf to send on %s", j, fdtostr(fd));  res = writev(fd, iovbuff, j);  printi("sendvec", "sent %d bytes", res);  if(res == -1)  {    if((errno == EAGAIN) || (errno == EINTR))      return 0;    else      {        printe("sendvec error");        raise(SIGSEGV);        return -1;      }  }  /* how much flits were sent ?  */  while((fv->cflitindex < fv->nbflit) && (res != 0))    {      if(res >= (fv->flittab[fv->cflitindex].size - fv->cflitpos))        {          /* this flit has been completely sent */          printi("sendvec", "This flit %d is completely sent, invoking callback", fv->cflitindex);          fv->flittab[fv->cflitindex].oncomplete(&(fv->flittab[fv->cflitindex]), fv->parameter);          res -= fv->flittab[fv->cflitindex].size - fv->cflitpos;          printi("sendvec", "This flit %d is %d bytes, %d were sent this time, remaining %d bytes of  next flits that have been sent during the same writev", 		 fv->cflitindex, fv->flittab[fv->cflitindex].size, fv->flittab[fv->cflitindex].size - fv->cflitpos, res);          fv->cflitindex++;          fv->cflitpos = 0;          while((fv->cflitindex < fv->nbflit) && (fv->flittab[fv->cflitindex].size == 0))	    {	      printi("sendvec", "This flit %d is of size 0, invoking callback", fv->cflitindex);	      fv->flittab[fv->cflitindex].oncomplete(&(fv->flittab[fv->cflitindex]), fv->parameter);	      fv->cflitindex++;	    }        }      else        {          /* this flit was partialy sent */                printi("sendvec", "This flit %d is %d bytes, %d were sent during this writev", fv->cflitindex, fv->flittab[fv->cflitindex].size, res);          fv->cflitpos += res;          res = 0;        }    }  if(fv->cflitindex == fv->nbflit)   {    printi("sendvec", "All flits were sent");    //    ASSERT(!res);    return 1;  }  else   {    printi("sendvec", "New position of flitindex: %d (+%d)", fv->cflitindex, fv->cflitpos);    return 0;  }}int receivevec(int fd, flitvec * message){  int res;  int i = message->cflitindex;  int j = 0;  int cfp;  int totaltoread = 0;  printi("receivevec", "reading a flitvec at position %d, %d", message->cflitindex, message->cflitpos);  cfp = message->cflitpos;  while(i < message->nbflit)    {      if(message->flittab[i].data == NULL)        {          i++;          continue;        }      iovbuff[j].iov_base = message->flittab[i].data + cfp;      iovbuff[j].iov_len = message->flittab[i].size - cfp;      totaltoread += iovbuff[j].iov_len;      i++;      cfp = 0;      printi("receivevec", "add iovec[%d] %p, %d", j, iovbuff[j].iov_base, iovbuff[j].iov_len);      j++;    }  if( totaltoread == 0 )  {	printi("receivevec", "nothing to read : done");	res = 0;  } else {  	res = readv(fd, iovbuff, j);  	printi("receivevec", "received %d bytes", res);  	if (res == 0)    	{           printi("receivevec","reading 0 bytes: declaring peer is disconnected");     	   return -1;        }  	if(res == -1)    	{	   if((errno == EAGAIN) || (errno == EINTR))	     return 0;           else             return -1;        }  }  #ifdef DEBUG  if(res+message->cflitpos >= message->flittab[message->cflitindex].size)    printi("recvvec", "tough case: res = %d, message = %d, %d", res, message->cflitpos, message->cflitindex);#endif  do    {      i = message->cflitindex;      if( message->cflitpos + res >= message->flittab[i].size )        {          message->cflitindex++;          message->flittab[i].oncomplete(&(message->flittab[i]), message->parameter);          res -= ((message->flittab[i].size) - (message->cflitpos));          message->cflitpos = 0;        }      else        {          message->cflitpos += res;          break;        }    }  while( res > 0 );  printi("receivevec", "new position after read : %d, %d", message->cflitindex, message->cflitpos);  if ( message->cflitindex == message->nbflit && res == 0 )    return 1;  else    return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -