📄 warsocketiowin32nt.cpp
字号:
0) == NULL) { WarError system_error; system_error.Capture(); DoClose(); WarThrow(system_error, "CreateIoCompletionPort()"); }}WarSocketIoWin32Nt::ol_result_def *WarSocketIoWin32Nt::AllocResult(WarSocketIoWin32Nt::IoOperationE operation, unsigned segmentNum){ WarAutoLock my_lock(msBufferLock); ol_result_t *p; if (mspBufferList) { p = mspBufferList; mspBufferList = mspBufferList->mpNext; } else { WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_AllocResult_NEW); p = new ol_result_t; } WAR_PROFILE_FUNC(WAR_FP_WarSocketIoWin32Nt_AllocResult_ASSIGN); p->mpSocket = this; p->SocketPtr = mpCompanion; // Protect the socket from destruction memset(&(p->mOverlappedData), 0, sizeof(OVERLAPPED)); p->mOperation = operation; p->mAcceptSocket = WAR_INVALID_SOCKET; p->mSegmentNum = segmentNum; return p;}void WarSocketIoWin32Nt::DeleteResult(ol_result_def *presult){ WarAutoLock my_lock(msBufferLock); // Prepere for re-use presult->mpNext = mspBufferList; mspBufferList = presult; // Release autopointers presult->SocketPtr = NULL; presult->mBufferPtr = NULL;}void WarSocketIoWin32Nt::OnConnect(const WarError& status){ ULONG socket_error = 0; int socket_error_len = sizeof(socket_error); WarError my_status = status; if (!status) { // Check if we _really_ are connected if (!getsockopt(mSocket, SOL_SOCKET, SO_ERROR, (char *)&socket_error, &socket_error_len) && (socket_error != 0)) { my_status = WarSystemError(); if (!my_status) my_status = WarError(WAR_ERR_SYSTEM_ERROR); } } WarSocketIo::OnConnect(my_status);}/////////////////////////////// PRIVATE ///////////////////////////////////VOID CALLBACK WarSocketIoWin32Nt::FileIOCompletionRoutine( DWORD dwErrorCode, // completion code DWORD dwNumberOfBytesTransfered, // number of bytes transferred LPOVERLAPPED lpOverlapped // I/O information buffer ){ ol_result_t *pResult = (ol_result_def *)lpOverlapped; WarLog err_log(WARLOG_ERROR, "WarSocketIoWin32Nt::FileIOCompletionRoutine()", pResult->mpSocket); WarError err((dwErrorCode != WAR_ERR_OK) ? WAR_ERR_SYSTEM_ERROR : WAR_ERR_OK, (int)dwErrorCode); static const char *op_names[OP_INVALID +1] = { "OP_RECV", "OP_SEND", "OP_CONNECT", "OP_ACCEPT", "OP_INVALID" }; war_ccstr_t current_op = op_names[ WarMin((unsigned)pResult->mOperation, (unsigned)OP_INVALID)]; // Map errors if (err) { switch(err.SystemError()) { case ERROR_HANDLE_EOF: err = WAR_FERR_END_OF_FILE; break; case ERROR_NETNAME_DELETED: err = WAR_NETERR_HARD_CLOSE; break; default: ; } }#ifdef DEBUG WarLog net_log(WARLOG_SOCKET, "WarSocketIoWin32Nt::FileIOCompletionRoutine()", pResult->mpSocket); if (net_log) { net_log << "Socket event: " << current_op << " sck: " << pResult->mpSocket->GetSeqNumber() << " "; if (err) net_log << err; else net_log << "OK "; WarThread *pthread = WarThread::GetCurrentThread(); if (pthread) { net_log << "thd: " << (unsigned)pthread->GetThreadId() << " "; } net_log << " len: " << (unsigned)dwNumberOfBytesTransfered << war_endl; }#endif try { switch(pResult->mOperation) { case OP_RECV: { WAR_DB_PERFMON_DEC(WAR_PRFDEBUG_NUM_SCKPENDINGIO); WarTransferBuffer *pbuffer = &(*pResult->mBufferPtr); { WarAutoLock my_buffer_lock(pbuffer->mpLock); pResult->mBufferPtr->mBytesUsed += dwNumberOfBytesTransfered; if (--pbuffer->mSegmentCnt == 0) { if ((pbuffer->GetBytesUsed() > 0) && (WAR_FERR_END_OF_FILE == err.LocalError())) err.Reset(); // Let the higher levels process the data } else break; } // unlock buffer lock pResult->mpSocket->OnReceived(err, pResult->mBufferPtr); } break; case OP_SEND: { WAR_DB_PERFMON_DEC(WAR_PRFDEBUG_NUM_SCKPENDINGIO); WarTransferBuffer *pbuffer = & (*pResult->mBufferPtr); { WarAutoLock my_buffer_lock(pbuffer->mpLock); pResult->mBufferPtr->mBytesUsed += dwNumberOfBytesTransfered; if (--pbuffer->mSegmentCnt == 0) { if ((pbuffer->GetBytesUsed() > 0) && (WAR_FERR_END_OF_FILE == err.LocalError())) err.Reset(); // Let the higher levels process the data } else break; } // unlock buffer lock pResult->mpSocket->OnSent(err, pResult->mBufferPtr); } break; case OP_CONNECT: WAR_DB_PERFMON_DEC(WAR_PRFDEBUG_NUM_SCKCONNPNDIO); pResult->mpSocket->OnConnect(err); break; case OP_ACCEPT: { // Handle errors switch (dwErrorCode) { case ERROR_SUCCESS: break; case ERROR_OPERATION_ABORTED: err = WarError(WAR_ERR_ABORT_THREAD, dwErrorCode); break; default: err = WarError(WAR_ERR_SYSTEM_ERROR, dwErrorCode); } bool do_callback = false; pending_ptr_t tmp_holder; WarNetAddress addr_local, addr_remote; if (!dwErrorCode) { WarAutoLock my_lock(pResult->mpSocket->mLock); // Replace the used connection handle if (pResult->mpSocket->IsOpen()) { pResult->mpSocket->CreateNewConnectionSocket(); // Handle the new connection pending_ptr_t search_socket( new WarSocketIoWin32NtPending( pResult->mAcceptSocket)); pending_set_t::iterator P = pResult->mpSocket->mListenQueue.find(search_socket); search_socket->mSocket = WAR_INVALID_SOCKET; // prevent close if (P != pResult->mpSocket->mListenQueue.end()) { tmp_holder = (*P); pResult->mpSocket->mListenQueue.erase(P); } WarSocketIoWin32NtPending& rio_buffer = *tmp_holder; struct sockaddr *plocal_sockaddr = NULL, *premote_sockaddr = NULL; INT local_sockaddr_length = 0; INT remote_sockaddr_length = 0; GetAcceptExSockaddrs(rio_buffer.mBuffer, rio_buffer.GetReadBufferLength(), rio_buffer.GetLocalAddrBufferLength(), rio_buffer.GetRemoteAddrBufferLength(), &plocal_sockaddr, &local_sockaddr_length, &premote_sockaddr, &remote_sockaddr_length); addr_local.AssignSockaddr(plocal_sockaddr, local_sockaddr_length); addr_remote.AssignSockaddr(premote_sockaddr, remote_sockaddr_length); do_callback = true; } } if (do_callback) { pResult->mpSocket->OnAccept(err, pResult->mAcceptSocket, addr_remote, addr_local); /* If we got this far, we leave the responsibility for * the new socket to whomever handled the OnAccept() * call. * In any other case, we would have closed it. */ tmp_holder->mSocket = WAR_INVALID_SOCKET; } } break; default: DeleteResult(pResult); WarThrow(WarError(WAR_ERR_INVALID_CASE_VALUE), "IoOperationE"); } } catch(WarException& ex) { err_log << "Caught unexcpected exception. Operation was " << current_op << ' ' << ex.Explain() << war_endl; }#if WAR_CATCH_ALL catch(...) { err_log << "Caught unhandled exception. Operation was " << current_op << war_endl; }#endif DeleteResult(pResult);} /* Called from either DoListen() or FileIOCompletionRoutine(). * Both these methods guarantees that the object is locked * so that properties can be safely accessed. */void WarSocketIoWin32Nt::CreateNewConnectionSocket(){ WarLog error_log(WARLOG_ERROR, "WarSocketIoWin32Nt::CreateNewConnectionSocket", this); war_socket_t new_socket = socket(AF_INET, SOCK_STREAM, 0); if (new_socket == WAR_INVALID_SOCKET) { error_log << "Failed to create socket for future connection." << WarSystemError() << war_endl; return; } pending_ptr_t sck_ptr( new WarSocketIoWin32NtPending(new_socket)); ol_result_t *polData = AllocResult(OP_ACCEPT, 0); polData->mAcceptSocket = new_socket; DWORD dwBytesReceived = 0; // Socket is ok. Initiate accept if (!AcceptEx(mSocket, new_socket, sck_ptr->mBuffer, 0, sizeof(struct sockaddr) + 16, sizeof(struct sockaddr) + 16, &dwBytesReceived, &polData->mOverlappedData)) { WarSystemError system_err; if (system_err.SystemError() != ERROR_IO_PENDING) { error_log << "AcceptEx() failed. " << system_err << war_endl; delete polData; return; } } // At this point, everything is swell mListenQueue.insert(sck_ptr);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -