📄 tsnew.c
字号:
system ("ulimit -a");
if (getrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("getrlimit");
return -1;
}
fprintf (stderr, "Get core limit %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
if (setrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("getrlimit");
return -1;
}
if (getrlimit (RLIMIT_CORE, &rl) != 0)
{
perror ("getrlimit");
return -1;
}
fprintf (stderr, "Set core limit to %d:%d\n", (int)rl.rlim_cur, (int)rl.rlim_max);
system ("ulimit -a");
#endif
if (init_udpserver (&NPTRACKER, LOCALHOST, (int*)cfgTS4NP_PORT.ptr, MAX_NP, cfgTS4NP_PORT.size) < 0
#ifdef HAVE_RM
|| init_udpserver (&RMTRACKER, LOCALHOST, (int*)cfgTS4RM_PORT.ptr, MAX_RM, cfgTS4RM_PORT.size) < 0
#endif
|| init_udpserver (&CPTRACKER, LOCALHOST, (int*)cfgTS4CP_PORT.ptr, MAX_CP, cfgTS4CP_PORT.size) < 0)
{
return -1;
}
#ifdef SORT_NET
maxNet = readNETBLOCK (NETFN);
#endif
statlog = fopen("./stat.log", "a+");// create file "stat.log" automatically if it does not exist.
if (statlog == NULL) {
perror ("error opening statistics log file: stat.log.\n");
}
return 0;
}
/*
 * Program entry point.
 *
 * argv[1] (optional): run mode, parsed with atoi(); 0 detaches the process
 *                     with daemon(), any other value stays in the foreground
 *                     (default 1).
 * argv[2] (optional): value stored in OUTPUT_STAT, parsed with atoi().
 *
 * Exits with status 1 when init_ts() (or init_mysql(), with HAVE_MYSQL)
 * fails; otherwise returns 0 once the init/serve loop has run 10 times.
 */
int main(int argc, char **argv)
{
    int i, mode = 1;

    // SIGPIPE is raised when the client closes the socket exceptionally;
    // if not handled, SIGPIPE would cause unexpected termination.
    signal (SIGPIPE, SIG_IGN);

    if (argc > 1)
    {
        mode = atoi (argv[1]);
        if (argc > 2) OUTPUT_STAT = atoi (argv[2]);
    }

    if (mode == 0)
    {
        // Run in the background. The failure used to be ignored silently;
        // report it, then keep going in the foreground as before.
        if (daemon (1, 1) != 0)
            perror ("daemon");
    }

    // Read the configuration file.
    // Arguments: the file name, a struct NamVal array, and the number of
    // entries in that array.
    read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof (struct NamVal));
    readconfig("ip.list");

    for (i=0; i<10; i++) // retry 10 times. not useful here.
    {
        FD_ZERO(&osocks);
        if (init_ts () < 0)
        {
            PDEBUG ("Error in initialization.\n");
            exit (1);
        }
#ifdef HAVE_MYSQL
        if ((local_mysql = init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB, "/var/run/mysqld/mysqld.sock")) == 0)
        {
            PDEBUG ("Error in init_mysql.\n");
            exit (1);
        }
#endif
        process_child ();
    }
    return 0;
}
//===============================================
//===== Here begin the message process part =====
//===============================================
/*
 * Account for a newly set up NP session.
 *
 * Raises NPTRACKER.maxid to the session's index when needed, increments the
 * session counter NPTRACKER.cur, and logs whether the session already has a
 * channel attached (p->u.p.cur).
 *
 * p: session slot inside the NPTRACKER.head array.
 * Returns 0.
 */
int init_NP (struct Session *p)
{
    // Index of the session inside the session array. maxid is the highest
    // index currently in use, kept to shorten searches.
    int listnum = p - NPTRACKER.head;

    if (listnum > NPTRACKER.maxid)
        NPTRACKER.maxid = listnum;
    NPTRACKER.cur ++; // cur is in fact the counter of sessions

    // Pass the int index to %d: the raw pointer difference is a ptrdiff_t,
    // which does not match a %d conversion on LP64 targets.
    if (p->u.p.cur) // if there is an edge, then the client is already in a channel
        PDEBUG ("NP %d in %d enter Channel %.32s(%d clients).\n", listnum, NPTRACKER.cur, p->u.p.cur->head->name, p->u.p.cur->head->numclient);
    else
        PDEBUG ("NP %d in %d no default channel.\n", listnum, NPTRACKER.cur);
    return 0;
}
/*
 * Receive one datagram on NP socket <idsock> and dispatch it by message type.
 *
 * NP2TS_LOGIN messages are handed to process_NP2TS_LOGIN. Every other type
 * must name a live session: authcode1 is the session index and authcode2
 * must equal the session's auth value.
 *
 * Returns 0 when the message was dispatched, -1 when recvfrom() failed or
 * the message did not match a live session.
 *
 * NOTE(review): <len> is not checked against the message header size, so a
 * short datagram is parsed using whatever bytes UDPMsg already held.
 */
int process_NP (int idsock)
{
    int len, listnum;
    int closed = 0; // set once closure_NP() has released the session
    struct Session *p = NULL;
    struct TSMessage *m = &UDPMsg;
    socklen_t addr_len = sizeof (UDPCLIENT);

    memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
    if ((len = (recvfrom (NPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
    {
        perror ("recvfrom:");
        return -1;
    }
    PDEBUG ("type %d len %d.", m->type, m->len);
    if (m->type == NP2TS_LOGIN)
    {
        process_NP2TS_LOGIN ((struct Message *)m);
        ++np2tsLoginCount;
    } else
    {
        listnum = m->authcode1; // index of session object
        // The index comes straight from the network. Form the session
        // pointer only for an in-range index; a negative value used to slip
        // past the ">= max" test and address memory before the array.
        if (listnum >= 0 && listnum < NPTRACKER.max)
            p = NPTRACKER.head+listnum;
        // check the session: 1. bad index; 2. uninitialized or cleared; 3. not match
        if (p == NULL || p->socket == 0
            || p->auth != m->authcode2)
        {
            if (m->type != NP2TS_LOGOUT)
                SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
            return -1;
        }
        switch (m->type)
        {
        case NP2TS_REPORT: // report interval info: reset if refresh is true, otherwise add first and then delete
            process_NP2TS_REPORT (p, m);
            ++np2tsReportCount;
            break;
        case NP2TS_NEED_PEERS:
            PDEBUG("Need peers!!\n");
            process_NP2TS_NEED_PEERS (p, m);
            ++np2tsNeedPeerCount;
            break;
        case NP2TS_LOGOUT: // logout
            closure_NP (p);
            closed = 1;
            ++np2tsLogoutCount;
            break;
        case NP2TS_RES_LIST: /* sends all resources of the current NP; handled
                                through addSession, which adds the edge if it
                                does not exist yet */
            process_NP2TS_RES_LIST (p, m);
            ++np2tsResListCount;
            break;
        case NP2TS_REQ_RES: // add a resource and return peers
            process_NP2TS_REQ_RES (p, m);
            ++np2tsReqResCount;
            break;
        case NP2TS_DEL_RES: // delete a resource
            process_NP2TS_DEL_RES (p, m);
            ++np2tsDelResCount;
            break;
        case NP2TS_QUERY_RES: // query a resource
            process_NP2TS_QUERY_RES (p, m);
            break;
        case NP2TS_REPORT2:
            process_NP2TS_REPORT2 (p, m);
            break;
        default:
            SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
            closure_NP (p);
            closed = 1;
            break;
        }
        // closure_NP() zeroes the session and puts it on the free list, so
        // the access time must not be written into a released slot.
        if (!closed)
            p->last_access = CurTimeSec;
    }
    PDEBUG ("done\n");
    return 0;
}
/*
 * Append one statistics record for session <p> to the statistics log.
 *
 * The record holds the current time and client version, the session's
 * host/port pairs, the playback counters in p->u.p.s and the traffic
 * counters in p->u.p.t. The stream is flushed once every MAX_LOG_COUNT
 * records.
 *
 * Does nothing when the log file is not open.
 */
void logStat(struct Session *p)
{
    // Opening stat.log can fail at start-up without stopping the server, so
    // the stream may legitimately be NULL here.
    if (statlog == NULL)
        return;

    // Record time and client version. time_t is converted explicitly
    // because passing it unconverted does not match the %u conversion.
    fprintf(statlog, "%u %f", (unsigned int)time(NULL), p->clientVer);
    fprintf(statlog, "%d:%d | %d:%d\n", p->host, p->port, p->intra, p->npport);
    fprintf(statlog, "%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%f\n",
            p->u.p.s.playingBlock - p->u.p.startBlock,
            p->u.p.s.currBufferTime,
            p->u.p.s.bufferCount,
            p->u.p.s.bufferTime,
            p->u.p.s.connFailCount,
            p->u.p.s.inConnections,
            p->u.p.s.outConnections,
            p->u.p.s.avgInConnTime,
            p->u.p.s.avgOutConnTime,
            p->u.p.s.messagePercent);
    fprintf(statlog, "%lld\t%lld\t%f\t%f\t%f\t%f\n",
            p->u.p.t.totalDownBytes,
            p->u.p.t.totalUpBytes,
            p->u.p.t.currDownSpeed,
            p->u.p.t.currUpSpeed,
            p->u.p.t.avgDownSpeed,
            p->u.p.t.avgUpSpeed);

    current_log_count ++;
    if(current_log_count == MAX_LOG_COUNT)
    {
        fflush(statlog);
        current_log_count = 0;
    }
}
/*
 * Tear down NP session <p> and hand its slot back to the free list.
 *
 * The statistics are logged first, then the session is detached from the
 * session-channel map and from its hash chain, cleared, and pushed onto the
 * free list headed by NPTRACKER.hash[0].
 *
 * Returns 0.
 */
int closure_NP (struct Session *p)
{
    int slot = p - NPTRACKER.head;
    int bucket;
    struct Session *prev;

    // Write the statistics while the session data is still intact.
    logStat (p);

    // If this was the highest slot in use, walk maxid down to the next
    // slot that still has a socket (or to 0).
    if (slot == NPTRACKER.maxid && slot > 0)
    {
        slot--;
        while (NPTRACKER.head[slot].socket == 0 && slot > 0)
            slot--;
        NPTRACKER.maxid = slot;
    }

    // Drop the session from the session-channel map.
    delSession (p);

    // Unlink the session from its hash chain.
    bucket = hash_np (p->host, p->npport);
    prev = NPTRACKER.hash[bucket];
    if (prev == p)
    {
        // head of chain
        NPTRACKER.hash[bucket] = p->hnext;
    }
    else
    {
        // find the entry whose successor is <p>
        while (prev && prev->hnext != p)
            prev = prev->hnext;
        assert (prev);
        if (prev)
            prev->hnext = p->hnext;
    }

    // Clear the slot and push it onto the free list.
    memset (p, 0, sizeof (*p));
    p->hnext = NPTRACKER.hash[0];
    NPTRACKER.hash[0] = p;

    Polluted ++;
    NPTRACKER.cur --;
    return 0;
}
/*
 * Account for a newly set up CP session.
 *
 * Stores the service type looked up from the session's host ("UNKNOWN" when
 * the lookup returns NULL), raises CPTRACKER.maxid to the session's index
 * when needed, increments CPTRACKER.cur and makes this session the current
 * GCPCHOICE.
 *
 * p: session slot inside the CPTRACKER.head array.
 * Returns 0.
 */
int init_CP (struct Session *p)
{
    const char *servicetype = find_cp_service_type (p->host);
    int listnum = p - CPTRACKER.head;

    if (servicetype == NULL)
        servicetype = "UNKNOWN";

    // Bounded copy: an over-long name is truncated instead of overrunning
    // the session field, and the result is always NUL-terminated.
    // NOTE(review): assumes u.cp.servicetype is a char array member, so that
    // sizeof yields its capacity — confirm against the struct definition.
    snprintf (p->u.cp.servicetype, sizeof (p->u.cp.servicetype), "%s", servicetype);
    PDEBUG("\n******************************************************************\ninit_CP: cp service type is %s\n", servicetype);

    if (listnum > CPTRACKER.maxid)
        CPTRACKER.maxid = listnum;
    CPTRACKER.cur ++;
    GCPCHOICE = p;
    return 0;
}
int process_CP (int idsock)
{
int len, listnum;
struct Session *p;
struct TSMessage *m = &UDPMsg;
#ifdef MEASUREMENT
struct timeval tm;
long long msec;
#endif
socklen_t addr_len = sizeof (UDPCLIENT);
memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
if ((len = (recvfrom (CPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
{
perror ("recvfrom:");
return -1;
}
#ifdef MEASUREMENT
gettimeofday (&tm, NULL);
msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
#endif
if (m->type == CP2TS_REGISTER)
{
process_CP2TS_REGISTER ((struct Message *)m);
} else
{
listnum = m->authcode1;
p = CPTRACKER.head+listnum;
if (listnum >= CPTRACKER.max || p->socket == 0
|| p->auth != m->authcode2)
{
if (m->type != CP2TS_LOGOUT)
{
PDEBUG("CP error. listnum=%d/%d. socket=%d auth=%d/%d\n", listnum, CPTRACKER.max, p->socket, p->auth, m->authcode2);
SEND_NPMSG(CPTRACKER.sock[idsock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
}
return -1;
}
switch (m->type)
{
case CP2TS_NEED_PEERS: // ECP查询用, 目前尚未使用
process_CP2TS_NEED_PEERS (p, m);
break;
case CP2TS_UPDATE: // 报告CP负载
process_CP2TS_UPDATE (p, m);
break;
case CP2TS_LOGOUT:
closure_CP (p);
break;
default:
closure_CP (p);
break;
}
p->last_access = CurTimeSec;
}
return 0;
}
/*
 * Tear down CP session <p> and hand its slot back to the free list.
 *
 * Lowers CPTRACKER.maxid when <p> was the highest slot in use, picks another
 * live CT_GENERAL session as GCPCHOICE when <p> was the current choice,
 * unlinks <p> from its hash chain, clears it and pushes it onto the free
 * list headed by CPTRACKER.hash[0].
 *
 * Returns 0.
 */
int closure_CP (struct Session *p)
{
    int i, id;
    struct Session *q;

    // If this was the highest slot in use, walk maxid down to the next
    // slot that still has a socket (or to 0).
    if ((i = p - CPTRACKER.head) == CPTRACKER.maxid && i > 0)
    {
        for (i--; CPTRACKER.head[i].socket == 0 && i> 0; i--);
        CPTRACKER.maxid = i;
    }

    if (GCPCHOICE == p)
    {
        for (i=CPTRACKER.maxid; i>=0; i--)
        {
            struct Session *cand = &(CPTRACKER.head[i]);

            // <p> is still live at this point (it is cleared further down),
            // so it must be skipped explicitly; otherwise the scan could
            // "replace" the choice with the very session being closed.
            if (cand != p && cand->socket != 0 && cand->u.cp.type == CT_GENERAL)
            {
                GCPCHOICE = cand;
                break;
            }
        }
        // NOTE(review): when no other candidate exists GCPCHOICE still
        // refers to <p>, which is zeroed below (unchanged from before).
        // Consider resetting it to NULL if every reader tolerates that.
    }

    // Unlink the session from its hash chain.
    id = hash_cp (p->host, p->npport);
    if ((q = CPTRACKER.hash[id]) != p) // not head of chain
    {
        for (; q && q->hnext != p; q=q->hnext); // find the entry whose successor is <p>
        assert (q);
        if (q) q->hnext = p->hnext;
    } else CPTRACKER.hash[id] = p->hnext; // head of chain

    // Clear the slot and push it onto the free list.
    memset (p, 0, sizeof (struct Session));
    p->hnext = CPTRACKER.hash[0];
    CPTRACKER.hash[0] = p;
    Polluted ++;
    CPTRACKER.cur --;
    return 0;
}
#ifdef HAVE_RM
/*
 * Append one channel record to the output cursor: MD5_LEN bytes taken from
 * <name>, followed by <numclient> as a native int. *buf is advanced past the
 * record.
 */
static void appendChannelRecord (char **buf, const char *name, int numclient)
{
    memcpy (*buf, name, MD5_LEN);
    *buf += MD5_LEN;
    // memcpy rather than a store through a cast int pointer: the position
    // inside the byte buffer is not guaranteed to be int-aligned.
    memcpy (*buf, &numclient, sizeof (int));
    *buf += sizeof (int);
}

/*
 * Write channel records (name bytes + client count) into *buf.
 *
 * md5: "*" selects every channel in ChannelHash; otherwise an exact lookup
 *      through findChannel() is tried first and, when that finds nothing,
 *      every channel whose name contains <md5> (strstr) is reported.
 * buf: in/out write cursor; advanced past everything written.
 *
 * Returns the number of records written.
 *
 * NOTE(review): there is no size parameter, so the caller must supply a
 * buffer large enough for every matching channel — nothing here bounds the
 * writes.
 */
int getChannelInfo (char *md5, char **buf)
{
    struct Channel *pc;
    int i, total=0;

    if (strcmp (md5, "*") == 0)
    {
        for (i=0; i<MAX_CHANNEL; i++)
        {
            for (pc=ChannelHash[i]; pc; pc=pc->next)
            {
                appendChannelRecord (buf, pc->name, pc->numclient);
                total ++;
            }
        }
        return total;
    }

    if ((pc=findChannel (md5, MD5_LEN)) != NULL)
    {
        // The exact-match record carries the caller's key bytes, as before.
        appendChannelRecord (buf, md5, pc->numclient);
        return 1;
    }

    for (i=0; i<MAX_CHANNEL; i++)
    {
        for (pc=ChannelHash[i]; pc; pc=pc->next)
        {
            if (strstr (pc->name, md5) != NULL)
            {
                appendChannelRecord (buf, pc->name, pc->numclient);
                total ++;
            }
        }
    }
    return total;
}
/*
 * Account for a newly set up RM session: count it in RMTRACKER.cur and raise
 * RMTRACKER.maxid when the session's slot index is higher than the current
 * value.
 *
 * p: session slot inside the RMTRACKER.head array.
 * Returns 0.
 */
int init_RM (struct Session *p)
{
    const int slot = p - RMTRACKER.head;

    RMTRACKER.cur ++;
    if (RMTRACKER.maxid < slot)
    {
        RMTRACKER.maxid = slot;
    }
    return 0;
}
// Message type value that process_RM matches for an RM statistics query.
#define RM2TS_STAT_QUERY 0x20
// NOTE(review): presumably the type value of the reply to the query above;
// its use is not visible in this part of the file — confirm.
#define TS2RM_STAT_RESPONSE 0x30
int process_RM (int idsock)
{
char buffer[MAX_DATA];
char *p, *buf = buffer;
int * psize;
int querynum;
int len, total, i;
struct Message Msg, *m=&Msg;
#ifdef MEASUREMENT
struct timeval tm;
long long msec;
#endif
socklen_t addr_len = sizeof (UDPCLIENT);
memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
if ((len = (recvfrom (RMTRACKER.sock[idsock], m, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
{
perror ("recvfrom:");
return -1;
}
#ifdef MEASUREMENT
gettimeofday (&tm, NULL);
msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
#endif
PDEBUG("got RM msg, len %d. \n", len);
switch (m->type)
{
case RM2TS_STAT_QUERY:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -