⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spclient.cpp

📁 mysee网络直播源代码。Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应用了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 2 页
字号:
	}
	else if(0 == ret) {
		logFile->StatusOut("Connection has been disconnected gracefully.");
		return FALSE;
	}

	recvOff += ret;
	totalDownBytes += ret;

	BOOL retVal = FALSE;

	for(;;) {
		// because multiple msgs can be received at once.
		// keep call parseMsg() till !MSG_COMPLETE
		MSG_STATE ms = ParseMsg(); 

		switch(ms) {
		case MSG_COMPLETE:
			continue;
		case MSG_UNCOMPLETE:
			retVal = TRUE;
			break;
		case MSG_ERR_SIZE:
			sprintf(errStr, "错误的消息大小!");
			break;
		case MSG_ERR_TYPE: 
			sprintf(errStr, "错误的消息类型!");
			break;
			break;
		case MSG_REMOTE_ERR:
			// 错误信息已经在errStr中了
			break;
		default:
			sprintf(errStr, "未知错误类型!");
			break;
		}
		if(strlen(errStr) > 0)
			logFile->StatusOut("错误: %s", errStr);
		break;
	}
	return retVal;
}

/**
 * Parses at most one message from the front of recvBuf and dispatches it.
 *
 * Header layout as read here: a UINT message size followed by a UINT8
 * message type. The size counts the header itself, because the buffer is
 * advanced by msgSize measured from the start of recvBuf.
 *
 * Once a complete message is available it is removed from the buffer
 * regardless of the handler result (including MSG_ERR_TYPE), and any bytes
 * that follow it are shifted to the start of recvBuf.
 *
 * @return MSG_UNCOMPLETE if a full message is not buffered yet,
 *         MSG_ERR_SIZE if the declared size is out of range,
 *         MSG_ERR_TYPE for an unrecognised message type,
 *         otherwise the value returned by the message handler.
 */
MSG_STATE SPClient::ParseMsg() {
	// Not even a full header (size + type) yet: wait for more data.
	if(recvOff < sizeof(int)+sizeof(BYTE))
		return MSG_UNCOMPLETE;

	// Rewind the read cursor to the start of the buffered data.
	recvPointer = recvBuf;

	// Read the message size. Zero-initialised so that a no-op CopyMoveSrc
	// (null pointer guard) yields MSG_ERR_SIZE instead of garbage.
	UINT msgSize = 0;
	CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));

	// Read the message type.
	UINT8 msgType = 0;
	CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));

	// Reject sizes larger than the buffer or smaller than the header.
	if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
		return MSG_ERR_SIZE;

	// The whole message has not arrived yet.
	if(recvOff < msgSize)
		return MSG_UNCOMPLETE;

	MSG_STATE ret = MSG_COMPLETE;
	switch(msgType) {
	case SP2CS_WELCOME:
		ret = OnWelcome();
		break;
	case SP2CS_MSG:
		ret = OnMsg();
		break;
	default:
		ret = MSG_ERR_TYPE;
		break;
	}

	// Move the remaining bytes to the start of recvBuf. Source and
	// destination overlap whenever more than msgSize bytes remain, so this
	// must be memmove; memcpy on overlapping ranges is undefined behaviour.
	if(recvOff >= msgSize) {
		memmove(recvBuf, recvBuf+msgSize, recvOff-msgSize);
		recvOff -= msgSize;
	}

	return ret;
}

// Handles the "registration succeeded" (welcome) message.
// Reads the starting block ID from the payload at recvPointer, stores it in
// lastSentBlockID, logs it, and marks the client as logged in.
// Always returns MSG_COMPLETE.
MSG_STATE SPClient::OnWelcome() {
	UINT firstBlockID;
	CopyMoveSrc(&firstBlockID, recvPointer, sizeof(firstBlockID));
	lastSentBlockID = firstBlockID;

	logFile->StatusOut("Start Block %d.", lastSentBlockID);
	isLogin = TRUE;

	return MSG_COMPLETE;
}

// Handles an SP_MSG message: an error code followed by a one-byte
// "should disconnect" flag, both read from recvPointer.
//
// For the known error codes a description is written to errStr. An unknown
// error code always forces a disconnect (errStr is left untouched).
//
// Returns MSG_REMOTE_ERR when the connection should be dropped,
// MSG_COMPLETE otherwise.
MSG_STATE SPClient::OnMsg() {
	// Error code.
	UINT16 errCode = 0;
	CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));

	// Disconnect flag. It is read as a raw byte and converted explicitly:
	// copying an arbitrary wire byte straight into a bool produces an
	// invalid bool (undefined behaviour) for any value other than 0 or 1.
	// NOTE(review): assumes the flag occupies one byte on the wire, which
	// matches sizeof(bool) on the usual Windows toolchains - confirm.
	UINT8 disconnectFlag = 0;
	CopyMoveSrc(&disconnectFlag, recvPointer, sizeof(disconnectFlag));
	bool shouldDisconnect = (disconnectFlag != 0);

	// Act on the error code.
	switch(errCode) {
	case ERR_PROTOCOL_FORMAT:
		sprintf(errStr, "协议错误");
		break;
	case ERR_AUTHORIZATION:
		sprintf(errStr, "验证错误");
		break;
	case ERR_INTERNAL:
		sprintf(errStr, "未知错误");
		break;
	default:
		// Unrecognised code: treat as fatal.
		shouldDisconnect = true;
	}
	if(shouldDisconnect) {
		return MSG_REMOTE_ERR;
	}

	return MSG_COMPLETE;
}

// Tries to send (part of) the packet at the head of m_sendList.
//
// At most one send() call is made per invocation, capped at 40000 bytes.
// A packet that is not fully sent is put back at the front of the list so
// the next call resumes from packet->sent; a fully sent packet is returned
// to m_freeList.
//
// Returns FALSE only when send() fails with an error other than
// WSAEWOULDBLOCK; returns TRUE otherwise (including an empty queue).
BOOL SPClient::SendPackets() {
	if(m_sendList.empty())
		return TRUE;

	// Detach the head packet; it is re-queued below unless completely sent.
	TCPPacket* head = m_sendList.front();
	m_sendList.pop_front();

	int sentNow = send(m_socket, head->buf+head->sent, min(head->size-head->sent, 40000), 0);

	if(SOCKET_ERROR == sentNow) {
		// Capture the error code first, then restore the packet to the queue.
		DWORD lastError = ::WSAGetLastError();
		m_sendList.push_front(head);
		if(WSAEWOULDBLOCK != lastError) {
			logFile->StatusErr("Sending data on TCP", lastError);
			return FALSE;
		}
		// Socket not writable right now: retry on a later call.
		return TRUE;
	}

	totalUpBytes += sentNow;
	logFile->StatusOut("Send data %d of %d", sentNow, head->size-head->sent);
	assert(sentNow <= head->size-head->sent);

	if(sentNow != head->size-head->sent) {
		// Partial send: remember the progress and continue next time.
		head->sent += sentNow;
		m_sendList.push_front(head);
		logFile->StatusOut("!!!!!!!!!!!");
		return TRUE;
	}

	// Whole packet sent. Large packets are logged with their block ID
	// (presumably only block packets exceed 16384 bytes - confirm).
	if(head->size > 16384)
		logFile->StatusOut("Sent Block %d.", *head->GetBlockID());
	// Sending finished: release the buffer.
	m_freeList.Release(head);
	return TRUE;
}

// Main connect/receive/send loop for an SPClient.
//
// Runs until client->isRunning becomes false. Each outer iteration:
//   1. If there is no socket, waits until m_bufferMgr->ShouldConnect() is
//      true, then calls Connecting(). A CR_WOULDBLOCK result is followed by
//      a select() on the write set with an 8 second timeout. On failure the
//      socket is closed and the loop waits cfgData.reconnectSecond seconds
//      (in 1 second steps, so a stop request is noticed) before retrying.
//   2. Calls SendRegister(); on success enters the inner loop, which polls
//      the socket for readability (20 ms timeout), feeds BaseRecv(), logs
//      transfer statistics every 10 seconds, checks sample reception, queues
//      blocks once logged in, sends an update roughly every 10 seconds, and
//      flushes queued packets via SendPackets().
//   3. Calls Disconnect() when the inner loop ends or registration fails.
//
// NOTE(review): takes the client as an explicit parameter and touches only
// client-> members, so this is presumably a static thread entry point -
// confirm against the class declaration.
void SPClient::RunReceiver(SPClient* client)
{
	timeval timeout; 
	// Tick (timeGetTime) of the last successful SendUpdate().
	DWORD lastSentUpdate = 0;

	fd_set read_set, write_set;

	// lastManageTime: tick of the last statistics log; currTime: tick sampled
	// at the top of each inner-loop iteration.
	DWORD lastManageTime=0, currTime=0;

	while(client->isRunning)
	{
		if(client->m_socket == INVALID_SOCKET)
		{
			// Not allowed to connect yet: poll again in 500 ms.
			if(!client->m_bufferMgr->ShouldConnect()) {
				Sleep(500);
				continue;
			}
			CONNECT_RESULT ret = client->Connecting();
			if(ret == CR_WOULDBLOCK) {
				// Wait up to 8 seconds to see whether the connection completes
				FD_ZERO(&write_set);
				FD_SET(client->m_socket, &write_set);
				timeout.tv_sec = 8;
				timeout.tv_usec = 0;
				int s = select(0, NULL, &write_set, NULL, &timeout);
				// Socket became writable within the timeout: treat as connected.
				if(s > 0)
					ret = CR_CONNECTED;
			}
			if(ret == CR_CONNECTED) {
				client->lastSentBlockID = 0;
				client->readData = 0;
				// Data can start being stored now
				client->m_bufferMgr->StartSave();
			}
			else {
				TE_CloseSocket(client->m_socket, FALSE);
				client->m_socket = INVALID_SOCKET;
				client->logFile->StatusOut("无法连接SP,请检查网络。");
				// Disabled: modal message box reporting the connection failure (kept for reference).
				//MessageBox(NULL, "无法连接SP,请检查网络。然后重新打开本程序。", "错误", MB_OK|MB_ICONINFORMATION);
                // Wait reconnectSecond seconds before retrying, one second at a
                // time so that a stop request ends the wait early.
                for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
                    if(!client->isRunning)
                        break;
				    Sleep(1000);
                }
				continue;
			}
		}
		if(client->SendRegister()) {
			// Start receiving and sending data
			while(client->isRunning) {
				// Get the current time
				currTime = timeGetTime();

				// Poll the socket for readability with a 20 ms timeout.
				timeout.tv_sec = 0;
				timeout.tv_usec = 20000;
				FD_ZERO(&read_set);
				FD_SET(client->m_socket, &read_set);
				int s = select(0, &read_set, 0, NULL, &timeout);
				if(s > 0) {
                    if(FD_ISSET(client->m_socket, &read_set)) {
                        // Receive data
                        if(!client->BaseRecv()) {
                            // Receive failed: wait reconnectSecond seconds (interruptible
                            // by isRunning), then leave the inner loop to disconnect.
                            for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
                                if(!client->isRunning)
                                    break;
                                Sleep(1000);
                            }
                            break;
						}
					}
				}
 				else if(s == SOCKET_ERROR) {
					client->logFile->StatusErr("selecting", WSAGetLastError());
					Sleep(1); // prevent dead loop
				}
				// Every 10 seconds (10000 ms of timeGetTime ticks)...
				if(currTime-lastManageTime >= 10000) {
					lastManageTime = currTime;
					// Generate the transfer statistics and log them
					client->GenerateTransferInfo(TRUE);
					client->logFile->StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB.", 
						client->currDownSpeed/1024, client->currUpSpeed/1024, 
						client->avgDownSpeed/1024, client->avgUpSpeed/1024, 
						client->totalDownBytes/1024.f/1024.f, client->totalUpBytes/1024.f/1024.f);
				}

				// Sample reception check failed: log it and notify the parent window, if any.
				if(!client->m_bufferMgr->CheckRecvingSample()) {
					client->logFile->StatusOut("No Sample Any More!");
					// Disabled: modal message box reporting the missing samples (kept for reference).
					//MessageBox(client->cs->parentWindow, "CaptureServer 20秒钟没有接收到Sample了!", "Sample中断", MB_OK|MB_ICONSTOP);
					if(client->cs->parentWindow)
						SendMessage(client->cs->parentWindow, WM_NOMORESAMPLE, 0, 0);
				}

				// Send data
                if(client->isLogin && client->isRunning) {
                    // The SP does not support this protocol yet
                    //client->SendMediaType();
					client->SendBlock();
                }
				// Send an update when more than 10 seconds have passed since the last one.
				if(currTime-lastSentUpdate > 10000) {
					if(!client->SendUpdate())
						break;
					lastSentUpdate = currTime;
				}
				// Push queued packets to the socket; a hard send error ends the session.
				if(!client->SendPackets())
					break;
			}
		}
		// Session over (or registration failed): disconnect before the next attempt.
		client->Disconnect();
	}
}

// Starts building an outgoing message.
//
// Allocates a packet from m_freeList into `packet`, points sendPointer just
// past the space reserved for the UINT size field (filled in later), and
// writes msgType there.
//
// Returns FALSE if no packet could be allocated, TRUE otherwise.
BOOL SPClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
	packet = m_freeList.Allocate();
	if(packet) {
		// Leave the size field blank for now; it is written last.
		sendPointer = packet->buf+sizeof(UINT);
		CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
		return TRUE;
	}
	return FALSE;
}

// Finishes an outgoing message and queues it for sending.
//
// The message size is the distance sendPointer has advanced from the start
// of the packet buffer; it is stored in packet->size and copied into the
// beginning of the buffer. The packet is then appended to m_sendList with
// its sent counter reset.
void SPClient::SendEnd(TCPPacket*& packet) {
	packet->sent = 0;
	// Size = current write cursor minus the buffer start.
	packet->size = sendPointer-packet->buf;
	memcpy(packet->buf, &packet->size, sizeof(packet->size));

	m_sendList.push_back(packet);
}

// Copies `size` bytes from `src` into `des`, then advances `src` by `size`.
// Asserts that both pointers are non-null; if either is null (assertions
// disabled) nothing is copied and `src` is left unchanged.
void SPClient::CopyMoveSrc(void * des, const char *& src, size_t size) {
	assert(des && src);
	if(des && src) {
		memcpy(des, src, size);
		src += size;
	}
}

// Copies `size` bytes from `src` into `des`, then advances `des` by `size`.
// Asserts that both pointers are non-null; if either is null (assertions
// disabled) nothing is copied and `des` is left unchanged.
void SPClient::CopyMoveDes(char *& des, const void * src, size_t size) {
	assert(des && src);
	if(des && src) {
		memcpy(des, src, size);
		des += size;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -