📄 cpnew.c
字号:
{
*buf = CT_GENERAL;
buf += sizeof (char);
}
*(int *)buffer = buf - buffer; //the size of buffer;
TSADDR.sin_port = htons (TS4CP_PORT);
TSADDR.sin_addr.s_addr = inet_addr (SERVERIP);
TSADDR.sin_family = AF_INET;
if (sendMessage (TSSOCK, buffer, &TSADDR) < 0) //send register msg
{
PDEBUG ("Cannot write to server\n");
return -1;
}
return 0;
}
#endif
/*
 * init_cp - one-time start-up of the CP process.
 *
 * Steps, in order:
 *   1. raise RLIMIT_NOFILE (failure is reported but not fatal);
 *   2. invoke OPENLOG;
 *   3. (DEBUG builds) print and adjust the core-file limit;
 *   4. (HAVE_TS builds) register with the tracker server;
 *   5. fill in the TYPE_P2PS and TYPE_P2PC entries of TRACKER, allocate
 *      their session tables and open the P2PS server socket;
 *   6. create PREFIX and PREFIX/LIVE_PREFIX and empty the latter;
 *   7. write the process id to PIDFile.
 *
 * Returns 0 on success, -1 if registration, allocation, the server socket,
 * path construction or the pid file fails.
 */
int init_cp ()
{
	FILE *pidf;
	struct rlimit rl;
	char buffer[MAX_DATA];
	int len;

	rl.rlim_cur = rl.rlim_max = 1000000;
	if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
	{
		/* Best effort: keep running with the inherited limit. */
		perror ("setrlimit");
	}
	OPENLOG;
#ifdef DEBUG
	system ("ulimit -a");
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	rl.rlim_cur = rl.rlim_max = (rlim_t )102400;
	if (setrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("setrlimit");
	}
	if (getrlimit (RLIMIT_CORE, &rl) != 0)
	{
		perror ("getrlimit");
	}
	fprintf (stderr, "Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
	system ("ulimit -a");
#endif
#ifdef HAVE_TS
	if (register_cp () < 0)
	{
		PDEBUG ("Cannot init TS connection\n");
		return -1;
	}
#endif
	/* Server-side tracker entry: sessions accepted on cfgP2PS_PORT. */
	TRACKER[TYPE_P2PS].flag = FLAG_SERVER;
	TRACKER[TYPE_P2PS].type = TYPE_P2PS;
	TRACKER[TYPE_P2PS].port = cfgP2PS_PORT;
	TRACKER[TYPE_P2PS].cur = 0;
	TRACKER[TYPE_P2PS].max = MAX_P2PS;
	TRACKER[TYPE_P2PS].init = init_P2PS;
	TRACKER[TYPE_P2PS].process = process_P2PS;
	TRACKER[TYPE_P2PS].closure = closure_P2PS;
	TRACKER[TYPE_P2PS].head = calloc (TRACKER[TYPE_P2PS].max, sizeof (struct Session));
	if (TRACKER[TYPE_P2PS].head == NULL)
	{
		PDEBUG ("Cannot allocate P2PS session table.\n");
		return -1;
	}
	/* BINDALL == 0 binds to BINDIP only; any other value binds with NULL. */
	if (BINDALL == 0)
		TRACKER[TYPE_P2PS].sock = init_server (BINDIP, cfgP2PS_PORT);
	else
		TRACKER[TYPE_P2PS].sock = init_server (NULL, cfgP2PS_PORT);
	if (TRACKER[TYPE_P2PS].sock < 0)
		return -1;
	FD_SET(TRACKER[TYPE_P2PS].sock, &osocks);

	/* Client-side tracker entry: no listening port of its own. */
	TRACKER[TYPE_P2PC].flag = FLAG_CLIENT;
	TRACKER[TYPE_P2PC].type = TYPE_P2PC;
	TRACKER[TYPE_P2PC].port = 0;
	TRACKER[TYPE_P2PC].cur = 0;
	TRACKER[TYPE_P2PC].max = MAX_P2PC;
	TRACKER[TYPE_P2PC].init = init_P2PC;
	TRACKER[TYPE_P2PC].process = process_P2PC;
	TRACKER[TYPE_P2PC].closure = closure_P2PC;
	TRACKER[TYPE_P2PC].head = calloc (TRACKER[TYPE_P2PC].max, sizeof (struct Session));
	if (TRACKER[TYPE_P2PC].head == NULL)
	{
		PDEBUG ("Cannot allocate P2PC session table.\n");
		return -1;
	}

	/* mkdir results are ignored on purpose: the directories may already exist. */
	mkdir (PREFIX, 0777);
	len = snprintf (buffer, sizeof (buffer), "%s/%s", PREFIX, LIVE_PREFIX);
	if (len < 0 || (size_t)len >= sizeof (buffer))
	{
		PDEBUG ("Live directory path too long.\n");
		return -1;
	}
	mkdir (buffer, 0777);

	/* Never hand a truncated path to "rm -fr": it could name another directory. */
	len = snprintf (buffer, sizeof (buffer), "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
	if (len < 0 || (size_t)len >= sizeof (buffer))
	{
		PDEBUG ("Live directory cleanup command too long.\n");
		return -1;
	}
	system (buffer);

	if ((pidf = fopen (PIDFile, "w")) == NULL)
	{
		PDEBUG ("Cannot open pidfile.\n");
		return -1;
	}
	fprintf (pidf, "%d\n", (int)getpid ());
	fclose (pidf);
	return 0;
}
#ifdef HAVE_TS
/*
 * process_TS - read and dispatch one datagram from the tracker server.
 *
 * Handled message types:
 *   TS2CP_WELCOME - copy the first 12 bytes of the message into UDPMsg and
 *                   set isSet to record that WELCOME was received;
 *   TS2CP_PEERS   - hand the payload to process_TS2CP_PEERS();
 *   TS2CP_MSG     - re-register if the byte after the leading short of the
 *                   payload is non-zero.
 *
 * Returns 0 after a handled message or a receive failure (both failure
 * paths call register_cp()), -1 for an unknown message type.
 */
int process_TS()
{
	struct sockaddr_in dest;
	socklen_t addr_len = sizeof (dest);	/* recvfrom() takes a socklen_t * */
	struct Message m;

	/* A short datagram must not leave stale stack bytes in the fields read below. */
	memset (&m, 0, sizeof (m));
	if (recvfrom (TSSOCK, &m, sizeof (struct Message), 0, (struct sockaddr *)&dest, &addr_len) <= 0)
	{
		PDEBUG ("Error in recving ts message.\n");
		register_cp ();
		return 0;
	}
	switch (m.type)
	{
		case TS2CP_WELCOME:
			memcpy (&UDPMsg, &m, 12);
			PDEBUG("recv WELCOME from TS.\n");
			/* isSet whether TS has returned WELCOME message */
			isSet = 1;
			break;
		case TS2CP_PEERS:
			process_TS2CP_PEERS (m.buffer);
			break;
		case TS2CP_MSG:
			if (*(char *)(m.buffer+sizeof(short)))
			{
				PDEBUG ("Error in TS2CP_MSG. \n");
				register_cp ();
			}
			break;
		default:
			PDEBUG ("Error in trackerserver message format\n");
			register_cp ();
			return -1;
	}
	return 0;
}
/*
 * closure_TS - send a CP2TS_LOGOUT message to the tracker server.
 *
 * Reuses the global UDPMsg, overwriting its type and setting its length to
 * 12. The result of sendMessage() is not examined; always returns 0.
 */
int closure_TS ()
{
	UDPMsg.type = CP2TS_LOGOUT;
	UDPMsg.len = 12;
	sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR);
	return 0;
}
/*
 * process_TS2CP_PEERS - handle the payload of a TS2CP_PEERS message.
 *
 * Payload layout as consumed here:
 *   MD5_LEN bytes                  channel MD5
 *   1 byte                         number of CP addresses (cpsize)
 *   cpsize * struct NormalAddress  CP address list
 * More data (a one-byte peer count and a peer list) follows the CP list,
 * but nothing in this function uses it, so it is no longer parsed.
 *
 * The channel is looked up by MD5 and the CP list is passed to newChannel().
 * No length is supplied for buf, so the payload cannot be bounds-checked
 * here; the caller must provide a complete message.
 *
 * Returns 0 on success, -1 if the channel is unknown or newChannel() fails.
 */
int process_TS2CP_PEERS (char *buf)
{
	struct Channel *pc;
	char *channel_md5 = buf;
	unsigned char cpsize;
	struct NormalAddress *CPlist;

	buf += MD5_LEN;
	cpsize = *(unsigned char *)buf;
	buf += sizeof (char);
	CPlist = (struct NormalAddress *)buf;

	// now find the channel
	if ((pc = findChannel (channel_md5, MD5_LEN)) == NULL)
		return -1;
	if (newChannel (pc, CPlist, cpsize, TYPE_CP) < 0)
		return -1;
	return 0;
}
#endif
/*
 * parseECP - scan a ':' / '.' delimited string and pack bytes into buf.
 *
 * str: NUL-terminated input text.
 * buf: output buffer. The first sizeof(int) bytes receive a count header;
 *      packed bytes follow. The caller must supply enough room: one byte
 *      can be emitted per '.' in the input.
 *
 * Scanning rules:
 *   ':'   starts a new group (resets the per-group '.' counter to 0);
 *   '.'   emits the current byte if fewer than two bytes have been emitted
 *         in this group (three before the first ':' because the counter
 *         starts at -1), then clears the current byte;
 *   digit becomes the current byte;
 *   any other character is ignored.
 *
 * The header is set to (bytes emitted) / sizeof(short). It is stored with
 * memcpy so that buf does not have to be int-aligned.
 *
 * Returns a pointer one past the last byte written.
 *
 * NOTE(review): a digit is stored as its raw character code and replaces
 * any previous digit instead of accumulating a decimal value. This is kept
 * as-is because the consumer of this format is not visible here - confirm
 * whether "part = part * 10 + (c - '0')" was intended.
 */
char *parseECP (char *str, char *buf)
{
	char *buffer = buf;
	int flag = -1;
	unsigned char part = 0;
	int count;

	buf += sizeof (int);
	for (; *str; str++)
	{
		unsigned char c = (unsigned char)*str;

		if (c == ':')
		{
			flag = 0;
		}
		else if (c == '.')
		{
			if (flag < 2)
			{
				*(unsigned char *)buf = part;
				buf += sizeof (char);
			}
			part = 0;
			flag ++;
		}
		else if (c >= '0' && c <= '9')
		{
			part = c;
		}
	}
	count = (int)((size_t)(buf - buffer - sizeof (int)) / sizeof (short));
	memcpy (buffer, &count, sizeof (count));
	return buf;
}
/*
 * periodCheck - periodic housekeeping and status report.
 *
 * KBPSused: current throughput as supplied by the caller; it is scaled by
 *           MAX_BANDWIDTH*1024 and 8 to form the reported bandwidth figure.
 *
 * If the tracker server has not yet sent WELCOME (isSet == 0) the function
 * only re-registers and returns. Otherwise it:
 *   - counts active P2PC sessions (socket > 0) as both resources and
 *     connections;
 *   - walks the P2PS sessions, closing those that have no channel, no
 *     header and have been idle longer than MAX_TRANSFER_IDLE, and
 *     counting the rest as connections;
 *   - (HAVE_TS builds) packs resnum, connnum, bandwidth and a one-byte
 *     "connnum > MAX_P2PS" flag into UDPMsg and sends CP2TS_UPDATE,
 *     exiting the process if the send fails.
 *
 * Always returns 0.
 *
 * NOTE(review): register_cp() is called here outside any HAVE_TS guard,
 * while init_cp() only calls it under HAVE_TS - confirm that non-TS builds
 * still link.
 */
int periodCheck (float KBPSused)
{
	struct Session *head;
	int max, listnum;
	struct statistics
	{
		unsigned int resnum;
		unsigned short connnum;
		float bandwidth;
	} stat;

	/*
	 * Zero the counters first, then compute the bandwidth: clearing the
	 * struct after the assignment would wipe the value so that 0 would
	 * always be reported.
	 */
	memset (&stat, 0, sizeof(stat));
	stat.bandwidth = KBPSused/(MAX_BANDWIDTH*1024)/8;

	/* isSet whether TS has returned WELCOME message */
	if (isSet == 0)
	{
		register_cp ();
		return 0;
	}

	max = TRACKER[TYPE_P2PC].maxid + 1;
	head = TRACKER[TYPE_P2PC].head;
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket > 0)
		{
			stat.resnum ++;
			stat.connnum ++;
		}
	}

	max = TRACKER[TYPE_P2PS].maxid + 1;
	head = TRACKER[TYPE_P2PS].head;
	for (listnum = 0; listnum < max; listnum++)
	{
		if (head[listnum].socket <= 0)
			continue;
		if (head[listnum].pc == NULL &&
				head[listnum].header == NULL &&
				CurrentTime - head[listnum].last_transferblock > MAX_TRANSFER_IDLE)
		{
			PDEBUG ("timeout %d from NP %d to %d.\n", listnum, head[listnum].last_transferblock, (int)CurrentTime);
			Clientclosure (listnum, TYPE_P2PS);
		}
		else
			stat.connnum ++;
	}
#ifdef HAVE_TS
	/*
	 * Packed payload: resnum | connnum | bandwidth | overload flag.
	 * memcpy keeps the byte layout while avoiding misaligned typed stores
	 * (the float lands at offset sizeof(int)+sizeof(short)).
	 */
	memcpy (UDPMsg.buffer, &stat.resnum, sizeof (stat.resnum));
	memcpy (UDPMsg.buffer+sizeof(int), &stat.connnum, sizeof (stat.connnum));
	memcpy (UDPMsg.buffer+sizeof(int)+sizeof(short), &stat.bandwidth, sizeof (stat.bandwidth));
	*(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) =
		(stat.connnum <= MAX_P2PS) ? 0 : 1;
	UDPMsg.len = 23;
	UDPMsg.type = CP2TS_UPDATE;
	if (sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
	{
		PDEBUG ("exit...\n");
		exit (1);
	}
#endif
	PDEBUG("Res Num: %d. Connection Num: %d. BandWidth Usage: %.4f. \n", stat.resnum, stat.connnum, stat.bandwidth);
	return 0;
}
/*
 * makeSnapShot - append a status snapshot to the daily log and run the
 * periodic maintenance that is tied to it.
 *
 * count:         snapshot sequence number, printed in the header line.
 * time_interval: seconds covered by the tmp* byte counters; nothing is
 *                done when it is <= 0.
 *
 * Effects, in order:
 *   - opens ./cp-YYYY-M-D.log for append (returns early if that fails);
 *   - logs current and average up/down rates;
 *   - calls periodCheck() with the combined current rate;
 *   - folds the tmp* counters into the total* counters;
 *   - logs every channel, frees channels with no clients via
 *     freeLiveChannel(), and re-sends near-peer lists to channels whose
 *     last send is older than NearPeerInterval;
 *   - logs the totals, calls logto_xml() and resets the tmp* counters.
 */
void makeSnapShot(int count, int time_interval)
{
	time_t tmpTime;
	time_t avgSpan;
	struct tm result;
	struct Channel *pc, *nextpc;
	int cpchannelcount = 0;
	int totalclient = 0;
	long long totalupsize = 0, totaldownsize = 0;
	FILE *f;
	char buffer[MAX_DATA];

	if (time_interval <= 0)
		return;

	localtime_r(&CurrentTime, &result);
	snprintf (buffer, sizeof (buffer), "./cp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
	if ((f = fopen(buffer,"a")) == NULL)
	{
		PDEBUG("Couldn't open cp.log file! \n");
		return;
	}
	fseeko(f, 0, SEEK_END);

	// 1. start CP SnapShot
	fprintf(f, "\n\n**************Start %d SnapShot of CP, Time : %d/%d %d:%d:%d.********* \n",count,result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);

	// 2. log speed
	fprintf(f, "CP: cur Down %.4f KB. \n", ((float)tmpDownBytes)/1024/time_interval);
	fprintf(f, "CP: cur Up %.4f KB. \n", ((float)tmpUpBytes)/1024/time_interval);
	periodCheck(((float)tmpDownBytes+tmpUpBytes)/1024/time_interval);
	totalDownBytes += tmpDownBytes;
	totalUpBytes += tmpUpBytes;
	tmpTime = CurrentTime - startTime;
	/* Avoid dividing by zero when the first snapshot falls in the start-up second. */
	avgSpan = (tmpTime > 0) ? tmpTime : 1;
	fprintf(f, "CP: avg Down %.4f KB. \n", ((float)totalDownBytes)/1024/avgSpan);
	fprintf(f, "CP: avg Up %.4f KB. \n", ((float)totalUpBytes)/1024/avgSpan);

	// 3. log channel state
	for (pc=ChannelList; pc; pc=nextpc)
	{
		/* Fetch the successor first: pc may be freed below. */
		nextpc = pc->lnext;
		++cpchannelcount;
		totalclient += pc->numclient;
		totalupsize += pc->upsize;
		totaldownsize += pc->downsize;
		fprintf(f,"Channel %s have %d client. Down size %lld, avg speed %f. Up Size %lld, avg speed %f. \n",pc->fname,pc->numclient,pc->downsize, ((float)(pc->downsize)) / time_interval, pc->upsize, ((float)(pc->upsize))/ time_interval);
		/* A disabled per-session report of peers slower than 300 KB/s used to sit here. */
		if (pc->numclient == 0)
			freeLiveChannel (pc, NULL);
		else if (CurrentTime - pc->last_nearpeer > NearPeerInterval)
		{
			send_nearpeers_toall (pc);
			pc->last_nearpeer = CurrentTime;
		}
	}
	fprintf(f,"Channel Count : %d. Total client : %d. Total dowsize: %lld. Total upsize %lld \n",cpchannelcount,totalclient,totaldownsize,totalupsize);
	fprintf(f,"\n*********************End of SnapShot************************\n");
	fclose(f);

	/* logto_xml() still receives the unclamped elapsed time. */
	logto_xml (time_interval, tmpTime, cpchannelcount, totalclient);
	tmpDownBytes = tmpUpBytes = 0;
}
/*
 * reconnect - drop a session's current socket and start a non-blocking
 * connect to the next entry of its channel's SPLIST.
 *
 * p->flag is the index of the SPLIST entry to try; it is advanced before
 * the connect attempt, so a later call moves on to the following entry.
 *
 * Returns the new p->flag on success. Returns -1 if the session has no
 * channel, the list is exhausted, socket() fails or connect_nonb() fails.
 * On every failure after the old socket was closed, p->socket is left at 0
 * (the value that other code treats as "no socket" via its socket > 0
 * checks), so a closed descriptor number is never kept in the session.
 */
int reconnect (struct Session *p)
{
	struct NormalAddress *client;
	struct sockaddr_in addr;

	if (p->pc == NULL || p->pc->pcinfo->numofsp <= p->flag) return -1;

	/* Only tear down a real descriptor; 0 marks a session without a socket. */
	if (p->socket > 0)
	{
		FD_CLR(p->socket, &osocks);
		close (p->socket);
	}
	if ((p->socket = socket (PF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror ("socket||gethostbyname");
		p->socket = 0;
		return -1;
	}

	client = &(p->pc->pcinfo->SPLIST[p->flag]);
	memset (&addr, 0, sizeof (addr));
	addr.sin_port = client->sin_port;
	addr.sin_addr = client->sin_addr;
	addr.sin_family = AF_INET;
	p->flag ++;
	if ((p->sock_flag = connect_nonb(p->socket, &addr, sizeof (addr))) == -1)
	{
		close (p->socket);
		/* Match the socket() failure path: do not leave a stale descriptor behind. */
		p->socket = 0;
		return -1;
	}
	FD_SET(p->socket, &osocks);
	p->time_sec = CurrentTime;
	p->totalup = 0;
	return p->flag;
}
/*
 * send_P2P_PUSHLIST - ask the channel's data source for missing blocks.
 *
 * Starting at block id, up to REQUEST_AHEAD consecutive ids inside
 * [s.minBlockID, s.maxBlockID] are examined. An id is requested unless its
 * queue slot already holds it (indisk[slot] == id / max_queue + 1) or the
 * slot's bit in bitflag is already set. Requested slots get their bit set.
 *
 * Message layout:
 *   int      total length (filled in last)
 *   1 byte   P2P_PUSHLIST
 *   MD5_LEN  channel MD5
 *   1 byte   0
 *   1 byte   number of block ids
 *   int[]    block ids
 *   1 byte   0
 *
 * Nothing is sent when no block needs requesting. Always returns 0.
 */
int send_P2P_PUSHLIST (struct Channel *pc, int id)
{
	unsigned char *ptr;
	char buffer[MAX_DATA], *buf;
	struct LiveChannelInfo *pcinfo = pc->pcinfo;
	int i, j;
	int total;

	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_PUSHLIST;
	buf += sizeof (char);
	memcpy (buf, pc->channel_md5, MD5_LEN);
	buf += MD5_LEN;
	*(unsigned char *)buf = 0;
	buf += sizeof (char);
	ptr = (unsigned char *)buf;	/* one-byte id counter, patched as ids are added */
	buf += sizeof (char);
	*ptr = 0;
	for (i=id; (i >= pcinfo->s.minBlockID && i <= pcinfo->s.maxBlockID) && i<id+REQUEST_AHEAD; i++)
	{
		j = i % pcinfo->max_queue;
		if (pcinfo->indisk[j] == (i/pcinfo->max_queue+1) || isSet (pcinfo->bitflag, j))
			continue;
		/*
		 * Stop before the one-byte counter would wrap or the buffer could
		 * no longer hold this id plus the trailing byte. Blocks left out
		 * keep their bit clear and are picked up by a later call.
		 */
		if (*ptr == 0xFF ||
				(size_t)((buffer + sizeof (buffer)) - buf) < sizeof (int) + sizeof (char))
			break;
		memcpy (buf, &i, sizeof (i));	/* ids sit at unaligned offsets */
		buf += sizeof (int);
		(*ptr) ++;
		setBit (pcinfo->bitflag, j);
	}
	if (*ptr > 0)
	{
		*(unsigned char *)buf = 0;
		buf += sizeof (char);
		total = buf - buffer;
		memcpy (buffer, &total, sizeof (total));
		writeMessage (pcinfo->dataSource, pc, buffer);
	}
	return 0;
}
/*
 * send_nearpeers - send one peer a P2P_NEAR_PEERS list of other peers.
 *
 * pc:  channel whose peer list (PeerHead, linked through cnext) is walked.
 * pme: the receiving peer's edge; the walk starts at pme->cnext, wraps to
 *      pc->PeerHead at the end of the list and stops on reaching pme again
 *      or after MAX_NEARPEER entries.
 *
 * Message layout:
 *   int     total length
 *   1 byte  P2P_NEAR_PEERS
 *   1 byte  number of entries
 *   entries, each sizeof(struct PeerInfoWithAddr) bytes copied from the
 *   peer session's addr field
 *
 * Nothing is sent when no other peer is found. Always returns 0.
 */
int send_nearpeers (struct Channel *pc, struct Edge *pme)
{
	char buffer[MAX_DATA], *buf, *ptr;
	struct Edge *pedge;
	int i;
	int total;

	buf = buffer+sizeof (int);
	*(unsigned char *)buf = P2P_NEAR_PEERS;
	buf += sizeof (char);
	ptr = buf;	/* entry count, filled in after the walk */
	buf += sizeof (char);
	for (i=0, pedge=pme->cnext; i<MAX_NEARPEER; pedge=pedge->cnext)
	{
		if (pedge == NULL)
			pedge = pc->PeerHead;
		/* An empty list would otherwise be dereferenced after the wrap. */
		if (pedge == NULL || pedge == pme)
			break;
		/* Never write an entry that does not fit in the message buffer. */
		if ((size_t)((buffer + sizeof (buffer)) - buf) < sizeof (struct PeerInfoWithAddr))
			break;
		memcpy (buf, &(pedge->me->addr), sizeof (struct PeerInfoWithAddr));
		buf += sizeof (struct PeerInfoWithAddr);
		i++;
	}
	if (i > 0)
	{
		*(unsigned char *)ptr = i;
		total = buf - buffer;
		memcpy (buffer, &total, sizeof (total));
		writeMessage (pme->me, pc, buffer);
	}
	return 0;
}
/*
 * send_nearpeers_toall - call send_nearpeers() for every edge on the
 * channel's peer list, following cnext from pc->PeerHead.
 *
 * Always returns 0.
 */
int send_nearpeers_toall (struct Channel *pc)
{
	struct Edge *cur = pc->PeerHead;

	while (cur != NULL)
	{
		send_nearpeers (pc, cur);
		/* The link is read after the call, as in the original loop. */
		cur = cur->cnext;
	}
	return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -