⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_peer.cc

📁 xorp源码hg
💻 CC
📖 第 1 页 / 共 2 页
字号:
	error_string = c_format("comm_set_reuseaddr failed: %s\n",				comm_get_last_error_str());	return false;    }    if (comm_sock_bind(s, reinterpret_cast<struct sockaddr *>(&local))	!= XORP_OK) {	comm_sock_close(s);	error_string = c_format("comm_sock_bind failed: %s",				comm_get_last_error_str());	return false;    }    _bind = s;    return true;}boolTestPeer::send(const vector<uint8_t>& data, string& error_string){    debug_msg("len: %u\n", XORP_UINT_CAST(data.size()));    if (!_s.is_valid()) {	XLOG_WARNING("Not connected");	error_string = "Not connected";	return false;    }    size_t len = data.size();    uint8_t *buf = new uint8_t[len];    size_t i;    for(i = 0; i < len; i++)	buf[i] = data[i];    _async_writer->add_buffer(buf, len,			      callback(this, &TestPeer::send_complete));    _async_writer->start();    if(_verbose && _bgp)	printf("Sending: %s", bgppp(buf, len).c_str());    return true;}voidTestPeer::send_complete(AsyncFileWriter::Event ev, const uint8_t *buf,			const size_t buf_bytes, const size_t offset){    switch (ev) {    case AsyncFileOperator::DATA:	debug_msg("event: data\n");	if (offset == buf_bytes)	    delete[] buf;	break;    case AsyncFileOperator::FLUSHING:	debug_msg("event: flushing\n");	debug_msg("Freeing Buffer for sent packet: %p\n", buf);	delete[] buf;	break;    case AsyncFileOperator::OS_ERROR:	debug_msg("event: error\n");	/* Don't free the message here we'll get it in the flush */	XLOG_ERROR("Writing buffer failed: %s",  strerror(errno));    case AsyncFileOperator::END_OF_FILE:	XLOG_ERROR("End of File: %s",  strerror(errno));    case AsyncFileOperator::WOULDBLOCK:	// do nothing	;    }}boolTestPeer::zap(XorpFd& fd, const char *msg){    debug_msg("%s = %s\n", msg, fd.str().c_str());    if (!fd.is_valid())	return true;    XorpFd tempfd = fd;    fd.clear();   _eventloop.remove_ioevent_cb(tempfd);   debug_msg("Removing I/O event cb for fd = %s\n", tempfd.str().c_str());   if (comm_sock_close(tempfd) == -1) {	XLOG_WARNING("Close of %s failed: %s", msg, comm_get_last_error_str());	return false;   }   return true;;}boolTestPeer::disconnect(string& error_string){    debug_msg("\n");    if (!_s.is_valid()) {	XLOG_WARNING("Not connected");	error_string = "Not connected";	return false;    }    delete _async_writer;    _async_writer = 0;    return ZAP(_s);}voidTestPeer::reset(){    debug_msg("\n");    delete _async_writer;    _async_writer = 0;    ZAP(_s);    ZAP(_listen);    ZAP(_bind);}boolTestPeer::terminate(string& /*error_string*/){    debug_msg("\n");    _done = true;    return true;}/*** Process incoming connection attempts.*/voidTestPeer::connect_attempt(XorpFd fd, IoEventType type){    debug_msg("\n");    if (type != IOT_ACCEPT) {	XLOG_WARNING("Unexpected IoEventType %d", type);	return;    }    if (_s.is_valid()) {	XLOG_WARNING("Connection attempt while already connected");	return;    }    XLOG_ASSERT(_listen == fd);    _listen.clear();    XorpFd connfd = comm_sock_accept(fd);    debug_msg("Incoming connection attempt %s\n", connfd.str().c_str());    /*    ** We don't want any more connection attempts so remove ourselves    ** from the eventloop and close the file descriptor.    */   _eventloop.remove_ioevent_cb(fd);   debug_msg("Removing I/O event cb for fd = %s\n", fd.str().c_str());   if (XORP_ERROR == comm_sock_close(fd))	XLOG_WARNING("Close failed");   /*   ** If there is a coordinator then add an I/O event callback.   */   if(0 != _coordinator.length() &&       !_eventloop.add_ioevent_cb(connfd, IOT_READ,				callback(this, &TestPeer::receive))) {	comm_sock_close(connfd);	XLOG_WARNING("Failed to add socket %s to eventloop",			connfd.str().c_str());	return;    }   /*   ** Set up the async writer.   */    if (XORP_ERROR == comm_sock_set_blocking(connfd, COMM_SOCK_NONBLOCKING)) {        XLOG_FATAL("Failed to go non-blocking: %s", comm_get_last_error_str());    }    delete _async_writer;   _async_writer = new AsyncFileWriter(_eventloop, connfd);   _s = connfd;}/*** Process incoming bytes*/voidTestPeer::receive(XorpFd fd, IoEventType type){    debug_msg("\n");    if (type != IOT_READ) {	XLOG_WARNING("Unexpected IoEventType %d", type);	return;    }    if (0 == _coordinator.length()) {	XLOG_WARNING("No coordinator configured");	return;    }    /*    ** If requested perform BGP packetisation.    */    int len;    if(!_bgp) {	uint8_t buf[64000];	if(-1 == (len = recv(fd, (char *)buf, sizeof(buf), 0))) {	    string error = c_format("read error: %s", strerror(errno));	    XLOG_WARNING("%s", error.c_str());	    datain(ERROR, _bgp_buf, _bgp_bytes, error);	    return;	}	datain(GOOD, _bgp_buf, _bgp_bytes);	return;    }    /*    ** Now doing BGP packetisation.    */    int get;    if(_bgp_bytes < BGPPacket::COMMON_HEADER_LEN) {	get = BGPPacket::COMMON_HEADER_LEN - _bgp_bytes;    } else {	uint16_t length = extract_16(_bgp_buf + BGPPacket::LENGTH_OFFSET);	get = length - _bgp_bytes;    }    if (-1 == (len = recv(fd, (char *)&_bgp_buf[_bgp_bytes], get, 0))) {	string error = c_format("read error: %s", strerror(errno));	XLOG_WARNING("%s", error.c_str());	datain(ERROR, _bgp_buf, _bgp_bytes, error);	_bgp_bytes = 0;	comm_sock_close(fd);	_s.clear();	_eventloop.remove_ioevent_cb(fd);	return;    }    if (0 == len) {	if(0 < _bgp_bytes)	    datain(GOOD, _bgp_buf, _bgp_bytes);	datain(CLOSED, 0, 0);	_bgp_bytes = 0;	comm_sock_close(fd);	_s.clear();	_eventloop.remove_ioevent_cb(fd);	return;    }    _bgp_bytes += len;    if (_bgp_bytes >= BGPPacket::COMMON_HEADER_LEN) {	uint16_t length = extract_16(_bgp_buf + BGPPacket::LENGTH_OFFSET);	if (length < BGPPacket::MINPACKETSIZE || length > sizeof(_bgp_buf)) {	    string error = c_format("Illegal length value %d", length);	    XLOG_ERROR("%s", error.c_str());	    datain(ERROR, _bgp_buf, _bgp_bytes, error);	    _bgp_bytes = 0;	    return;	}	if(length == _bgp_bytes) {	    datain(GOOD, _bgp_buf, _bgp_bytes);	    _bgp_bytes = 0;	}    }}/*** Send data received on the socket back to the coordinator*/voidTestPeer::datain(status st, uint8_t *ptr, size_t len, string error){    debug_msg("status = %d len = %u error = %s\n", st, XORP_UINT_CAST(len),	      error.c_str());    if (_verbose) {	switch(st) {	case GOOD:	    if(_bgp)		printf("Received: %s", bgppp(ptr, len).c_str());	    break;	case CLOSED:	    printf("Connection closed by peer\n");	    break;	case ERROR:	    printf("Error on connection\n");	    break;	}    }    queue_data(st, ptr, len, error);}/*** It is possible to overrun the XRL code by sending too many** XRL's. Therefore queue the data that needs to be sent and only have** a small number of outstanding XRL's at any time.*/voidTestPeer::queue_data(status st, uint8_t *ptr, size_t len, string error){    Queued q;    TimeVal tv;    _eventloop.current_time(tv);    q.secs = tv.sec();    q.micro = tv.usec();    q.st = st;    q.len = len;    for(size_t i = 0; i < len; i++)	q.v.push_back(ptr[i]);    q.error = error;    _xrl_queue.push(q);    sendit();}/*** If there is any queued data and less than the allowed quota is in** flight send some more.*/voidTestPeer::sendit(){    if(_flying >= FLYING_LIMIT)	return;    if(_xrl_queue.empty())	return;    Queued q = _xrl_queue.front();    _xrl_queue.pop();    XrlDatainV0p1Client datain(&_xrlrouter);    debug_msg("%u\n", XORP_UINT_CAST(q.v.size()));    XLOG_ASSERT(q.len == q.v.size());    _flying++;    if (q.len == 0) {	//	// XXX: Note that we don't consider it an error even if	// "q.st == ERROR".	// This is needed to preserve the original logic that considers	// the case of a peer resetting immediately the TCP	// connection not an error.	//	datain.send_closed(_coordinator.c_str(), _server, _genid,			   callback(this, &TestPeer::xrl_callback));	return;    }    if (q.st == ERROR) {	datain.send_error(_coordinator.c_str(), _server, _genid, q.error,			  callback(this, &TestPeer::xrl_callback));	return;    }    datain.send_receive(_coordinator.c_str(), _server, _genid,			GOOD == q.st ? true : false,			q.secs, q.micro,			q.v,			callback(this, &TestPeer::xrl_callback));}voidTestPeer::xrl_callback(const XrlError& error){    debug_msg("callback %s\n", error.str().c_str());    if(XrlError::OKAY() != error) {	if(XrlError::REPLY_TIMED_OUT() == error)	    XLOG_ERROR("%s: Try reducing FLYING_LIMIT",  error.str().c_str());	XLOG_ERROR("%s",  error.str().c_str());    }    _flying--;    XLOG_ASSERT(_flying >= 0);    sendit();}static voidusage(const char *name){    fprintf(stderr,	    "usage: %s [-h (finder host)] [-s server name] [-v][-t]\n", name);    exit(-1);}intmain(int argc, char **argv){    XorpUnexpectedHandler x(xorp_unexpected_handler);    //    // Initialize and start xlog    //    xlog_init(argv[0], NULL);    xlog_set_verbose(XLOG_VERBOSE_HIGH);    // XXX: verbosity of the error messages temporary increased    xlog_level_set_verbose(XLOG_LEVEL_ERROR, XLOG_VERBOSE_HIGH);    xlog_add_default_output();    xlog_start();    setvbuf(stdout, (char *)NULL, _IOLBF, 0);    int c;    string finder_host = FinderConstants::FINDER_DEFAULT_HOST().str();    const char *server = SERVER;    bool verbose = false;    bool trace = false;    while((c = getopt (argc, argv, "h:s:vt")) != EOF) {	switch(c) {	case 'h':	    finder_host = optarg;	    break;	case 's':	    server = optarg;	    break;	case 'v':	    verbose = true;	    break;	case 't':	    trace = true;	    break;	case '?':	    usage(argv[0]);	}    }    try {	EventLoop eventloop;	XrlStdRouter router(eventloop, server, finder_host.c_str());	TestPeer test_peer(eventloop, router, server, verbose);	XrlTestPeerTarget xrl_target(&router, test_peer, trace);	wait_until_xrl_router_is_ready(eventloop, router);	while(!test_peer.done()) {	    eventloop.run();	}    } catch(...) {	xorp_catch_standard_exceptions();    }    //    // Gracefully stop and exit xlog    //    xlog_stop();    xlog_exit();    debug_msg("Bye!\n");    return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -