📄 cpnew.c
字号:
}
int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
{
struct JobDes *pj = newJob ();
char *buf, *buffer;
struct LiveChannelInfo *pcinfo;
int size, max, i;
buffer = getJobBuffer (pj, &max);
buf = buffer + sizeof (int);
*(unsigned char *)buf = P2P_RESPONSE;
buf += sizeof (char);
if (p->npcp == TYPE_CP)
{
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
}
pcinfo = pc->pcinfo;
if (p->first == 0)
{
p->first ++;
if (p->version >= MEDIATYPE_FIRST)
sendIdMedia (p, pc, id, 0);
}
if (pcinfo->dataSource == NULL)
{
*(int *)buf = id;
buf += sizeof (int);
*(int *)buf = 0;
buf += sizeof (int);
if (isGCP || SCP_CHANNEL)
{
if ((i = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
freeLiveChannel (pc, NULL);
} else
{
#ifdef HAVE_TS
UDPMsg.type = CP2TS_NEED_PEERS;
UDPMsg.len = 12+MD5_LEN;
memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
/* isSet whether TS has returned WELCOME message */
if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
{
PDEBUG ("exit...\n");
exit (1);
}
#endif
}
} else if ((size=locate_by_id (pc, id, buf, max-32)) > 0)
{
buf += 2*sizeof (int) + size;
if (p->version >= MEDIATYPE_FIRST && (i=isHit (pc, id)) >= 0)
sendHitMedia (p, pc, i, id, 0);
p->last_transferblock = CurrentTime;
} else if (size == -2)
{
assert (0);
PDEBUG ("Leave block %d to next round.\n", id);
return -1;
} else if (id >= 0)
{
*(int *)buf = id;
buf += sizeof (int);
*(int *)buf = 0;
buf += sizeof (int);
PINFO ("no block %d\n", id);
send_P2P_PUSHLIST (pc, id);
}
*(int *)buffer = buf - buffer;
setblockId(pj, id);
writeDATAMessage(p,pc, pj);
// PDEBUG ("Write block %d\n", id);
return 0;
}
int process_P2PS (int listnum)
{
struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
struct Message *m = (struct Message *)(p->buf+p->start);
tmpDownBytes += m->len;
switch (m->type)
{
case P2P_HELLO:
if (process_P2P_HELLO (p, m) == -2)
return -2;
break;
case P2P_PUSHLIST:
if (process_P2P_PUSHLIST (p, m) == -2)
return -2;
break;
case P2P_REPORT: /* At present no action */
break;
case P2P_MSG:
break;
case P2P_SPUPDATE:
break;
case P2P_RESPONSE:
break;
case P2P_NEAR_PEERS:
break;
case P2P_REQMEDIA:
sendIdMedia (p, p->pc, *(int *)(m->buffer), 0);
break;
default:
PDEBUG ("Unknown message format from client\n");
Clientclosure (listnum, TYPE_P2PS);
return -1;
}
return 0;
}
int closure_P2PS (int listnum)
{
struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
// struct Channel *pc = p->pc;
struct Edge *pedge, *prevedge;
PDEBUG ("CP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
for (pedge=p->header; pedge; pedge=prevedge)
{
pedge->head->numclient --;
prevedge=pedge->enext;
delEdge (pedge);
}
FD_CLR(p->socket, &osocks);
close (p->socket);
FREE (p->buf);
deleteAll (p);
memset (p, 0, sizeof (struct Session));
return 0;
}
int init_P2PC (int listnum)
{
return 0;
}
int newChannel (struct Channel *pc, struct NormalAddress *client, int n, int flag)
{
struct Session *p;
int listnum, newconn = -1, max, sock_flag;
struct Session *head;
struct sockaddr_in addr;
char *buf, buffer[MAX_DATA];
if (pc == NULL) return -1;
head = TRACKER[TYPE_P2PC].head;
max = TRACKER[TYPE_P2PC].max;
for (listnum = 0; listnum < max; listnum ++)
{
if(head[listnum].socket == 0)
{
if ((newconn = socket (PF_INET, SOCK_STREAM, 0)) < 0)
{
perror ("socket||gethostbyname");
return -1;
}
memset (&addr, 0, sizeof (addr));
addr.sin_port = client[0].sin_port;
addr.sin_addr = client[0].sin_addr;
addr.sin_family = AF_INET;
if ((sock_flag = connect_nonb(newconn, &addr, sizeof (addr))) == -1)
{
close (newconn);
return -1;
}
head[listnum].socket = newconn;
head[listnum].type = TYPE_P2PC;
head[listnum].flag = 1;
head[listnum].sock_flag = sock_flag;
head[listnum].buf = NEW();
head[listnum].pc = pc;
head[listnum].time_sec = CurrentTime;
head[listnum].totalup = 0;
head[listnum].last_transferblock = CurrentTime;
FD_SET(newconn, &osocks);
if (listnum > TRACKER[TYPE_P2PC].maxid)
TRACKER[TYPE_P2PC].maxid = listnum;
break;
}
}
if (listnum >= max)
{
PDEBUG ("no space left for new incoming client.");
close (newconn);
return -1;
}
TRACKER[TYPE_P2PC].cur ++;
(*(TRACKER[TYPE_P2PC].init)) (listnum);
p = &(TRACKER[TYPE_P2PC].head[listnum]);
pc->pcinfo->dataSource = p;
pc->upsize = 0;
pc->downsize = 0;
PDEBUG("Connect to %s:%d. and send P2P_HELLO.\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
buf = buffer+sizeof (int);
*(unsigned char *)buf = P2P_HELLO;
buf += sizeof (char);
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
*(unsigned char *)buf = 0;
buf += sizeof (char);
if (flag == TYPE_CP)
{
*(unsigned char *)buf = pc->pcinfo->numofsp;
buf += sizeof (char);
if (pc->pcinfo->numofsp)
{
memcpy (buf, &(pc->pcinfo->SPLIST), pc->pcinfo->numofsp*sizeof (struct NormalAddress));
buf += pc->pcinfo->numofsp*sizeof (struct NormalAddress);
}
}
*(int *)buffer = buf - buffer;
if (writeMessage (p, pc, buffer) < 0)
{
perror ("CP: write SP");
Clientclosure (listnum, TYPE_P2PC);
return -1;
}
return listnum;
}
int process_P2P_SPUPDATE (int listnum, struct Message *m)
{
int maxq;
struct Edge *pedge;
char buffer[MAX_DATA];
struct Channel *pc;
struct SPUpdate *s = (struct SPUpdate *)(m->buffer+MD5_LEN);
int bShouldCloseSP = 0;
if (m->len < sizeof (struct SPUpdate) + MD5_LEN + 5)
{
PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
Clientclosure (listnum, TYPE_P2PC);
return -1;
}
*(int *)buffer = sizeof (struct SPUpdate) + sizeof(int) + sizeof (char);
*(unsigned char *)(buffer + sizeof (int)) = P2P_SPUPDATE;
PINFO ("Recv SPUPDATE(%lld,%lld,%u,%u).\n", s->minKeySample, s->maxKeySample, s->minBlockID, s->maxBlockID);
memcpy (buffer+sizeof(int)+sizeof(char), s, sizeof(struct SPUpdate));
if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL) return -1;
if (s->minBlockID == 0xffffffff && s->maxBlockID == 0xffffffff)
{
pc->pcinfo->dataSource = NULL; /* indication of end */
if(s->minKeySample == -1 && s->maxKeySample == -1)
PDEBUG("NO SUCH RESOURCE!\n"); // no such resource
else if(s->minKeySample == 0 && s->maxKeySample == 0)
PDEBUG("CHANNEL HAS BEEN CLOSED!\n");// channel has been closed
else
PDEBUG("UNKNOWN MESSAGE! 1\n"); // unknown message
bShouldCloseSP = 1; // should close connection
} else
{
if(s->minBlockID == 0 && s->maxBlockID == 0 && s->minKeySample == 0 && s->maxKeySample == 0)
{
PDEBUG("END OF CHANNEL!\n"); // end of channel
bShouldCloseSP = 1;
} else if (s->minKeySample == -1ULL && s->maxKeySample == -1ULL)
{
INIT_MAXQ(pc,s,maxq);
return 0;
} else if (s->minKeySample == -2LL)
{
INIT_MAXQ(pc,s,maxq);
pc->type = T_PLIST;
pc->pcinfo->max_channel = (int)(s->maxKeySample);
if (pc->pcinfo->max_channel <= 0 || pc->pcinfo->max_channel >= MAX_FILEINPUT)
return -1;
if (pc->pcinfo->media != NULL)
freeMedia (pc);
pc->pcinfo->media = calloc (sizeof (struct MediaData), pc->pcinfo->max_channel);
return 0;
} else if (s->minKeySample == -3LL)
{
pc->pcinfo->max_channel = 1;
if (pc->pcinfo->media != NULL)
freeMedia (pc);
pc->pcinfo->media = calloc (sizeof (struct MediaData), 1);
} else
{
memcpy (&(pc->pcinfo->s), s, sizeof (struct SPUpdate));
// request block after spupdate, not wait!
// now, block will be sent automaticlly by SP
// send_P2P_PUSHLIST (pc, s->maxBlockID);
}
}
{
int i = 0;
unsigned char vcode = 0;
for (i = 0; i < sizeof (struct SPUpdate); ++i) {
vcode += ((unsigned char*)s)[i];
}
buffer[*(int*)buffer] = vcode;
++*(int*)buffer;
}
for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
{
if (pedge->me->npcp == TYPE_CP)
{
// PDEBUG ("Send SPUPDATE to CP %d.\n", pedge->me-TRACKER[TYPE_P2PC].head);
writeMessage (pedge->me, pc, (char *)m);
} else
{
// PDEBUG ("Send SPUPDATE to NP %d.\n", pedge->me-TRACKER[TYPE_P2PS].head);
writeMessage (pedge->me, pc, buffer);
}
}
if(bShouldCloseSP != 0 || pedge == pc->PeerHead/* no NP*/)
return -1; // Close Connection to SP
return 0;
}
int process_P2P_RESPONSE (int listnum, struct Message *m)
{
int size;
struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
struct Channel *pc;
char *msg = m->buffer+MD5_LEN;
if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL)
return -1;
if ((size = saveBlock (pc, msg, p)) <= 0)
{
PDEBUG ("save block error, size %d, %d\n", size, listnum);
return -1;
// Clientclosure (listnum, TYPE_P2PC);
}
p->last_transferblock = CurrentTime;
return 0;
}
int process_P2PC (int listnum)
{
struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
struct Message *m = (struct Message *)(p->buf+p->start);
tmpDownBytes += m->len;
switch (m->type)
{
case P2P_SPUPDATE:
if(process_P2P_SPUPDATE (listnum, m) < 0)
{
Clientclosure(listnum, TYPE_P2PC);
}
break;
case P2P_RESPONSE:
process_P2P_RESPONSE (listnum, m);
break;
case P2P_MSG:
break;
case P2P_MEDIATYPE:
process_P2P_MEDIATYPE (listnum, m);
break;
default:
PDEBUG("Err msg type from SP.\n");
Clientclosure (listnum, TYPE_P2PC);
return -1;
}
return 0;
}
int closure_P2PC (int listnum)
{
// struct Edge *pedge, *prevedge;
// char buffer[MAX_DATA], *buf;
struct Session *p=&(TRACKER[TYPE_P2PC].head[listnum]);
struct Channel *pc = p->pc;
if (pc)
{
/*
buf = buffer + sizeof (int);
*(unsigned char *)buf = P2P_SPUPDATE;
buf += sizeof (char);
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
memset (buf, 0, sizeof (struct SPUpdate));
buf += sizeof (struct SPUpdate);
*(int *)buffer = buf - buffer;
for (pedge=pc->PeerHead; pedge; pedge=prevedge)
{
pc->numclient --;
writeMessage (pedge->me, buffer);
prevedge = pedge->cnext;
delEdge (pedge);
}
*/
pc->pcinfo->dataSource = NULL;
}
PDEBUG ("SP disconnected from %d.%d.%d.%d:%d.\n", IPADDR (p->host), p->port);
FD_CLR(p->socket, &osocks);
close (p->socket);
FREE (p->buf);
deleteAll (p);
memset (p, 0, sizeof (struct Session));
return 0;
}
#ifdef HAVE_TS
int register_cp () //send UDP msg
{
const int max_times = 10;
int i;
char *buf;
char buffer[MAX_DATA];
isSet = 0;
if (TSSOCK == 0)
{
for (i=0; i<max_times; i++)
{
if (BINDALL == 0)
TSSOCK = init_udp (BINDIP, cfgCP2TS_PORT);
else
TSSOCK = init_udp (NULL, cfgCP2TS_PORT);
if (TSSOCK > 0) break;
PDEBUG("Sleep 1000. cause init UDP port %d failed.", cfgCP2TS_PORT);
sleep (1000);
}
if (TSSOCK <= 0)
{
PDEBUG ("exit...\n");
exit (1); //the max times try init_udp failure
}
}
buf = buffer + sizeof (int);
*(unsigned char *)buf = CP2TS_REGISTER;
buf += sizeof (char);
*(int *)buf = AUTH_USERID;
buf += sizeof (int);
strncpy (buf, AUTH_MD5, MD5_LEN);
buf += MD5_LEN;
*(unsigned short *)buf = htons (cfgP2PS_PORT);
buf += sizeof (short);
if (SCP_CHANNEL) //Now only GCP is available
{
*buf = CT_SPECIFIED_RES;
buf += sizeof (char);
memcpy (buf, SCP_CHANNEL, strlen(SCP_CHANNEL)+1);
buf += strlen (SCP_CHANNEL)+1;
} else if (ECP_REGION)
{
*buf = CT_EDGE;
buf += sizeof (char);
buf = parseECP (ECP_REGION, buf);
} else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -