📄 iocpserver.cpp
字号:
}
else
{
dbgprint("Got 0 byte receive\n");
// Graceful close - the receive returned 0 bytes read
sock->bClosing = TRUE;
// Free the receive buffer
FreeBufferObj(buf);
if (DoSends(sock) != NO_ERROR)
{
dbgprint("0: cleaning up in zero byte handler\n");
error = SOCKET_ERROR;
}
// If this was the last outstanding operation on socket, clean it up
if ((sock->OutstandingOps == 0) && (sock->OutOfOrderSends == NULL))
{
dbgprint("1: cleaning up in zero byte handler\n");
bCleanupSocket = TRUE;
}
}
}
else if ((buf->operation == OP_READ) && (error != NO_ERROR) && (gProtocol == IPPROTO_UDP))
{
// If for UDP, a receive completes with an error, we ignore it and re-post the recv
if (PostRecv(sock, buf) != NO_ERROR)
{
error = SOCKET_ERROR;
}
}
else if (buf->operation == OP_WRITE)
{
// Update the counters
InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
FreeBufferObj(buf);
if (DoSends(sock) != NO_ERROR)
{
dbgprint("Cleaning up inside OP_WRITE handler\n");
error = SOCKET_ERROR;
}
}
if (error != NO_ERROR)
{
sock->bClosing = TRUE;
}
//
// Check to see if socket is closing
//
if ( (InterlockedDecrement(&sock->OutstandingOps) == 0) &&
(sock->bClosing) &&
(sock->OutOfOrderSends == NULL) )
{
bCleanupSocket = TRUE;
}
else
{
if (DoSends(sock) != NO_ERROR)
{
bCleanupSocket = TRUE;
}
}
LeaveCriticalSection(&sock->SockCritSec);
if (bCleanupSocket)
{
closesocket(sock->s);
sock->s = INVALID_SOCKET;
FreeSocketObj(sock);
}
return;
}
//
// Function: CompletionThread
//
// Description:
//    Worker thread that services the completion port whose handle is passed
//    in lpParam. It loops on GetQueuedCompletionStatus and forwards every
//    dequeued I/O completion (socket object, buffer object, byte count and
//    error code) to HandleIo.
//
//    If a completed operation failed, WSAGetOverlappedResult is used to
//    translate the failure into a Winsock error code before HandleIo runs.
//
//    If GetQueuedCompletionStatus fails WITHOUT dequeuing a packet (the
//    OVERLAPPED pointer comes back NULL) there is no socket/buffer object to
//    work with, and with an INFINITE timeout the failure is not transient, so
//    the thread leaves its loop and exits with code 0.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    SOCKET_OBJ  *sockobj=NULL;          // Per socket object for completed I/O
    BUFFER_OBJ  *bufobj=NULL;           // Per I/O object for completed I/O
    OVERLAPPED  *lpOverlapped=NULL;     // Pointer to overlapped structure for completed I/O
    HANDLE       CompletionPort;        // Completion port handle
    DWORD        BytesTransfered,       // Number of bytes transfered
                 Flags;                 // Flags for completed I/O
    int          rc,
                 error;

    CompletionPort = (HANDLE)lpParam;
    while (1)
    {
        error = NO_ERROR;

        // Reset the outputs so that a call which dequeues nothing cannot
        // leave values from the previous iteration behind.
        sockobj = NULL;
        lpOverlapped = NULL;
        BytesTransfered = 0;

        rc = GetQueuedCompletionStatus(
                CompletionPort,
               &BytesTransfered,
                (PULONG_PTR)&sockobj,
               &lpOverlapped,
                INFINITE
                );

        if (lpOverlapped == NULL)
        {
            // No I/O packet was dequeued, so there is no BUFFER_OBJ to
            // recover and the completion key cannot be trusted.
            if (rc == FALSE)
            {
                // The wait itself failed (e.g. the port handle is no longer
                // valid). Retrying would only spin, so stop this thread.
                dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d\n",
                        GetLastError());
                break;
            }
            // A successful packet with no OVERLAPPED carries no I/O for
            // HandleIo to process; ignore it.
            continue;
        }

        // Recover the BUFFER_OBJ that embeds the completed OVERLAPPED
        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);

        if (rc == FALSE)
        {
            // If the call fails, call WSAGetOverlappedResult to translate the
            // error code into a Winsock error code.
            dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d\n",
                    GetLastError());
            rc = WSAGetOverlappedResult(
                    sockobj->s,
                   &bufobj->ol,
                   &BytesTransfered,
                    FALSE,
                   &Flags
                    );
            if (rc == FALSE)
            {
                error = WSAGetLastError();
            }
        }

        // Handle the IO operation
        HandleIo(sockobj, bufobj, CompletionPort, BytesTransfered, error);
    }

    ExitThread(0);
    return 0;
}
//
// Function: main
//
// Description:
//    This is the main program. It validates the command line, loads Winsock,
//    creates the completion port and starts one completion thread per
//    processor (capped at MAX_COMPLETION_THREAD_COUNT). For every local
//    address returned by ResolveAddress it creates a socket, associates it
//    with the completion port and binds it. For TCP the socket is put into
//    listening mode and gOverlappedCount AcceptEx operations are posted; for
//    any other protocol (UDP) gOverlappedCount receives are posted.
//
//    The main thread then waits on the completion threads, printing
//    statistics every 5 seconds. Every 12th interval it sweeps the pending
//    AcceptEx sockets and closes those that have been connected for more than
//    300 seconds (as reported by SO_CONNECT_TIME).
//
//    Returns 0 when the wait loop ends, -1 on a start-up failure.
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    SYSTEM_INFO      sysinfo;
    SOCKET_OBJ      *sockobj=NULL,
                    *ListenSockets=NULL;
    HANDLE           CompletionPort,
                     CompThreads[MAX_COMPLETION_THREAD_COUNT],
                     hrc;
    int              endpointcount=0,
                     interval,
                     rc,
                     i;
    struct addrinfo *res=NULL,
                    *ptr=NULL;

    // Validate the command line
    ValidateArgs(argc, argv);

    // Load Winsock
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    // Create the completion port used by this server
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    if (CompletionPort == NULL)
    {
        fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
        return -1;
    }

    // Find out how many processors are on this system
    GetSystemInfo(&sysinfo);
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
        // CompThreads[] holds at most MAX_COMPLETION_THREAD_COUNT handles
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }

    // Create the worker threads to service the completion notifications
    for(i=0; i < (int)sysinfo.dwNumberOfProcessors ;i++)
    {
        CompThreads[i] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
        if (CompThreads[i] == NULL)
        {
            fprintf(stderr, "CreateThread failed: %d\n", GetLastError());
            return -1;
        }
    }

    printf("Local address: %s; Port: %s; Family: %d\n",
            gBindAddr, gBindPort, gAddressFamily);

    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
        return -1;
    }

    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
        PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
        printf("\n");

        sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);

        // create the socket
        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (sockobj->s == INVALID_SOCKET)
        {
            fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Associate the socket and its SOCKET_OBJ to the completion port
        hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0);
        if (hrc == NULL)
        {
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
            return -1;
        }

        // bind the socket to a local address and port
        rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            BUFFER_OBJ *acceptobj=NULL;
            GUID        guidAcceptEx = WSAID_ACCEPTEX,
                        guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
            DWORD       bytes;

            // Need to load the Winsock extension functions from each provider
            //    -- e.g. AF_INET and AF_INET6.
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guidAcceptEx,
                    sizeof(guidAcceptEx),
                   &sockobj->lpfnAcceptEx,
                    sizeof(sockobj->lpfnAcceptEx),
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guidGetAcceptExSockaddrs,
                    sizeof(guidGetAcceptExSockaddrs),
                   &sockobj->lpfnGetAcceptExSockaddrs,
                    sizeof(sockobj->lpfnGetAcceptExSockaddrs),
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            // For TCP sockets, we need to "listen" on them
            rc = listen(sockobj->s, 100);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                return -1;
            }

            // Keep track of the pending AcceptEx operations
            sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc(
                    GetProcessHeap(),
                    HEAP_ZERO_MEMORY,
                    (sizeof(BUFFER_OBJ *) * gOverlappedCount));
            if (sockobj->PendingAccepts == NULL)
            {
                fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError());
                ExitProcess(-1);
            }

            // Post the AcceptEx(s)
            for(i=0; i < gOverlappedCount ;i++)
            {
                sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize);
                PostAccept(sockobj, acceptobj);
            }

            //
            // Maintain a list of the listening socket structures
            // (new entries are pushed on the front of the list)
            //
            if (ListenSockets == NULL)
            {
                ListenSockets = sockobj;
            }
            else
            {
                sockobj->next = ListenSockets;
                ListenSockets = sockobj;
            }
        }
        else
        {
            BUFFER_OBJ *recvobj=NULL;
            DWORD       bytes;
            int         optval;

            // Turn off UDP errors resulting from ICMP messages (port/host unreachable, etc)
            optval = 0;
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_UDP_CONNRESET,
                   &optval,
                    sizeof(optval),
                    NULL,
                    0,
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                // Not fatal: report it and keep going
                fprintf(stderr, "WSAIoctl: SIO_UDP_CONNRESET failed: %d\n",
                        WSAGetLastError());
            }

            // For UDP, simply post some receives
            for(i=0; i < gOverlappedCount ;i++)
            {
                recvobj = GetBufferObj(sockobj, gBufferSize);
                PostRecv(sockobj, recvobj);
            }
        }

        endpointcount++;
        ptr = ptr->ai_next;
    }

    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    gStartTime = gStartTimeLast = GetTickCount();

    interval = 0;
    while (1)
    {
        // Wait (up to 5 seconds) for ALL completion threads to terminate
        rc = WSAWaitForMultipleEvents(
                sysinfo.dwNumberOfProcessors,
                CompThreads,
                TRUE,
                5000,
                FALSE
                );
        if (rc == WAIT_FAILED)
        {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }
        else if (rc == WAIT_TIMEOUT)
        {
            interval++;

            PrintStatistics();

            if (interval == 12)
            {
                SOCKET_OBJ *listenptr=NULL;
                int         optval,
                            optlen;

                // For TCP, cycle through all the outstanding AcceptEx operations
                //   to see if any of the client sockets have been connected but
                //   haven't received any data. If so, close them as they could be
                //   a denial of service attack.
                listenptr = ListenSockets;
                while (listenptr)
                {
                    for(i=0; i < gOverlappedCount ;i++)
                    {
                        optlen = sizeof(optval);
                        rc = getsockopt(
                                listenptr->PendingAccepts[i]->sclient,
                                SOL_SOCKET,
                                SO_CONNECT_TIME,
                                (char *)&optval,
                               &optlen
                                );
                        if (rc == SOCKET_ERROR)
                        {
                            // A failure on one pending accept socket must not
                            // take the whole server down; report it and move
                            // on to the next pending accept.
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                            continue;
                        }

                        // 0xFFFFFFFF means the socket is not connected yet.
                        // If the socket has been connected for more than 5 minutes,
                        //    close it. If closed, the AcceptEx call will fail in the
                        //    completion thread.
                        if ((optval != 0xFFFFFFFF) && (optval > 300))
                        {
                            closesocket(listenptr->PendingAccepts[i]->sclient);
                        }
                    }
                    listenptr = listenptr->next;
                }
                interval = 0;
            }
        }
        else
        {
            // The wait was satisfied: every completion thread has terminated,
            // so nothing is left to service I/O. Waiting again would return
            // immediately and spin, so leave the loop.
            fprintf(stderr, "All completion threads have exited\n");
            break;
        }
    }

    WSACleanup();
    return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -