📄 iocpclient.cpp
字号:
}
FreeBufferObj(buf);
}
}
LeaveCriticalSection(&sock->SockCritSec);
}
else if (buf->operation == OP_TRANSMIT)
{
// Update the counters
InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
// If there are more sends to be made, post another TransmitFile
EnterCriticalSection(&sock->SockCritSec);
if (gRateLimit == -1)
{
// If rate limiting is not set, just repost the send
if (sock->SendCount > 0)
{
rc = PostTransmitFile(sock, buf);
if (rc != NO_ERROR)
{
bCleanupSocket = TRUE;
}
}
else
{
// Otherwise shutdown the socket
shutdown(sock->s, SD_SEND);
FreeBufferObj(buf);
}
}
else
{
// If rate limiting is enabled, save off the operation for
// sending by the send thread.
//
if (sock->SendCount > 0)
{
buf->next = sock->Repost;
sock->Repost = buf;
}
else
{
// Otherwise shutdown the socket
shutdown(sock->s, SD_SEND);
FreeBufferObj(buf);
}
}
LeaveCriticalSection(&sock->SockCritSec);
}
}
//
// Check to see if socket is closing
//
if ( (InterlockedDecrement(&sock->OutstandingOps) == 0) &&
(sock->bClosing))
{
bCleanupSocket = TRUE;
}
if (sock->bClosing)
{
printf("CLOSING: ops outstanding %d\n", sock->OutstandingOps);
}
// If indicated to clean up, close the socket and free the objects
if (bCleanupSocket)
{
InterlockedDecrement(&gCurrentConnections);
EnterCriticalSection(&sock->SockCritSec);
closesocket(sock->s);
sock->s = INVALID_SOCKET;
LeaveCriticalSection(&sock->SockCritSec);
printf("removing conneciton object\n");
if (gTimeout == -1)
{
RemoveSocketObj(&gConnectionList, sock);
FreeSocketObj(sock);
}
else if (gCurrentConnections == 0)
{
return 0;
}
if (gConnectionList == NULL)
{
printf("List is NULL\n");
return 0;
}
}
return 1;
}
//
// Function: SetPort
//
// Description:
// Sets the port number in the specified socket address.
//
void SetPort(int af, SOCKADDR *sa, USHORT port)
{
if (af == AF_INET)
{
((SOCKADDR_IN *)sa)->sin_port = htons(port);
}
else if (af == AF_INET6)
{
((SOCKADDR_IN6 *)sa)->sin6_port = htons(port);
}
}
//
// Function: SendThread
//
// Description:
// This thread rate limits the send operations in an attempt to smooth the
// data flow. The sleep time is computed from the send size and the given
// send rate. This method breaks down for very high or very low send rates.
// Its main goal is to smooth the traffic so as to not overrun the local
// network bandwidth and prevent extreme spikes in network usage.
//
DWORD WINAPI SendThread(LPVOID lpParam)
{
SOCKET_OBJ *connobj=NULL,
*tmp=NULL;
BUFFER_OBJ *buf=NULL;
printf("SendThread\n");
Sleep(gTimeout);
while (1)
{
// Walk the connection list to repost sends
connobj = gConnectionList;
if (connobj == NULL)
{
break;
}
while (connobj)
{
EnterCriticalSection(&connobj->SockCritSec);
if ((connobj->s != INVALID_SOCKET) && (connobj->Repost != NULL))
{
buf = connobj->Repost;
connobj->Repost = buf->next;
// Post the appropriate send operation
if (buf->operation == OP_WRITE)
{
PostSend(connobj, buf);
}
else if (buf->operation == OP_TRANSMIT)
{
PostTransmitFile(connobj, buf);
}
}
LeaveCriticalSection(&connobj->SockCritSec);
connobj = connobj->next;
Sleep(gTimeout);
if (gCurrentConnections == 0)
break;
}
}
// Free the connection objects if exiting
connobj = gConnectionList;
while (connobj)
{
tmp = connobj;
connobj = connobj->next;
FreeSocketObj(tmp);
}
ExitThread(0);
return 0;
}
//
// Function: main
//
// Description:
// This is the main program. It parses the command line and creates
// the main socket. For UDP this socket is used to receive datagrams.
// For TCP the socket is used to accept incoming client connections.
// Each client TCP connection is handed off to a worker thread which
// will receive any data on that connection until the connection is
// closed.
//
int __cdecl main(int argc, char **argv)
{
GUID guidConnectEx = WSAID_CONNECTEX,
guidTransmitFile = WSAID_TRANSMITFILE;
DWORD bytes,
flags;
SOCKET_OBJ *sockobj=NULL;
BUFFER_OBJ *connobj=NULL,
*buffobj=NULL;
OVERLAPPED *lpOverlapped=NULL;
HANDLE CompletionPort,
hThread,
hrc;
WSADATA wsd;
ULONG lastprint=0;
int error,
rc,
i;
struct addrinfo *resremote=NULL,
*reslocal=NULL,
*ptr=NULL;
// Validate the command line
ValidateArgs(argc, argv);
if (gTransmitFile && (gOverlappedCount > 1))
{
printf("Can only have one TransmitFile oustanding per connection!\n");
gOverlappedCount = 1;
}
// Create the temp file if TransmitFile is to be used
if (gTransmitFile)
{
gTempFile = CreateTempFile("txfile.tmp", gFileSize);
if (gTempFile == INVALID_HANDLE_VALUE)
{
fprintf(stderr, "Unable to create temp file!\n");
return -1;
}
}
// Load Winsock
if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
{
fprintf(stderr, "unable to load Winsock!\n");
return -1;
}
// Create the completion port used by this server
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
if (CompletionPort == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// Resolve the local address
printf("Local address: %s; Port: %s; Family: %d\n",
gBindAddr, gBindPort, gAddressFamily);
resremote = ResolveAddress(gServerAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
if (resremote == NULL)
{
fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
return -1;
}
// Compute the timeout if rate limiting is selected
if (gRateLimit != -1)
{
gTimeout = (((gConnectionCount * gBufferSize) / gRateLimit) * 1000) / gConnectionCount;
if (gRateLimit >= 1000000)
{
gTimeout /= 2;
}
printf("gTimeout == %lu\n", gTimeout);
}
// Start the timer for statistics counting
gStartTime = gStartTimeLast = GetTickCount();
// For each local address returned, create a listening/receiving socket
ptr = resremote;
while (ptr)
{
reslocal = ResolveAddress(gBindAddr, "0", ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (reslocal)
{
// Initiate the specified number of connections for each server
// address returned.
for(i=0; i < gConnectionCount ;i++)
{
sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);
// create the socket
sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (sockobj->s == INVALID_SOCKET)
{
fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
return -1;
}
// Associate the socket and its SOCKET_OBJ to the completion port
hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0);
if (hrc == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// bind the socket to a local address and port
do
{
SetPort(reslocal->ai_family, reslocal->ai_addr, gLocalPort);
rc = bind(sockobj->s, reslocal->ai_addr, reslocal->ai_addrlen);
if (rc == SOCKET_ERROR)
{
// Bail out if the port gets too low
if (--gLocalPort == 1024)
{
fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
return -1;
}
}
else
{
break;
}
} while (1);
gLocalPort--;
// Need to load the Winsock extension functions from each provider
// -- e.g. AF_INET and AF_INET6.
rc = WSAIoctl(
sockobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidConnectEx,
sizeof(guidConnectEx),
&sockobj->lpfnConnectEx,
sizeof(sockobj->lpfnConnectEx),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
WSAGetLastError());
return -1;
}
rc = WSAIoctl(
sockobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidTransmitFile,
sizeof(guidTransmitFile),
&sockobj->lpfnTransmitFile,
sizeof(sockobj->lpfnTransmitFile),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
WSAGetLastError());
return -1;
}
connobj = GetBufferObj(gBufferSize);
// Copy the remote address into the connect object
memcpy(&connobj->addr, ptr->ai_addr, ptr->ai_addrlen);
connobj->addrlen = ptr->ai_addrlen;
sockobj->SendCount = gSendCount;
// Insert this socket object into the list of pending connects
InsertSocketObj(&gConnectionList, sockobj);
PostConnect(sockobj, connobj);
if (gRateLimit != -1)
Sleep(gTimeout);
}
freeaddrinfo(reslocal);
}
ptr = ptr->ai_next;
}
// free the addrinfo structure for the 'bind' address
freeaddrinfo(resremote);
if (gRateLimit != -1)
{
hThread = CreateThread(NULL, 0, SendThread, (LPVOID)NULL, 0, NULL);
if (hThread == NULL)
{
fprintf(stderr, "CreateThread failed: %d\n", GetLastError());
return -1;
}
}
lastprint = GetTickCount();
// Our worker thread is simly our main thread, process the completion
// notifications.
while (1)
{
error = NO_ERROR;
rc = GetQueuedCompletionStatus(
CompletionPort,
&bytes,
(PULONG_PTR)&sockobj,
&lpOverlapped,
2000
);
if (rc == 0)
{
if (((error = GetLastError()) == WAIT_TIMEOUT) ||
(error == STATUS_TIMEOUT))
{
PrintStatistics();
lastprint = GetTickCount();
}
else
{
fprintf(stderr, "GetQueuedCompletionStatus failed: %d\n", WSAGetLastError());
rc = WSAGetOverlappedResult(
sockobj->s,
lpOverlapped,
&bytes,
FALSE,
&flags
);
error = WSAGetLastError();
}
}
else
{
buffobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);
// Handle IO until 0 is returned -- this indicates that no more socket
// connections remain open.
if (HandleIo(sockobj, buffobj, bytes, error) == 0)
{
break;
}
if ((GetTickCount() - lastprint) > 2000)
{
PrintStatistics();
lastprint = GetTickCount();
}
}
if (gRateLimit != -1)
{
rc = WaitForSingleObject(hThread, 0);
if (rc != WAIT_TIMEOUT && rc != WAIT_FAILED)
{
CloseHandle(hThread);
break;
}
}
}
PrintStatistics();
CloseHandle(CompletionPort);
CloseHandle(gTempFile);
WSACleanup();
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -