⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
//			PDEBUG ("write %u ",
//				((unsigned int *) msg)[0]);
			if (-1 == writeMessage (pedge->me, pc, buffer))
				PDEBUG ("buffer is full\n");
//			else
//				PDEBUG ("OK\n");
		}
		if (pcinfo->updated <= CurrentTime + SPUPDATE_SLOT)	//pcinfo is livechannel
			send_all_spupdate (pc, &(pcinfo->s));
	} else
	{
		PDEBUG ("save Block Failed ! size %d, %d\n", size,
			listnum);
		Clientclosure (listnum, TYPE_CS);
	}
	return 0;
}

/*
 * Dispatch the message currently pending on a CS session.
 *
 * listnum indexes TRACKER[TYPE_CS].head.  The message is read from the
 * session buffer at offset p->start and its length is added to
 * tmpDownBytes before it is handed to the handler for its type.  Handler
 * return values are not inspected.
 *
 * Returns 0 when the message type is recognised, or -1 after closing the
 * session because the type is unknown.
 */
int
process_CS (int listnum)
{
	struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
	struct Message *m = (struct Message *) (p->buf + p->start);
	float payload;

	tmpDownBytes += m->len;

	switch (m->type)
	{
		case CS2SP_REGISTER:
			process_CS2SP_REGISTER (listnum, m->buffer);
			break;
		case CS2SP_UPDATE:
			/* Copy the float out of the byte buffer rather than
			 * dereferencing a cast pointer: the payload is not
			 * guaranteed to be suitably aligned for a float. */
			memcpy (&payload, m->buffer, sizeof (payload));
			process_CS2SP_UPDATE (listnum, payload);
			break;
		case CS2SP_BLOCK:
			process_CS2SP_BLOCK (listnum, m->buffer);
			break;
		default:
			Clientclosure (listnum, TYPE_CS);
			return -1;
	}
	return 0;
}

/*
 * Tear down the CS session held in slot listnum of TRACKER[TYPE_CS].
 *
 * If a channel is attached to the session, its live-channel info (when
 * present) has dataSource cleared and status set to 1, and the channel is
 * passed to freeLiveChannel.  The socket is then removed from osocks and
 * closed, the session buffer is released, deleteAll is run on the session
 * and the slot is zeroed.
 *
 * Always returns 0.
 */
int
closure_CS (int listnum)
{
	struct Session *sess = &(TRACKER[TYPE_CS].head[listnum]);
	struct Channel *chan = sess->pc;

	PDEBUG ("CS disconnected from %d.%d.%d.%d:%d\n",
		IPADDR (sess->host), sess->port);

	if (chan != NULL)
	{
		if (chan->pcinfo != NULL)
		{
			chan->pcinfo->dataSource = NULL;
			chan->pcinfo->status = 1;
		}
		freeLiveChannel (chan, NULL);
	}

	/* Stop watching the descriptor before closing it. */
	FD_CLR (sess->socket, &osocks);
	close (sess->socket);

	FREE (sess->buf);
	deleteAll (sess);

	/* Leave the slot in an all-zero state. */
	memset (sess, 0, sizeof (*sess));
	return 0;
}


int
init_sp ()
{
	FILE *pidf;
	struct rlimit rl;
	char *index;
	int type;
	char buffer[MAX_DATA];

	rl.rlim_cur = rl.rlim_max = 1000000;
	if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
	{
		perror ("setrlimit");
	}
	for (type=0; type<MAX_TS; type++)
	{
		if (spip[type] != NULL && strlen (spip[type]) >= MIN_IPADDR_LEN)
		{
			defaultspip = spip[type];
			break;
		}
	}
	if (defaultspip == NULL)
		defaultspip = "127.0.0.1";

	OPENLOG;
#ifdef DEBUG
	system ("ulimit -a");
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	PDEBUG ("Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
	if (setrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("setrlimit");
	}
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	PDEBUG ("Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	system ("ulimit -a");
#endif
#ifdef HAVE_MYSQL
	if ((local_mysql =
	     init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
			 "/var/run/mysqld/mysqld.sock")) == 0)
	{
		PDEBUG ("Error in init_mysql.\n");
		exit (1);
	}
#endif
	TRACKER[TYPE_CP].type = TYPE_CP;	//allocate CP ServerDesc  TYPE_CP = 0
	TRACKER[TYPE_CP].port = SP4CP_PORT;	//SP4CP_PORT = 50001
	TRACKER[TYPE_CP].cur = 0;	//current client connection 
	TRACKER[TYPE_CP].max = MAX_CP;	//MAX_CP = 2048
	TRACKER[TYPE_CP].init = init_CP;	//the function pointer of init_CP,return a debug message
	TRACKER[TYPE_CP].process = process_CP;	//the function pointer of process_CP, switch TYPE
	TRACKER[TYPE_CP].closure = closure_CP;	//the funciont pointer of closure_CP,return debug msg and freejob
	TRACKER[TYPE_CP].head = calloc (sizeof (struct Session), TRACKER[TYPE_CP].max);	//allocate session memory
	switch (BINDALL)
	{
		case 0:
			if ((TRACKER[TYPE_CP].sock = init_server (spip[0], SP4CP_PORT)) < 0)	//PORT = 50001
				return -1;
			break;
		default:
			if ((TRACKER[TYPE_CP].sock = init_server (NULL, SP4CP_PORT)) < 0)	//PORT = 50001
				return -1;
			break;
	}
	FD_SET (TRACKER[TYPE_CP].sock, &osocks);
	TRACKER[TYPE_CS].type = TYPE_CS;	//allocate CP ServerDese,TYPE_CS = 1
	TRACKER[TYPE_CS].port = SP4CS_PORT;
	TRACKER[TYPE_CS].cur = 0;
	TRACKER[TYPE_CS].max = MAX_CS;	//MAX_CS = 512
	TRACKER[TYPE_CS].init = init_CS;	//return a debug message        
	TRACKER[TYPE_CS].process = process_CS;
	TRACKER[TYPE_CS].closure = closure_CS;
	TRACKER[TYPE_CS].head =
		calloc (sizeof (struct Session), TRACKER[TYPE_CS].max);
	switch (BINDALL)
	{
		case 0:
			if ((TRACKER[TYPE_CS].sock = init_server (spip[0], SP4CS_PORT)) < 0)	//port = 
				return -1;
			break;
		default:
			if ((TRACKER[TYPE_CS].sock = init_server (NULL, SP4CS_PORT)) < 0)	//port = 
				return -1;
			break;
	}
	FD_SET (TRACKER[TYPE_CS].sock, &osocks);
	if (db_init (Home, Database) < 0)
		return -1;

	memset (tsAddr, 0, sizeof (struct sockaddr_in) * MAX_TS);	//sockaddr_in type
	for (type = 0; type < MAX_TS; ++type)
	{
#ifdef TEST
		sprintf (buffer, "rm -f %s/%s/*.gtv", WWW_ROOT,
			 NET_NAME[type]);
		system (buffer);
		sprintf (buffer, "rm -f %s/%s/*.mediadata", WWW_ROOT,
			 NET_NAME[type]);
		system (buffer);
		sprintf (buffer, "rm -f %s/%s/channel.xml", WWW_ROOT,
			 NET_NAME[type]);
		system (buffer);
#endif
		sprintf (buffer, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
		system (buffer);

		if (tsip[type] == NULL) continue;
		tsAddr[type].sin_family = AF_INET;
		tsAddr[type].sin_port = htons (TS4RM_PORT);
		index = strchr (tsip[type], ':');
		if (index == NULL)
			inet_aton (tsip[type], &tsAddr[type].sin_addr);
		else
		{
			*index = 0;
			inet_aton (tsip[type], &tsAddr[type].sin_addr);
			*index = ':';
		}
		tsSock[type] = socket (PF_INET, SOCK_DGRAM, 0);	//upd connection
		if (tsSock[type] < 0)
			return -1;
	}
	mkdir (PREFIX, 0777);
	sprintf (buffer, "%s/%s", PREFIX, LIVE_PREFIX);
	mkdir (buffer, 0777);
	sprintf (buffer, "%s/%s", PREFIX, ORDER_PREFIX);
	mkdir (buffer, 0777);
	sprintf (buffer, "%s/%s", PREFIX, PLIST_PREFIX);
	mkdir (buffer, 0777);
	sprintf (buffer, "%s/%s", PREFIX, PROG_PREFIX);
	mkdir (buffer, 0777);
	if ((pidf = fopen (PIDFile, "w")) == NULL)
	{
		PDEBUG ("Cannot open pidfile.\n");
		return -1;
	}
	fprintf (pidf, "%d\n", getpid ());
	fclose (pidf);
	return 0;
}

#ifdef TEST
/*
int
buildMediaData (struct Channel *pc, int datalen, char *data, int type)
{
	int i;
	char olddata[MAX_DATA];
	char buffer[MAX_DATA];
	struct stat stbuf;
	FILE *f;
	assert (pc->pcinfo);
	snprintf (buffer, MAX_LINE, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[type], pc->channel_name);
	if (stat (buffer, &stbuf) == 0)
	{
		if (stbuf.st_size != datalen)
		{
			PDEBUG ("old media data size %d not match new %d\n", (int)(stbuf.st_size), datalen);
			return -1;
		}
		if ((f = fopen (buffer, "r")) == NULL)
		{
			PDEBUG ("cannot open mediadata file %s\n", buffer);
			perror ("fopen");
			return -1;
		}
		if (fread (olddata, datalen, 1, f) == 1)
		{
			for (i=0; i<datalen; i++)
			{
				if (data[i] != olddata[i])
				{
					PDEBUG ("media data not match %s, %d\n", buffer, i);
					fclose (f);
					return -1;
				}
			}
		} else
		{
			PDEBUG ("Error in read old mediadata %s.\n", buffer);
			fclose (f);
			return -1;
		}
		fclose (f);
		return 0;
	} else if ((f = fopen (buffer, "w")) == NULL)
	{
		PDEBUG ("cannot open gtv file %s\n", buffer);
		perror ("fopen");
		return -1;
	}
	fwrite (data, datalen, 1, f);
	fclose (f);
	return 0;
}
*/

/*
 * Write the .gtv descriptor for a channel and upload it.
 *
 * The file WWW_ROOT/NET_NAME[type]/<channel_name>.gtv is (re)created with a
 * text header of "Key=value\r\n" lines followed by "Data=" and the datalen
 * raw bytes from data.  Three header layouts exist: live channel with a
 * playlist (pcinfo->mlist set), live channel without one, and a fallback
 * used only when pc->pcinfo is NULL (reachable only if assert is compiled
 * out).  Afterwards the file is posted to CAS_ADDR by running curl through
 * the shell.
 *
 * Returns 0 on success, -1 if the path, header or command does not fit in
 * its buffer, or if the file cannot be opened or fully written.  The exit
 * status of the curl command is not inspected.
 */
int
buildGTV (struct Channel *pc, int datalen, char *data, int type)
{
	char buffer[MAX_DATA];
	char upload_cmd[MAX_DATA];
	FILE *f;
	int n;
	int failed;

	assert (pc->pcinfo);
	n = snprintf (buffer, sizeof (buffer), "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[type], pc->channel_name);
	if (n < 0 || (size_t) n >= sizeof (buffer))
	{
		PDEBUG ("gtv file name too long for channel %s\n", pc->channel_name);
		return -1;
	}
	if ((f = fopen (buffer, "w")) == NULL)
	{
		PDEBUG ("cannot open gtv file %s\n", buffer);
		perror ("fopen");
		return -1;
	}

	/* The path is no longer needed; reuse the buffer for the header. */
	if (pc->pcinfo)
	{
		if (pc->pcinfo->mlist != NULL)
		{
			n = snprintf (buffer, sizeof (buffer),
				 "CSUserID=%d\r\nBlockSize=%d\r\nBitRate=%f\r\nChannelName=%s\r\nPlaylist=true\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
				 pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
				 pc->channel_name, pc->channel_md5, tsip[type], spip[type],
				 datalen);
		}
		else
		{
			n = snprintf (buffer, sizeof (buffer),
				 "CSUserID=%d\r\nBlockSize=%d\r\nBitRate=%f\r\nChannelName=%s\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
				 pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
				 pc->channel_name, pc->channel_md5, tsip[type], spip[type],
				 datalen);
		}
	}
	else
	{
		n = snprintf (buffer, sizeof (buffer),
			 "GTVHome=%s/\r\nBlockSize=%d\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
			 urlroot, pc->maxblocksize, pc->channel_md5,
			 tsip[type], spip[type], datalen);
	}
	if (n < 0 || (size_t) n >= sizeof (buffer))
	{
		PDEBUG ("gtv header too long for channel %s\n", pc->channel_name);
		fclose (f);
		return -1;
	}

	failed = 0;
	if (fwrite (buffer, (size_t) n, 1, f) != 1)
		failed = 1;
	/* fwrite reports 0 items for a zero-sized item, so only check the
	 * payload write when there is a payload. */
	if (!failed && datalen > 0 && fwrite (data, (size_t) datalen, 1, f) != 1)
		failed = 1;
	if (fclose (f) != 0)
		failed = 1;
	if (failed)
	{
		PDEBUG ("cannot write gtv file for channel %s\n", pc->channel_name);
		return -1;
	}

	/* Upload the gtv file to the server.
	 * NOTE(review): the channel name is pasted into a shell command line
	 * unquoted; if it can contain shell metacharacters or come from an
	 * untrusted peer this needs escaping or an exec-style call -- confirm
	 * where channel_name originates. */
	n = snprintf (upload_cmd, sizeof (upload_cmd), "curl -F filename=@%s/%s/%s.gtv -F domain=%s %s",
		WWW_ROOT, NET_NAME[type], pc->channel_name, spip[type], CAS_ADDR);
	if (n < 0 || (size_t) n >= sizeof (upload_cmd))
	{
		PDEBUG ("upload command too long for channel %s\n", pc->channel_name);
		return -1;
	}
	printf ("%s\n", upload_cmd);
	system (upload_cmd);

	return 0;
}
#endif

/*
 * Build a P2P_SPUPDATE message and queue it on session p via writeMessage.
 *
 * Layout written into the local buffer:
 *   int            total message length in bytes (including this field)
 *   unsigned char  P2P_SPUPDATE
 *   MD5_LEN bytes  copied from md5
 *   struct SPUpdate copied from s
 *
 * Returns 0 on success, -1 when writeMessage reports a negative result.
 */
int
send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s)
{
	char buffer1[MAX_DATA];
	char *buf = buffer1 + sizeof (int);	/* skip the length prefix for now */
	int total;

	*(unsigned char *) buf = P2P_SPUPDATE;
	buf += sizeof (char);
	memcpy (buf, md5, MD5_LEN);
	buf += MD5_LEN;
	memcpy (buf, s, sizeof (struct SPUpdate));
	buf += sizeof (struct SPUpdate);

	/* Store the length with memcpy: a char array carries no alignment
	 * guarantee for int, so it must not be written through an int pointer. */
	total = (int) (buf - buffer1);
	memcpy (buffer1, &total, sizeof (total));

	if (writeMessage (p, pc, buffer1) < 0)
		return -1;
	return 0;
}

int
send_p2p_err (struct Session *p, unsigned short code, int quit)
{
	char buffer1[MAX_DATA];
	char *buf;
	buf = buffer1 + sizeof (int);
	*(unsigned char *) buf = P2P_MSG;
	buf += sizeof (char);
	(*(unsigned short *) buf) = code;
	buf += sizeof (short);
	*(int *) buf = quit;
	buf += sizeof (int);
	*(int *) buffer1 = buf - buffer1;
	PDEBUG ("Send error msg type %hd to %p\n", code, p);
	if (writeMessage (p, NULL, buffer1) < 0)
		return -1;
	return 0;
}

/*
 * Per-session callback: close a session that has been idle too long.
 *
 * arg points to a struct Argument; its f member receives the log line and
 * its type member selects the TRACKER entry the session belongs to.  When
 * CurrentTime - p->last_transferblock reaches MAX_TRANSFER_IDLE, the
 * timeout is logged and the session is closed through Clientclosure using
 * its slot index within TRACKER[type].head.
 */
void apply_idle (struct Session *p, void *arg)
{
	struct Argument *parg = (struct Argument *)arg;

	if (CurrentTime - p->last_transferblock >= MAX_TRANSFER_IDLE)
	{
		/* Cast explicitly so the argument matches %ld whatever the
		 * underlying time type is. */
		fprintf (parg->f, "%s Session timeout! %ld \n", parg->type == TYPE_CS? "CS":"CP", (long) (CurrentTime - p->last_transferblock));
		/* The slot index is the session's offset from the table start. */
		Clientclosure ((int) (p - TRACKER[parg->type].head), parg->type);
	}
}

/*
 * Per-channel callback used to report on a channel.
 *
 * arg points to a struct Argument.  For a channel whose live info has
 * status <= 0 this:
 *   - under HAVE_MYSQL, inserts a row describing the channel;
 *   - under TEST, writes one <Channel .../> line to every open parg->xml[];
 *   - appends the channel MD5 to parg->buf and advances it by MD5_LEN
 *     (the caller must have left room; no bound is checked here);
 *   - writes traffic statistics and the SPUpdate ranges to parg->f;
 *   - adds the channel to the totals kept in parg.
 * Otherwise, a channel with live info and isSave == 0 is released through
 * freeLiveChannel.  Channels without live info are left alone.
 *
 * Average speeds are bytes divided by (CurrentTime - lastCheck) and are
 * reported as 0 when that interval is zero.
 */
void apply_check (struct Channel *p, void *arg)
{
	struct Argument *parg = (struct Argument *)arg;
	struct LiveChannelInfo *pc = p->pcinfo;

	if (pc && pc->status <= 0)
	{
		/* Guard the interval: two checks at the same timestamp would
		 * otherwise divide by zero. */
		float elapsed = (float) (CurrentTime - lastCheck);
		float upavg = elapsed != 0 ? ((float) (p->upsize)) / elapsed : 0;
		float downavg = elapsed != 0 ? ((float) (p->downsize)) / elapsed : 0;

#ifdef HAVE_MYSQL
		/* One conversion per column, each matching its argument type.
		 * NOTE(review): values are pasted into the SQL text unescaped;
		 * confirm query_mysql or the caller sanitises channel names.
		 * NOTE(review): the name is read from pc while the rest of this
		 * function uses p->channel_name -- confirm that
		 * struct LiveChannelInfo really has a channel_name member. */
		query_mysql (local_mysql,
			     "insert into channel(ChannelName, ChannelBitrate, ChannelMD5, ChannelElapsed, ChannelRange) values ('%s','%f','%s','%ld','%lld')",
			     pc->channel_name, pc->bitrate,
			     p->channel_md5,
			     (long) (time (NULL) - p->ctime),
			     (long long) (pc->s.maxKeySample -
					  pc->s.minKeySample));
#endif
#ifdef TEST
		{
			int type;

			for (type = 0; type < MAX_TS; type++)
			{
				if (parg->xml[type] == NULL)
					continue;
				fprintf (parg->xml[type], "<Channel Name=\"%s\" Desc=\"%s\" File=\"%s.gtv\" NumClient=\"%d\" BitRate=\"%d\" Start=\"%ld\" End=\"-1\" Elapsed=\"%ld\"/>\r\n",
					 p->channel_name, "gtv",
					 p->channel_name, p->numofnp[type],
					 (int) (pc->bitrate * 8),
					 (long) p->ctime, (long) CurrentTime);
			}
		}
#endif
		PINFO ("query %s->%s \n",
				p->channel_name, p->channel_md5);
		memcpy (parg->buf, p->channel_md5, MD5_LEN);
		parg->buf += MD5_LEN;
		/* pc is known to be non-NULL here, so bitrate is read directly. */
		fprintf (parg->f, "Channel %s have %d client. upsize %lldB, avg speed %f; downsize %lldB, avg speed %f, reported %f, real/reported is %f%%.\n",
				 p->channel_md5, p->numclient,
				 p->upsize, upavg, p->downsize, downavg,
				 pc->bitrate,
				 pc->bitrate != 0 ? downavg * 100 / pc->bitrate : 0);
		fprintf (parg->f, "Live SPUpdate : SampleMin %lld SampleMax %lld BlockMin %d BlockMax %d \n",
					 pc->s.minKeySample,
					 pc->s.maxKeySample,
					 pc->s.minBlockID,
					 pc->s.maxBlockID);
		parg->spchannelcount ++;
		parg->totalclient += p->numclient;
		parg->totalupsize += p->upsize;
	} else if (pc && pc->isSave == 0)
		freeLiveChannel (p, NULL);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -