📄 newspeer.c
字号:
#include <async.h>#include <aios.h>#include <rxx.h>#include <qhash.h>#include <dbfe.h>#include "usenet.h"#include "newspeer.h"// XXX should gc newspeers if connections are closed.qhash<str, ref<newspeer> > newspeers;voidfeed_article (str id, const vec<str> &groups){ ptr<peerinfo> pi (NULL); ptr<newspeer> np (NULL); for (size_t i = 0; i < opt->peers.size (); i++) { pi = opt->peers[i]; np = newspeers[pi->peerkey]; if (np == NULL) { np = New refcounted<newspeer> (pi->hostname, pi->port); newspeers.insert (pi->peerkey, np); } for (size_t j = 0; j < groups.size (); j++) { if (pi->desired (groups[j])) { np->queue_article (id); break; } } }}peerinfo::peerinfo (str h, u_int16_t p) : hostname (h), port (p), peerkey (strbuf ("%s:%d", h.cstr (), p)){}peerinfo::~peerinfo (){}boolpeerinfo::add_pattern (str p){ const char *c = p.cstr (); while (*c) { if (!isalnum(*c) && *c != '.' && *c != '*') return false; c++; } patterns.push_back (p); return true;}boolpeerinfo::desired (str group){ // case sensitivity? for (size_t d = 0; d < patterns.size (); d++) { str pattern = patterns[d]; for (size_t i = 0; i < pattern.len () && i < group.len (); i++) { if (pattern[i] == '*' && i == pattern.len () - 1) return true; if (pattern[i] != group[i]) break; } } return false;}u_int64_tnewspeer::totalfedbytes (){ u_int64_t totalout (0); qhash_slot<str, ref<newspeer> > *s = newspeers.first (); while (s) { totalout += s->value->fedoutbytes (); s = newspeers.next (s); } return totalout;}newspeer::newspeer (str h, u_int16_t p) : s (-1), aio (NULL), conncb (NULL), state (HELLO_WAIT), dhtok (false), streamok (false), fedoutbytes_ (0), hostname (h), port (p){ start_feed ();}newspeer::~newspeer (){ if (conncb) { timecb_remove (conncb); conncb = NULL; }}voidnewspeer::reset (){ aio = NULL; // aios destructor will close socket cleanly. s = -1; if (conncb) { timecb_remove (conncb); conncb = NULL; } state = HELLO_WAIT; dhtok = false; streamok = false;}voidnewspeer::start_feed (int t){ conncb = NULL; tcpconnect (hostname, port, wrap (this, &newspeer::feed_connected, t));}voidnewspeer::feed_connected (int t, int ns){ s = ns; if (s < 0) { t *= 2; if (t > 3600) t = 3600; warn << "Connection to " << hostname << ":" << port << " failed: " << strerror (errno) << "; retry in " << t << " seconds.\n"; conncb = delaycb (t, 0, wrap (this, &newspeer::start_feed, t)); return; } aio = aios::alloc (s); aio->settimeout (opt->peer_timeout); aio->setdebug (strbuf("%dp", s)); // Start lockstep state = HELLO_WAIT; aio->readline (wrap (this, &newspeer::process_line));}static rxx emptyrxx ("^\\s*$");voidnewspeer::process_line (const str data, int err){ strbuf prefix ("%dp: ", s); if (err < 0) { warn << prefix << "newspeer aio oops " << err << "\n"; if (err == ETIMEDOUT) { reset (); return; } } if (!data || !data.len()) { warn << prefix << "newspeer data oops\n"; reset (); return; } vec<str> cmdargs; int n = split (&cmdargs, rxx("\\s+"), data); if (n > 0) { str code = cmdargs[0]; if (state == HELLO_WAIT) { if (code == "200") { state = MODE_CHANGE; aio << "MODE STREAM\r\n"; } else { warn << prefix << "Unexpected input from peer: " << data << "\n"; } } else if (state == MODE_CHANGE) { state = DHT_CHECK; streamok = false; if (code[0] == '2') { streamok = true; if (code != "203") warn << prefix << "Unexpected (but ok) response to mode stream: " << data << "\n"; flush_queue (); } else if (code != "500") { warn << prefix << "Unexpected response to mode stream: " << data << "\n"; } } else if (state == DHT_CHECK || state == FEEDING) { if (code == "238") { // CHECK, CHECKDHT if (state == DHT_CHECK) dhtok = true; send_article (cmdargs[1]); } else if (code == "239" || code == "438") { // TAKETHIS, TAKEDHT // fantastic. some article went over ok. // XXX remove from list of articles to send warn << prefix << "sent successfully.\n"; } else if (code == "431" || code == "400" || code == "439") { // something went wrong. try again later... XXX warn << prefix << "dropping to-delay article on the floor " << data << "\n"; } else if (code == "480") { // CHECK*, TAKE* // permission denied. disconnect and go away. warn << prefix << "permission denied!\n"; } else if (code[0] != '5') { warn << prefix << "Unexpected response to check/take command: " << data << "\n"; if (state == DHT_CHECK) dhtok = false; } state = FEEDING; } else fatal << prefix << "UNKNOWN STATE: " << state << "\n"; } else if (emptyrxx.match(data)) { warn << prefix << "empty line\n"; } else { // XXX what is correct error code? aio << "500 What?\r\n"; } aio->readline (wrap (this, &newspeer::process_line)); // XXX}voidnewspeer::flush_queue (){ while (outq.size ()) { str id = outq.pop_front (); if (dhtok) { aio << "CHECKDHT " << id << "\r\n"; } else { aio << "CHECK " << id << "\r\n"; } if (!streamok || state == DHT_CHECK) break; }}voidnewspeer::queue_article (str id){ if (outq.size() > opt->peer_max_queue) { outq.pop_front (); } outq.push_back (id); warn << hostname << ": queued " << id << "\n"; if (!aio) { if (!conncb) start_feed (); } else flush_queue ();}voidnewspeer::send_article (str id){ ptr<dbrec> key, d; key = New refcounted<dbrec> (id, id.len ()); d = header_db->lookup (key); if (!d) { warn << hostname << ": was going to send " << id << " but not in db?!!\n"; return; } str header (d->value, d->len); if (dhtok) { aio << "TAKEDHT " << id << "\r\n"; aio << header << "\r\n.\r\n"; // need extra \r\n for feed parsing other side fedoutbytes_ += id.len () + d->len + 15; } else { // dispatch a body fetch // aio << "TAKETHIS " << id << "\r\n"; warn << hostname << ": would send " << id << " via TAKETHIS.\n"; }}u_int64_tnewspeer::fedoutbytes (bool reset){ u_int64_t x = fedoutbytes_; if (reset) fedoutbytes_ = 0; return x;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -