📄 checkpointserverproto.c
字号:
/*
  MPICH-V2
  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud

  This file is part of MPICH-V2.

  MPICH-V2 is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  MPICH-V2 is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with MPICH-V2; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

  $Id: checkpointServerProto.c,v 1.11 2006/01/24 19:35:01 rodrigue Exp $
*/

#include "config.h"
#include "debug.h"
#include "checkpointServerProto.h"
#include "checkpointServerError.h"
#include "checkpointServerFile.h"

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>

static int get_checkpoint(int sproto, int sdata);
static int put_checkpoint(int sproto, int sdata);
static int del_checkpoint(int sproto, int sdata);
static int recv_type_message(int s);
static int recv_get_ids(int s, int *wid, int *rankid, int *seq);
static int recv_put_ids(int s,int *wid,int *rankid, int *seq, int *protosize);
static char recv_del_ids(int s, int *group, int *rank, int *seq);
static int qsend_protofailure(int s, int group, int rank);
/*static char *recv_serv(int s);*/

#ifndef BUFFSIZE
#define BUFFSIZE 32000
#endif

#define GET 1
#define PUT 2
#define DEL 3

#define DEL_EXACT 'E'
#define DEL_UPTO 'U'

#define ERROR_CO -1
#define ERROR_SYNTAX -2

/* Scratch buffer shared by every request handler in this file. */
char buff[BUFFSIZE];

/*
 * Dispatch one client request.
 *
 * Reads the request type from the protocol socket and runs the matching
 * handler (GET, PUT or DEL).
 *
 * sproto: protocol (control) socket
 * sdata:  data socket
 *
 * Returns 0 when a known request type was dispatched (the handler's own
 * return value is not propagated), -1 for any other type.
 */
int traiter_message(int sproto, int sdata)
{
    switch(recv_type_message(sproto)) {
    case GET :
        get_checkpoint(sproto, sdata);
        return 0;
    case PUT :
        put_checkpoint(sproto, sdata);
        return 0;
    case DEL :
        del_checkpoint(sproto, sdata);
        return 0;
    default :
        return -1;
    }
}

/*
 * Read a native-endian int located `off` bytes into `src`.
 * memcpy is used so the access is valid whatever the alignment of src + off.
 */
static int load_int(const char *src, size_t off)
{
    int v;

    memcpy(&v, src + off, sizeof v);
    return v;
}

/*
 * Store `value` in network byte order `off` bytes into `dst`.
 * memcpy is used because the reply header places its ints at odd offsets.
 */
static void store_net_int(char *dst, size_t off, int value)
{
    unsigned int net = htonl((unsigned int)value);

    memcpy(dst + off, &net, sizeof net);
}

/*
 * Serve a GET request: send a stored checkpoint back to the client.
 *
 * Steps, in order:
 *   1. read the requested (group, rank, seq) from the protocol socket;
 *   2. open the checkpoint file and read its 5-int header
 *      (group, rank, seq, protosize, datasize, native byte order);
 *   3. check the header against the request (a requested seq of -1 skips
 *      the sequence check);
 *   4. send 'A' followed by the five header ints in network byte order
 *      on the protocol socket;
 *   5. stream protosize bytes of protocol data on the protocol socket;
 *   6. stream the image data on the data socket;
 *   7. shut down and close both sockets.
 *
 * sproto: protocol (control) socket
 * sdata:  data socket
 *
 * Returns 0.
 *
 * NOTE(review): printq(), qerror() and qsend_protofailure() are declared
 * outside this file; the code below only makes sense if they do not return
 * (the "q" presumably stands for "quit") -- confirm in debug.h /
 * checkpointServerError.h.
 */
static int get_checkpoint(int sproto, int sdata)
{
    const size_t hdrsize = sizeof(int) * 5;
    int f;
    int ret;
    int group;
    int rank;
    int seq;
    int stored_group;
    int stored_rank;
    int stored_seq;
    int protosize;
    int proto_total;
    int proto_sent;
    int datasize;
    int i;
    char c;

    /* retrieve get request arguments */
    if(recv_get_ids(sproto, &group, &rank, &seq) < 0)
        printq("Cannot read protocol : aborting");

    if((f = getopenCheckpointFile(group, rank, seq)) < 0)
        qsend_protofailure(sproto, group, rank);

    /* read the file header and check it against the request */
    if(read(f, buff, hdrsize) != (ssize_t)hdrsize) {
        printe("Cannot read protocol data from checkpoint file %d:%d:%d", group, rank, seq);
        qsend_protofailure(sproto, group, rank);
    }

    stored_group = load_int(buff, 0);
    stored_rank  = load_int(buff, sizeof(int));
    stored_seq   = load_int(buff, 2 * sizeof(int));
    protosize    = load_int(buff, 3 * sizeof(int));
    datasize     = load_int(buff, 4 * sizeof(int));

    if(group != stored_group) {
        printw("File stored group %d does not match requested group %d", stored_group, group);
        qsend_protofailure(sproto, group, rank);
    }
    if(rank != stored_rank) {
        /* report the stored rank (the original printed the stored group here) */
        printw("File stored rank %d does not match requested rank %d", stored_rank, rank);
        qsend_protofailure(sproto, group, rank);
    }
    if((seq != -1) && (seq != stored_seq)) {
        /* report the stored seq (the original read from a wrong offset here) */
        printw("File stored seqnumber %d does not match requested %d", stored_seq, seq);
        qsend_protofailure(sproto, group, rank);
    }
    seq = stored_seq;

    /* send back server response: 'A' + 5 ints in network byte order */
    *buff = 'A';
    store_net_int(buff, 1, group);
    store_net_int(buff, 1 + sizeof(int), rank);
    store_net_int(buff, 1 + 2 * sizeof(int), seq);
    store_net_int(buff, 1 + 3 * sizeof(int), protosize);
    store_net_int(buff, 1 + 4 * sizeof(int), datasize);

    printi("CS_get", "Sending info message to client %d:%d for checkpoint %d (proto=%d, data=%d)", group, rank, seq, protosize, datasize);
    if(write(sproto, buff, 1 + hdrsize) != (ssize_t)(1 + hdrsize))
        qerror("Sending protocol to client %d:%d(%d)", group, rank, seq);

    /* send implementation dependent protocol data */
    proto_total = protosize;
    proto_sent = 0;
    while(protosize > 0) {
        int chunk = (protosize < BUFFSIZE) ? protosize : BUFFSIZE;

        if((ret = read(f, buff, chunk)) <= 0)
            qerror("Could not read more than %d protocol data from file (waited %d)", proto_sent, proto_total);
#ifdef HARDDEBUG
        printw("read from f : %08x %08x %08x %08x %08x %08x",
               load_int(buff, 0), load_int(buff, 4), load_int(buff, 8),
               load_int(buff, 12), load_int(buff, 16), load_int(buff, 20));
#endif
        if(write(sproto, buff, ret) != ret)
            qerror("Could not send all of %d protocol data to client %d:%d", ret, group, rank);
        protosize -= ret;
        proto_sent += ret;
    }

    /* send image data */
    for(i = 0; i < datasize; i += ret) {
        int off;

        if((ret = read(f, buff, BUFFSIZE)) <= 0)
            qerror("Could not read more than %d image data from file (waited %d)", i, datasize);

        /*
         * write() may accept fewer bytes than requested: keep a running
         * offset into buff and retry with the remainder.  ret is left
         * untouched so the outer loop advances by the amount actually read.
         */
        for(off = 0; off < ret; ) {
            int s = write(sdata, buff + off, ret - off);

            if(s < 0) {
                qerror("unable to send to client %d:%d", group, rank);
                break;
            }
            if(s < ret - off)
                printw("Could send only %d/%d of image data to client %d:%d", s, ret - off, group, rank);
            off += s;
        }
    }

    /*
     * NOTE(review): how == 2 is SHUT_RDWR on common platforms, in which case
     * the two reads below cannot block waiting for a client acknowledgement
     * -- confirm whether SHUT_WR was intended.
     */
    shutdown(sproto, 2);
    shutdown(sdata, 2);
    read(sproto, &c, 1);
    read(sdata, &c, 1);
    printi("CS_get", "Get ACK recieved, closing connection");
    close(sproto);
    close(sdata);

    return 0;
}

/* put_checkpoint: code below is unchanged, only re-indented. */
static int put_checkpoint(int sproto, int sdata)
{
    int f;
    int ret;
    int group;
    int rank;
    int seq;
    int protosize;
    int datasize;
    fd_set fdset;
    int maxset;
    int protoread = 0;
    int dataread = 0;
    char c = 'A';

    if(recv_put_ids(sproto, &group, &rank, &seq, &protosize) < 0)
        printq("Cannot recieve correct protocol from client : Aborting");

    if((f = putopenCheckpointFile(group, rank, seq)) < 0)
        printq("Cannot open Checkpoint file for %d:%d : Aborting", group, rank);

    printi("CS", "Sending 'A'ccept notification to client %d:%d for checkpoint %d", group, rank, seq);
    if(write(sproto, &c, 1) != 1) {
        printe("write failed dureing sending of accept message to client %d:%d(%d) (probably client failure)", group, rank, seq);
        discardCheckpointFile(f, group, rank, seq);
        printq("Aborting due to client write failure");
    }

    maxset = ((sproto > sdata) ?
sproto : sdata) + 1; while(protoread < protosize) { FD_ZERO(&fdset); FD_SET(sproto, &fdset); if(sdata != -1) FD_SET(sdata, &fdset); if(select(maxset , &fdset, NULL , NULL , NULL) < 0) { printe("Select failed during reception of image from %d:%d(%d)", group, rank, seq); discardCheckpointFile(f, group, rank, seq); printq("Aborting due to select failure"); } if(FD_ISSET(sproto, &fdset)) { //printf(stdout,"put_checkpoint: sproto = %s protosize = %d"); if((ret = read(sproto, buff, (((protosize - protoread) > BUFFSIZE) ? BUFFSIZE : protosize - protoread))) < 0) { printe("Read failed on proto channel during reception of checkpoint %d:%d(%d)", group, rank, seq); discardCheckpointFile(f, group, rank, seq); printq("Aborting due to read failure (probably client failure)"); } else if(ret == 0) { printw("Proto channel connection closed before reception of checkpoint %d:%d(%d) finished", group, rank, seq); discardCheckpointFile(f, group, rank, seq); printq("Aborting due to read failure (probably client failure)"); } if(protowriteCheckpointFile(f, protosize, protoread, buff, ret) != ret) { printw("Writting to checkpoint file %d:%d(%d) failed", group, rank, seq); discardCheckpointFile(f, group, rank, seq); printq("Aborting due to internal write failure"); } protoread += ret; } if(FD_ISSET(sdata, &fdset)) { if((ret = read(sdata, buff, BUFFSIZE)) < 0) { printe("Read failed on data channel during reception of checkpoint %d:%d(%d)", group, rank, seq); discardCheckpointFile(f, group, rank, seq); printq("Aborting due to read failure (probably client failure)");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -