📄 otherdaemoncomm.c
字号:
/** @file otherdaemoncomm.c
 *  @brief Implements the communication functions between daemons: the
 *  per-peer send queues, the per-peer reception state and the list of
 *  pending messages.
 */
#include "debug.h"
#include "config.h"
#include "otherdaemoncomm.h"
#include "simple_list.h"
#include "flitvec.h"
#include "flitvecio.h"
#include "utils_socket.h"
#include "daemoncom.h"
#include "select.h"

/* Standard headers for names used directly in this file (errno/strerror,
 * calloc/exit, intptr_t). They come after the project headers in case
 * config.h sets feature macros that must be seen first. */
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define PENDING_NONE 0
#define PENDING_TAG 1
#define PENDING_ID 2

#define DATA 1
#define PROTO 2

static unsigned char pending;       /**< pending state (one of PENDING_*) */
static LinkedList_t pnd_msgs;       /**< list of pending messages (list of *(pkt_header followed by their data)) */
static int pending_tag = -1;        /**< current pending tag for non received message; -1 when none */
static Communication *peermessages; /**< not the sender based structure but the dynamic asynchronous send/receive, indexed by rank */
static int commsize;                /**< total number of mpi processes (number of entries in peermessages) */

/** Initialize the list of pending messages and reset the pending state.
 *  @return void */
static void init_pnd_mesgs_list(void)
{
    ll_create(&pnd_msgs);
    pending = PENDING_NONE;
}

/** @return the address of this module's list of pending messages */
LinkedList_t *getpending(void)
{
    return &pnd_msgs;
}

/** Enter the pending state, waiting for a message with the given tag.
 *  @param tag : tag of the awaited message */
void newpendingtag(int tag)
{
    printi("Request", "entering pending state for tag %d", tag);
    pending_tag = tag;
    pending = PENDING_TAG;
}

/** Remove from the pending list a message matching key k.
 *  @param f : comparison callback handed to ll_remove_by_key()
 *  @param k : key handed to f
 *  @return the removed header, or NULL when no pending message matches */
pkt_header *find_pending(int (*f)(void *, void *), void *k)
{
    pkt_header *pkt = (pkt_header *)ll_remove_by_key(&pnd_msgs, f, k);

    if (pkt != NULL)
        printi("find_pending", "find header %s", format_packet(pkt));
    return pkt;
}

/** Check, without removing it, whether a message with this tag is pending.
 *  @param tag : tag to look for
 *  @return 1 if such a message is pending, 0 otherwise */
int probing_pnd_mesgs(int tag)
{
    /* The key travels through a void *; going through intptr_t avoids the
     * int-to-pointer size mismatch on LP64 targets.
     * NOTE(review): assumes cmp_tag converts the key back to an integer. */
    return ll_find_by_key(&pnd_msgs, cmp_tag, (void *)(intptr_t)tag) != NULL;
}

/** @param tag : tag to test
 *  @return non-zero if a pending state is active and its tag equals tag */
int waiting_for_tag(int tag)
{
    if (pending == PENDING_NONE)
        return 0;
    return pending_tag == tag;
}

/** Leave the pending state. */
void clearpendingtag(void)
{
    pending_tag = -1; /* same "no tag" value as the initializer */
    pending = PENDING_NONE;
}

/** Print every pending message header through the debug output. */
void debugpndpkt(void)
{
    sElement_t *elem = pnd_msgs.head;
    int i;

    /* Also stop on a NULL link so that a size inconsistent with the actual
     * chain cannot make us dereference past the end of the list. */
    for (i = 0; i < pnd_msgs.size && elem != NULL; i++, elem = elem->pNext) {
        pkt_header *pkt = (pkt_header *)elem->data;

        if (pkt != NULL)
            printi("pending", "%d/%d : head pkt = %s", i, pnd_msgs.size, format_packet_long(pkt, 48));
        else
            printe("%d/%d : NULL pkt", i, pnd_msgs.size);
    }
}

/** Append pkt at the tail of the pending list.
 *  @param pkt : header (followed by its data) of the message to keep */
void pending_add(pkt_header *pkt)
{
    printi("pendingadd", "header: %s", format_packet(pkt));
    printi("pendingadd", "past pending size: %d", pnd_msgs.size);
    ll_add_tail(&pnd_msgs, pkt);
}

/** Initialize one Communication structure: empty send queue, no reception
 *  in progress.
 *  @param onecomm : a Communication Structure
 *  @return void */
static void initonecomm(Communication *onecomm)
{
    ll_create(&(onecomm->tosend));
    onecomm->toreceive = NULL;
}

/** Allocate and initialize one Communication structure per mpi process,
 *  then initialize the pending message list.
 *  @param nbproc : total number of mpi processes */
void initCommunication(int nbproc)
{
    int i;

    peermessages = calloc((size_t)nbproc, sizeof *peermessages);
    if (peermessages == NULL && nbproc > 0) {
        /* every later access indexes this array by rank: cannot continue */
        printe("cannot allocate communication structures for %d peers", nbproc);
        exit(EXIT_FAILURE);
    }
    commsize = nbproc;
    for (i = 0; i < nbproc; i++)
        initonecomm(&peermessages[i]);
    init_pnd_mesgs_list();
}

/** Add to writeset the socket of every peer that has something to write.
 *  A peer has something to write when ftp_msgsend_needed() says so or when
 *  its send queue is not empty. It is selected only if it currently has a
 *  socket (fd >= 0).
 *  @param writeset : write set to fill
 *  @param mpipeer  : socket information, indexed by rank (commsize entries)
 *  @param max      : highest descriptor already known by the caller
 *  @return the highest descriptor among max and the ones added */
int getpeerfdsets(fd_set *writeset, SockInfo *mpipeer, int max)
{
    int res = max;
    int i;

    for (i = 0; i < commsize; i++) {
        int wants_write = ftp_msgsend_needed(mpipeer, i)
                          || !ll_is_empty(&(peermessages[i].tosend));

        /* FD_SET on a negative descriptor is undefined behavior, so a peer
         * without a connection is never selected. */
        if (!wants_write || mpipeer[i].fd < 0)
            continue;
        FD_SET(mpipeer[i].fd, writeset); /* add socket to write set */
        if (mpipeer[i].fd > res)
            res = mpipeer[i].fd;
    }
    return res;
}

/** Queue fv at the tail of the send queue of its destination peer. The
 *  destination rank is read from the packet header held by the first flit.
 *  @param fv : flitvec to send (the pointer itself is queued) */
void addtopeersend(flitvec *fv)
{
    int dst = ntohl(((pkt_header *)fv->flittab[0].data)->iDst);

    ASSERT((dst >= 0) && (dst < commsize));
    ll_add_tail(&(peermessages[dst].tosend), fv);
}

/** Queue fv at the head of the send queue of its destination peer. The
 *  destination rank is read from the packet header held by the first flit.
 *  @param fv : flitvec to send (the pointer itself is queued) */
void prependtopeersend(flitvec *fv)
{
    int dst = ntohl(((pkt_header *)fv->flittab[0].data)->iDst);

    ASSERT((dst >= 0) && (dst < commsize));
    /* first check that prepending now is safe: the current head must not
     * have started to be sent (presumably what cflitpos/cflitindex at 0
     * mean), otherwise fv would be written in the middle of it */
    if (peermessages[dst].tosend.head != NULL) {
        flitvec *first = (flitvec *)peermessages[dst].tosend.head->data;

        ASSERT((first->cflitpos == 0) && (first->cflitindex == 0));
    }
    ll_add_head(&(peermessages[dst].tosend), fv);
}

/** Send a message, or part of it, to one mpi peer.
 *  The message is the head of peermessages[rank].tosend. Once sendvec()
 *  reports it complete, it is released and removed from the queue.
 *  @param rank    : rank of the mpi peer to send to
 *  @param mpipeer : socket information, indexed by rank
 *  @return
 *   -1 if disconnected
 *    0 if not ready (nothing is queued for this peer)
 *    1 if a part but not the total has been sent
 *    2 if the write is complete */
int on_send(int rank, SockInfo *mpipeer)
{
    flitvec *vec;
    pkt_header *hpkt;
    int n;
    int saved_errno;

    /* The socket may have been selected only because ftp_msgsend_needed()
     * was true (see getpeerfdsets), so the queue can be empty here. */
    if (peermessages[rank].tosend.head == NULL)
        return 0;

    vec = (flitvec *)peermessages[rank].tosend.head->data;
    hpkt = (pkt_header *)vec->flittab[0].data; /* header of next message */
    ASSERT(hpkt != NULL); /* before format_packet() dereferences it */
    printi("send", "Send message %s", format_packet(hpkt));

    n = sendvec(mpipeer[rank].fd, vec);
    saved_errno = errno; /* the logging below may overwrite errno */
    printi("send", "Sent %s", n < 0 ? "disconnect" : (n == 1 ? "whole flitvec" : "part of the flitvec"));

    if (n < 0) {
        printi("send", "ERROR writing to socket %d: %s", mpipeer[rank].fd, strerror(saved_errno));
        return -1;
    }
    if (n == 1) {
        printi("send", "removing the head of %d", rank);
        /* release and remove the same list element, as a pair */
        release_flitvec((flitvec *)peermessages[rank].tosend.head->data);
        ll_remove_head(&(peermessages[rank].tosend)); /* next message to be send */
        printi("send", "returning 2");
        return 2;
    }
    printi("send", "returning 1");
    return 1;
}

/** Clean up after a peer's socket failed.
 *  Resets its socket information and drops every queued flitvec. For each
 *  one, oncomplete() is called on every flit from cflitindex onwards, then
 *  the flitvec is released. Unless all_is_done(), ftp_on_disconnect() is
 *  then called.
 *  @param rank    : rank of the disconnected peer
 *  @param mpipeer : socket information, indexed by rank */
static void do_disconnect(int rank, SockInfo *mpipeer)
{
    printi("disconnection", "peer %d disconnected, cleaning socket structure", rank);

    /* socket information to reset for peer 'rank' */
    on_mpipeer_disconnect(rank, mpipeer);

    /* empty the list of messages in the communication buffer */
    while (!ll_is_empty(&(peermessages[rank].tosend))) {
        flitvec *vec = (flitvec *)peermessages[rank].tosend.head->data;
        int i;

        /* free the data inside the flitvec by calling the oncomplete()
         * function of every flit not completed yet */
        for (i = vec->cflitindex; i < vec->nbflit; i++)
            vec->flittab[i].oncomplete(&(vec->flittab[i]), vec->parameter);

        /* free the flitvec itself */
        release_flitvec(vec);
        ll_remove_head(&(peermessages[rank].tosend));
    }

    /* NOTE(review): a partially received flitvec left in
     * peermessages[rank].toreceive is not reset here. Confirm that the ftp_
     * layer handles it; otherwise, after a reconnection, readdaemon() would
     * keep filling the stale reception flitvec instead of reading a header. */

    if (!all_is_done())
        ftp_on_disconnect(mpipeer, rank);
}

/** Handle write readiness of a peer socket.
 *  While the connection is not established (hstate != CO_DONE), the
 *  reconnection information is sent with ftp_connection_write(). Otherwise
 *  the next queued message is (partly) sent with on_send(). Either failure
 *  disconnects the peer.
 *  @param rank    : rank of the peer whose socket is write ready
 *  @param mpipeer : socket information, indexed by rank */
void writedaemon(int rank, SockInfo *mpipeer)
{
    printi("fdset", "mpipeer[%d] is write ready in state %d\n", rank, mpipeer[rank].hstate);
    if (mpipeer[rank].hstate != CO_DONE) {
        printi("writedaemon", "sending reconnection information");
        if (ftp_connection_write(mpipeer, rank) < 0) {
            printw("peer %d has broken connection during connection", rank);
            do_disconnect(rank, mpipeer);
        }
    } else if (on_send(rank, mpipeer) < 0) {
        printw("peer %d has broken connection while sending", rank);
        do_disconnect(rank, mpipeer);
    }
    printi("writedaemon", "returning");
}

/** Handle read readiness of a peer socket.
 *  While the connection is not established (hstate != CO_DONE), the data is
 *  handed to ftp_connection_read(). Afterwards, when no reception is in
 *  progress, a pkt_header is read and ftp_create_newfv() builds the
 *  reception flitvec for it. A NULL result is logged as checkpoint
 *  information, and the function returns. Then data is read into that
 *  flitvec with receivevec(), and the flitvec is released once complete.
 *  Any read failure disconnects the peer.
 *  @param rank    : rank of the peer whose socket is read ready
 *  @param mpipeer : socket information, indexed by rank */
void readdaemon(int rank, SockInfo *mpipeer)
{
    pkt_header pkt; /* kept alive for the whole call, as in the original */
    int n;

    printi("fdset", "mpipeer[%d] is read ready in state %d\n", rank, mpipeer[rank].hstate);

    if (mpipeer[rank].hstate != CO_DONE) {
        if (ftp_connection_read(mpipeer, rank) < 0) {
            printi("warning", "peer %d has broken connection during connection", rank);
            do_disconnect(rank, mpipeer);
        }
        return;
    }

    if (peermessages[rank].toreceive == NULL) {
        printi("Recv", "reading header from rank %d", rank);
        if (_urecv(mpipeer[rank].fd, &pkt, sizeof(pkt_header), 0) != sizeof(pkt_header)) {
            printi("warning", "peer %d has broken connection", rank);
            do_disconnect(rank, mpipeer);
            return;
        }
        printi("Recv", "creating the reception flitvec from rank %d, based on header : %s", rank, format_packet(&pkt));
        peermessages[rank].toreceive = ftp_create_newfv(&pkt);
        if (peermessages[rank].toreceive == NULL) {
            printi("ckpt-info", "checkpoint information from rank %d", rank);
            return;
        }
    }

    printi("Recv", "reading data from rank %d", rank);
    n = receivevec(mpipeer[rank].fd, peermessages[rank].toreceive);
    if (n < 0) {
        printw("peer %d has broken connection while receiving", rank);
        do_disconnect(rank, mpipeer);
    } else if (n == 0) {
        printi("Recv", "reception flitvec partially read");
    } else if (n == 1) {
        printi("Recv", "reception flitvec completely read");
        release_flitvec(peermessages[rank].toreceive);
        peermessages[rank].toreceive = NULL;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -