📄 spclient.cpp
字号:
}
else if(0 == ret) {
logFile->StatusOut("Connection has been disconnected gracefully.");
return FALSE;
}
recvOff += ret;
totalDownBytes += ret;
BOOL retVal = FALSE;
for(;;) {
// because multiple msgs can be received at once.
// keep call parseMsg() till !MSG_COMPLETE
MSG_STATE ms = ParseMsg();
switch(ms) {
case MSG_COMPLETE:
continue;
case MSG_UNCOMPLETE:
retVal = TRUE;
break;
case MSG_ERR_SIZE:
sprintf(errStr, "错误的消息大小!");
break;
case MSG_ERR_TYPE:
sprintf(errStr, "错误的消息类型!");
break;
break;
case MSG_REMOTE_ERR:
// 错误信息已经在errStr中了
break;
default:
sprintf(errStr, "未知错误类型!");
break;
}
if(strlen(errStr) > 0)
logFile->StatusOut("错误: %s", errStr);
break;
}
return retVal;
}
MSG_STATE SPClient::ParseMsg() {
// 如果过小,则不是正常的包
if(recvOff < sizeof(int)+sizeof(BYTE))
return MSG_UNCOMPLETE;
// 把移动指针放到数据的起始地址
recvPointer = recvBuf;
// 读取消息大小
UINT msgSize;
CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));
// 读取消息类型
UINT8 msgType;
CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));
// msgSize是否正常
if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
return MSG_ERR_SIZE;
// 是否包含完成的消息
if(recvOff < msgSize)
return MSG_UNCOMPLETE;
MSG_STATE ret = MSG_COMPLETE;
switch(msgType) {
case SP2CS_WELCOME:
ret = OnWelcome();
break;
case SP2CS_MSG:
ret = OnMsg();
break;
default:
ret = MSG_ERR_TYPE;
break;
}
// copy left data to start of recvBuf
if(recvOff >= msgSize) {
memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize);
recvOff -= msgSize;
}
return ret;
}
//解析注册成功消息
MSG_STATE SPClient::OnWelcome() {
UINT startBlockID;
CopyMoveSrc(&startBlockID, recvPointer, sizeof(startBlockID));
lastSentBlockID = startBlockID;
logFile->StatusOut("Start Block %d.", lastSentBlockID);
isLogin = TRUE;
return MSG_COMPLETE;
}
//收到SP_MSG消息
MSG_STATE SPClient::OnMsg() {
// 错误代码
UINT16 errCode;
CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));
// 是否需要断开连接
bool shouldDisconnect;
CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect));
// 根据错误代码处理
switch(errCode) {
case ERR_PROTOCOL_FORMAT:
sprintf(errStr, "协议错误");
break;
case ERR_AUTHORIZATION:
sprintf(errStr, "验证错误");
break;
case ERR_INTERNAL:
sprintf(errStr, "未知错误");
break;
default:
shouldDisconnect = true;
}
if(shouldDisconnect) {
return MSG_REMOTE_ERR;
}
return MSG_COMPLETE;
}
BOOL SPClient::SendPackets() {
if(m_sendList.empty())
return TRUE;
TCPPacket* packet = m_sendList.front();
m_sendList.pop_front();
int ret = send(m_socket, packet->buf+packet->sent, min(packet->size-packet->sent, 40000), 0);
if(SOCKET_ERROR == ret) {
DWORD lastError = ::WSAGetLastError();
if (WSAEWOULDBLOCK == lastError) {
m_sendList.push_front(packet);
}
else {
m_sendList.push_front(packet);
logFile->StatusErr("Sending data on TCP", lastError);
return FALSE;
}
}
else {
totalUpBytes += ret;
logFile->StatusOut("Send data %d of %d", ret, packet->size-packet->sent);
assert(ret <= packet->size-packet->sent);
if(ret == packet->size-packet->sent) {
if(packet->size > 16384)
logFile->StatusOut("Sent Block %d.", *packet->GetBlockID());
// 已经发送完毕,释放Buffer
m_freeList.Release(packet);
}
else { // 尚未发送完毕,下次继续发送
packet->sent += ret;
m_sendList.push_front(packet);
logFile->StatusOut("!!!!!!!!!!!");
}
}
return TRUE;
}
void SPClient::RunReceiver(SPClient* client)
{
timeval timeout;
DWORD lastSentUpdate = 0;
fd_set read_set, write_set;
DWORD lastManageTime=0, currTime=0;
while(client->isRunning)
{
if(client->m_socket == INVALID_SOCKET)
{
if(!client->m_bufferMgr->ShouldConnect()) {
Sleep(500);
continue;
}
CONNECT_RESULT ret = client->Connecting();
if(ret == CR_WOULDBLOCK) {
// 等待8秒钟,看能否连接上
FD_ZERO(&write_set);
FD_SET(client->m_socket, &write_set);
timeout.tv_sec = 8;
timeout.tv_usec = 0;
int s = select(0, NULL, &write_set, NULL, &timeout);
if(s > 0)
ret = CR_CONNECTED;
}
if(ret == CR_CONNECTED) {
client->lastSentBlockID = 0;
client->readData = 0;
// 可以开始存储数据了
client->m_bufferMgr->StartSave();
}
else {
TE_CloseSocket(client->m_socket, FALSE);
client->m_socket = INVALID_SOCKET;
client->logFile->StatusOut("无法连接SP,请检查网络。");
//MessageBox(NULL, "无法连接SP,请检查网络。然后重新打开本程序。", "错误", MB_OK|MB_ICONINFORMATION);
for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
if(!client->isRunning)
break;
Sleep(1000);
}
continue;
}
}
if(client->SendRegister()) {
// 开始接收发送数据
while(client->isRunning) {
// 获取当前时间
currTime = timeGetTime();
timeout.tv_sec = 0;
timeout.tv_usec = 20000;
FD_ZERO(&read_set);
FD_SET(client->m_socket, &read_set);
int s = select(0, &read_set, 0, NULL, &timeout);
if(s > 0) {
if(FD_ISSET(client->m_socket, &read_set)) {
// 接收信息
if(!client->BaseRecv()) {
for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
if(!client->isRunning)
break;
Sleep(1000);
}
break;
}
}
}
else if(s == SOCKET_ERROR) {
client->logFile->StatusErr("selecting", WSAGetLastError());
Sleep(1); // prevent dead loop
}
if(currTime-lastManageTime >= 10000) {
lastManageTime = currTime;
// 生成传输信息并打印
client->GenerateTransferInfo(TRUE);
client->logFile->StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB.",
client->currDownSpeed/1024, client->currUpSpeed/1024,
client->avgDownSpeed/1024, client->avgUpSpeed/1024,
client->totalDownBytes/1024.f/1024.f, client->totalUpBytes/1024.f/1024.f);
}
if(!client->m_bufferMgr->CheckRecvingSample()) {
client->logFile->StatusOut("No Sample Any More!");
//MessageBox(client->cs->parentWindow, "CaptureServer 20秒钟没有接收到Sample了!", "Sample中断", MB_OK|MB_ICONSTOP);
if(client->cs->parentWindow)
SendMessage(client->cs->parentWindow, WM_NOMORESAMPLE, 0, 0);
}
// 发送数据
if(client->isLogin && client->isRunning) {
// 目前SP尚不支持这个协议
//client->SendMediaType();
client->SendBlock();
}
if(currTime-lastSentUpdate > 10000) {
if(!client->SendUpdate())
break;
lastSentUpdate = currTime;
}
if(!client->SendPackets())
break;
}
}
client->Disconnect();
}
}
BOOL SPClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
packet= m_freeList.Allocate();
if(!packet)
return FALSE;
// 先留着消息大小不写,到最后再写
sendPointer = packet->buf+sizeof(UINT);
CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
return TRUE;
}
void SPClient::SendEnd(TCPPacket*& packet) {
// 消息的大小就是移动的指针减去初始的指针
packet->size = sendPointer-packet->buf;
packet->sent = 0;
memcpy(packet->buf, &packet->size, sizeof(packet->size));
m_sendList.push_back(packet);
}
void SPClient::CopyMoveSrc(void * des, const char *& src, size_t size) {
assert(des && src);
if(!des || !src)
return;
memcpy(des, src, size);
src += size;
}
void SPClient::CopyMoveDes(char *& des, const void * src, size_t size) {
assert(des && src);
if(!des || !src)
return;
memcpy(des, src, size);
des += size;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -