📄 flitvecio.c
字号:
/** @file flitvecio.c implements the method to send and receive message based on flitvec */#include <unistd.h>#include <fcntl.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/un.h>#include <sys/time.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <netdb.h>#include <sys/wait.h>#include <sys/uio.h>#include <signal.h>#include "config.h"#include "debug.h"#include "utils_socket.h"#include "flitvec.h"#include "flitvecio.h"#ifndef MAXIOVEC#define MAXIOVEC 512#endifstatic struct iovec iovbuff[MAXIOVEC];static void flitvecio_init(void) __attribute((__constructor__));#ifdef DEBUGextern char *fdtostr(int fd);static void on_sigpipe(int s){ printi("sendvec", "received a SIGPIPE"); signal(SIGPIPE, on_sigpipe);}static void flitvecio_init(void){ printi("sendvec", "SIGPIPE prints and ignore"); signal(SIGPIPE, on_sigpipe);}#elsestatic void flitvecio_init(void){ signal(SIGPIPE, SIG_IGN);}#define fdtostr(fd) 0#endifint sendvec(int fd, flitvec *fv){ int res, i, cfp; int j = 0; ASSERT(fv->nbflit < MAXIOVEC); printi("sendvec", "position of flitindex: %d, %d", fv->cflitindex, fv->cflitpos); if((fv->cflitindex == 0) && (fv->cflitpos == 0)) printi("sendvec", "sending new fv whose packet header is %s", ((fv->flittab[0].data) && (fv->flittab[0].size == sizeof(pkt_header))) ? format_packet((pkt_header*)fv->flittab[0].data):"malformed or null first flit"); /* filling iovec with current flitvec pos */ i = fv->cflitindex; cfp = fv->cflitpos; while(i < fv->nbflit) { iovbuff[j].iov_base = (fv->flittab[i].data) + cfp; iovbuff[j].iov_len = (fv->flittab[i].size) - cfp; printi("sendvec", "iovbuff[%d].len = %d", j, iovbuff[j].iov_len); cfp = 0; j++; i++; } printi("sendvec", "%d iobuf to send on %s", j, fdtostr(fd)); res = writev(fd, iovbuff, j); printi("sendvec", "sent %d bytes", res); if(res == -1) { if((errno == EAGAIN) || (errno == EINTR)) return 0; else { printe("sendvec error"); raise(SIGSEGV); return -1; } } /* how much flits were sent ? */ while((fv->cflitindex < fv->nbflit) && (res != 0)) { if(res >= (fv->flittab[fv->cflitindex].size - fv->cflitpos)) { /* this flit has been completely sent */ printi("sendvec", "This flit %d is completely sent, invoking callback", fv->cflitindex); fv->flittab[fv->cflitindex].oncomplete(&(fv->flittab[fv->cflitindex]), fv->parameter); res -= fv->flittab[fv->cflitindex].size - fv->cflitpos; printi("sendvec", "This flit %d is %d bytes, %d were sent this time, remaining %d bytes of next flits that have been sent during the same writev", fv->cflitindex, fv->flittab[fv->cflitindex].size, fv->flittab[fv->cflitindex].size - fv->cflitpos, res); fv->cflitindex++; fv->cflitpos = 0; while((fv->cflitindex < fv->nbflit) && (fv->flittab[fv->cflitindex].size == 0)) { printi("sendvec", "This flit %d is of size 0, invoking callback", fv->cflitindex); fv->flittab[fv->cflitindex].oncomplete(&(fv->flittab[fv->cflitindex]), fv->parameter); fv->cflitindex++; } } else { /* this flit was partialy sent */ printi("sendvec", "This flit %d is %d bytes, %d were sent during this writev", fv->cflitindex, fv->flittab[fv->cflitindex].size, res); fv->cflitpos += res; res = 0; } } if(fv->cflitindex == fv->nbflit) { printi("sendvec", "All flits were sent"); // ASSERT(!res); return 1; } else { printi("sendvec", "New position of flitindex: %d (+%d)", fv->cflitindex, fv->cflitpos); return 0; }}int receivevec(int fd, flitvec * message){ int res; int i = message->cflitindex; int j = 0; int cfp; int totaltoread = 0; printi("receivevec", "reading a flitvec at position %d, %d", message->cflitindex, message->cflitpos); cfp = message->cflitpos; while(i < message->nbflit) { if(message->flittab[i].data == NULL) { i++; continue; } iovbuff[j].iov_base = message->flittab[i].data + cfp; iovbuff[j].iov_len = message->flittab[i].size - cfp; totaltoread += iovbuff[j].iov_len; i++; cfp = 0; printi("receivevec", "add iovec[%d] %p, %d", j, iovbuff[j].iov_base, iovbuff[j].iov_len); j++; } if( totaltoread == 0 ) { printi("receivevec", "nothing to read : done"); res = 0; } else { res = readv(fd, iovbuff, j); printi("receivevec", "received %d bytes", res); if (res == 0) { printi("receivevec","reading 0 bytes: declaring peer is disconnected"); return -1; } if(res == -1) { if((errno == EAGAIN) || (errno == EINTR)) return 0; else return -1; } } #ifdef DEBUG if(res+message->cflitpos >= message->flittab[message->cflitindex].size) printi("recvvec", "tough case: res = %d, message = %d, %d", res, message->cflitpos, message->cflitindex);#endif do { i = message->cflitindex; if( message->cflitpos + res >= message->flittab[i].size ) { message->cflitindex++; message->flittab[i].oncomplete(&(message->flittab[i]), message->parameter); res -= ((message->flittab[i].size) - (message->cflitpos)); message->cflitpos = 0; } else { message->cflitpos += res; break; } } while( res > 0 ); printi("receivevec", "new position after read : %d, %d", message->cflitindex, message->cflitpos); if ( message->cflitindex == message->nbflit && res == 0 ) return 1; else return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -