📄 spnew.c
字号:
// PDEBUG ("write %u ",
// ((unsigned int *) msg)[0]);
if (-1 == writeMessage (pedge->me, pc, buffer))
PDEBUG ("buffer is full\n");
// else
// PDEBUG ("OK\n");
}
if (pcinfo->updated <= CurrentTime + SPUPDATE_SLOT) //pcinfo is livechannel
send_all_spupdate (pc, &(pcinfo->s));
} else
{
PDEBUG ("save Block Failed ! size %d, %d\n", size,
listnum);
Clientclosure (listnum, TYPE_CS);
}
return 0;
}
int
process_CS (int listnum)
{
struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
struct Message *m = (struct Message *) (p->buf + p->start);
tmpDownBytes += m->len;
switch (m->type)
{
case CS2SP_REGISTER:
process_CS2SP_REGISTER (listnum, m->buffer);
break;
case CS2SP_UPDATE:
process_CS2SP_UPDATE (listnum,
*(float *) (m->buffer));
break;
case CS2SP_BLOCK:
process_CS2SP_BLOCK (listnum, m->buffer);
break;
default:
Clientclosure (listnum, TYPE_CS);
return -1;
}
return 0;
}
int
closure_CS (int listnum)
{
struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
struct Channel *pc = p->pc;
PDEBUG ("CS disconnected from %d.%d.%d.%d:%d\n",
IPADDR (p->host), p->port);
if (pc)
{
if (pc->pcinfo)
{
pc->pcinfo->dataSource = NULL;
pc->pcinfo->status = 1;
}
freeLiveChannel (pc, NULL);
}
FD_CLR (TRACKER[TYPE_CS].head[listnum].socket, &osocks);
close (TRACKER[TYPE_CS].head[listnum].socket);
FREE (p->buf);
deleteAll (p);
memset (&(TRACKER[TYPE_CS].head[listnum]), 0,
sizeof (struct Session));
return 0;
}
int
init_sp ()
{
FILE *pidf;
struct rlimit rl;
char *index;
int type;
char buffer[MAX_DATA];
rl.rlim_cur = rl.rlim_max = 1000000;
if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
{
perror ("setrlimit");
}
for (type=0; type<MAX_TS; type++)
{
if (spip[type] != NULL && strlen (spip[type]) >= MIN_IPADDR_LEN)
{
defaultspip = spip[type];
break;
}
}
if (defaultspip == NULL)
defaultspip = "127.0.0.1";
OPENLOG;
#ifdef DEBUG
system ("ulimit -a");
if (getrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("getrlimit");
}
PDEBUG ("Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
if (setrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("setrlimit");
}
if (getrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("getrlimit");
}
PDEBUG ("Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
system ("ulimit -a");
#endif
#ifdef HAVE_MYSQL
if ((local_mysql =
init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
"/var/run/mysqld/mysqld.sock")) == 0)
{
PDEBUG ("Error in init_mysql.\n");
exit (1);
}
#endif
TRACKER[TYPE_CP].type = TYPE_CP; //allocate CP ServerDesc TYPE_CP = 0
TRACKER[TYPE_CP].port = SP4CP_PORT; //SP4CP_PORT = 50001
TRACKER[TYPE_CP].cur = 0; //current client connection
TRACKER[TYPE_CP].max = MAX_CP; //MAX_CP = 2048
TRACKER[TYPE_CP].init = init_CP; //the function pointer of init_CP,return a debug message
TRACKER[TYPE_CP].process = process_CP; //the function pointer of process_CP, switch TYPE
TRACKER[TYPE_CP].closure = closure_CP; //the funciont pointer of closure_CP,return debug msg and freejob
TRACKER[TYPE_CP].head = calloc (sizeof (struct Session), TRACKER[TYPE_CP].max); //allocate session memory
switch (BINDALL)
{
case 0:
if ((TRACKER[TYPE_CP].sock = init_server (spip[0], SP4CP_PORT)) < 0) //PORT = 50001
return -1;
break;
default:
if ((TRACKER[TYPE_CP].sock = init_server (NULL, SP4CP_PORT)) < 0) //PORT = 50001
return -1;
break;
}
FD_SET (TRACKER[TYPE_CP].sock, &osocks);
TRACKER[TYPE_CS].type = TYPE_CS; //allocate CP ServerDese,TYPE_CS = 1
TRACKER[TYPE_CS].port = SP4CS_PORT;
TRACKER[TYPE_CS].cur = 0;
TRACKER[TYPE_CS].max = MAX_CS; //MAX_CS = 512
TRACKER[TYPE_CS].init = init_CS; //return a debug message
TRACKER[TYPE_CS].process = process_CS;
TRACKER[TYPE_CS].closure = closure_CS;
TRACKER[TYPE_CS].head =
calloc (sizeof (struct Session), TRACKER[TYPE_CS].max);
switch (BINDALL)
{
case 0:
if ((TRACKER[TYPE_CS].sock = init_server (spip[0], SP4CS_PORT)) < 0) //port =
return -1;
break;
default:
if ((TRACKER[TYPE_CS].sock = init_server (NULL, SP4CS_PORT)) < 0) //port =
return -1;
break;
}
FD_SET (TRACKER[TYPE_CS].sock, &osocks);
if (db_init (Home, Database) < 0)
return -1;
memset (tsAddr, 0, sizeof (struct sockaddr_in) * MAX_TS); //sockaddr_in type
for (type = 0; type < MAX_TS; ++type)
{
#ifdef TEST
sprintf (buffer, "rm -f %s/%s/*.gtv", WWW_ROOT,
NET_NAME[type]);
system (buffer);
sprintf (buffer, "rm -f %s/%s/*.mediadata", WWW_ROOT,
NET_NAME[type]);
system (buffer);
sprintf (buffer, "rm -f %s/%s/channel.xml", WWW_ROOT,
NET_NAME[type]);
system (buffer);
#endif
sprintf (buffer, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
system (buffer);
if (tsip[type] == NULL) continue;
tsAddr[type].sin_family = AF_INET;
tsAddr[type].sin_port = htons (TS4RM_PORT);
index = strchr (tsip[type], ':');
if (index == NULL)
inet_aton (tsip[type], &tsAddr[type].sin_addr);
else
{
*index = 0;
inet_aton (tsip[type], &tsAddr[type].sin_addr);
*index = ':';
}
tsSock[type] = socket (PF_INET, SOCK_DGRAM, 0); //upd connection
if (tsSock[type] < 0)
return -1;
}
mkdir (PREFIX, 0777);
sprintf (buffer, "%s/%s", PREFIX, LIVE_PREFIX);
mkdir (buffer, 0777);
sprintf (buffer, "%s/%s", PREFIX, ORDER_PREFIX);
mkdir (buffer, 0777);
sprintf (buffer, "%s/%s", PREFIX, PLIST_PREFIX);
mkdir (buffer, 0777);
sprintf (buffer, "%s/%s", PREFIX, PROG_PREFIX);
mkdir (buffer, 0777);
if ((pidf = fopen (PIDFile, "w")) == NULL)
{
PDEBUG ("Cannot open pidfile.\n");
return -1;
}
fprintf (pidf, "%d\n", getpid ());
fclose (pidf);
return 0;
}
#ifdef TEST
/*
int
buildMediaData (struct Channel *pc, int datalen, char *data, int type)
{
int i;
char olddata[MAX_DATA];
char buffer[MAX_DATA];
struct stat stbuf;
FILE *f;
assert (pc->pcinfo);
snprintf (buffer, MAX_LINE, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[type], pc->channel_name);
if (stat (buffer, &stbuf) == 0)
{
if (stbuf.st_size != datalen)
{
PDEBUG ("old media data size %d not match new %d\n", (int)(stbuf.st_size), datalen);
return -1;
}
if ((f = fopen (buffer, "r")) == NULL)
{
PDEBUG ("cannot open mediadata file %s\n", buffer);
perror ("fopen");
return -1;
}
if (fread (olddata, datalen, 1, f) == 1)
{
for (i=0; i<datalen; i++)
{
if (data[i] != olddata[i])
{
PDEBUG ("media data not match %s, %d\n", buffer, i);
fclose (f);
return -1;
}
}
} else
{
PDEBUG ("Error in read old mediadata %s.\n", buffer);
fclose (f);
return -1;
}
fclose (f);
return 0;
} else if ((f = fopen (buffer, "w")) == NULL)
{
PDEBUG ("cannot open gtv file %s\n", buffer);
perror ("fopen");
return -1;
}
fwrite (data, datalen, 1, f);
fclose (f);
return 0;
}
*/
int
buildGTV (struct Channel *pc, int datalen, char *data, int type)
{
char buffer[MAX_DATA];
FILE *f;
assert (pc->pcinfo);
snprintf (buffer, MAX_LINE, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[type], pc->channel_name);
// snprintf (buffer, MAX_LINE, "%s/%s.gtv", pc->fname, pc->channel_name);
if ((f = fopen (buffer, "w")) == NULL)
{
PDEBUG ("cannot open gtv file %s\n", buffer);
perror ("fopen");
return -1;
}
if (pc->pcinfo)
{
if (pc->pcinfo->mlist != NULL)
sprintf (buffer,
"CSUserID=%d\r\nBlockSize=%d\r\nBitRate=%f\r\nChannelName=%s\r\nPlaylist=true\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
pc->channel_name, pc->channel_md5, tsip[type], spip[type],
datalen);
else
sprintf (buffer,
"CSUserID=%d\r\nBlockSize=%d\r\nBitRate=%f\r\nChannelName=%s\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
pc->channel_name, pc->channel_md5, tsip[type], spip[type],
datalen);
} else
sprintf (buffer,
"GTVHome=%s/\r\nBlockSize=%d\r\nResourceHash=%s\r\nTrackServer=%s\r\nSuperPeer=%s:50001\r\nDataLength=%d\r\nData=",
urlroot, pc->maxblocksize, pc->channel_md5,
tsip[type], spip[type], datalen);
fwrite (buffer, strlen (buffer), 1, f);
fwrite (data, datalen, 1, f);
fclose (f);
/* snprintf (buffer, MAX_LINE, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[type], pc->channel_name);
if (stat (buffer, &stbuf) == 0)
{
if (stbuf.st_size != datalen)
{
PDEBUG ("old media data size %d not match new %d\n", (int)(stbuf.st_size), datalen);
return -1;
}
if ((f = fopen (buffer, "r")) == NULL)
{
PDEBUG ("cannot open mediadata file %s\n", buffer);
perror ("fopen");
return -1;
}
if (fread (olddata, datalen, 1, f) == 1)
{
for (i=0; i<datalen; i++)
{
if (data[i] != olddata[i])
{
PDEBUG ("media data not match %s, %d\n", buffer, i);
fclose (f);
return -1;
}
}
} else
{
PDEBUG ("Error in read old mediadata %s.\n", buffer);
fclose (f);
return -1;
}
fclose (f);
return 0;
} else if ((f = fopen (buffer, "w")) == NULL)
{
PDEBUG ("cannot open gtv file %s\n", buffer);
perror ("fopen");
return -1;
}
fwrite (data, datalen, 1, f);
fclose (f);
*/
// added by lixingwu, 20070313
// upload gtv files to server
char upload_cmd[MAX_DATA];
sprintf(upload_cmd, "curl -F filename=@%s/%s/%s.gtv -F domain=%s %s",
WWW_ROOT, NET_NAME[type], pc->channel_name, spip[type], CAS_ADDR);
printf("%s\n", upload_cmd);
system(upload_cmd);
return 0;
}
#endif
int
send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s)
{
char buffer1[MAX_DATA];
char *buf;
buf = buffer1 + sizeof (int);
*(unsigned char *) buf = P2P_SPUPDATE; //P2P_SPUPDATE=1 in ProTocol file
buf += sizeof (char);
memcpy (buf, md5, MD5_LEN);
buf += MD5_LEN;
memcpy (buf, s, sizeof (struct SPUpdate));
buf += sizeof (struct SPUpdate);
*(int *) buffer1 = buf - buffer1;
if (writeMessage (p, pc, buffer1) < 0)
return -1;
return 0;
}
int
send_p2p_err (struct Session *p, unsigned short code, int quit)
{
char buffer1[MAX_DATA];
char *buf;
buf = buffer1 + sizeof (int);
*(unsigned char *) buf = P2P_MSG;
buf += sizeof (char);
(*(unsigned short *) buf) = code;
buf += sizeof (short);
*(int *) buf = quit;
buf += sizeof (int);
*(int *) buffer1 = buf - buffer1;
PDEBUG ("Send error msg type %hd to %p\n", code, p);
if (writeMessage (p, NULL, buffer1) < 0)
return -1;
return 0;
}
void apply_idle (struct Session *p, void *arg)
{
struct Argument *parg = (struct Argument *)arg;
if (CurrentTime - p->last_transferblock >= MAX_TRANSFER_IDLE)
{
fprintf (parg->f, "%s Session timeout! %ld \n", parg->type == TYPE_CS? "CS":"CP", CurrentTime - p->last_transferblock);
Clientclosure (p - TRACKER[parg->type].head, parg->type);
}
}
void apply_check (struct Channel *p, void *arg)
{
struct Argument *parg = (struct Argument *)arg;
int type;
struct LiveChannelInfo *pc = p->pcinfo;
if (pc && pc->status <= 0)
{
#ifdef HAVE_MYSQL
query_mysql (local_mysql,
"insert into channel(ChannelName, ChannelBitrate, ChannelMD5, ChannelElapsed, ChannelRange) values ('%s\','%f','%d','%d')",
pc->channel_name, pc->bitrate,
p->channel_md5,
time (NULL) - p->ctime,
pc->s.maxKeySample -
pc->s.minKeySample);
#endif
#ifdef TEST
for (type=0; type<MAX_TS; type++)
{
if (parg->xml[type] == NULL)
continue;
fprintf (parg->xml[type], "<Channel Name=\"%s\" Desc=\"%s\" File=\"%s.gtv\" NumClient=\"%d\" BitRate=\"%d\" Start=\"%ld\" End=\"-1\" Elapsed=\"%ld\"/>\r\n",
p->channel_name, "gtv", //pc->userid,
p->channel_name, p->numofnp[type],
(int) (pc->bitrate * 8),
(long) p->ctime, (long) CurrentTime);
}
#endif
PINFO ("query %s->%s \n",
p->channel_name, p->channel_md5);
memcpy (parg->buf, p->channel_md5, MD5_LEN);
parg->buf += MD5_LEN;
fprintf (parg->f, "Channel %s have %d client. upsize %lldB, avg speed %f; downsize %lldB, avg speed %f, reported %f, real/reported is %f%%.\n",
p->channel_md5, p->numclient,
p->upsize, ((float)(p->upsize)) / (CurrentTime - lastCheck), p->downsize, ((float)(p->downsize)) / (CurrentTime - lastCheck), p->pcinfo !=NULL ? p->pcinfo->bitrate:0, p->pcinfo != NULL && p->pcinfo->bitrate != 0 ? (((float)(p->downsize)) / (CurrentTime - lastCheck)*100/p->pcinfo->bitrate): 0);
fprintf (parg->f, "Live SPUpdate : SampleMin %lld SampleMax %lld BlockMin %d BlockMax %d \n",
pc->s.minKeySample,
pc->s.maxKeySample,
pc->s.minBlockID,
pc->s.maxBlockID);
parg->spchannelcount ++;
parg->totalclient += p->numclient;
parg->totalupsize += p->upsize;
} else if (pc && pc->isSave == 0)
freeLiveChannel (p, NULL);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -