/* livechannel.c */
id = hash_str (p->channel_md5, MD5_LEN);
PDEBUG("newMPlistChannel hash %.32s(fname=%s) to %d.\n", p->channel_md5, p->fname, id);
p->next = ChannelHash[id];
ChannelHash[id] = p;
p->lnext = ChannelList;
ChannelList = p;
NumNewChannel ++;
return p;
}
/*
 * Build a multi-program list channel from the playlist file `buffer`.
 *
 * File layout as parsed here: the first line is the bitrate, the second line
 * is the channel name, and every following line starts with the MD5 of a
 * program that is resolved through getProgrambymd5().  Lines whose MD5 is
 * unknown (or too short to hold an MD5) are skipped.
 *
 * buffer: path of the playlist file; it is also copied into the new
 *         channel's fname.
 * md5:    channel MD5 handed to newMPListChannel().
 *
 * Returns the new channel, or NULL when the path cannot be stat'ed or opened,
 * is not a regular file, resolves no program, or newMPListChannel() fails.
 */
struct Channel *add_mplist_channel (char *buffer, char *md5)
{
    int i=0;
    FILE *in;
    struct stat stbuf;
    struct Channel *pc=NULL, *pchannel[MAX_FILEINPUT];
    char *data=NULL, cname[MAX_DATA], buf[MAX_DATA];
    float bitrate=0.0;
    int bsize=16384, dlen=0;

    if (stat (buffer, &stbuf) < 0) return NULL;
    if (!S_ISREG (stbuf.st_mode)) return NULL;

    if ((in=fopen (buffer, "r")) == NULL)
    {
        PDEBUG ("Error in open file %s.\n", buffer);
        return NULL;
    }

    /* i counts consumed header lines (0, 1) and then 2 + resolved programs. */
    while (fgets (buf, MAX_DATA, in))
    {
        if (i == 0)
        {
            bitrate = atof (buf);
            i++;
        }
        else if (i == 1)
        {
            /* Cut at the first CR or LF so "name\r\n" does not keep a CR. */
            buf[strcspn (buf, "\r\n")] = 0;
            snprintf (cname, sizeof (cname), "%s", buf);
            i++;
        }
        else
        {
            /* pchannel[] has MAX_FILEINPUT slots; never write past them. */
            if (i - 2 >= MAX_FILEINPUT)
            {
                PDEBUG ("Too many programs in %s, ignoring the rest.\n", buffer);
                break;
            }
            /* A line shorter than an MD5 cannot name a program; skipping it
             * also keeps stale bytes of buf out of the lookup key. */
            if (strlen (buf) < (size_t) MD5_LEN)
                continue;
            buf[MD5_LEN] = 0;
            if ((pc = getProgrambymd5 (buf, MD5_LEN)) != NULL)
            {
                pchannel[i-2] = pc;
                i++;
            }
        }
    }
    fclose (in);

    /* Need both header lines plus at least one resolved program. */
    if (i <= 2) return NULL;

    /* Media data of the first program; only consumed by the TEST build. */
    dlen = pchannel[0]->pcinfo->media[0].dlen;
    data = pchannel[0]->pcinfo->media[0].data;

    if ((pc=newMPListChannel (cname, md5, bitrate, bsize, i-2, pchannel)) == NULL)
    {
        PDEBUG ("Error in newMPListChannel %s.\n", md5);
        return NULL;
    }

    /* snprintf always NUL-terminates, unlike strncpy on a too-long path. */
    snprintf (pc->fname, CHNLURL_LEN, "%s", buffer);

#ifdef TEST
    for (i=0; i<MAX_TS; i++) {
        if (dlen <= 0 || data == NULL || buildGTV (pc, dlen, data, i) < 0)
            continue;
    }
#endif
    return pc;
}
/*
 * Per-channel callback (check_newplist() passes it to apply_list()).
 *
 * For a channel that has channel info with a non-NULL mlist, sets the info's
 * status to 1 when stat() on the channel's fname fails.  Channels without
 * info or without an mlist are left untouched.
 *
 * p:   channel to inspect.
 * arg: unused.
 */
void apply_update (struct Channel *p, void *arg)
{
    struct LiveChannelInfo *info = p->pcinfo;
    struct stat st;

    if (info == NULL || info->mlist == NULL)
        return;

    if (stat (p->fname, &st) != 0)
        info->status = 1;
}
/*
 * Scan the playlist directory "<PREFIX>/<PLIST_PREFIX>/" and register every
 * playlist file that has no channel yet.
 *
 * Only entries whose name is exactly MD5_LEN characters long are considered.
 * The channel key is the hex MD5 of "<defaultspip>_<entry name>".  If a
 * channel with that key already exists its status is reset to 0; otherwise
 * add_mplist_channel() is called on the entry's full path.  Afterwards
 * apply_update() is run over ChannelList and Changed is cleared.
 *
 * Returns 0 on success, -1 when the directory cannot be opened.
 */
int check_newplist ()
{
    char path[MAX_DATA];
    char digest[MD5_LEN+1], hex[MD5_LEN+1];
    struct dirent *ent;
    struct Channel *known;
    DIR *dir;
    int k;

    /* The early-out on Changed == 0 is intentionally left disabled. */
    snprintf (path, MAX_DATA, "%s/%s/", PREFIX, PLIST_PREFIX);
    dir = opendir (path);
    if (dir == NULL)
        return -1;

    for (ent = readdir (dir); ent != NULL; ent = readdir (dir))
    {
        if (strcmp (ent->d_name, ".") == 0)
            continue;
        if (strcmp (ent->d_name, "..") == 0)
            continue;
        if (strlen (ent->d_name) != MD5_LEN)
            continue;

        /* Derive the channel key from the server id and the file name. */
        snprintf (path, MAX_DATA, "%s_%s", defaultspip, ent->d_name);
        md5_calc ((unsigned char *) digest,
                  (unsigned char *) path, strlen (path));

        /* Render the binary digest as lowercase hex, two chars per byte. */
        k = 0;
        while (k < MD5_LEN)
        {
            sprintf (hex + k, "%02x", (unsigned char) digest[k / 2]);
            k += 2;
        }
        hex[MD5_LEN] = 0;

        known = findChannel (hex, strlen (hex));
        if (known != NULL)
        {
            if (known->pcinfo != NULL)
                known->pcinfo->status = 0;
            continue;
        }

        snprintf (path, MAX_DATA, "%s/%s/%s", PREFIX, PLIST_PREFIX, ent->d_name);
        add_mplist_channel (path, hex);
    }
    closedir (dir);

    apply_list (ChannelList, apply_update, NULL);
    Changed = 0;
    return 0;
}
/*
 * Fetch block `id` of a multi-program list channel into `buf`.
 *
 * The id is reduced modulo the channel's maxID, the sub-program whose
 * m_startID range contains the reduced id is selected, and the lookup is
 * delegated to locateprog_by_id() with the id made relative to that
 * sub-program.  On a positive result the first int of `buf` is overwritten
 * with the original (unreduced) id.
 *
 * pc:  channel; its pcinfo must carry an mlist.
 * id:  requested block id.
 * buf: destination buffer.
 * max: size of `buf`, passed through to locateprog_by_id().
 *
 * Returns the result of locateprog_by_id(), or -1 when the channel has no
 * info, no mlist, maxID is 0, or no sub-program covers the id.
 */
int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
    int result;
    int i, j;
    struct LiveChannelInfo *c = pc->pcinfo;

    if (c == NULL)
    {
        PDEBUG ("c is null.\n");
        return -1;
    }
    /* Without an mlist there is nothing to search, and maxID == 0 would
     * make the modulo below divide by zero. */
    if (c->mlist == NULL || c->maxID == 0)
    {
        PDEBUG ("Internal error in locate id %d.\n", id);
        return -1;
    }

    i = id % c->maxID;

    /* Find the first sub-program starting after i; the one before it owns i. */
    for (j=0; j<c->mlist->m_totalchannel; j++)
    {
        if (c->mlist->m_startID[j] > i) break;
    }
    j--;
    if (j < 0 || c->mlist->m_lists[j] == NULL)
    {
        PDEBUG ("Internal error in locate id %d.\n", id);
        return -1;
    }

    if ((result = locateprog_by_id (c->mlist->m_lists[j], i-c->mlist->m_startID[j], buf, max)) > 0)
        ((int *)buf)[0] = id;
    return result;
}
/*
 * Open the next block file "<fname>/<numinput>" of a live channel.
 *
 * Each record in a block file is maxblocksize + 2*sizeof(int) bytes long.
 *  - Existing file with fewer than max_queue records: opened "a+", returns 1.
 *  - Existing file that is already full: opened "r+", returns 0, so callers
 *    loop until a file with room is found.
 *  - Missing file: created with "w+", returns 1.
 * For an existing file numblocks is set to its record count and maxID is
 * advanced by that count; for a new file numblocks is 0.  On success the
 * stream is stored in input[numinput] and numinput is incremented.
 *
 * Return 1 to indicate write available, return 0 to indicate not writable,
 * -1 on error (too many files, open failure, or a file whose size is not a
 * whole number of records).  On error no stream is left open and the
 * channel counters are unchanged.
 *
 * NOTE(review): declared plain `inline` (no static/extern); kept as-is so
 * linkage for other translation units does not change.
 */
inline int newChannelFile (struct Channel *p)
{
    int result;
    struct stat stbuf;
    char buffer[MAX_LINE];
    struct LiveChannelInfo *pcinfo = p->pcinfo;
    size_t recsize = p->maxblocksize + 2*sizeof (int);

    if (pcinfo->numinput >= MAX_FILEINPUT)
    {
        PDEBUG ("Max file input has been reached, %s:%d.\n", p->fname,pcinfo->numinput);
        return -1;
    }
    snprintf (buffer, MAX_LINE, "%s/%d", p->fname, pcinfo->numinput);
    if (stat (buffer, &stbuf) == 0)
    {
        /* Reject a torn file before opening it or touching any counter, so
         * the error path neither leaks a FILE* nor skews maxID. */
        if (stbuf.st_size % recsize)
        {
            PDEBUG ("File %s size is not a multiple of the record size.\n", buffer);
            return -1;
        }
        if (stbuf.st_size / recsize < pcinfo->max_queue)
        {
            if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "a+")) == NULL)
            {
                PDEBUG ("Cannot open file %s.\n", buffer);
                return -1;
            }
            result = 1;
        } else
        {
            if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "r+")) == NULL)
            {
                PDEBUG ("Cannot open file %s.\n", buffer);
                return -1;
            }
            result = 0;
        }
        pcinfo->numblocks = stbuf.st_size / recsize;
        pcinfo->maxID += stbuf.st_size / recsize;
    } else
    {
        if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "w+")) == NULL)
        {
            PDEBUG ("Cannot open file %s.\n", buffer);
            return -1;
        }
        pcinfo->numblocks = 0;
        result = 1;
    }
    pcinfo->numinput ++;
    return result;
}
/*
 * Prepare the on-disk storage of a live channel.
 *
 * Steps, in order:
 *  1. Fail if p->fname exists and is not a directory; otherwise mkdir() it
 *     (the mkdir result is not checked, so an existing directory is fine).
 *  2. Default maxblocksize to DEFAULT_BLOCK when it is 0 and set max_queue
 *     to BLOCK_PER_FILE.
 *  3. Call newChannelFile() until it stops returning 0, then make the last
 *     opened block file the channel's current db stream.
 *  4. Open "<fname>/keysample" in append mode as the key-sample file, failing
 *     if that path exists but is not a regular file.
 *  5. Reset the info's total counter.
 *
 * Returns 0 on success, -1 on any failure.
 */
int init_livechannel (struct Channel *p)
{
    struct LiveChannelInfo *info = p->pcinfo;
    char path[MAX_LINE];
    struct stat st;
    int rc;

    if (stat (p->fname, &st) == 0 && (!S_ISDIR (st.st_mode)))
    {
        PDEBUG ("directory %s not exist or not a dir.\n", p->fname);
        return -1;
    }
    mkdir (p->fname, 0777);

    if (p->maxblocksize == 0)
        p->maxblocksize = DEFAULT_BLOCK;
    info->max_queue = BLOCK_PER_FILE;

    /* Skip over block files that are already full. */
    do
    {
        rc = newChannelFile (p);
    } while (rc == 0);
    if (rc < 0)
        return -1;
    p->db = info->input[info->numinput-1];

    snprintf (path, MAX_LINE, "%s/keysample", p->fname);
    if (stat (path, &st) == 0 && !S_ISREG (st.st_mode))
    {
        PDEBUG ("File %s exist and not a regular file", path);
        return -1;
    }

    info->keyfile = fopen (path, "a+");
    if (info->keyfile == NULL)
    {
        PDEBUG ("File %s can not be opened.\n", path);
        return -1;
    }

    info->total = 0;
    return 0;
}
/*
 * Read the stored record of block `id` from a live channel into `buf`.
 *
 * Records are maxblocksize + 2*sizeof(int) bytes each and are spread over
 * the channel's block files, max_queue records per file, counted from
 * startid.  A record starts with two ints: [0] the stored id, [1] the
 * payload length.
 *
 * pc:  channel to read from.
 * id:  block id to fetch.
 * buf: destination; must be able to hold a full record.
 * max: size of `buf` in bytes.
 *
 * Returns the payload length (record int [1]) on success and adds it to
 * pc->upsize; -2 when `buf` is too small for a record; -1 on any other
 * failure (no channel info, id out of range, missing file, seek/read error,
 * malformed record).
 */
int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
    int i, pos, *msg;
    FILE *fp;
    struct LiveChannelInfo *c = pc->pcinfo;

    /* Checked separately so the log below never dereferences a NULL c. */
    if (c == NULL)
    {
        PDEBUG ("c is null.\n");
        return -1;
    }
    if (id > c->maxID)
    {
        PDEBUG ("c is %p and id is (%d,%d).\n", c, c->maxID, id);
        return -1;
    }
    /* A negative max would turn into a huge unsigned value in the compare. */
    if (max < 0 || pc->maxblocksize + 2*sizeof(int) > (size_t) max)
    {
        PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
        return -2;
    }
    if (c->max_queue == 0)
    {
        PDEBUG ("c->max_queue is 0\n");
        return -1;
    }

    i = (id - c->startid) % c->max_queue;   /* record index inside the file */
    pos = (id - c->startid) / c->max_queue; /* index of the block file */

    /* Only index input[] when pos is a valid slot. */
    fp = (pos >= 0 && pos < c->numinput) ? c->input[pos] : NULL;
    if (fp == NULL)
    {
        PDEBUG ("file %d does not exist. (%d,%p)\n", pos, c->numinput, (void *) fp);
        return -1;
    }
    if (fseeko (fp, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
    {
        PDEBUG ("Fssek failed. (%d, %d, %d)\n", pos, i, pc->maxblocksize);
        return -1;
    }

    /* Need more than the header, and at least header + declared payload. */
    i = fread (buf, 1, pc->maxblocksize+2*sizeof (int), fp);
    if (i <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
    {
        PDEBUG ("Fread failed. (%d, %d, %d)\n", pos, i, pc->maxblocksize);
        return -1;
    }

    msg = (int *)buf;
    if (msg[1] > pc->maxblocksize || msg[1] <= 0)
    {
        PDEBUG ("msg format error. (%d, %d, %d)\n", msg[0], msg[1], pc->maxblocksize);
        return -1;
    }
    PINFO ("Found block. (%d, %d, %d, %d)\n", id, msg[0], msg[1], pc->maxblocksize);
    pc->upsize += msg[1];
    return msg[1];
}
/*
 * Append one received block to a live channel's current block file.
 *
 * `buf` starts with two ints, [0] the block id and [1] the payload length,
 * followed by the payload.  The first int of the payload (record int [2]) is
 * read as the byte offset of a 64-bit key sample inside the payload; an
 * offset that is <= 0 or too close to the end means "no key sample".
 *
 * The block is renumbered to the channel's next id (maxID), written as a
 * full fixed-size record, the key-sample and block-id ranges in pcinfo->s
 * are updated, a (id, keysample) log record is appended to the key file when
 * a key sample is present, and a new block file is opened once the current
 * one holds max_queue records.
 *
 * c:   channel to store into.
 * buf: block as described above (must not be NULL).
 *      NOTE(review): a full record of maxblocksize + 2*sizeof(int) bytes is
 *      written from buf, so the caller's buffer is assumed to be at least
 *      that large - confirm against callers.
 * p:   session the block came from; becomes the channel's dataSource.
 *
 * Returns the payload size on success, 0 when the block is rejected because
 * of its size, -1 on error.
 */
int saveBlock (struct Channel *c, char *buf, struct Session *p)
{
    struct logrec lrec;
    int id, size;
    int j=0;
    unsigned long long keysample;
    struct SPUpdate *s;
    struct LiveChannelInfo *pcinfo;

    if ((!c) || (pcinfo = c->pcinfo) == NULL)
    {
        PDEBUG ("saveBlock c is null.\n");
        return -1;
    }
    assert (buf);

    /* Incoming id, used only for the rejection logs below; it is replaced
     * by the channel's own numbering before the block is stored. */
    id = ((int *)buf)[0];
    size = ((int *)buf)[1];
    if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE)
    {
        PDEBUG ("saveBlock size is %d and id is %d.\n", size, id);
        return 0;
    }
    if (c->maxblocksize == 0)
    {
        c->maxblocksize = size;
        pcinfo->max_queue = BLOCK_PER_FILE;
    }
    else if (size > c->maxblocksize)
    {
        PDEBUG ("saveBlock maxblocksize is %d, size is %d and id is %d.\n", c->maxblocksize, size, id);
        return 0;
    }
    if (pcinfo->max_queue == 0)
    {
        PDEBUG ("c->max_queue is 0\n");
        return -1;
    }
    if (c->db == NULL)
    {
        PDEBUG ("saveBlock db is null.\n");
        return -1;
    }

    id = pcinfo->maxID;
    ((int *)buf)[0] = id;
    if (fseeko (c->db, ((off_t)(pcinfo->numblocks)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
    {
        PDEBUG ("Error in fsseko.\n");
        return -1;
    }
    if (fwrite (buf, c->maxblocksize+2*sizeof(int), 1, c->db) != 1)
    {
        PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
        return -1;
    }

    j = ((int *)buf)[2];
    /* Signed compare: with an unsigned one a small `size` would wrap and let
     * an out-of-range offset through. */
    if (j <= 0 || j >= size - (int) sizeof (keysample))
        keysample = 0;
    else
        /* memcpy: the offset carries no alignment guarantee. */
        memcpy (&keysample, buf + 2*sizeof (int) + j, sizeof (keysample));

    s = &(pcinfo->s);
    if (keysample > 0)
    {
        if(s->maxKeySample < keysample)
        {
            //the increasement should not be larger than 1000 seconds!
            if(s->maxKeySample == 0 || (keysample-s->maxKeySample)/10000000 < 1000)
            {
                s->maxKeySample = keysample;
            } else
                PDEBUG("Error Keysample at block %d,(%lld,%lld). \n", id, keysample, s->maxKeySample);
        }
        if (s->minKeySample == 0 || keysample < s->minKeySample)
        {
            s->minKeySample = keysample;
        }
        /* Zero first so struct padding written to disk is deterministic. */
        memset (&lrec, 0, sizeof (lrec));
        lrec.id = id;
        lrec.keysample = keysample/10000000;
        fwrite (&lrec, sizeof (lrec), 1, pcinfo->keyfile);
        fflush (pcinfo->keyfile);
    }
    if (id < s->minBlockID || s->maxBlockID == 0)
    {
        s->minBlockID = id;
    }
    if (id > s->maxBlockID)
    {
        s->maxBlockID = id;
    }
    if (pcinfo->dataSource != p)
    {
        PDEBUG ("dataSource %p is not equal p %p.\n", pcinfo->dataSource, p);
        pcinfo->dataSource = p;
    }
    fflush (c->db);
    pcinfo->total ++;
    pcinfo->maxID ++;
    pcinfo->numblocks ++;
    c->downsize += size;

    /* Current file is full: roll over to the next file that has room. */
    if (pcinfo->numblocks >= pcinfo->max_queue)
    {
        while ((j = newChannelFile (c)) == 0);
        if (j < 0) return -1;
        c->db = pcinfo->input[pcinfo->numinput-1];
    }
    return size;
}
/*
 * Release everything a live channel holds.
 *
 * In order: sends a final update via send_all_spupdate(), closes every block
 * file (and unlinks it unless isSave is set), closes the key-sample file
 * (and removes it and the channel directory unless isSave is set), detaches
 * and closes the data-source session, drops the references held on the
 * sub-programs of an mlist (freeing those that reach zero) and frees the
 * mlist, then calls freeMedia().
 *
 * Closed streams and the freed mlist are reset to NULL so a later call or
 * lookup cannot reuse a dangling pointer.
 *
 * p: channel to tear down; p->pcinfo must be non-NULL.  The channel and its
 *    pcinfo themselves are not freed here.
 *
 * Always returns 0.
 */
int free_livechannel (struct Channel *p)
{
    char buffer[MAX_LINE];
    struct LiveChannelInfo *pcinfo = p->pcinfo;
    int i;

    send_all_spupdate (p, NULL);
    for (i=0; i<pcinfo->numinput; i++)
    {
        if (pcinfo->input[i])
        {
            fclose (pcinfo->input[i]);
            pcinfo->input[i] = NULL;
        }
        if (!pcinfo->isSave)
        {
            snprintf (buffer, MAX_LINE, "%s/%d", p->fname, i);
            unlink (buffer);
        }
    }
    p->db = NULL;
    if (pcinfo->keyfile != NULL)
    {
        fclose (pcinfo->keyfile);
        pcinfo->keyfile = NULL;
    }
    if (!pcinfo->isSave)
    {
        snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
        unlink (buffer);
        rmdir (p->fname);
    }
    if (pcinfo->dataSource != NULL)
    {
        pcinfo->dataSource->pc = NULL;
        Clientclosure (pcinfo->dataSource - TRACKER[TYPE_CS].head, TYPE_CS);
        pcinfo->dataSource = NULL;
    }
    if (pcinfo->mlist)
    {
        timer_remove (p, NULL);
        for (i=0; i<pcinfo->max_channel; i++)
        {
            if (pcinfo->mlist->m_lists[i] != NULL)
            {
                pcinfo->mlist->m_lists[i]->ref --;
                if (pcinfo->mlist->m_lists[i]->ref <= 0)
                    freeProgram (pcinfo->mlist->m_lists[i], NULL);
            }
        }
        free (pcinfo->mlist);
    }
    freeMedia (p);
    /* Cleared only after freeMedia(): that function is not visible here, so
     * it is called with the same mlist value it has always been given. */
    pcinfo->mlist = NULL;
#ifdef TEST
    for (i=0; i<MAX_TS; i++)
    {
        snprintf (buffer, MAX_LINE, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[i], p->channel_name);
        remove (buffer);
    }
#endif
    return 0;
}
#endif
/*
 * Create a live channel, set up its on-disk storage and register it.
 *
 * name:         optional display name (may be NULL); truncated to fit
 *               channel_name.
 * source:       session that feeds the channel; stored as dataSource.
 * cmd5:         channel MD5, MD5_LEN bytes are copied; must not be NULL.
 * bitrate:      stored in the channel info.
 * maxblocksize: initial maximum block size (init_livechannel() substitutes
 *               DEFAULT_BLOCK when this is 0).
 *
 * On success the channel is linked at the head of its ChannelHash bucket and
 * of ChannelList, NumNewChannel is incremented, and the channel is returned.
 * Returns NULL when MAX_CHANNEL is reached, cmd5 is NULL, memory cannot be
 * allocated, or init_livechannel() fails (in which case free_livechannel()
 * is run on the half-built channel before it is freed).
 */
struct Channel *newLiveChannel (char *name, struct Session *source, char *cmd5, float bitrate, int maxblocksize)
{
    int id;
    struct Channel *p;
    struct LiveChannelInfo *pcinfo;

    if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
    if (cmd5 == NULL) return (struct Channel *)0;

    /* calloc zero-fills, so counters and pointers start out as 0/NULL. */
    p = calloc (1, sizeof (*p));
    if (p == NULL)
    {
        PDEBUG ("newLiveChannel out of memory.\n");
        return (struct Channel *)0;
    }
    pcinfo = calloc (1, sizeof (*pcinfo));
    if (pcinfo == NULL)
    {
        PDEBUG ("newLiveChannel out of memory.\n");
        free (p);
        return (struct Channel *)0;
    }
    p->pcinfo = pcinfo;

    memcpy (p->channel_md5, cmd5, MD5_LEN);
    p->channel_md5[MD5_LEN] = 0;
    /* Copy one byte less than the array so the zeroed last byte always
     * terminates the name. */
    if (name) strncpy (p->channel_name, name, sizeof (p->channel_name) - 1);
    p->upsize = 0;
    p->downsize = 0;
    p->maxblocksize = maxblocksize;
    p->ctime = time (NULL);
    pcinfo->dataSource = source;
    pcinfo->bitrate = bitrate;
    buildLivePath (p->fname, CHNLURL_LEN, cmd5);

    if (init_livechannel (p) < 0)
    {
        PDEBUG ("newLiveChannel error for %p.", p);
        free_livechannel (p);
        free (pcinfo);
        free (p);
        return (struct Channel *)0;
    }

    id = hash_str (p->channel_md5, MD5_LEN);
    PDEBUG("newLiveChannel hash %.32s(fname=%s) to %d.\n", p->channel_md5, p->fname, id);
    p->next = ChannelHash[id];
    ChannelHash[id] = p;
    p->lnext = ChannelList;
    ChannelList = p;
    NumNewChannel ++;
    return p;
}
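/*
 * Code-viewer keyboard shortcuts (web page residue, not part of the program):
 *   Copy code:          Ctrl + C
 *   Search code:        Ctrl + F
 *   Full-screen mode:   F11
 *   Toggle theme:       Ctrl + Shift + D
 *   Show shortcuts:     ?
 *   Increase font size: Ctrl + =
 *   Decrease font size: Ctrl + -
 */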