📄 warsocketiowin32nt.cpp
字号:
#include "StdAfx.h"#include "WarSocketIoWin32Nt.h" // class implemented#include "Mswsock.h"#ifndef WAR_SVR_WIN32_NT_ENGINE_H# include "WarSvrWin32NtEngine.h"#endif#ifndef WAR_AUTO_LOCK_H# include "WarAutoLock.h"#endif#ifndef WAR_LOG_H# include "WarLog.h"#endif#ifndef WAR_AUTO_LOCK_H# include "WarAutoLock.h"#endif#ifndef WAR_PERFMON_DEF_H# include "WarPerfmonDef.h"#endif#ifndef WAR_FUNCTION_PROFILER_H# include "WarFunctionProfiler.h"#endif#define AUTO_LOCK WarAutoLock my_lock(mLock);/////////////////////////////// PUBLIC ///////////////////////////////////////WarSocketIoWin32Nt::ol_result_t *WarSocketIoWin32Nt::mspBufferList;WarCriticalSection WarSocketIoWin32Nt::msBufferLock;//============================= LIFECYCLE ====================================WarSocketIoWin32Nt::WarSocketIoWin32Nt(){ AddLogIdentifierTag("Win32Nt");}// WarSocketIoWin32NtWarSocketIoWin32Nt::~WarSocketIoWin32Nt(){}// ~WarSocketIoWin32Nt//============================= OPERATORS ====================================//============================= OPERATIONS ===================================void WarSocketIoWin32Nt::SendWithCallback( war_transfer_buffer_ptr_t& outBuffer, size_t numSegments) throw(WarException){ WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_SendWithCallback); DWORD dwBytes = 0; WarTransferBuffer *pbuffer = &(*outBuffer); if (numSegments > 1) { if (NULL == pbuffer->mpLock) pbuffer->mpLock = new WarCriticalSection; } if (pbuffer->mpLock) pbuffer->mpLock->Lock(); char *p = pbuffer->GetBuffer(); size_t segment_size = pbuffer->GetLength() / numSegments; size_t bytes = 0; size_t bytes_left = pbuffer->GetBytesUsed(); assert((segment_size * numSegments) == pbuffer->GetLength()); pbuffer->mBytesUsed = 0; // Prepere for data read/written for(unsigned segment = 0; ; segment++) { ol_result_t *pResult; bytes = min(segment_size, bytes_left); { WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_SendWithCallback_ALLOC); pResult = AllocResult(OP_SEND, segment); } pResult->mBufferPtr = outBuffer; pbuffer->mSegmentCnt++; int result; { WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_SendWithCallback_READFILE); result = WriteFile((HANDLE)mSocket, p, bytes, &dwBytes, &pResult->mOverlappedData); } WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_SendWithCallback_POSTPRC); if (!result) { WarSystemError system_error; switch(system_error.SystemError()) { case ERROR_IO_PENDING: break; // No action case ERROR_HANDLE_EOF: if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); DeleteResult(pResult); WarThrow(WarError(WAR_FERR_END_OF_FILE, ERROR_HANDLE_EOF), NULL); break; default: if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); DeleteResult(pResult); WarThrow(system_error, NULL); } } bytes_left -= bytes; if (bytes_left == 0) break; p += bytes; } if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); WAR_DB_PERFMON_INC(WAR_PRFDEBUG_NUM_SCKPENDINGIO);}void WarSocketIoWin32Nt::RecvWithCallback( war_transfer_buffer_ptr_t& inBuffer, size_t numSegments) throw(WarException){ WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_RecvWithCallback); DWORD dwBytes = 0; WarTransferBuffer *pbuffer = &(*inBuffer); if (numSegments > 1) { if (NULL == pbuffer->mpLock) pbuffer->mpLock = new WarCriticalSection; } if (pbuffer->mpLock) pbuffer->mpLock->Lock(); char *p = pbuffer->GetBuffer(); size_t segment_size = pbuffer->GetLength() / numSegments; assert((segment_size * numSegments) == pbuffer->GetLength()); pbuffer->mBytesUsed = 0; // Prepere for data read/written for(unsigned segment = 0 ; segment < numSegments ; segment++, p += segment_size) { ol_result_t *pResult; { pResult = AllocResult(OP_RECV, segment); } pResult->mBufferPtr = inBuffer; pbuffer->mSegmentCnt++; if (!ReadFile((HANDLE)mSocket, p, segment_size, &dwBytes, &pResult->mOverlappedData)) { WarSystemError system_error; switch(system_error.SystemError()) { case ERROR_IO_PENDING: break; // No action case ERROR_HANDLE_EOF: if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); DeleteResult(pResult); WarThrow(WarError(WAR_FERR_END_OF_FILE, ERROR_HANDLE_EOF), NULL); break; default: if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); DeleteResult(pResult); WarThrow(system_error, NULL); } } else if(dwBytes == 0) { if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); WarThrow(WarError(WAR_FERR_END_OF_FILE, ERROR_HANDLE_EOF), NULL); break; //No more data } } if (pbuffer->mpLock) pbuffer->mpLock->Unlock(); WAR_DB_PERFMON_INC(WAR_PRFDEBUG_NUM_SCKPENDINGIO);}//============================= ACCESS ===================================//============================= INQUIRY ===================================/////////////////////////////// PROTECTED ///////////////////////////////////void WarSocketIoWin32Nt::DoListen(int backlog) throw(WarException){ WarLog error_log(WARLOG_ERROR, "WarSocketIoWin32Nt::DoListen", this); AUTO_LOCK try { WarSocketIo::DoListen(backlog); } catch(WarException& ex) { error_log << "Failed to initiate listening connections. " << ex.Explain() << war_endl; goto fail; } { // Set up #backlog number of pending connections for(int t = 0; t < backlog; t++) CreateNewConnectionSocket(); }fail: if (mListenQueue.empty()) { error_log << "No accept handles aqued. I cannot accept connections in a reaiable manner. Aborting operation." << war_endl; WarThrow(WarError(WAR_ERR_INTERNAL_DATA_NOT_INITIALIZED), NULL); } if (mListenQueue.size() < backlog) { error_log << "Continuing with reduced capacity for connections. " << (int)mListenQueue.size() << " of " << (int)backlog << " future connection sockets were allocated." << war_endl; }}void WarSocketIoWin32Nt::DoConnect(const struct sockaddr *name, int namelen) throw(WarException){ WarError throw_err; try { WarSocketIo::DoConnect(name, namelen); } catch(WarException& ex) { throw_err = ex; if (ex.SystemError() == WSAEWOULDBLOCK) goto make_async_connect; throw ex; } return;make_async_connect: { // Implement I/O Completion port WarSvrWin32EventThreadTask *p = new WarSvrWin32EventThreadTask; WarSvrWin32EventThread::task_ptr_t my_ptr = p; p->mCompletionPort = WarSvrWin32NtEngine::GetEngine().GetPort(); p->mCompletionKey = (DWORD)FileIOCompletionRoutine; p->mpOverlapped = (LPOVERLAPPED)AllocResult(OP_CONNECT, 0); p->mSocket = mSocket; if ((p->mEvent = ::CreateEvent(NULL, false, false, NULL)) == NULL) WarThrowSyserr("CreateEvent()"); if (::WSAEventSelect(mSocket, p->mEvent, FD_CONNECT) != 0) { WarLog debug_log(WARLOG_DEBUG, "WarSocketIoWin32Nt::DoConnect()", this); WarSystemError sys_err; if (debug_log) { debug_log << "Failed to initiate an async connect. (WSAEventSelect). " << sys_err << war_endl; } WarThrow(sys_err, "WSAEventSelect()"); } try { WarSvrWin32NtEngine::GetEngine().QueueRequest(my_ptr); WAR_DB_PERFMON_INC(WAR_PRFDEBUG_NUM_SCKCONNPNDIO); } catch(WarException& ex) { WarLog debug_log(WARLOG_DEBUG, "WarSocketIoWin32Nt::DoConnect()", this); if (debug_log) { debug_log << "Failed to queue an async connect. (QueueRequest). " << ex << war_endl; } throw ex; } WarThrow(throw_err, NULL); }}void WarSocketIoWin32Nt::DoCreateSocket(int af, int type, int protocol) throw(WarException){ WarSocketIo::DoCreateSocket(af, type, protocol); assert(mSocket != WAR_INVALID_SOCKET); if (::CreateIoCompletionPort((HANDLE)mSocket, WarSvrWin32NtEngine::GetEngine().GetPort(), (DWORD)FileIOCompletionRoutine, 0) == NULL) { WarError system_error; system_error.Capture(); DoClose(); WarThrow(system_error, "CreateIoCompletionPort()"); }}void WarSocketIoWin32Nt::AssignSocket(war_socket_t sck) throw(WarException){ WarSocketIo::AssignSocket(sck); if (::CreateIoCompletionPort((HANDLE)mSocket, WarSvrWin32NtEngine::GetEngine().GetPort(), (DWORD)FileIOCompletionRoutine,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -