📄 overserver.cpp
字号:
// operations. If not, clean up the connection; otherwise wait for them
// to complete.
EnterCriticalSection(&sock->SockCritSec);
if (sock->bClosing && (sock->OutstandingOps == 0))
{
RemoveBufferFromThread(sock, buf);
closesocket(sock->s);
FreeBufferObj(buf);
bFreeSocketObj = TRUE;
}
else
{
buf->buflen = gBufferSize;
// Free the send op that just completed
if (PostRecv(buf) != NO_ERROR)
{
RemoveBufferFromThread(sock, buf);
FreeBufferObj(buf);
}
}
LeaveCriticalSection(&sock->SockCritSec);
}
if (bFreeSocketObj)
{
FreeSocketObj(sock);
}
return;
}
//
// Function: FindBufferObj
//
// Description:
//    Maps a signaled event handle back to the BUFFER_OBJ that owns it.
//    Each overlapped operation assigned to a worker thread carries its own
//    event in its overlapped structure (ol.hEvent). When that event fires,
//    the worker calls this routine to locate the matching BUFFER_OBJ.
//
//    The thread's BufferList is walked while holding ThreadCritSec. The
//    first entry whose ol.hEvent equals hEvent is returned; NULL is
//    returned when no entry in the list matches.
//
BUFFER_OBJ *FindBufferObj(THREAD_OBJ *thread, WSAEVENT hEvent)
{
    BUFFER_OBJ *match;

    EnterCriticalSection(&thread->ThreadCritSec);

    // Stop at the first buffer bound to this event; falling off the end
    // of the list leaves 'match' NULL.
    for (match = thread->BufferList; match != NULL; match = match->next)
    {
        if (match->ol.hEvent == hEvent)
        {
            break;
        }
    }

    LeaveCriticalSection(&thread->ThreadCritSec);

    return match;
}
//
// Function: IoThread
//
// Description:
//    This is the I/O thread spawned to handle overlapped requests. When an
//    overlapped operation is initialized, the I/O is first assigned to a
//    worker thread. This is the worker thread that waits for I/O to complete.
//    Once an I/O operation is assigned to a thread, the thread's event
//    (slot 0 of the handle array) is signaled, which causes the thread to
//    rebuild its list of pending overlapped event handles so that it
//    includes any new operations assigned to it. Once one of the overlapped
//    I/O events is signaled, the thread calls the I/O handler routine to
//    service that particular operation.
//
// Parameters:
//    lpParam - the THREAD_OBJ describing this worker (passed to CreateThread).
//
// Returns:
//    0. The loop only terminates when WaitForMultipleObjects fails with an
//    error other than ERROR_INVALID_HANDLE.
//
DWORD WINAPI IoThread(LPVOID lpParam)
{
    THREAD_OBJ *thread = (THREAD_OBJ *)lpParam;
    BUFFER_OBJ *buf = NULL;
    DWORD       rc,
                err;
    int         count,
                i;

    // Initialize the event list to start with
    RenumberEvents(thread);

    while (1)
    {
        // Wait on the events
        rc = WaitForMultipleObjects(
                thread->EventCount,
                thread->Handles,
                FALSE,
                INFINITE
                );
        if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
        {
            // Capture the error once so the value tested is the value logged
            err = GetLastError();
            if (err == ERROR_INVALID_HANDLE)
            {
                // A handle in the array is no longer valid; rebuild the
                // array and wait again.
                RenumberEvents(thread);
                continue;
            }

            fprintf(stderr, "IoThread: WaitForMultipleObjects failed: %d\n",
                    (int)err);
            break;
        }

        // Iterate through the events to see if more than one were signaled.
        // The count is snapshotted because servicing an operation can change
        // thread->EventCount while this pass is still running.
        count = thread->EventCount;
        for (i = 0; i < count; i++)
        {
            rc = WaitForSingleObject(thread->Handles[i], 0);
            if (rc != WAIT_OBJECT_0)
            {
                // Either not signaled (WAIT_TIMEOUT) or the poll itself
                // failed (WAIT_FAILED, e.g. a handle that became invalid
                // earlier in this pass). Neither case is a completed
                // operation, so there is nothing to service for this slot.
                continue;
            }

            // Reset the event first
            WSAResetEvent(thread->Handles[i]);

            if (i == 0)
            {
                // The thread's event was signaled indicating new I/O assigned
                RenumberEvents(thread);
                break;
            }

            // Otherwise, an overlapped I/O operation completed, service it.
            // FindBufferObj returns NULL when no buffer in the thread's list
            // owns this event; skip the slot rather than hand NULL to the
            // handler.
            buf = FindBufferObj(thread, thread->Handles[i]);
            if (buf != NULL)
            {
                HandleIo(buf);
            }
        }
    }

    ExitThread(0);
    return 0;
}
//
// Function: AssignIoToThread
//
// Description:
//    Hands an overlapped operation (BUFFER_OBJ) to a worker thread. Under
//    gThreadListCritSec, each existing worker in gChildThreads is offered
//    the buffer in list order; the first one for which
//    InsertBufferObjToThread returns NO_ERROR takes it. If none accepts,
//    a new THREAD_OBJ is obtained, its IoThread is started, the buffer is
//    inserted into it, and the thread is pushed on the front of
//    gChildThreads. In both cases buf->Thread is set to the chosen worker
//    and the worker's event is signaled so it rebuilds its event list.
//
//    The process is terminated if CreateThread fails.
//
void AssignIoToThread(BUFFER_OBJ *buf)
{
    THREAD_OBJ *worker;

    EnterCriticalSection(&gThreadListCritSec);

    // Offer the operation to each existing worker; leaving the loop with
    // worker == NULL means nobody took it.
    for (worker = gChildThreads; worker != NULL; worker = worker->next)
    {
        if (InsertBufferObjToThread(worker, buf) == NO_ERROR)
        {
            break;
        }
    }

    if (worker == NULL)
    {
        // No thread was found to assign the operation to, create a new one
        worker = GetThreadObj();

        worker->Thread = CreateThread(NULL, 0, IoThread, (LPVOID)worker, 0, NULL);
        if (worker->Thread == NULL)
        {
            fprintf(stderr, "AssignToFreeThread: CreateThread failed: %d\n", GetLastError());
            ExitProcess(-1);
        }

        // Assign the operation to the freshly created thread
        InsertBufferObjToThread(worker, buf);

        // Push the new thread on the front of the thread list. The next
        // pointer is only written when there is an existing head to link to.
        if (gChildThreads != NULL)
        {
            worker->next = gChildThreads;
        }
        gChildThreads = worker;

        gChildThreadsCount++;
    }

    buf->Thread = worker;

    // Signal the child thread to rebuild its event list
    WSASetEvent(worker->Event);

    LeaveCriticalSection(&gThreadListCritSec);
}
//
// Function: RemoveBufferFromThread
//
// Description:
//    Detaches the given BUFFER_OBJ from the worker thread it is assigned
//    to (buf->Thread). While holding that thread's ThreadCritSec, the
//    buffer is unlinked from the thread's BufferList, the thread's
//    EventCount is decremented, and the thread's event is signaled so the
//    worker re-initializes its list of pending events.
//
//    The 'sock' parameter is not referenced by this routine.
//
void RemoveBufferFromThread(SOCKET_OBJ *sock, BUFFER_OBJ *buf)
{
    THREAD_OBJ *owner = buf->Thread;

    EnterCriticalSection(&owner->ThreadCritSec);

    // Unlink the buffer and account for one fewer pending event
    RemoveBufferObj(&owner->BufferList, buf);
    owner->EventCount--;

    // Wake the worker so it rebuilds its handle array
    WSASetEvent(owner->Event);

    LeaveCriticalSection(&owner->ThreadCritSec);
}
//
// Function: main
//
// Description:
//    This is the main program. It parses the command line and creates
//    the main socket(s), one per local address returned by ResolveAddress.
//    For UDP each socket is used to receive datagrams. For TCP each socket
//    is used to accept incoming client connections: the AcceptEx and
//    GetAcceptExSockaddrs extension functions are loaded, the socket is put
//    into the listening state and gOverlappedCount accepts are posted.
//    Every posted operation is first assigned to a worker thread via
//    AssignIoToThread.
//
//    After setup the main thread loops forever, printing statistics every
//    5 seconds and, every 12th pass (once a minute), closing accepted
//    client sockets that have been connected for more than 300 seconds
//    according to SO_CONNECT_TIME.
//
// Returns:
//    -1 on a setup failure or when the SO_CONNECT_TIME query fails. Some
//    failures terminate the process directly through ExitProcess(-1).
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    SOCKET_OBJ      *sockobj=NULL,
                    *ListenSockets=NULL;
    int              endpointcount=0,
                     interval,
                     rc,
                     i;
    struct addrinfo *res=NULL,
                    *ptr=NULL;

    // Validate the command line
    ValidateArgs(argc, argv);

    // Load Winsock
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    InitializeCriticalSection(&gThreadListCritSec);

    printf("Local address: %s; Port: %s; Family: %d\n",
            gBindAddr, gBindPort, gAddressFamily);

    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
        return -1;
    }

    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
        PrintAddress(ptr->ai_addr, ptr->ai_addrlen); printf("\n");

        sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);

        // create the socket
        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (sockobj->s == INVALID_SOCKET)
        {
            fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        // bind the socket to a local address and port
        rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            BUFFER_OBJ *acceptobj=NULL;
            GUID        guidAcceptEx = WSAID_ACCEPTEX,
                        guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
            DWORD       bytes;

            // Load the extension functions
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                    &guidAcceptEx,
                    sizeof(guidAcceptEx),
                    &sockobj->lpfnAcceptEx,
                    sizeof(sockobj->lpfnAcceptEx),
                    &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                    &guidGetAcceptExSockaddrs,
                    sizeof(guidGetAcceptExSockaddrs),
                    &sockobj->lpfnGetAcceptExSockaddrs,
                    sizeof(sockobj->lpfnGetAcceptExSockaddrs),
                    &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = listen(sockobj->s, 200);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                return -1;
            }

            // Allocate the array that tracks the pending accept operations
            sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc(
                    GetProcessHeap(),
                    HEAP_ZERO_MEMORY,
                    (sizeof(BUFFER_OBJ *) * gOverlappedCount));
            if (sockobj->PendingAccepts == NULL)
            {
                fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError());
                ExitProcess(-1);
            }

            // Post the initial accepts
            for(i=0; i < gOverlappedCount ;i++)
            {
                sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize);

                acceptobj->Socket = sockobj;

                AssignIoToThread(acceptobj);

                // NO_ERROR is the success value (the same convention the
                // PostRecv callers in this file rely on), so only a
                // different result means the accept could not be posted.
                if (PostAccept(acceptobj) != NO_ERROR)
                {
                    // If we can't post accepts just bail
                    fprintf(stderr, "main: PostAccept failed for initial accept %d\n", i);
                    ExitProcess(-1);
                }
            }

            //
            // Maintain a list of the listening socket structures
            //
            if (ListenSockets == NULL)
            {
                ListenSockets = sockobj;
            }
            else
            {
                sockobj->next = ListenSockets;
                ListenSockets = sockobj;
            }
        }
        else
        {
            BUFFER_OBJ *recvobj=NULL;
            DWORD       bytes;
            int         optval;

            // Turn off UDP errors resulting from ICMP messages (port/host unreachable, etc)
            optval = 0;
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_UDP_CONNRESET,
                    &optval,
                    sizeof(optval),
                    NULL,
                    0,
                    &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                // Not fatal: report it and keep going
                fprintf(stderr, "WSAIoctl: SIO_UDP_CONNRESET failed: %d\n",
                        WSAGetLastError());
            }

            // Post the initial UDP receives
            for(i=0; i < gOverlappedCount ;i++)
            {
                recvobj = GetBufferObj(sockobj, gBufferSize);

                recvobj->Socket = sockobj;

                AssignIoToThread(recvobj);

                if (PostRecv(recvobj) != NO_ERROR)
                {
                    // Startup continues with the remaining receives, but
                    // the failure is reported instead of being dropped.
                    fprintf(stderr, "main: PostRecv failed for initial receive %d\n", i);
                }
            }
        }

        endpointcount++;
        ptr = ptr->ai_next;
    }

    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    gStartTime = gStartTimeLast = GetTickCount();

    interval = 0;
    while (1)
    {
        Sleep(5000);

        interval++;

        PrintStatistics();

        // 12 passes of 5000 ms each: run the stale-connection sweep once
        // a minute.
        if (interval == 12)
        {
            SOCKET_OBJ *listenptr=NULL;
            int         optval,
                        optlen;

            // Walk the list of outstanding accepts
            listenptr = ListenSockets;
            while (listenptr)
            {
                for(i=0; i < gOverlappedCount ;i++)
                {
                    optlen = sizeof(optval);
                    rc = getsockopt(
                            listenptr->PendingAccepts[i]->sclient,
                            SOL_SOCKET,
                            SO_CONNECT_TIME,
                            (char *)&optval,
                            &optlen
                            );
                    if (rc == SOCKET_ERROR)
                    {
                        fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                WSAGetLastError());
                        return -1;
                    }

                    // Per the Winsock documentation SO_CONNECT_TIME yields
                    // the number of seconds the socket has been connected,
                    // or 0xFFFFFFFF when it is not connected. Close client
                    // sockets that have been connected for over 300 seconds.
                    if ((optval != 0xFFFFFFFF) && (optval > 300))
                    {
                        closesocket(listenptr->PendingAccepts[i]->sclient);
                    }
                }
                listenptr = listenptr->next;
            }
            interval = 0;
        }
    }

    WSACleanup();
    return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -