⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cpnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
}

int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
{
	struct JobDes *pj = newJob ();
	char *buf, *buffer;
	struct LiveChannelInfo *pcinfo;
	int size, max, i;

	buffer = getJobBuffer (pj, &max);
	buf = buffer + sizeof (int);
	*(unsigned char *)buf = P2P_RESPONSE;
	buf += sizeof (char);
	if (p->npcp == TYPE_CP)
	{
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
	}
	pcinfo = pc->pcinfo;
	if (p->first == 0)
	{
		p->first ++;
		if (p->version >= MEDIATYPE_FIRST)
			sendIdMedia (p, pc, id, 0);
	}
	if (pcinfo->dataSource == NULL)
	{
		*(int *)buf = id;
		buf += sizeof (int);
		*(int *)buf = 0;
		buf += sizeof (int);
		if (isGCP || SCP_CHANNEL)
		{
			if ((i = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
				freeLiveChannel (pc, NULL);
		} else
		{
#ifdef HAVE_TS
			UDPMsg.type = CP2TS_NEED_PEERS;
			UDPMsg.len = 12+MD5_LEN;
			memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
/* isSet whether TS has returned WELCOME message */
			if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
			{
				PDEBUG ("exit...\n");
				exit (1);
			}
#endif
		}
	} else if ((size=locate_by_id (pc, id, buf, max-32)) > 0)
	{
		buf += 2*sizeof (int) + size;
		if (p->version >= MEDIATYPE_FIRST && (i=isHit (pc, id)) >= 0)
			sendHitMedia (p, pc, i, id, 0);
		p->last_transferblock = CurrentTime;
	} else if (size == -2)
	{
		assert (0);
		PDEBUG ("Leave block %d to next round.\n", id);
		return -1;
	} else if (id >= 0)
	{
		*(int *)buf = id;
		buf += sizeof (int);
		*(int *)buf = 0;
		buf += sizeof (int);
		PINFO ("no block %d\n", id);
		send_P2P_PUSHLIST (pc, id);
	}
	*(int *)buffer = buf - buffer;
	setblockId(pj, id);
	writeDATAMessage(p,pc, pj);
//	PDEBUG ("Write block %d\n", id);
	return 0;
}

int process_P2PS (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
	struct Message *m = (struct Message *)(p->buf+p->start);

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case P2P_HELLO:
			if (process_P2P_HELLO (p, m) == -2)
				return -2;
			break;
		case P2P_PUSHLIST:
			if (process_P2P_PUSHLIST (p, m) == -2)
				return -2;
			break;
		case P2P_REPORT:	/* At present no action */
			break;
		case P2P_MSG:
			break;
		case P2P_SPUPDATE:
			break;
		case P2P_RESPONSE:
			break;
		case P2P_NEAR_PEERS:
			break;
		case P2P_REQMEDIA:
			sendIdMedia (p, p->pc, *(int *)(m->buffer), 0);
			break;
		default:
			PDEBUG ("Unknown message format from client\n");
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
	}
	return 0;
}

int closure_P2PS (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
//	struct Channel *pc = p->pc;
	struct Edge *pedge, *prevedge;
	PDEBUG ("CP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
	for (pedge=p->header; pedge; pedge=prevedge)
	{
		pedge->head->numclient --;
		prevedge=pedge->enext;
		delEdge (pedge);
	}
	FD_CLR(p->socket, &osocks);
	close (p->socket);
	FREE (p->buf);
	deleteAll (p);
	memset (p, 0, sizeof (struct Session));
	return 0;
}


int init_P2PC (int listnum)
{
	return 0;
}


int newChannel (struct Channel *pc, struct NormalAddress *client, int n, int flag)
{
	struct Session *p;
	int listnum, newconn = -1, max, sock_flag;
	struct Session *head;
	struct sockaddr_in addr;
	char *buf, buffer[MAX_DATA];

	if (pc == NULL) return -1;
	head = TRACKER[TYPE_P2PC].head;
	max = TRACKER[TYPE_P2PC].max;
	for (listnum = 0; listnum < max; listnum ++)
	{
		if(head[listnum].socket == 0)
		{
			if ((newconn = socket (PF_INET, SOCK_STREAM, 0)) < 0)
			{
				perror ("socket||gethostbyname");
				return -1;
			}
			memset (&addr, 0, sizeof (addr));
			addr.sin_port = client[0].sin_port;
			addr.sin_addr = client[0].sin_addr;
			addr.sin_family = AF_INET;
			if ((sock_flag = connect_nonb(newconn, &addr, sizeof (addr))) == -1)
			{
				close (newconn);
				return -1;
			}
			head[listnum].socket = newconn;
			head[listnum].type = TYPE_P2PC;
			head[listnum].flag = 1;
			head[listnum].sock_flag = sock_flag;
			head[listnum].buf = NEW();
			head[listnum].pc = pc;
			head[listnum].time_sec = CurrentTime;
			head[listnum].totalup = 0;
			head[listnum].last_transferblock = CurrentTime;
			FD_SET(newconn, &osocks);
			if (listnum > TRACKER[TYPE_P2PC].maxid)
				TRACKER[TYPE_P2PC].maxid = listnum;
			break;
		}
	}
	if (listnum >= max)
	{
		PDEBUG ("no space left for new incoming client.");
		close (newconn);
		return -1;
	}
	TRACKER[TYPE_P2PC].cur ++;
	(*(TRACKER[TYPE_P2PC].init)) (listnum);
	p = &(TRACKER[TYPE_P2PC].head[listnum]);
	pc->pcinfo->dataSource = p;
	pc->upsize = 0;
	pc->downsize = 0;

	PDEBUG("Connect to %s:%d. and send P2P_HELLO.\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));


	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_HELLO;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned char *)buf = 0;
	buf += sizeof (char);
	if (flag == TYPE_CP)
	{
		*(unsigned char *)buf = pc->pcinfo->numofsp;
		buf += sizeof (char);
		if (pc->pcinfo->numofsp)
		{
			memcpy (buf, &(pc->pcinfo->SPLIST), pc->pcinfo->numofsp*sizeof (struct NormalAddress));
			buf += pc->pcinfo->numofsp*sizeof (struct NormalAddress);
		}
	}
	*(int *)buffer = buf - buffer;
	if (writeMessage (p, pc, buffer) < 0)
	{
		perror ("CP: write SP");
		Clientclosure (listnum, TYPE_P2PC);
		return -1;
	}
	return listnum;
}


int process_P2P_SPUPDATE (int listnum, struct Message *m)
{
	int maxq;
	struct Edge *pedge;
	char buffer[MAX_DATA];
	struct Channel *pc;
	struct SPUpdate *s = (struct SPUpdate *)(m->buffer+MD5_LEN);
	int bShouldCloseSP = 0;

	if (m->len < sizeof (struct SPUpdate) + MD5_LEN + 5)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PC);
		return -1;
	}
	*(int *)buffer = sizeof (struct SPUpdate) + sizeof(int) + sizeof (char);
	*(unsigned char *)(buffer + sizeof (int)) = P2P_SPUPDATE;
	PINFO ("Recv SPUPDATE(%lld,%lld,%u,%u).\n", s->minKeySample, s->maxKeySample, s->minBlockID, s->maxBlockID);
	memcpy (buffer+sizeof(int)+sizeof(char), s, sizeof(struct SPUpdate));

	if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL) return -1;

	if (s->minBlockID == 0xffffffff && s->maxBlockID == 0xffffffff)
	{
		pc->pcinfo->dataSource = NULL;	/* indication of end */
		if(s->minKeySample == -1 && s->maxKeySample == -1)
			PDEBUG("NO SUCH RESOURCE!\n"); // no such resource
		else if(s->minKeySample == 0 && s->maxKeySample == 0)
			PDEBUG("CHANNEL HAS BEEN CLOSED!\n");// channel has been closed
		else
			PDEBUG("UNKNOWN MESSAGE! 1\n"); // unknown message
		bShouldCloseSP = 1; // should close connection
	} else
	{
		if(s->minBlockID == 0 && s->maxBlockID == 0 && s->minKeySample == 0 && s->maxKeySample == 0)
		{
			PDEBUG("END OF CHANNEL!\n"); // end of channel
			bShouldCloseSP = 1;
		} else if (s->minKeySample == -1ULL && s->maxKeySample == -1ULL)
		{
			INIT_MAXQ(pc,s,maxq);
			return 0;
		} else if (s->minKeySample == -2LL)
		{
			INIT_MAXQ(pc,s,maxq);
			pc->type = T_PLIST;
			pc->pcinfo->max_channel = (int)(s->maxKeySample);
			if (pc->pcinfo->max_channel <= 0 || pc->pcinfo->max_channel >= MAX_FILEINPUT)
				return -1;
			if (pc->pcinfo->media != NULL)
				freeMedia (pc);
			pc->pcinfo->media = calloc (sizeof (struct MediaData), pc->pcinfo->max_channel);
			return 0;
		} else if (s->minKeySample == -3LL)
		{
			pc->pcinfo->max_channel = 1;
			if (pc->pcinfo->media != NULL)
				freeMedia (pc);
			pc->pcinfo->media = calloc (sizeof (struct MediaData), 1);
		} else
		{
			memcpy (&(pc->pcinfo->s), s, sizeof (struct SPUpdate));
			// request block after spupdate, not wait!
			// now, block will be sent automaticlly by SP
			//	send_P2P_PUSHLIST (pc, s->maxBlockID);
		}
	}
	{
		int i = 0;
		unsigned char vcode = 0;
		for (i = 0; i < sizeof (struct SPUpdate); ++i) {
			vcode += ((unsigned char*)s)[i];
		}
		buffer[*(int*)buffer] = vcode;
		++*(int*)buffer;
	}
	for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
	{
		if (pedge->me->npcp == TYPE_CP)
		{
//			PDEBUG ("Send SPUPDATE to CP %d.\n", pedge->me-TRACKER[TYPE_P2PC].head);
			writeMessage (pedge->me, pc, (char *)m);
		} else
		{
//			PDEBUG ("Send SPUPDATE to NP %d.\n", pedge->me-TRACKER[TYPE_P2PS].head);
			writeMessage (pedge->me, pc, buffer);
		}
	}
	if(bShouldCloseSP != 0 || pedge == pc->PeerHead/* no NP*/)
			return -1; // Close Connection to SP
	return 0;
}

int process_P2P_RESPONSE (int listnum, struct Message *m)
{
	int size;
	struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
	struct Channel *pc;
	char *msg = m->buffer+MD5_LEN;

	
	if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL)
		return -1;

 	if ((size = saveBlock (pc, msg, p)) <= 0)
	{
   		PDEBUG ("save block error, size %d, %d\n", size, listnum);
		return -1;
//		Clientclosure (listnum, TYPE_P2PC);
	}
	p->last_transferblock = CurrentTime;
	return 0;
}

int process_P2PC (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
	struct Message *m = (struct Message *)(p->buf+p->start);

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case P2P_SPUPDATE:
			if(process_P2P_SPUPDATE (listnum, m) < 0)
			{
				Clientclosure(listnum, TYPE_P2PC);
			}
			break;
		case P2P_RESPONSE:
			process_P2P_RESPONSE (listnum, m);
			break;
		case P2P_MSG:
			break;
		case P2P_MEDIATYPE:
			process_P2P_MEDIATYPE (listnum, m);
			break;
		default:
			PDEBUG("Err msg type from SP.\n");
			Clientclosure (listnum, TYPE_P2PC);
			return -1;
	}
	return 0;
}

int closure_P2PC (int listnum)
{
//	struct Edge *pedge, *prevedge;
//	char buffer[MAX_DATA], *buf;
	struct Session *p=&(TRACKER[TYPE_P2PC].head[listnum]);
	struct Channel *pc = p->pc;

	if (pc)
	{
		/*
		buf = buffer + sizeof (int);
		*(unsigned char *)buf = P2P_SPUPDATE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		memset (buf, 0, sizeof (struct SPUpdate));
		buf += sizeof (struct SPUpdate);
		*(int *)buffer = buf - buffer;
		for (pedge=pc->PeerHead; pedge; pedge=prevedge)
		{
			pc->numclient --;
			writeMessage (pedge->me, buffer);
			prevedge = pedge->cnext;
			delEdge (pedge);
		}
		*/
		pc->pcinfo->dataSource = NULL;
	}
	PDEBUG ("SP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
	FD_CLR(p->socket, &osocks);
	close (p->socket);
	FREE (p->buf);
	deleteAll (p);
	memset (p, 0, sizeof (struct Session));
	return 0;
}

#ifdef HAVE_TS
int register_cp ()	//send UDP msg
{
	const int max_times = 10;
	int i;
	char *buf;
	char buffer[MAX_DATA];

	isSet = 0;

	if (TSSOCK == 0)
	{
		for (i=0; i<max_times; i++)
		{
			if (BINDALL == 0)
				TSSOCK = init_udp (BINDIP, cfgCP2TS_PORT);
			else
				TSSOCK = init_udp (NULL, cfgCP2TS_PORT);
			if (TSSOCK > 0) break;
			PDEBUG("Sleep 1000. cause init UDP port %d failed.", cfgCP2TS_PORT);
			sleep (1000);
		}
		if (TSSOCK <= 0)
		{
			PDEBUG ("exit...\n");
			exit (1);	//the max times try init_udp failure
		}
	}
	buf = buffer + sizeof (int);
	*(unsigned char *)buf = CP2TS_REGISTER;
	buf += sizeof (char);
	*(int *)buf = AUTH_USERID;
	buf += sizeof (int);
	strncpy (buf, AUTH_MD5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned short *)buf = htons (cfgP2PS_PORT);
	buf += sizeof (short);
	if (SCP_CHANNEL)	//Now only GCP is available
	{
		*buf = CT_SPECIFIED_RES;
		buf += sizeof (char);
		memcpy (buf, SCP_CHANNEL, strlen(SCP_CHANNEL)+1);
		buf += strlen (SCP_CHANNEL)+1;
	} else if (ECP_REGION)
	{
		*buf = CT_EDGE;
		buf += sizeof (char);
		buf = parseECP (ECP_REGION, buf);
	} else

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -