📄 tcpnet.cpp
字号:
// CTCPReceivedData - Buffer for storing received data and information about it
// CTCPErrorInfo - Buffer for storing error information
// CTCPNet - Class, which implements TCP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "NetBasic.h"
#include "TCPNet.h"
// Default maximum segment length, in bytes (8388608 = 8 * 1024 * 1024).
// In this file it is the size of the receive buffer allocated by the CTCPNet
// constructor and the byte limit passed to recv() in the server thread.
#define MAX_SEGLEN 8388608
char* CTCPErrorInfo::GetTimeStamp(char* s)
{
CHAR date[30]="";
CHAR time[30]="";
_timeb timebuffer;
// Get date/time
_strdate(date);
_ftime(&timebuffer);
_strtime(time);
// Create string
sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
return s;
}
// Builds a TCP network object and starts its background threads.
// receiver   - stored as the default receiver
// port       - TCP port stored in m_uPort (used for binding and for sending)
// maxthreads - stored in m_uMaxDeliverers, the limit the delivery manager
//              thread compares the number of delivery threads against
// The object starts in the suspended state. The result of CreateSockets() is
// not inspected here; on failure that function queues an error delivery.
CTCPNet::CTCPNet(NetReceiver* receiver,unsigned short port,unsigned short maxthreads)
{
    // Caller-supplied settings
    m_uPort=port;
    m_DefReceiver=receiver;
    m_uMaxDeliverers=maxthreads;

    // Initial state: suspended, not being killed, nothing queued, nobody busy
    m_bKill=false;
    m_bSuspended=true;
    m_uBusy=0;
    m_Deliveries.clear();
    m_pDeliverTrds.clear();

    // Receive buffer shared with the server thread
    m_pBuffer=new char[MAX_SEGLEN];

    // Open, bind and start listening on the receiving socket
    CreateSockets();

    // Launch the server and the delivery manager threads, keeping what
    // AfxBeginThread returns so the destructor can wait for them
    m_pServerTrd=AfxBeginThread(TCPServerFunction,this);
    m_pDelManTrd=AfxBeginThread(TCPDelManFunction,this);
}
// Shuts the object down: asks all worker threads to exit, waits for them,
// then releases the queued deliveries and the receive buffer.
CTCPNet::~CTCPNet()
{
    // Ask every worker thread to terminate
    m_bKill=true;
    // Close the listening socket BEFORE waiting: the server thread may be
    // blocked inside accept() on it, and closing the socket makes that call
    // fail so the thread reaches its own kill check and exits cleanly
    closesocket(m_RecvSocket);
    while ((!m_pDeliverTrds.empty()) || m_pServerTrd || m_pDelManTrd) {
        Sleep(50);
        // The following check is a last resort for a server thread that is
        // still stuck in a blocking call after the grace period
        // NOTE(review): TerminateThread expects a thread HANDLE, while
        // m_pServerTrd holds whatever AfxBeginThread returned - confirm the
        // member's declared type and that the right handle is passed here
        if (m_pServerTrd) {
            TerminateThread(m_pServerTrd,0);
            m_pServerTrd=NULL;
        }
    }
    // m_SendSocket is deliberately not closed here: Send() opens and closes it
    // within a single call, so at this point it is either already closed or
    // was never assigned. Closing such a stale value could hit an unrelated
    // socket that happens to reuse the same handle value.
    // Free resources
    FreeDeliveries();
    delete[] m_pBuffer;
}
// Creates the receiving (listening) socket, binds it to m_uPort on any local
// address and switches it to listening mode.
// Returns true on success. On failure it queues a CTCPErrorInfo delivery
// (code 0 for socket creation, code 1 for bind or listen, together with the
// Winsock error code), puts the object into the suspended state and returns
// false.
bool CTCPNet::CreateSockets()
{
    // Socket creation. Only the receiving socket is created here; the sending
    // socket is created per call inside Send(), so m_SendSocket holds no
    // meaningful value at this point and must not take part in the check.
    m_RecvSocket=socket(AF_INET, SOCK_STREAM, 0);
    if (m_RecvSocket==INVALID_SOCKET) {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(0,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }
    // Binding local address with receiving socket
    m_Local.sin_family=AF_INET;
    m_Local.sin_port=htons(m_uPort);
    m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
    if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
    {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(1,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }
    // Tuning receiving socket to listen (backlog of 50 pending connections)
    if (listen(m_RecvSocket,50)==SOCKET_ERROR)
    {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(1,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }
    return true;
}
// Sends the contents of a buffer to a remote host over a fresh TCP connection.
// sb      - buffer to send; if sb.GetAutoDel() is true the buffer object is
//           deleted once the attempt is over (but not when the call is
//           rejected because the object is suspended, as before)
// command - not used by this implementation
// to      - destination address; the connection goes to port m_uPort
// The two trailing parameters are ignored.
// Returns true only if the connection was established and the whole buffer
// was handed to send(); false if suspended or on any socket error.
bool CTCPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 /*options*/, bool /*storeiffail*/)
{
    if (m_bSuspended) return false;
    bool res=false;
    m_SendSocket=socket(AF_INET, SOCK_STREAM, 0);
    if (m_SendSocket!=INVALID_SOCKET) {
        SOCKADDR_IN recip;
        recip.sin_family=AF_INET;
        recip.sin_port=htons(m_uPort);
        recip.sin_addr.s_addr=to.Solid;
        // Only try to send on a connected socket, so a connect() failure is
        // not masked by a second, unrelated error from send()
        if (connect(m_SendSocket,(sockaddr*)&recip,sizeof(recip))!=SOCKET_ERROR) {
            const char* data=(const char*)sb.GetBufferBegin();
            int left=sb.GetBufferSize();
            res=true;
            // send() may accept fewer bytes than requested, so keep going
            // until the whole buffer has been handed over
            while (left>0) {
                int sent=send(m_SendSocket,data,left,0);
                if (sent==SOCKET_ERROR || sent==0) {
                    res=false;
                    break;
                }
                data+=sent;
                left-=sent;
            }
        }
        closesocket(m_SendSocket);
    }
    if (sb.GetAutoDel()) delete &sb;
    return (res);
}
void CTCPNet::FreeDeliveries()
{
for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
if (it->data) {
if (it->type==DeliveryType::ReceivedData) delete (CTCPReceivedData*)it->data; else
if (it->type==DeliveryType::ErrorInfo) delete (CTCPErrorInfo*)it->data; else
delete it->data;
}
}
m_Deliveries.clear();
}
unsigned int TCPServerFunction(void* pNet)
{
CTCPNet* net=(CTCPNet*)pNet;
// Necessary variables
SOCKADDR_IN sender;
SOCKET client;
int sendersize=NULL;
for(;;) {
// Wait while suspended
while (net->GetSuspended()) {
// Does killing needed
if (net->m_bKill) {
net->m_pServerTrd=NULL;
AfxEndThread(0,TRUE);
}
// Sleep a little bit
Sleep(50);
}
sendersize=sizeof(sender);
if ((client=accept(net->m_RecvSocket,(sockaddr*)&sender,&sendersize))!=INVALID_SOCKET) {
// Receive data
int ret=recv(client,net->m_pBuffer,MAX_SEGLEN,0);
if (ret==SOCKET_ERROR) {
// Error while receiving
net->m_Deliveries.push_back(CTCPNet::Delivery(new CTCPErrorInfo(3,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
} else {
net->m_Deliveries.push_back(CTCPNet::Delivery(new CTCPReceivedData(ret,sender.sin_addr.S_un.S_addr,net->m_pBuffer)));
}
}
Sleep(1);
// Does killing needed
if (net->m_bKill) {
net->m_pServerTrd=NULL;
AfxEndThread(0,TRUE);
}
}
}
unsigned int TCPDelManFunction(void* pNet)
{
CTCPNet* net=(CTCPNet*)pNet;
CTCPNet::Delivery del;
// Lock for delivery threads based on critical section
CSingleLock lock(&net->m_CriticalSection);
for(;;) {
lock.Lock();
// Does additional delivery threads needed
if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
net->m_pDeliverTrds.push_back(AfxBeginThread(TCPDeliverFunction,pNet));
}
lock.Unlock();
// Kill server
if (net->m_bKill) {
net->m_pDelManTrd=NULL;
AfxEndThread(net->m_Deliveries.size(),TRUE);
}
Sleep(10);
}
}
unsigned int TCPDeliverFunction(void* pNet)
{
CTCPNet* net=(CTCPNet*)pNet;
CTCPNet::Delivery del;
bool bNothing=true; // Shows if no deliveries were planned
DWORD lastdel=GetTickCount();
// Lock for delivery threads based on critical section
CSingleLock lock(&net->m_CriticalSection);
for(;;) {
bNothing=true;
while (lock.Lock(), !net->m_Deliveries.empty()) {
// Get delivery
del=net->m_Deliveries.front();
net->m_Deliveries.pop_front();
lock.Unlock();
// Deliver delivery
switch (del.type) {
case CTCPNet::DeliveryType::ReceivedData:
net->m_uBusy++;
lastdel=GetTickCount();
net->GetDefaultReceiver()->OnReceive(del.data);
delete (CTCPReceivedData*)del.data;
net->m_uBusy--;
break;
case CTCPNet::DeliveryType::ErrorInfo:
net->m_uBusy++;
lastdel=GetTickCount();
net->GetDefaultReceiver()->OnError(del.data);
delete (CTCPErrorInfo*)del.data;
net->m_uBusy--;
break;
default:
delete del.data;
}
// Thread is working now
bNothing=false;
// If killing needed then do it after unlocking
if (net->m_bKill) break;
}
lock.Unlock();
// Does killing needed (because of request or because of sponging)
if (net->m_bKill || GetTickCount()-lastdel>20000) {
net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
AfxEndThread(net->m_Deliveries.size(),TRUE);
}
// Sleep a little bit
if (bNothing) {
Sleep(50);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -