⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tsnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 5 页
字号:
		
		numnp = findNPPeers (pc, p, cur, num, &buf, buffer1);
	} else
	{
		numnp = findNPPeers (pc, p, cur, num, &buf, NULL);
	}
	*psize = numnp;		// set # of NP
	*(int *)buffer = buf - buffer;
	if (sendMessage(conn,buffer,&UDPCLIENT) < 0)
	{
//		closure_NP (p);
		PDEBUG("send msg err\n");
		return -1;
	}
	PDEBUG ("find %d NP and %d CP\n", numnp, numcp);
	return numnp+numcp;
}


/*
 * Handle an NP2TS_NEED_PEERS request: an NP asks the tracker for peers.
 *
 * Payload consumed from m->buffer, in order:
 *   - 1 byte:            needcp, forwarded to process_NEED_PEERS_real
 *   - sizeof(int) bytes: stored in p->u.p.cur->current
 *   - 1 byte:            stored in p->u.p.p.layer
 *
 * Original author's note (translated): peer lookup uses findCPPeers to pick
 * suitable CPs and findNPPeers to pick suitable NPs; NP results are sorted by
 * network so that peers on the same network come first.
 *
 * The message length is validated BEFORE anything is read, so a truncated
 * datagram can no longer overwrite the session's position/layer with bytes
 * that were never part of the message.
 *
 * Returns -1 if the session has no current channel info or if the message is
 * too short (in the latter case the NP session is closed); otherwise returns
 * whatever process_NEED_PEERS_real returns.
 */
int process_NP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
{
	const char *buf = m->buffer;
	/* Bytes this handler consumes from the payload. */
	const int payload = (int)(sizeof (char) + sizeof (int) + sizeof (char));
	unsigned int current;
	int needcp;

	if (p->u.p.cur == NULL)
	{
		PDEBUG("no current in NPInfo\n");
		return -1;
	}
	if (payload + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_NEED_PEERS, length %d not enough.\n", m->len);
		closure_NP (p);
		return -1;
	}

	needcp = *(const unsigned char *)buf;
	buf += sizeof (char);
	/* The integer sits at offset 1, so copy it out instead of dereferencing
	   a misaligned unsigned int pointer. */
	memcpy (&current, buf, sizeof (current));
	buf += sizeof (current);
	p->u.p.cur->current = current;
	p->u.p.p.layer = *(const unsigned char *)buf;

	return process_NEED_PEERS_real (p, NULL, needcp, p->u.p.cur->current, p->u.p.p.layer);
}

/*
 * Pick up to two CP sessions from CPTRACKER and append their addresses
 * (struct NormalAddress: family, npport, host in network byte order) to
 * *buffer, advancing *buffer past each record written.
 *
 * host   - passed to findcppeers() for every candidate CP; when it is 0 the
 *          priority scan is skipped entirely.
 * md5    - not used by this function.
 * buffer - in/out write cursor; the caller must provide room for two records.
 *
 * Returns the number of CP addresses written (0, 1 or 2).
 *
 * Selection happens in two passes over the CP table, both starting at a
 * random slot and wrapping around:
 *   1. priority scan: findcppeers() yields -1 for "not a match", otherwise a
 *      priority (the original comment says 1..n);
 *   2. sequential fallback, entered only when the first scan ran through the
 *      whole table without breaking out: takes the first usable CPs, skipping
 *      a second CP whose host equals the first choice's host.
 *
 * NOTE(review): when host == 0 and the table is non-empty, the first loop
 * never runs, i stays 0, the fallback is skipped as well and 0 is returned.
 * Confirm this is intended.
 */
int findCPPeers (unsigned long host, char *md5, char **buffer)
{
//	float minband = -1;
	struct Session *p = NULL, *choice1 = NULL, *choice2 = NULL;
	struct Session *picked[2];
	struct NormalAddress *addr;
	int minpriority = -1;
	int found_cp_count = 0;
	int i;

	(void)md5;

	/* An empty table would make the modulus below a division by zero. */
	if (CPTRACKER.maxid + 1 <= 0)
		return 0;

	// We should start with a random address
	p = CPTRACKER.head + rand() % (CPTRACKER.maxid+1);

	//use pure random address instead of comparing the band

PDEBUG("In findCPPeers.\n");

	for( i=0; i <= CPTRACKER.maxid && host != 0; i++, p++)
	{
		PDEBUG("begin to call findcppeers.\n");
		if( p-CPTRACKER.head > CPTRACKER.maxid )	// round back to head
		{
			p = CPTRACKER.head;
		}

		/* Slots with maxConn == 1 or no socket are treated as unusable. */
		if(p->u.cp.maxConn == 1 || p->socket == 0)
		{
			PDEBUG("Invalid CP.\n");
			continue;
		}

		int priority = findcppeers(host, (void*)p);

		// Not a matching CP: look at the next one.
		if(priority == -1)
		{
			PDEBUG("not found.\n");
			continue;
		}

		/* host is unsigned long, so it must be printed with %lu. */
		PDEBUG("found: host : %lu  servicetype: %s.\n", host, p->u.cp.servicetype);
		if(minpriority == -1)
			minpriority = priority;

		if(priority == 1)// priority - 1~n
		{
			if(choice1 == NULL)
			{
				choice1 = p;
				minpriority = -1;
			}
			else
			{
				choice2 = p;
				break;
			}
		}
		else
		{
			if(priority < minpriority)
			{
				if(choice1 == NULL)
				{
					choice1 = p;
					minpriority = -1;
				}
				else
				{
					choice2 = p;
					break;
				}
			}
		}
	}

	if(i > CPTRACKER.maxid)//find nothing via findcppeers, return CP directly
	{
		PDEBUG("begin to find sequencely.\n");
		for( i=0; i <= CPTRACKER.maxid; i++, p++)
		{
			if( p-CPTRACKER.head > CPTRACKER.maxid )	// round back to head
			{
				p = CPTRACKER.head;
			}

			if(p->u.cp.maxConn == 1 || p->socket == 0)
				continue;

			// choice1 is always filled before choice2; stop once both are set.
			if(choice1 == NULL)
			{
				choice1 = p;
			}
			else if(p->host == choice1->host)// avoid returning the same CP twice
				continue;
			else if(choice2 == NULL)
			{
				choice2 = p;
			}
			else
				break;
		}
	}

	/* choice2 is only ever set after choice1, so this keeps the original
	   output order: first choice, then second choice. */
	picked[0] = choice1;
	picked[1] = choice2;
	for (i = 0; i < 2; i++)
	{
		if (picked[i] == NULL)
			continue;

		addr = (struct NormalAddress *)*buffer;
		addr->sin_family = PF_INET;
		addr->sin_port = htons (picked[i]->npport);
		addr->sin_addr.s_addr = htonl (picked[i]->host);
		*buffer += sizeof (*addr);

		found_cp_count ++;		// one more CP is found
	}

	return found_cp_count;// number of CPs finally found
}

#ifndef SORT_NET
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
{
	struct Edge *pedge = NULL;
	struct Session *ps;
	struct P2PAddress addr;
	int j, k;
	unsigned int randstart;
	struct sockaddr_in client;

	if (pc == NULL || pc->numclient <= 0) return 0;
	if (me->cachepeer != NULL && me->cachepeer->u.p.header != NULL && me->type == TYPE_NP)
	{
		for (pedge=me->cachepeer->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
	}
	if (pedge == NULL)
	{
		randstart = rand () % pc->numclient;
		for (pedge=pc->PeerHead; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
	}

	for (j=0,k=0; j<num && k<pc->numclient; pedge=pedge->cnext)
	{
		k++;
		if (pedge == NULL) pedge = pc->PeerHead;
		if ((ps = pedge->me) == me) continue;	// exclude myself
		if (CurTimeSec > pedge->me->last_access+60) continue;	// don't bother it too often
		if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
		{
			if (ps->intra == 0xffffffff || ps->host == me->host)	// 0xffffffff means ps is on the public network, ps->host == me->host means in the same private network
			{
				memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
				addr = (*buffer) + sizeof (struct CorePeerInfo);
				addr->outerIP.sin_port = htons (ps->port);
				addr->outerIP.sin_addr.s_addr = htonl (ps->host);
				addr->subnetIP.sin_port = htons (ps->npport);
				addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
				addr->outerIP.sin_family = PF_INET;
				addr->subnetIP.sin_family = PF_INET;
				*buffer = (struct char *)(addr + 1);
				j++;
			} else if (buffer1)
			{
				memset ((char *)&client, 0, sizeof (client));
				client.sin_port = htons (ps->port);
				client.sin_family = AF_INET;
				client.sin_addr.s_addr = htonl (ps->host);
				if (sendMessage(ps->socket,buffer1, &client) < 0)
				{
					closure_NP (ps);
				}
				PDEBUG("Send ConnecTo to %s\n", inet_ntoa(client.sin_addr));
			}
		}
	}
	if (pedge != NULL) me->cachepeer = pedge->me;
	return j;
}
#else
/*
 * SORT_NET variant: collect NP peers of channel `pc` for the requester `me`,
 * network by network, sort them with compareSession and serialize at most
 * `num` of them into *buffer.
 *
 * pc      - channel with one peer list and one client count per network
 *           (PeerHead[net], nclient_net[net]); NULL or empty yields 0.
 * me      - requesting session; never returned to itself. me->net selects the
 *           network visited first; the remaining networks follow cyclically.
 * playing - position handed to check_valid(); 0xffffffff disables that check.
 * num     - when > 0, caps the number of peers written after sorting.
 * buffer  - in/out write cursor; each peer is written as a struct CorePeerInfo
 *           followed by a struct P2PAddress, and *buffer is advanced past it.
 * buffer1 - optional pre-built message; when non-NULL it is sent to every
 *           otherwise eligible peer that is neither public nor on the
 *           requester's host, instead of collecting that peer.
 *
 * Returns the number of peers written to *buffer.
 *
 * At most MAX_PEER candidates are collected across all networks. For each
 * network the walk starts at the cached peer's edge (me->cachepeer[net]) when
 * `me` is an NP and that peer has an edge in this channel, otherwise at a
 * random position, and visits at most nclient_net[net] edges.
 *
 * Fix over the original: pointer differences are ptrdiff_t and were passed to
 * "%d"; they are now cast to int. The per-network loop body is also indented
 * to show its real nesting.
 */
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
{
	int i, j, k, m, mnetnum;
	unsigned int randstart;
	struct Session *result[MAX_PEER];
	struct Session *ps;
	struct sockaddr_in client;
	struct Edge *pedge;
	struct P2PAddress *addr;

	if (pc == NULL || pc->numclient <= 0) return 0;
	/* Net is not declared in this function; presumably a file-scope variable. */
	Net = me->net;
	/* j counts collected candidates across all networks. */
	for (j=0,m=0; m<MAX_NET_NUM; m++)
	{
		mnetnum = (Net + m) % MAX_NET_NUM;
		k = 0;	// edges visited in this network
		if (pc->nclient_net[mnetnum] <= 0) continue;
		pedge = NULL;
		if (me->cachepeer[mnetnum] != NULL && me->cachepeer[mnetnum]->u.p.header != NULL && me->type == TYPE_NP)
		{
			/* Find the cached peer's edge that belongs to this channel. */
			for (pedge=me->cachepeer[mnetnum]->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
		}
		if (pedge == NULL)
		{
			/* No usable cache entry: start from a random edge of this network. */
			randstart = rand () % pc->nclient_net[mnetnum];
			for (pedge=pc->PeerHead[mnetnum]; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
		}
		for (; j<MAX_PEER && k < pc->nclient_net[mnetnum]; pedge=pedge->cnext)
		{
			k++;
			if (pedge == NULL) pedge = pc->PeerHead[mnetnum];	// wrap around to the list head
			if ((ps = pedge->me) == me) continue;	// exclude myself
			/* Skip peers whose last_access is more than 60 behind CurTimeSec. */
			if (CurTimeSec > pedge->me->last_access+60) continue;
			if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
			{
				if (ps->intra == 0xffffffff || ps->host == me->host)
				{
					result[j] = ps;
					j++;
				} else if (buffer1)
				{
					/* Not directly listable: send it buffer1 at its outer
					   address instead. */
					memset ((char *)&client, 0, sizeof (client));
					client.sin_port = htons (ps->port);
					client.sin_family = AF_INET;
					client.sin_addr.s_addr = htonl (ps->host);
					if (sendMessage(ps->socket,buffer1, &client) < 0)
					{
						/* NOTE(review): if closure_NP unlinks pedge, the loop's
						   pedge->cnext step reads a stale edge - confirm. */
						closure_NP (ps);
					}
					PDEBUG("Send ConnecTo to %s\n", inet_ntoa(client.sin_addr));
				}
			}
		}
		if (pedge != NULL) me->cachepeer[mnetnum] = pedge->me;
	}

	if (j > 1)
		qsort (result, j, sizeof (struct Session *), compareSession);
	if (num > 0 && j > num) j = num;
	if (me->type == TYPE_NP)
		PDEBUG ("NP %d find %d NP:", (int)(me-NPTRACKER.head), j);
	else
		PDEBUG ("CP %d find %d CP:", (int)(me-CPTRACKER.head), j);

	/* Serialize the (sorted, possibly truncated) result list. */
	for (i=0; i<j; i++)
	{
		ps = result[i];
		PDEBUG ("%d\t", (int)(ps-NPTRACKER.head));
		memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
		addr = (struct P2PAddress *)((*buffer) + sizeof (struct CorePeerInfo));
		addr->outerIP.sin_port = htons (ps->port);
		addr->outerIP.sin_addr.s_addr = htonl (ps->host);
		addr->subnetIP.sin_port = htons (ps->npport);
		addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
		addr->outerIP.sin_family = PF_INET;
		addr->subnetIP.sin_family = PF_INET;
		*buffer = (char *)(addr + 1);
	}
	PDEBUG ("\n");
	return j;
}
#endif

/*
 * Handle a CP2TS_UPDATE status report from a CP.
 *
 * Payload consumed from m->buffer, in order:
 *   - int            -> p->u.cp.resnum   (negative values are clamped to 0)
 *   - unsigned short -> p->u.cp.connnum
 *   - float          -> p->u.cp.band     (negative values become 0.001)
 *   - unsigned char  -> p->u.cp.maxConn
 *
 * After storing the report, a CT_GENERAL CP becomes GCPCHOICE when no choice
 * exists yet or when its resnum is lower than the current choice's.
 *
 * Changes over the original:
 *   - the length is validated before anything is read, so a truncated message
 *     no longer overwrites the session with bytes beyond the message;
 *   - fields are copied out with memcpy because the unsigned short and float
 *     sit at offsets that are not suitably aligned for a direct dereference;
 *   - resnum/band are sanitized before resnum takes part in the GCPCHOICE
 *     comparison, so a negative report can no longer win on its raw value.
 *
 * Returns 0 on success; -1 (after closing the CP session) if the message is
 * too short.
 */
int process_CP2TS_UPDATE (struct Session *p, struct TSMessage *m)
{
	const char *buf = m->buffer;
	/* Bytes this handler consumes from the payload. */
	const int payload = (int)(sizeof (int) + sizeof (unsigned short) + sizeof (float) + sizeof (char));
	int resnum;
	unsigned short connnum;
	float band;

	if (payload + AUTH_HEADER > m->len)
	{
		PDEBUG ("Invalid message CP2TS_UPDATE, length %d not enough.\n", m->len);
		closure_CP (p);
		return -1;
	}

	memcpy (&resnum, buf, sizeof (resnum));
	buf += sizeof (resnum);
	memcpy (&connnum, buf, sizeof (connnum));
	buf += sizeof (connnum);
	memcpy (&band, buf, sizeof (band));
	buf += sizeof (band);

	p->u.cp.resnum = resnum;
	p->u.cp.connnum = connnum;
	p->u.cp.band = band;
	p->u.cp.maxConn = *(const unsigned char *)buf;

	/* Sanitize first, then compare. */
	if (p->u.cp.resnum < 0) p->u.cp.resnum = 0;
	if (p->u.cp.band < 0) p->u.cp.band = 0.001;

	if (p->u.cp.type == CT_GENERAL &&
		(GCPCHOICE == NULL || p->u.cp.resnum < GCPCHOICE->u.cp.resnum))
			GCPCHOICE = p;
	return 0;
}

/*
 * Handle a CP2TS_NEED_PEERS request by delegating to process_NEED_PEERS_real.
 * The start of the message payload is passed as the md5 argument; needcp is
 * fixed to 1, and both the current position and the layer are fixed to 0.
 * Returns whatever process_NEED_PEERS_real returns.
 */
int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
{
	char *md5 = m->buffer;
	const int needcp = 1;
	const int cur = 0;
	const int layer = 0;

	return process_NEED_PEERS_real (p, md5, needcp, cur, layer);
}

/* 登录, CP向TS登录, 按照来源IP地址和所报告的npport进行hash,
   如果距离上次发送CP2TS_REGISTER消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
int process_CP2TS_REGISTER (struct Message *m)
{
	char md5[MD5_LEN+1];
	unsigned int host, cur = 0;
	unsigned short port, npport;
	int id, userID;
	char *buf;
	struct Session *p;

	buf = m->buffer;
	userID = *(int *)buf;
	buf += sizeof (int);
	memcpy (md5, buf, MD5_LEN);
	md5[MD5_LEN] = 0;
	buf += MD5_LEN;

#ifdef HAVE_MYSQL
	if (authUser (userID, md5, local_mysql, NULL) == 0)
	{
	}
#endif

	npport = ntohs(*(unsigned short *)buf);
	buf += sizeof (short);
	host = ntohl (UDPCLIENT.sin_addr.s_addr);
	port = ntohs (UDPCLIENT.sin_port);
	id = hash_cp (host, npport);
	for (p=CPTRACKER.hash[id]; p; p=p->hnext)
		if (p->host == host && p->port == port && p->npport == npport)
			break;
	if (!p)
	{
		if (CPTRACKER.cur >= CPTRACKER.max || CPTRACKER.hash[0] == 0)
		{
			PDEBUG("CP reg err, wrong cp index in array.\n");
			SEND_NPMSG(CPTRACKER.sock[CurrentSock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
			return -1;
		}
		p = CPTRACKER.hash[0];
		CPTRACKER.hash[0] = p->hnext;
		p->hnext = CPTRACKER.hash[id];
		CPTRACKER.hash[id] = p;
		p->socket = CPTRACKER.sock[CurrentSock];
		p->type = TYPE_CP;
		p->port = port;
		p->npport = npport;
		p->host = host;


#ifdef SORT_NET
{
		struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
		if (pnetworks) p->net = pnetworks->net;
		else p->net = 0;
}
#endif
		p->time_sec = cur;

		p->u.cp.userid = userID;
		p->u.cp.type = *(unsigned char *)buf;
		buf += sizeof (char);

		if (p->u.cp.type == CT_EDGE)
		{
			p->u.cp.numHeads = *(unsigned short *)buf;
			buf += sizeof (short);
			if (((char *)m) +m->len - buf <= sizeof (p->u.cp.parameter))
				memcpy (p->u.cp.parameter, buf, (char *)m+m->len-buf);
			else
				memcpy (p->u.cp.parameter, buf,sizeof (p->u.cp.parameter));
			if (p->u.cp.numHeads > sizeof(p->u.cp.parameter)/2)
				p->u.cp.numHeads = sizeof (p->u.cp.parameter)/2;
		} else if (p->u.cp.type == CT_SPECIFIED_RES)
		{
			memcpy (p->u.cp.parameter, buf, MD5_LEN);
			buf += MD5_LEN;
		}

		p->auth = random ();

		init_CP (p);
	} else if (CurTimeSec < p->last_access + SILENCE_TIME)
		return 0;
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.\n", m->len);
		closure_CP (p);
		return -1;
	}

	PDEBUG("CP register. socket=%d listnum=%d auth=%d.\n", p->socket, p-CPTRACKER.head, p->auth);

	UDPMsg.len = 12;
	UDPMsg.type=TS2CP_WELCOME;
	UDPMsg.authcode1 = p-CPTRACKER.head;
	UDPMsg.authcode2 = p->auth;

	if (sendMessage(p->socket,(char *)&UDPMsg, &UDPCLIENT) < 0)
	{
		closure_CP (p);
		return -1;
	}
	return 0;
}

int compareInter (const void *a, const void *b)
{
	struct Interval *p = (struct Interval *) a;
	struct Interval *q = (struct Interval *) b;
	if ((p->start >= q->start && p->start+p->len <= q->start + q->len)
	|| (q->start >= p->start && q->start+q->len <= p->start + p->len))
		return 0;
	return (p->start - q->start);
}

// delete_interval removes the new intervals (_new) from the existing ones (head).
int delete_interval (struct Interval *head, int total, struct Interval *_new, int num)
{
	int i,j,k;
	struct Interval tmp[MAX_INTERVAL*2];
	for (i=0,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -