⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 3 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include "echo.h"
/*
 * Scratch state for one periodic dump.  period_process() zero-fills it,
 * passes it to the apply_idle / apply_check callbacks, and prints the
 * accumulated totals afterwards.
 */
struct Argument
{
	int type;			/* session class being walked (period_process sets TYPE_CS) */
	int spchannelcount;		/* channel count reported in the dump and in the TS query */
	int totalclient;		/* client total printed in the dump */
	long long totalupsize;		/* upload total in bytes printed in the dump */
	FILE *f;			/* daily "sp-Y-M-D.log" stream */
	FILE *xml[MAX_TS];		/* per-network channel.xml streams (TEST builds only) */
	char *buf;			/* write cursor inside the TS statistics query message */
};
/* NOTE(review): not referenced in the visible part of this file; presumably
 * a timer slot used for SPUPDATE messages -- confirm against sessions.c. */
#define SPUPDATE_SLOT		1

#define MAX_CHANNEL		1024	/* max number of Channel */

/* NOTE(review): presumably the port the SP listens on for CP clients -- confirm. */
#define SP4CP_PORT		50001

/* Statistics exchange with the TS servers: period_process() writes
 * RM2TS_STAT_QUERY as the message type of its query, process_TS2RM()
 * accepts only TS2RM_STAT_RESPONSE.
 * NOTE(review): TS4RM_PORT is not used in the visible part of this file;
 * presumably the TS port for this exchange -- confirm. */
#define TS4RM_PORT		22169
#define RM2TS_STAT_QUERY	0x20
#define TS2RM_STAT_RESPONSE	0x30

/* Limits overridable through the "MAX_CP" / "MAX_CS" config keys.
 * NOTE(review): presumably the maximum number of CP / CS sessions -- confirm. */
int MAX_CP=512;
int MAX_CS=32;

int MAX_JOB_PER_SESSION=3;

/* String settings with built-in defaults; most can be overridden through
 * ConfigParameters ("DBdir", "Prefix", "DBfile", "Pidfile", "AllowCSList",
 * "TrackerIP", "Dir").  CONFIG is the path handed to read_config() in main().
 * DBFILE has no entry in ConfigParameters. */
char *Home = "./";
char *PREFIX = "/data/sp/";
char *Database = "user.db";
char *CONFIG = "./asp.cfg";
char *PIDFile = "/var/run/spnew.pid";
char *RULE = "";
char *SERVERIP;
char *DBFILE;
char *MEDIABASE;

/* Credentials loaded from the "authmd5" / "authid" config keys. */
char *AUTH_MD5;
int AUTH_USERID;

fd_set osocks;
int NumNewChannel;

struct Message TempMsg;

/* Network names indexed by TS type; period_process() uses them as the
 * sub-directory of WWW_ROOT that holds channel.xml. */
char *NET_NAME[] = { "edu", "cnc", "uni", "tel" };

/* Traffic counters: tmp* cover the current dump period and are folded into
 * total* (and then reset) by period_process(). */
long long totalDownBytes=0, totalUpBytes=0;
long long tmpDownBytes=0, tmpUpBytes=0;

char *urlroot;
char *defaultspip;
/* Per-network SP and TS addresses, indexed by EDUTS/CNCTS/UNITS/TELTS and
 * filled from the "*SPIP" / "*TSIP" config keys. */
char *spip[4];
char *tsip[4];
#ifdef TEST
char *WWW_ROOT;
#endif
#ifdef HAVE_MYSQL
char *MYSQL_HOST;
char *MYSQL_USER;
char *MYSQL_PASS;
char *MYSQL_DB;
#endif
char *CAS_ADDR;     // Content Aggregation Server, added by lixingwu, 20070313

/* One socket and peer address per TS, used with recvfrom()/sendto() for the
 * statistics exchange. */
int tsSock[MAX_TS];
struct sockaddr_in tsAddr[MAX_TS];

extern struct Channel *ChannelHash[MAX_CHANNEL];
extern struct Channel *ChannelList;
extern int isDelete (struct Channel *pc);

/* NOTE(review): errno is normally provided by <errno.h> and may be a macro;
 * declaring it by hand is not portable -- consider including the header. */
extern int errno;

int JobHighWater = 10000;
int BINDALL=1;
int AuthCS=0;
time_t startTime;
time_t CurrentTime;
/* Minimum number of seconds between two periodic dumps ("periodDump" key). */
int PERIOD;
int SnapShotInterval;		//number of periodic dumps between two vmstat snapshots (see period_process)
int spuprecord[MAX_CHANNEL];
struct ServerDesc TRACKER[MAX_TYPE];

/* Loaded from the "LogFilePath" config key. */
char *LOGXML;

/*
 * Configuration table handed to read_config() in main().  Each entry is
 * { key name in the config file, address of the variable to fill, type tag }.
 * In this table 's' is used with char * targets and 'd' with int targets.
 */
struct NamVal ConfigParameters[] = {
	{"DBdir", &Home, 's'},
	{"AuthCS", &AuthCS, 'd'},
	{"MAX_CP",&MAX_CP, 'd'},
	{"MAX_CS", &MAX_CS, 'd'},
	{"Pidfile", &PIDFile, 's'},
	{"Prefix", &PREFIX, 's'},
	{"DBfile", &Database, 's'},
	{"TrackerIP", &SERVERIP, 's'},
	{"Dir", &MEDIABASE, 's'},
	{"authid", &AUTH_USERID, 'd'},
	{"authmd5", &AUTH_MD5, 's'},
	{"periodDump", &PERIOD, 'd'},	//minimum number of seconds between two periodic dumps
	{"UrlRoot", &urlroot, 's'},
	{"EDUSPIP", &spip[EDUTS], 's'},
	{"CNCSPIP", &spip[CNCTS], 's'},
	{"UNISPIP", &spip[UNITS], 's'},
	{"TELSPIP", &spip[TELTS], 's'},
	{"BINDALL", &BINDALL, 'd'},
	{"EDUTSIP", &tsip[EDUTS], 's'},
	{"CNCTSIP", &tsip[CNCTS], 's'},
	{"UNITSIP", &tsip[UNITS], 's'},
	{"TELTSIP", &tsip[TELTS], 's'},
	{"SnapShotInterval", &SnapShotInterval, 'd'},
	{"LogFilePath", &LOGXML, 's'},
#ifdef TEST
	{"WWWRoot", &WWW_ROOT, 's'},
#endif
#ifdef HAVE_MYSQL
	{"MysqlAddress", &MYSQL_HOST, 's'},
	{"User", &MYSQL_USER, 's'},
	{"Database", &MYSQL_DB, 's'},
	{"Password", &MYSQL_PASS, 's'},
#endif
	{"AllowCSList", &RULE, 's'},
	{"JobHighWater", &JobHighWater, 'd'},
    {"CAS_ADDR", &CAS_ADDR, 's'}     // Content Aggregation Server, added by lixingwu, 20070313
};

/* Database, authorization and job-buffer helpers declared extern here;
 * their definitions are not in the visible part of this file. */
extern int db_end ();
extern int db_init (char *home, char *database);
extern int isAllowed (int id, char *md5, char *cname, float bitrate,
		      float *limitedbitrate, int *issave);
extern char *getJobBuffer (struct JobDes *p, int *max);
extern inline void setblockId (struct JobDes *pj, int id);
#ifdef TEST
int buildGTV (struct Channel *pc, int datalen, char *data, int type);
#endif
/* Forward declarations of the SP's own message, session and periodic
 * routines. */
int send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s);
int send_p2p_err (struct Session *p, unsigned short code, int quit);
int init_sp ();
int handle_new_connection (int sock, int type);
int Clientclosure (int listnum, int type);
void process_child (void);
int init_CP (int listnum);
int process_CP (int listnum);
int closure_CP (int listnum);
int init_CS (int listnum);
int process_CS (int listnum);
int closure_CS (int listnum);
#ifdef HAVE_MYSQL
/* MySQL handle used by period_process() to clear the channel table. */
MYSQL *local_mysql;
#endif
int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id);
/* Callbacks handed to apply_list() / apply_session() by period_process(). */
void apply_check (struct Channel *p, void *arg);
void apply_idle (struct Session *p, void *arg);
int period_process ();
void process_TS2RM (int type);

/* Playlist, timer and signal helpers declared extern here. */
extern int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max);
extern int check_newplist ();
extern int timer_free ();
extern int timer_process (unsigned int t);
extern void hup_handler (int);

#include "sessions.c"

/*
 * Fill the SPUpdate lvalue `s` with key samples of -1 and block IDs of
 * 0xffffffff.
 * NOTE(review): judging by the name this is the "no such channel" reply --
 * confirm against the callers.
 * The argument is parenthesized so that expressions such as `*p` expand
 * correctly.
 */
#define BUILD_NOCH_SPUPDATE(s)		do\
{\
	(s).minKeySample = -1LL;\
	(s).maxKeySample = -1LL;\
	(s).minBlockID = 0xffffffff;\
	(s).maxBlockID = 0xffffffff;\
} while (0)

/*
 * Fill the SPUpdate lvalue `s` for channel pointer `pc`: key samples are -1,
 * the block range starts at 0, and maxBlockID is the number of
 * DEFAULT_BLOCK-sized blocks needed to hold pc->downsize bytes (1 when
 * downsize is not positive).
 * Both arguments are parenthesized so that expressions such as `*p` or
 * `&chan` expand correctly.
 */
#define BUILD_ORDER_SPUPDATE(s,pc)	do\
{\
	(s).minKeySample = -1LL;\
	(s).maxKeySample = -1LL;\
	(s).minBlockID = 0;\
	(s).maxBlockID = ((pc)->downsize > 0 ? (((pc)->downsize - 1) / DEFAULT_BLOCK) : 0)+1;\
} while (0)

/*
 * Fill the SPUpdate lvalue `s` with key samples of 0 and block IDs of
 * 0xffffffff.
 * NOTE(review): judging by the name this announces a closed channel --
 * confirm against the callers.
 * The argument is parenthesized so that expressions such as `*p` expand
 * correctly.
 */
#define BUILD_CLOSE_SPUPDATE(s)		do\
{\
	(s).minKeySample = 0;\
	(s).maxKeySample = 0;\
	(s).minBlockID = 0xffffffff;\
	(s).maxBlockID = 0xffffffff;\
} while (0)

/*
 * Fill the SPUpdate lvalue `s` for channel pointer `pc`: minKeySample is the
 * marker -2, maxKeySample carries pc->pcinfo->mlist->m_totalchannel, and the
 * block range is 0 .. pc->pcinfo->maxID.
 * pc->pcinfo and pc->pcinfo->mlist are dereferenced without a NULL check.
 * Both arguments are parenthesized so that expressions such as `*p` or
 * `&chan` expand correctly.
 */
#define BUILD_MLIST_SPUPDATE(s,pc)	do\
{\
	(s).minKeySample = -2LL;\
	(s).maxKeySample = (pc)->pcinfo->mlist->m_totalchannel;\
	(s).minBlockID = 0;\
	(s).maxBlockID = (pc)->pcinfo->maxID;\
} while (0)

/*
 * Fill the SPUpdate lvalue `spupdate` for channel pointer `pc`: minKeySample
 * is the marker -3, maxKeySample is 1, and the block range is copied from
 * pc->pcinfo->s.
 * The first parameter must not be named `s`: it would also replace the
 * member name in `pcinfo->s`.
 * pc->pcinfo is dereferenced without a NULL check.
 * Both arguments are parenthesized so that expressions such as `*p` or
 * `&chan` expand correctly.
 */
#define BUILD_LIVE_SPUPDATE(spupdate,pc)	do\
{\
	(spupdate).minKeySample = -3LL;\
	(spupdate).maxKeySample = 1;\
	(spupdate).minBlockID = (pc)->pcinfo->s.minBlockID;\
	(spupdate).maxBlockID = (pc)->pcinfo->s.maxBlockID;\
} while (0)

/*
 * send_all_spupdate - send one P2P_SPUPDATE message for channel `pc` to
 * every edge on pc->PeerHead.
 *
 * Message layout, in order:
 *   int            total message length (including this field)
 *   unsigned char  P2P_SPUPDATE
 *   MD5_LEN bytes  pc->channel_md5
 *   struct SPUpdate  copy of *s, or all zeroes when s is NULL
 *
 * Does nothing when pc->pcinfo is NULL; otherwise pcinfo->updated is stamped
 * with CurrentTime before sending.  A failed writeMessage() is only logged
 * and the remaining peers are still served.
 *
 * NOTE(review): assumes MAX_DATA is large enough for the whole message --
 * confirm against echo.h.
 */
void send_all_spupdate (struct Channel *pc, struct SPUpdate *s)
{
	struct LiveChannelInfo *pcinfo = pc->pcinfo;
	struct Edge *pedge;
	char buffer[MAX_DATA], *buf;
	int msglen;

	if (pcinfo == NULL) return;

	pcinfo->updated = CurrentTime;
	buf = buffer + sizeof (int);	/* leave room for the length header */
	*(unsigned char *) buf = P2P_SPUPDATE;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	if (s == NULL)
		memset (buf, 0, sizeof (struct SPUpdate));
	else
		memcpy (buf, s, sizeof (struct SPUpdate));
	buf += sizeof (struct SPUpdate);

	/* Store the length with memcpy: writing through an int * cast of a
	 * char array violates the aliasing rules. */
	msglen = buf - buffer;
	memcpy (buffer, &msglen, sizeof (msglen));

	for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
	{
		if (writeMessage (pedge->me, pc, buffer) < 0)
		{
			PDEBUG ("send SPUPDATE err.\n");
		}
	}
}

/*
 * process_TS2RM - read one statistics datagram from tsSock[type] and record
 * the per-channel counts it carries.
 *
 * Expected layout:
 *   int            message size (skipped, not validated)
 *   unsigned char  TS2RM_STAT_RESPONSE
 *   int            number of channel records (rejected when > 100)
 *   records        MD5_LEN bytes of channel md5 followed by an int count
 *
 * For every record whose md5 matches a known channel the count is stored in
 * pChannel->numofnp[type]; records for unknown channels are skipped.
 * Parsing never goes past the bytes actually received: a datagram shorter
 * than the fixed header is dropped, and a truncated record list stops the
 * loop.
 *
 * type: index into tsSock[] (and numofnp[]); not range-checked here.
 */
void
process_TS2RM (int type)
{
	int len;
	char buffer[4096];
	char *buf, *end;
	struct sockaddr_in addr;
	socklen_t addrlen = sizeof (addr);	/* recvfrom() takes a socklen_t * */
	int i, chnlnum;
	struct Channel *pChannel;

	if ((len =
	     recvfrom (tsSock[type], buffer, sizeof (buffer), 0,
		       (struct sockaddr *) &addr, &addrlen)) < 0)
	{
		perror ("recvfrom ts");
		return;
	}
	PINFO ("got TS msg, len %d. \n", len);
	if ((size_t) len < sizeof (int) + sizeof (unsigned char) + sizeof (int))
	{
		/* Too short to hold size + type + record count. */
		PINFO ("short TS msg dropped, len %d. \n", len);
		return;
	}
	end = buffer + len;
	buf = buffer + sizeof (int);	/* skip the message size field */
	if (*(unsigned char *) buf != TS2RM_STAT_RESPONSE)
	{
		perror ("bad message type from ts");
		return;
	}
	buf += sizeof (unsigned char);
	/* The fields after the type byte are not int-aligned; copy them out. */
	memcpy (&chnlnum, buf, sizeof (int));
	buf += sizeof (int);
	if (chnlnum > 100)
	{
		perror ("too large channel num from ts");
		return;
	}
	for (i = 0; i < chnlnum; ++i)
	{
		int count;

		if ((size_t) (end - buf) < MD5_LEN + sizeof (int))
		{
			PINFO ("truncated TS msg, got %d of %d records. \n", i, chnlnum);
			break;
		}
		pChannel = findChannel (buf, MD5_LEN);
		buf += MD5_LEN;
		memcpy (&count, buf, sizeof (int));
		buf += sizeof (int);
		if (pChannel == NULL)
			continue;
		pChannel->numofnp[type] = count;

		PINFO ("%s->%s: client %d.\n",
			pChannel->pcinfo ? pChannel->channel_name : "",
			pChannel->channel_md5, pChannel->numofnp[type]);
	}
}

/* lastCheck: value of CurrentTime when period_process() last completed a dump.
 * last_snapshot: despite its time_t type, period_process() uses it as a
 * counter of dumps since the last vmstat snapshot. */
time_t lastCheck, last_snapshot;

/*
 * period_process - periodic housekeeping of the SP.
 *
 * Every call runs timer_process() and check_newplist().  Once more than
 * PERIOD seconds have passed since lastCheck it additionally:
 *   - clears the MySQL channel table (HAVE_MYSQL builds),
 *   - rewrites <WWW_ROOT>/<net>/channel.xml for every TS type (TEST builds),
 *   - appends a snapshot with current/average traffic to ./sp-Y-M-D.log,
 *   - walks the CS sessions (apply_idle) and all channels (apply_check),
 *   - uploads the edu channel.xml to CAS_ADDR with curl (TEST builds),
 *   - sends the statistics query to every open TS socket (TEST builds),
 *   - calls logto_xml(), resets the per-period counters, and runs vmstat
 *     once last_snapshot exceeds SnapShotInterval.
 *
 * Returns 0, or -1 when the snapshot log file cannot be opened.
 */
int
period_process ()
{
#ifdef TEST
	int type;
#endif
	time_t tmpTime, time_interval;
	struct Argument arg;
	char buffer[MAX_DATA];
	struct tm result;
	char *querynum;
	static int snapCount = 0;

	timer_process (CurrentTime);
	check_newplist ();
	if (CurrentTime <= lastCheck + PERIOD)	/* PERIOD comes from the "periodDump" config key */
		return 0;

	memset (&arg, 0, sizeof (arg));
	tmpTime = CurrentTime - startTime;
	time_interval = CurrentTime - lastCheck;
#ifdef HAVE_MYSQL
	query_mysql (local_mysql, "delete from channel where '1'");
#endif
#ifdef TEST
	for (type=0; type<MAX_TS; type++)
	{
		snprintf (buffer, MAX_DATA, "%s/%s/channel.xml", WWW_ROOT,
		  	NET_NAME[type]);
		if ((arg.xml[type] = fopen (buffer, "w")) == (FILE *) 0)
		{
			PDEBUG ("Error in open file %s\n", buffer);
			continue;
		}
		fprintf (arg.xml[type],
		 	"<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n<?xml:stylesheet type=\"text/xsl\" href=\"simple.xsl\" ?>\r\n<GaoV>\r\n");
	}
#endif
	localtime_r (&CurrentTime, &result);
	snprintf (buffer, MAX_DATA, "./sp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
	if ((arg.f = fopen (buffer, "a")) == NULL)
	{
		PDEBUG ("Couldn't open sp log file! \n");
#ifdef TEST
		/* Do not leak the channel.xml streams opened above. */
		for (type=0; type<MAX_TS; type++)
		{
			if (arg.xml[type] != NULL)
				fclose (arg.xml[type]);
		}
#endif
		return -1;
	}
	fprintf (arg.f, "\n\n********************Start %d SnapShot of SP, Time: %u/%u %u:%u:%u.\n",
		snapCount++, result.tm_mon + 1, result.tm_mday, result.tm_hour,
	 	result.tm_min, result.tm_sec);

	fprintf(arg.f, "SP: cur Down %.4f KB. \n", ((float)tmpDownBytes)/1024/time_interval);
	fprintf(arg.f, "SP: cur Up   %.4f KB. \n", ((float)tmpUpBytes)/1024/time_interval);
	totalDownBytes += tmpDownBytes;
	totalUpBytes += tmpUpBytes;
	fprintf(arg.f, "SP: avg Down %.4f KB. \n", ((float)totalDownBytes)/1024/tmpTime);
	fprintf(arg.f, "SP: avg Up   %.4f KB. \n", ((float)totalUpBytes)/1024/tmpTime);

	/* From here on `buffer` is reused for the TS statistics query:
	 * int length | type byte | int channel count | per-channel data.
	 * NOTE(review): the per-channel part is presumably appended through
	 * arg.buf by apply_check -- confirm. */
	arg.buf = buffer + sizeof (int);
	*(unsigned char *) (arg.buf) = RM2TS_STAT_QUERY;
	arg.buf += sizeof (unsigned char);
	querynum = arg.buf;		/* channel count is filled in after the walk */
	arg.buf += sizeof (int);

	arg.type = TYPE_CS;
	apply_session (TRACKER[TYPE_CS].head, TRACKER[TYPE_CS].maxid+1, apply_idle, &arg);
/*
	arg.type = TYPE_CP;
	apply_session (TRACKER[TYPE_CP].head, TRACKER[TYPE_CP].maxid+1, apply_idle, &arg);
*/
	apply_list (ChannelList, apply_check, &arg);
#ifdef TEST
	for (type=0; type<MAX_TS; type++)
	{
		if (arg.xml[type] == NULL)
			continue;
		fprintf (arg.xml[type], "</GaoV>\r\n");
		fclose (arg.xml[type]);
	}
#endif
	fprintf (arg.f, "Channel Count : %d.Total client : %d . Total upsize :%lldB . \n",
	 	arg.spchannelcount, arg.totalclient, arg.totalupsize);
	fprintf (arg.f,
	 	"\n************************END SnapShot*********************\n");
	fclose (arg.f);

#ifdef TEST
	// added by lixingwu, 20070313
	// upload channel.xml to server, only edu
	/* channel.xml and WWW_ROOT exist only in TEST builds, so the upload is
	 * guarded as well.  SECURITY NOTE: the command line is assembled from
	 * configuration values and executed by the shell; the config file must
	 * be trusted. */
	if (WWW_ROOT != NULL && spip[EDUTS] != NULL && CAS_ADDR != NULL)
	{
		char upload_cmd[MAX_DATA];
		int cmdlen = snprintf (upload_cmd, sizeof (upload_cmd),
			"curl -F filename=@%s/%s/channel.xml -F domain=%s %s",
			WWW_ROOT, NET_NAME[EDUTS], spip[EDUTS], CAS_ADDR);

		if (cmdlen < 0 || (size_t) cmdlen >= sizeof (upload_cmd))
		{
			PDEBUG ("upload command too long, skipped. \n");
		}
		else
		{
			printf("%s\n", upload_cmd);
			system(upload_cmd);
		}
	}

	if (arg.spchannelcount > 0)
	{
		int msglen = arg.buf - buffer;

		/* The count sits at an unaligned offset inside a char buffer;
		 * store both header fields with memcpy. */
		memcpy (querynum, &arg.spchannelcount, sizeof (int));
		memcpy (buffer, &msglen, sizeof (int));
		for (type=0; type<MAX_TS; type++)
		{
			if (tsSock[type] <= 0)
				continue;
			if (sendto (tsSock[type], buffer, msglen, 0, (struct sockaddr *) &tsAddr[type], sizeof (struct sockaddr_in)) != msglen)
			{
				/* Keep going: returning here would skip the
				 * bookkeeping below, so the next call would
				 * repeat the whole dump and count the same
				 * traffic twice. */
				PDEBUG ("sent to ts %d error. \n", type);
				continue;
			}
		}
	}
#endif
	logto_xml (time_interval, tmpTime, arg.spchannelcount, arg.totalclient);
	lastCheck = CurrentTime;
	last_snapshot ++;
	tmpDownBytes = tmpUpBytes = 0;
	if (last_snapshot > SnapShotInterval)
	{
		system ("/usr/bin/vmstat -a >> sp.log 2>&1 &");
		last_snapshot = 0;
	}
	return 0;
}


int
main (int argc, char **argv)
{
	int i, mode = 1;

	if (argc < 2)
	{
		printf ("usage: %s mode(0 for daemon, 1 for console).\n",
			argv[0]);
		return -1;
	}
	signal (SIGPIPE, SIG_IGN);
	signal (SIGINT, terminate);
	signal (SIGHUP, hup_handler);

	mode = atoi (argv[1]);

	if (mode == 0)
		daemon (1, 1);

	read_config (CONFIG, ConfigParameters,
		     sizeof (ConfigParameters) / sizeof (struct NamVal));
	for (i = 0; i < 10 && IN_LOOP > 0; i++)
	{
		/*
		   pid_t pid;
		   if ((pid = fork ()) == 0)
		   {
		 */
		FD_ZERO (&osocks);
		if (init_sp () < 0)	// || initLOG () < 0)
		{
			PDEBUG ("init_sp error, exit...\n");
			exit (-1);
		}
		process_child ();
		/*
		   } else if (pid < 0)
		   {
		   perror ("fork");
		   exit (pid);
		   } else

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -