📄 server.c
字号:
/*
 *
 * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu)
 * Copyright (C) 2001 Frans Kaashoek (kaashoek@lcs.mit.edu) and
 *                    Frank Dabek (fdabek@lcs.mit.edu).
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 */

#include "chord_impl.h"
#include "stabilize.h"
#include "succ_list.h"
#include "pred_list.h"
#include <coord.h>
#include "comm.h"
#include <location.h>
#include <locationtable.h>
#include <math.h>
#include <configurator.h>
#include <modlogger.h>

// Shorthand loggers for this file; all of them log under the "vnode" tag.
#define warning modlogger ("vnode", modlogger::WARNING)
#define info modlogger ("vnode", modlogger::INFO)
#define trace modlogger ("vnode", modlogger::TRACE)

// Both values are read from the environment once, when these globals are
// initialized.  aclnttrace is the integer value of ACLNT_TRACE (0 when the
// variable is unset); aclnttime is true whenever ACLNT_TIME is set at all.
const int aclnttrace (getenv ("ACLNT_TRACE")
                      ? atoi (getenv ("ACLNT_TRACE")) : 0);
const bool aclnttime (getenv ("ACLNT_TIME"));

// Issue a CHORDPROC_GETSUCCESSOR RPC to node n.  The reply is handled by
// get_successor_cb, which invokes cb and frees the result buffer.
void
vnode_impl::get_successor (ptr<location> n, cbchordID_t cb)
{
  //  warn << "get successor of " << n << "\n";
  ngetsuccessor++;
  chord_noderes *res = New chord_noderes (CHORD_OK);
  ptr<chordID> v = New refcounted<chordID> (n->id ());
  doRPC (n, chord_program_1, CHORDPROC_GETSUCCESSOR, v, res,
         wrap (mkref (this), &vnode_impl::get_successor_cb,
               n->id (), cb, res));
}

// Completion handler for get_successor.
//   err set          -> cb (default-constructed node, CHORD_RPCFAILURE)
//   res->status set  -> cb (default-constructed node, res->status)
//   otherwise        -> cb (node built from *res->resok, CHORD_OK)
// res is deleted before returning in every case.
void
vnode_impl::get_successor_cb (chordID n, cbchordID_t cb, chord_noderes *res,
                              clnt_stat err)
{
  if (err) {
    chord_node bad;
    //    warnx << "get_successor_cb: RPC failure " << err << "\n";
    cb (bad, CHORD_RPCFAILURE);
  } else if (res->status) {
    chord_node bad;
    // warnx << "get_successor_cb: RPC error " << res->status << "\n";
    cb (bad, res->status);
  } else {
    cb (make_chord_node (*res->resok), CHORD_OK);
  }
  delete res;
}

// Issue a CHORDPROC_GETSUCCLIST RPC to node n; the reply is delivered to
// cb through get_succlist_cb.
void
vnode_impl::get_succlist (ptr<location> n, cbchordIDlist_t cb)
{
  ngetsucclist++;
  chord_nodelistres *res = New chord_nodelistres (CHORD_OK);
  ptr<chordID> v = New refcounted<chordID> (n->id ());
  doRPC (n, chord_program_1, CHORDPROC_GETSUCCLIST, v, res,
         wrap (mkref (this), &vnode_impl::get_succlist_cb, cb, res));
}

// Issue a CHORDPROC_GETPREDLIST RPC to node n.  The reply has the same
// shape as a successor-list reply, so get_succlist_cb is reused.
// NOTE(review): this bumps ngetsucclist, not a predecessor-specific
// counter -- confirm that sharing the counter is intended.
void
vnode_impl::get_predlist (ptr<location> n, cbchordIDlist_t cb)
{
  ngetsucclist++;
  chord_nodelistres *res = New chord_nodelistres (CHORD_OK);
  ptr<chordID> v = New refcounted<chordID> (n->id ());
  // we can use the same callback
  doRPC (n, chord_program_1, CHORDPROC_GETPREDLIST, v, res,
         wrap (mkref (this), &vnode_impl::get_succlist_cb, cb, res));
}

// Completion handler shared by get_succlist and get_predlist.  On error cb
// receives an empty list plus CHORD_RPCFAILURE or res->status.  On success
// every returned node is converted with make_chord_node, appended to the
// list handed to cb, and -- if the node is present in the location table --
// used to refresh that entry's coordinates.  res is always deleted.
void
vnode_impl::get_succlist_cb (cbchordIDlist_t cb, chord_nodelistres *res,
                             clnt_stat err)
{
  vec<chord_node> nlist;
  if (err) {
    cb (nlist, CHORD_RPCFAILURE);
  } else if (res->status) {
    cb (nlist, res->status);
  } else {
    // xxx there must be something more intelligent to do here
    for (unsigned int i = 0; i < res->resok->nlist.size (); i++) {
      chord_node n = make_chord_node (res->resok->nlist[i]);
      nlist.push_back (n);
      // update the coordinates if this node is in the location table
      ptr<location> l = locations->lookup (n.x);
      if (l) {
        l->set_coords (n);
      }
    }
    cb (nlist, CHORD_OK);
  }
  delete res;
}

// Issue a CHORDPROC_GETPREDECESSOR RPC to node n; the reply is delivered
// to cb through get_predecessor_cb.
void
vnode_impl::get_predecessor (ptr<location> n, cbchordID_t cb)
{
  ptr<chordID> v = New refcounted<chordID> (n->id ());
  ngetpredecessor++;
  chord_noderes *res = New chord_noderes (CHORD_OK);
  doRPC (n, chord_program_1, CHORDPROC_GETPREDECESSOR, v, res,
         wrap (mkref (this), &vnode_impl::get_predecessor_cb,
               n->id (), cb, res));
}

// Completion handler for get_predecessor; same contract as
// get_successor_cb (failure -> default node + error status, success ->
// node built from *res->resok).  res is always deleted.
void
vnode_impl::get_predecessor_cb (chordID n, cbchordID_t cb, chord_noderes *res,
                                clnt_stat err)
{
  if (err) {
    chord_node bad;
    cb (bad, CHORD_RPCFAILURE);
  } else if (res->status) {
    chord_node bad;
    cb (bad, res->status);
  } else {
    cb (make_chord_node (*res->resok), CHORD_OK);
  }
  delete res;
}

// Start an iterative route towards key x.  find_succlist_hop_cb is run
// after each hop with m as the requested list length; guess is forwarded
// to the iterator's first_hop.
// NOTE(review): the callback is wrapped around a raw `this`, whereas the
// RPC callbacks in this file use mkref (this) -- confirm the vnode cannot
// be destroyed while a route is in flight.
void
vnode_impl::find_succlist (const chordID &x, u_long m, cbroute_t cb,
                           ptr<chordID> guess)
{
  route_iterator *ri = produce_iterator_ptr (x);
  ri->first_hop (wrap (this, &vnode_impl::find_succlist_hop_cb, cb, ri, m),
                 guess);
}

// Per-hop handler for find_succlist.
//
// When the iterator reports done, cb receives the iterator's successors,
// path and status and the iterator is deleted.  Otherwise, if the
// "chord.find_succlist_shaving" setting equals 1, the current successor
// list is scanned for a consecutive pair that brackets the key (as decided
// by betweenrightincl); on a hit the nodes in front of the key's successor
// are dropped and the route finishes early.  If neither applies, the
// iterator advances one more hop.
void
vnode_impl::find_succlist_hop_cb (cbroute_t cb, route_iterator *ri,
                                  u_long m, bool done)
{
  // The shaving flag is read from the configuration on the first call only
  // and cached in function-local statics.
  static bool shave = true;
  static bool initialized = false;
  if (!initialized) {
    int x = 1;
    // Keep the lookup outside assert (): an expression inside assert () is
    // not evaluated when NDEBUG is defined, which would silently ignore the
    // configured value.
    const bool found =
      Configurator::only ().get_int ("chord.find_succlist_shaving", x);
    assert (found);
    (void) found; // unused when assertions are compiled out
    shave = (x == 1);
    initialized = true;
  }

  vec<chord_node> cs = ri->successors ();
  if (done) {
    cb (cs, ri->path (), ri->status ());
    delete ri;
    return;
  }

  if (shave) {
    // Only indices below `left` are considered.  With at least m entries
    // this leaves more than m nodes in the list after popn_front (i).
    // NOTE(review): with fewer than m entries the whole list is scanned --
    // confirm that is the intended behaviour.
    size_t left = 0;
    if (cs.size () < m)
      left = cs.size ();
    else
      left = cs.size () - m;
    for (size_t i = 1; i < left; i++) {
      if (betweenrightincl (cs[i-1].x, cs[i].x, ri->key ())) {
        trace << myID << ": find_succlist (" << ri->key ()
              << "): skipping " << i << " nodes.\n";
        cs.popn_front (i);
        cb (cs, ri->path (), ri->status ());
        delete ri;
        return;
      }
    }
  }

  ri->next_hop ();
}

// Route to key x and report the result to cb via find_successor_cb, which
// also maintains the hop statistics.
void
vnode_impl::find_successor (const chordID &x, cbroute_t cb)
{
  nfindsuccessor++;
  find_route (x, wrap (mkref (this), &vnode_impl::find_successor_cb, x, cb));
}

// Completion handler for find_successor.  Logs a warning on failure; on
// success adds the path length to nhops and raises nmaxhops if this path
// is the longest seen.  cb is invoked with the unmodified result either
// way.
void
vnode_impl::find_successor_cb (chordID x, cbroute_t cb, vec<chord_node> s,
                               route search_path, chordstat status)
{
  if (status != CHORD_OK) {
    warnx << "find_successor_cb: find successor of "
          << x << " failed: " << status << "\n";
  } else {
    nhops += search_path.size ();
    if (search_path.size () > nmaxhops)
      nmaxhops = search_path.size ();
  }
  cb (s, search_path, status);
}

// Start an iterative route towards key x with no initial guess;
// find_route_hop_cb drives the iterator until it reports done.
// NOTE(review): wraps a raw `this` (see find_succlist).
void
vnode_impl::find_route (const chordID &x, cbroute_t cb)
{
  route_iterator *ri = produce_iterator_ptr (x);
  ri->first_hop (wrap (this, &vnode_impl::find_route_hop_cb, cb, ri), NULL);
}

// Per-hop handler for find_route: hand the final result to cb and delete
// the iterator once done, otherwise take the next hop.
void
vnode_impl::find_route_hop_cb (cbroute_t cb, route_iterator *ri, bool done)
{
  if (done) {
    cb (ri->successors (), ri->path (), ri->status ());
    delete ri;
  } else {
    ri->next_hop ();
  }
}

// Send a CHORDPROC_NOTIFY RPC to node n describing node x.  The argument
// is filled from x's entry in the location table; if x has no entry the
// notification is skipped with a warning instead of dereferencing a null
// pointer.
void
vnode_impl::notify (ptr<location> n, chordID &x)
{
  ptr<location> xl = locations->lookup (x);
  if (!xl) {
    warnx << "notify: " << x << " is not in the location table\n";
    return;
  }

  ptr<chord_nodearg> na = New refcounted<chord_nodearg>;
  chordstat *res = New chordstat;
  nnotify++;
  // warnx << gettime () << ": notify " << n << " about " << x << "\n";
  xl->fill_node (na->n);
  doRPC (n, chord_program_1, CHORDPROC_NOTIFY, na, res,
         wrap (mkref (this), &vnode_impl::notify_cb, n->id (), res));
}

// Completion handler for notify.  A transport failure is only logged.  An
// error status returned by the remote node is logged and the node is marked
// not alive -- provided it still has an entry in the location table.  res
// is always deleted.
void
vnode_impl::notify_cb (chordID n, chordstat *res, clnt_stat err)
{
  if (err || *res) {
    if (err)
      warnx << "notify_cb: RPC failure " << n << " " << err << "\n";
    else {
      warnx << "notify_cb: RPC error " << n << " " << *res << "\n";
      ptr<location> nl = locations->lookup (n);
      // The lookup result was previously dereferenced unchecked; skip the
      // update when there is no entry for n.
      if (nl)
        nl->set_alive (false);
    }
  }
  delete res;
}

// Send a CHORDPROC_ALERT RPC to node n whose argument describes node x.
// The outcome is only logged (see alert_cb).
void
vnode_impl::alert (ptr<location> n, ptr<location> x)
{
  ptr<chord_nodearg> na = New refcounted<chord_nodearg>;
  chordstat *res = New chordstat;
  nalert++;
  warnx << "alert: " << x->id () << " died; notify " << n->id () << "\n";
  x->fill_node (na->n);
  doRPC (n, chord_program_1, CHORDPROC_ALERT, na, res,
         wrap (mkref (this), &vnode_impl::alert_cb, res));
}

// Completion handler for alert: log any failure and delete res.
void
vnode_impl::alert_cb (chordstat *res, clnt_stat err)
{
  if (err) {
    warnx << "alert_cb: RPC failure " << err << "\n";
  } else if (*res != CHORD_OK) {
    warnx << "alert_cb: returns " << *res << "\n";
  }
  delete res;
}

// Record cb as the dispatcher for prog in dispatch_table (keyed by
// prog.progno) and ask the owning chord node to handle the program.
void
vnode_impl::addHandler (const rpc_program &prog, cbdispatch_t cb)
{
  dispatch_record *rec = New dispatch_record (prog.progno, cb);
  dispatch_table.insert (rec);
  chordnode->handleProgram (prog);
}

// True when a dispatcher has been registered for program number progno.
bool
vnode_impl::progHandled (int progno)
{
  return (dispatch_table[progno] != NULL);
}

// Return the dispatcher registered for prog.  Asserts that one exists, so
// callers should check progHandled first.
cbdispatch_t
vnode_impl::getHandler (unsigned long prog)
{
  dispatch_record *rec = dispatch_table[prog];
  assert (rec);
  return rec->cb;
}

// Record cb in upcall_table as the upcall for program number progno.
void
vnode_impl::register_upcall (int progno, cbupcall_t cb)
{
  upcall_record *uc = New upcall_record (progno, cb);
  upcall_table.insert (uc);
}

// Copy this vnode's me_ into the user_args.
void
vnode_impl::fill_user_args (user_args *a)
{
  a->me_ = me_;
}

// Release the decoded RPC arguments, if any, using the XDR routine
// registered for this procedure.
user_args::~user_args ()
{
  if (args)
    xdr_delete (prog->tbl[procno].xdr_arg, args);
}

// Set *from to the sender recorded in the transport header (t_arg->src).
// The socket-address based fallback below is compiled out.
void
user_args::fill_from (chord_node *from)
{
  dorpc_arg *t_arg = transport_header ();
  *from = make_chord_node (t_arg->src);
#if 0
  /**** I don't think this is needed any more ****/
  const struct sockaddr_in *sa = (struct sockaddr_in *)sbp->getsa ();
  if (sa) {
    from->r.hostname = inet_ntoa (sa->sin_addr);
  } else {
    // connected sockets don't have the addr field set in the sbp, so
    // we have to dig harder
    bool hasname = false;

    const ptr<asrv> srv = sbp->getsrv ();
    const ref<axprt> x = srv->xprt ();
    axprt_stream *xs = 0;

    // make a guess that this is a stream
    if (x->reliable && x->connected &&
        (xs = static_cast<axprt_stream *>(sbp->getsrv ()->xprt ().get ()))) {
      int fd = xs->getfd ();
      sockaddr_in sin;
      bzero (&sin, sizeof (sin));
      socklen_t sinlen = sizeof (sin);
      if (getpeername (fd, (sockaddr *) &sin, &sinlen) == 0) {
        from->r.hostname = inet_ntoa (sin.sin_addr);
        hasname = true;
      }
    }
    if (!hasname)
      warn << "XXX cannot run fill_from on stream (?) connection\n";
  }
#endif /* 0 */
}

// Convenience overload: reply with an integer result passed by reference.
void
user_args::replyref (const int &res)
{
  reply ((void *)&res);
}

void
user_args::reply (void *res)
{
  //marshall result
  xdrproc_t inproc = prog->tbl[procno].xdr_res;
  xdrsuio x (XDR_ENCODE);
  if ((!inproc) || (!inproc (x.xdrp (), res)))
    fatal << "couldn't marshall result\n";
  int res_len = x.uio ()->resid ();
  void *marshalled_res = NULL;
  if (res_len > 0)
    marshalled_res = suio_flatten (x.uio ());
  track_reply (*prog, procno, res_len);

  //stuff into a transport wrapper
  dorpc_res *rpc_res = New dorpc_res (DORPC_OK);
  me_->fill_node (rpc_res->resok->src);
  rpc_res->resok->send_time_echo = send_time;
  rpc_res->resok->results.setsize (res_len);
  if (res_len > 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -