⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应用了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
		   {
		   waitpid (pid, NULL, 0);
		   }
		 */
	}
	return 0;
}

int
process_P2P_HELLO (struct Session *p, struct Message *m)
{
	struct SPUpdate spupdate;
	struct Edge *pedge;
	struct Channel *pc;
	int listnum;
	PDEBUG ("CP %d.%d.%d.%d:%d join channel %.32s\n",
		IPADDR (p->host), p->port, m->buffer);
	listnum = p - TRACKER[TYPE_CP].head;
	if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
	{
		if ((pc = findOrder (m->buffer, MD5_LEN)) == NULL &&
			(pc = newOrder (m->buffer)) == NULL)
		{
			BUILD_NOCH_SPUPDATE(spupdate);
			send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
			PDEBUG ("Cannot find channel %.32s\n",
				m->buffer);
			//Clientclosure (listnum, TYPE_CP);
		} else
		{
			BUILD_ORDER_SPUPDATE(spupdate,pc);
			send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
			PDEBUG ("channel %.32s spupdate %d~%d. \n",
				m->buffer, spupdate.minBlockID,
				spupdate.maxBlockID);
		}
	} else
	{
		for (pedge = p->header; pedge && pedge->head == pc;
		     pedge = pedge->enext);
		if (!pedge)
		{
			pedge = newEdge (pc, p);
			pc->numclient++;
		}
		if (pc->pcinfo)
		{
			if (pc->pcinfo->status > 0)
			{
				// this channel has been closed
				BUILD_CLOSE_SPUPDATE(spupdate);
				send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
				PDEBUG ( "channel %.32s has been closed. \n",
					m->buffer);
				//Clientclosure (listnum, TYPE_CP);
			} else if (pc->pcinfo->mlist != NULL)
			{
				BUILD_MLIST_SPUPDATE (spupdate, pc);
				send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
				sendMedia (p, pc);
				PDEBUG ("Channel %.32s is a playlist.\n", m->buffer);
			} else
			{
				BUILD_LIVE_SPUPDATE(spupdate,pc);
				send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
				sendMedia (p, pc);
			}
		} else
		{
			BUILD_ORDER_SPUPDATE(spupdate,pc);
			send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
			PDEBUG ("channel %.32s spupdate %d~%d. \n",
				m->buffer, spupdate.minBlockID,
				spupdate.maxBlockID);
		}
	}
	return 0;
}

/*
 * Handle a P2P_PUSHLIST message from CP session `p`.
 *
 * Payload layout as parsed below (all offsets relative to m->buffer):
 *   MD5_LEN bytes   channel identifier
 *   1 byte          type flag
 *   1 byte          n  = number of requested block ids
 *   n * int         requested block ids
 *   -- only when type == 0 --
 *   1 byte          k  = number of ids handed to deleteJob()
 *   k * int         those ids
 *
 * When type is non-zero, deleteChannel (p, pc) is called before the
 * requests are processed.  Every requested id is passed to
 * process_P2P_REQUEST_real().
 *
 * Returns  0 on success,
 *         -1 when no channel could be found/created or a request fails,
 *         -2 when the session already holds MAX_JOB_PER_SESSION jobs.
 *
 * NOTE(review): the counts come straight from the peer and are not checked
 * against the received message length here -- confirm the caller validates
 * the message size.
 */
int
process_P2P_PUSHLIST (struct Session *p, struct Message *m)
{
	struct Edge *pedge;
	struct Channel *pc;
	char *buf;
	unsigned int blockid;
	int i, size, type;

	if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
		pc = newOrder (m->buffer);
	else if (pc->pcinfo != NULL)
	{
		/* Make sure this session is linked to the channel exactly once. */
		for (pedge = p->header; pedge && pedge->head != pc;
		     pedge = pedge->enext);
		if (pedge == NULL)
		{
			pedge = newEdge (pc, p);
			pc->numclient++;
		}
	}
	if (pc == NULL)
		return -1;

	if (p->numjob >= MAX_JOB_PER_SESSION)
		return -2;

	buf = m->buffer + MD5_LEN;
	type = *(unsigned char *) buf;
	buf += sizeof (char);
	size = *(unsigned char *) buf;
	buf += sizeof (char);

	if (type)
		deleteChannel (p, pc);

	/*
	 * The id array starts MD5_LEN + 2 bytes into the buffer, so it is not
	 * int-aligned; copy each id out instead of dereferencing a cast
	 * pointer.
	 */
	for (i = 0; i < size; i++)
	{
		memcpy (&blockid, buf + i * sizeof (blockid), sizeof (blockid));
		if (process_P2P_REQUEST_real (p, pc, blockid) < 0)
			return -1;
	}

	if (!type)
	{
		buf += size * sizeof (int);
		size = *(unsigned char *) buf;
		buf += sizeof (char);
		/*
		 * NOTE(review): buf may be misaligned for unsigned int here as
		 * well; check how deleteJob reads the array.
		 */
		deleteJob (p, pc, (unsigned int *) buf, size);
	}
	return 0;
}

/*
 * Build the P2P_RESPONSE for block `id` of channel `pc` in a fresh job
 * buffer and hand it to writeDATAMessage() for session `p`.
 *
 * Response layout written into the job buffer:
 *   int      total length of the message (filled in last)
 *   1 byte   P2P_RESPONSE
 *   MD5_LEN  pc->channel_md5
 *   int      block id
 *   int      payload size (0 when the block is not available)
 *   bytes    payload
 *
 * Returns  0 once the response has been passed to writeDATAMessage(),
 *         -1 when newJob() fails or a lookup returns -2
 *            ("leave block to next round").
 *
 * NOTE(review): on the two `return -1` paths after newJob() succeeded, pj is
 * not released in this function -- confirm whether the job needs an explicit
 * release.
 */
int
process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
{
	struct JobDes *pj = newJob ();
	char *buf, *buffer;
	int listnum, size=0, max;
	struct SPUpdate spupdate;

	if (pj == NULL) return -1;
	buffer = getJobBuffer (pj, &max);
	/* listnum is only referenced by the commented-out Clientclosure call below. */
	listnum = p - TRACKER[TYPE_CP].head;
	/* Skip the leading length field; it is written once the size is known. */
	buf = buffer + sizeof (int);
	*(unsigned char *) buf = P2P_RESPONSE;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;

	if (pc->pcinfo == NULL)
	{
		/* Channel without pcinfo: the id and size fields are written here. */
		*(int *) buf = id;
		buf += sizeof (int);
		/*
		 * The payload destination is buf + sizeof (int), i.e. just past
		 * the size slot.
		 * NOTE(review): `max` is passed unchanged although part of the
		 * buffer is already used by the header -- confirm how
		 * locate_order_by_id interprets its limit.
		 */
		if ((size =
		     locate_order_by_id (pc, id, buf + sizeof (int),
					 max)) < 0 && size != -2)
		{
			// BLOCK NOT FOUND
			/*
			 * NOTE(review): only these four fields of spupdate are
			 * set before it is sent; any other members of struct
			 * SPUpdate would be uninitialised.
			 */
			spupdate.minKeySample = -1LL;
			spupdate.maxKeySample = -1LL;
			spupdate.minBlockID = 0xffffffff;
			spupdate.maxBlockID = 0xffffffff;
			send_P2P_SPUPDATE (p, pc, pc->channel_md5, &spupdate);
			/* Still answer the request, with an empty payload. */
			size = 0;
			*(int *) buf = 0;
			buf += sizeof (int);

		} else if (size == -2)
		{
			PDEBUG ("Leave blocks %d to next round.\n", id);
			return -1;
		} else
		{
			// block found
			*(int *) buf = size;
			buf += sizeof (int) + size;
			p->last_transferblock = CurrentTime;
		}
//              Clientclosure (listnum, TYPE_CP);
	} else if (id >= 0 && pc->pcinfo->mlist != NULL
		   && (size = locate_mplist_by_id (pc, id, buf, max - 32)) > 0)
	{
		/*
		 * Media-list channel.  buf is advanced past two ints plus the
		 * payload, so the helper presumably writes the id and size
		 * fields itself -- TODO confirm.
		 */
		p->last_transferblock = CurrentTime;
		buf += 2 * sizeof (int) + size;

	} else if (id >= 0 && pc->pcinfo->mlist == NULL
		   && (size = locate_by_id (pc, id, buf, max - 32)) > 0)
	{
		/* Channel without a media list; same buffer handling as above. */
		p->last_transferblock = CurrentTime;
		buf += 2 * sizeof (int) + size;
	} else if (size == -2)
	{
		/*
		 * Reached when one of the two lookups above returned -2; the
		 * assert marks this as unexpected for these channels.
		 */
		assert (0);
		PDEBUG ("Leave blocks %d to next round.\n", id);
		return -1;
	} else
	{
		/* Negative id or lookup failure: reply with id and size 0. */
		*(int *) buf = id;
		buf += sizeof (int);
		size = 0;
		*(int *) buf = 0;
		buf += sizeof (int);
		PDEBUG ("Cannot find block id %d required by client %d.%d.%d.%d.\n",
			id, IPADDR (p->host));
	}
	/* Fill in the leading length field with the bytes written so far. */
	*(int *) buffer = buf - buffer;
	setblockId (pj, id);
	writeDATAMessage (p, pc, pj);
//	PDEBUG ("Write block %d to %d.%d.%d.%d\n", id,
//		IPADDR (p->host));
	return 0;
}


/*
 * Initialisation entry point for the CP session in slot `listnum`.
 * No work is done; the function always reports success (0).
 */
int
init_CP (int listnum)
{
	(void) listnum;		/* intentionally unused */

	return 0;
}

/*
 * Dispatch the message currently at the read position of the CP session in
 * slot `listnum`.
 *
 * m->len is added to tmpDownBytes, then the message is routed by type:
 *   P2P_HELLO    -> process_P2P_HELLO()
 *   P2P_PUSHLIST -> process_P2P_PUSHLIST()
 *   P2P_MSG      -> accepted and ignored
 *   anything else is treated as an error.
 *
 * Returns  0 on success,
 *         -1 after closing the session because the handler reported -1
 *            (or the type was unknown),
 *         -2 passed through unchanged from the handler.
 */
int
process_CP (int listnum)
{
	/*
	 * Must start at 0: the P2P_MSG case does not assign ret, and the
	 * second switch would otherwise read an indeterminate value.
	 */
	int ret = 0;
	struct Session *p = &(TRACKER[TYPE_CP].head[listnum]);
	struct Message *m = (struct Message *) (p->buf + p->start);

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case P2P_HELLO:
			ret = process_P2P_HELLO (p, m);
			break;
		case P2P_PUSHLIST:
			ret = process_P2P_PUSHLIST (p, m);
			break;
		case P2P_MSG:
			/* nothing to do for this message type */
			break;
		default:
			ret = -1;
			break;
	}
	switch (ret)
	{
		case -1:
			PDEBUG ("Message processing error from client %d.%d.%d.%d\n",
				IPADDR (p->host));
			Clientclosure (listnum, TYPE_CP);
			return -1;
		case -2:
			return -2;
		default:
			return 0;
	}
}

/*
 * Tear down the CP session in slot `listnum`.
 *
 * Every edge hanging off the session is removed with delEdge(); for edges
 * that still point at a channel, that channel's numclient is decremented
 * first.  Afterwards the socket is dropped from osocks and closed, the
 * session buffer is released with FREE, deleteAll() is called for the
 * session, and the whole Session record is zeroed.
 *
 * Always returns 0.
 */
int
closure_CP (int listnum)
{
	struct Session *sess = TRACKER[TYPE_CP].head + listnum;
	struct Edge *cur, *following;

	PDEBUG ("CP disconnected from %d.%d.%d.%d:%d\n",
		IPADDR (sess->host), sess->port);

	cur = sess->header;
	while (cur != NULL)
	{
		/* Remember the successor before the edge is deleted. */
		following = cur->enext;
		if (cur->head != NULL)
			cur->head->numclient--;
		delEdge (cur);
		cur = following;
	}

	FD_CLR (sess->socket, &osocks);
	close (sess->socket);
	FREE (sess->buf);
	deleteAll (sess);
	memset (sess, 0, sizeof (*sess));
	return 0;
}


/*
 * Initialisation entry point for the CS session in slot `listnum`.
 * No work is done; the function always reports success (0).
 */
int
init_CS (int listnum)
{
	(void) listnum;		/* intentionally unused */

	return 0;
}


int
process_CS2SP_REGISTER (int listnum, char *msg)
{
	int i, errmsg;
	char cname[MAX_LINE], cmd5[MD5_LEN + 1], buffer[MAX_DATA];
	//char escape_buf[MAX_LINE];
	char md5[MD5_LEN + 1];
	struct Session *p = &(TRACKER[TYPE_CS].head[listnum]), *source;
	struct Channel *pc;
	float bitrate, limitedbitrate=10000.0;
	int id, startblock, size, maxblocksize, datalen, issave=0;

	size = *(unsigned char *) msg;
	msg += sizeof (char);
	if (size > sizeof (cname) || size <= 0)	//wrong Message
	{
		Clientclosure (listnum, TYPE_CS);
		return -1;
	} else
		memcpy (cname, msg, size);
	cname[size] = 0;
	msg += size;
//      memcpy (cmd5, msg, MD5_LEN);
//      msg += MD5_LEN;
	id = *(int *) msg;
	msg += sizeof (int);

	sprintf (buffer, "%d@%s_%s", id, defaultspip, cname);
	md5_calc ((unsigned char *) md5, (unsigned char *) buffer,
		  strlen (buffer));
	for (i = 0; i < MD5_LEN; i += 2)
		sprintf (cmd5 + i, "%02x", (unsigned char) md5[i / 2]);
	cmd5[MD5_LEN] = 0;

	memcpy (md5, msg, MD5_LEN);
	msg += MD5_LEN;
	maxblocksize = *(int *) msg;
	msg += sizeof (int);
	bitrate = *(float *) msg;
	msg += sizeof (float);
	datalen = *(unsigned short *) msg;
	if (datalen > MAX_LINE)
	{
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}
	msg += sizeof (short);
	if (AuthCS && (errmsg =
	     isAllowed (id, md5, cname, bitrate, &limitedbitrate, &issave)) < 0)
	{
		PDEBUG ("User %d is not allowed to newchannel %s.\n",
			id, cname);
		send_p2p_err (p, -errmsg, 1);
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}

	startblock = 0;
	if ((pc = findChannel (cmd5, MD5_LEN)) != NULL)
	{
		if (pc->pcinfo == NULL || pc->pcinfo->mlist != NULL)
		{
			PDEBUG ("The channel %s is a playlist.\n", cname);
			send_p2p_err (p, ERR_INTERNAL, 1);
			Clientclosure (listnum, TYPE_CS);
			return -1;
		}
		if ((source = pc->pcinfo->dataSource) != NULL)
		{
			Clientclosure (source - TRACKER[TYPE_CS].head,
				       TYPE_CS);
		}
	}
	if ((pc = newLiveChannel (cname, p, cmd5, bitrate,
				    maxblocksize)) != (struct Channel *) 0)
	{
		p->pc = pc;
		pc->pcinfo->userid = id;
		pc->pcinfo->limitedBitRate = limitedbitrate;
		pc->pcinfo->isSave = issave;
		pc->pcinfo->dataSource = &(TRACKER[TYPE_CS].head[listnum]);
		pc->pcinfo->startid = pc->pcinfo->maxID = (CurrentTime - FIX_MAGIC) * 16;
	} else
	{
		PDEBUG ("newLiveChannel failed.\n");
		send_p2p_err (p, ERR_INTERNAL, 1);
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}
	*(int *) buffer = 9;
	*(char *) (buffer + sizeof (int)) = SP2CS_WELCOME;
	*(int *) (buffer + sizeof (int) + sizeof (char)) = startblock;
	if (writeMessage (p, pc, buffer) < 0)
	{
		send_p2p_err (p, ERR_INTERNAL, 1);
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}
	pc->pcinfo->cur_channel = pc->pcinfo->max_channel = 1;
	pc->pcinfo->media = calloc (1, sizeof (struct MediaData));
	pc->pcinfo->media[0].start = 0;
	pc->pcinfo->media[0].len = -1;
	pc->pcinfo->media[0].dlen = datalen;
	pc->pcinfo->media[0].data = calloc (1, datalen);
	memcpy (pc->pcinfo->media[0].data, msg, datalen);
#ifdef TEST
	/*
	for (type = 0; type < MAX_TS; ++type)
		buildGTV (pc, datalen, msg, type);
	*/
	for (i=0; i<MAX_TS; i++) {
	if (buildGTV (pc, datalen, msg, i) < 0)
		continue;
	}
#endif
#ifdef HAVE_MYSQL
//      query_mysql (local_mysql, "delete from channel where ChannelMD5 = \"%s\"", cmd5);
//      mysql_escape_string (escape_buf, msg, datalen);
//      query_mysql (local_mysql, "insert into channel (ChannelName, ChannelBitrate, ChannelAttachData, ChannelMD5, ChannelOwnerID) values (\"%s\",\"%f\", \"%s\", \"%s\", \"%d\")", cname, bitrate, escape_buf, cmd5, id);
#endif
	return 0;
}

int
process_CS2SP_UPDATE (int listnum, float rate)	//only update bitrate
{
	struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
	struct Channel *pc = p->pc;
	struct LiveChannelInfo *pcinfo = pc->pcinfo;

	pcinfo->bitrate = rate;
	if (rate > pcinfo->limitedBitRate)
	{
		send_p2p_err (p, ERR_EXCEED_BITRATE, 1);
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}
#ifdef HAVE_MYSQL
//      query_mysql (local_mysql, "update channel set ChannelBitrate = \"%f\" where ChannelMD5 = \"%s\"", rate, pc->channel_md5);
#endif

	return 0;
}

int
process_CS2SP_BLOCK (int listnum, char *msg)
{
	char *buf, buffer[MAX_DATA];
	struct Edge *pedge;
	int size;		// max=TRACKER[TYPE_CP].maxid+1;
	struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
//      struct Session *q=TRACKER[TYPE_CP].head;
	struct Channel *pc = p->pc;
	struct LiveChannelInfo *pcinfo;

	if (p->pc == NULL || (pcinfo = p->pc->pcinfo) == NULL || pcinfo->mlist != NULL)
	{
		PDEBUG ("Unmatched channel\n");
		Clientclosure (listnum, TYPE_CS);
		return -1;
	}

	pcinfo = p->pc->pcinfo;
	if ((size = saveBlock (pc, msg, p)) > 0)
	{
		// directly send this block to connected CPs
		buf = buffer + sizeof (int);
		*(unsigned char *) buf = P2P_RESPONSE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		p->last_transferblock = CurrentTime;
		memcpy (buf, msg, size + 2 * sizeof (int));
		buf += size + 2 * sizeof (int);
		*(int *) buffer = buf - buffer;
		for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
		{
			if (pedge->me->numjob >= MAX_JOB_PER_SESSION)
				continue;
			pedge->me->last_transferblock = CurrentTime;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -