📄 eventserver.cpp
字号:
// Put new object at the head
obj->next = sock->pending;
sock->pending = obj;
}
}
//
// Function: DequeueBufferObj
//
// Description:
// Remove a BUFFER_OBJ from the given connection's queue for sending.
//
BUFFER_OBJ *DequeueBufferObj(SOCKET_OBJ *sock)
{
BUFFER_OBJ *ret=NULL;
if (sock->pendingtail != NULL)
{
// Queue is non empty
ret = sock->pending;
sock->pending = sock->pending->next;
if (sock->pendingtail == ret)
{
// Item is the only item in the queue
sock->pendingtail = NULL;
}
}
return ret;
}
//
// Function: ValidateArgs
//
// Description:
// Parses the command line arguments and sets up some global
// variables.
//
void ValidateArgs(int argc, char **argv)
{
int i;
for(i=1; i < argc ;i++)
{
if (((argv[i][0] != '/') && (argv[i][0] != '-')) || (strlen(argv[i]) < 2))
usage(argv[0]);
else
{
switch (tolower(argv[i][1]))
{
case 'a': // address family - IPv4 or IPv6
if (i+1 >= argc)
usage(argv[0]);
if (argv[i+1][0] == '4')
gAddressFamily = AF_INET;
else if (argv[i+1][0] == '6')
gAddressFamily = AF_INET6;
else
usage(argv[0]);
i++;
break;
case 'b': // buffer size for send/recv
if (i+1 >= argc)
usage(argv[0]);
gBufferSize = atol(argv[++i]);
break;
case 'e': // endpoint - port number
if (i+1 >= argc)
usage(argv[0]);
gBindPort = argv[++i];
break;
case 'l': // local address for binding
if (i+1 >= argc)
usage(argv[0]);
gBindAddr = argv[++i];
break;
case 'p': // protocol - TCP or UDP
if (i+1 >= argc)
usage(argv[0]);
if (_strnicmp(argv[i+1], "tcp", 3) == 0)
{
gProtocol = IPPROTO_TCP;
gSocketType = SOCK_STREAM;
}
else if (_strnicmp(argv[i+1], "udp", 3) == 0)
{
gProtocol = IPPROTO_UDP;
gSocketType = SOCK_DGRAM;
}
else
usage(argv[0]);
i++;
break;
default:
usage(argv[0]);
break;
}
}
}
}
//
// Function: ReceivePendingData
//
// Description:
// Receive data pending on the socket into a SOCKET_OBJ buffer. Enqueue
// the buffer into the socket object for sending later. This routine returns
// -1 indicating that the socket is no longer valid and the calling function
// should clean up (remove) the socket object. Zero is returned for success.
//
int ReceivePendingData(SOCKET_OBJ *sockobj)
{
BUFFER_OBJ *buffobj=NULL;
int rc,
ret;
// Get a buffer to receive the data
buffobj = GetBufferObj(gBufferSize);
ret = 0;
if (gProtocol == IPPROTO_TCP)
{
rc = recv(
sockobj->s,
buffobj->buf,
buffobj->buflen,
0
);
}
else
{
rc = recvfrom(
sockobj->s,
buffobj->buf,
buffobj->buflen,
0,
(SOCKADDR *)&buffobj->addr,
&buffobj->addrlen
);
}
if (rc == SOCKET_ERROR)
{
if (WSAGetLastError() != WSAEWOULDBLOCK)
{
// Socket connection has failed, close the socket
fprintf(stderr, "recv(from) failed: %d\n", WSAGetLastError());
closesocket(sockobj->s);
sockobj->s = INVALID_SOCKET;
ret = -1;
}
else
{
ret = WSAEWOULDBLOCK;
}
FreeBufferObj(buffobj);
}
else if (rc == 0)
{
// Graceful close
if (gProtocol == IPPROTO_TCP)
{
FreeBufferObj(buffobj);
}
else
{
// Always enqueue the zero byte datagrams for UDP
buffobj->buflen = 0;
EnqueueBufferObj(sockobj, buffobj, FALSE);
}
// Set the socket object to closing
sockobj->closing = TRUE;
if (sockobj->pending == NULL)
{
// If no sends are pending, close the socket for good
closesocket(sockobj->s);
sockobj->s = INVALID_SOCKET;
ret = -1;
}
else
{
// Sends are pending, just return
ret = 0;
}
}
else
{
// Read data, updated the counters and enqueue the buffer for sending
InterlockedExchangeAdd(&gBytesRead, rc);
InterlockedExchangeAdd(&gBytesReadLast, rc);
buffobj->buflen = rc;
EnqueueBufferObj(sockobj, buffobj, FALSE);
ret = 1;
}
return ret;
}
//
// Function: SendPendingData
//
// Description:
// Send any data pending on the socket. This routine goes through the
// queued buffer objects within the socket object and attempts to
// send all of them. If the send fails with WSAEWOULDBLOCK, put the
// remaining buffer back in the queue (at the front) for sending
// later when select indicates sends can be made. This routine returns
// -1 to indicate that an error has occured on the socket and the
// calling routine should remove the socket structure; otherwise, zero
// is returned.
//
int SendPendingData(SOCKET_OBJ *sock)
{
BUFFER_OBJ *bufobj=NULL;
BOOL breakouter;
int nleft,
idx,
ret,
rc;
// Attempt to dequeue all the buffer objects on the socket
ret = 0;
while (bufobj = DequeueBufferObj(sock))
{
if (gProtocol == IPPROTO_TCP)
{
breakouter = FALSE;
nleft = bufobj->buflen;
idx = 0;
// In the event not all the data was sent we need to increment
// through the buffer. This only needs to be done for stream
// sockets since UDP is datagram and its all or nothing for that.
while (nleft)
{
rc = send(
sock->s,
&bufobj->buf[idx],
nleft,
0
);
if (rc == SOCKET_ERROR)
{
if (WSAGetLastError() == WSAEWOULDBLOCK)
{
BUFFER_OBJ *newbuf=NULL;
// Copy the unsent portion of the buffer and put it back
// at the head of the send queue
newbuf = GetBufferObj(nleft);
memcpy(newbuf->buf, &bufobj->buf[idx], nleft);
EnqueueBufferObj(sock, newbuf, TRUE);
ret = WSAEWOULDBLOCK;
}
else
{
// The connection was broken, indicate failure
ret = -1;
}
breakouter = TRUE;
break;
}
else
{
// Update the stastics and increment the send counters
InterlockedExchangeAdd(&gBytesSent, rc);
InterlockedExchangeAdd(&gBytesSentLast, rc);
nleft -= rc;
idx += 0;
}
}
FreeBufferObj(bufobj);
if (breakouter)
break;
}
else
{
rc = sendto(
sock->s,
bufobj->buf,
bufobj->buflen,
0,
(SOCKADDR *)&bufobj->addr,
bufobj->addrlen
);
if (rc == SOCKET_ERROR)
{
if (WSAGetLastError() == WSAEWOULDBLOCK)
{
// If the send couldn't be made, put the buffer
// back at the head of the queue
EnqueueBufferObj(sock, bufobj, TRUE);
ret = WSAEWOULDBLOCK;
}
else
{
// Socket error occured so indicate the error to the caller
ret = -1;
}
break;
}
else
{
FreeBufferObj(bufobj);
}
}
}
// If no more sends are pending and the socket was marked as closing (the
// receiver got zero bytes) then close the socket and indicate to the caller
// to remove the socket structure.
if ((sock->pending == NULL) && (sock->closing))
{
closesocket(sock->s);
sock->s = INVALID_SOCKET;
ret = -1;
}
return ret;
}
//
// Function: PrintStatistics
//
// Description:
// Print the send/recv statistics for the server
//
void PrintStatistics()
{
ULONG bps, tick, elapsed;
tick = GetTickCount();
elapsed = (tick - gStartTime) / 1000;
if (elapsed == 0)
return;
printf("\n");
bps = gBytesSent / elapsed;
printf("Average BPS sent: %lu [%lu]\n", bps, gBytesSent);
bps = gBytesRead / elapsed;
printf("Average BPS read: %lu [%lu]\n", bps, gBytesRead);
elapsed = (tick - gStartTimeLast) / 1000;
if (elapsed == 0)
return;
bps = gBytesSentLast / elapsed;
printf("Current BPS sent: %lu\n", bps);
bps = gBytesReadLast / elapsed;
printf("Current BPS read: %lu\n", bps);
printf("Total Connections : %lu\n", gTotalConnections);
printf("Current Connections: %lu\n", gCurrentConnections);
InterlockedExchange(&gBytesSentLast, 0);
InterlockedExchange(&gBytesReadLast, 0);
gStartTimeLast = tick;
}
//
// Function: HandleIo
//
// Description:
// This function handles the IO on a socket. First, the events signaled
// on the socket are enuemrated, then the appropriate handler routine
// for the event is called.
//
int HandleIo(THREAD_OBJ *thread, SOCKET_OBJ *sock)
{
WSANETWORKEVENTS nevents;
int rc;
// Enumerate the events
rc = WSAEnumNetworkEvents(
sock->s,
sock->event,
&nevents
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "HandleIo: WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
return SOCKET_ERROR;
}
if (nevents.lNetworkEvents & FD_READ)
{
// Check for read error
if (nevents.iErrorCode[FD_READ_BIT] == 0)
{
rc = ReceivePendingData(sock);
if (rc == -1)
{
RemoveSocketObj(thread, sock);
FreeSocketObj(sock);
return SOCKET_ERROR;
}
rc = SendPendingData(sock);
if (rc == -1)
{
RemoveSocketObj(thread, sock);
FreeSocketObj(sock);
return SOCKET_ERROR;
}
}
else
{
fprintf(stderr, "HandleIo: FD_READ error %d\n",
nevents.iErrorCode[FD_READ_BIT]);
RemoveSocketObj(thread, sock);
FreeSocketObj(sock);
return SOCKET_ERROR;
}
}
if (nevents.lNetworkEvents & FD_WRITE)
{
// Check for write error
if (nevents.iErrorCode[FD_WRITE_BIT] == 0)
{
rc = SendPendingData(sock);
if (rc == -1)
{
RemoveSocketObj(thread, sock);
FreeSocketObj(sock);
return SOCKET_ERROR;
}
}
else
{
fprintf(stderr, "HandleIo: FD_WRITE error %d\n",
nevents.iErrorCode[FD_WRITE_BIT]);
return SOCKET_ERROR;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -