📄 comm.c
字号:
/* * * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu) * Copyright (C) 2001 Frans Kaashoek (kaashoek@lcs.mit.edu) and * Frank Dabek (fdabek@lcs.mit.edu). * Copyright (C) 2001 Frans Kaashoek (kaashoek@lcs.mit.edu), * Frank Dabek (fdabek@lcs.mit.edu) and * Emil Sit (sit@lcs.mit.edu). * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */// Chord's RPC manager is designed to perform flow control on all// Chord RPCs. It maintains a cache of hosts (i.e. IP addresses)// that it has connected to and maintains statistics about latency// and such to those hosts. It uses these statistics to calculate// optimal window sizes and delays.// #define __J__ 1#include <crypt.h>#include <chord_prot.h>#include <misc_utils.h>#include "comm.h"#include <location.h>#include "modlogger.h"#include "coord.h"chord_rpc_style_t chord_rpc_style (CHORD_RPC_STP);ihash<str, rpcstats, &rpcstats::key, &rpcstats::h_link> rpc_stats_tab;u_int64_t rpc_stats_lastclear (getusec ());static inline rpcstats *getstats (int progno, int procno){ str key = strbuf ("%d:%d", progno, procno); rpcstats *stats = rpc_stats_tab[key]; if (!stats) { stats = New rpcstats (key); rpc_stats_tab.insert (stats); } return stats;}voidtrack_call (const rpc_program &prog, int procno, size_t b){ rpcstats *stats = getstats (prog.progno, procno); stats->ncall++; stats->call_bytes += b;}voidtrack_rexmit (const rpc_program &prog, int procno, size_t b){ rpcstats *stats = getstats (prog.progno, procno); stats->nrexmit++; stats->rexmit_bytes += b;}voidtrack_rexmit (int progno, int procno, size_t b){ rpcstats *stats = getstats (progno, procno); stats->nrexmit++; stats->rexmit_bytes += b;}voidtrack_reply (const rpc_program &prog, int procno, size_t b){ rpcstats *stats = getstats (prog.progno, procno); stats->nreply++; stats->reply_bytes += b;}voidtrack_proctime (const rpc_program &prog, int procno, u_int64_t l){ rpcstats *stats = getstats (prog.progno, procno); // Don't bother with floats; this comes from getusec which is // probably more resolution that actually makes much sense anyway. if (stats->latency_ewma == 0) stats->latency_ewma = l; else stats->latency_ewma = (9 * stats->latency_ewma + l) / 10;}// -----------------------------------------------------rpc_state::rpc_state (ptr<location> from, ref<location> l, aclnt_cb c, cbtmo_t _cb_tmo, long s, int p, void *out) : loc (l), from (from), cb (c), progno (p), seqno (s), b (NULL), rexmits (0), cb_tmo (_cb_tmo), out (out){ ID = l->id (); in_window = true;};// -----------------------------------------------------hostinfo::hostinfo (const net_address &r) : host (r.hostname), nrpc (0), maxdelay (0), a_lat (0.0), a_var (0.0), fd (-2), orpc (0){}// -----------------------------------------------------const float rpc_manager::GAIN (0.2);rpc_manager::rpc_manager (ptr<u_int32_t> _nrcv) : a_lat (0.0), a_var (0.0), c_err (0.0), c_err_rel (0.0), c_var (0.0), nrpc (0), nrpcfailed (0), nsent (0), npending (0), nrcv (_nrcv){ warn << "CREATED RPC MANAGER\n"; int dgram_fd = inetsocket (SOCK_DGRAM); if (dgram_fd < 0) fatal << "Failed to allocate dgram socket\n"; dgram_xprt = axprt_dgram::alloc (dgram_fd, sizeof(sockaddr), 230000); if (!dgram_xprt) fatal << "Failed to allocate dgram xprt\n"; next_xid = &random_getword;}voidrpc_manager::stats () { char buf[1024]; warnx << "RPC MANAGER STATS:\n"; warnx << "total # of RPCs: good " << nrpc << " failed " << nrpcfailed << "\n"; warnx << buf << " Per link avg. RPC latencies\n"; for (hostinfo *h = hosts.first (); h ; h = hosts.next (h)) { warnx << " host " << h->host << " # RPCs: " << h->nrpc << " (" << h->orpc << " outstanding)\n"; sprintf (buf, " Average latency: %f\n" " Average variance: %f\n", h->a_lat, h->a_var); warnx << buf; sprintf (buf, " Max latency: %qd\n", h->maxdelay); warnx << buf; }}floatrpc_manager::get_a_lat (ptr<location> l){ hostinfo *h = lookup_host (l->address ()); return h->a_lat;}floatrpc_manager::get_a_var (ptr<location> l){ hostinfo *h = lookup_host (l->address ()); return h->a_var;}voidrpc_manager::remove_host (hostinfo *h){}hostinfo *rpc_manager::lookup_host (const net_address &r){ str key = strbuf () << r.hostname << ":" << r.port << "\n"; hostinfo *h = hosts[key]; if (!h) { if (hosts.size () > max_host_cache) { hostinfo *o = hostlru.first; hostlru.remove (o); hosts.remove (o); remove_host (o); delete (o); } h = New hostinfo (r); h->key = key; hostlru.insert_tail (h); hosts.insert (h); } else { // record recent access hostlru.remove (h); hostlru.insert_tail (h); } assert (h); return h;}longrpc_manager::doRPC (ptr<location> from, ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb, cbtmo_t cb_tmo){ ref<aclnt> c = aclnt::alloc (dgram_xprt, prog, (sockaddr *)&(l->saddr ())); // Make sure that there is an entry in the table for this guy. (void) lookup_host (l->address ()); u_int64_t sent = getusec (); c->call (procno, in, out, wrap (this, &rpc_manager::doRPCcb, cb, l, sent)); return 0;}longrpc_manager::doRPC_dead (ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb){ return doRPC (NULL, l, prog, procno, in, out, cb, NULL);}voidrpc_manager::doRPCcb (aclnt_cb realcb, ptr<location> l, u_int64_t sent, clnt_stat err){ if (err) { nrpcfailed++; l->set_alive (false); } else { nrpc++; // Only update latency on successful RPC. // This probably includes time needed for rexmits. u_int64_t now = getusec (); // prevent overflow, caused by time reversal if (now >= sent) { u_int64_t lat = now - sent; update_latency (NULL, l, lat); } else { warn << "*** Ignoring timewarp: sent " << sent << " > now " << now << "\n"; } } (realcb) (err);}voidrpc_manager::update_latency (ptr<location> from, ptr<location> l, u_int64_t lat){ nrpc++; //update global latency float err = (lat - a_lat); a_lat = a_lat + GAIN*err; if (err < 0) err = -err; a_var = a_var + GAIN*(err - a_var); //update per-host latency hostinfo *h = lookup_host (l->address ()); if (h) { h->nrpc++; if (h->a_lat == 0 && h->a_var == 0) h->a_lat = lat; else { err = (lat - h->a_lat); h->a_lat = h->a_lat + GAIN*err; if (err < 0) err = -err; h->a_var = h->a_var + GAIN*(err - h->a_var); } if (lat > h->maxdelay) h->maxdelay = lat; // Copy info over to just this location l->inc_nrpc (); l->set_distance (h->a_lat); l->set_variance (h->a_var); } //do the coordinate variance if available if (from && l && from->coords ().size () > 0 && l->coords ().size () > 0) { float predicted = Coord::distance_f (from->coords (), l->coords ()); float sample_err = (lat - predicted); /* warn << "To " << l->id () << " " << (int)sample_err << " " << (int)lat << " " << (int)predicted << " " << (int)c_err << " " << (int)c_var << " " << (int)(c_err_rel*1000) << "\n"; */ if (sample_err < 0) sample_err = -sample_err; float rel_err = sample_err/lat; c_err = (c_err*49 + sample_err)/50; c_err_rel = (c_err_rel*49 + rel_err)/50; c_var = c_var + GAIN*(sample_err - c_var); }}// -----------------------------------------------------longtcp_manager::doRPC (ptr<location> from, ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb, cbtmo_t cb_tmo){ // hack to avoid limit on wrap()'s number of arguments RPC_delay_args *args = New RPC_delay_args (from, l, prog, procno, in, out, cb, NULL); if (chord_rpc_style == CHORD_RPC_SFSBT) { tcpconnect (l->saddr ().sin_addr, ntohs (l->saddr ().sin_port), wrap (this, &tcp_manager::doRPC_tcp_connect_cb, args)); } else { hostinfo *hi = lookup_host (l->address ()); if (hi->fd == -2) { //no connect initiated // weird: tcpconnect wants the address in NBO, and port in HBO hi->fd = -1; // signal pending connect tcpconnect (l->saddr ().sin_addr, ntohs (l->saddr ().sin_port), wrap (this, &tcp_manager::doRPC_tcp_connect_cb, args)); } else if (hi->fd == -1) { //connect pending, add to waiters hi->connect_waiters.push_back (args); } else if (hi->fd > 0) { //already connected send_RPC (args); } } return 0;}longtcp_manager::doRPC_dead (ptr<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb){ return doRPC (NULL, l, prog, procno, in, out, cb, NULL);}voidtcp_manager::remove_host (hostinfo *h) { // unnecessary SO_LINGER already set // tcp_abort (h->fd); h->fd = -2; h->xp = NULL; while (h->connect_waiters.size ()) { RPC_delay_args *a = h->connect_waiters.pop_front (); a->cb (RPC_CANTSEND); delete a; }}voidtcp_manager::send_RPC (RPC_delay_args *args){ hostinfo *hi = lookup_host (args->l->address ()); if (!hi->xp) { delaycb (0, 0, wrap (this, &tcp_manager::send_RPC_ateofcb, args)); } else if (hi->xp->ateof()) { hostlru.remove (hi); hostlru.insert_tail (hi); args->l->set_alive (false); remove_host (hi); delaycb (0, 0, wrap (this, &tcp_manager::send_RPC_ateofcb, args)); } else { hi->orpc++; args->now = getusec (); ptr<aclnt> c = aclnt::alloc (hi->xp, args->prog); c->call (args->procno, args->in, args->out, wrap (this, &tcp_manager::doRPC_tcp_cleanup, c, args)); }}voidtcp_manager::send_RPC_ateofcb (RPC_delay_args *args){ (args->cb) (RPC_CANTSEND); delete args;}voidtcp_manager::doRPC_tcp_connect_cb (RPC_delay_args *args, int fd){ hostinfo *hi = lookup_host (args->l->address ()); if (fd < 0) { warn << "locationtable: connect failed: " << strerror (errno) << "\n"; (args->cb) (RPC_CANTSEND); args->l->set_alive (false); remove_host (hi); delete args; } else { struct linger li; li.l_onoff = 1; li.l_linger = 0; setsockopt (fd, SOL_SOCKET, SO_LINGER, (char *) &li, sizeof (li)); tcp_nodelay (fd); make_async(fd); hi->fd = fd; hi->xp = axprt_stream::alloc (fd); assert (hi->xp); send_RPC (args); while (hi->connect_waiters.size ()) send_RPC (hi->connect_waiters.pop_front ()); }}voidtcp_manager::doRPC_tcp_cleanup (ptr<aclnt> c, RPC_delay_args *args, clnt_stat err){ hostinfo *hi = lookup_host (args->l->address ()); if (err) { nrpcfailed++; } else { nrpc++; if (hi) hi->nrpc++; } if (hi) hi->orpc--; (*args->cb)(err); delete args;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -