⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 filestore.c

📁 基于DHT的对等协议
💻 C
字号:
#define BLOCKSIZE 16384// supports files up to (BLOCKSIZE-260)/20 * BLOCKSIZE/20 * BLOCKSIZE// currently:  10815307776 bytes#include <chord.h>#include <sys/types.h>#include <sys/stat.h>#include <libgen.h>#include <string.h>#include <stdio.h>#include <dhash_common.h>#include <dhashclient.h>#include <dhblock.h>dhashclient *dhash;int inflight = 0, maxinflight;FILE *outfile;callback<void>::ptr todo = 0;// write block functions ----------------------------------void insert_cb(dhash_stat s, ptr<insert_info>) {  if(s != DHASH_OK)    warn << "bad store\n";  inflight--;  // wakeup from throttling  if((inflight < maxinflight) &&     (todo != NULL)) {    cbv f = todo;    todo = 0;    f();  } else if(inflight == 0)    exit(0);}chordID write_block(char *buf, int len) {  chordID ID = compute_hash(buf, len);  dhash->insert (ID, buf, len, wrap(&insert_cb));  inflight++;  return ID;}// indirect block -----------------------------------struct indirect {  vec<chordID> hs;  indirect *parent;  void add_hash(chordID h) {    hs.push_back(h);    if(full()) {      // write out indirect block      write_out();    }  }  virtual void print(char *buf, int l) {    if(len() > l) {      warnx << len() << "\n";      fatal("buf too small\n");    }    for(unsigned int i=0; i<hs.size(); i++) {      mpz_get_raw (buf, sha1::hashsize, &hs[i]);      buf += sha1::hashsize;    }  }  chordID write_out(void) {    chordID ID;    char buf[BLOCKSIZE];        if(len() == 0)      return 0; // xxx bug for max-sized file?    print(buf, BLOCKSIZE);    warnx << "len " << len() << "\n";    ID = write_block(buf, len());    clear();    if(parent) {      warnx << "p\n";      parent->add_hash(ID);    }    return ID;  }  void clear(void) {    hs.clear();  }  virtual int len(void) {    return hs.size() * sha1::hashsize;  }  bool full(void) {    return (len() + sha1::hashsize) > BLOCKSIZE;  }  indirect(indirect *p) : parent(p) {}  virtual ~indirect() {}};// inode block -------------------------------------// this is similar to an indirect block, but prepends// a file name and length to it.struct inode : indirect {  char filename[256];  int filelen;  unsigned int extralen, blen;  char *buf;  inode(char *_name, int _len) : indirect(NULL) {    extralen = sizeof(filename) + sizeof(filelen);    strncpy(filename, _name, sizeof(filename));    filelen = _len;  }  inode(ptr<dhash_block> blk) : indirect(NULL) {    extralen = sizeof(filename) + sizeof(filelen);    if (blk->data.len () < extralen)      fatal("incorrect format of inode block\n");    strncpy(filename, blk->data, sizeof(filename));    memcpy(&filelen, blk->data + sizeof(filename), sizeof(filelen));    // Beware! blk may be freed despite our illegal pointer to its mem    buf = (char *) blk->data.cstr () + extralen;    blen = blk->data.len () - extralen;  }  int len(void) {    return indirect::len() + extralen;  }  void print(char *buf, int l) {    if(len() > l) {      warnx << len() << "\n";      fatal("buf too small\n");    }    memcpy(buf, filename, sizeof(filename));    buf += sizeof(filename);    memcpy(buf, &filelen, sizeof(filelen));    buf += sizeof(filelen);    indirect::print(buf, l - extralen);  }  void write_out(void) {    warnx << indirect::write_out() << "\n";  }};// callback for printing out inode info --------------------------void list_cb(dhash_stat st, ptr<dhash_block> bl, vec<chordID> vc) {  if(st != DHASH_OK)    fatal("lost inode\n");  inode i(bl);  warnx << i.filename << ": " << i.filelen << " bytes\n";  exit(0);}// a chain of callbacks for retrieving indirect and file data blocks,// to retrieve the file ----------------------------------------------void gotblock_cb(int len, dhash_stat st, ptr<dhash_block> bl,		 vec<chordID> vc) {  if(st != DHASH_OK) {    warnx << "at " << len;    fatal(" lost block\n");  }  if(fseek(outfile, len, SEEK_SET) != 0)    fatal("fseek failure\n");  if(fwrite(bl->data.cstr (), 1, bl->data.len (), outfile) != bl->data.len ())    fatal("write failure\n");  inflight--;  // wakeup from throttling  if((inflight < maxinflight) &&     (todo != NULL)) {    cbv f = todo;    todo = 0;    f();  } else if(inflight == 0)    exit(0);}void gotindirect_step(int len, unsigned int step,		      dhash_stat st, ptr<dhash_block> bl) {  if(st != DHASH_OK) {    warnx << "at " << len;    fatal(" lost indirect\n");  }  const char *buf = bl->data.cstr ();  chordID ID;  for(unsigned int i=step; i<bl->data.len (); i+=sha1::hashsize) {    if(inflight >= maxinflight) {      // throttling      todo = wrap(&gotindirect_step, len, i, st, bl);      return;    }    mpz_set_rawmag_be (&ID, buf+i, sha1::hashsize);    dhash->retrieve(ID, wrap(&gotblock_cb, 			     len+(i*BLOCKSIZE/sha1::hashsize)));    warnx << "retrieve " << ID << "\n";    inflight++;  }}void gotindirect_cb(int len,		    dhash_stat st, ptr<dhash_block> bl, vec<chordID> vc) {  gotindirect_step(len, 0, st, bl);}void gotinode_cb(dhash_stat st, ptr<dhash_block> bl, vec<chordID> vc) {  if(st != DHASH_OK)    fatal("lost inode\n");  inode in(bl);  outfile = fopen(in.filename, "w");  if(outfile == NULL)    fatal("can't open file for writing\n");  chordID ID;  for(unsigned int i=0; i<(in.blen); i+=sha1::hashsize) {    mpz_set_rawmag_be(&ID, in.buf+i, sha1::hashsize);    dhash->retrieve(ID, wrap(&gotindirect_cb, 			     (BLOCKSIZE/sha1::hashsize)*(i*BLOCKSIZE/20)));    warnx << "retrieve " << ID << "\n";  }}void store_send(FILE *f, indirect *in, inode *n) {  char buf[BLOCKSIZE];  int len;  chordID ID;  do {    if(inflight >= maxinflight) {       // throttling      todo = wrap(&store_send, f, in, n);      return;    }    len = fread(buf, 1, BLOCKSIZE, f);    warnx << "len " << len << " inflight " << inflight << "\n";    ID = write_block(buf, len);    in->add_hash(ID);  } while(len == BLOCKSIZE);  in->write_out();  n->write_out();  delete in;  delete n;}void store(char *name) {  struct stat st;  if(stat(name, &st) == -1)    fatal("couldn't stat file\n");  inode *n = New inode(basename(name), st.st_size);  FILE *f = fopen(name, "r");  if(f == NULL)    fatal("couldn't open file\n");  indirect *in = New indirect(n);  store_send(f, in, n);}int main(int argc, char *argv[]) {  if(argc != 5)    fatal("Usage: filestore sockname -[fls] filename_or_hash"	  " num_RPCs_in_flight\n");  dhash = New dhashclient(argv[1]);  chordID ID;  char *cmd = argv[2];  char *name = argv[3];  maxinflight = atoi(argv[4]);  if(!strcmp(cmd, "-s")) {    // store    store(name);  } else if(!strcmp(cmd, "-l")) {    // list    str2chordID(name, ID);    dhash->retrieve(ID, wrap(&list_cb));  } else if(!strcmp(cmd, "-f")) {    // retrieve    str2chordID(name, ID);    dhash->retrieve(ID, wrap(&gotinode_cb));    warnx << "retrieve " << ID << "\n";  } else {    fatal("Usage: filestore sockname -[fls] filename_or_hash"	  " num_RPCs_in_flight\n");  }  amain();  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -