📄 iocpserver.cpp
字号:
LeaveCriticalSection(&sock->SockCritSec);
return rc;
}
//
// Function: PostAccept
//
// Description:
// Post an overlapped accept on a listening socket.
//
int PostAccept(LISTEN_OBJ *listen, BUFFER_OBJ *acceptobj)
{
DWORD bytes;
int rc;
acceptobj->operation = OP_ACCEPT;
// Create the client socket for an incoming connection
acceptobj->sclient = socket(listen->AddressFamily, SOCK_STREAM, IPPROTO_TCP);
if (acceptobj->sclient == INVALID_SOCKET)
{
fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
return -1;
}
rc = listen->lpfnAcceptEx(
listen->s,
acceptobj->sclient,
acceptobj->buf,
acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16,
sizeof(SOCKADDR_STORAGE) + 16,
&bytes,
&acceptobj->ol
);
if (rc == FALSE)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
printf("PostAccept: AcceptEx failed: %d\n",
WSAGetLastError());
return SOCKET_ERROR;
}
}
// Increment the outstanding overlapped count for this socket
InterlockedIncrement(&listen->PendingAcceptCount);
return NO_ERROR;
}
//
// Function: HandleIo
//
// Description:
// This function handles the IO on a socket. In the event of a receive, the
// completed receive is posted again. For completed accepts, another AcceptEx
// is posted. For completed sends, the buffer is freed.
//
void HandleIo(ULONG_PTR key, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
LISTEN_OBJ *listenobj=NULL;
SOCKET_OBJ *sockobj=NULL,
*clientobj=NULL; // New client object for accepted connections
BUFFER_OBJ *recvobj=NULL, // Used to post new receives on accepted connections
*sendobj=NULL; // Used to post new sends for data received
BOOL bCleanupSocket;
if (error != 0)
{
dbgprint("OP = %d; Error = %d\n", buf->operation, error);
}
bCleanupSocket = FALSE;
if (error != NO_ERROR)
{
// An error occured on a TCP socket, free the associated per I/O buffer
// and see if there are any more outstanding operations. If so we must
// wait until they are complete as well.
//
if (buf->operation != OP_ACCEPT)
{
sockobj = (SOCKET_OBJ *)key;
if (buf->operation == OP_READ)
{
if ((InterlockedDecrement(&sockobj->OutstandingRecv) == 0) &&
(sockobj->OutstandingSend == 0) )
{
dbgprint("Freeing socket obj in GetOverlappedResult\n");
FreeSocketObj(sockobj);
}
}
else if (buf->operation == OP_WRITE)
{
if ((InterlockedDecrement(&sockobj->OutstandingSend) == 0) &&
(sockobj->OutstandingRecv == 0) )
{
dbgprint("Freeing socket obj in GetOverlappedResult\n");
FreeSocketObj(sockobj);
}
}
}
else
{
listenobj = (LISTEN_OBJ *)key;
printf("Accept failed\n");
closesocket(buf->sclient);
buf->sclient = INVALID_SOCKET;
}
FreeBufferObj(buf);
return;
}
if (buf->operation == OP_ACCEPT)
{
HANDLE hrc;
SOCKADDR_STORAGE *LocalSockaddr=NULL,
*RemoteSockaddr=NULL;
int LocalSockaddrLen,
RemoteSockaddrLen;
listenobj = (LISTEN_OBJ *)key;
// Update counters
InterlockedIncrement(&gConnections);
InterlockedIncrement(&gConnectionsLast);
InterlockedDecrement(&listenobj->PendingAcceptCount);
InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
// Print the client's addresss
listenobj->lpfnGetAcceptExSockaddrs(
buf->buf,
buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16,
sizeof(SOCKADDR_STORAGE) + 16,
(SOCKADDR **)&LocalSockaddr,
&LocalSockaddrLen,
(SOCKADDR **)&RemoteSockaddr,
&RemoteSockaddrLen
);
RemovePendingAccept(listenobj, buf);
// Get a new SOCKET_OBJ for the client connection
clientobj = GetSocketObj(buf->sclient, listenobj->AddressFamily);
if (clientobj)
{
// Associate the new connection to our completion port
hrc = CreateIoCompletionPort(
(HANDLE)clientobj->s,
CompPort,
(ULONG_PTR)clientobj,
0
);
if (hrc == NULL)
{
fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
GetLastError());
return;
}
sendobj = buf;
sendobj->buflen = BytesTransfered;
// Post the send - this is the first one for this connection so just do it
sendobj->sock = clientobj;
//PostSend(clientobj, sendobj);
EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
}
else
{
// Can't allocate a socket structure so close the connection
closesocket(buf->sclient);
buf->sclient = INVALID_SOCKET;
FreeBufferObj(buf);
}
if (error != NO_ERROR)
{
// Check for socket closure
EnterCriticalSection(&clientobj->SockCritSec);
if ( (clientobj->OutstandingSend == 0) &&
(clientobj->OutstandingRecv == 0) )
{
closesocket(clientobj->s);
clientobj->s = INVALID_SOCKET;
FreeSocketObj(clientobj);
}
else
{
clientobj->bClosing = TRUE;
}
LeaveCriticalSection(&clientobj->SockCritSec);
error = NO_ERROR;
}
InterlockedIncrement(&listenobj->RepostCount);
SetEvent(listenobj->RepostAccept);
}
else if (buf->operation == OP_READ)
{
sockobj = (SOCKET_OBJ *)key;
InterlockedDecrement(&sockobj->OutstandingRecv);
//
// Receive completed successfully
//
if (BytesTransfered > 0)
{
InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
// Make the recv a send
sendobj = buf;
sendobj->buflen = BytesTransfered;
sendobj->sock = sockobj;
//PostSend(sockobj, sendobj);
EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
}
else
{
//dbgprint("Got 0 byte receive\n");
// Graceful close - the receive returned 0 bytes read
sockobj->bClosing = TRUE;
// Free the receive buffer
FreeBufferObj(buf);
// If this was the last outstanding operation on socket, clean it up
EnterCriticalSection(&sockobj->SockCritSec);
if ((sockobj->OutstandingSend == 0) &&
(sockobj->OutstandingRecv == 0) )
{
bCleanupSocket = TRUE;
}
LeaveCriticalSection(&sockobj->SockCritSec);
}
}
else if (buf->operation == OP_WRITE)
{
sockobj = (SOCKET_OBJ *)key;
InterlockedDecrement(&sockobj->OutstandingSend);
InterlockedDecrement(&gOutstandingSends);
// Update the counters
InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
buf->buflen = gBufferSize;
if (sockobj->bClosing == FALSE)
{
buf->sock = sockobj;
PostRecv(sockobj, buf);
}
}
ProcessPendingOperations();
if (sockobj)
{
if (error != NO_ERROR)
{
printf("err = %d\n", error);
sockobj->bClosing = TRUE;
}
//
// Check to see if socket is closing
//
if ( (sockobj->OutstandingSend == 0) &&
(sockobj->OutstandingRecv == 0) &&
(sockobj->bClosing) )
{
bCleanupSocket = TRUE;
}
if (bCleanupSocket)
{
closesocket(sockobj->s);
sockobj->s = INVALID_SOCKET;
FreeSocketObj(sockobj);
}
}
return;
}
//
// Function: CompletionThread
//
// Description:
// This is the completion thread which services our completion port. One of
// these threads is created per processor on the system. The thread sits in
// an infinite loop calling GetQueuedCompletionStatus and handling socket
// IO that completed.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
ULONG_PTR Key;
SOCKET s;
BUFFER_OBJ *bufobj=NULL; // Per I/O object for completed I/O
OVERLAPPED *lpOverlapped=NULL; // Pointer to overlapped structure for completed I/O
HANDLE CompletionPort; // Completion port handle
DWORD BytesTransfered, // Number of bytes transfered
Flags; // Flags for completed I/O
int rc,
error;
CompletionPort = (HANDLE)lpParam;
while (1)
{
error = NO_ERROR;
rc = GetQueuedCompletionStatus(
CompletionPort,
&BytesTransfered,
(PULONG_PTR)&Key,
&lpOverlapped,
INFINITE
);
bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);
if (rc == FALSE)
{
// If the call fails, call WSAGetOverlappedResult to translate the
// error code into a Winsock error code.
if (bufobj->operation == OP_ACCEPT)
{
s = ((LISTEN_OBJ *)Key)->s;
}
else
{
s = ((SOCKET_OBJ *)Key)->s;
}
dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d [0x%x]\n",
GetLastError(), lpOverlapped->Internal);
rc = WSAGetOverlappedResult(
s,
&bufobj->ol,
&BytesTransfered,
FALSE,
&Flags
);
if (rc == FALSE)
{
error = WSAGetLastError();
}
}
// Handle the IO operation
HandleIo(Key, bufobj, CompletionPort, BytesTransfered, error);
}
ExitThread(0);
return 0;
}
//
// Function: main
//
// Description:
// This is the main program. It parses the command line and creates
// the main socket. For TCP the socket is used to accept incoming
// client connections. Each client TCP connection is handed off to
// a worker thread which will receive any data on that connection
// until the connection is closed.
//
int __cdecl main(int argc, char **argv)
{
WSADATA wsd;
SYSTEM_INFO sysinfo;
LISTEN_OBJ *ListenSockets=NULL,
*listenobj=NULL;
SOCKET_OBJ *sockobj=NULL;
BUFFER_OBJ *acceptobj=NULL;
GUID guidAcceptEx = WSAID_ACCEPTEX,
guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes;
HANDLE CompletionPort,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -