📄 tsnew.c
字号:
numnp = findNPPeers (pc, p, cur, num, &buf, buffer1);
} else
{
numnp = findNPPeers (pc, p, cur, num, &buf, NULL);
}
*psize = numnp; // set # of NP
*(int *)buffer = buf - buffer;
if (sendMessage(conn,buffer,&UDPCLIENT) < 0)
{
// closure_NP (p);
PDEBUG("send msg err\n");
return -1;
}
PDEBUG ("find %d NP and %d CP\n", numnp, numcp);
return numnp+numcp;
}
/*
 * Handle an NP2TS_NEED_PEERS request: a peer asks the tracker for peers.
 *
 * Payload consumed from m->buffer, in order:
 *   1 byte   needcp   - passed on as the needcp argument
 *   4 bytes  current  - stored in p->u.p.cur->current
 *   1 byte   layer    - stored in p->u.p.p.layer
 *
 * The lookup itself is delegated to process_NEED_PEERS_real().  Per the
 * original note on this function: suitable CPs are found with findCPPeers
 * and suitable NPs with findNPPeers; NP results are ordered by network so
 * that peers in the same network come first.
 *
 * Returns -1 if the session has no p->u.p.cur, or if the message is too
 * short (closure_NP(p) is called in that case); otherwise returns whatever
 * process_NEED_PEERS_real() returns.
 */
int process_NP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
{
    char *buf = m->buffer;
    /* One past the last payload byte this handler consumes. */
    char *end = buf + sizeof (char) + sizeof (int) + sizeof (char);
    unsigned int current;
    int needcp;

    if (p->u.p.cur == NULL)
    {
        PDEBUG("no current in NPInfo\n");
        return -1;
    }

    /* Validate the length BEFORE touching the payload, so a truncated
       message can neither be over-read nor leak garbage into the session. */
    if (end - m->buffer + AUTH_HEADER > m->len)
    {
        PDEBUG ("Invalid message NP2TS_NEED_PEERS, length %d not enough.\n", m->len);
        closure_NP (p);
        return -1;
    }

    needcp = *(unsigned char *)buf;
    buf += sizeof (char);

    /* The 4-byte field sits at offset 1 of the payload; copy it out instead
       of dereferencing a misaligned unsigned int pointer. */
    memcpy (&current, buf, sizeof (current));
    p->u.p.cur->current = current;
    buf += sizeof (int);

    p->u.p.p.layer = *(unsigned char *)buf;

    return process_NEED_PEERS_real (p, NULL, needcp, p->u.p.cur->current, p->u.p.p.layer);
}
/* Write cp's address (family, npport, host in network byte order) into
   *buffer as a struct NormalAddress and advance *buffer past it. */
static void findCPPeers_append (char **buffer, struct Session *cp)
{
    struct NormalAddress *addr = (struct NormalAddress *)*buffer;

    addr->sin_family = PF_INET;
    addr->sin_port = htons (cp->npport);
    addr->sin_addr.s_addr = htonl (cp->host);
    *buffer += sizeof (*addr);
}

/*
 * Pick up to two CPs for the given host and append their addresses to
 * *buffer (advancing *buffer by one struct NormalAddress per CP).
 *
 * Pass 1 walks the CPTRACKER table once, starting at a random slot, and asks
 * findcppeers() for a priority for each usable CP.  A return of -1 means the
 * CP does not match; priorities are otherwise 1..n (per the original
 * comments).  A CP is taken when its priority is 1 or lower than the current
 * minpriority; the second CP taken ends the pass.
 *
 * Pass 2 runs only if pass 1 walked the whole table without finding two CPs.
 * It continues from the same position and fills the remaining choice(s) with
 * any usable CP, never returning two CPs with the same host.
 *
 * A slot is skipped when p->u.cp.maxConn == 1 or p->socket == 0.
 *
 * md5 is not used by this function.
 *
 * NOTE(review): when host == 0 and the table is non-empty, pass 1 does not
 * run and i stays 0, so pass 2 is skipped as well and 0 is returned -
 * confirm this is intended.
 *
 * Returns the number of CP addresses written (0, 1 or 2).
 */
int findCPPeers (unsigned long host, char *md5, char **buffer)
{
    struct Session *p = NULL, *choice1 = NULL, *choice2 = NULL;
    int minpriority = -1;
    int found_cp_count = 0;
    int i;

    /* An empty table would make the modulus below a division by zero. */
    if (CPTRACKER.maxid + 1 <= 0)
        return 0;

    // We should start with a random address
    p = CPTRACKER.head + rand() % (CPTRACKER.maxid+1);
    // use a purely random start instead of comparing the band
    PDEBUG("In findCPPeers.\n");

    for (i = 0; i <= CPTRACKER.maxid && host != 0; i++, p++)
    {
        int priority;

        PDEBUG("begin to call findcppeers.\n");
        if (p - CPTRACKER.head > CPTRACKER.maxid) // round back to head
        {
            p = CPTRACKER.head;
        }
        if (p->u.cp.maxConn == 1 || p->socket == 0)
        {
            PDEBUG("Invalid CP.\n");
            continue;
        }
        priority = findcppeers(host, (void*)p);
        // not the matching CP, look at the next one
        if (priority == -1)
        {
            PDEBUG("not found.\n");
            continue;
        }
        /* host is unsigned long, so it must be printed with %lu. */
        PDEBUG("found: host : %lu servicetype: %s.\n", host, p->u.cp.servicetype);
        if (minpriority == -1)
            minpriority = priority;

        /* Only priority 1, or a priority better than the current minimum,
           qualifies. */
        if (priority != 1 && priority >= minpriority)
            continue;

        if (choice1 != NULL)
        {
            choice2 = p;
            break;
        }
        choice1 = p;
        minpriority = -1;
    }

    if (i > CPTRACKER.maxid) // pass 1 did not yield two CPs: take CPs directly
    {
        PDEBUG("begin to find sequencely.\n");
        for (i = 0; i <= CPTRACKER.maxid; i++, p++)
        {
            if (p - CPTRACKER.head > CPTRACKER.maxid) // round back to head
            {
                p = CPTRACKER.head;
            }
            if (p->u.cp.maxConn == 1 || p->socket == 0)
                continue;
            // choice1 is always filled before choice2; stop once both are set
            if (choice1 == NULL)
            {
                choice1 = p;
            }
            else if (p->host == choice1->host) // avoid returning the same CP twice
                continue;
            else if (choice2 == NULL)
            {
                choice2 = p;
            }
            else
                break;
        }
    }

    if (choice1)
    {
        findCPPeers_append (buffer, choice1);
        found_cp_count++; // one CP is found
    }
    if (choice2)
    {
        findCPPeers_append (buffer, choice2);
        found_cp_count++;
    }
    return found_cp_count; // number of CPs finally found
}
#ifndef SORT_NET
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
{
struct Edge *pedge = NULL;
struct Session *ps;
struct P2PAddress addr;
int j, k;
unsigned int randstart;
struct sockaddr_in client;
if (pc == NULL || pc->numclient <= 0) return 0;
if (me->cachepeer != NULL && me->cachepeer->u.p.header != NULL && me->type == TYPE_NP)
{
for (pedge=me->cachepeer->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
}
if (pedge == NULL)
{
randstart = rand () % pc->numclient;
for (pedge=pc->PeerHead; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
}
for (j=0,k=0; j<num && k<pc->numclient; pedge=pedge->cnext)
{
k++;
if (pedge == NULL) pedge = pc->PeerHead;
if ((ps = pedge->me) == me) continue; // exclude myself
if (CurTimeSec > pedge->me->last_access+60) continue; // don't bother it too often
if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
{
if (ps->intra == 0xffffffff || ps->host == me->host) // 0xffffffff means ps is on the public network, ps->host == me->host means in the same private network
{
memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
addr = (*buffer) + sizeof (struct CorePeerInfo);
addr->outerIP.sin_port = htons (ps->port);
addr->outerIP.sin_addr.s_addr = htonl (ps->host);
addr->subnetIP.sin_port = htons (ps->npport);
addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
addr->outerIP.sin_family = PF_INET;
addr->subnetIP.sin_family = PF_INET;
*buffer = (struct char *)(addr + 1);
j++;
} else if (buffer1)
{
memset ((char *)&client, 0, sizeof (client));
client.sin_port = htons (ps->port);
client.sin_family = AF_INET;
client.sin_addr.s_addr = htonl (ps->host);
if (sendMessage(ps->socket,buffer1, &client) < 0)
{
closure_NP (ps);
}
PDEBUG("Send ConnecTo to %s\n", inet_ntoa(client.sin_addr));
}
}
}
if (pedge != NULL) me->cachepeer = pedge->me;
return j;
}
#else
/*
 * Collect peers of channel pc for the requester me, sort them, and serialize
 * at most num of them into *buffer.  (Variant compiled when SORT_NET is
 * defined.)
 *
 * Networks are visited starting with the requester's own (me->net) and then
 * in increasing order modulo MAX_NET_NUM.  Within each network the scan
 * starts at the edge of me->cachepeer[net] that belongs to pc when there is
 * one (NP requesters only), otherwise at a random position in that network's
 * peer list, and visits at most pc->nclient_net[net] edges.
 *
 * A peer is a candidate when it is not the requester, its last_access is not
 * more than 60 seconds behind CurTimeSec, its isMaxIn is 0, and either the
 * requester is a CP, playing is 0xffffffff, or check_valid(pedge, playing)
 * is true.  Candidates with intra == 0xffffffff or with the same host as the
 * requester are collected (at most MAX_PEER in total); any other candidate
 * is instead sent buffer1, when buffer1 is non-NULL.
 *
 * Collected peers are sorted with compareSession, truncated to num when
 * num > 0, and each is written as a struct CorePeerInfo followed by a struct
 * P2PAddress, advancing *buffer.
 *
 * NOTE(review): Net is not declared in this function; presumably it is a
 * file-scope variable - confirm.
 *
 * Returns the number of peers written to *buffer.
 */
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
{
    int i, j, k, m, mnetnum;
    unsigned int randstart;
    struct Session *result[MAX_PEER];
    struct Session *ps;
    struct sockaddr_in client;
    struct Edge *pedge;
    struct P2PAddress *addr;

    if (pc == NULL || pc->numclient <= 0) return 0;

    Net = me->net;
    for (j=0,m=0; m<MAX_NET_NUM; m++)
    {
        mnetnum = (Net + m) % MAX_NET_NUM;
        k = 0;
        if (pc->nclient_net[mnetnum] <= 0) continue;

        pedge = NULL;
        if (me->cachepeer[mnetnum] != NULL && me->cachepeer[mnetnum]->u.p.header != NULL && me->type == TYPE_NP)
        {
            /* Find the cached peer's edge that belongs to this channel. */
            for (pedge=me->cachepeer[mnetnum]->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
        }
        if (pedge == NULL)
        {
            randstart = rand () % pc->nclient_net[mnetnum];
            for (pedge=pc->PeerHead[mnetnum]; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
        }

        for (; j<MAX_PEER && k < pc->nclient_net[mnetnum]; pedge=pedge->cnext)
        {
            k++;
            if (pedge == NULL) pedge = pc->PeerHead[mnetnum];   /* wrap around */
            if ((ps = pedge->me) == me) continue;               /* exclude the requester */
            /* Skip peers whose last_access is more than 60 s in the past. */
            if (CurTimeSec > pedge->me->last_access+60) continue;
            if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
            {
                if (ps->intra == 0xffffffff || ps->host == me->host)
                {
                    result[j] = ps;
                    j++;
                } else if (buffer1)
                {
                    /* Not directly usable by the requester: send buffer1 to
                       that peer's outer address instead. */
                    memset ((char *)&client, 0, sizeof (client));
                    client.sin_port = htons (ps->port);
                    client.sin_family = AF_INET;
                    client.sin_addr.s_addr = htonl (ps->host);
                    if (sendMessage(ps->socket,buffer1, &client) < 0)
                    {
                        closure_NP (ps);
                    }
                    PDEBUG("Send ConnecTo to %s\n", inet_ntoa(client.sin_addr));
                }
            }
        }
        if (pedge != NULL) me->cachepeer[mnetnum] = pedge->me;
    }

    if (j > 1)
        qsort (result, j, sizeof (struct Session *), compareSession);
    if (num > 0 && j > num) j = num;

    /* Pointer differences are ptrdiff_t; cast them so they match %d.
       Braces keep the if/else well-formed whatever PDEBUG expands to. */
    if (me->type == TYPE_NP)
    {
        PDEBUG ("NP %d find %d NP:", (int)(me-NPTRACKER.head), j);
    }
    else
    {
        PDEBUG ("CP %d find %d CP:", (int)(me-CPTRACKER.head), j);
    }

    for (i=0; i<j; i++)
    {
        ps = result[i];
        PDEBUG ("%d\t", (int)(ps-NPTRACKER.head));
        memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
        addr = (struct P2PAddress *)((*buffer) + sizeof (struct CorePeerInfo));
        addr->outerIP.sin_port = htons (ps->port);
        addr->outerIP.sin_addr.s_addr = htonl (ps->host);
        addr->subnetIP.sin_port = htons (ps->npport);
        addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
        addr->outerIP.sin_family = PF_INET;
        addr->subnetIP.sin_family = PF_INET;
        *buffer = (char *)(addr + 1);
    }
    PDEBUG ("\n");
    return j;
}
#endif
/*
 * Handle a CP2TS_UPDATE message: a CP reports its current load.
 *
 * Payload consumed from m->buffer, in order:
 *   int             resnum   -> p->u.cp.resnum
 *   unsigned short  connnum  -> p->u.cp.connnum
 *   float           band     -> p->u.cp.band
 *   unsigned char   maxConn  -> p->u.cp.maxConn
 *
 * After storing the values, a CP of type CT_GENERAL becomes GCPCHOICE when
 * there is none yet or when its resnum is lower than the current choice's.
 * Negative resnum is then clamped to 0 and negative band to 0.001.
 *
 * Returns 0 on success.  If the message is too short, closure_CP(p) is
 * called and -1 is returned.
 */
int process_CP2TS_UPDATE (struct Session *p, struct TSMessage *m)
{
    char *buf = m->buffer;
    /* One past the last payload byte this handler consumes. */
    char *end = buf + sizeof (int) + sizeof (unsigned short) + sizeof (float) + sizeof (char);
    int resnum;
    unsigned short connnum;
    float band;

    /* Validate the length BEFORE reading, so a truncated message can neither
       be over-read nor overwrite the session's state with garbage. */
    if (end - m->buffer + AUTH_HEADER > m->len)
    {
        PDEBUG ("Invalid message CP2TS_UPDATE, length %d not enough.\n", m->len);
        closure_CP (p);
        return -1;
    }

    /* The fields are packed back to back (the float starts at offset 6), so
       copy them out rather than dereferencing misaligned pointers. */
    memcpy (&resnum, buf, sizeof (resnum));
    buf += sizeof (int);
    memcpy (&connnum, buf, sizeof (connnum));
    buf += sizeof (unsigned short);
    memcpy (&band, buf, sizeof (band));
    buf += sizeof (float);

    p->u.cp.resnum = resnum;
    p->u.cp.connnum = connnum;
    p->u.cp.band = band;
    p->u.cp.maxConn = *(unsigned char *)buf;

    if (p->u.cp.type == CT_GENERAL &&
        (GCPCHOICE == NULL || p->u.cp.resnum < GCPCHOICE->u.cp.resnum))
        GCPCHOICE = p;

    if (p->u.cp.resnum < 0) p->u.cp.resnum = 0;
    if (p->u.cp.band < 0) p->u.cp.band = 0.001;
    return 0;
}
/*
 * Handle a CP2TS_NEED_PEERS request by forwarding it to
 * process_NEED_PEERS_real() with the message buffer as the md5 argument,
 * needcp = 1, cur = 0 and layer = 0.  Returns that call's result.
 */
int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
{
    const int needcp = 1;
    const int cur = 0;
    const int layer = 0;

    return process_NEED_PEERS_real (p, m->buffer, needcp, cur, layer);
}
/*
 * Handle CP2TS_REGISTER: a CP logs in to the tracker.
 *
 * Payload consumed from m->buffer, in order: int userID, MD5_LEN bytes of
 * md5, unsigned short npport (network byte order), and - for a CP not seen
 * before - one type byte followed by type-specific data (CT_EDGE: unsigned
 * short numHeads plus parameter bytes; CT_SPECIFIED_RES: MD5_LEN parameter
 * bytes).
 *
 * The CP is looked up in CPTRACKER.hash[hash_cp(host, npport)] by source
 * host, source port and npport.  If it is not found, a session is taken from
 * the list headed by CPTRACKER.hash[0], linked into its hash bucket and
 * initialised.  If it is found and less than SILENCE_TIME has passed since
 * its last_access, the function returns 0 without replying.  Otherwise a
 * TS2CP_WELCOME message carrying the session index and auth code is sent.
 *
 * Returns 0 on success or when the request is silently ignored; -1 when no
 * session is available, the message is too short, or the reply cannot be
 * sent (closure_CP(p) is called in the last two cases).
 */
int process_CP2TS_REGISTER (struct Message *m)
{
    char md5[MD5_LEN+1];
    unsigned int host;
    unsigned short port, npport;
    int id, userID;
    char *buf;
    struct Session *p;

    buf = m->buffer;
    userID = *(int *)buf;
    buf += sizeof (int);
    memcpy (md5, buf, MD5_LEN);
    md5[MD5_LEN] = 0;
    buf += MD5_LEN;
#ifdef HAVE_MYSQL
    /* NOTE(review): the result of authUser() is tested but nothing is done
       with it, so registration proceeds either way - confirm intent. */
    if (authUser (userID, md5, local_mysql, NULL) == 0)
    {
    }
#endif
    npport = ntohs(*(unsigned short *)buf);
    buf += sizeof (short);
    host = ntohl (UDPCLIENT.sin_addr.s_addr);
    port = ntohs (UDPCLIENT.sin_port);

    id = hash_cp (host, npport);
    for (p=CPTRACKER.hash[id]; p; p=p->hnext)
        if (p->host == host && p->port == port && p->npport == npport)
            break;

    if (!p)
    {
        if (CPTRACKER.cur >= CPTRACKER.max || CPTRACKER.hash[0] == 0)
        {
            PDEBUG("CP reg err, wrong cp index in array.\n");
            SEND_NPMSG(CPTRACKER.sock[CurrentSock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
            return -1;
        }
        /* Unlink the first session from hash[0] and push it onto bucket id. */
        p = CPTRACKER.hash[0];
        CPTRACKER.hash[0] = p->hnext;
        p->hnext = CPTRACKER.hash[id];
        CPTRACKER.hash[id] = p;
        p->socket = CPTRACKER.sock[CurrentSock];
        p->type = TYPE_CP;
        p->port = port;
        p->npport = npport;
        p->host = host;
#ifdef SORT_NET
        {
            struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
            if (pnetworks) p->net = pnetworks->net;
            else p->net = 0;
        }
#endif
        p->time_sec = 0;
        p->u.cp.userid = userID;
        p->u.cp.type = *(unsigned char *)buf;
        buf += sizeof (char);
        if (p->u.cp.type == CT_EDGE)
        {
            char *msgend;
            size_t avail;

            p->u.cp.numHeads = *(unsigned short *)buf;
            buf += sizeof (short);

            /* Copy whatever is left of the message, capped at the size of
               parameter.  If buf already lies past the end of the message,
               nothing is left: the signed difference must not be compared
               as an unsigned value, or it turns into a full-size over-read. */
            msgend = (char *)m + m->len;
            avail = (buf < msgend) ? (size_t)(msgend - buf) : 0;
            if (avail > sizeof (p->u.cp.parameter))
                avail = sizeof (p->u.cp.parameter);
            memcpy (p->u.cp.parameter, buf, avail);

            if (p->u.cp.numHeads > sizeof(p->u.cp.parameter)/2)
                p->u.cp.numHeads = sizeof (p->u.cp.parameter)/2;
        } else if (p->u.cp.type == CT_SPECIFIED_RES)
        {
            memcpy (p->u.cp.parameter, buf, MD5_LEN);
            buf += MD5_LEN;
        }
        p->auth = random ();
        init_CP (p);
    } else if (CurTimeSec < p->last_access + SILENCE_TIME)
        return 0;

    if (buf - m->buffer + NORMAL_HEADER > m->len)
    {
        PDEBUG ("Invalid message CP2TS_REGISTER, length %d not enough.\n", m->len);
        closure_CP (p);
        return -1;
    }

    /* The session index is a pointer difference (ptrdiff_t); cast for %d. */
    PDEBUG("CP register. socket=%d listnum=%d auth=%d.\n", p->socket, (int)(p-CPTRACKER.head), p->auth);
    UDPMsg.len = 12;
    UDPMsg.type=TS2CP_WELCOME;
    UDPMsg.authcode1 = p-CPTRACKER.head;
    UDPMsg.authcode2 = p->auth;
    if (sendMessage(p->socket,(char *)&UDPMsg, &UDPCLIENT) < 0)
    {
        closure_CP (p);
        return -1;
    }
    return 0;
}
int compareInter (const void *a, const void *b)
{
struct Interval *p = (struct Interval *) a;
struct Interval *q = (struct Interval *) b;
if ((p->start >= q->start && p->start+p->len <= q->start + q->len)
|| (q->start >= p->start && q->start+q->len <= p->start + p->len))
return 0;
return (p->start - q->start);
}
// delete_interval removes the new intervals (_new) from the existing intervals (head).
int delete_interval (struct Interval *head, int total, struct Interval *_new, int num)
{
int i,j,k;
struct Interval tmp[MAX_INTERVAL*2];
for (i=0,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -