⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cpnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
	{
		*buf = CT_GENERAL;
		buf += sizeof (char);
	}
	*(int *)buffer = buf - buffer;	//the size of buffer;
	TSADDR.sin_port = htons (TS4CP_PORT);
	TSADDR.sin_addr.s_addr = inet_addr (SERVERIP);
	TSADDR.sin_family = AF_INET;
	if (sendMessage (TSSOCK, buffer, &TSADDR) < 0)	//send register msg
	{
		PDEBUG ("Cannot write to server\n");
		return -1;
	}
	return 0;
}
#endif

int init_cp ()
{
	FILE *pidf;
	struct rlimit rl;
	char buffer[MAX_DATA];
	rl.rlim_cur = rl.rlim_max = 1000000;
	if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	OPENLOG;
#ifdef DEBUG
	system ("ulimit -a");
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	rl.rlim_cur = rl.rlim_max = (rlim_t )102400;
	if (setrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	system ("ulimit -a");
#endif
#ifdef HAVE_TS
	if (register_cp () < 0)
	{
		PDEBUG ("Cannot init TS connection\n");
		return -1;
	}
#endif
	TRACKER[TYPE_P2PS].flag = FLAG_SERVER;
	TRACKER[TYPE_P2PS].type = TYPE_P2PS;
	TRACKER[TYPE_P2PS].port = cfgP2PS_PORT;
	TRACKER[TYPE_P2PS].cur = 0;
	TRACKER[TYPE_P2PS].max = MAX_P2PS;
	TRACKER[TYPE_P2PS].init = init_P2PS;
	TRACKER[TYPE_P2PS].process = process_P2PS;
	TRACKER[TYPE_P2PS].closure = closure_P2PS;
	TRACKER[TYPE_P2PS].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PS].max);
	switch (BINDALL)
	{
		case 0:
			if ((TRACKER[TYPE_P2PS].sock = init_server (BINDIP, cfgP2PS_PORT)) < 0)
			return -1;
			break;
		default:
			if ((TRACKER[TYPE_P2PS].sock = init_server (NULL, cfgP2PS_PORT)) < 0)
			return -1;
			break;
	}

	FD_SET(TRACKER[TYPE_P2PS].sock, &osocks);
	TRACKER[TYPE_P2PC].flag = FLAG_CLIENT;
	TRACKER[TYPE_P2PC].type = TYPE_P2PC;
	TRACKER[TYPE_P2PC].port = 0;
	TRACKER[TYPE_P2PC].cur = 0;
	TRACKER[TYPE_P2PC].max = MAX_P2PC;
	TRACKER[TYPE_P2PC].init = init_P2PC;
	TRACKER[TYPE_P2PC].process = process_P2PC;
	TRACKER[TYPE_P2PC].closure = closure_P2PC;
	TRACKER[TYPE_P2PC].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PC].max);

	mkdir (PREFIX, 0777);
	snprintf (buffer, MAX_DATA, "%s/%s", PREFIX, LIVE_PREFIX);
	mkdir (buffer, 0777);
	snprintf (buffer, MAX_DATA, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
	system (buffer);
	if ((pidf = fopen (PIDFile, "w")) == NULL)
	{
		PDEBUG ("Cannot open pidfile.\n");
		return -1;
	}
	fprintf (pidf, "%d\n", getpid ());
	fclose (pidf);
	return 0;
}

#ifdef HAVE_TS
int process_TS()
{
	struct sockaddr_in dest;
	int addr_len = sizeof (dest);
	struct Message m;
	int i;

	if ((i = recvfrom (TSSOCK, &m, sizeof (struct Message), 0, (struct sockaddr *)&dest, &addr_len)) <= 0)
	{
		PDEBUG ("Error in recving ts message.\n");
		register_cp ();
		return 0;
	}
	switch (m.type)
	{
		case TS2CP_WELCOME:
			memcpy (&UDPMsg, &m, 12);
			PDEBUG("recv WELCOME from TS.\n");
/* isSet whether TS has returned WELCOME message */
			isSet = 1;
			break;
		case TS2CP_PEERS:
			process_TS2CP_PEERS (m.buffer);
			break;
		case TS2CP_MSG:
			if (*(char *)(m.buffer+sizeof(short)))
			{
				PDEBUG ("Error in TS2CP_MSG. \n");
				register_cp ();
			}
			break;
		default:
			PDEBUG ("Error in trackerserver message format\n");
			register_cp ();
			return -1;
	}
	return 0;
}

int closure_TS ()
{
	struct TSMessage *m = &UDPMsg;

	m->type = CP2TS_LOGOUT;
	m->len = 12;
	sendMessage (TSSOCK, (char *)m, &TSADDR);
	return 0;
}

int process_TS2CP_PEERS (char *buf)
{
	struct Channel *pc;
	int listnum;
	char *channel_md5;
	unsigned char cpsize;
	struct NormalAddress *CPlist;
	unsigned char peersize;
	struct PeerInfoWithAddr *pinfo;


	channel_md5 = buf;
	buf += MD5_LEN;
	cpsize = *(unsigned char *)buf;
	buf += sizeof (char);
	CPlist = (struct NormalAddress *)buf;
	buf += sizeof(struct NormalAddress)*cpsize;
	peersize = *(unsigned char *)buf;
	buf += sizeof (char);
	pinfo = (struct PeerInfoWithAddr *)buf;
	// now find the channel
	if ((pc = findChannel (channel_md5, MD5_LEN)) == NULL) return -1;
	if ((listnum = newChannel (pc, CPlist, cpsize, TYPE_CP)) < 0)
		return -1;
	return 0;
}
#endif


char *parseECP (char *str, char *buf)
{
	char  *buffer = buf;
	int flag = -1;
	unsigned char c;
	unsigned char part;
	buf += sizeof (int);
	for (part=0; *str ;str++)
	{
		c = *str;
		switch (c)
		{
			case ':':
				flag = 0;
				break;
			case '.':
				if (flag < 2)
				{
					*(unsigned char *)buf = part;
					buf += sizeof (char);
				}
				part = 0;
				flag ++;
				break;
			default:
				if (c >= '0' && c <= '9')
					part = c;
				break;
		}
	}
	*(int *)buffer = (buf - buffer - sizeof (int))/sizeof(short);
	return buf;
}

int periodCheck (float KBPSused)
{
	struct Session *head;
	int max, listnum;
	struct statistics
	{
		unsigned int resnum;
		unsigned short connnum;
		float bandwidth;
	} stat;
	stat.bandwidth = KBPSused/(MAX_BANDWIDTH*1024)/8;

/* isSet whether TS has returned WELCOME message */
	if (isSet == 0)
	{
		register_cp ();
		return 0;
	}
	max = TRACKER[TYPE_P2PC].maxid + 1;
	head = TRACKER[TYPE_P2PC].head;
	memset (&stat, 0, sizeof(stat));
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket > 0)
		{
			stat.resnum ++;
			stat.connnum ++;
		}
	}
	max = TRACKER[TYPE_P2PS].maxid + 1;
	head = TRACKER[TYPE_P2PS].head;
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket > 0)
		{
			if (head[listnum].pc == NULL &&
				head[listnum].header == NULL &&
				CurrentTime - head[listnum].last_transferblock > MAX_TRANSFER_IDLE)
			{
				PDEBUG ("timeout %d from NP %d to %d.\n", listnum, head[listnum].last_transferblock, (int)CurrentTime);
				Clientclosure (listnum, TYPE_P2PS);
			}
			else
				stat.connnum ++;
		}
	}

#ifdef HAVE_TS
	*(unsigned int *)(UDPMsg.buffer) = stat.resnum;
	*(unsigned short *)(UDPMsg.buffer+sizeof(int)) = stat.connnum;
	*(float *)(UDPMsg.buffer+sizeof(int)+sizeof(short)) = stat.bandwidth;
	if(stat.connnum <= MAX_P2PS)
			*(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 0;
		else
			*(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 1;
	UDPMsg.len = 23;
	UDPMsg.type = CP2TS_UPDATE;
	if (sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
	{
		PDEBUG ("exit...\n");
		exit (1);
	}
#endif
	PDEBUG("Res Num: %d. Connection Num: %d. BandWidth Usage: %.4f. \n", stat.resnum, stat.connnum, stat.bandwidth);
	return 0;
}

void makeSnapShot(int count, int time_interval)
{
	time_t tmpTime;
	struct tm result;
	struct Channel *pc, *nextpc;
//	struct Session *ps;
//	struct Edge *pe;
	int cpchannelcount = 0;
	int totalclient = 0;
	long long totalupsize = 0, totaldownsize = 0;
	FILE *f;
	char buffer[MAX_DATA];

	if (time_interval <= 0)
		return;
	localtime_r(&CurrentTime, &result);
	sprintf (buffer, "./cp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
	if ((f = fopen(buffer,"a")) == NULL)
	{
		PDEBUG("Couldn't open cp.log file! \n");
		return;
	}
	fseeko(f, 0, SEEK_END);

	// 1. start CP SnapShot 
	fprintf(f, "\n\n**************Start %d SnapShot of CP, Time : %u/%u %u:%u:%u.*********  \n",count,result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);

	// 2. log speed
	fprintf(f, "CP: cur Down %.4f KB. \n", ((float)tmpDownBytes)/1024/time_interval);
	fprintf(f, "CP: cur Up   %.4f KB. \n", ((float)tmpUpBytes)/1024/time_interval);
			
	periodCheck(((float)tmpDownBytes+tmpUpBytes)/1024/time_interval);
			
	totalDownBytes += tmpDownBytes;
	totalUpBytes += tmpUpBytes;
	tmpTime = CurrentTime - startTime; 
	fprintf(f, "CP: avg Down %.4f KB. \n", ((float)totalDownBytes)/1024/tmpTime);
	fprintf(f, "CP: avg Up   %.4f KB. \n", ((float)totalUpBytes)/1024/tmpTime);

	// 3. log channel state
	for (pc=ChannelList; pc; pc=nextpc)
	{
		nextpc = pc->lnext;
		++cpchannelcount;
		totalclient += pc->numclient;
		totalupsize += pc->upsize;
		totaldownsize += pc->downsize;
		fprintf(f,"Channel %s have %d client. Down size %lld, avg speed %f. Up Size %lld, avg speed %f. \n",pc->fname,pc->numclient,pc->downsize, ((float)(pc->downsize)) / time_interval, pc->upsize, ((float)(pc->upsize))/ time_interval);
			/*
			for (pe=pc->PeerHead; pe; pe = pe->cnext)
			{
				// if bitrate < 300kb/s ,then kill it
				if (pe->me->totalup/(CurrentTime - pe->me->time_sec)/1024 < 300)
				fprintf(f,"Session bitrate:%lld .Too slow ! \n",pe->me->totalup/(CurrentTime - pe->me->time_sec));
			}
			*/
		if (pc->numclient == 0)
			freeLiveChannel (pc, NULL);
		else if (CurrentTime - pc->last_nearpeer > NearPeerInterval)
		{
			send_nearpeers_toall (pc);
			pc->last_nearpeer = CurrentTime;
		}

	}
	fprintf(f,"Channel Count : %d. Total client : %d. Total dowsize: %lld. Total upsize %lld \n",cpchannelcount,totalclient,totaldownsize,totalupsize);
	
	fprintf(f,"\n*********************End of SnapShot************************\n");
	fclose(f);
	logto_xml (time_interval, tmpTime, cpchannelcount, totalclient);
	tmpDownBytes = tmpUpBytes = 0;
}

int reconnect (struct Session *p)
{
	struct NormalAddress *client;
	struct sockaddr_in addr;
	if (p->pc == NULL || p->pc->pcinfo->numofsp <= p->flag) return -1;
	
	close (p->socket);
	FD_CLR(p->socket, &osocks);
	if ((p->socket = socket (PF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror ("socket||gethostbyname");
		p->socket = 0;
		return -1;
	}
	client = &(p->pc->pcinfo->SPLIST[p->flag]);
	memset (&addr, 0, sizeof (addr));
	addr.sin_port = client->sin_port;
	addr.sin_addr = client->sin_addr;
	addr.sin_family = AF_INET;
	p->flag ++;
	if ((p->sock_flag = connect_nonb(p->socket, &addr, sizeof (addr))) == -1)
	{
		close (p->socket);
		return -1;
	}
	
	FD_SET(p->socket, &osocks);
	p->time_sec = CurrentTime;
	p->totalup = 0;
	return p->flag;
}

int send_P2P_PUSHLIST (struct Channel *pc, int id)
{
	unsigned char *ptr;
	char buffer[MAX_DATA], *buf;
	struct LiveChannelInfo *pcinfo = pc->pcinfo;
	int i, j;
	
	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_PUSHLIST;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned char *)buf = 0;
	buf += sizeof (char);
	ptr = buf;
	buf += sizeof (char);
	*ptr = 0;
	for (i=id; (i >= pcinfo->s.minBlockID && i <= pcinfo->s.maxBlockID) && i<id+REQUEST_AHEAD; i++)
	{
		j = i % pcinfo->max_queue;
		if (pcinfo->indisk[j] == (i/pcinfo->max_queue+1) || isSet (pcinfo->bitflag, j))
			continue;
		*(int *)buf = i;
		buf += sizeof (int);
		(*ptr) ++;
		setBit (pcinfo->bitflag, j);
	}
	if (*ptr > 0)
	{
		*(unsigned char *)buf = 0;
		buf += sizeof (char);
		*(int *)buffer = buf - buffer;
		writeMessage (pcinfo->dataSource, pc, buffer);
	}
	return 0;
}

int send_nearpeers (struct Channel *pc, struct Edge *pme)
{
	char buffer[MAX_DATA], *buf, *ptr;
	struct Edge *pedge;
	int i;
	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_NEAR_PEERS;
	buf += sizeof (char);
	ptr = buf;
	buf += sizeof (char);
	for (i=0, pedge=pme->cnext; i<MAX_NEARPEER; pedge=pedge->cnext)
	{
		if (pedge == NULL)
			pedge = pc->PeerHead;
		if (pedge == pme)
			break;
		memcpy (buf, &(pedge->me->addr), sizeof (struct PeerInfoWithAddr));
		buf += sizeof (struct PeerInfoWithAddr);
		i++;
	}
	if (i > 0)
	{
		*(unsigned char *)ptr = i;
		*(int *)buffer = buf - buffer;
		writeMessage (pme->me, pc, buffer);
	}
	return 0;
}

int send_nearpeers_toall (struct Channel *pc)
{
	struct Edge *pedge;
	for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
		send_nearpeers (pc, pedge);
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -