⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
/* * * Copyright (C) 2001  Frank Dabek (fdabek@lcs.mit.edu),  *                     Frans Kaashoek (kaashoek@lcs.mit.edu), *   		       Massachusetts Institute of Technology *  * *  Permission is hereby granted, free of charge, to any person obtaining *  a copy of this software and associated documentation files (the *  "Software"), to deal in the Software without restriction, including *  without limitation the rights to use, copy, modify, merge, publish, *  distribute, sublicense, and/or sell copies of the Software, and to *  permit persons to whom the Software is furnished to do so, subject to *  the following conditions: * *  The above copyright notice and this permission notice shall be *  included in all copies or substantial portions of the Software. * *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */#include <arpc.h>#include <rpctypes.h>#include <chord.h>#include "dhash_common.h"#include "dhash_impl.h"#include "dhashcli.h"#include <dhash_prot.h>#include <chord_types.h>#include <id_utils.h>#include <location.h>#include <locationtable.h>#include <misc_utils.h>#include <merkle_server.h>#include <merkle_sync_prot.h>#include "dhblock_srv.h"#include "dhblock_chash_srv.h"#include "dhblock_keyhash_srv.h"#include "dhblock_noauth_srv.h"#ifdef DMALLOC#include <dmalloc.h>#endif#include <modlogger.h>#define warning modlogger ("dhash", modlogger::WARNING)#define info  modlogger ("dhash", modlogger::INFO)#define trace modlogger ("dhash", modlogger::TRACE)#include <configurator.h>struct dhash_config_init {  dhash_config_init ();} dci;dhash_config_init::dhash_config_init (){  bool ok = true;#define set_int Configurator::only ().set_int  /** Whether or not to drop writes */  ok = ok && set_int ("dhash.drop_writes", 0);  /** Should replication run initially? */  ok = ok && set_int ("dhash.start_maintenance", 1);    ok = ok && set_int ("dhash.repair_timer",  300);  //plab hacks  ok = ok && set_int ("dhash.disable_db_env", 0);  assert (ok);#undef set_int}// Things that read from Configurator#define DECL_CONFIG_METHOD(name,key)			\u_long							\dhash::name ()						\{							\  static bool initialized = false;			\  static int v = 0;					\  if (!initialized) {					\    initialized = Configurator::only ().get_int (key, v);	\    assert (initialized);				\  }							\  return v;						\}DECL_CONFIG_METHOD(reptm, "dhash.repair_timer")DECL_CONFIG_METHOD(dhash_disable_db_env, "dhash.disable_db_env")#undef DECL_CONFIG_METHOD// Pure virtual destructors still need definitionsdhash::~dhash () {}ref<dhash>dhash::produce_dhash (ptr<vnode> v, str dbname, cbv donecb){  return New refcounted<dhash_impl> (v, dbname, donecb);}dhash_impl::~dhash_impl (){}dhash_impl::dhash_impl (ptr<vnode> node, str dbname, cbv dcb) :  host_node (node),  cli (NULL),  bytes_stored (0),  keys_stored (0),  keys_replicated (0),  keys_cached (0),  keys_others (0),  bytes_served (0),  keys_served (0),  rpc_answered (0),  donecb (dcb){  ptr<dhblock_srv> srv;  cli = New refcounted<dhashcli> (host_node, mkref(this));  ptr<uint> num_to_go = New refcounted<uint>;  (*num_to_go) = 3;  str ext = strbuf () << host_node->my_ID () << ".c";  srv = New refcounted<dhblock_chash_srv> (node, cli, "db file",      dbname, ext, wrap (this, &dhash_impl::srv_ready, num_to_go));  blocksrv.insert (DHASH_CONTENTHASH, srv);  ext = strbuf () << host_node->my_ID () << ".k";  srv = New refcounted<dhblock_keyhash_srv> (node, cli, "keyhash db file",      dbname, ext, wrap (this, &dhash_impl::srv_ready, num_to_go));  blocksrv.insert (DHASH_KEYHASH, srv);  ext = strbuf () << host_node->my_ID () << ".n";  srv = New refcounted<dhblock_noauth_srv> (node, cli, "noauth db file",      dbname, ext, wrap (this, &dhash_impl::srv_ready, num_to_go));  blocksrv.insert (DHASH_NOAUTH, srv);}voiddhash_impl::srv_ready (ptr<uint> num_to_go) {  (*num_to_go)--;  if( *num_to_go == 0 ) {    // RPC demux    trace << host_node->my_ID () << " registered dhash_program_1\n";    host_node->addHandler (dhash_program_1, wrap(this, &dhash_impl::dispatch));        trace << host_node->my_ID () << " registered merklesync_program_1\n";    host_node->addHandler (merklesync_program_1, 			   wrap(this, &dhash_impl::merkle_dispatch));    start_maint ();    (*donecb)();  }}voiddhash_impl::start_maint () {  int v;  bool ok = Configurator::only ().get_int ("dhash.start_maintenance", v);  if (!ok || v)    start (true);}voiddhash_impl::fetchcomplete_done (int nonce, chord_node sender,				dhash_stat err, bool present){  // warn << host_node->my_ID () << ": dhash_impl::fetchcomplete_done: "  //     << nonce << " " << sender << " " << err << "\n";  if (err == DHASH_NOENT) { // block wasn't in db. notify sender.    ref<s_dhash_insertarg> arg = New refcounted<s_dhash_insertarg> ();    //fill in just enough fields so that it marshalls    // only nonce and type will be examined on the other end    arg->key = chordID (0);    arg->data.setsize (0);    arg->nonce = nonce;    arg->type = DHASH_NOENT_NOTIFY;    doRPC (sender, dhash_program_1, DHASHPROC_FETCHCOMPLETE, 	   arg, NULL, aclnt_cb_null);  }  }voiddhash_impl::merkle_dispatch (user_args *sbp){  merkle_stat err (MERKLE_ERR);  dhash_ctype *ctype = sbp->Xtmpl getarg<dhash_ctype> ();  ptr<dhblock_srv> srv = blocksrv[*ctype];  if (srv) {    merkle_server *msrv = srv->mserv ();    if (msrv)      msrv->dispatch (sbp);    else      sbp->replyref (err);  } else     sbp->replyref (err);}longdhash_impl::register_fetch_callback (cbfetch cb) {  fcb_state *fcb = New fcb_state (cb);  fetch_cbs.insert (fcb);    return fcb->nonce;}voiddhash_impl::unregister_fetch_callback (long nonce) {  fcb_state *fcb = fetch_cbs[nonce];  fetch_cbs.remove (fcb);  delete fcb;}voiddhash_impl::dispatch (user_args *sbp) {  rpc_answered++;  switch (sbp->procno) {  case DHASHPROC_FETCHREC:    {      assert (0);      // dhash_fetchrec_arg *arg = sbp->template getarg<dhash_fetchrec_arg> ();      //dofetchrec (sbp, arg);    }    break;  case DHASHPROC_FETCHITER:    {      s_dhash_fetch_arg *farg = sbp->Xtmpl getarg<s_dhash_fetch_arg> ();      blockID id (farg->key, farg->ctype);      dhash_fetchiter_res res (DHASH_INPROGRESS);      chord_node from;      sbp->fill_from (&from);      long nonce = farg->nonce;      sbp->reply (&res);      //farg is garbage below this line -----       ptr<location> requestor = host_node->locations->lookup_or_create (from);      ptr<dhblock_srv> srv = blocksrv[id.ctype];      cli->sendblock (requestor, id, srv,		      wrap (this, &dhash_impl::fetchcomplete_done,			    nonce, from),		      nonce);    }    break;  case DHASHPROC_FETCHCOMPLETE:    {      s_dhash_insertarg *sarg = sbp->Xtmpl getarg<s_dhash_insertarg> ();      fcb_state *fcb = fetch_cbs[sarg->nonce];      if (fcb) {	if (sarg->type == DHASH_NOENT_NOTIFY) {	  (fcb->cb) (str (""), -1, -1);	} else {	  str data (sarg->data.base (), sarg->data.size ());	  (fcb->cb) (data, sarg->offset, sarg->attr.size);	}      } else	warn << host_node->my_ID () << ": unexpected FCB for " << sarg->key 	  << " nonce: " << sarg->nonce 	  << " type: " << sarg->type	  << " offset: " << sarg->offset	  << " totsz: " << sarg->attr.size	  << "\n";            sbp->reply (NULL);    }    break;  case DHASHPROC_STORE:    {      s_dhash_insertarg *sarg = sbp->Xtmpl getarg<s_dhash_insertarg> ();      // What to do about retries??      // e.g. checking if we're responsible (or continuing a partial)      store (sarg, 	     wrap (this, &dhash_impl::storesvc_cb, sbp, sarg));    }    break;  case DHASHPROC_OFFER:    {      dhash_offer_arg *arg = sbp->Xtmpl getarg<dhash_offer_arg> ();      dhash_ctype ctype = DHASH_CONTENTHASH; // XXX      ptr<dhblock_srv> srv = blocksrv[ctype];      if (!srv) {	dhash_offer_res res (DHASH_ERR);	sbp->reply (&res);      } else {	srv->offer (sbp, arg);      }    }    break;  default:    sbp->replyref (PROC_UNAVAIL);    break;  }}void store_after_store (cbstore cb, dhash_stat status);voiddhash_impl::storesvc_cb (user_args *sbp,		         s_dhash_insertarg *arg,		         bool already_present,		         dhash_stat err){  dhash_storeres res (DHASH_OK);  if ((err != DHASH_OK) && (err != DHASH_STORE_PARTIAL))     res.set_status (err);  else {    res.resok->already_present = already_present;    res.resok->source = host_node->my_ID ();    res.resok->done = (err == DHASH_OK);  }  sbp->reply (&res);}void dhash_impl::store (s_dhash_insertarg *arg, cbstore cb){  ptr<dhblock_srv> srv = blocksrv[arg->ctype];  if (!srv) {    cb (false, DHASH_ERR);    return;  }  store_state *ss = pst[arg->key];  if (ss == NULL) {    ss = New store_state (arg->key, arg->attr.size);    pst.insert(ss);  }  if (!ss->addchunk(arg->offset, arg->offset+arg->data.size (), 		    arg->data.base ())) {    cb (false, DHASH_ERR);    return;  }  if (ss->iscomplete()) {    str d (ss->buf, ss->size);    // this will start the store, but it won't be on disk until the cb    // is called.    srv->store (arg->key, d, wrap (&store_after_store, cb));    bytes_stored += ss->size;    pst.remove (ss);    delete ss;    // Wait until store returns to decide what error code to return.    // XXX this needs a "callback" RPC.  } else {    cb (false, DHASH_STORE_PARTIAL);  }  // XXX should throw out very old store states.}voidstore_after_store (cbstore cb, dhash_stat status){  // XXX eliminate already_present flag to simplify this.  cb (false, status);  return; }// --------- utilityvoiddhash_impl::doRPC (const chord_node &n, const rpc_program &prog, int procno,	           ptr<void> in,void *out, aclnt_cb cb,		   cbtmo_t cb_tmo) {  host_node->doRPC (n, prog, procno, in, out, cb, cb_tmo);}voiddhash_impl::doRPC (const chord_node_wire &n, const rpc_program &prog,                   int procno, ptr<void> in,void *out, aclnt_cb cb,		   cbtmo_t cb_tmo) {  host_node->doRPC (make_chord_node (n), prog, procno, in, out, cb, cb_tmo);}voiddhash_impl::doRPC (ptr<location> ID, const rpc_program &prog, int procno,	           ptr<void> in,void *out, aclnt_cb cb,		   cbtmo_t cb_tmo)  {  host_node->doRPC (ID, prog, procno, in, out, cb, cb_tmo);}// ---------- debug ----voiddhash_impl::print_stats () {  warnx << "ID: " << host_node->my_ID () << "\n";  warnx << "Stats:\n";  vec<dstat> ds = stats ();  for (size_t i = 0; i < ds.size (); i++)    warnx << "  " << ds[i].value << " " << ds[i].desc << "\n";}static voidstatscb (vec<dstat> *s, const dhash_ctype &c, ptr<dhblock_srv> srv){  srv->stats (*s);}vec<dstat>dhash_impl::stats (){  vec<dstat> s;  s.push_back(dstat ("new blocks stored", keys_stored));  s.push_back(dstat ("new bytes stored", bytes_stored));  s.push_back(dstat ("keys served", keys_served));  s.push_back(dstat ("bytes served", bytes_served));  s.push_back(dstat ("RPCs answered", rpc_answered));  blocksrv.traverse (wrap (&statscb, &s));  return s;}static voidstartcb (bool randomize, const dhash_ctype &c, ptr<dhblock_srv> srv){  srv->start (randomize);}voiddhash_impl::start (bool randomize){  blocksrv.traverse (wrap (&startcb, randomize));}static voidstopcb (const dhash_ctype &c, ptr<dhblock_srv> srv){  srv->stop ();}voiddhash_impl::stop (){  blocksrv.traverse (wrap (&stopcb));}// ----------------------------------------------------------------------------// store state static voidjoin (store_chunk *c){  store_chunk *cnext;  while (c->next && c->end >= c->next->start) {    cnext = c->next;    if (c->end < cnext->end)      c->end = cnext->end;    c->next = cnext->next;    delete cnext;  }}boolstore_state::gap (){  if (!have)    return true;  store_chunk *c = have;  store_chunk *p = 0;  if (c->start != 0)    return true;  while (c) {    if (p && p->end != c->start)      return true;    p = c;    c = c->next;  }  if (p->end != size)    return true;  return false;}boolstore_state::iscomplete (){  return have && have->start == 0 && have->end == (unsigned)size && !gap ();}boolstore_state::addchunk (unsigned int start, unsigned int end, void *base){  store_chunk **l, *c;  if (start >= end || end > size)    return false;    l = &have;  for (l=&have; *l; l=&(*l)->next) {    c = *l;    // our start touches this block    if (c->start <= start && start <= c->end) {      // we have new data      if (end > c->end) {        memmove (buf+start, base, end-start);        c->end = end;        join(c);      }      return true;    }    // our start comes before this block; break to insert    if (start < c->start)      break;  }  *l = New store_chunk(start, end, *l);  memmove(buf+start, base, end-start);  join(*l);  return true;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -