⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tsnew.c

📁 mysee网络直播源代码：Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应用了P2P技术和一系列先进流媒体技术之后……
💻 C
📖 第 1 页 / 共 5 页
字号:
	system ("ulimit -a");
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
		return -1;
	}
	fprintf (stderr, "Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
	if (setrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
		return -1;
	}
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
		return -1;
	}
	fprintf (stderr, "Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	system ("ulimit -a");
#endif
	if (init_udpserver (&NPTRACKER, LOCALHOST, (int*)cfgTS4NP_PORT.ptr, MAX_NP, cfgTS4NP_PORT.size) < 0
#ifdef HAVE_RM
		|| init_udpserver (&RMTRACKER, LOCALHOST, (int*)cfgTS4RM_PORT.ptr, MAX_RM, cfgTS4RM_PORT.size) < 0
#endif
		|| init_udpserver (&CPTRACKER, LOCALHOST, (int*)cfgTS4CP_PORT.ptr, MAX_CP, cfgTS4CP_PORT.size) < 0)
	{
		return -1;
	}
#ifdef SORT_NET
	maxNet = readNETBLOCK (NETFN);
#endif

	statlog = fopen("./stat.log", "a+");// create file "stat.log" automatically if it does not exist. 
	if (statlog == NULL) {
		perror ("error opening statistics log file: stat.log.\n");
	}

	return 0;
}


/*
 * Tracker server entry point.
 *
 * Command line:
 *   argv[1]  run mode; 0 detaches the process with daemon(), any other
 *            value (default 1) keeps it in the foreground.
 *   argv[2]  value stored in OUTPUT_STAT.
 *
 * Reads the configuration, then runs up to 10 rounds of
 * init_ts() / process_child(). Any initialization failure, including a
 * failed daemon() call, ends the process with exit status 1.
 * Returns 0 once all rounds have completed.
 */
int main(int argc, char **argv)
{
	int i, mode = 1;

	// SIGPIPE is raised when the client closes the socket exceptionally;
	// if not handled, SIGPIPE would cause unexpected termination.
	signal (SIGPIPE, SIG_IGN);
//	signal (SIGINT, my_exit);

	if (argc > 1)
	{
		mode = atoi (argv[1]);
		if (argc > 2) OUTPUT_STAT = atoi (argv[2]);
	}
	if (mode == 0)
	{
		// Run in the background. daemon(1,1) keeps the working directory
		// and the standard streams. A failed call used to go unnoticed and
		// left the server attached to the terminal; report it and stop.
		if (daemon (1, 1) != 0)
		{
			perror ("daemon");
			exit (1);
		}
	}

	// Read the configuration file.
	// Arguments: the file name, a struct NamVal *, and the number of
	// entries in that struct NamVal array.
	read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof (struct NamVal));
	readconfig("ip.list");

	for (i=0; i<10; i++) // retry 10 times. not useful here.
	{
		FD_ZERO(&osocks);
		if (init_ts () < 0)
		{
			PDEBUG ("Error in initialization.\n");
			exit (1);
		}
#ifdef HAVE_MYSQL
		if ((local_mysql = init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB, "/var/run/mysqld/mysqld.sock")) == 0)
		{
			PDEBUG ("Error in init_mysql.\n");
			exit (1);
		}
#endif
		process_child ();
	}
	return 0;
}


//===============================================
//===== Here begin the message process part =====
//===============================================

/*
 * Account for a newly set up NP session in the NPTRACKER bookkeeping.
 *
 * p: session slot inside the NPTRACKER.head array.
 *
 * Raises NPTRACKER.maxid to the slot index when the slot lies beyond it,
 * increments the session counter NPTRACKER.cur and emits a debug line.
 * Always returns 0.
 */
int init_NP (struct Session *p)
{
	// maxid: maximum session index currently in the list, kept to shorten searches.
	// The slot index is the distance of p from the start of the session array.
	int listnum = p - NPTRACKER.head;

	if (listnum > NPTRACKER.maxid)
		NPTRACKER.maxid = listnum;
	NPTRACKER.cur ++;			// cur is in fact the counter of sessions

	// The debug lines print the int index: the raw pointer difference is a
	// ptrdiff_t and does not match the %d conversion.
	if (p->u.p.cur)		// if there is an edge, then the client is already in a channel
	{
		PDEBUG ("NP %d in %d enter Channel %.32s(%d clients).\n", listnum, NPTRACKER.cur, p->u.p.cur->head->name, p->u.p.cur->head->numclient);
	}
	else
	{
		PDEBUG ("NP %d in %d no default channel.\n", listnum, NPTRACKER.cur);
	}
	return 0;
}

/*
 * Receive one datagram on NP socket number idsock and dispatch it.
 *
 * The datagram is read into the global UDPMsg and the sender address into
 * the global UDPCLIENT. NP2TS_LOGIN messages go to process_NP2TS_LOGIN.
 * Every other message must name an existing session: authcode1 is the
 * index into NPTRACKER.head and authcode2 must equal that session's auth
 * value. Messages failing the check are answered with ERR_INTERNAL
 * (except NP2TS_LOGOUT, which is dropped silently).
 *
 * Returns 0 when the message was dispatched, -1 when recvfrom failed or
 * the session check rejected the message.
 */
int process_NP (int idsock)
{
	int len, listnum;
	int closed = 0;		// set once the session has been handed to closure_NP
	struct Session *p;
	struct TSMessage *m = &UDPMsg;
	socklen_t addr_len = sizeof (UDPCLIENT);

	memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
	if ((len = (recvfrom (NPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
	{
		perror ("recvfrom:");
		return -1;
	}

	PDEBUG ("type %d len %d.", m->type, m->len);

	if (m->type == NP2TS_LOGIN)
	{
		process_NP2TS_LOGIN ((struct Message *)m);
		++np2tsLoginCount;
	} else
	{
		// authcode1 comes straight from the network. Validate it, including
		// against negative values, before forming a pointer into the array.
		listnum = m->authcode1; // index of session object
		if (listnum < 0 || listnum >= NPTRACKER.max)
			p = NULL;
		else
			p = NPTRACKER.head + listnum;

		// check the session: 1. bad index; 2. uninitialized or cleared; 3. not match
		if (p == NULL || p->socket == 0 || p->auth != m->authcode2)
		{
			if (m->type != NP2TS_LOGOUT)
			{
				SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
			}
			return -1;
		}
		switch (m->type)
		{
			case NP2TS_REPORT: // report interval info; if refresh is true reset, otherwise add first and then delete
				process_NP2TS_REPORT (p, m);
				++np2tsReportCount;
				break;
			case NP2TS_NEED_PEERS:
				PDEBUG("Need peers!!\n");
				process_NP2TS_NEED_PEERS (p, m);
				++np2tsNeedPeerCount;
				break;
			case NP2TS_LOGOUT: // log out
				closure_NP (p);
				closed = 1;
				++np2tsLogoutCount;
				break;
			case NP2TS_RES_LIST: /* sends all RESOURCEs of the current NP; handled with addSession,
					        which adds the edge if it does not exist yet */
				process_NP2TS_RES_LIST (p, m);
				++np2tsResListCount;
				break;
			case NP2TS_REQ_RES: // add RES and return peers
				process_NP2TS_REQ_RES (p, m);
				++np2tsReqResCount;
				break;
			case NP2TS_DEL_RES: // delete RES
				process_NP2TS_DEL_RES (p, m);
				++np2tsDelResCount;
				break;
			case NP2TS_QUERY_RES: // query RES
				process_NP2TS_QUERY_RES (p, m);
				break;
			case NP2TS_REPORT2:
				process_NP2TS_REPORT2 (p, m);
				break;
			default:
				// unknown message type: report the error and drop the session
				SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
				closure_NP (p);
				closed = 1;
				break;
		}
		// closure_NP clears the slot and puts it on the free list, so a
		// closed session must not be stamped afterwards.
		if (!closed)
			p->last_access = CurTimeSec;
	}
	PDEBUG ("done\n");
	return 0;
}

/*
 * Append the statistics of session p to the statistics log (statlog).
 *
 * One record consists of three text lines: timestamp, client version and
 * addresses; the playback/connection counters; the traffic counters.
 * The stream is flushed once MAX_LOG_COUNT records have accumulated.
 *
 * Does nothing when the log file is not open.
 */
void logStat(struct Session *p)
{
	// The log is optional: when stat.log cannot be opened the failure is
	// only reported and statlog stays NULL. Writing to a NULL stream would
	// crash the server.
	if (statlog == NULL)
		return;

	// Record time and client version. time_t is cast because it does not
	// match the %u conversion on its own.
	// NOTE(review): there is no separator between the version and the host
	// printed next; left as is so the log layout does not change.
	fprintf(statlog, "%u %f", (unsigned)time(NULL), p->clientVer);
	fprintf(statlog, "%d:%d | %d:%d\n", p->host, p->port, p->intra, p->npport);
	fprintf(statlog, "%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%f\n",
			p->u.p.s.playingBlock - p->u.p.startBlock,
			p->u.p.s.currBufferTime,
			p->u.p.s.bufferCount,
			p->u.p.s.bufferTime,
			p->u.p.s.connFailCount,
			p->u.p.s.inConnections,
			p->u.p.s.outConnections,
			p->u.p.s.avgInConnTime,
			p->u.p.s.avgOutConnTime,
			p->u.p.s.messagePercent);
	fprintf(statlog, "%lld\t%lld\t%f\t%f\t%f\t%f\n",
			p->u.p.t.totalDownBytes,
			p->u.p.t.totalUpBytes,
			p->u.p.t.currDownSpeed,
			p->u.p.t.currUpSpeed,
			p->u.p.t.avgDownSpeed,
			p->u.p.t.avgUpSpeed);

	// Flush in batches rather than after every record.
	current_log_count ++;
	if (current_log_count >= MAX_LOG_COUNT)
	{
		fflush(statlog);
		current_log_count = 0;
	}
}

/*
 * Close NP session p and hand its slot back to the tracker.
 *
 * The session statistics are logged first; then the session is taken out
 * of the maxid bookkeeping, the session-channel map and the hash table,
 * cleared, and pushed onto the chain kept in NPTRACKER.hash[0].
 * Always returns 0.
 */
int closure_NP (struct Session *p)
{
	int slot, bucket;
	struct Session *prev;

	// Write statistics to the log file while the session data is still intact.
	logStat(p);

	// 1. If this was the highest used slot (and not slot 0), walk maxid down
	//    to the next slot that is still in use, stopping at 0.
	slot = p - NPTRACKER.head;
	if (slot == NPTRACKER.maxid && slot > 0)
	{
		slot--;
		while (NPTRACKER.head[slot].socket == 0 && slot > 0)
			slot--;
		NPTRACKER.maxid = slot;
	}

	// 2. Drop the session from the session-channel map.
	delSession (p);

	// 3. Unlink the session from its hash chain.
	bucket = hash_np (p->host, p->npport);
	prev = NPTRACKER.hash[bucket];
	if (prev == p)
	{
		// head of chain
		NPTRACKER.hash[bucket] = p->hnext;
	}
	else
	{
		// find the element whose successor is p
		while (prev && prev->hnext != p)
			prev = prev->hnext;
		assert (prev);
		if (prev) prev->hnext = p->hnext;
	}

	// 4. Clear the session object and push it onto the free list.
	memset (p, 0, sizeof (struct Session));
	p->hnext = NPTRACKER.hash[0];
	NPTRACKER.hash[0] = p;
	Polluted ++;
	NPTRACKER.cur --;
	return 0;
}

/*
 * Account for a newly set up CP session in the CPTRACKER bookkeeping.
 *
 * Looks up the service type for the session's host (falling back to
 * "UNKNOWN"), stores it in the session, raises CPTRACKER.maxid to the slot
 * index when needed, increments CPTRACKER.cur and makes p the current
 * GCPCHOICE. Always returns 0.
 */
int init_CP (struct Session *p)
{
	int slot;
	const char *svc = find_cp_service_type(p->host);

	if (svc == NULL)
	{
		svc = "UNKNOWN";
	}
	// NOTE(review): unbounded copy; the capacity of u.cp.servicetype is not
	// visible here, so confirm it can hold every configured service type.
	strcpy(p->u.cp.servicetype, svc);

	PDEBUG("\n******************************************************************\ninit_CP: cp service type is %s\n", svc);

	// The slot index is the distance of p from the start of the session array.
	slot = p - CPTRACKER.head;
	if (CPTRACKER.maxid < slot)
	{
		CPTRACKER.maxid = slot;
	}
	CPTRACKER.cur ++;
	GCPCHOICE = p;

	return 0;
}

/*
 * Receive one datagram on CP socket number idsock and dispatch it.
 *
 * The datagram is read into the global UDPMsg and the sender address into
 * the global UDPCLIENT. CP2TS_REGISTER messages go to
 * process_CP2TS_REGISTER. Every other message must name an existing
 * session: authcode1 is the index into CPTRACKER.head and authcode2 must
 * equal that session's auth value. Messages failing the check are answered
 * with ERR_INTERNAL (except CP2TS_LOGOUT, which is dropped silently).
 *
 * Returns 0 when the message was dispatched, -1 when recvfrom failed or
 * the session check rejected the message.
 */
int process_CP (int idsock)
{
	int len, listnum;
	int closed = 0;		// set once the session has been handed to closure_CP
	struct Session *p;
	struct TSMessage *m = &UDPMsg;
#ifdef MEASUREMENT
	struct timeval tm;
	long long msec;
#endif
	socklen_t addr_len = sizeof (UDPCLIENT);

	memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
	if ((len = (recvfrom (CPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
	{
		perror ("recvfrom:");
		return -1;
	}

#ifdef MEASUREMENT
	gettimeofday (&tm, NULL);
	msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
#endif

	if (m->type == CP2TS_REGISTER)
	{
		process_CP2TS_REGISTER ((struct Message *)m);
	} else
	{
		// authcode1 comes straight from the network. Validate it, including
		// against negative values, before forming a pointer into the array.
		listnum = m->authcode1;
		if (listnum < 0 || listnum >= CPTRACKER.max)
			p = NULL;
		else
			p = CPTRACKER.head + listnum;

		if (p == NULL || p->socket == 0 || p->auth != m->authcode2)
		{
			if (m->type != CP2TS_LOGOUT)
			{
				// Only look inside the session when the index is in range.
				if (p != NULL)
				{
					PDEBUG("CP error. listnum=%d/%d. socket=%d auth=%d/%d\n", listnum, CPTRACKER.max, p->socket, p->auth, m->authcode2);
				}
				else
				{
					PDEBUG("CP error. listnum=%d/%d out of range.\n", listnum, CPTRACKER.max);
				}
				SEND_NPMSG(CPTRACKER.sock[idsock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
			}
			return -1;
		}
		switch (m->type)
		{
			case CP2TS_NEED_PEERS: // used for ECP queries, not in use at present
				process_CP2TS_NEED_PEERS (p, m);
				break;
			case CP2TS_UPDATE: // report CP load
				process_CP2TS_UPDATE (p, m);
				break;
			case CP2TS_LOGOUT:
				closure_CP (p);
				closed = 1;
				break;
			default:
				// unknown message type: drop the session
				closure_CP (p);
				closed = 1;
				break;
		}
		// closure_CP clears the slot and puts it on the free list, so a
		// closed session must not be stamped afterwards.
		if (!closed)
			p->last_access = CurTimeSec;
	}
	return 0;
}

/*
 * Close CP session p and hand its slot back to the tracker.
 *
 * Updates CPTRACKER.maxid, picks another CT_GENERAL session as GCPCHOICE
 * when p was the current choice, unlinks p from its hash chain, clears it
 * and pushes it onto the chain kept in CPTRACKER.hash[0].
 * Always returns 0.
 */
int closure_CP (struct Session *p)
{
	int i, id;
	struct Session *q;

	// If this was the highest used slot (and not slot 0), walk maxid down
	// to the next slot that is still in use, stopping at 0.
	if ((i = p - CPTRACKER.head) == CPTRACKER.maxid && i > 0)
	{
		for (i--; CPTRACKER.head[i].socket == 0 && i> 0; i--);
		CPTRACKER.maxid = i;
	}

	if (GCPCHOICE == p)
	{
		// Pick a replacement among the remaining sessions. p itself is
		// skipped: its socket is still non-zero here, so without the check
		// the search could re-select the session that is being closed.
		for (i=CPTRACKER.maxid; i>=0; i--)
		{
			struct Session *cand = &(CPTRACKER.head[i]);

			if (cand != p && cand->socket != 0 && cand->u.cp.type == CT_GENERAL)
			{
				GCPCHOICE = cand;
				break;
			}
		}
		// NOTE(review): when no other CT_GENERAL session exists, GCPCHOICE
		// keeps referring to p, which is cleared below. Confirm how the
		// users of GCPCHOICE cope with that before changing it.
	}

	// Unlink the session from its hash chain.
	id = hash_cp (p->host, p->npport);
	if ((q = CPTRACKER.hash[id]) != p)
	{
		// not the head of the chain: find the element whose successor is p
		for (; q && q->hnext != p; q=q->hnext);
		assert (q);
		if (q) q->hnext = p->hnext;
	} else CPTRACKER.hash[id] = p->hnext;

	// Clear the session object and push it onto the free list.
	memset (p, 0, sizeof (struct Session));
	p->hnext = CPTRACKER.hash[0];
	CPTRACKER.hash[0] = p;
	Polluted ++;
	CPTRACKER.cur --;
	return 0;
}

#ifdef HAVE_RM
/*
 * Append one channel record at *buf and advance *buf past it.
 * A record is MD5_LEN bytes of name followed by the client count as an int.
 */
static void append_channel_record (char **buf, const void *name, int numclient)
{
	memcpy (*buf, name, MD5_LEN);
	*buf += MD5_LEN;
	// memcpy instead of a store through a cast pointer: *buf is a byte
	// cursor and need not be suitably aligned for an int.
	memcpy (*buf, &numclient, sizeof (int));
	*buf += sizeof (int);
}

/*
 * Write channel records matching md5 into the caller's buffer.
 *
 * md5: "*" selects every channel; otherwise an exact match is tried with
 *      findChannel, and if that fails every channel whose name contains
 *      md5 as a substring is selected.
 * buf: in/out write cursor; advanced past the records written.
 *
 * Returns the number of records written.
 *
 * NOTE(review): the interface carries no buffer size, so the caller has to
 * provide room for every matching channel.
 */
int getChannelInfo (char *md5, char **buf)
{
	struct Channel *pc;
	int i, total=0;

	// wildcard: dump all channels
	if (strcmp (md5, "*") == 0)
	{
		for (i=0; i<MAX_CHANNEL; i++)
		{
			for (pc=ChannelHash[i]; pc; pc=pc->next)
			{
				append_channel_record (buf, pc->name, pc->numclient);
				total ++;
			}
		}
		return total;
	}

	// exact match: the record carries the queried name
	if ((pc=findChannel (md5, MD5_LEN)) != NULL)
	{
		append_channel_record (buf, md5, pc->numclient);
		return 1;
	}

	// fall back to a substring search over all channel names
	for (i=0; i<MAX_CHANNEL; i++)
	{
		for (pc=ChannelHash[i]; pc; pc=pc->next)
		{
			if (strstr (pc->name, md5) != NULL)
			{
				append_channel_record (buf, pc->name, pc->numclient);
				total ++;
			}
		}
	}
	return total;
}

/*
 * Account for a newly set up RM session in the RMTRACKER bookkeeping:
 * count the session in RMTRACKER.cur and raise RMTRACKER.maxid to the
 * slot index when the slot lies beyond it. Always returns 0.
 */
int init_RM (struct Session *p)
{
	// The slot index is the distance of p from the start of the session array.
	int slot = p - RMTRACKER.head;

	RMTRACKER.cur ++;
	if (RMTRACKER.maxid < slot)
	{
		RMTRACKER.maxid = slot;
	}
	return 0;
}
// Message type codes for the RM port. RM2TS_STAT_QUERY is matched against
// the type field of a received message in process_RM.
// TS2RM_STAT_RESPONSE is presumably the type of the reply - TODO confirm
// against the rest of process_RM.
#define RM2TS_STAT_QUERY	0x20
#define TS2RM_STAT_RESPONSE	0x30
int process_RM (int idsock)
{
	char buffer[MAX_DATA];
	char *p, *buf = buffer;
	int * psize;
	int querynum;
	int len, total, i;
	struct Message Msg, *m=&Msg;
#ifdef MEASUREMENT
	struct timeval tm;
	long long msec;
#endif
	socklen_t addr_len = sizeof (UDPCLIENT);

	memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
	if ((len = (recvfrom (RMTRACKER.sock[idsock], m, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
	{
		perror ("recvfrom:");
		return -1;
	}

#ifdef MEASUREMENT
	gettimeofday (&tm, NULL);
	msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
#endif
	PDEBUG("got RM msg, len %d. \n", len);
	switch (m->type)
	{
		case RM2TS_STAT_QUERY:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -