⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 virtualsocketserver.cc

📁 本人收集整理的一份c/c++跨平台网络库
💻 CC
字号:
#include <algorithm>#include <cassert>#include <cmath>#include <cstring>#include <iostream>#include <vector>#include <errno.h>#include "virtualsocketserver.h"#include "common.h"#include "time.h"namespace utils_base {const uint32 HEADER_SIZE = 28; // IP + UDP headersconst uint32 MSG_ID_PACKET = 1;// TODO: Add a message type for new connections.// Packets are passed between sockets as messages.  We copy the data just like// the kernel does.class Packet : public MessageData {public:  Packet(const char* data, size_t size, const SocketAddress& from)        : size_(size), from_(from) {    assert(data);    assert(size_ >= 0);    data_ = new char[size_];    std::memcpy(data_, data, size_);  }  virtual ~Packet() {    delete data_;  }  const char* data() const { return data_; }  size_t size() const { return size_; }  const SocketAddress& from() const { return from_; }  // Remove the first size bytes from the data.  void Consume(size_t size) {    assert(size < size_);    size_ -= size;    char* new_data = new char[size_];    std::memcpy(new_data, data_, size);    delete[] data_;    data_ = new_data;  }private:  char* data_;  size_t size_;  SocketAddress from_;};// Implements the socket interface using the virtual network.  Packets are// passed as messages using the message queue of the socket server.class VirtualSocket : public AsyncSocket, public MessageHandler {public:  VirtualSocket(      VirtualSocketServer* server, int type, bool async, uint32 ip)      : server_(server), type_(type), async_(async), connected_(false),        local_ip_(ip), readable_(true), queue_size_(0) {    assert((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));    packets_ = new std::vector<Packet*>();  }  ~VirtualSocket() {    Close();    for (unsigned i = 0; i < packets_->size(); i++)      delete (*packets_)[i];    delete packets_;  }  SocketAddress GetLocalAddress() const {    return local_addr_;  }  SocketAddress GetRemoteAddress() const {    return remote_addr_;  }  int Bind(const SocketAddress& addr) {    assert(addr.port() != 0);    int result = server_->Bind(addr, this);    if (result >= 0)      local_addr_ = addr;    else      error_ = EADDRINUSE;    return result;  }  int Connect(const SocketAddress& addr) {    assert(!connected_);    connected_ = true;    remote_addr_ = addr;    assert(type_ == SOCK_DGRAM); // stream not yet implemented    return 0;  }  int Close() {    if (!local_addr_.IsAny())      server_->Unbind(local_addr_, this);    connected_ = false;    local_addr_ = SocketAddress();    remote_addr_ = SocketAddress();    return 0;  }  int Send(const void *pv, size_t cb) {    assert(connected_);    return SendInternal(pv, cb, remote_addr_);  }  int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {    assert(!connected_);    return SendInternal(pv, cb, addr);  }  int SendInternal(const void *pv, size_t cb, const SocketAddress& addr) {    // If we have not been assigned a local port, then get one.    if (local_addr_.IsAny()) {      local_addr_.SetIP(local_ip_);      int result = server_->Bind(this, &local_addr_);      if (result < 0) {        local_addr_.SetIP(0);        error_ = EADDRINUSE;        return result;      }    }    // Send the data in a message to the appropriate socket.    return server_->Send(this, pv, cb, local_addr_, addr);  }  int Recv(void *pv, size_t cb) {    SocketAddress addr;    return RecvFrom(pv, cb, &addr);  }  int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {    // If we don't have a packet, then either error or wait for one to arrive.    if (packets_->size() == 0) {      if (async_) {        error_ = EAGAIN;        return -1;      }      while (packets_->size() == 0) {        Message msg;        server_->msg_queue_->Get(&msg);        server_->msg_queue_->Dispatch(&msg);      }    }    // Return the packet at the front of the queue.    Packet* packet = packets_->front();    *paddr = packet->from();    int size = (int)packet->size();    if (size <= (int)cb) {      std::memcpy(pv, packet->data(), size);      packets_->erase(packets_->begin());      delete packet;      return size;    } else {      std::memcpy(pv, packet->data(), cb);      packet->Consume(cb);      return (int)cb;    }  }  int Listen(int backlog) {    assert(false); // not yet implemented    return 0;  }  Socket* Accept(SocketAddress *paddr) {    assert(false); // not yet implemented    return 0;  }  bool readable() { return readable_; }  void set_readable(bool value) { readable_ = value; }  bool writable() { return false; }  void set_writable(bool value) {    // TODO: Send ourselves messages (delayed after the first) to give them a    // chance to write.    assert(false);  }  int GetError() const {    return error_;  }  void SetError(int error) {    error_ = error;  }  ConnState GetState() const {    return connected_ ? CS_CONNECTED : CS_CLOSED;  }  int SetOption(Option opt, int value) {    return 0;  }  int EstimateMTU(uint16* mtu) {    if (!connected_)      return ENOTCONN;    else      return 65536;  }  void OnMessage(Message *pmsg) {    if (pmsg->message_id == MSG_ID_PACKET) {      assert(pmsg->pdata);      Packet* packet = static_cast<Packet*>(pmsg->pdata);      if (!readable_)        return;      packets_->push_back(packet);      if (async_) {        SignalReadEvent(this);        // TODO: If the listeners don't want to read this packet now, we will        // need to send ourselves delayed messages to try again.        assert(packets_->size() == 0);      }    } else {      assert(false);    }  }private:  struct QueueEntry {    uint32 size;    uint32 done_time;  };  typedef std::deque<QueueEntry> SendQueue;  VirtualSocketServer* server_;  int type_;  bool async_;  bool connected_;  uint32 local_ip_;  bool readable_;  SocketAddress local_addr_;  SocketAddress remote_addr_;  std::vector<Packet*>* packets_;  int error_;  SendQueue queue_;  uint32 queue_size_;  CriticalSection queue_crit_;  friend class VirtualSocketServer;};VirtualSocketServer::VirtualSocketServer()      : fWait_(false), wait_version_(0), next_ip_(1), next_port_(45000),        bandwidth_(0), queue_capacity_(64 * 1024), delay_mean_(0),        delay_stddev_(0), delay_dist_(0), drop_prob_(0.0) {  msg_queue_ = new MessageQueue(); // uses physical socket server for Wait  bindings_ = new AddressMap();  UpdateDelayDistribution();}VirtualSocketServer::~VirtualSocketServer() {  delete bindings_;  delete msg_queue_;  delete delay_dist_;}uint32 VirtualSocketServer::GetNextIP() {  return next_ip_++;}Socket* VirtualSocketServer::CreateSocket(int type) {  return CreateSocketInternal(type);}AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {  return CreateSocketInternal(type);}VirtualSocket* VirtualSocketServer::CreateSocketInternal(int type) {  uint32 ip = (next_ip_ > 1) ? next_ip_ - 1 : 1;  return new VirtualSocket(this, type, true, ip);}bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {  ASSERT(process_io);  // This can't be easily supported.  uint32 msEnd;  if (cmsWait != kForever)    msEnd = GetMillisecondCount() + cmsWait;  uint32 cmsNext = cmsWait;  fWait_ = true;  wait_version_ += 1;  while (fWait_) {    Message msg;    if (!msg_queue_->Get(&msg, cmsNext))      return true;    msg_queue_->Dispatch(&msg);    if (cmsWait != kForever) {      uint32 msCur = GetMillisecondCount();      if (msCur >= msEnd)        return true;      cmsNext = msEnd - msCur;    }  }  return true;}const uint32 MSG_WAKE_UP = 1;struct WakeUpMessage : public MessageData {  WakeUpMessage(uint32 ver) : wait_version(ver) {}  virtual ~WakeUpMessage() {}  uint32 wait_version;};void VirtualSocketServer::WakeUp() {  msg_queue_->Post(this, MSG_WAKE_UP, new WakeUpMessage(wait_version_));}void VirtualSocketServer::OnMessage(Message* pmsg) {  assert(pmsg->message_id == MSG_WAKE_UP);  assert(pmsg->pdata);  WakeUpMessage* wmsg = static_cast<WakeUpMessage*>(pmsg->pdata);  if (wmsg->wait_version == wait_version_)    fWait_ = false;  delete pmsg->pdata;}int VirtualSocketServer::Bind(      const SocketAddress& addr, VirtualSocket* socket) {  assert(addr.ip() > 0); // don't support any-address right now  assert(addr.port() > 0);  assert(socket);  if (bindings_->find(addr) == bindings_->end()) {    (*bindings_)[addr] = socket;    return 0;  } else {    return -1;  }}int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {  assert(addr->ip() > 0); // don't support any-address right now  assert(socket);  for (int i = 0; i < 65536; i++) {    addr->SetPort(next_port_++);    if (addr->port() > 0) {      AddressMap::iterator iter = bindings_->find(*addr);      if (iter == bindings_->end()) {        (*bindings_)[*addr] = socket;        return 0;      }    }  }  errno = EADDRINUSE; // TODO: is there a better error number?  return -1;}int VirtualSocketServer::Unbind(      const SocketAddress& addr, VirtualSocket* socket) {  assert((*bindings_)[addr] == socket);  bindings_->erase(bindings_->find(addr));  return 0;}static double Random() {  return double(rand()) / RAND_MAX;}int VirtualSocketServer::Send(    VirtualSocket* socket, const void *pv, size_t cb,    const SocketAddress& local_addr, const SocketAddress& remote_addr) {  // See if we want to drop this packet.  if (Random() < drop_prob_) {    std::cerr << "Dropping packet: bad luck" << std::endl;    return 0;  }  uint32 cur_time = GetMillisecondCount();  uint32 send_delay;  // Determine whether we have enough bandwidth to accept this packet.  To do  // this, we need to update the send queue.  Once we know it's current size,  // we know whether we can fit this packet.  //  // NOTE: There are better algorithms for maintaining such a queue (such as  // "Derivative Random Drop"); however, this algorithm is a more accurate  // simulation of what a normal network would do.  {     CritScope cs(&socket->queue_crit_);    while ((socket->queue_.size() > 0) &&           (socket->queue_.front().done_time <= cur_time)) {      assert(socket->queue_size_ >= socket->queue_.front().size);      socket->queue_size_ -= socket->queue_.front().size;      socket->queue_.pop_front();    }    VirtualSocket::QueueEntry entry;    entry.size = uint32(cb) + HEADER_SIZE;    if (socket->queue_size_ + entry.size > queue_capacity_) {      std::cerr << "Dropping packet: queue capacity exceeded" << std::endl;      return 0; // not an error    }    socket->queue_size_ += entry.size;    send_delay = SendDelay(socket->queue_size_);    entry.done_time = cur_time + send_delay;    socket->queue_.push_back(entry);  }  // Find the delay for crossing the many virtual hops of the network.  uint32 transit_delay = GetRandomTransitDelay();  // Post the packet as a message to be delivered (on our own thread)  AddressMap::iterator iter = bindings_->find(remote_addr);  if (iter != bindings_->end()) {    Packet* p = new Packet(static_cast<const char*>(pv), cb, local_addr);    uint32 delay = send_delay + transit_delay;    msg_queue_->PostDelayed(delay, iter->second, MSG_ID_PACKET, p);  } else {    std::cerr << "No one listening at " << remote_addr.ToString() << std::endl;  }  return (int)cb;}uint32 VirtualSocketServer::SendDelay(uint32 size) {  if (bandwidth_ == 0)    return 0;  else    return 1000 * size / bandwidth_;}void PrintFunction(std::vector<std::pair<double,double> >* f) {  for (uint32 i = 0; i < f->size(); i++)    std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;}void VirtualSocketServer::UpdateDelayDistribution() {  Function* dist = GetDelayDistribution();  dist = Resample(Invert(Accumulate(dist)), 0, 1);  // We take a lock just to make sure we don't leak memory.  {    CritScope cs(&delay_crit_);    delete delay_dist_;    delay_dist_ = dist;  }}const int NUM_SAMPLES = 100; // 1000;static double PI = 4 * std::atan(1.0);static double Normal(double x, double mean, double stddev) {  double a = (x - mean) * (x - mean) / (2 * stddev * stddev);  return std::exp(-a) / (stddev * sqrt(2 * PI));}#if 0 // static unused gives a warningstatic double Pareto(double x, double min, double k) {  if (x < min)    return 0;  else    return k * std::pow(min, k) / std::pow(x, k+1);}#endifVirtualSocketServer::Function* VirtualSocketServer::GetDelayDistribution() {  Function* f = new Function();  if (delay_stddev_ == 0) {    f->push_back(Point(delay_mean_, 1.0));  } else {    double start = 0;    if (delay_mean_ >= 4 * double(delay_stddev_))      start = delay_mean_ - 4 * double(delay_stddev_);    double end = delay_mean_ + 4 * double(delay_stddev_);    double delay_min = 0;    if (delay_mean_ >= 1.0 * delay_stddev_)      delay_min = delay_mean_ - 1.0 * delay_stddev_;    for (int i = 0; i < NUM_SAMPLES; i++) {      double x = start + (end - start) * i / (NUM_SAMPLES - 1);      double y = Normal(x, delay_mean_, delay_stddev_);      f->push_back(Point(x, y));    }  }  return f;}uint32 VirtualSocketServer::GetRandomTransitDelay() {  double delay = (*delay_dist_)[rand() % delay_dist_->size()].second;  return uint32(delay);}struct FunctionDomainCmp {  bool operator ()(const VirtualSocketServer::Point& p1, const VirtualSocketServer::Point& p2) {    return p1.first < p2.first;  }  bool operator ()(double v1, const VirtualSocketServer::Point& p2) {    return v1 < p2.first;  }  bool operator ()(const VirtualSocketServer::Point& p1, double v2) {    return p1.first < v2;  }};VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {  assert(f->size() >= 1);  double v = 0;  for (Function::size_type i = 0; i < f->size() - 1; ++i) {    double dx = (*f)[i].second * ((*f)[i+1].first - (*f)[i].first);    v = (*f)[i].second = v + dx;  }  (*f)[f->size()-1].second = v;  return f;}VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {  for (Function::size_type i = 0; i < f->size(); ++i)    std::swap((*f)[i].first, (*f)[i].second);  std::sort(f->begin(), f->end(), FunctionDomainCmp());  return f;}VirtualSocketServer::Function* VirtualSocketServer::Resample(    Function* f, double x1, double x2) {  Function* g = new Function();  for (int i = 0; i < NUM_SAMPLES; i++) {    double x = x1 + (x2 - x1) * i / (NUM_SAMPLES - 1);    double y = Evaluate(f, x);    g->push_back(Point(x, y));  }  delete f;  return g;}double VirtualSocketServer::Evaluate(Function* f, double x) {  Function::iterator iter =      std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());  if (iter == f->begin()) {    return (*f)[0].second;  } else if (iter == f->end()) {    assert(f->size() >= 1);    return (*f)[f->size() - 1].second;  } else if (iter->first == x) {    return iter->second;  } else {    double x1 = (iter - 1)->first;    double y1 = (iter - 1)->second;    double x2 = iter->first;    double y2 = iter->second;    return y1 + (y2 - y1) * (x - x1) / (x2 - x1);  }}} // namespace utils_base

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -