⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cpnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应用了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include "echo.h"
#include <errno.h>

/*
 * Node-type codes.  TYPE_CP is compared against a session's npcp field in
 * process_P2P_PUSHLIST; the other values are not referenced in the visible
 * part of this file.
 */
#define TYPE_NP			0
#define TYPE_CP			1
#define TYPE_SCP		2
#define TYPE_ECP		3
#define TYPE_GCP		4

/* NOTE(review): not referenced in the visible part of this file; looks like
 * a protocol version threshold (sessions carry a float version) -- confirm. */
#define MEDIATYPE_FIRST		0.149

#define MAX_IDLE		90	/* destruct a connection to sp/np after 90s */
//#define P2PS_PORT		50002
/* NOTE(review): not referenced in the visible part of this file. */
#define TS4CP_PORT		22168

#define REQUEST_AHEAD		15	/* Request how many blocks if a block doesn't exist in cache */

/*
 * Process-wide configuration and state.  Variables listed in
 * ConfigParameters (below) are handed to read_config() in main together
 * with their config-file key; the key is noted next to each of them.
 */
int MAX_P2PS=500;		/* config key "MAX_NP" */
int MAX_P2PC=512;		/* config key "MAX_SP" */
/* process_P2P_PUSHLIST returns -2 for a session whose numjob has reached this. */
int MAX_JOB_PER_SESSION=3;

//char *CONFIG = "./acp.cfg";
char *PREFIX="/data/cp/";	/* config key "Prefix" */
char *SCP_CHANNEL;		/* config key "SCP" */
char *ECP_REGION;		/* config key "ECP" */
int isGCP;			/* config key "GCP" */
char *SERVERIP;			/* config key "TrackerIP" */
char *BINDIP;			/* config key "BindIP" */
char *PIDFile="/var/run/cpnew.pid";	/* config key "Pidfile" */
int MAX_BANDWIDTH;		/* config key "BandWidth" */

char *AUTH_MD5;			/* config key "authmd5" */
int  AUTH_USERID;		/* config key "authid" */

/* Cleared with FD_ZERO in main before every init_cp() call. */
fd_set osocks;
int BINDALL;			/* config key "BINDALL" */
#ifdef HAVE_TS
/* Socket handed to sendMessage() when talking to the TS (see process_P2P_HELLO). */
int TSSOCK = 0;
//#define RANDOM_PORT	3947
#endif
/* Compared against the last snapshot time in period_process; it is not
 * updated anywhere in the visible part of this file. */
time_t CurrentTime;
time_t startTime;
/* Config key "SnapShotInterval"; no initializer, so zero unless configured. */
int SnapShotInterval;
int NearPeerInterval = 30;	/* config key "NearPeerInterval" */

int isSet = 0;					/* whether TS has returned WELCOME message */
/* Scratch message filled in and sent to the TS by process_P2P_HELLO. */
struct TSMessage UDPMsg;

/* Channel tables defined in another translation unit. */
extern struct Channel *ChannelHash[MAX_CHANNEL];
extern struct Channel *ChannelList;

/* Destination address used with TSSOCK when sending UDPMsg. */
struct sockaddr_in TSADDR;
socklen_t addrlen = sizeof (struct sockaddr_in);

/* Byte counters used to calculate average and current transfer speed. */
long long totalDownBytes=0, totalUpBytes=0;
long long tmpDownBytes=0, tmpUpBytes=0;

/*
 * One descriptor per connection type.  TRACKER[type].head is the base of
 * that type's session array: handlers compute a session's list index as
 * (p - TRACKER[TYPE_P2PS].head).
 */
struct ServerDesc TRACKER[MAX_TYPE];

char *LOGXML;			/* config key "LogFilePath" */

/*
 * errno is deliberately NOT declared here.  It comes from <errno.h>, where
 * it may be a macro expanding to a thread-local lvalue; a hand-written
 * "extern int errno;" is not a valid way to obtain it.
 */

int JobHighWater = 10000;	/* config key "JobHighWater" */
/* Config key "MaxNPPerChannel"; process_P2P_HELLO refuses a HELLO for an
 * existing channel whose numclient already exceeds this. */
int MaxNPPerChannel = 300;
/* Config key "CP4NP_PORT"; can also be overridden with a "tcp=<port>"
 * command-line argument (see main). */
int cfgP2PS_PORT = 23;
/* Config key "CP2TS_PORT"; can also be overridden with a "udp=<port>"
 * command-line argument (see main). */
int cfgCP2TS_PORT = CP2TS_PORT;

/*
 * Table passed to read_config() in main.  Each entry is
 * { config-file key, address of the variable, type code }.  In this table
 * every 's' entry points at a char * variable and every 'd' entry at an
 * int (presumably "string" and "decimal" -- confirm against read_config).
 *
 * Some keys do not match their variable names: "MAX_NP" sets MAX_P2PS,
 * "MAX_SP" sets MAX_P2PC and "CP4NP_PORT" sets cfgP2PS_PORT.
 */
struct NamVal ConfigParameters[]
= 
{
{"Prefix", &PREFIX, 's'},
{"MAX_NP", &MAX_P2PS, 'd'},
{"MAX_SP", &MAX_P2PC, 'd'},
{"Pidfile", &PIDFile, 's'},
{"TrackerIP", &SERVERIP, 's'},
{"SCP", &SCP_CHANNEL, 's'},
{"GCP", &isGCP, 'd'},
{"authid", &AUTH_USERID, 'd'},
{"authmd5", &AUTH_MD5, 's'},
{"ECP", &ECP_REGION, 's'},
{"BandWidth", &MAX_BANDWIDTH, 'd'},
{"BindIP", &BINDIP, 's'},
{"SnapShotInterval",&SnapShotInterval,'d'},
{"CP4NP_PORT", &cfgP2PS_PORT, 'd'},
{"CP2TS_PORT", &cfgCP2TS_PORT, 'd'},
{"LogFilePath", &LOGXML, 's'},
{"NearPeerInterval", &NearPeerInterval, 'd'},
{"JobHighWater", &JobHighWater, 'd'},
{"MaxNPPerChannel", &MaxNPPerChannel, 'd'},
{"BINDALL", &BINDALL, 'd'}
};

/*
 * Forward declarations.  Of these, only period_process and init_P2PS are
 * defined in the visible part of this file; the others are presumably
 * defined further down or in sessions.c (which is #included right after
 * this list) -- verify before relying on that.
 *
 * NOTE(review): several declarations use an empty parameter list "()",
 * which in C leaves the parameters unspecified rather than meaning "(void)".
 */
int register_cp ();
int init_cp ();
int handle_new_connection(int sock, int type);
int Clientclosure (int listnum, int type);
void process_child (void);
/* Per-connection-type init / process / closure hooks, indexed by listnum. */
int init_P2PS (int listnum);
int process_P2PS (int listnum);
int closure_P2PS (int listnum);
int init_P2PC (int listnum);
int process_P2PC (int listnum);
int closure_P2PC (int listnum);
int sendMessage (int sock, char *ptr, struct sockaddr_in *dest);
int process_TS2CP_PEERS (char *buf);
void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks);
int reconnect (struct Session *p);
char *parseECP (char *str, char *buf);
int closure_TS ();
int periodCheck (float KBPSused);
void makeSnapShot(int count, int time_interval);
int send_nearpeers (struct Channel *pc, struct Edge *pme);
int send_nearpeers_toall (struct Channel *pc);
#ifdef HAVE_TS
/* Only declared when the build has TS support. */
int process_TS();
#endif
int send_P2P_PUSHLIST (struct Channel *pc, int id);
int period_process (void);
extern char *getJobBuffer (struct JobDes *p, int *max);
extern inline void setblockId (struct JobDes *pj, int id);

#include "sessions.c"
/*
 * INIT_MAXQ(pc, s, maxq) - size a channel's block bookkeeping arrays.
 *
 *   pc    channel pointer; pc->pcinfo->{max_queue,indisk,bitflag} are updated
 *   s     object with a maxBlockID member (read only)
 *   maxq  lvalue that receives min(s->maxBlockID, MAX_QUEUE)
 *
 * WARNING: this macro contains "return -1" statements, so it can only be
 * used inside a function returning int.  It returns -1 from the ENCLOSING
 * function when s->maxBlockID is 0 or when an allocation fails.
 *
 * First use (indisk == NULL): allocates zeroed indisk (maxq bytes) and
 * bitflag ((maxq+7)/8 bytes).  If either allocation fails both are released
 * and reset to NULL so that a later call retries from scratch instead of
 * running with only one of the two arrays.
 *
 * Later use: the arrays are only ever shrunk (maxq < max_queue); a larger
 * maxq leaves them untouched.  realloc results go through temporaries so a
 * failure does not lose the still-valid original pointer; max_queue is
 * lowered first, which is safe because both arrays stay at least maxq long
 * whatever realloc does.
 */
#define INIT_MAXQ(pc,s,maxq)		do\
{\
	if ((s)->maxBlockID == 0) return -1;\
	if ((s)->maxBlockID >= MAX_QUEUE) (maxq) = MAX_QUEUE;\
	else (maxq) = (s)->maxBlockID;\
	if ((pc)->pcinfo->indisk == NULL)\
	{\
		(pc)->pcinfo->max_queue = (maxq);\
		(pc)->pcinfo->indisk = calloc ((maxq), 1);\
		(pc)->pcinfo->bitflag = calloc (((maxq)+7)/8, 1);\
		if ((pc)->pcinfo->indisk == NULL || (pc)->pcinfo->bitflag == NULL)\
		{\
			free ((pc)->pcinfo->indisk);\
			free ((pc)->pcinfo->bitflag);\
			(pc)->pcinfo->indisk = NULL;\
			(pc)->pcinfo->bitflag = NULL;\
			return -1;\
		}\
	} else if ((maxq) < (pc)->pcinfo->max_queue)\
	{\
		void *initmaxq_indisk_, *initmaxq_bitflag_;\
		(pc)->pcinfo->max_queue = (maxq);\
		initmaxq_indisk_ = realloc ((pc)->pcinfo->indisk, (maxq));\
		if (initmaxq_indisk_ != NULL)\
			(pc)->pcinfo->indisk = initmaxq_indisk_;\
		initmaxq_bitflag_ = realloc ((pc)->pcinfo->bitflag, ((maxq)+7)/8);\
		if (initmaxq_bitflag_ != NULL)\
			(pc)->pcinfo->bitflag = initmaxq_bitflag_;\
		if (initmaxq_indisk_ == NULL || initmaxq_bitflag_ == NULL)\
			return -1;\
	}\
} while (0)

/*
 * period_process - periodic housekeeping hook.
 *
 * When more than SnapShotInterval has passed (measured with the global
 * CurrentTime) since the previous snapshot, calls makeSnapShot() with a
 * running snapshot counter and the elapsed time, then appends the output of
 * "vmstat -a" to cp.log through a background shell command.
 *
 * The previous-snapshot timestamp is a zero-initialised static, so on the
 * first snapshot the elapsed value is CurrentTime itself.
 *
 * Always returns 0; the result of system() is not inspected.
 */
int period_process (void)
{
	static time_t prev_snapshot_time;
	static int snapshots_taken = 0;
	time_t elapsed = CurrentTime - prev_snapshot_time;

	/* Nothing to do until the interval has been exceeded. */
	if (elapsed <= SnapShotInterval)
		return 0;

	makeSnapShot (snapshots_taken, elapsed);
	snapshots_taken++;
	system ("/usr/bin/vmstat -a >> cp.log 2>&1 &");
	prev_snapshot_time = CurrentTime;

	return 0;
}
		
/*
 * apply_port_option - store a command-line port override.
 *
 * value is the text following a "tcp=" / "udp=" prefix.  It is converted
 * with atoi(); *port is overwritten only when the result lies in 1..65534,
 * otherwise the previously configured value is kept.
 */
static void apply_port_option (const char *value, int *port)
{
	int parsed = atoi (value);

	if (parsed > 0 && parsed < 65535)
		*port = parsed;
}

/*
 * main - entry point.
 *
 * argv[1] is the run mode: 0 detaches with daemon(1,1) (working directory
 * and standard descriptors are kept), any other value stays on the console.
 * The config file CONFIG is then read into the variables listed in
 * ConfigParameters, after which "tcp=<port>" and "udp=<port>" arguments may
 * override cfgP2PS_PORT and cfgCP2TS_PORT respectively.
 *
 * The server itself is init_cp() followed by process_child(); that pair is
 * run up to 10 times, for as long as IN_LOOP stays positive.
 *
 * Returns -1 on a usage error or when daemon() fails, 0 when the loop ends;
 * exits with status -1 if init_cp() fails.
 */
int main(int argc, char **argv)
{
	int i, mode;

	if (argc < 2)
	{
		printf("usage: %s mode(0 for daemon, 1 for console).\n", argv[0]);
		return -1;
	}
	signal (SIGPIPE, SIG_IGN);
	signal (SIGINT, terminate);

	mode = atoi (argv[1]);

	/* A failed daemon() used to go unnoticed, leaving the server attached
	 * to the terminal although daemon mode was requested. */
	if (mode == 0 && daemon (1, 1) < 0)
	{
		perror ("daemon");
		return -1;
	}

	read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));

	/* Command-line overrides are applied after the config file so they win. */
	for (i = 0; i < argc; ++i)
	{
		if (strncmp (argv[i], "tcp=", 4) == 0)
			apply_port_option (argv[i] + 4, &cfgP2PS_PORT);
		else if (strncmp (argv[i], "udp=", 4) == 0)
			apply_port_option (argv[i] + 4, &cfgCP2TS_PORT);
	}

	for (i=0; i<10 && IN_LOOP > 0; i++)
	{
		FD_ZERO(&osocks);
		if (init_cp () < 0)
		{
			PDEBUG ("init_cp error, exit...\n");
			exit (-1);
		}
		process_child ();
	}
	return 0;
}

/*
 * init_P2PS - per-connection init hook for P2PS sessions.
 *
 * No setup is performed; listnum is accepted only to match the signature of
 * the other init hooks.  Always returns 0.
 */
int init_P2PS (int listnum)
{
	const int status = 0;

	(void) listnum;
	return status;
}

int process_P2P_HELLO (struct Session *p, struct Message *m)
{
	struct Edge *pedge = NULL;
	float version = *(float *)(m->buffer);
	char *buf = m->buffer + sizeof (float);
	struct Channel *pc;
	struct LiveChannelInfo *pcinfo;
	int listnum;
//	char *buf, buffer[MAX_DATA];

	PINFO ("RECV P2P_HELLO. \n");
	p->version = version;
	listnum = p - TRACKER[TYPE_P2PS].head;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
	{
		if ((pc = newLiveChannel (buf, NULL, buf, 0, 0)) != (struct Channel *)0)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		} else
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		p->pc = pc;
	} else
	{
		if (pc->numclient > MaxNPPerChannel)
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
		if (pedge == NULL)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		}
		p->pc = pc;
		if (pc->pcinfo->dataSource) 
		{
			/*
			// connect exist, send p2p_hello to check channel state.
			buf = buffer+sizeof (int);
			*(unsigned char *)buf = P2P_HELLO;
			buf += sizeof (char);
			memcpy (buf, pc->channel_md5, MD5_LEN);
			buf += MD5_LEN;
			*(unsigned char *)buf = 0;
			buf += sizeof (char);
			*(int *)buffer = buf - buffer;
			if (writeMessage (pc->pcinfo->dataSource, buffer) < 0)
			{
				Clientclosure (listnum, TYPE_P2PC);
				return -1;
			}
			PDEBUG("sent P2P_HELLO to SP. \n");
			*/
			buf += MD5_LEN + sizeof (char);
			memcpy (&(p->addr), buf, sizeof (p->addr));
			if (pedge) send_nearpeers (pc, pedge);
			return 0;
		}
		PDEBUG("NO connection to SP. try connect.\n");
	}
	buf += MD5_LEN + sizeof (char);
	memcpy (&(p->addr), buf, sizeof (p->addr));
	buf += sizeof (p->addr);

	pcinfo = pc->pcinfo;
	pcinfo->numofsp = (unsigned char)*buf;
	buf += sizeof (char);
	if (pcinfo->numofsp == 0 || pcinfo->numofsp > MAX_REPSP)
	{
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	memcpy (&(pcinfo->SPLIST), buf, pcinfo->numofsp*sizeof(struct NormalAddress));
	buf += pcinfo->numofsp*sizeof (struct NormalAddress);
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	if (isGCP || SCP_CHANNEL)
	{
		if ((listnum = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
		{
			PDEBUG("Unable connect to SP.\n");
			Clientclosure (p-TRACKER[TYPE_P2PS].head, TYPE_P2PS);
			return -1;
		}
	} else	/* ECP, need peers */
	{
#ifdef HAVE_TS
		UDPMsg.type = CP2TS_NEED_PEERS;
		UDPMsg.len = 12+MD5_LEN;
		memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
/* isSet whether TS has returned WELCOME message */
		if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
		{
			PDEBUG ("exit...\n");
			exit (1);
		}
#endif
	}
	if (pedge) send_nearpeers (pc, pedge);
	return 0;
}

/*
 * process_P2P_MEDIATYPE - parse a P2P_MEDIATYPE message and register the
 * media description with its channel through addMedia().
 *
 * Payload layout as consumed here, starting at m->buffer:
 *   MD5_LEN bytes   channel id (must match an existing channel)
 *   int             start
 *   int             length
 *   int             size        length in bytes of the media blob
 *   size bytes      media blob
 *   1 byte          proglen, followed by proglen bytes of program name
 *   sizeof (int)    skipped, not interpreted here
 *   1 byte          chnllen, followed by chnllen bytes of channel name
 *
 * listnum is unused.
 *
 * Every variable-length field is checked against the message length BEFORE
 * the parser steps over it, and a negative size is rejected, so a malformed
 * message can no longer move the read pointer outside the message.  The
 * budget uses the same measure as the original end-of-parse test: the
 * offset from the start of *m must not exceed m->len.
 * NOTE(review): other handlers in this file measure from m->buffer and add
 * NORMAL_HEADER instead -- confirm both are equivalent for struct Message.
 *
 * NOTE(review): name and channel_name are passed to addMedia() without
 * their lengths; presumably addMedia expects them to be NUL-terminated
 * inside the message -- verify.
 *
 * Returns 0 after calling addMedia(), -1 if the channel is unknown or the
 * message is too short or malformed.
 */
int process_P2P_MEDIATYPE (int listnum, struct Message *m)
{
	struct Channel *pc;
	char *buf, *media, *name, *channel_name;
	int start, length, size, proglen, chnllen;
	long remaining;

	(void) listnum;

	buf = m->buffer;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
		return -1;

	/* Bytes still available, counted the way the original final check did. */
	remaining = (long)m->len - (long)(buf - (char *)m);

	/* Fixed part: channel id plus three ints. */
	if (remaining < (long)(MD5_LEN + 3*sizeof (int)))
		return -1;
	buf += MD5_LEN;
	/* memcpy avoids unaligned int loads from the byte buffer. */
	memcpy (&start, buf, sizeof (int));
	buf += sizeof (int);
	memcpy (&length, buf, sizeof (int));
	buf += sizeof (int);
	memcpy (&size, buf, sizeof (int));
	buf += sizeof (int);
	remaining -= (long)(MD5_LEN + 3*sizeof (int));

	/* Media blob plus the program-name length byte that follows it. */
	if (size < 0 || remaining < 1 || size > remaining - 1)
		return -1;
	media = buf;
	buf += size;
	proglen = *(unsigned char *)buf;
	buf += sizeof (char);
	remaining -= (long)size + 1;

	/* Program name, the skipped int, and the channel-name length byte. */
	if (remaining < (long)proglen + (long)sizeof (int) + 1)
		return -1;
	name = buf;
	buf += proglen;
	buf += sizeof (int);
	chnllen = *(unsigned char *)buf;
	buf += sizeof (char);
	remaining -= (long)proglen + (long)sizeof (int) + 1;

	/* Channel name must fit in what is left. */
	if (remaining < (long)chnllen)
		return -1;
	channel_name = buf;

	addMedia (pc, start, length, size, media, name, channel_name);
	return 0;
}

int process_P2P_PUSHLIST (struct Session *p, struct Message *m)
{
	struct Channel *pc;
	struct Edge *pedge;
	char *buf;
	int i, type, listnum, size;

	listnum = p - TRACKER[TYPE_P2PS].head;
	buf = m->buffer;
	if (p->npcp == TYPE_CP)
	{
		if ((pc = findChannel (buf, MD5_LEN)) == NULL)
		{
			if ((pc = newLiveChannel (m->buffer, NULL, m->buffer, 0, 0)) != (struct Channel *)0)
			{
				pedge=newEdge (pc, p);
				pc->numclient ++;
			}
		} else
		{
			for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
			if (pedge == NULL)
			{
				pedge=newEdge (pc, p);
				pc->numclient ++;
			}
		}
		buf += MD5_LEN;
	} else
	{
		pc = p->pc;
	}

	if (pc == NULL && p->npcp != TYPE_CP)
	{
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	if (p->numjob >= MAX_JOB_PER_SESSION)
		return -2;

	type = *(unsigned char *)buf;
	buf += sizeof (char);
	size = *(unsigned char *)buf;
	buf += sizeof (char);
	if (type)
	{
		deleteChannel (p, pc);
		for (i=0; i<size; i++)
			if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
				return -1;
	} else
	{
		for (i=0; i<size; i++)
			if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
				return -1;
		buf += size*sizeof(int);
		size = *(unsigned char *)buf;
		buf += sizeof (char);
		deleteJob (p, pc, (unsigned int *)buf, size);
	}
	if (buf - m->buffer + NORMAL_HEADER > m->len)
	{
		PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}

	return 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -