⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tsnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 5 页
字号:
			querynum = *(int*)m->buffer;
			if(querynum > 100)
				break; 
			if(querynum > 0)
			{
				buf += sizeof (int);
				*(unsigned char *)buf = TS2RM_STAT_RESPONSE;
				buf += sizeof (char);
				psize = (int*)buf;
				buf += sizeof (int);
				total = 0;
				
				p = m->buffer + sizeof(int);
				for(i = 0; i < querynum; ++i)
				{
					PDEBUG("query %s. \n", p);
					total += getChannelInfo(p, &buf);
					p += MD5_LEN;
				}
				*(int *)buffer = buf - buffer;
				*psize = total;
				if(*psize > 0)
					sendMessage(RMTRACKER.sock[idsock], buffer, &UDPCLIENT);
			}
			break;
		default:
			break;
	}
#ifdef MEASUREMENT
	gettimeofday (&tm, NULL);
	PDEBUG ("msg type %d, len %d: %lld msec.\n", m->type, m->len, ((long long)tm.tv_sec) * 1000000l + tm.tv_usec - msec);
#endif
	return 0;
}

int closure_RM (struct Session *p)
{
	int i;
	if ((i = p - RMTRACKER.head) == RMTRACKER.maxid && i > 0)
	{
		for (i--; RMTRACKER.head[i].socket == 0 && i> 0; i--);
		RMTRACKER.maxid = i;
	}
	return 0;
}
#endif


//------------------------------------------------
//- Here begin the specific message process part -
//------------------------------------------------

// | login id(UINT32) | md5 password(MD5_LEN) |
// | version of client(float) | listening port(USHORT) |
// | size of local ip list(UINT8) | first ip addr(in_addr) |... |

/* NP向TS登录, 按照来源IP地址和所报告的npport进行hash, 如果距离上次
   发送NP2TS_LOGIN消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
int process_NP2TS_LOGIN (struct Message *m)
{
	struct Session *p;
//	struct ChannelInfo tmpch;
	char md5[MD5_LEN+1];
	unsigned int host, myhost, intra;
	unsigned short port, npport;
	int i,id, userID;
	float clientVer;
	unsigned int num;
	char *buf;
	struct P2PAddress *addr;

	buf = m->buffer;
	userID = *(int *)buf;
	buf += sizeof (int);
	memcpy (md5, buf, MD5_LEN);
	md5[MD5_LEN] = 0;
	buf += MD5_LEN;
#ifdef HAVE_MYSQL
	if (authUser (userID, md5, local_mysql, NULL) == 0)
	{
		SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
		closure_NP (p);
		return -1;
	}
#endif
	// check protocol version
	clientVer = *(float*)buf;
	buf += sizeof (float);
	if(clientVer < MIN_CLIENT_VERSION) {
		SEND_NPMSG(NPTRACKER.sock[CurrentSock], TS2NP_MSG, ERR_LOW_VERSION, 1, &UDPCLIENT);
		return -1;
	}
			
	npport = ntohs (*(unsigned short *)buf);
	buf += sizeof (short);

	host = ntohl (UDPCLIENT.sin_addr.s_addr);
	port = ntohs (UDPCLIENT.sin_port);
	// find client session in the session cluster (hash table)
	id = hash_np (host, npport);
	for (p=NPTRACKER.hash[id]; p; p=p->hnext)
		if (p->host == host && p->port == port && p->npport == npport)
			break;
	if (!p) // not found, allocate and add new session object
	{
		// error adding session: TS full or uninitialized
		if (NPTRACKER.cur >= NPTRACKER.max || NPTRACKER.hash[0] == 0)
		{
			SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
			return -1;
		}
		// allocate session object from the freelist indicated by hash[0]
		// allocate the FIRST free object in hash[0] and insert it into HEAD of corresponding bucket
		p = NPTRACKER.hash[0];
		NPTRACKER.hash[0] = p->hnext;
		p->hnext = NPTRACKER.hash[id];
		NPTRACKER.hash[id] = p;
		// fill in the object
		p->socket = NPTRACKER.sock[CurrentSock];
		p->type = TYPE_NP;
		p->port = port;
		p->npport = npport;
		p->host = host;
		p->clientVer = clientVer;
#ifdef SORT_NET	
{
		struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
		if (pnetworks) p->net = pnetworks->net;
		else p->net = 0;
}
#endif
		p->time_sec = CurTimeSec;

		// process the ip list
		num = *(unsigned char *)buf;	// size of local ip list
		buf += sizeof (char);
		intra = 0;		
		for (i=0; i<num; i++)
		{
			myhost = ntohl (*(unsigned int *)buf);
			buf += sizeof (int);
			if (intra == 0xffffffff)
				continue;		// Must use continue to modify buf to right place
			if ((myhost >> 16) == 0xc0a8)		// 0xc0a8 == 192.168
			{
				intra = myhost;
				if(host == intra)
					intra = 0xffffffff;
			}
			else if (myhost == host)
				intra = 0xffffffff;
		}
		p->intra = intra;
		p->auth = random ();
		
		//init statistics
		p->u.p.startBlock = -1;

		init_NP (p);
	} else if (CurTimeSec < p->last_access + SILENCE_TIME)
		return 0; // still active, do NOTHING
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		return -1;
	}
	// Time to send back welcome message to NP
	UDPMsg.type = TS2NP_WELCOME;
	UDPMsg.len = 12+sizeof(struct P2PAddress);
	UDPMsg.authcode1 = p-NPTRACKER.head;
	UDPMsg.authcode2 = p->auth;
	addr = (struct P2PAddress *)(UDPMsg.buffer);
	addr->outerIP.sin_family = addr->subnetIP.sin_family = AF_INET;
	addr->outerIP.sin_addr.s_addr = htonl (p->host);
	addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
	addr->outerIP.sin_port = ntohs (port);
	addr->subnetIP.sin_port = ntohs (npport);

	++ts2npWelcomeCount;

	if (sendMessage(p->socket, (char *)&UDPMsg, &UDPCLIENT) < 0)
	{
		closure_NP (p);
		return -1;
	}
//	process_NEED_PEERS_real (p, NULL, 1, 0, 0);

	return 0;
}

// |---CHECK DIGITS(7 BYTEs)---|res count(UINT8)|RESOURCE MD5(MD5_LEN)|...|
int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m)
{
	int i;
	char *buf = m->buffer;
	char buffer[MAX_DATA];
	char *resultMsg, *prescount;
	struct Channel *pc;
	unsigned char num = *(unsigned char *)buf;		//# of queried resources
	buf += sizeof (char);
	resultMsg = buffer+sizeof(int);
	*(unsigned char *)resultMsg = TS2NP_RESINFO;
	resultMsg += sizeof (char);
	prescount = resultMsg;
	*(unsigned char *)prescount = 0;			// counter of resources
	for (i=0; i<num; i++)
	{
		if ((pc = findChannel (buf, MD5_LEN)) != NULL)
		{
			(*(unsigned char *)prescount) ++;
			memcpy (resultMsg, buf, MD5_LEN);
			resultMsg += MD5_LEN;
			*(unsigned short *)resultMsg = pc->numclient;
			resultMsg += sizeof (short);
		}
		buf += MD5_LEN;
	}
	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	*(unsigned int *)buffer = resultMsg - buffer;		// The length of the message
	if (sendMessage(p->socket,buffer,&UDPCLIENT) < 0)
	{
//		closure_NP (p);
		return -1;
	}
	return 0;
}

// new version of REPORT with TransferInfo, ignoring TransferInfo
// | ---CHECK DIGITS(7 BYTEs)--- |
// | info of peer(CorePeerInfo) | refresh(bool) |
// | Interval count(UINT8) | first BlockInterval | ... |
// | transfer Info(TransferInfo) |
int process_NP2TS_REPORT (struct Session *p, struct TSMessage *m)
{
	unsigned char type;
	char *buf = m->buffer;
	
	// 1. extract CorePeerInfo
	memcpy (&(p->u.p.p), buf, sizeof (struct CorePeerInfo));
	buf += sizeof (struct CorePeerInfo);
	
	if (p->u.p.cur)
	{
		type = *(unsigned char *)buf;
		buf += sizeof (char);
		if (type)		// refresh==true
		{
			p->u.p.cur->numinter = *(unsigned char *)buf;
			PDEBUG ("Set %d intervals\n", p->u.p.cur->numinter);
			buf += sizeof (char);
			if (p->u.p.cur->numinter > 0 && p->u.p.cur->numinter < MAX_INTERVAL)
				memcpy (p->u.p.cur->inter, buf, p->u.p.cur->numinter*sizeof (struct Interval));
			else p->u.p.cur->numinter = 0;
		} else			// refresh==false	incremental update
		{
			type = *(unsigned char *)buf;	// # of newly added intervals
			buf += sizeof (char);
			PDEBUG ("Add %d intervals,", type);
			if (type > 0)		// merge & delete are not good name!
			{
				p->u.p.cur->numinter = merge (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
				buf += type*sizeof(struct Interval);
			}
			type = *(unsigned char *)buf;
			buf += sizeof (char);
			if (type > 0)
			{
				p->u.p.cur->numinter = delete_interval (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
				buf += type*sizeof(struct Interval);
			}
			PDEBUG ("del %d intervals, now is %d.\n", type, p->u.p.cur->numinter);
		}
	}
	// ignore TransferInfo, but still count the size for integrity check
	// --DELETED-- copy transferinfo of client
	// --DELETED-- memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
	buf += sizeof(struct TransferInfo);
	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	return 0;
}

int process_NP2TS_REPORT2 (struct Session *p, struct TSMessage *m)
{
	char *buf = m->buffer;

	// TODO: 
	// 	extract current playing block and count them
	memcpy (&(p->u.p.s), buf, sizeof (struct StatInfo));
	buf += sizeof (struct StatInfo);
	memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
	buf += sizeof (struct TransferInfo);

	// check if this is the first report and record starting block of NP
	if ( p->u.p.startBlock == -1 ) {
		p->u.p.startBlock = p->u.p.s.playingBlock;
	}

	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	return 0;
}
// |---CHECK DIGITS(7 BYTEs)---|
// |resource list size(UINT8)|
// |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
int process_NP2TS_RES_LIST (struct Session *p, struct TSMessage *m)
{
	unsigned int num, i;
	struct ChannelInfo c;
	char *buf = m->buffer;
//	unsigned char needcp;
	num = *(unsigned char *)buf;
	buf += sizeof (char);
	for (i=0; i<num; i++)
	{
		memcpy (c.md5, buf, MD5_LEN);
		c.md5[MD5_LEN] = 0;
		buf += MD5_LEN;
		c.numinter = *(unsigned char *)buf;
		buf += sizeof (char);
		if (c.numinter > MAX_INTERVAL) return -1;
		memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
		buf += sizeof(struct Interval) * c.numinter;
		addSession (p, &c, 1, 0);
	}
	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	return 0;
}

// |---CHECK DIGITS(7 BYTEs)---|
// |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
// |current block(UINT32)|need CP(bool)|
int process_NP2TS_REQ_RES (struct Session *p, struct TSMessage *m)
{
	struct ChannelInfo c;
	unsigned int cur;
	char *buf = m->buffer;
	unsigned char needcp;
	memcpy (c.md5, buf, MD5_LEN);
	c.md5[MD5_LEN] = 0;
	buf += MD5_LEN;
	c.numinter = *(unsigned char *)buf;
	buf += sizeof (char);
	if (c.numinter > MAX_INTERVAL) return -1;
	memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
	buf += sizeof(struct Interval) * c.numinter;
	cur = *(unsigned int *)buf;
	buf += sizeof (int);
	needcp = *(unsigned char *)buf;
	buf += sizeof (char);
	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	addSession (p, &c, 1, cur);
	process_NEED_PEERS_real (p, c.md5, needcp, cur, 0);
	SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_ADD_RES_OK,0,&UDPCLIENT);	
	return 0;
}
// NP has quit from one channel
// |---CHECK DIGITS(7 BYTEs)---|RESOURCE MD5(MD5_LEN)|
int process_NP2TS_DEL_RES (struct Session *p, struct TSMessage *m)
{
	struct Edge *pedge;
	struct Channel *pc;
	char *buf = m->buffer;
	if ((pc=findChannel (buf, MD5_LEN)) == NULL)
			return -1;
	buf += MD5_LEN;
	if (buf - m->buffer + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}
	for (pedge=p->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
	if (pedge) delEdge (pedge);
	return 0;
}

int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer)
{
	char buffer[MAX_DATA], buffer1[MAX_DATA]; /* buffer is used to hold TS2CP/NP_PEERS message, 
						     buffer1 is used to hold TS2NP_CONNECT_TO message */
	int num, numcp=0, numnp, conn = p->socket;
	char *buf, *psize;
	struct Channel *pc;
	struct P2PAddress *addr;

	PDEBUG ("NEED_PEERS type %d needcp %d cur %d\n", p->type, needcp, cur);
	buf = buffer+sizeof (int);
	if (p->type == TYPE_CP)
		*(unsigned char *)buf = TS2CP_PEERS;
	else
		*(unsigned char *)buf = TS2NP_PEERS;
	buf += sizeof (char);

	++ts2npPeersCount;

	if (md5)	// md5 of the channel
	{
		pc = findChannel (md5, MD5_LEN);
		if (p->type == TYPE_CP)
		{
			memcpy (buf, md5, MD5_LEN);
			buf += MD5_LEN;
		}
	} else if (p->u.p.cur)	// Session->NPInfo->CurrentEdge
		pc = p->u.p.cur->head;
	else
		pc = NULL;

	if (pc == NULL) 
	{
		PDEBUG("no channel\n");
		return -1;
	}

	num = MAX_PEER;
	if (needcp)
	{
		psize = buf;
		buf += sizeof (char);
		if (p->type == TYPE_CP)
			*(unsigned char *)psize = numcp = findCPPeers (0, pc->name, &buf);
		else
			//*(unsigned char *)psize = numcp = findCPPeers (ntohs (p->host & 0xffff0000), pc->name, &buf);
			*(unsigned char *)psize = numcp = findCPPeers (p->host, pc->name, &buf);
	} else
	{
		*(unsigned char *)buf = 0;
		buf += sizeof (char);
	}
	psize = buf;
	buf += sizeof (char);
	if (p->intra == 0xffffffff)		//NP is in the public network, ask NP to contact other NPs
	{
	// |len(INT32) | type(INT8)|target addr(P2PAddress)|connect for free(bool)|
		*(int *)buffer1 = sizeof(struct P2PAddress) + sizeof (int) + 2*sizeof (char);
		*(unsigned char *)(buffer1+sizeof (int)) = TS2NP_CONNECT_TO;
		addr = (struct P2PAddress *)(buffer1+sizeof(int)+sizeof(char));
		addr->outerIP.sin_family = PF_INET;
		addr->subnetIP.sin_family = PF_INET;
		addr->outerIP.sin_port = htons (p->port);
		addr->outerIP.sin_addr.s_addr = htonl (p->host);
		addr->subnetIP.sin_port = htons (p->npport);
		addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
		*(unsigned char *)(addr + 1) = 0;		// connect for free(bool)
		
		PDEBUG("Send ConnectTo %s\n", inet_ntoa(addr->outerIP.sin_addr));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -