📄 testmaster.c
字号:
/* * testmaster.{C,h} * * Copyright (C) 2002 Thomer M. Gil (thomer@lcs.mit.edu) * Massachusetts Institute of Technology * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */#include "async.h"#include "dns.h"#include "rxx.h"#include "testmaster.h"testmaster::testmaster(char *filename) : _nhosts(0){ test_init(); FILE *f = fopen(filename, "r"); if(f <= 0) fatal << "couldn't open " << filename << ": " << strerror(errno) << "\n"; while(!feof(f)) { char l[1024]; if(!fgets(l, sizeof(l), f)) break; str line = str(l); // comment rxx commentrxx("^\\s*#"); if(commentrxx.search(line)) continue; rxx linerxx("^\\s*(\\S+)\\s+(\\d+)\\s+(\\d+)\\s*$"); if(!linerxx.search(line)) break; _slaves.push_back(New testslave(linerxx[1], atoi(linerxx[2]) ? atoi(linerxx[2]) : DEFAULT_PORT, atoi(linerxx[3]) ? atoi(linerxx[3]) : TESLA_CONTROL_PORT)); } fclose(f);}testmaster::testmaster(vec<str> *names, vec<int> *dhp, vec<int> *tcp) : _nhosts(0){ test_init(); assert(names->size() == dhp->size() && dhp->size() == tcp->size()); for(unsigned i=0; i<names->size(); i++) { _slaves.push_back(New testslave((*names)[i], (*dhp)[i] ? (*dhp)[i] : DEFAULT_PORT, (*tcp)[i] ? (*tcp)[i] : TESLA_CONTROL_PORT)); }}testmaster::~testmaster(){}voidtestmaster::unixdomainsock(int i, str &p2psocket, int &fd){ p2psocket = strbuf() << "/tmp/" << _slaves[i]->name << ":" << _slaves[i]->dhash_port; unlink(p2psocket); fd = unixsocket(p2psocket); if(!fd) fatal << "couldn't open unix socket " << p2psocket << "\n"; make_async(fd); if(listen(fd, 5)) fatal << "listen\n";}voidtestmaster::dry_setup(callback<void>::ref cb){ DEBUG(2) << "dry_setup\n"; _nhosts = _slaves.size(); for(unsigned i = 0; i < _slaves.size(); i++) { str p2psocket; int fd; unixdomainsock(i, p2psocket, fd); // by reference fdcb(fd, selread, wrap(this, &testmaster::accept_connection, fd, 0, true)); dns_hostbyname(_slaves[i]->name, wrap(this, &testmaster::dry_setup_cb, i, p2psocket, cb)); }}voidtestmaster::dry_setup_cb(int i, str p2psocket, callback<void>::ref cb, ptr<hostent> h, int err){ DEBUG(2) << "dry_setup_cb\n"; if(!h) fatal << "dns_hostbyname " << strerror(err) << "\n"; ptr<dhashclient_test> dhc = New refcounted<dhashclient_test>(p2psocket, _slaves[i]->name, (*(in_addr*)h->h_addr).s_addr, _slaves[i]->dhash_port, _slaves[i]->control_port); client *c = New client(i, 0, "", dhc); _clients.insert(c); if(!(--_nhosts)) cb();}voidtestmaster::setup(callback<void>::ref cb){ DEBUG(2) << "setup\n"; for(unsigned i = 0; i < _slaves.size(); i++) { // bogus unix domain socket str p2psocket; int fd; unixdomainsock(i, p2psocket, fd); // by reference DEBUG(2) << "created p2psock " << p2psocket << " for client " << i << "\n"; // create a connection to the other side for this unix socket conthunk tx = { i, p2psocket, _slaves[i], fd, cb }; addnode(tx); _nhosts++; }}// pipes from fake domain socket to remote lsd and backvoidtestmaster::pipe(const int from, const int to){ strbuf b; int r = b.tosuio()->input(from); if(!r) { warn << "connection broken\n"; // XXX: close connection return; } b.tosuio()->output(to);}voidtestmaster::addnode(conthunk tx){ DEBUG(2) << "setting up " << tx.s->name << ":" << tx.s->dhash_port << "\n"; // remove old entry client *c = _clients[tx.id]; if(c) _clients.remove(c); tcpconnect(tx.s->name, tx.s->dhash_port, wrap(this, &testmaster::addnode_cb, tx));}// accepts connection from dhashclient that we just created and sets up pipevoidtestmaster::accept_connection(const int unixsocket_fd, const int there_fd, bool bogus = false){ struct sockaddr_in sin; unsigned sinlen = sizeof(sin); DEBUG(2) << "accept_connection from dhashclient\n"; int here_fd = accept(unixsocket_fd, (struct sockaddr *) &sin, &sinlen); if(bogus) return; if(here_fd >= 0) { // setup pipe between dhashclient and remote slave fdcb(here_fd, selread, wrap(this, &testmaster::pipe, here_fd, there_fd)); fdcb(there_fd, selread, wrap(this, &testmaster::pipe, there_fd, here_fd)); } else if (errno != EAGAIN) fatal << "Could not accept slave connection, errno = " << errno << "\n";}voidtestmaster::addnode_cb(conthunk tx, const int there_fd){ DEBUG(2) << "addnode_cb for " << tx.s->name << ":" << tx.s->dhash_port << "\n"; if(there_fd == -1) { warn << "could not connect to " << tx.s->name << ":" << tx.s->dhash_port << ", error = " << strerror(errno) << "\n"; return; } // resolve the remote hostname dns_hostbyname(tx.s->name, wrap(this, &testmaster::addnode_cb2, tx, there_fd));}voidtestmaster::addnode_cb2(conthunk tx, const int there_fd, ptr<hostent> h, int err){ if(!h) { fatal << "dns_hostbyname failed\n"; } // listen for incoming connection from dhashclient that we are about to // create /* DEBUG(2) << "spawning server behind " << tx.p2psocket << "\n"; if(listen(tx.unixsocket_fd, 5)) fatal << "listen\n"; */ fdcb(tx.unixsocket_fd, selread, wrap(this, &testmaster::accept_connection, tx.unixsocket_fd, there_fd, false)); // create DHash object DEBUG(2) << "creating dhashclient on " << tx.p2psocket << "\n"; make_async(there_fd); ptr<dhashclient_test> dhc = New refcounted<dhashclient_test>(tx.p2psocket, tx.s->name, (*(in_addr*)h->h_addr).s_addr, tx.s->dhash_port, tx.s->control_port); if(!dhc) fatal << "couldn't create dhashclient on " << tx.p2psocket << "\n"; // update hash table client *c = New client(tx.id, tx.s, tx.p2psocket, dhc); _clients.insert(c); // call callback once all slaves are connected if(!(--_nhosts)) tx.cb();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -