⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketserver.cpp

📁 这个动态链接库是Socket通过COm串口实现数据通信
💻 CPP
📖 第 1 页 / 共 2 页
字号:
///////////////////////////////////////////////////////////////////////////////
//
// File           : $Workfile: SocketServer.cpp $
// Version        : $Revision: 23 $
// Function       : 
//
// Author         : $Author: Len $
// Date           : $Date: 7/06/02 14:15 $
//
// Notes          : 
//
// Modifications  :
//
// $Log: /Web Articles/SocketServers/SimpleProtocolServer2/JetByteTools/Win32Tools/SocketServer.cpp $
// 
// 23    7/06/02 14:15 Len
// Changes due to change in CIOBuffer. The buffer now derives from
// OVERLAPPED so the explicit conversion functions are no longer required.
// 
// 22    5/06/02 19:17 Len
// Abortive socket closure is now done by an IO pool worker thread. This
// is a workaround for a problem with the COM wrapper.
// 
// 21    29/05/02 12:05 Len
// Lint issues.
// 
// 20    26/05/02 15:10 Len
// Factored out common 'user data' code into a mixin base class.
// 
// 19    24/05/02 12:13 Len
// Refactored all the linked list stuff for the sockets into a NodeList
// class.
// 
// 18    21/05/02 11:36 Len
// User data can now be stored/retrieved as either an unsigned long or a
// void *.
// A CIOBuffer containing the client's address is now passed with
// OnConnectionEstablished().
// 
// 17    21/05/02 8:33 Len
// Allow derived class to flush buffer allocator in destructor so that it
// can receive notifications about buffer release.
// 
// 16    21/05/02 8:05 Len
// SocketServer now derives from the buffer allocator.
// 
// 15    20/05/02 23:17 Len
// Updated copyright and disclaimers.
// 
// 14    20/05/02 17:26 Len
// Merged OnNewConnection() into OnConnectionEstablished(). 
// We now pass the socket to OnConnectionClosed() so that the derived
// class can dealocate any per connection user data when the connection is
// closed.
// 
// 13    20/05/02 14:45 Len
// SocketServer doesn't need to pass allocator to WorkerThread.
// 
// 12    20/05/02 14:38 Len
// WorkerThread never needs to use the allocator.
// 
// 11    20/05/02 8:09 Len
// Moved the concept of the io operation used for the io buffer into the
// socket server. The io buffer now simply presents 'user data' access
// functions. Added a similar concept of user data to the socket class so
// that users can associate their own data with a connection . Derived
// class is now notified when a connection occurs so that they can send a
// greeting or request a read, etc. 
// General code cleanup and refactoring.
// 
// 10    16/05/02 21:35 Len
// Users now signal that we're finished with a socket by calling
// Shutdown() rather than Close().
// 
// 9     15/05/02 11:07 Len
// TX and RX data logging are now wrapped in a DEBUG_ONLY() macro as the
// call to DumpData() was occurring even though the output wasnt being
// logged. This change almost doubled the throughput of the server...
// 
// 8     15/05/02 10:45 Len
// Enabled TX and RX data logging in debug build
// 
// 7     14/05/02 14:37 Len
// Expose CThread::Start() using a using declaration rather than a
// forwarding function.
// Lint cleanup.
// 
// 6     14/05/02 13:53 Len
// We now explicitly start the thread pool rather than allowing it to
// start itself in the constructor. There was a race condition over the
// completion of construction of derived classes and the first access to
// the pure virtual functions.
// Refactored some of the socket code to improve encapsulation.
// 
// 5     13/05/02 13:44 Len
// Added OnError() methods so that derived class can do something about
// obscure error situations.
// Added a 'max free sockets' concept so that the socket pool can shrink
// as well as grow. This exposed a problem in how we were handling sockets
// - knowing when we can actually delete them was complicated so they're
// now reference counted.
// 
// 4     11/05/02 11:05 Len
// Removed CreateListeningSocket() as it's now the responsibility of the
// derived class. General code cleaning.
// 
// 3     10/05/02 19:52 Len
// Bug fix. During the code cleanup we'd renamed most, but not all
// instances of 'socket'... 
// 
// 2     10/05/02 19:25 Len
// Lint options and code cleaning.
// 
// 1     9/05/02 18:47 Len
// 
///////////////////////////////////////////////////////////////////////////////
//
// Copyright 1997 - 2002 JetByte Limited.
//
// JetByte Limited grants you ("Licensee") a non-exclusive, royalty free, 
// licence to use, modify and redistribute this software in source and binary 
// code form, provided that i) this copyright notice and licence appear on all 
// copies of the software; and ii) Licensee does not utilize the software in a 
// manner which is disparaging to JetByte Limited.
//
// This software is provided "as is" without a warranty of any kind. All 
// express or implied conditions, representations and warranties, including
// any implied warranty of merchantability, fitness for a particular purpose
// or non-infringement, are hereby excluded. JetByte Limited and its licensors 
// shall not be liable for any damages suffered by licensee as a result of 
// using, modifying or distributing the software or its derivatives. In no
// event will JetByte Limited be liable for any lost revenue, profit or data,
// or for direct, indirect, special, consequential, incidental or punitive
// damages, however caused and regardless of the theory of liability, arising 
// out of the use of or inability to use software, even if JetByte Limited 
// has been advised of the possibility of such damages.
//
// This software is not designed or intended for use in on-line control of 
// aircraft, air traffic, aircraft navigation or aircraft communications; or in 
// the design, construction, operation or maintenance of any nuclear 
// facility. Licensee represents and warrants that it will not use or 
// redistribute the Software for such purposes. 
//
///////////////////////////////////////////////////////////////////////////////

#include "SocketServer.h"
#include "IOCompletionPort.h"
#include "Win32Exception.h"
#include "Utils.h"
#include "SystemInfo.h"

#include <vector>

#pragma comment(lib, "ws2_32.lib")

///////////////////////////////////////////////////////////////////////////////
// Lint options
//
//lint -save
//
// Symbol did not appear in the constructor initialiser list 
//lint -esym(1928, CThread)
//lint -esym(1928, CUsesWinsock)  
//lint -esym(1928, Node)  
//lint -esym(1928, COpaqueUserData)
//
// Symbol's default constructor implicitly called
//lint -esym(1926, CSocketServer::m_listManipulationSection)
//lint -esym(1926, CSocketServer::m_shutdownEvent)
//lint -esym(1926, CSocketServer::m_acceptConnectionsEvent)
//lint -esym(1926, CSocketServer::m_activeList)
//lint -esym(1926, CSocketServer::m_freeList)
//
// Member not defined
//lint -esym(1526, CSocketServer::CSocketServer)
//lint -esym(1526, CSocketServer::operator=)
//lint -esym(1526, Socket::Socket)
//lint -esym(1526, Socket::operator=)
//lint -esym(1526, WorkerThread::WorkerThread)
//lint -esym(1526, WorkerThread::operator=)
//
//lint -esym(534, InterlockedIncrement)   ignoring return value
//
///////////////////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////////////////////////
// Using directives
///////////////////////////////////////////////////////////////////////////////

using std::vector;

///////////////////////////////////////////////////////////////////////////////
// Namespace: JetByteTools::Win32
///////////////////////////////////////////////////////////////////////////////

namespace JetByteTools {
namespace Win32 {

///////////////////////////////////////////////////////////////////////////////
// Static helper methods
///////////////////////////////////////////////////////////////////////////////

static size_t CalculateNumberOfThreads(
   size_t numThreads);

///////////////////////////////////////////////////////////////////////////////
// Local enums
///////////////////////////////////////////////////////////////////////////////

enum IO_Operation 
{ 
   IO_Read_Request, 
   IO_Read_Completed, 
   IO_Write_Request, 
   IO_Write_Completed,
   IO_Close
};

///////////////////////////////////////////////////////////////////////////////
// CSocketServer
///////////////////////////////////////////////////////////////////////////////

CSocketServer::CSocketServer(
   unsigned long addressToListenOn,
   unsigned short portToListenOn,
   size_t maxFreeSockets,
   size_t maxFreeBuffers,
   size_t bufferSize /* = 1024 */,
   size_t numThreads /* = 0 */)
   :  CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
      m_numThreads(CalculateNumberOfThreads(numThreads)),
      m_listeningSocket(INVALID_SOCKET),
      m_iocp(0),
      m_address(addressToListenOn),
      m_port(portToListenOn),
      m_maxFreeSockets(maxFreeSockets)
{
}

CSocketServer::~CSocketServer()
{
   try
   {
      ReleaseSockets();
   }
   catch(...)
   {
   }
}

void CSocketServer::ReleaseSockets()
{
   CCriticalSection::Owner lock(m_listManipulationSection);

   Socket *pSocket = m_activeList.Head();

   while (pSocket)
   {
      Socket *pNext = SocketList::Next(pSocket);

      pSocket->Close();
   
      pSocket = pNext;
   }
      
   while (m_activeList.Head())
   {
      ReleaseSocket(m_activeList.Head());
   }

   while (m_freeList.Head())
   {
      DestroySocket(m_freeList.PopNode());
   }

   if (m_freeList.Count() + m_freeList.Count() != 0)
   {
      //lint -e{1933} call to unqualified virtual function
      OnError(_T("CSocketServer::ReleaseSockets() - Leaked sockets"));
   }
}

void CSocketServer::ReleaseBuffers()
{
   Flush();
}

void CSocketServer::StartAcceptingConnections()
{
   if (m_listeningSocket == INVALID_SOCKET)
   {
      //lint -e{1933} call to unqualified virtual function
      OnStartAcceptingConnections();

      //lint -e{1933} call to unqualified virtual function
      m_listeningSocket = CreateListeningSocket(m_address, m_port);
   
      m_acceptConnectionsEvent.Set();
   }
}

void CSocketServer::StopAcceptingConnections()
{
   if (m_listeningSocket != INVALID_SOCKET)
   {
      m_acceptConnectionsEvent.Reset();

      if (0 != ::closesocket(m_listeningSocket))
      {
         //lint -e{1933} call to unqualified virtual function
         OnError(_T("CSocketServer::StopAcceptingConnections() - closesocket - ") + GetLastErrorMessage(::WSAGetLastError()));
      }

      m_listeningSocket = INVALID_SOCKET;

      //lint -e{1933} call to unqualified virtual function
      OnStopAcceptingConnections();
   }
}

void CSocketServer::InitiateShutdown()
{
   // signal that the dispatch thread should shut down all worker threads and then exit

   StopAcceptingConnections();

   m_shutdownEvent.Set();

      //lint -e{1933} call to unqualified virtual function
   OnShutdownInitiated();
}

void CSocketServer::WaitForShutdownToComplete()
{
   // if we havent already started a shut down, do so...

   InitiateShutdown();

   Wait();
}

int CSocketServer::Run()
{
   try
   {
      vector<WorkerThread *> workers;
      
      workers.reserve(m_numThreads);
      
      for (size_t i = 0; i < m_numThreads; ++i)
      {
         //lint -e{1933} call to unqualified virtual function
         WorkerThread *pThread = CreateWorkerThread(m_iocp); 

         workers.push_back(pThread);

         pThread->Start();
      }

      HANDLE handlesToWaitFor[2];

      handlesToWaitFor[0] = m_shutdownEvent.GetEvent();
      handlesToWaitFor[1] = m_acceptConnectionsEvent.GetEvent();

      while (!m_shutdownEvent.Wait(0))
      {
         DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, INFINITE);

         if (waitResult == WAIT_OBJECT_0)
         {
            // Time to shutdown
            break;
         }
         else if (waitResult == WAIT_OBJECT_0 + 1)
         {
            // accept connections

            while (!m_shutdownEvent.Wait(0) && m_acceptConnectionsEvent.Wait(0))
            {
               CIOBuffer *pAddress = Allocate();

               int addressSize = (int)pAddress->GetSize();

               SOCKET acceptedSocket = ::WSAAccept(
                  m_listeningSocket, 
                  reinterpret_cast<sockaddr*>(const_cast<BYTE*>(pAddress->GetBuffer())), 
                  &addressSize, 
                  0, 
                  0);

               pAddress->Use(addressSize);

               if (acceptedSocket != INVALID_SOCKET)
               {
                  Socket *pSocket = AllocateSocket(acceptedSocket);
               
                  //lint -e{1933} call to unqualified virtual function
                  OnConnectionEstablished(pSocket, pAddress);
               }
               else if (m_acceptConnectionsEvent.Wait(0))
               {
                  //lint -e{1933} call to unqualified virtual function
                  OnError(_T("CSocketServer::Run() - WSAAccept:") + GetLastErrorMessage(::WSAGetLastError()));
               }

               pAddress->Release();
            }
         }
         else
         {
            //lint -e{1933} call to unqualified virtual function
            OnError(_T("CSocketServer::Run() - WaitForMultipleObjects: ") + GetLastErrorMessage(::GetLastError()));
         }
      }

      for (i = 0; i < m_numThreads; ++i)
      {
         workers[i]->InitiateShutdown();
      }  

      for (i = 0; i < m_numThreads; ++i)
      {
         workers[i]->WaitForShutdownToComplete();

         delete workers[i];

         workers[i] = 0;
      }  
   }
   catch(const CException &e)
   {
      //lint -e{1933} call to unqualified virtual function
      OnError(_T("CSocketServer::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage());
   }
   catch(...)
   {
      //lint -e{1933} call to unqualified virtual function
      OnError(_T("CSocketServer::Run() - Unexpected exception"));
   }

   //lint -e{1933} call to unqualified virtual function
   OnShutdownComplete();

   return 0;
}

CSocketServer::Socket *CSocketServer::AllocateSocket(
   SOCKET theSocket)
{
   CCriticalSection::Owner lock(m_listManipulationSection);

   Socket *pSocket = 0;

   if (!m_freeList.Empty())
   {
      pSocket = m_freeList.PopNode();

      pSocket->Attach(theSocket);

      pSocket->AddRef();
   }
   else
   {
      pSocket = new Socket(*this, theSocket);

      //lint -e{1933} call to unqualified virtual function
      OnConnectionCreated();
   }

   m_activeList.PushNode(pSocket);

   //lint -e{611} suspicious cast
   m_iocp.AssociateDevice(reinterpret_cast<HANDLE>(theSocket), (ULONG_PTR)pSocket);

   return pSocket;
}

void CSocketServer::ReleaseSocket(Socket *pSocket)
{
   if (!pSocket)
   {
      throw CException(_T("CSocketServer::ReleaseSocket()"), _T("pSocket is null"));
   }

   CCriticalSection::Owner lock(m_listManipulationSection);

   pSocket->RemoveFromList();

   if (m_maxFreeSockets == 0 || 
       m_freeList.Count() < m_maxFreeSockets)
   {
      m_freeList.PushNode(pSocket);
   }
   else
   {
      DestroySocket(pSocket);
   }
}

void CSocketServer::DestroySocket(
   Socket *pSocket)
{
   delete pSocket;

   //lint -e{1933} call to unqualified virtual function
   OnConnectionDestroyed();
}

void CSocketServer::PostAbortiveClose(
   Socket *pSocket)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -