📄 spclient.cpp
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include "StdAfx.h"
#include "SPClient.h"
#include "CaptureServer.h"
#include "MD5.h"
SPClient::SPClient(CaptureServer* cServer, NormalAddress address, BufferMgr* bufferMgr, LogMgr* log) : m_freeList(8, 1) {
cs = cServer;
m_bufferMgr = bufferMgr;
logFile = log;
addr = address;
isRunning = TRUE;
isLogin = FALSE;
m_socket = INVALID_SOCKET;
sendPointer = NULL;
recvPointer = NULL;
recvOff = 0;
errStr[0] = 0;
lastSentBlockID = 0;
memset(&header, 0, sizeof(header));
readData = 0;
DWORD threadID;
hThread = CreateThread(
NULL, 0, (LPTHREAD_START_ROUTINE)(SPClient::RunReceiver), this, 0, &threadID);
}
SPClient::~SPClient() {
isRunning = FALSE;
Disconnect();
if(hThread) {
// 如果一段时间内不能正常停止,就强行终止线程
DWORD ret = WaitForSingleObject(hThread, 5000);
if(ret == WAIT_TIMEOUT) {
TerminateThread(hThread, 0);
}
CloseHandle(hThread);
hThread = NULL;
}
}
BOOL SPClient::SendRegister() {
UINT8 nameSize = cs->cfgData.chnlStr.size();
if(nameSize >= 0xff)
return FALSE;
TCPPacket* packet;
if(!SendBegin(packet, CS2SP_REGISTER))
return FALSE;
//2.write channel name
CopyMoveDes(sendPointer, &nameSize, sizeof(nameSize));
CopyMoveDes(sendPointer, cs->cfgData.chnlStr.data(), nameSize);
//3.write userinfo.
CopyMoveDes(sendPointer, &cs->cfgData.userID, sizeof(UINT));
CopyMoveDes(sendPointer, cs->cfgData.password.data(), MD5_LEN);
//4. ratio ,maxblocksize
UINT maxblockSize = BLOCK_SIZE;
CopyMoveDes(sendPointer, &maxblockSize, sizeof(maxblockSize));
float ratio = cs->GetSpeedInKBPS();
CopyMoveDes(sendPointer, &ratio, sizeof(ratio));
//5. write video & audio
TVMEDIATYPESECTION videoTV, audioTV;
PBYTE videoData = NULL;
PBYTE audioData = NULL;
do{
delete [] videoData;
delete [] audioData;
Sleep(100);
if(!isRunning) {
m_freeList.Release(packet);
return FALSE;
}
}
while(!cs->GetFormatData(videoTV, videoData, FALSE) || !cs->GetFormatData(audioTV, audioData, TRUE) );
// 检查媒体类型数据是否正确,关键是是否为空
if(videoTV.cbFormat <= 0 && audioTV.cbFormat <= 0) {
MessageBox(cs->parentWindow, "媒体数据为空!请重新启动采集端。", "错误", MB_OK|MB_ICONSTOP);
return FALSE;
}
USHORT channelinfoLen = 2*sizeof(TVMEDIATYPESECTION)+videoTV.cbFormat + audioTV.cbFormat;
CopyMoveDes(sendPointer, &channelinfoLen, sizeof(channelinfoLen));
CopyMoveDes(sendPointer, &videoTV, sizeof(TVMEDIATYPESECTION));
if(videoData)
CopyMoveDes(sendPointer, videoData, videoTV.cbFormat);
CopyMoveDes(sendPointer, &audioTV, sizeof(TVMEDIATYPESECTION));
if(audioData)
CopyMoveDes(sendPointer, audioData, audioTV.cbFormat);
SendEnd(packet);
return TRUE;
}
/**********发送消息的函数**********/
BOOL SPClient::SendBlock() {
if(m_sendList.size() > 10)
return TRUE;
TCPPacket* packet;
if(!SendBegin(packet, CS2SP_BLOCK))
return FALSE;
//2.write ID.
CopyMoveDes(sendPointer, &lastSentBlockID, sizeof(lastSentBlockID));
//4.write data
UINT size = 0;
if(!m_bufferMgr->GetBlock(lastSentBlockID, reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size)) {
// 一定要释放这个packet
m_freeList.Release(packet);
return TRUE;
}
char* hashcode = "";
#ifdef DEBUG
MD5 md5(reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size);
hashcode = md5.hex_digest();
#endif
// verify block
char* startPos = sendPointer+sizeof(size);
char* tempPos = startPos + 8;
if(lastSentBlockID == 0) {
if(*((UINT*)startPos + 1) != UINT_MAX)
tempPos = startPos + *((UINT*)startPos + 1);
else
tempPos = startPos + size;
}
while(header.size < 1638400) {
//写header信息
if((UINT)readData < sizeof(header)) {
if(startPos+size - tempPos < sizeof(header)-readData){
memcpy((char*)&header + readData, tempPos, startPos+size - tempPos);
readData += startPos+size - tempPos;
break;
}
else {
memcpy((char*)&header + readData, tempPos, sizeof(header)-readData);
tempPos += sizeof(header)-readData;//头的位置tempPos
readData = sizeof(header);
//assert(header.size < 1638400);
if(header.size >= 1638400) {
logFile->StatusOut("Header size %d, blockID %d, offset %d",
header.size, lastSentBlockID, readData);
break;
}
}
}
if(readData >= sizeof(header)) {
if(startPos+size - tempPos < header.size - readData) {
readData += startPos+size - tempPos;
break;
}
else {
tempPos += header.size - readData;
readData = 0;
}
}
}
//3.write data size
CopyMoveDes(sendPointer, &size, sizeof(size));
int firstKeySampleOffset;
memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
LONGLONG keySample;
memcpy(&keySample, packet->buf+sizeof(UINT)*3+sizeof(char)+firstKeySampleOffset, sizeof(keySample));
memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
if(firstKeySampleOffset == 0)
keySample = 0;
char temp[64];
_i64toa(keySample, temp, 10);
logFile->StatusOut("Queue Block. ID:%d/%d, offset: %d, keysample: %s, hash %s",
lastSentBlockID, m_bufferMgr->GetMaxBlockID(), firstKeySampleOffset, temp, hashcode);
#ifdef DEBUG
delete [] hashcode;
#endif
sendPointer += size;
SendEnd(packet);
lastSentBlockID++;
return TRUE;
}
BOOL SPClient::SendUpdate(){
TCPPacket* packet;
if(!SendBegin(packet, CS2SP_UPDATE))
return FALSE;
float ratio = cs->GetSpeedInKBPS();
CopyMoveDes(sendPointer, &ratio, sizeof(ratio));
SendEnd(packet);
return TRUE;
}
BOOL SPClient::SendMediaType() {
MediaData mediaData;
if(!m_bufferMgr->GetMediaData(lastSentBlockID, mediaData))
return FALSE;
TCPPacket* packet;
if(!SendBegin(packet, CS2SP_MEDIA_TYPE))
return FALSE;
UINT size = mediaData.audioType.cbFormat + mediaData.videoType.cbFormat +
sizeof(mediaData.audioType) + sizeof(mediaData.videoType);
CopyMoveDes(sendPointer, &size, sizeof(UINT));
CopyMoveDes(sendPointer, &mediaData.videoType, sizeof(mediaData.videoType));
CopyMoveDes(sendPointer, mediaData.videoData, mediaData.videoType.cbFormat);
CopyMoveDes(sendPointer, &mediaData.audioType, sizeof(mediaData.audioType));
CopyMoveDes(sendPointer, mediaData.audioData, mediaData.audioType.cbFormat);
logFile->StatusOut("Queue Block media data. ID:%d/%d", lastSentBlockID, m_bufferMgr->GetMaxBlockID());
SendEnd(packet);
return TRUE;
}
CONNECT_RESULT SPClient::Connecting() {
logFile->StatusOut("Connecting to %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
CONNECT_RESULT ret = CR_ERROR;
// Create a TCP/IP socket that is bound to the server.
// Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets
// http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942
m_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(m_socket == INVALID_SOCKET) {
logFile->StatusErr("Creating socket", WSAGetLastError());
return ret;
}
// 不使用Nagle算法
BOOL bNoDelay = TRUE;
if(setsockopt(m_socket, SOL_SOCKET, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) {
logFile->StatusErr("Setting UDP socket as TCP_NODELAY", WSAGetLastError());
return ret;
}
// Set this socket as a Non-blocking socket.
ULONG flag = 1;
if(ioctlsocket(m_socket, FIONBIO, &flag) == SOCKET_ERROR) {
logFile->StatusErr("Setting socket as non-blocking", WSAGetLastError());
return ret;
}
// Connect to remote address
if(WSAConnect(m_socket, (sockaddr*)&addr, sizeof(sockaddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR) {
if(WSAGetLastError() != WSAEWOULDBLOCK) {
logFile->StatusErr("Connecting socket", WSAGetLastError());
return ret;
}
else {
ret = CR_WOULDBLOCK;
logFile->StatusOut("%s:%d is blocking.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}
}
else {
ret = CR_CONNECTED;
logFile->StatusOut("%s:%d is connected.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}
return ret;
}
void SPClient::Disconnect() {
isLogin = FALSE;
m_bufferMgr->StopSave();
ClearTransferInfo();
m_sendList.clear();
if(m_socket != INVALID_SOCKET) {
::TE_CloseSocket(m_socket, FALSE);
logFile->StatusOut("SPClient: disconnected from %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
m_socket = INVALID_SOCKET;
}
}
BOOL SPClient::BaseRecv() {
int ret = recv(m_socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
if(ret < 0) {
DWORD lastError = ::WSAGetLastError();
if (WSAEWOULDBLOCK != lastError) {
logFile->StatusErr("Receiving data on TCP", lastError);
return FALSE;
}
else
return TRUE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -