📄 test_peer.cc
字号:
error_string = c_format("comm_set_reuseaddr failed: %s\n", comm_get_last_error_str()); return false; } if (comm_sock_bind(s, reinterpret_cast<struct sockaddr *>(&local)) != XORP_OK) { comm_sock_close(s); error_string = c_format("comm_sock_bind failed: %s", comm_get_last_error_str()); return false; } _bind = s; return true;}boolTestPeer::send(const vector<uint8_t>& data, string& error_string){ debug_msg("len: %u\n", XORP_UINT_CAST(data.size())); if (!_s.is_valid()) { XLOG_WARNING("Not connected"); error_string = "Not connected"; return false; } size_t len = data.size(); uint8_t *buf = new uint8_t[len]; size_t i; for(i = 0; i < len; i++) buf[i] = data[i]; _async_writer->add_buffer(buf, len, callback(this, &TestPeer::send_complete)); _async_writer->start(); if(_verbose && _bgp) printf("Sending: %s", bgppp(buf, len).c_str()); return true;}voidTestPeer::send_complete(AsyncFileWriter::Event ev, const uint8_t *buf, const size_t buf_bytes, const size_t offset){ switch (ev) { case AsyncFileOperator::DATA: debug_msg("event: data\n"); if (offset == buf_bytes) delete[] buf; break; case AsyncFileOperator::FLUSHING: debug_msg("event: flushing\n"); debug_msg("Freeing Buffer for sent packet: %p\n", buf); delete[] buf; break; case AsyncFileOperator::OS_ERROR: debug_msg("event: error\n"); /* Don't free the message here we'll get it in the flush */ XLOG_ERROR("Writing buffer failed: %s", strerror(errno)); case AsyncFileOperator::END_OF_FILE: XLOG_ERROR("End of File: %s", strerror(errno)); case AsyncFileOperator::WOULDBLOCK: // do nothing ; }}boolTestPeer::zap(XorpFd& fd, const char *msg){ debug_msg("%s = %s\n", msg, fd.str().c_str()); if (!fd.is_valid()) return true; XorpFd tempfd = fd; fd.clear(); _eventloop.remove_ioevent_cb(tempfd); debug_msg("Removing I/O event cb for fd = %s\n", tempfd.str().c_str()); if (comm_sock_close(tempfd) == -1) { XLOG_WARNING("Close of %s failed: %s", msg, comm_get_last_error_str()); return false; } return true;;}boolTestPeer::disconnect(string& error_string){ debug_msg("\n"); if (!_s.is_valid()) { XLOG_WARNING("Not connected"); error_string = "Not connected"; return false; } delete _async_writer; _async_writer = 0; return ZAP(_s);}voidTestPeer::reset(){ debug_msg("\n"); delete _async_writer; _async_writer = 0; ZAP(_s); ZAP(_listen); ZAP(_bind);}boolTestPeer::terminate(string& /*error_string*/){ debug_msg("\n"); _done = true; return true;}/*** Process incoming connection attempts.*/voidTestPeer::connect_attempt(XorpFd fd, IoEventType type){ debug_msg("\n"); if (type != IOT_ACCEPT) { XLOG_WARNING("Unexpected IoEventType %d", type); return; } if (_s.is_valid()) { XLOG_WARNING("Connection attempt while already connected"); return; } XLOG_ASSERT(_listen == fd); _listen.clear(); XorpFd connfd = comm_sock_accept(fd); debug_msg("Incoming connection attempt %s\n", connfd.str().c_str()); /* ** We don't want any more connection attempts so remove ourselves ** from the eventloop and close the file descriptor. */ _eventloop.remove_ioevent_cb(fd); debug_msg("Removing I/O event cb for fd = %s\n", fd.str().c_str()); if (XORP_ERROR == comm_sock_close(fd)) XLOG_WARNING("Close failed"); /* ** If there is a coordinator then add an I/O event callback. */ if(0 != _coordinator.length() && !_eventloop.add_ioevent_cb(connfd, IOT_READ, callback(this, &TestPeer::receive))) { comm_sock_close(connfd); XLOG_WARNING("Failed to add socket %s to eventloop", connfd.str().c_str()); return; } /* ** Set up the async writer. */ if (XORP_ERROR == comm_sock_set_blocking(connfd, COMM_SOCK_NONBLOCKING)) { XLOG_FATAL("Failed to go non-blocking: %s", comm_get_last_error_str()); } delete _async_writer; _async_writer = new AsyncFileWriter(_eventloop, connfd); _s = connfd;}/*** Process incoming bytes*/voidTestPeer::receive(XorpFd fd, IoEventType type){ debug_msg("\n"); if (type != IOT_READ) { XLOG_WARNING("Unexpected IoEventType %d", type); return; } if (0 == _coordinator.length()) { XLOG_WARNING("No coordinator configured"); return; } /* ** If requested perform BGP packetisation. */ int len; if(!_bgp) { uint8_t buf[64000]; if(-1 == (len = recv(fd, (char *)buf, sizeof(buf), 0))) { string error = c_format("read error: %s", strerror(errno)); XLOG_WARNING("%s", error.c_str()); datain(ERROR, _bgp_buf, _bgp_bytes, error); return; } datain(GOOD, _bgp_buf, _bgp_bytes); return; } /* ** Now doing BGP packetisation. */ int get; if(_bgp_bytes < BGPPacket::COMMON_HEADER_LEN) { get = BGPPacket::COMMON_HEADER_LEN - _bgp_bytes; } else { uint16_t length = extract_16(_bgp_buf + BGPPacket::LENGTH_OFFSET); get = length - _bgp_bytes; } if (-1 == (len = recv(fd, (char *)&_bgp_buf[_bgp_bytes], get, 0))) { string error = c_format("read error: %s", strerror(errno)); XLOG_WARNING("%s", error.c_str()); datain(ERROR, _bgp_buf, _bgp_bytes, error); _bgp_bytes = 0; comm_sock_close(fd); _s.clear(); _eventloop.remove_ioevent_cb(fd); return; } if (0 == len) { if(0 < _bgp_bytes) datain(GOOD, _bgp_buf, _bgp_bytes); datain(CLOSED, 0, 0); _bgp_bytes = 0; comm_sock_close(fd); _s.clear(); _eventloop.remove_ioevent_cb(fd); return; } _bgp_bytes += len; if (_bgp_bytes >= BGPPacket::COMMON_HEADER_LEN) { uint16_t length = extract_16(_bgp_buf + BGPPacket::LENGTH_OFFSET); if (length < BGPPacket::MINPACKETSIZE || length > sizeof(_bgp_buf)) { string error = c_format("Illegal length value %d", length); XLOG_ERROR("%s", error.c_str()); datain(ERROR, _bgp_buf, _bgp_bytes, error); _bgp_bytes = 0; return; } if(length == _bgp_bytes) { datain(GOOD, _bgp_buf, _bgp_bytes); _bgp_bytes = 0; } }}/*** Send data received on the socket back to the coordinator*/voidTestPeer::datain(status st, uint8_t *ptr, size_t len, string error){ debug_msg("status = %d len = %u error = %s\n", st, XORP_UINT_CAST(len), error.c_str()); if (_verbose) { switch(st) { case GOOD: if(_bgp) printf("Received: %s", bgppp(ptr, len).c_str()); break; case CLOSED: printf("Connection closed by peer\n"); break; case ERROR: printf("Error on connection\n"); break; } } queue_data(st, ptr, len, error);}/*** It is possible to overrun the XRL code by sending too many** XRL's. Therefore queue the data that needs to be sent and only have** a small number of outstanding XRL's at any time.*/voidTestPeer::queue_data(status st, uint8_t *ptr, size_t len, string error){ Queued q; TimeVal tv; _eventloop.current_time(tv); q.secs = tv.sec(); q.micro = tv.usec(); q.st = st; q.len = len; for(size_t i = 0; i < len; i++) q.v.push_back(ptr[i]); q.error = error; _xrl_queue.push(q); sendit();}/*** If there is any queued data and less than the allowed quota is in** flight send some more.*/voidTestPeer::sendit(){ if(_flying >= FLYING_LIMIT) return; if(_xrl_queue.empty()) return; Queued q = _xrl_queue.front(); _xrl_queue.pop(); XrlDatainV0p1Client datain(&_xrlrouter); debug_msg("%u\n", XORP_UINT_CAST(q.v.size())); XLOG_ASSERT(q.len == q.v.size()); _flying++; if (q.len == 0) { // // XXX: Note that we don't consider it an error even if // "q.st == ERROR". // This is needed to preserve the original logic that considers // the case of a peer resetting immediately the TCP // connection not an error. // datain.send_closed(_coordinator.c_str(), _server, _genid, callback(this, &TestPeer::xrl_callback)); return; } if (q.st == ERROR) { datain.send_error(_coordinator.c_str(), _server, _genid, q.error, callback(this, &TestPeer::xrl_callback)); return; } datain.send_receive(_coordinator.c_str(), _server, _genid, GOOD == q.st ? true : false, q.secs, q.micro, q.v, callback(this, &TestPeer::xrl_callback));}voidTestPeer::xrl_callback(const XrlError& error){ debug_msg("callback %s\n", error.str().c_str()); if(XrlError::OKAY() != error) { if(XrlError::REPLY_TIMED_OUT() == error) XLOG_ERROR("%s: Try reducing FLYING_LIMIT", error.str().c_str()); XLOG_ERROR("%s", error.str().c_str()); } _flying--; XLOG_ASSERT(_flying >= 0); sendit();}static voidusage(const char *name){ fprintf(stderr, "usage: %s [-h (finder host)] [-s server name] [-v][-t]\n", name); exit(-1);}intmain(int argc, char **argv){ XorpUnexpectedHandler x(xorp_unexpected_handler); // // Initialize and start xlog // xlog_init(argv[0], NULL); xlog_set_verbose(XLOG_VERBOSE_HIGH); // XXX: verbosity of the error messages temporary increased xlog_level_set_verbose(XLOG_LEVEL_ERROR, XLOG_VERBOSE_HIGH); xlog_add_default_output(); xlog_start(); setvbuf(stdout, (char *)NULL, _IOLBF, 0); int c; string finder_host = FinderConstants::FINDER_DEFAULT_HOST().str(); const char *server = SERVER; bool verbose = false; bool trace = false; while((c = getopt (argc, argv, "h:s:vt")) != EOF) { switch(c) { case 'h': finder_host = optarg; break; case 's': server = optarg; break; case 'v': verbose = true; break; case 't': trace = true; break; case '?': usage(argv[0]); } } try { EventLoop eventloop; XrlStdRouter router(eventloop, server, finder_host.c_str()); TestPeer test_peer(eventloop, router, server, verbose); XrlTestPeerTarget xrl_target(&router, test_peer, trace); wait_until_xrl_router_is_ready(eventloop, router); while(!test_peer.done()) { eventloop.run(); } } catch(...) { xorp_catch_standard_exceptions(); } // // Gracefully stop and exit xlog // xlog_stop(); xlog_exit(); debug_msg("Bye!\n"); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -