📄 chord_client.c
字号:
/* * * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu) * Copyright (C) 2001 Frans Kaashoek (kaashoek@lcs.mit.edu) and * Frank Dabek (fdabek@lcs.mit.edu). * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */#include "chord.h"#include <misc_utils.h>#include <id_utils.h>#include <location.h>#include <locationtable.h>#include "comm.h"#include "route.h"#include <transport_prot.h>#include <modlogger.h>#define trace modlogger ("chord")#include <configurator.h>static struct chord_config_init { chord_config_init ();} cci;chord_config_init::chord_config_init (){ bool ok = true;#define set_int Configurator::only ().set_int#define set_str Configurator::only ().set_str ok = ok && set_int ("chord.max_vnodes", 1024); ok = ok && set_int ("chord.nsucc", 16); ok = ok && set_int ("chord.npred", 1); ok = ok && set_int ("chord.ncoords", 3); ok = ok && set_str ("chord.rpc_mode", "stp"); ok = ok && set_int ("chord.lookup_timeout", 15); /** use the greedy metric instead. Probably desirable if toes are * enabled. */ ok = ok && set_int ("chord.greedy_lookup", 0); /** try to terminate retrieves early if the hop has * returned a "sufficient" number of successors. */ ok = ok && set_int ("chord.find_succlist_shaving", 1); ok = ok && set_int ("chord.checkdead_interval", 60); ok = ok && set_int ("chord.checkdead_max", 960); assert (ok);#undef set_int#undef set_str}chord::chord (str host, int port, vnode_producer_t p, int nvnodes, int max_cache) : max_vnodes (0), myname (host), myport (port), fd_dgram (-1), fd_stream (-1), x_dgram (NULL), nrcv (NULL), rpcm (NULL), locations (NULL){ bool ok (false); ok = Configurator::only ().get_int ("chord.max_vnodes", max_vnodes); assert (ok); if (nvnodes > max_vnodes) fatal << "Requested more than allowed vnodes: " << nvnodes << ">" << max_vnodes << "\n"; if (nvnodes < 1) { warn << "Creating at least one vnode.\n"; nvnodes = 1; } str rpcstr; ok = Configurator::only ().get_str ("chord.rpc_mode", rpcstr); assert (ok); chord_rpc_style = CHORD_RPC_STP; if (rpcstr == "tcp" || rpcstr == "TCP") chord_rpc_style = CHORD_RPC_SFST; else if (rpcstr == "udp" || rpcstr == "UDP") chord_rpc_style = CHORD_RPC_SFSU; nrcv = New refcounted<u_int32_t>; *nrcv = 0; switch (chord_rpc_style) { case CHORD_RPC_STP: rpcm = New refcounted<stp_manager> (nrcv); break; case CHORD_RPC_SFSU: rpcm = New refcounted<rpc_manager> (nrcv); break; case CHORD_RPC_SFST: case CHORD_RPC_SFSBT: rpcm = New refcounted<tcp_manager> (nrcv); break; default: fatal << "bad chord_rpc_style value: " << chord_rpc_style << "\n"; } /* In case myport == 0, need to initialize to something in order * to create chordIDs during vnode initialization */ myport = initxprt (myport, SOCK_DGRAM, &fd_dgram); myport = initxprt (myport, SOCK_STREAM, &fd_stream); warnx << "chord: running on " << myname << ":" << myport << "\n"; locations = New refcounted<locationtable> (max_cache); srandom ((unsigned int) (getusec() & 0xFFFFFFFF)); for (int i = 0; i < nvnodes; i++) { chordID newID = make_chordID (myname, myport, i); warnx << gettime () << ": creating new vnode: " << newID << "\n"; Coord coords; ptr<location> l = locations->insert (newID, myname, myport, i, coords, 30, 0, 1, true); assert (l); locations->pin (newID); ptr<vnode> vnodep = (*p) (mkref (this), rpcm, l); vnodes.insert (newID, vnodep); vlist.push_back (vnodep); }}intchord::initxprt (int myp, int type, int *fd){ in_addr my_addr; inet_aton (myname.cstr (), &my_addr); *fd = inetsocket (type, myp, ntohl (my_addr.s_addr)); if (*fd < 0) fatal ("binding %s addr %s port %d: %m\n", (type == SOCK_DGRAM ? "UDP" : "TCP"), myname.cstr (), myp); if (myp == 0) { struct sockaddr_in addr; socklen_t len = sizeof (addr); bzero (&addr, sizeof (addr)); if (getsockname (*fd, (sockaddr *) &addr, &len) < 0) fatal ("getsockname failed %m\n"); myp = ntohs (addr.sin_port); } return myp;}voidchord::startchord (){ assert (fd_stream > 0 || fd_dgram > 0); if (fd_dgram > 0) { x_dgram = axprt_dgram::alloc (fd_dgram, sizeof(sockaddr), 230000); ptr<asrv> s = asrv::alloc (x_dgram, transport_program_1); s->setcb (wrap (mkref(this), &chord::dispatch, s)); } if (fd_stream > 0) { int ret = listen (fd_stream, 1000); if (ret < 0) fatal ("listen (%d, 1000): %m\n", fd_stream); fdcb (fd_stream, selread, wrap (this, &chord::tcpclient_cb, fd_stream)); }}voidchord::tcpclient_cb (int srvfd){ int fd = accept (srvfd, NULL, NULL); if (fd < 0) warn << "chord: accept failed " << strerror (errno) << "\n"; else { ptr<axprt> x = axprt_stream::alloc (fd, 230000); ptr<asrv> s = asrv::alloc (x, transport_program_1); s->setcb (wrap (mkref(this), &chord::dispatch, s)); }}ptr<vnode>chord::get_vnode (unsigned int i){ if (i > vlist.size ()) return NULL; return vlist[i];}size_tchord::num_vnodes (void){ return vlist.size ();}voidchord::join (str wellknownhost, int wellknownport, bool failok){ chord_node wkn; bzero (&wkn, sizeof (wkn)); wkn.r.hostname = wellknownhost; wkn.r.port = wellknownport ? wellknownport : myport; wkn.x = make_chordID (wkn.r.hostname, wkn.r.port); wkn.vnode_num = 0; //make up info about the age and knownup for this entry wkn.age = 60; wkn.knownup = 600; wkn.coords.setsize (NCOORD + USING_HT); // Make up some random initial information for this other node. for (int i = 0; i < NCOORD + USING_HT; i++) wkn.coords[i] = (int) 0.0; wkn.e = -1; ptr<location> wellknown_node = vlist[0]->my_location (); if (myname != wellknownhost || myport != wellknownport) { wellknown_node = locations->insert (wkn); if (!wellknown_node) fatal << "Well known host failed to verify! Bailing.\n"; } if (vlist[0]->my_ID () == wellknown_node->id ()) { for (size_t i = 0; i < vlist.size (); i++) { vlist[i]->stabilize (); } } else { for (size_t i = 0; i < vlist.size (); i++) { vlist[i]->join (wellknown_node, wrap (this, &chord::join_cb, failok)); } }}voidchord::join_cb (bool failok, ptr<vnode> v, chordstat s){ if (s != CHORD_OK) { warnx << "chord::join failed " << s << "\n"; if (!failok) fatal << "Exiting!\n"; }}voidchord::stats (){ warnx << "CHORD NODE STATS\n"; warnx << "# vnodes: " << vlist.size () << "\n"; for (size_t i = 0; i < vlist.size (); i++) vlist[i]->stats (); rpcm->stats ();}voidchord::print (strbuf &outbuf){ for (size_t i = 0; i < vlist.size (); i++) vlist[i]->print (outbuf);}voidchord::stop () { for (size_t i = 0; i < vlist.size (); i++) vlist[i]->stop ();}voidchord::stabilize (){ for (size_t i = 0; i < vlist.size (); i++) vlist[i]->stabilize ();}const rpc_program *chord::get_program (int progno){ for (unsigned int i = 0; i < handledProgs.size (); i++) if (progno == (int)handledProgs[i]->progno) return handledProgs[i]; return NULL;}boolchord::isHandled (int progno) { for (u_int i = 0; i < handledProgs.size (); i++) if (progno == (int)handledProgs[i]->progno) return true; return false;}voidchord::handleProgram (const rpc_program &prog) { warn << "chord::handleProgram: " << prog.name << " (" << prog.progno << ")\n"; if (isHandled (prog.progno)) return; else { handledProgs.push_back (&prog); }}voidchord::dispatch (ptr<asrv> s, svccb *sbp){ if (!sbp) { s->setcb (NULL); return; } (*nrcv)++; dorpc_arg *arg = sbp->Xtmpl getarg<dorpc_arg> (); switch (sbp->proc ()) { case TRANSPORTPROC_NULL: sbp->reply (NULL); break; case TRANSPORTPROC_DORPC: { chordID v = make_chordID (arg->dest); vnode *vnodep = vnodes[v]; if (!vnodep) { trace << "unknown vnode " << v << " for procedure " << sbp->proc () << " (" << arg->progno << "." << arg->procno << ").\n"; sbp->replyref (rpcstat (DORPC_UNKNOWNNODE)); return; } //find the program const rpc_program *prog = get_program (arg->progno); if (!prog) { sbp->replyref (rpcstat (DORPC_NOHANDLER)); return; } //unmarshall the args char *arg_base = (char *)(arg->args.base ()); int arg_len = arg->args.size (); xdrmem x (arg_base, arg_len, XDR_DECODE); xdrproc_t proc = prog->tbl[arg->procno].xdr_arg; assert (proc); void *unmarshalled_args = prog->tbl[arg->procno].alloc_arg (); if (!proc (x.xdrp (), unmarshalled_args)) { warn << "dispatch: error unmarshalling arguments: " << arg->progno << "." << arg->procno << " from " << v <<"\n"; xdr_delete (prog->tbl[arg->procno].xdr_arg, unmarshalled_args); sbp->replyref (rpcstat (DORPC_MARSHALLERR)); return; } //call the handler user_args *ua = New user_args (sbp, unmarshalled_args, prog, arg->procno, arg->send_time); vnodep->fill_user_args (ua); if (!vnodep->progHandled (arg->progno)) { trace << "dispatch to vnode " << v << " doesn't handle " << arg->progno << "." << arg->procno << "\n"; ua->replyref (chordstat (CHORD_NOHANDLER)); } else { cbdispatch_t dispatch = vnodep->getHandler(arg->progno); (dispatch)(ua); } } break; default: warn << "Transport procedure " << sbp->proc () << " not handled\n"; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -