📄 spnew.c
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include "echo.h"
struct Argument
{
int type;
int spchannelcount;
int totalclient;
long long totalupsize;
FILE *f, *xml[MAX_TS];
char *buf;
};
#define SPUPDATE_SLOT 1
#define MAX_CHANNEL 1024 /* max number of Channel */
#define SP4CP_PORT 50001
#define TS4RM_PORT 22169
#define RM2TS_STAT_QUERY 0x20
#define TS2RM_STAT_RESPONSE 0x30
int MAX_CP=512;
int MAX_CS=32;
int MAX_JOB_PER_SESSION=3;
char *Home = "./";
char *PREFIX = "/data/sp/";
char *Database = "user.db";
char *CONFIG = "./asp.cfg";
char *PIDFile = "/var/run/spnew.pid";
char *RULE = "";
char *SERVERIP;
char *DBFILE;
char *MEDIABASE;
char *AUTH_MD5;
int AUTH_USERID;
fd_set osocks;
int NumNewChannel;
struct Message TempMsg;
char *NET_NAME[] = { "edu", "cnc", "uni", "tel" };
long long totalDownBytes=0, totalUpBytes=0;
long long tmpDownBytes=0, tmpUpBytes=0;
char *urlroot;
char *defaultspip;
char *spip[4];
char *tsip[4];
#ifdef TEST
char *WWW_ROOT;
#endif
#ifdef HAVE_MYSQL
char *MYSQL_HOST;
char *MYSQL_USER;
char *MYSQL_PASS;
char *MYSQL_DB;
#endif
char *CAS_ADDR; // Content Aggregation Server, added by lixingwu, 20070313
int tsSock[MAX_TS];
struct sockaddr_in tsAddr[MAX_TS];
extern struct Channel *ChannelHash[MAX_CHANNEL];
extern struct Channel *ChannelList;
extern int isDelete (struct Channel *pc);
extern int errno;
int JobHighWater = 10000;
int BINDALL=1;
int AuthCS=0;
time_t startTime;
time_t CurrentTime;
int PERIOD;
int SnapShotInterval; //make a snapshot every few second
int spuprecord[MAX_CHANNEL];
struct ServerDesc TRACKER[MAX_TYPE];
char *LOGXML;
struct NamVal ConfigParameters[] = {
{"DBdir", &Home, 's'},
{"AuthCS", &AuthCS, 'd'},
{"MAX_CP",&MAX_CP, 'd'},
{"MAX_CS", &MAX_CS, 'd'},
{"Pidfile", &PIDFile, 's'},
{"Prefix", &PREFIX, 's'},
{"DBfile", &Database, 's'},
{"TrackerIP", &SERVERIP, 's'},
{"Dir", &MEDIABASE, 's'},
{"authid", &AUTH_USERID, 'd'},
{"authmd5", &AUTH_MD5, 's'},
{"periodDump", &PERIOD, 'd'}, //make a periodCheck every periodDump second
{"UrlRoot", &urlroot, 's'},
{"EDUSPIP", &spip[EDUTS], 's'},
{"CNCSPIP", &spip[CNCTS], 's'},
{"UNISPIP", &spip[UNITS], 's'},
{"TELSPIP", &spip[TELTS], 's'},
{"BINDALL", &BINDALL, 'd'},
{"EDUTSIP", &tsip[EDUTS], 's'},
{"CNCTSIP", &tsip[CNCTS], 's'},
{"UNITSIP", &tsip[UNITS], 's'},
{"TELTSIP", &tsip[TELTS], 's'},
{"SnapShotInterval", &SnapShotInterval, 'd'},
{"LogFilePath", &LOGXML, 's'},
#ifdef TEST
{"WWWRoot", &WWW_ROOT, 's'},
#endif
#ifdef HAVE_MYSQL
{"MysqlAddress", &MYSQL_HOST, 's'},
{"User", &MYSQL_USER, 's'},
{"Database", &MYSQL_DB, 's'},
{"Password", &MYSQL_PASS, 's'},
#endif
{"AllowCSList", &RULE, 's'},
{"JobHighWater", &JobHighWater, 'd'},
{"CAS_ADDR", &CAS_ADDR, 's'} // Content Aggregation Server, added by lixingwu, 20070313
};
extern int db_end ();
extern int db_init (char *home, char *database);
extern int isAllowed (int id, char *md5, char *cname, float bitrate,
float *limitedbitrate, int *issave);
extern char *getJobBuffer (struct JobDes *p, int *max);
extern inline void setblockId (struct JobDes *pj, int id);
#ifdef TEST
int buildGTV (struct Channel *pc, int datalen, char *data, int type);
#endif
int send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s);
int send_p2p_err (struct Session *p, unsigned short code, int quit);
int init_sp ();
int handle_new_connection (int sock, int type);
int Clientclosure (int listnum, int type);
void process_child (void);
int init_CP (int listnum);
int process_CP (int listnum);
int closure_CP (int listnum);
int init_CS (int listnum);
int process_CS (int listnum);
int closure_CS (int listnum);
#ifdef HAVE_MYSQL
MYSQL *local_mysql;
#endif
int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id);
void apply_check (struct Channel *p, void *arg);
void apply_idle (struct Session *p, void *arg);
int period_process ();
void process_TS2RM (int type);
extern int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max);
extern int check_newplist ();
extern int timer_free ();
extern int timer_process (unsigned int t);
extern void hup_handler (int);
#include "sessions.c"
#define BUILD_NOCH_SPUPDATE(s) do\
{\
s.minKeySample = -1LL;\
s.maxKeySample = -1LL;\
s.minBlockID = 0xffffffff;\
s.maxBlockID = 0xffffffff;\
} while (0)
#define BUILD_ORDER_SPUPDATE(s,pc) do\
{\
s.minKeySample = -1LL;\
s.maxKeySample = -1LL;\
s.minBlockID = 0;\
s.maxBlockID = (pc->downsize > 0 ? ((pc->downsize - 1) / DEFAULT_BLOCK) : 0)+1;\
} while (0)
#define BUILD_CLOSE_SPUPDATE(s) do\
{\
s.minKeySample = 0;\
s.maxKeySample = 0;\
s.minBlockID = 0xffffffff;\
s.maxBlockID = 0xffffffff;\
} while (0)
#define BUILD_MLIST_SPUPDATE(s,pc) do\
{\
s.minKeySample = -2LL;\
s.maxKeySample = pc->pcinfo->mlist->m_totalchannel;\
s.minBlockID = 0;\
s.maxBlockID = pc->pcinfo->maxID;\
} while (0)
#define BUILD_LIVE_SPUPDATE(spupdate,pc) do\
{\
spupdate.minKeySample = -3LL;\
spupdate.maxKeySample = 1;\
spupdate.minBlockID = pc->pcinfo->s.minBlockID;\
spupdate.maxBlockID = pc->pcinfo->s.maxBlockID;\
} while (0)
void send_all_spupdate (struct Channel *pc, struct SPUpdate *s)
{
struct LiveChannelInfo *pcinfo = pc->pcinfo;
struct Edge *pedge;
char buffer[MAX_DATA], *buf;
if (pcinfo == NULL) return;
pcinfo->updated = CurrentTime;
buf = buffer + sizeof (int);
*(unsigned char *) buf = P2P_SPUPDATE;
buf += sizeof (char);
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
if (s == NULL)
memset (buf, 0, sizeof (struct SPUpdate));
else
memcpy (buf, s, sizeof (struct SPUpdate));
buf += sizeof (struct SPUpdate);
*(int *) buffer = buf - buffer;
for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
{
if (writeMessage (pedge->me, pc, buffer) < 0)
{
PDEBUG ("send SPUPDATE err.\n");
}
}
}
void
process_TS2RM (int type) //assure num of channel
{
int len;
char buffer[4096];
char *buf;
struct sockaddr_in addr;
int addrlen = sizeof (struct sockaddr);
int i, msgsize, chnlnum;
struct Channel *pChannel;
if ((len =
recvfrom (tsSock[type], buffer, 4096, 0,
(struct sockaddr *) &addr, &addrlen)) < 0)
{
perror ("recvfrom ts");
return;
}
PINFO ("got TS msg, len %d. \n", len);
buf = buffer;
msgsize = *(int *) buf;
buf += sizeof (int);
if (*(unsigned char *) buf != TS2RM_STAT_RESPONSE)
{
perror ("bad message type from ts");
return;
}
buf += sizeof (unsigned char);
chnlnum = *(int *) buf;
buf += sizeof (int);
if (chnlnum > 100)
{
perror ("too large channel num from ts");
return;
}
for (i = 0; i < chnlnum; ++i)
{
if ((pChannel = findChannel (buf, MD5_LEN)) == NULL)
{
buf += MD5_LEN;
buf += sizeof (int);
continue;
}
buf += MD5_LEN;
pChannel->numofnp[type] = *(int *) buf;
buf += sizeof (int);
PINFO ("%s->%s: client %d.\n",
pChannel->pcinfo ? pChannel->channel_name : "",
pChannel->channel_md5, pChannel->numofnp[type]);
}
}
time_t lastCheck, last_snapshot;
int
period_process ()
{
int type;
time_t tmpTime, time_interval;
struct Argument arg;
char buffer[MAX_DATA];
struct tm result;
int *querynum;
static int snapCount = 0;
timer_process (CurrentTime);
check_newplist ();
if (CurrentTime <= lastCheck + PERIOD) //PERIOD =periodDump=60
return 0;
memset (&arg, 0, sizeof (arg));
tmpTime = CurrentTime - startTime;
time_interval = CurrentTime - lastCheck;
#ifdef HAVE_MYSQL
query_mysql (local_mysql, "delete from channel where '1'");
#endif
#ifdef TEST
for (type=0; type<MAX_TS; type++)
{
snprintf (buffer, MAX_DATA, "%s/%s/channel.xml", WWW_ROOT,
NET_NAME[type]);
if ((arg.xml[type] = fopen (buffer, "w")) == (FILE *) 0)
{
PDEBUG ("Error in open file %s\n", buffer);
continue;
}
fprintf (arg.xml[type],
"<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n<?xml:stylesheet type=\"text/xsl\" href=\"simple.xsl\" ?>\r\n<GaoV>\r\n");
}
#endif
localtime_r (&CurrentTime, &result);
snprintf (buffer, MAX_DATA, "./sp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
if ((arg.f = fopen (buffer, "a")) == NULL)
{
PDEBUG ("Couldn't open sp log file! \n");
return -1;
}
fprintf (arg.f, "\n\n********************Start %d SnapShot of SP, Time: %u/%u %u:%u:%u.\n",
snapCount++, result.tm_mon + 1, result.tm_mday, result.tm_hour,
result.tm_min, result.tm_sec);
fprintf(arg.f, "SP: cur Down %.4f KB. \n", ((float)tmpDownBytes)/1024/time_interval);
fprintf(arg.f, "SP: cur Up %.4f KB. \n", ((float)tmpUpBytes)/1024/time_interval);
totalDownBytes += tmpDownBytes;
totalUpBytes += tmpUpBytes;
fprintf(arg.f, "SP: avg Down %.4f KB. \n", ((float)totalDownBytes)/1024/tmpTime);
fprintf(arg.f, "SP: avg Up %.4f KB. \n", ((float)totalUpBytes)/1024/tmpTime);
arg.buf = buffer + sizeof (int);
*(unsigned char *) (arg.buf) = RM2TS_STAT_QUERY;//RM2TS=0x20
arg.buf += sizeof (unsigned char);
querynum = (int *) (arg.buf);
arg.buf += sizeof (int);
arg.type = TYPE_CS;
apply_session (TRACKER[TYPE_CS].head, TRACKER[TYPE_CS].maxid+1, apply_idle, &arg);
/*
arg.type = TYPE_CP;
apply_session (TRACKER[TYPE_CP].head, TRACKER[TYPE_CP].maxid+1, apply_idle, &arg);
*/
apply_list (ChannelList, apply_check, &arg);
#ifdef TEST
for (type=0; type<MAX_TS; type++)
{
if (arg.xml[type] == NULL)
continue;
fprintf (arg.xml[type], "</GaoV>\r\n");
fclose (arg.xml[type]);
}
#endif
fprintf (arg.f, "Channel Count : %d.Total client : %d . Total upsize :%lldB . \n",
arg.spchannelcount, arg.totalclient, arg.totalupsize);
fprintf (arg.f,
"\n************************END SnapShot*********************\n");
fclose (arg.f);
// added by lixingwu, 20070313
// upload channel.xml to server, only edu
char upload_cmd[MAX_DATA];
sprintf(upload_cmd, "curl -F filename=@%s/%s/channel.xml -F domain=%s %s",
WWW_ROOT, NET_NAME[EDUTS], spip[EDUTS], CAS_ADDR);
printf("%s\n", upload_cmd);
system(upload_cmd);
#ifdef TEST
if (arg.spchannelcount > 0)
{
*querynum = arg.spchannelcount;
*(int *) buffer = arg.buf - buffer;
for (type=0; type<MAX_TS; type++)
{
if (tsSock[type] <= 0)
continue;
if (sendto (tsSock[type], buffer, *(int *) buffer, 0, (struct sockaddr *) &tsAddr[type], sizeof (struct sockaddr_in)) != *(int *) buffer)
{
PDEBUG ("sent to ts %d error. \n", type);
return 0;
}
}
}
#endif
logto_xml (time_interval, tmpTime, arg.spchannelcount, arg.totalclient);
lastCheck = CurrentTime;
last_snapshot ++;
tmpDownBytes = tmpUpBytes = 0;
if (last_snapshot > SnapShotInterval)
{
system ("/usr/bin/vmstat -a >> sp.log 2>&1 &");
last_snapshot = 0;
}
return 0;
}
int
main (int argc, char **argv)
{
int i, mode = 1;
if (argc < 2)
{
printf ("usage: %s mode(0 for daemon, 1 for console).\n",
argv[0]);
return -1;
}
signal (SIGPIPE, SIG_IGN);
signal (SIGINT, terminate);
signal (SIGHUP, hup_handler);
mode = atoi (argv[1]);
if (mode == 0)
daemon (1, 1);
read_config (CONFIG, ConfigParameters,
sizeof (ConfigParameters) / sizeof (struct NamVal));
for (i = 0; i < 10 && IN_LOOP > 0; i++)
{
/*
pid_t pid;
if ((pid = fork ()) == 0)
{
*/
FD_ZERO (&osocks);
if (init_sp () < 0) // || initLOG () < 0)
{
PDEBUG ("init_sp error, exit...\n");
exit (-1);
}
process_child ();
/*
} else if (pid < 0)
{
perror ("fork");
exit (pid);
} else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -