📄 spnew.c
字号:
{
waitpid (pid, NULL, 0);
}
*/
}
return 0;
}
int
process_P2P_HELLO (struct Session *p, struct Message *m)
{
struct SPUpdate spupdate;
struct Edge *pedge;
struct Channel *pc;
int listnum;
PDEBUG ("CP %d.%d.%d.%d:%d join channel %.32s\n",
IPADDR (p->host), p->port, m->buffer);
listnum = p - TRACKER[TYPE_CP].head;
if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
{
if ((pc = findOrder (m->buffer, MD5_LEN)) == NULL &&
(pc = newOrder (m->buffer)) == NULL)
{
BUILD_NOCH_SPUPDATE(spupdate);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
PDEBUG ("Cannot find channel %.32s\n",
m->buffer);
//Clientclosure (listnum, TYPE_CP);
} else
{
BUILD_ORDER_SPUPDATE(spupdate,pc);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
PDEBUG ("channel %.32s spupdate %d~%d. \n",
m->buffer, spupdate.minBlockID,
spupdate.maxBlockID);
}
} else
{
for (pedge = p->header; pedge && pedge->head == pc;
pedge = pedge->enext);
if (!pedge)
{
pedge = newEdge (pc, p);
pc->numclient++;
}
if (pc->pcinfo)
{
if (pc->pcinfo->status > 0)
{
// this channel has been closed
BUILD_CLOSE_SPUPDATE(spupdate);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
PDEBUG ( "channel %.32s has been closed. \n",
m->buffer);
//Clientclosure (listnum, TYPE_CP);
} else if (pc->pcinfo->mlist != NULL)
{
BUILD_MLIST_SPUPDATE (spupdate, pc);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
sendMedia (p, pc);
PDEBUG ("Channel %.32s is a playlist.\n", m->buffer);
} else
{
BUILD_LIVE_SPUPDATE(spupdate,pc);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
sendMedia (p, pc);
}
} else
{
BUILD_ORDER_SPUPDATE(spupdate,pc);
send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
PDEBUG ("channel %.32s spupdate %d~%d. \n",
m->buffer, spupdate.minBlockID,
spupdate.maxBlockID);
}
}
return 0;
}
int
process_P2P_PUSHLIST (struct Session *p, struct Message *m)
{
struct Edge *pedge;
struct Channel *pc;
char *buf;
int i,size, type;
if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
pc = newOrder (m->buffer);
else if (pc->pcinfo != NULL)
{
for (pedge = p->header; pedge && pedge->head != pc;
pedge = pedge->enext);
if (pedge == NULL)
{
pedge = newEdge (pc, p);
pc->numclient++;
}
}
if (pc == NULL)
return -1;
if (p->numjob >= MAX_JOB_PER_SESSION)
return -2;
buf = m->buffer + MD5_LEN;
type = *(unsigned char *) buf;
buf += sizeof (char);
size = *(unsigned char *) buf;
buf += sizeof (char);
if (type)
{
deleteChannel (p, pc);
for (i=0; i<size; i++)
if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
return -1;
} else
{
for (i=0; i<size; i++)
if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
return -1;
buf += size * sizeof (int);
size = *(unsigned char *) buf;
buf += sizeof (char);
deleteJob (p, pc, (unsigned int *) buf, size);
}
return 0;
}
int
process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
{
struct JobDes *pj = newJob ();
char *buf, *buffer;
int listnum, size=0, max;
struct SPUpdate spupdate;
if (pj == NULL) return -1;
buffer = getJobBuffer (pj, &max);
listnum = p - TRACKER[TYPE_CP].head;
buf = buffer + sizeof (int);
*(unsigned char *) buf = P2P_RESPONSE;
buf += sizeof (char);
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
if (pc->pcinfo == NULL)
{
*(int *) buf = id;
buf += sizeof (int);
if ((size =
locate_order_by_id (pc, id, buf + sizeof (int),
max)) < 0 && size != -2)
{
// BLOCK NOT FOUND
spupdate.minKeySample = -1LL;
spupdate.maxKeySample = -1LL;
spupdate.minBlockID = 0xffffffff;
spupdate.maxBlockID = 0xffffffff;
send_P2P_SPUPDATE (p, pc, pc->channel_md5, &spupdate);
size = 0;
*(int *) buf = 0;
buf += sizeof (int);
} else if (size == -2)
{
PDEBUG ("Leave blocks %d to next round.\n", id);
return -1;
} else
{
// block found
*(int *) buf = size;
buf += sizeof (int) + size;
p->last_transferblock = CurrentTime;
}
// Clientclosure (listnum, TYPE_CP);
} else if (id >= 0 && pc->pcinfo->mlist != NULL
&& (size = locate_mplist_by_id (pc, id, buf, max - 32)) > 0)
{
p->last_transferblock = CurrentTime;
buf += 2 * sizeof (int) + size;
} else if (id >= 0 && pc->pcinfo->mlist == NULL
&& (size = locate_by_id (pc, id, buf, max - 32)) > 0)
{
p->last_transferblock = CurrentTime;
buf += 2 * sizeof (int) + size;
} else if (size == -2)
{
assert (0);
PDEBUG ("Leave blocks %d to next round.\n", id);
return -1;
} else
{
*(int *) buf = id;
buf += sizeof (int);
size = 0;
*(int *) buf = 0;
buf += sizeof (int);
PDEBUG ("Cannot find block id %d required by client %d.%d.%d.%d.\n",
id, IPADDR (p->host));
}
*(int *) buffer = buf - buffer;
setblockId (pj, id);
writeDATAMessage (p, pc, pj);
// PDEBUG ("Write block %d to %d.%d.%d.%d\n", id,
// IPADDR (p->host));
return 0;
}
int
init_CP (int listnum)
{
return 0;
}
int
process_CP (int listnum)
{
int ret;
struct Session *p = &(TRACKER[TYPE_CP].head[listnum]);
struct Message *m = (struct Message *) (p->buf + p->start);
tmpDownBytes += m->len;
switch (m->type)
{
case P2P_HELLO:
ret= process_P2P_HELLO (p, m);
break;
case P2P_PUSHLIST:
ret = process_P2P_PUSHLIST (p, m);
break;
case P2P_MSG:
break;
default:
ret = -1;
break;
}
switch (ret)
{
case -1:
PDEBUG ("Message processing error from client %d.%d.%d.%d\n",
IPADDR (p->host));
Clientclosure (listnum, TYPE_CP);
return -1;
case -2:
return -2;
default:
return 0;
}
}
int
closure_CP (int listnum)
{
struct Session *p = &(TRACKER[TYPE_CP].head[listnum]);
// struct Channel *pc = p->pc;
struct Edge *pedge, *nextedge;
PDEBUG ("CP disconnected from %d.%d.%d.%d:%d\n",
IPADDR (p->host), p->port);
for (pedge = p->header; pedge; pedge = nextedge)
{
nextedge = pedge->enext;
if (pedge->head)
pedge->head->numclient--;
delEdge (pedge);
}
FD_CLR (p->socket, &osocks);
close (p->socket);
FREE (p->buf);
deleteAll (p);
memset (p, 0, sizeof (struct Session));
return 0;
}
int
init_CS (int listnum)
{
return 0;
}
int
process_CS2SP_REGISTER (int listnum, char *msg)
{
int i, errmsg;
char cname[MAX_LINE], cmd5[MD5_LEN + 1], buffer[MAX_DATA];
//char escape_buf[MAX_LINE];
char md5[MD5_LEN + 1];
struct Session *p = &(TRACKER[TYPE_CS].head[listnum]), *source;
struct Channel *pc;
float bitrate, limitedbitrate=10000.0;
int id, startblock, size, maxblocksize, datalen, issave=0;
size = *(unsigned char *) msg;
msg += sizeof (char);
if (size > sizeof (cname) || size <= 0) //wrong Message
{
Clientclosure (listnum, TYPE_CS);
return -1;
} else
memcpy (cname, msg, size);
cname[size] = 0;
msg += size;
// memcpy (cmd5, msg, MD5_LEN);
// msg += MD5_LEN;
id = *(int *) msg;
msg += sizeof (int);
sprintf (buffer, "%d@%s_%s", id, defaultspip, cname);
md5_calc ((unsigned char *) md5, (unsigned char *) buffer,
strlen (buffer));
for (i = 0; i < MD5_LEN; i += 2)
sprintf (cmd5 + i, "%02x", (unsigned char) md5[i / 2]);
cmd5[MD5_LEN] = 0;
memcpy (md5, msg, MD5_LEN);
msg += MD5_LEN;
maxblocksize = *(int *) msg;
msg += sizeof (int);
bitrate = *(float *) msg;
msg += sizeof (float);
datalen = *(unsigned short *) msg;
if (datalen > MAX_LINE)
{
Clientclosure (listnum, TYPE_CS);
return -1;
}
msg += sizeof (short);
if (AuthCS && (errmsg =
isAllowed (id, md5, cname, bitrate, &limitedbitrate, &issave)) < 0)
{
PDEBUG ("User %d is not allowed to newchannel %s.\n",
id, cname);
send_p2p_err (p, -errmsg, 1);
Clientclosure (listnum, TYPE_CS);
return -1;
}
startblock = 0;
if ((pc = findChannel (cmd5, MD5_LEN)) != NULL)
{
if (pc->pcinfo == NULL || pc->pcinfo->mlist != NULL)
{
PDEBUG ("The channel %s is a playlist.\n", cname);
send_p2p_err (p, ERR_INTERNAL, 1);
Clientclosure (listnum, TYPE_CS);
return -1;
}
if ((source = pc->pcinfo->dataSource) != NULL)
{
Clientclosure (source - TRACKER[TYPE_CS].head,
TYPE_CS);
}
}
if ((pc = newLiveChannel (cname, p, cmd5, bitrate,
maxblocksize)) != (struct Channel *) 0)
{
p->pc = pc;
pc->pcinfo->userid = id;
pc->pcinfo->limitedBitRate = limitedbitrate;
pc->pcinfo->isSave = issave;
pc->pcinfo->dataSource = &(TRACKER[TYPE_CS].head[listnum]);
pc->pcinfo->startid = pc->pcinfo->maxID = (CurrentTime - FIX_MAGIC) * 16;
} else
{
PDEBUG ("newLiveChannel failed.\n");
send_p2p_err (p, ERR_INTERNAL, 1);
Clientclosure (listnum, TYPE_CS);
return -1;
}
*(int *) buffer = 9;
*(char *) (buffer + sizeof (int)) = SP2CS_WELCOME;
*(int *) (buffer + sizeof (int) + sizeof (char)) = startblock;
if (writeMessage (p, pc, buffer) < 0)
{
send_p2p_err (p, ERR_INTERNAL, 1);
Clientclosure (listnum, TYPE_CS);
return -1;
}
pc->pcinfo->cur_channel = pc->pcinfo->max_channel = 1;
pc->pcinfo->media = calloc (1, sizeof (struct MediaData));
pc->pcinfo->media[0].start = 0;
pc->pcinfo->media[0].len = -1;
pc->pcinfo->media[0].dlen = datalen;
pc->pcinfo->media[0].data = calloc (1, datalen);
memcpy (pc->pcinfo->media[0].data, msg, datalen);
#ifdef TEST
/*
for (type = 0; type < MAX_TS; ++type)
buildGTV (pc, datalen, msg, type);
*/
for (i=0; i<MAX_TS; i++) {
if (buildGTV (pc, datalen, msg, i) < 0)
continue;
}
#endif
#ifdef HAVE_MYSQL
// query_mysql (local_mysql, "delete from channel where ChannelMD5 = \"%s\"", cmd5);
// mysql_escape_string (escape_buf, msg, datalen);
// query_mysql (local_mysql, "insert into channel (ChannelName, ChannelBitrate, ChannelAttachData, ChannelMD5, ChannelOwnerID) values (\"%s\",\"%f\", \"%s\", \"%s\", \"%d\")", cname, bitrate, escape_buf, cmd5, id);
#endif
return 0;
}
int
process_CS2SP_UPDATE (int listnum, float rate) //only update bitrate
{
struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
struct Channel *pc = p->pc;
struct LiveChannelInfo *pcinfo = pc->pcinfo;
pcinfo->bitrate = rate;
if (rate > pcinfo->limitedBitRate)
{
send_p2p_err (p, ERR_EXCEED_BITRATE, 1);
Clientclosure (listnum, TYPE_CS);
return -1;
}
#ifdef HAVE_MYSQL
// query_mysql (local_mysql, "update channel set ChannelBitrate = \"%f\" where ChannelMD5 = \"%s\"", rate, pc->channel_md5);
#endif
return 0;
}
int
process_CS2SP_BLOCK (int listnum, char *msg)
{
char *buf, buffer[MAX_DATA];
struct Edge *pedge;
int size; // max=TRACKER[TYPE_CP].maxid+1;
struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
// struct Session *q=TRACKER[TYPE_CP].head;
struct Channel *pc = p->pc;
struct LiveChannelInfo *pcinfo;
if (p->pc == NULL || (pcinfo = p->pc->pcinfo) == NULL || pcinfo->mlist != NULL)
{
PDEBUG ("Unmatched channel\n");
Clientclosure (listnum, TYPE_CS);
return -1;
}
pcinfo = p->pc->pcinfo;
if ((size = saveBlock (pc, msg, p)) > 0)
{
// directly send this block to connected CPs
buf = buffer + sizeof (int);
*(unsigned char *) buf = P2P_RESPONSE;
buf += sizeof (char);
memcpy (buf, pc->channel_md5, MD5_LEN);
buf += MD5_LEN;
p->last_transferblock = CurrentTime;
memcpy (buf, msg, size + 2 * sizeof (int));
buf += size + 2 * sizeof (int);
*(int *) buffer = buf - buffer;
for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
{
if (pedge->me->numjob >= MAX_JOB_PER_SESSION)
continue;
pedge->me->last_transferblock = CurrentTime;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -