📄 select.c
字号:
/*** Writeset */ max = getpeerfdsets(writeset, mpipeer, max);#undef FD_SETm return max + 1;}#ifdef DEBUGchar *fdtostr(int fd){ static char *pot[] = { "connect", "readpipe", "writepipe", "checkpoint pipe", "checkpoint scheduler", "event logger", "dispatcher", "checkpoint server protocol", "checkpoint server data", "checkpoint local file"}; static char oth[64]; int ran; if(fd == -1) { snprintf(oth, 64, "undefined (-1)"); return oth; } if(fd == scon) return pot[0]; if(fd == smpi_read) return pot[1]; if(fd == smpi_write) return pot[2]; if(fd == sockCSCHED) return pot[4]; if(fd == sockEL) return pot[5]; if(fd == disp_fd) return pot[6]; if(cin) { if(fd == cin->pipe) return pot[3]; if(fd == cin->sock.proto) return pot[7]; if(fd == cin->sock.data) return pot[8]; if(fd == cin->sock.file) return pot[9]; } for(ran = 0; ran < sizeofmpipeer; ran++) { if(fd == mpipeer[ran].cofd) { snprintf(oth, 64, "cofd of %d", ran); return oth; } if( fd == mpipeer[ran].fd ) { snprintf(oth, 64, "fd of %d", ran); return oth; } } snprintf(oth, 64, "unregistered socket (%d)", fd); return oth;}#define _debug_preselectprinti() \ { \ int i; \ int npfds = 0; \ char msg[64]; \ char rmsg[65500]; \ char wmsg[65500]; \ \ sprintf (rmsg, "Readset waits on : "); \ sprintf (wmsg, "Writeset waits on : "); \ for (i = 0; i < maxfdset; i++) \ { \ if (FD_ISSET (i, &readset)) \ { \ npfds++; \ sprintf (msg, ", %s", fdtostr(i)); \ strcat (rmsg, msg); \ } \ if (FD_ISSET (i, &writeset)) \ { \ sprintf (msg, ", %s", fdtostr(i)); \ strcat (wmsg, msg); \ } \ } \ printi ("pre_FDSET", " total fd waited: %d, %s", npfds, rmsg); \ printi ("pre_FDSET", "%s", wmsg); \ }#define _debug_postselectprinti() \ { \ int i; \ char msg[64]; \ char rmsg[65500]; \ char wmsg[65500]; \ sprintf (rmsg, "Readset return : "); \ sprintf (wmsg, "Writeset return : "); \ for (i = 0; i < maxfdset; i++) \ { \ if (FD_ISSET (i, &readset)) \ { \ sprintf (msg, ", %s", fdtostr(i)); \ strcat (rmsg, msg); \ } \ if (FD_ISSET (i, &writeset)) \ { \ sprintf (msg, ", %s", fdtostr(i)); \ strcat (wmsg, msg); \ } \ } \ printi ("FDSET", "%s", rmsg); \ printi ("FDSET", "%s", wmsg); \ }#else#define _debug_preselectprinti() do {} while(0)#define _debug_postselectprinti() do {} while(0)#endifint all_is_done(void){ if( AllPeerHasFinalized && ChildIsDead ) { printi("finalize", "all is done. Returning"); close (disp_fd); disp_fd = -1; ftp_finalize(); return 1; } else { printi("finalize", "all is not done, since %s %s", AllPeerHasFinalized?"":"not all peer has finalized", ChildIsDead?"":"my child is still alive"); return 0; }}static void on_child_death(int s){ printi("finalize", "my child is dead"); ChildIsDead = 1; wait(&s);}void daemonselect(){ int ran; /* erreur -> mpipeer.fd = -1 */ fd_set readset; /* set of fd on read */ fd_set writeset; /* set of fd on write */ int maxfdset; /* max fd num */ /* for interrupt of the child */ signal(SIGCHLD, on_child_death); siginterrupt(SIGCHLD, 1); while (1) { debut_boucle: FD_ZERO(&readset); FD_ZERO(&writeset); maxfdset = getallfdsets (&readset, &writeset); if( ChildIsDead ) if(all_is_done()) goto out_while; _debug_preselectprinti(); if(select (maxfdset, &readset, &writeset, NULL, NULL) == -1) { printi("finalize", "select interrupted (%s)", strerror(errno)); if((errno != EAGAIN) && (errno != EINTR)) qerror ("select"); else { printi("finalize", "ChildIsDead ? %s", ChildIsDead?"yes":"no"); printi("finalize", "errno = %d (EINTR=%d)", errno, EINTR); if( (errno == EINTR) && (ChildIsDead) ) if(all_is_done()) goto out_while; printw("select: %s", strerror(errno)); continue; } } _debug_postselectprinti(); /*** LOCAL MPI */ /* if something to read from local mpi process, read it */ if(FD_ISSET(smpi_read, &readset)) on_request(); /*** EVENT LOGGER COMMS (first, as for pessimistic it is latency critical) */ /* if ready to write events to event logger */ if((sockEL >= 0) && FD_ISSET(sockEL, &writeset)) ftp_sendtoEL(sockEL); /* if receiving information from event logger */ if((sockEL >= 0) && FD_ISSET(sockEL, &readset)) ftp_receivefromEL(sockEL); /*** COMMUNICATIONS BETWEEN DAEMONS */ /* for all in mpipeer, read and write if necessary */ for(ran = 0; ran < sizeofmpipeer; ran++) { /* if mpi peer not connected, pass */ if (mpipeer[ran].fd == -1) continue; if(FD_ISSET (mpipeer[ran].fd, &writeset)) writedaemon(ran, mpipeer); if(mpipeer[ran].fd != -1 && FD_ISSET (mpipeer[ran].fd, &readset)) readdaemon(ran , mpipeer); } /*** CONNEXION ESTABLISHMENT (Connect/Accept) */ /* verify status of all our connect request */ for(ran = 0; ran < sizeofmpipeer; ran++) { if(mpipeer[ran].cofd == -1) continue; // risque de plantage sur deconnection adverse. if(FD_ISSET(mpipeer[ran].cofd, &writeset)) { printi ("fdset", "cofd[%d] is write ready (trying to connect to %d has resulted)", ran, ran); on_connect (mpipeer, ran); goto debut_boucle; } if(FD_ISSET (mpipeer[ran].cofd, &readset)) { printi ("fdset", "cofd[%d] is read ready (trying to connect to %d has resulted)", ran, ran); on_connected (mpipeer, ran); goto debut_boucle; } } /* verify status of all our accept request */ if (FD_ISSET (scon, &readset)) { printi ("fdset", "scon is read ready (mpi peer in connecting to us)"); on_accept(scon); } /* if new connection reveals rank : goto debut_boucle to avoid readdaemon() */ if(on_accepted(mpipeer, &readset)) goto debut_boucle; /*** CHECKPOINT SCHEDULER COMMS */ if((sockCSCHED >= 0) && FD_ISSET (sockCSCHED, &writeset)) if(ftp_csched_write(sockCSCHED) < 0) sockCSCHED = -1; if((sockCSCHED >= 0) && FD_ISSET (sockCSCHED, &readset)) if(ftp_csched_read (sockCSCHED) < 0) sockCSCHED = -1; /*** CHECKPOINT SERVER COMMS */ /* something to send to the checkpoint server */ if(cin) { if(((cin->pipe != -1) && FD_ISSET(cin->pipe, &readset)) || ((cin->sock.data != -1) && FD_ISSET(cin->sock.data, &writeset)) || ((cin->sock.file != -1) && FD_ISSET(cin->sock.file, &writeset))) { switch(pckpt_genericwrite()) { case -1 : qerror("Failure during checkpoint writting"); case 1 : pckpt_end(); default : ; } } } /*** DISPATCHER COMMS */ if(FD_ISSET(disp_fd, &readset)) { int ack; _urecv (disp_fd, &ack, sizeof (int),0); AllPeerHasFinalized = 1; release_driver(); if(all_is_done()) goto out_while; } printi("select", "Select loop ended"); } /* end while(1) */ out_while: pckpt_finalize();}void gestSignal(int numSignal){ if(numSignal == SIGSEGV) printw("Segmentation fault"); else printw ("exiting after SIGTERM/SIGINT"); if (sockEL >= 0) { close (sockEL); sockEL = -1; } printw ("socket to event logger closed"); if (sockCSCHED >= 0) { close (sockCSCHED); sockCSCHED = -1; } printw ("socket to scheduler closed"); if (scon >= 0) { close (scon); scon = -1; } printw ("listen socket closed"); close_all_sockets(); printw ("all peer sockets closed"); if (smpi_read >= 0) { close (smpi_read); smpi_read = -1; } if (smpi_write >= 0) { close (smpi_write); smpi_write = -1; } printw ("unix socket closed and wait for local mpi to exit"); wait (NULL); printi ("select","daemon exit(0)"); fflush(stderr); exit (0);}void dispatcher_mpi_exit(){ int msg = htonl(FINALIZE_MSG); IHaveFinalized = 1; printi ("finalizing", "received local mpi finalize"); /* Sending finalize message to dispatcher */ _usend (disp_fd, &msg, sizeof (int),0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -