📄 livechannel.c
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include "echo.h"
/* Channel counter; freeLiveChannel() hands its address to freeChannel(), which decrements it. */
int NumNewChannel;
/* Hash buckets of channels: indexed by hash_str() of the channel md5, chained through ->next. */
struct Channel *ChannelHash[MAX_CHANNEL];
/* Head of the channel list chained through ->lnext. */
struct Channel *ChannelList;
/* Defined elsewhere; used as the leading path component by buildLivePath() and buildPListPath(). */
extern char *PREFIX;
/* Defined elsewhere; free_livechannel() uses TRACKER[TYPE_P2PC].head to compute a session index. */
extern struct ServerDesc TRACKER[MAX_TYPE];
/* Defined elsewhere; called by free_livechannel() on the channel's data source session. */
extern int Clientclosure (int listnum, int type);
/*
 * hash_str - fold the first `len` bytes of `str` into a bucket index.
 *
 * str: bytes to hash (not required to be NUL-terminated).
 * len: number of bytes to consume; values <= 0 hash nothing.
 *
 * Returns the accumulated hash masked with (MAX_CHANNEL - 1).  The mask
 * only yields a uniform index in [0, MAX_CHANNEL) when MAX_CHANNEL is a
 * power of two -- NOTE(review): confirm against its definition.
 *
 * The accumulator is unsigned so that the multiply/shift wraps modulo
 * 2^N instead of invoking signed-overflow undefined behaviour; the low
 * bits (and therefore the returned index) are the same as the
 * two's-complement result of the signed version.
 */
inline int hash_str (unsigned char *str, int len)
{
	unsigned int hash = 0;

	/* `len > 0` (rather than `len`) keeps a negative length from running away. */
	for (; len > 0; len--, str++)
		hash += (hash << 5) - hash + (*str);
	return (int)(hash & (unsigned int)(MAX_CHANNEL - 1));
}
/*
 * getChannel - look a channel up in a hash table by its md5 name.
 *
 * phash: bucket array to search.
 * name:  key bytes; compared against channel_md5 with strncmp().
 * len:   number of key bytes used for both hashing and comparison.
 *
 * Returns the first matching channel in the bucket chain, or NULL
 * (after logging) when none matches.
 */
struct Channel *getChannel (struct Channel **phash, char *name, int len)
{
	struct Channel *cur = phash[hash_str (name, len)];

	while (cur != NULL)
	{
		if (strncmp (name, cur->channel_md5, len) == 0)
			return cur;
		cur = cur->next;
	}
	PDEBUG("Cannot findChannel hash %.32s.\n", name);
	return NULL;
}
/*
 * apply_hash - invoke `apply(channel, p)` on every channel in every bucket.
 *
 * The ->next pointer is read before the callback runs, so the callback
 * may unlink or free the channel it is given.
 */
void apply_hash (struct Channel **phash, void apply(struct Channel *, void *), void *p)
{
	int bucket = 0;

	while (bucket < MAX_CHANNEL)
	{
		struct Channel *cur = phash[bucket];

		while (cur)
		{
			struct Channel *following = cur->next;

			apply (cur, p);
			cur = following;
		}
		bucket++;
	}
}
/*
 * apply_list - invoke `apply(channel, p)` on every channel of an
 * ->lnext-linked list starting at `plist`.
 *
 * The ->lnext pointer is read before the callback runs, so the callback
 * may unlink or free the channel it is given.
 */
void apply_list (struct Channel *plist, void apply(struct Channel *, void *), void *p)
{
	struct Channel *cur = plist;

	while (cur)
	{
		struct Channel *following = cur->lnext;

		apply (cur, p);
		cur = following;
	}
}
/*
 * freeChannel - unlink channel `p` from a hash table and a list, then
 * release it.
 *
 * phash: bucket array; the bucket is chosen by hash_str() of p->channel_md5.
 * plist: address of the head of the ->lnext-linked channel list.
 * count: counter decremented once when `p` has been unlinked from both.
 * p:     channel to remove and free.
 *
 * Returns 0 on success, -1 when `p` is not found in its bucket chain or
 * in the list.
 *
 * NOTE(review): when `p` is removed from the bucket chain but then not
 * found in *plist, the function returns -1 with `p` already unlinked
 * from the hash table and not freed.
 */
int freeChannel (struct Channel **phash, struct Channel **plist, int *count, struct Channel *p)
{
int id = hash_str (p->channel_md5, MD5_LEN);
struct Edge *nextedge, *pedge;
struct Channel *pchannel;
/* Step 1: unlink p from its hash bucket chain (->next). */
if (phash[id] == p)
{
phash[id] = p->next;
} else
{
for (pchannel=phash[id]; pchannel; pchannel=pchannel->next)
{
if (pchannel->next == p)
{
pchannel->next = p->next;
break;
}
}
/* Loop ran off the end of the chain: p is not in this bucket. */
if (!pchannel) return -1;
}
/* Step 2: unlink p from the channel list (->lnext). */
if ((*plist) == p)
{
*plist = p->lnext;
} else
{
for (pchannel=*plist; pchannel; pchannel=pchannel->lnext)
{
if (pchannel->lnext == p)
{
pchannel->lnext = p->lnext;
break;
}
}
/* Loop ran off the end of the list: p is not in it. */
if (!pchannel) return -1;
}
(*count) --;
#ifdef __CP_SOURCE
/* Close the channel's file stream before free_livechannel() unlinks p->fname. */
if (p->db != NULL) fclose (p->db);
#endif
if (p->pcinfo)
{
/* NOTE(review): free_livechannel() is defined in this file only under __CP_SOURCE. */
free_livechannel (p);
free (p->pcinfo);
}
/* Drop every edge on the peer chain; ->cnext is saved before delEdge() is called on the edge. */
for (pedge=p->PeerHead; pedge; pedge=nextedge)
{
p->numclient --;
nextedge = pedge->cnext;
delEdge (pedge);
}
free (p);
return 0;
}
/*
 * findChannel - look up a channel by md5 name in the global ChannelHash.
 *
 * Returns whatever getChannel() returns for (ChannelHash, name, len).
 */
inline struct Channel *findChannel (char *name, int len)
{
	struct Channel *found = getChannel (ChannelHash, name, len);

	return found;
}
/*
 * freeLiveChannel - apply_hash()/apply_list() style callback that removes
 * `pc` from the global ChannelHash / ChannelList and frees it.
 *
 * The second argument is not used; the result of freeChannel() is discarded.
 */
void freeLiveChannel (struct Channel *pc, void *p)
{
	struct Channel **table = ChannelHash;

	freeChannel (table, &ChannelList, &NumNewChannel, pc);
}
/*
 * freeAllChannel - run freeLiveChannel() over every channel in ChannelHash.
 */
void freeAllChannel ()
{
	void *no_arg = NULL;

	apply_hash (ChannelHash, freeLiveChannel, no_arg);
}
/*
 * buildLivePath - build "<PREFIX>/<LIVE_PREFIX><first two chars of md5>/<md5>"
 * in `buf`, creating the directory part with mkdir(…, 0777).
 *
 * buf: destination buffer.
 * len: size of `buf` in bytes.
 * md5: NUL-terminated channel md5 string.
 *
 * The result is always NUL-terminated and never exceeds `len` bytes.  The
 * previous implementation appended `md5` with an unbounded strcat(), which
 * could write past the end of `buf`.  If the directory part itself does not
 * fit, `buf` holds the truncated directory string and no mkdir() is
 * attempted, so a wrongly named directory is not created.
 *
 * mkdir() failure (e.g. the directory already exists) is deliberately
 * ignored, as before.
 */
inline void buildLivePath (char *buf, int len, char *md5)
{
	int used;

	if (buf == NULL || len <= 0)
		return;
	used = snprintf (buf, len, "%s/%s%.2s/", PREFIX, LIVE_PREFIX, md5);
	if (used < 0)
	{
		buf[0] = 0;
		return;
	}
	if (used >= len)
		return;			/* directory part truncated: nothing sensible to create */
	mkdir (buf, 0777);
	/* Bounded append; truncates instead of overflowing when md5 does not fit. */
	snprintf (buf + used, len - used, "%s", md5);
}
#ifdef __CP_SOURCE
/*
 * locate_by_id - read the stored block with the given id from the
 * channel's file into `buf`.
 *
 * pc:  channel whose file (pc->db) holds fixed-size records of
 *      (pc->maxblocksize + 2*sizeof(int)) bytes each.
 * id:  block id; the record slot is id % max_queue.
 * buf: destination; receives the two-int header (id, size) followed by
 *      the payload.
 * max: capacity of `buf` in bytes.
 *
 * Returns the payload size (header word 1) on success, -2 when `buf` is
 * too small for one record, and -1 on any other failure (no channel
 * info, slot empty or holding a different generation, seek/read error,
 * inconsistent header).  On success pc->upsize grows by the payload size.
 *
 * Changes from the previous version:
 *  - a partially failed lazy allocation no longer leaves `indisk` set
 *    with `bitflag` NULL (both are released and reset so a later call
 *    retries cleanly);
 *  - a negative `max` is rejected instead of being converted to a huge
 *    unsigned value that passed the size check;
 *  - NULL `pc`, `buf` and `pc->db` are rejected instead of dereferenced.
 */
int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
	int i, *msg;
	struct LiveChannelInfo *c;

	if (pc == NULL || buf == NULL)
		return -1;
	c = pc->pcinfo;
	if (c == NULL)
		return -1;
	if (c->indisk == NULL)
	{
		/*
		 * First use: allocate the slot tables.  Nothing can have been
		 * saved yet, so the lookup fails either way.
		 */
		c->max_queue = MAX_QUEUE;
		c->indisk = calloc (c->max_queue, 1);
		if (c->bitflag == NULL)
			c->bitflag = calloc ((c->max_queue+7)/8, 1);
		PDEBUG ("allocate memory,%p,%p\n",c->indisk, c->bitflag);
		if (c->indisk == NULL || c->bitflag == NULL)
		{
			/* Keep the two tables all-or-nothing. */
			free (c->indisk);
			free (c->bitflag);
			c->indisk = NULL;
			c->bitflag = NULL;
		}
		return -1;
	}
	i = id % c->max_queue;
	/* indisk[slot] stores (id / max_queue) + 1; 0 means the slot is empty. */
	if (c->indisk[i] == 0 || (pc->type != T_PLIST && c->indisk[i] != (id/c->max_queue + 1)))
	{
		PINFO ("empty flag %d.\n", c->indisk[i]);
		return -1;
	}
	if (max < 0 || pc->maxblocksize + 2*sizeof(int) > (size_t)max)
	{
		PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
		return -2;
	}
	if (pc->db == NULL)
		return -1;
	if (fseeko (pc->db, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("fseek failed.\n");
		return -1;
	}
	/* Need more than the header, and at least header + declared payload. */
	if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), pc->db)) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
	{
		PDEBUG ("Only %d read [%d]\n", i, ((int *)buf)[1]);
		return -1;
	}
	msg = (int *)buf;
	/* For T_PLIST channels the stored id is overwritten with the requested one. */
	if (pc->type == T_PLIST)
		msg[0] = id;
	if (msg[0] != id || msg[1] > pc->maxblocksize || msg[1] <= 0)
	{
		PDEBUG ("Message read is [%d %d]\n", msg[0], msg[1]);
		return -1;
	}
	pc->upsize += msg[1];
	return msg[1];
}
/*
 * saveBlock - store one received block in the channel's file.
 *
 * c:   channel; the block goes to record slot (id % max_queue) of c->db,
 *      each record being (c->maxblocksize + 2*sizeof(int)) bytes.
 * buf: block as received: two unsigned ints (id, size) followed by
 *      `size` payload bytes.  Must not be NULL (asserted).
 * p:   session the block came from; recorded as the channel's dataSource.
 *
 * Returns the payload size on success, 0 when the block is rejected
 * because of its size, and -1 on error (no channel info, allocation
 * failure, no open file, seek/write failure).
 *
 * The first accepted block fixes c->maxblocksize; later blocks larger
 * than that are rejected with 0.
 *
 * Changes from the previous version:
 *  - the lazily allocated `indisk`/`bitflag` tables are each allocated
 *    when missing, so a call after a partial allocation failure no
 *    longer hands a NULL bitflag to clrBit();
 *  - the always-false `id < 0` test on an unsigned value is removed;
 *  - a NULL c->db is rejected instead of being passed to fseeko().
 */
int saveBlock (struct Channel *c, char *buf, struct Session *p)
{
	unsigned int pos, id, size;
	struct LiveChannelInfo *pcinfo;

	assert (buf);
	if ((!c) || (pcinfo = c->pcinfo) == NULL)
	{
		PDEBUG ("saveBlock c is null.\n");
		return -1;
	}
	if (pcinfo->indisk == NULL)
	{
		pcinfo->max_queue = MAX_QUEUE;
		pcinfo->indisk = calloc (pcinfo->max_queue, 1);
	}
	if (pcinfo->bitflag == NULL)
		pcinfo->bitflag = calloc ((pcinfo->max_queue+7)/8, 1);
	if (pcinfo->indisk == NULL || pcinfo->bitflag == NULL)
		return -1;
	id = ((unsigned int *)buf)[0];
	size = ((unsigned int *)buf)[1];
	pos = id % pcinfo->max_queue;
	/* The slot's bit is cleared before the size checks, as before. */
	if (id > 0) clrBit (pcinfo->bitflag, pos);
	if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE)
	{
		PDEBUG ("saveBlock:size is %d and id is %d.\n", size, id);
		return 0;
	}
	if (c->maxblocksize == 0)
		c->maxblocksize = size;
	else if (size > c->maxblocksize)
	{
		PDEBUG ("saveBlock:maxblocksize is %d, size is %d and id is %d.\n", c->maxblocksize, size, id);
		return 0;
	}
	if (c->db == NULL)
		return -1;
	if (fseeko (c->db, ((off_t)(pos)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("Error in fsseko.\n");
		return -1;
	}
	if (fwrite (buf, size+2*sizeof(int), 1, c->db) != 1)
	{
		PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
		return -1;
	}
	if (pcinfo->dataSource != p)
	{
		PDEBUG ("dataSource %p is not equal p %p.\n", pcinfo->dataSource, p);
		pcinfo->dataSource = p;
	}
	fflush (c->db);
	pcinfo->total ++;
	c->downsize += size;
	if (id > pcinfo->maxID) pcinfo->maxID = id;
	/* Record which "generation" (id / max_queue, offset by 1) occupies the slot. */
	pcinfo->indisk[pos] = (id/pcinfo->max_queue) + 1;
	/* A stored 0 means the generation value did not fit in the table entry. */
	if (pcinfo->indisk[pos] == 0)
		PDEBUG("Too large id %ds", id);
	return (int)size;
}
/*
 * init_livechannel - open (create/truncate, "w+") the channel's file
 * p->fname and store the stream in p->db.
 *
 * Returns 0 on success after clearing p->pcinfo->isSave, or -1 when the
 * file cannot be opened (p->db is then NULL).
 */
int init_livechannel (struct Channel *p)
{
	FILE *stream = fopen (p->fname, "w+");

	p->db = stream;
	if (stream == NULL)
		return -1;
	p->pcinfo->isSave = 0;
	return 0;
}
/*
 * free_livechannel - release the per-channel live data of `p`.
 *
 * Frees the indisk/bitflag tables, unlinks the file p->fname, detaches
 * and closes the data source session (if any) through Clientclosure(),
 * and finally calls freeMedia(p).  p->pcinfo itself is not freed here.
 *
 * Always returns 0.
 */
int free_livechannel (struct Channel *p)
{
	struct LiveChannelInfo *info = p->pcinfo;

	free (info->indisk);
	free (info->bitflag);
	unlink (p->fname);
	if (info->dataSource != NULL)
	{
		int slot;

		info->dataSource->pc = NULL;
		PDEBUG ("close CS source for channel %.32s.\n", p->channel_md5);
		/* Index of the session relative to the TYPE_P2PC tracker's head. */
		slot = info->dataSource-TRACKER[TYPE_P2PC].head;
		Clientclosure (slot, TYPE_P2PC);
		info->dataSource = NULL;
	}
	freeMedia (p);
	return 0;
}
#endif
#ifdef __SP_SOURCE
/* Defined elsewhere; used below as the current time when stamping updates and arming timers. */
extern time_t CurrentTime;
extern char *NET_NAME[];
extern char *WWW_ROOT;
extern char *defaultspip;
/* Starts at 1; incremented by hup_handler() each time it runs. */
int Changed = 1;
/* Defined elsewhere; used below to schedule send_mplist_spupdate(). */
extern int timer_add (unsigned int t, TimerFunc process, void *entity, void *data);
/* Defined elsewhere; used below to send a prepared message buffer to one peer session. */
extern int writeMessage (struct Session *p, struct Channel *pc, char *ptr);
extern int buildGTV (struct Channel *pc, int datalen, char *data, int type);
extern inline void freeProgram (struct Channel *, void *);
extern void send_all_spupdate (struct Channel *pc, struct SPUpdate *s);
/*
 * hup_handler - signal-handler style function that bumps the global
 * `Changed` counter by one.  The signal number is not used.
 *
 * NOTE(review): `Changed` is a plain int, not volatile sig_atomic_t.
 */
void hup_handler (int sig)
{
	Changed += 1;
}
/*
 * buildPListPath - build "<PREFIX>/<PLIST_PREFIX>/<md5>" in `buf`.
 *
 * buf: destination buffer.
 * len: size of `buf` in bytes.
 * md5: NUL-terminated channel md5 string.
 *
 * The result is always NUL-terminated and never exceeds `len` bytes; it
 * is truncated when it does not fit.  The previous implementation
 * appended `md5` with an unbounded strcat(), which could write past the
 * end of `buf`.
 */
inline void buildPListPath (char *buf, int len, char *md5)
{
	int used;

	if (buf == NULL || len <= 0)
		return;
	used = snprintf (buf, len, "%s/%s/", PREFIX, PLIST_PREFIX);
	if (used < 0)
	{
		buf[0] = 0;
		return;
	}
	if (used >= len)
		return;			/* directory part already truncated */
	/* Bounded append; truncates instead of overflowing when md5 does not fit. */
	snprintf (buf + used, len - used, "%s", md5);
}
/*
 * open_keyfile - open "<p->fname>/keysample" for reading.
 *
 * Returns the opened stream, or NULL when `p` is NULL, the path does not
 * fit in the local buffer, the path exists but is not a regular file, or
 * fopen() fails.  The caller owns the returned stream.
 *
 * The path is now bounded by the real size of the buffer (it was bounded
 * by MAX_LINE while the buffer is MAX_DATA bytes), and a truncated path
 * is rejected rather than opened.
 */
FILE * open_keyfile (struct Channel *p)
{
	char buffer[MAX_DATA];
	struct stat stbuf;
	int n;

	if (p == NULL) return NULL;
	n = snprintf (buffer, sizeof (buffer), "%s/keysample", p->fname);
	if (n < 0 || (size_t)n >= sizeof (buffer))
		return NULL;
	/* A stat() failure is not an error here: fopen() below reports it. */
	if (stat (buffer, &stbuf) == 0 && !S_ISREG (stbuf.st_mode))
	{
		PDEBUG ("File %s exist and not a regular file", buffer);
		return NULL;
	}
	return fopen (buffer, "r");
}
/*
 * send_mplist_spupdate - timer callback for a multi-playlist channel:
 * send an SPUPDATE message to the channel's peers, advance through the
 * keysample log, and re-arm itself.
 *
 * pc:   channel; must have pcinfo, pcinfo->mlist, an open
 *       pcinfo->keyfile and a non-zero pcinfo->maxID.
 * data: unused.
 *
 * Each call:
 *  1. on first use (s.maxBlockID == 0) rewinds the keysample file, reads
 *     its first record and initialises pcinfo->s from it;
 *  2. if the channel has clients and no update was sent yet at
 *     CurrentTime, builds [int length][P2P_SPUPDATE][md5][struct SPUpdate]
 *     and writes it to every edge on pc->PeerHead;
 *  3. reads the next keysample record; when the file is exhausted it
 *     switches to the next list entry's keysample file (wrapping to
 *     entry 0) and reads that file's first record;
 *  4. updates s.maxKeySample / s.maxBlockID and schedules itself again
 *     at CurrentTime + slot via timer_add().
 *
 * Returns 0 when re-armed, -1 on bad input or when a keysample file
 * cannot be opened/read (in which case the timer is not re-armed).
 *
 * Changes from the previous version: the fseek() after switching files
 * passed (SEEK_SET, 0) as (offset, whence); the arguments are now in the
 * documented order.  A NULL pcinfo->keyfile (left behind by a failed
 * switch) is rejected up front instead of being passed to fseek()/fread().
 */
int send_mplist_spupdate (struct Channel *pc, void * data)
{
	int iddiff, prev;
	char *buf, buffer[MAX_DATA];
	struct Edge *pedge;
	struct LiveChannelInfo *pcinfo;
	struct SPUpdate s;
	struct logrec lrec;
	time_t slot;

	if (pc == NULL || pc->pcinfo == NULL || pc->pcinfo->mlist == NULL
		|| pc->pcinfo->maxID == 0 || pc->pcinfo->keyfile == NULL)
	{
		PDEBUG ("Wrong playlist data.\n");
		return -1;
	}
	pcinfo = pc->pcinfo;
	if (pcinfo->s.maxBlockID == 0)
	{
		fseek (pcinfo->keyfile, 0, SEEK_SET);
		if (fread (&lrec, 1, sizeof(struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		/* Round the time-derived base id up to a multiple of maxID. */
		pcinfo->s.minBlockID = (((CurrentTime - FIX_MAGIC ) * 16 + pcinfo->maxID - 1) / pcinfo->maxID) * pcinfo->maxID;
		pcinfo->s.maxBlockID = lrec.id;
		if (pcinfo->s.maxKeySample == 0)
			pcinfo->s.maxKeySample = lrec.keysample;
		if (pcinfo->s.minKeySample == 0)
			pcinfo->s.minKeySample = CurrentTime;
	}
	/* Work on a copy: the message carries absolute ids and scaled times. */
	s = pcinfo->s;
	s.maxBlockID += s.minBlockID;
	s.maxKeySample = ((long long)CurrentTime) * 10000000;
	s.minKeySample = ((long long)(s.minKeySample)) * 10000000;
	if (pcinfo->updated < CurrentTime && pc->numclient > 0)
	{
		pcinfo->updated = CurrentTime;
		/* Layout: [int total length][1-byte type][md5][struct SPUpdate]. */
		buf = buffer + sizeof (int);
		*(unsigned char *) buf = P2P_SPUPDATE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		memcpy (buf, &s, sizeof (struct SPUpdate));
		buf += sizeof (struct SPUpdate);
		*(int *) buffer = buf - buffer;
		for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
		{
			if (writeMessage (pedge->me, pc, buffer) < 0)
			{
				PDEBUG ("send SPUPDATE err.\n");
			}
		}
		PINFO ("Send spupdate in %d, %d.\n", (int)(pcinfo->s.maxKeySample), (int)(pcinfo->s.minKeySample));
	}
	if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
	{
		/* Current keysample file exhausted: move on to the next list entry. */
		fclose (pcinfo->keyfile);
		prev = pcinfo->mlist->m_cursampleid;
		pcinfo->mlist->m_cursampleid ++;
		if (pcinfo->mlist->m_cursampleid >= pcinfo->mlist->m_totalchannel)
			pcinfo->mlist->m_cursampleid = 0;
		if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[pcinfo->mlist->m_cursampleid])) == NULL)
		{
			PDEBUG ("Error in new keysample file\n");
			return -1;
		}
		fseek (pcinfo->keyfile, 0, SEEK_SET);
		if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		/* Id step across the file boundary: rest of the previous entry plus the new first id. */
		iddiff = pcinfo->mlist->m_lists[prev]->pcinfo->maxID + lrec.id - pcinfo->mlist->m_lastmaxid;
		slot = lrec.keysample;
		pcinfo->s.maxKeySample = lrec.keysample;
	} else
	{
		/* Delay until the next record; never negative. */
		if ((slot = lrec.keysample - pcinfo->s.maxKeySample) < 0)
			slot = 0;
		iddiff = lrec.id - pcinfo->mlist->m_lastmaxid;
		pcinfo->s.maxKeySample += slot;
	}
	pcinfo->mlist->m_lastmaxid = lrec.id;
	pcinfo->s.maxBlockID += iddiff;
	timer_add (CurrentTime+slot, (TimerFunc)send_mplist_spupdate, pc, NULL);
	return 0;
}
/*
 * init_mplistchannel - prepare a multi-playlist channel for updates.
 *
 * Defaults p->maxblocksize to DEFAULT_BLOCK when it is 0, sets
 * max_queue to BLOCK_PER_FILE, opens the keysample file of the first
 * list entry, resets the total/isSave fields and schedules
 * send_mplist_spupdate() at CurrentTime.
 *
 * Returns 0 on success, -1 when the keysample file cannot be opened
 * (pcinfo->keyfile is then NULL and no timer is armed).
 */
int init_mplistchannel (struct Channel *p)
{
	struct LiveChannelInfo *info = p->pcinfo;
	FILE *kf;

	if (p->maxblocksize == 0)
		p->maxblocksize = DEFAULT_BLOCK;
	info->max_queue = BLOCK_PER_FILE;

	kf = open_keyfile (info->mlist->m_lists[0]);
	info->keyfile = kf;
	if (kf == NULL)
	{
		PDEBUG ("Error in new keysample file\n");
		return -1;
	}

	info->total = 0;
	info->isSave = 0;
	timer_add (CurrentTime, (TimerFunc)send_mplist_spupdate, p, NULL);
	return 0;
}
struct Channel *newMPListChannel (char *name, char *cmd5, float bitrate, int maxblocksize, int nchannel, struct Channel **pchannel)
{
int i, id, startID=0;
struct Channel *p;
struct LiveChannelInfo *pcinfo;
if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
p = (struct Channel *)calloc (sizeof (struct Channel), 1);
memcpy (p->channel_md5, cmd5, MD5_LEN);
if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
p->channel_md5[MD5_LEN] = 0;
p->upsize = 0;
p->downsize = 0;
p->maxblocksize = maxblocksize;
p->ctime = time (NULL);
p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
pcinfo = p->pcinfo;
pcinfo->bitrate = bitrate;
pcinfo->mlist = (struct MList *)calloc (sizeof (struct MList), 1);
pcinfo->mlist->m_totalchannel = nchannel;
pcinfo->media = calloc (nchannel, sizeof (struct MediaData));
pcinfo->max_channel = nchannel;
for (i=0; i<nchannel; i++)
{
pchannel[i]->ref ++;
pcinfo->mlist->m_lists[i] = pchannel[i];
pcinfo->mlist->m_startID[i] = startID;
pcinfo->maxID += pchannel[i]->pcinfo->maxID;
addMedia (p, startID, pchannel[i]->pcinfo->maxID, pchannel[i]->pcinfo->media[0].dlen, pchannel[i]->pcinfo->media[0].data, pchannel[i]->channel_name);
startID += pchannel[i]->pcinfo->maxID;
}
if (init_mplistchannel (p) < 0)
{
PDEBUG ("newPlistChannel error for %p.", p);
free_livechannel (p);
free (pcinfo);
free (p);
return (struct Channel *)0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -