⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocp.cpp

📁 基于完成端口的TCP网络通信框架实现 工程iocp中包含了框架实现的所有代码
💻 CPP
字号:
#include <process.h>
#include <set>
#include <string>
#include "iocp.h"

using std::set;
using std::pair;
using std::string;

#pragma comment(lib,"wsock32")
#pragma comment(lib,"Ws2_32")

/***************************************
              NetIOCPBase
***************************************/

// Operation codes stored in PerIoData::op_type and dispatched by the worker.
enum NET_OPTYPE
{
  NOT_SOCKNEW   = 0, // new socket
  NOT_SOCKCLOSE = 1, // close a socket
  NOT_DATASEND  = 2, // send data
  NOT_DATARECV  = 3, // receive data
  NOT_SHUTDOWN  = 4  // shut the server down
};

// Number of live NetIOCPBase instances. The constructor calls WSAStartup when
// this is 0 and the destructor calls WSACleanup when it is 1 (last instance).
int NetIOCPBase::ref=0;

// Shuts down the sending direction of `s` and then closes the socket.
// Inbound data that has not been read yet is not drained first.
static void stopTCP(SOCKET s)
{
  shutdown(s, SD_SEND);
  closesocket(s);
}

// Queues `iodata` for transmission on socket `s`.
//  - No channel registered for `s` in mmsend_pool: the buffer is deleted.
//  - Channel already busy: the buffer stays at the tail of its send queue.
//  - Channel idle: the head of the queue is popped, the channel is marked
//    busy and the popped buffer is handed to aio_send().
// In every case the buffer is either queued or freed here.
void NetIOCPBase::senddata(SOCKET s,PerIoData *iodata)
{
  assert(iodata->len<=NET_BLOCK_SIZE&&iodata->len>0);
  iodata->sock=s;

  bool known=false;       // a channel exists for `s`
  bool dispatch=false;    // we must start a send ourselves
  PerIoData *head=NULL;   // buffer to send when dispatch is set

  mmsend_pool_mutex.Lock();
  map<SOCKET,Channel *>::iterator pos=mmsend_pool.find(s);
  if(pos!=mmsend_pool.end())
  {
    known=true;
    Channel *channel=pos->second;
    channel->sendqueue.push_back(iodata);
    if(!channel->busy)
    {
      head=channel->sendqueue.front();
      channel->sendqueue.pop_front();
      channel->busy=true;
      dispatch=true;
    }
  }
  mmsend_pool_mutex.Unlock();

  if(!known)
  {
    // Socket is not (or no longer) managed: drop the data.
    delete iodata;
    return;
  }
  if(dispatch)
  {
    aio_send(head->sock,head);
  }
}

// Tears down all client channels and stops every worker thread.
//  1. Under mmsend_pool_mutex: frees every queued send buffer and every
//     Channel, remembering the sockets, then empties mmsend_pool.
//  2. Closes each remembered client socket.
//  3. Posts one NOT_SHUTDOWN packet per worker thread, waits for all workers
//     to exit, closes their handles and forgets them.
void NetIOCPBase::wkshutdown()
{
  set<SOCKET> clients;
  {
    Locker lock(&mmsend_pool_mutex);
    map<SOCKET,Channel *>::iterator iter;
    for_each_iter(mmsend_pool,iter)
    {
      Channel *chan=iter->second;
      list<PerIoData *>::iterator dataiter;
      for_each_iter(chan->sendqueue,dataiter)
      {
        delete *dataiter;
      }
      clients.insert(chan->sock);
      delete chan;
    }
    mmsend_pool.clear();
  }
  // Close the sockets directly. close() cannot be used here: it only acts when
  // delete_sendqueue() still finds a channel for the socket, and every channel
  // was removed above, so it would silently do nothing and leave the client
  // sockets open.
  set<SOCKET>::iterator iter;
  for_each_iter(clients,iter)
  {
    stopTCP(*iter);
  }
  // One shutdown packet per worker; each worker consumes exactly one and exits.
  for(int i=0;i<(int)mwthread.size();++i)
  {
    PerIoData *iodata=new PerIoData;
    memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
    iodata->sock=INVALID_SOCKET;
    iodata->op_type=NOT_SHUTDOWN;
    // Byte count 1 so the worker does not take its zero-byte "closed" path.
    BOOL ok=PostQueuedCompletionStatus(miocp,1,iodata->sock,(LPOVERLAPPED)iodata);
    assert(ok);
  }
  for(int i=0;i<(int)mwthread.size();++i)
  {
    WaitForSingleObject(mwthread[i], INFINITE);
    CloseHandle(mwthread[i]);
  }
  // Drop the now-closed handles so a later start()/stop() cycle does not post
  // to, wait on, or close them again.
  mwthread.clear();
}

void NetIOCPBase::runloop()
{
  recv_allnetevents();
  NetEvent ne;
  while(peek_netevent(&ne))
  {
    switch(ne.event)
    {
    case NE_SOCKNEW:
      net_open_handler(ne.iodata->sock);
      aio_recv(ne.iodata->sock,ne.iodata);
      break;
    case NE_SOCKCLOSED:
      net_closed_handler(ne.iodata->sock);
      delete ne.iodata;
      break;
    case NE_DATARECVED:
      net_data_handler(ne.iodata);
      delete ne.iodata;
      break;
    default:
      assert(0);
      delete ne.iodata;
      break;
    }
  }
}



// Removes the channel registered for `s` from mmsend_pool and frees it along
// with every buffer still waiting in its send queue.
// Returns true when a channel was found and destroyed, false otherwise.
bool NetIOCPBase::delete_sendqueue(SOCKET s)
{
  Channel *victim=NULL;
  {
    // Only the map manipulation happens under the lock.
    Locker guard(&mmsend_pool_mutex);
    map<SOCKET,Channel *>::iterator pos=mmsend_pool.find(s);
    if(pos!=mmsend_pool.end())
    {
      victim=pos->second;
      mmsend_pool.erase(pos);
    }
  }
  if(victim==NULL)
  {
    return false;
  }
  // Drain and free whatever was still queued for sending.
  while(!victim->sendqueue.empty())
  {
    delete victim->sendqueue.front();
    victim->sendqueue.pop_front();
  }
  delete victim;
  return true;
}

// Registers a new, idle channel for `s` in mmsend_pool.
// Returns true if the channel was inserted; false if `s` already had one, in
// which case the freshly allocated channel is discarded.
bool NetIOCPBase::create_sendqueue(SOCKET s)
{
  Channel *fresh=new Channel;
  fresh->sock=s;
  fresh->busy=false;

  bool inserted=false;
  {
    Locker guard(&mmsend_pool_mutex);
    inserted=mmsend_pool.insert(make_pair(fresh->sock,fresh)).second;
  }
  if(inserted)
  {
    return true;
  }
  delete fresh;
  return false;
}

void NetIOCPBase::worker_func()
{
  SOCKET sock;
  PerIoData *iodata;
  DWORD bytes;
  bool running=true;
  while (running)
  {
    bytes=0;
    BOOL ret = GetQueuedCompletionStatus(miocp, &bytes, (LPDWORD) & sock,
                                         (LPOVERLAPPED *) & iodata, INFINITE);
    if((!ret&&iodata!=NULL) || bytes==0)
    {
      assert(iodata->sock==sock);
      if(delete_sendqueue(iodata->sock))
      {
        stopTCP(iodata->sock);
        NetEvent ne;
        ne.event=NE_SOCKCLOSED;
        ne.iodata=iodata;
        post_netevent(ne);
      }
      else
      {
        delete iodata;
      }
      continue;
    }
    assert(iodata!=NULL);
    assert(iodata->sock==sock);
    switch(iodata->op_type)
    {
    case NOT_SOCKNEW:
      {
        bool ok=create_sendqueue(iodata->sock);
        assert(ok);
        NetEvent ne;
        ne.event=NE_SOCKNEW;
        ne.iodata=iodata;
        post_netevent(ne);
      }
      break;
    case NOT_SOCKCLOSE:
      {
        stopTCP(iodata->sock);
        NetEvent ne;
        ne.event=NE_SOCKCLOSED;
        ne.iodata=iodata;
        post_netevent(ne);
      }
      break;
    case NOT_DATASEND:
      {
        if(bytes<iodata->databuf.len)
        {
          aio_resend(iodata,bytes);
          break;
        }
        SOCKET s=iodata->sock;
        delete iodata;
        map<SOCKET,Channel *>::iterator iter;
        {
          Locker lock(&mmsend_pool_mutex);
          iter=mmsend_pool.find(s);
          if(iter==mmsend_pool.end())
            break;
          Channel *chan=iter->second;
          if(chan->sendqueue.empty())
          {  
            chan->busy=false;
            break;
          }
          iodata=chan->sendqueue.front();
          chan->sendqueue.pop_front();
        }
        aio_send(iodata->sock,iodata);
      }
      break;
    case NOT_DATARECV:
      {
        NetEvent ne;
        ne.event=NE_DATARECVED;
        ne.iodata=iodata;
        SOCKET s=iodata->sock;
        iodata->len=bytes;
        post_netevent(ne);
        iodata=new PerIoData;
        aio_recv(s,iodata);
      }
      break;
    case NOT_SHUTDOWN:
      {
        running=false;
        delete iodata;
      }
      break;
    default:
      assert(0);
      break;
    }
  }
}

// Starts an overlapped send of iodata->data (iodata->len bytes) on `s`.
// If WSASend fails with anything other than WSA_IO_PENDING, a zero-byte
// completion is posted for the buffer so the connection gets closed.
void NetIOCPBase::aio_send(SOCKET s,PerIoData *iodata)
{
  memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
  iodata->op_type=NOT_DATASEND;
  iodata->sock=s;
  iodata->databuf.len=iodata->len;
  iodata->databuf.buf=iodata->data;

  DWORD sent = 0;
  const DWORD flags = 0;
  const int rc = WSASend(s, &iodata->databuf, 1, &sent, flags,
                         (LPWSAOVERLAPPED)iodata, NULL);
  if (rc != SOCKET_ERROR)
    return;
  if (WSAGetLastError() == WSA_IO_PENDING)
    return;

  // Send failed: notify so that the connection is closed.
  BOOL ok=PostQueuedCompletionStatus(miocp, 0, s, (LPOVERLAPPED)iodata);
  assert(ok);
}

// Starts an overlapped receive on `s` into iodata->data (whole array).
// If WSARecv fails with anything other than WSA_IO_PENDING, a zero-byte
// completion is posted for the buffer so the connection gets closed.
void NetIOCPBase::aio_recv(SOCKET s,PerIoData *iodata)
{
  memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
  iodata->op_type=NOT_DATARECV;
  iodata->sock=s;
  iodata->databuf.len=sizeof(iodata->data);
  iodata->databuf.buf=iodata->data;

  DWORD received = 0;
  DWORD flags = 0;
  const int rc = WSARecv(s, &iodata->databuf, 1, &received, &flags,
                         (LPWSAOVERLAPPED)iodata, NULL);
  if (rc != SOCKET_ERROR)
    return;
  if (WSAGetLastError() == WSA_IO_PENDING)
    return;

  // Receive could not be started: notify so that the connection is closed.
  BOOL ok=PostQueuedCompletionStatus(miocp, 0, s, (LPOVERLAPPED)iodata);
  assert(ok);
}

// Continues a partially completed send: skips the `lastlen` bytes that were
// already transmitted and issues another overlapped WSASend for the rest of
// the same buffer.
// If WSASend fails with anything other than WSA_IO_PENDING, a zero-byte
// completion is posted for the buffer so the connection gets closed.
void NetIOCPBase::aio_resend(PerIoData *iodata,int lastlen)
{
  assert(lastlen<(int)iodata->databuf.len);
  // The previous operation on this OVERLAPPED has completed; clear it before
  // reuse, the same way aio_send()/aio_recv() do.
  memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
  iodata->databuf.buf+=lastlen;
  iodata->databuf.len-=lastlen;
  DWORD bytes = 0;
  DWORD flags = 0;
  int ret = WSASend(iodata->sock, &iodata->databuf, 1, &bytes, flags,
                    (LPWSAOVERLAPPED)iodata, NULL);
  // Send failed: notify so that the connection is closed.
  if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
  {
    BOOL ok=PostQueuedCompletionStatus(miocp, 0, iodata->sock, (LPOVERLAPPED)iodata);
    assert(ok);
  }
}

// Requests that socket `s` be closed. If `s` still has a channel, the channel
// is destroyed and a NOT_SOCKCLOSE packet is posted to the completion port so
// a worker performs the actual close. Does nothing when `s` has no channel.
void NetIOCPBase::close(SOCKET s)
{
  if(!delete_sendqueue(s))
  {
    return;
  }
  PerIoData *request=new PerIoData;
  memset(&request->overlapped,0,sizeof(OVERLAPPED));
  request->op_type=NOT_SOCKCLOSE;
  request->sock=s;
  // Byte count 1 so the worker does not take its zero-byte "closed" path.
  BOOL ok=PostQueuedCompletionStatus(miocp, 1, s, (LPOVERLAPPED)request);
  assert(ok);
}

// Initialises Winsock (first instance only) and creates the completion port.
// Throws std::string on failure. The instance count `ref` is only incremented
// when construction succeeds.
NetIOCPBase::NetIOCPBase()
{
  const bool first_instance=(ref==0);
  if(first_instance)
  {
    WSADATA data;
    if (WSAStartup(0x0202, &data) != 0)
    {
      throw string("WSAStartup fail!");
    }
  }
  started=false;
  miocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
  if (NULL == miocp)
  {
    // The destructor does not run for a throwing constructor, so undo the
    // WSAStartup performed above to keep startup/cleanup calls balanced.
    if(first_instance)
      WSACleanup();
    throw string("CreateIoCompletionPort fail!");
  }
  ++ref;
}
// Stops the framework if it is still running, closes the completion port and
// calls WSACleanup when the last instance goes away.
// NOTE(review): stop() calls stop_run() from a base-class destructor; if
// stop_run() is virtual, the derived override is not used here - confirm
// against the class declaration.
NetIOCPBase::~NetIOCPBase()
{
  if(started)
  {
    stop();
    started=false;
  }

  if(miocp!=NULL)
  {
    CloseHandle(miocp);
    miocp=NULL;
  }

  // Last instance out releases Winsock.
  if(--ref==0)
  {
    WSACleanup();
  }
}
void NetIOCPBase::start()
{
  if(!started)
  {
    SYSTEM_INFO sys_info;
    GetSystemInfo(&sys_info);
    for (int i = 0;i < (int)sys_info.dwNumberOfProcessors*2;++i)
    {
      HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, completion_port_worker_thread, this, 0, NULL);
      if (NULL == thread)
        throw string("_beginthreadex,completion_port_worker_thread,fail!");
      mwthread.push_back(thread);
    }
    start_run();
    started=true;
  }
}
// Stops a running framework: stop_run() first, then wkshutdown() to tear down
// channels and worker threads. Does nothing when not started.
void NetIOCPBase::stop()
{
  if(!started)
  {
    return;
  }
  stop_run();
  wkshutdown();
  started=false;
}

// Thread entry point passed to _beginthreadex by start(); `cookie` is the
// NetIOCPBase instance whose worker_func() this thread runs.
unsigned int __stdcall NetIOCPBase::completion_port_worker_thread(void *cookie)
{
  NetIOCPBase *self=static_cast<NetIOCPBase *>(cookie);
  self->worker_func();
  _endthreadex(0);
  return 0;
}

/***************************************
              NetIOCPServer
***************************************/
void NetIOCPServer::accept_func()
{
  while(1)
  {
    SOCKET client = accept(mserver, NULL, NULL);
    if (client == INVALID_SOCKET)
    {
      //如果服务器套接字已经关闭,退出线程
      int ret = WSAGetLastError();
      if (ret == WSAEINTR   ||
          ret == WSAENOTSOCK||
          ret == WSANOTINITIALISED)
      {
        break;
      }
      assert(0);
      continue;
    }
    HANDLE port = CreateIoCompletionPort((HANDLE)client, miocp, (DWORD)client, 0);
    //如果不能将套接字关联到完成端口
    if (port == NULL)
    {
      stopTCP(client);
      continue;
    }
    PerIoData *iodata=new PerIoData;
    memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
    iodata->sock=client;
    iodata->op_type=NOT_SOCKNEW;
    BOOL ok=PostQueuedCompletionStatus(miocp,1,iodata->sock,(LPOVERLAPPED)iodata);
    assert(ok);
  }
}

// Creates the listening socket, binds it to INADDR_ANY:`port` and starts
// listening. The accept thread is not started here (see start_run()).
// Throws std::string if the socket cannot be created, bound or put into the
// listening state; the socket is closed again before a bind/listen failure is
// reported.
NetIOCPServer::NetIOCPServer(unsigned short port)
{
  mport=port;
  mathread=NULL;
  mserver= WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  if (mserver == INVALID_SOCKET)
    throw string("WSASocket fail!");
  sockaddr_in addr;
  // Zero the whole structure so the padding bytes (sin_zero) are defined.
  memset(&addr,0,sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(mport);
  if (bind(mserver, (sockaddr *)&addr, sizeof(addr)) != 0)
  {
    // ~NetIOCPServer does not run when the constructor throws, so release
    // the socket here.
    closesocket(mserver);
    mserver=INVALID_SOCKET;
    throw string("bind fail!");
  }
  if (listen(mserver, SOMAXCONN) != 0)
  {
    closesocket(mserver);
    mserver=INVALID_SOCKET;
    throw string("listen fail!");
  }
}

// Closes the listening socket and joins the accept thread (via stop_run())
// before the object goes away.
NetIOCPServer::~NetIOCPServer()
{
  this->stop_run();
}

void NetIOCPServer::start_run()
{
  mathread=(HANDLE)_beginthreadex(NULL, 0, completion_port_accept_thread, this, 0, NULL);
  if(mathread==NULL)
    throw string("_beginthreadex,completion_port_accept_thread,fail!");
}
// Closes the listening socket (which makes the accept loop exit) and, if an
// accept thread exists, waits for it and releases its handle.
// Does nothing when the listening socket is already closed.
void NetIOCPServer::stop_run()
{
  if(mserver==INVALID_SOCKET)
  {
    return;
  }
  closesocket(mserver);
  mserver=INVALID_SOCKET;

  if(mathread==NULL)
  {
    return;
  }
  WaitForSingleObject(mathread,INFINITE);
  CloseHandle(mathread);
  mathread=NULL;
}
// Thread entry point passed to _beginthreadex by start_run(); `cookie` is the
// NetIOCPServer instance whose accept_func() this thread runs.
unsigned int __stdcall NetIOCPServer::completion_port_accept_thread(void *cookie)
{
  NetIOCPServer *self=static_cast<NetIOCPServer *>(cookie);
  self->accept_func();
  _endthreadex(0);
  return 0;
}

/***************************************
              NetIOCPClient
***************************************/
//
// 注意: connect 成功后
// 参数SOCKET返回连接的socket,此时底层尚未完成对该socket的资源分配
// iocp完成资源分配后会在net_open_handler里面将该socket返回
//
bool NetIOCPClient::connect(const char *dotip,unsigned short port,SOCKET *ss)
{
  SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  if (s == INVALID_SOCKET)
  {
    return false;
  }

  sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(dotip);
  addr.sin_port = htons(port);
  if (::connect(s, (sockaddr *)&addr, sizeof(addr)) != 0)
  {
    closesocket(s);
    return false;
  }

  HANDLE hdl = CreateIoCompletionPort((HANDLE)s, miocp, (DWORD)s, 0);
  if (hdl == NULL)
  {
    stopTCP(s);
    return false;
  }
  PerIoData *iodata=new PerIoData;
  memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
  iodata->sock=s;
  iodata->op_type=NOT_SOCKNEW;
  BOOL ok=PostQueuedCompletionStatus(miocp, 1, iodata->sock, (LPOVERLAPPED)iodata);
  assert(ok);
  if(ss)
    *ss=s;
  return true;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -