⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spclient.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应用了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/*
*  Openmysee
*
*  This program is free software; you can redistribute it and/or modify
*  it under the terms of the GNU General Public License as published by
*  the Free Software Foundation; either version 2 of the License, or
*  (at your option) any later version.
*
*  This program is distributed in the hope that it will be useful,
*  but WITHOUT ANY WARRANTY; without even the implied warranty of
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*  GNU General Public License for more details.
*
*  You should have received a copy of the GNU General Public License
*  along with this program; if not, write to the Free Software
*  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*
*/
#include "StdAfx.h"
#include "SPClient.h"
#include "CaptureServer.h"
#include "MD5.h"

/**
 * Builds a client for one remote address and starts its receiver thread.
 *
 * @param cServer   capture server, stored in `cs`
 * @param address   remote address, stored in `addr`
 * @param bufferMgr buffer manager, stored in `m_bufferMgr`
 * @param log       log manager, stored in `logFile`
 */
SPClient::SPClient(CaptureServer* cServer, NormalAddress address, BufferMgr* bufferMgr, LogMgr* log) : m_freeList(8, 1) {
	// Collaborators supplied by the caller.
	cs          = cServer;
	m_bufferMgr = bufferMgr;
	logFile     = log;
	addr        = address;

	// Connection state: running, not logged in, no socket yet.
	isRunning = TRUE;
	isLogin   = FALSE;
	m_socket  = INVALID_SOCKET;

	// Send/receive cursors and the error text start out empty.
	sendPointer = NULL;
	recvPointer = NULL;
	recvOff     = 0;
	errStr[0]   = 0;

	// State used by SendBlock(): next block ID and the record-header scan.
	lastSentBlockID = 0;
	readData        = 0;
	memset(&header, 0, sizeof(header));

	// Launch the receiver thread last, after every member above has been set.
	DWORD receiverThreadId = 0;
	hThread = CreateThread(
		NULL, 0, (LPTHREAD_START_ROUTINE)(SPClient::RunReceiver), this, 0, &receiverThreadId);
}

/**
 * Clears the run flag, disconnects, and then waits for the receiver thread
 * to finish before closing its handle.
 */
SPClient::~SPClient() {
	isRunning = FALSE;
	Disconnect();

	// No thread handle means there is nothing to wait for.
	if(!hThread)
		return;

	// If the thread does not stop within 5 seconds, terminate it forcibly.
	const DWORD waitResult = WaitForSingleObject(hThread, 5000);
	if(WAIT_TIMEOUT == waitResult)
		TerminateThread(hThread, 0);

	CloseHandle(hThread);
	hThread = NULL;
}

/**
 * Builds the CS2SP_REGISTER message for this channel and passes it to SendEnd().
 *
 * Payload, in write order: channel-name length (UINT8) and name bytes, user ID,
 * MD5_LEN bytes of the password field, BLOCK_SIZE, the speed reported by
 * GetSpeedInKBPS(), a USHORT length, then the video and audio
 * TVMEDIATYPESECTION structures, each followed by its format bytes.
 *
 * Polls the capture server every 100 ms until both media formats are
 * available or isRunning is cleared.
 *
 * @return TRUE once the packet has been passed to SendEnd(); FALSE when the
 *         channel name is 255 bytes or longer, SendBegin() fails, the client
 *         is stopped while waiting, or both media formats are empty.
 */
BOOL SPClient::SendRegister() {
	// Check the length BEFORE narrowing to UINT8; otherwise a name of 256+
	// bytes wraps around to a small value and slips past the limit check.
	const size_t fullNameSize = cs->cfgData.chnlStr.size();
	if(fullNameSize >= 0xff)
		return FALSE;
	UINT8 nameSize = static_cast<UINT8>(fullNameSize);

	TCPPacket* packet;
	if(!SendBegin(packet, CS2SP_REGISTER))
		return FALSE;

	//2.write channel name
	CopyMoveDes(sendPointer, &nameSize, sizeof(nameSize));
	CopyMoveDes(sendPointer, cs->cfgData.chnlStr.data(), nameSize);

	//3.write userinfo.
	// NOTE(review): assumes the password field holds at least MD5_LEN bytes — confirm.
	CopyMoveDes(sendPointer, &cs->cfgData.userID, sizeof(UINT));
	CopyMoveDes(sendPointer, cs->cfgData.password.data(), MD5_LEN);

	//4. ratio ,maxblocksize
	UINT maxblockSize = BLOCK_SIZE;
	CopyMoveDes(sendPointer, &maxblockSize, sizeof(maxblockSize));

	float ratio = cs->GetSpeedInKBPS();
	CopyMoveDes(sendPointer, &ratio, sizeof(ratio));

	//5. write video & audio
	// The buffers filled in by GetFormatData() are treated as caller-owned
	// new[] arrays, as the delete[] calls in the retry loop already imply.
	TVMEDIATYPESECTION videoTV, audioTV;
	PBYTE videoData = NULL;
	PBYTE audioData = NULL;
	do{
		// Drop any buffer left from a partially successful attempt and reset
		// the pointer so a later failure cannot lead to a double delete.
		delete [] videoData;
		videoData = NULL;
		delete [] audioData;
		audioData = NULL;
		Sleep(100);
		if(!isRunning) {
			m_freeList.Release(packet);
			return FALSE;
		}
	}
	while(!cs->GetFormatData(videoTV, videoData, FALSE) || !cs->GetFormatData(audioTV, audioData, TRUE) );

	// Check that the media type data is usable; the key point is that it is not empty.
	if(videoTV.cbFormat <= 0 && audioTV.cbFormat <= 0) {
		// Give back the packet and the format buffers before bailing out,
		// mirroring the !isRunning exit above.
		delete [] videoData;
		delete [] audioData;
		m_freeList.Release(packet);
		MessageBox(cs->parentWindow, "媒体数据为空!请重新启动采集端。", "错误", MB_OK|MB_ICONSTOP);
		return FALSE;
	}

	USHORT channelinfoLen = 2*sizeof(TVMEDIATYPESECTION)+videoTV.cbFormat + audioTV.cbFormat;
	CopyMoveDes(sendPointer, &channelinfoLen, sizeof(channelinfoLen));

	CopyMoveDes(sendPointer, &videoTV, sizeof(TVMEDIATYPESECTION));
	if(videoData)
		CopyMoveDes(sendPointer, videoData, videoTV.cbFormat);

	CopyMoveDes(sendPointer, &audioTV, sizeof(TVMEDIATYPESECTION));
	if(audioData)
		CopyMoveDes(sendPointer, audioData, audioTV.cbFormat);

	SendEnd(packet);

	// The format bytes have been copied into the packet; free the temporaries.
	delete [] videoData;
	delete [] audioData;
	return TRUE;
}

/**********发送消息的函数**********/
/**
 * Builds a CS2SP_BLOCK message for block `lastSentBlockID` and passes it to SendEnd().
 *
 * Steps: write the block ID, let the buffer manager copy the block payload
 * into the packet (leaving room for the size field in front of it), walk the
 * payload's record headers as a sanity check, write the size field, log the
 * block, and advance `lastSentBlockID`.
 *
 * The `header` and `readData` members carry the record-scan state from one
 * call to the next.
 *
 * @return FALSE only when SendBegin() fails. TRUE otherwise, including when
 *         nothing is sent because more than 10 packets are already in
 *         m_sendList or because GetBlock() fails.
 */
BOOL SPClient::SendBlock() {
	// Do nothing while more than 10 packets are already in the send list.
	if(m_sendList.size() > 10)
		return TRUE;
	TCPPacket* packet;
	if(!SendBegin(packet, CS2SP_BLOCK))
		return FALSE;
    
	//2.write ID.
	CopyMoveDes(sendPointer, &lastSentBlockID, sizeof(lastSentBlockID));

	//4.write data
	// The payload is written sizeof(size) bytes past sendPointer so that the
	// size field can be filled in afterwards (step 3 below).
	UINT size = 0;
	if(!m_bufferMgr->GetBlock(lastSentBlockID, reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size)) {
		// This packet must be released here.
		m_freeList.Release(packet);
		return TRUE;
	}
	// Hash string for the log line; only computed in DEBUG builds.
	char* hashcode = "";
#ifdef DEBUG
	MD5 md5(reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size);
	hashcode = md5.hex_digest();
#endif

	// verify block
	// startPos is the first payload byte; tempPos is the scan cursor, which
	// starts 8 bytes into the payload by default.
	char* startPos = sendPointer+sizeof(size);
	char* tempPos = startPos + 8;

	// For block 0 the scan starts at the offset stored in the payload's second
	// UINT; UINT_MAX there moves the cursor to the end of the payload.
	if(lastSentBlockID == 0) {
		if(*((UINT*)startPos + 1) != UINT_MAX)
			tempPos = startPos + *((UINT*)startPos + 1);
		else
			tempPos = startPos + size;
	}

	// Walk the records in the payload. `readData` counts how many bytes of the
	// current record have been consumed so far, possibly in earlier blocks.
	// NOTE(review): header.size appears to be the whole record length including
	// the header itself, since readData is subtracted from it — confirm.
	// NOTE(review): the comparisons below mix pointer differences with unsigned
	// sizes, and tempPos is not checked against the end of the payload — verify.
	while(header.size < 1638400) {
		// Fill in the header structure.
		if((UINT)readData < sizeof(header)) {
			if(startPos+size - tempPos < sizeof(header)-readData){
				// The payload ends inside the header: keep the partial bytes and stop.
				memcpy((char*)&header + readData, tempPos, startPos+size - tempPos);
				readData += startPos+size - tempPos;
				break;
			}
			else {
				memcpy((char*)&header + readData, tempPos, sizeof(header)-readData);
				tempPos += sizeof(header)-readData;// tempPos now sits just past the header
				readData = sizeof(header);

				//assert(header.size < 1638400);
				// An oversized record is logged and the scan stops.
				if(header.size >= 1638400) {
					logFile->StatusOut("Header size %d, blockID %d, offset %d", 
						header.size, lastSentBlockID, readData);
					break;
				}
			}
		}
		if(readData >= sizeof(header)) {
			if(startPos+size - tempPos < header.size - readData) {
				// The record continues past this payload: note the progress and stop.
				readData += startPos+size - tempPos;
				break;
			}
			else {
				// Skip the rest of this record and start over with the next one.
				tempPos += header.size - readData;
				readData = 0;
			}
		}
	}
	
    //3.write data size 
	CopyMoveDes(sendPointer, &size, sizeof(size));

	// Read two values back out of the packet buffer for the log line.
	// NOTE(review): the fixed offset sizeof(UINT)*3+sizeof(char) presumably
	// skips the fields written before the payload — confirm against SendBegin().
	int firstKeySampleOffset;
	memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
	LONGLONG keySample;
	memcpy(&keySample, packet->buf+sizeof(UINT)*3+sizeof(char)+firstKeySampleOffset, sizeof(keySample));
	// NOTE(review): this repeats the first memcpy above and looks redundant.
	memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
	// A zero offset is logged with keysample 0.
	if(firstKeySampleOffset == 0)
		keySample = 0;
	char temp[64];
	_i64toa(keySample, temp, 10);
	logFile->StatusOut("Queue Block. ID:%d/%d, offset: %d, keysample: %s, hash %s", 
		lastSentBlockID, m_bufferMgr->GetMaxBlockID(), firstKeySampleOffset, temp, hashcode);
#ifdef DEBUG
	// The string returned by hex_digest() is freed here.
	delete [] hashcode;
#endif

	// Move the write cursor past the payload that GetBlock() wrote earlier.
	sendPointer += size;

	SendEnd(packet);

	lastSentBlockID++;
	return TRUE;	
}

/**
 * Builds a CS2SP_UPDATE message holding the capture server's current speed
 * and passes it to SendEnd().
 *
 * @return FALSE when SendBegin() fails, TRUE otherwise.
 */
BOOL SPClient::SendUpdate(){
	TCPPacket* updatePacket;
	const BOOL began = SendBegin(updatePacket, CS2SP_UPDATE);
	if(began) {
		// The payload is a single float: the value from GetSpeedInKBPS().
		float currentSpeed = cs->GetSpeedInKBPS();
		CopyMoveDes(sendPointer, &currentSpeed, sizeof(currentSpeed));

		SendEnd(updatePacket);
		return TRUE;
	}
	return FALSE;
}

/**
 * Builds a CS2SP_MEDIA_TYPE message with the media data that the buffer
 * manager returns for `lastSentBlockID`, and passes it to SendEnd().
 *
 * Payload, in write order: total length (UINT), video type structure, video
 * format bytes, audio type structure, audio format bytes.
 *
 * @return FALSE when GetMediaData() or SendBegin() fails, TRUE otherwise.
 */
BOOL SPClient::SendMediaType() {
	MediaData media;
	if(!m_bufferMgr->GetMediaData(lastSentBlockID, media))
		return FALSE;

	TCPPacket* mediaPacket;
	if(!SendBegin(mediaPacket, CS2SP_MEDIA_TYPE))
		return FALSE;

	// Total length: both type structures plus both format blobs.
	UINT payloadLen = media.audioType.cbFormat + media.videoType.cbFormat +
		sizeof(media.audioType) + sizeof(media.videoType);
	CopyMoveDes(sendPointer, &payloadLen, sizeof(payloadLen));

	// Video section first, then audio.
	CopyMoveDes(sendPointer, &media.videoType, sizeof(media.videoType));
	CopyMoveDes(sendPointer, media.videoData, media.videoType.cbFormat);

	CopyMoveDes(sendPointer, &media.audioType, sizeof(media.audioType));
	CopyMoveDes(sendPointer, media.audioData, media.audioType.cbFormat);

	logFile->StatusOut("Queue Block media data. ID:%d/%d", lastSentBlockID, m_bufferMgr->GetMaxBlockID());

	SendEnd(mediaPacket);
	return TRUE;
}

/**
 * Creates the client socket in `m_socket`, configures it (no Nagle,
 * non-blocking) and starts connecting to `addr`.
 *
 * @return CR_CONNECTED when WSAConnect() succeeds immediately,
 *         CR_WOULDBLOCK when it reports WSAEWOULDBLOCK,
 *         CR_ERROR when creating, configuring or connecting the socket fails.
 */
CONNECT_RESULT SPClient::Connecting() {
	logFile->StatusOut("Connecting to %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));

	CONNECT_RESULT ret = CR_ERROR;

	// Create a TCP/IP socket that is bound to the server.
	// Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets
	// http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942
	m_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	if(m_socket == INVALID_SOCKET) {
		logFile->StatusErr("Creating socket", WSAGetLastError());
		return ret;
	}

	// NOTE(review): the error returns below leave m_socket open; Disconnect()
	// closes it, so confirm that callers invoke it after CR_ERROR.

	// Disable the Nagle algorithm. TCP_NODELAY is a TCP-level option, so the
	// level must be IPPROTO_TCP; with SOL_SOCKET the same numeric value selects
	// a different option and Nagle stays enabled.
	BOOL bNoDelay = TRUE;
	if(setsockopt(m_socket, IPPROTO_TCP, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) {
		logFile->StatusErr("Setting TCP socket as TCP_NODELAY", WSAGetLastError());
		return ret;
	}

	// Set this socket as a Non-blocking socket.
	ULONG flag = 1;
	if(ioctlsocket(m_socket, FIONBIO, &flag) == SOCKET_ERROR) {
		logFile->StatusErr("Setting socket as non-blocking", WSAGetLastError());
		return ret;
	}

	// Connect to remote address
	if(WSAConnect(m_socket, (sockaddr*)&addr, sizeof(sockaddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR) {
		// Read the error code once so the test and the log line agree.
		const int connectErr = WSAGetLastError();
		if(connectErr != WSAEWOULDBLOCK) {
			logFile->StatusErr("Connecting socket", connectErr);
			return ret;
		}
		else {
			// Expected for a non-blocking socket: the connect is still in progress.
			ret = CR_WOULDBLOCK;
			logFile->StatusOut("%s:%d is blocking.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
		}
	}
	else {
		ret = CR_CONNECTED;
		logFile->StatusOut("%s:%d is connected.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
	}
	return ret;
}

/**
 * Resets the login flag, stops the buffer manager's saving, clears transfer
 * info and the send list, then closes the socket if one is open.
 */
void SPClient::Disconnect() {
	// Session state is reset whether or not a socket is currently open.
	isLogin = FALSE;
	m_bufferMgr->StopSave();
	ClearTransferInfo();
	m_sendList.clear();

	// Nothing more to do when there is no socket.
	if(INVALID_SOCKET == m_socket)
		return;

	::TE_CloseSocket(m_socket, FALSE);
	logFile->StatusOut("SPClient: disconnected from %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
	m_socket = INVALID_SOCKET;
}

BOOL SPClient::BaseRecv() {
	int ret = recv(m_socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
	if(ret < 0) {
		DWORD lastError = ::WSAGetLastError();
		if (WSAEWOULDBLOCK != lastError) {
			logFile->StatusErr("Receiving data on TCP", lastError);
			return FALSE;
		}
		else
			return TRUE;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -