⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 livechannel.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include "echo.h"
int NumNewChannel;
struct Channel *ChannelHash[MAX_CHANNEL];
struct Channel *ChannelList;
extern char *PREFIX;
extern struct ServerDesc TRACKER[MAX_TYPE];
extern int Clientclosure (int listnum, int type);

inline int hash_str (unsigned char *str, int len)
{
	int hash;
	for (hash=0; len; len--, str++)
		hash += (hash << 5) - hash + (*str);
	return hash & (MAX_CHANNEL - 1);
}

struct Channel *getChannel (struct Channel **phash, char *name, int len)
{
	int id = hash_str (name, len);
	struct Channel *p;
	for (p=phash[id]; p; p=p->next)
	{
		if (strncmp (name, p->channel_md5, len) == 0)
			return p;
	}
	PDEBUG("Cannot findChannel hash %.32s.\n", name);
	return NULL;
}

void apply_hash (struct Channel **phash, void apply(struct Channel *, void *), void *p)
{
	int i;
	struct Channel *pc, *nextpc;
	for (i=0; i<MAX_CHANNEL; i++)
	{
		for (pc=phash[i]; pc; pc=nextpc)
		{
			nextpc = pc->next;
			apply (pc, p);
		}
	}
}

void apply_list (struct Channel *plist, void apply(struct Channel *, void *), void *p)
{
	struct Channel *pc, *nextpc;
	for (pc=plist; pc; pc=nextpc)
	{
		nextpc = pc->lnext;
		apply (pc, p);
	}
}

int freeChannel (struct Channel **phash, struct Channel **plist, int *count, struct Channel *p)
{
	int id = hash_str (p->channel_md5, MD5_LEN);
	struct Edge *nextedge, *pedge;
	struct Channel *pchannel;
	if (phash[id] == p)
	{
		phash[id] = p->next;
	} else
	{
		for (pchannel=phash[id]; pchannel; pchannel=pchannel->next)
		{
			if (pchannel->next == p)
			{
				pchannel->next = p->next;
				break;
			}
		}
		if (!pchannel) return -1;
	}
	if ((*plist) == p)
	{
		*plist = p->lnext;
	} else
	{
		for (pchannel=*plist; pchannel; pchannel=pchannel->lnext)
		{
			if (pchannel->lnext == p)
			{
				pchannel->lnext = p->lnext;
				break;
			}
		}
		if (!pchannel) return -1;
	}
	(*count) --;
#ifdef __CP_SOURCE
	if (p->db != NULL) fclose (p->db);
#endif
	if (p->pcinfo)
	{
		free_livechannel (p);
		free (p->pcinfo);
	}
	for (pedge=p->PeerHead; pedge; pedge=nextedge)
	{
		p->numclient --;
		nextedge = pedge->cnext;
		delEdge (pedge);
	}
	free (p);
	return 0;
}

inline struct Channel *findChannel (char *name, int len)
{
	return getChannel (ChannelHash, name, len);
}

void freeLiveChannel (struct Channel *pc, void *p)
{
	freeChannel (ChannelHash, &ChannelList, &NumNewChannel, pc);
}

void freeAllChannel ()
{
	apply_hash (ChannelHash, freeLiveChannel, NULL);
}

inline void buildLivePath (char *buf, int len, char *md5)
{
	snprintf (buf, len, "%s/%s%.2s/", PREFIX, LIVE_PREFIX, md5);
	mkdir (buf, 0777);
	strcat (buf, md5);
}


#ifdef __CP_SOURCE
int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
	int i, *msg;
	struct LiveChannelInfo *c = pc->pcinfo;
	if (c == NULL)
		return -1;
	if (c->indisk == NULL)
	{
		c->max_queue = MAX_QUEUE;
		c->indisk = calloc (c->max_queue, 1);
		c->bitflag = calloc ((c->max_queue+7)/8, 1);
//		if (c->indisk == NULL || c->bitflag == NULL)
		PDEBUG ("allocate memory,%p,%p\n",c->indisk, c->bitflag);
		return -1;
	}
	i = id % c->max_queue;
	if (c->indisk[i] == 0 || (pc->type != T_PLIST && c->indisk[i] != (id/c->max_queue + 1)))
	{
		PINFO ("empty flag %d.\n", c->indisk[i]);
		return -1;
	}

	if (pc->maxblocksize + 2*sizeof(int) > max)
	{
		PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
		return -2;
	}

	if (fseeko (pc->db, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("fseek failed.\n");
		return -1;
	}
	if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), pc->db)) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
	{
		PDEBUG ("Only %d read [%d]\n", i, ((int *)buf)[1]);
		return -1;
	}
	msg = (int *)buf;
	if (pc->type == T_PLIST)
		msg[0] = id;
	if (msg[0] != id || msg[1] > pc->maxblocksize || msg[1] <= 0)
	{
		PDEBUG ("Message read is [%d %d]\n", msg[0], msg[1]);
		return -1;
	}
	pc->upsize += msg[1];
	return msg[1];
}

int saveBlock (struct Channel *c, char *buf, struct Session *p)
{
	unsigned int pos, id, size;
	struct LiveChannelInfo *pcinfo;

	assert (buf);
	if ((!c) || (pcinfo = c->pcinfo) == NULL)
	{
		PDEBUG ("saveBlock c is null.\n");
		return -1;
	}
	if (pcinfo->indisk == NULL)
	{
		pcinfo->max_queue = MAX_QUEUE;
		pcinfo->indisk = calloc (pcinfo->max_queue, 1);
		pcinfo->bitflag = calloc ((pcinfo->max_queue+7)/8, 1);
		if (pcinfo->indisk == NULL || pcinfo->bitflag == NULL)
			return -1;
	}
	id = ((unsigned int *)buf)[0];
	size = ((unsigned int *)buf)[1];
	pos = id % pcinfo->max_queue;
	if (id > 0) clrBit (pcinfo->bitflag, pos);
	if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE || id < 0)
	{
		PDEBUG ("saveBlock:size is %d and id is %d.\n", size, id);
		return 0;
	}

	if (c->maxblocksize == 0)
		c->maxblocksize = size;
	else if (size > c->maxblocksize)
	{
		PDEBUG ("saveBlock:maxblocksize is %d, size is %d and id is %d.\n", c->maxblocksize, size, id);
		return 0;
	}
//	PDEBUG ("Recv %d(%d) Save2 %d, dataSource %p and now is %p\n", (int)id, (int)size, (int)pos, pcinfo->dataSource, p);

	if (fseeko (c->db, ((off_t)(pos)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("Error in fsseko.\n");
		return -1;
	}

	if (fwrite (buf, size+2*sizeof(int), 1, c->db) != 1)
	{
		PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
		return -1;
	}

	if (pcinfo->dataSource != p)
	{
		PDEBUG ("dataSource %p is not equal p %p.\n", pcinfo->dataSource, p);
		pcinfo->dataSource = p;
	}

	fflush (c->db);
	pcinfo->total ++;
	c->downsize += size;
	if (id > pcinfo->maxID) pcinfo->maxID = id;
	pcinfo->indisk[pos] = (id/pcinfo->max_queue) + 1;
	if (pcinfo->indisk[pos] == 0)
		PDEBUG("Too large id %ds", id);
	return size;
}

int init_livechannel (struct Channel *p)
{
	if ((p->db = fopen (p->fname, "w+")) == (FILE *)0)
		return -1;
	p->pcinfo->isSave = 0;
	return 0;
}

int free_livechannel (struct Channel *p)
{
	struct LiveChannelInfo *pcinfo = p->pcinfo;
	free (pcinfo->indisk);
	free (pcinfo->bitflag);
	unlink (p->fname);
	if (pcinfo->dataSource != NULL)
	{
		pcinfo->dataSource->pc = NULL;
		PDEBUG ("close CS source for channel %.32s.\n", p->channel_md5);
		Clientclosure (pcinfo->dataSource-TRACKER[TYPE_P2PC].head, TYPE_P2PC);
		pcinfo->dataSource = NULL;
	}
	freeMedia (p);
	return 0;
}
#endif

#ifdef __SP_SOURCE
extern time_t CurrentTime;
extern char *NET_NAME[];
extern char *WWW_ROOT;
extern char *defaultspip;
int Changed = 1;
extern int timer_add (unsigned int t, TimerFunc process, void *entity, void *data);
extern int writeMessage (struct Session *p, struct Channel *pc, char *ptr);
extern int buildGTV (struct Channel *pc, int datalen, char *data, int type);
extern inline void freeProgram (struct Channel *, void *);
extern void send_all_spupdate (struct Channel *pc, struct SPUpdate *s);

void hup_handler (int sig)
{
	Changed ++;
}
inline void buildPListPath (char *buf, int len, char *md5)
{
	snprintf (buf, len, "%s/%s/", PREFIX, PLIST_PREFIX);
	strcat (buf, md5);
}

FILE * open_keyfile (struct Channel *p)
{
	char buffer[MAX_DATA];
	struct stat stbuf;
	if (p == NULL) return NULL;
	snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
	if (stat (buffer, &stbuf) == 0)
	{
		if (!S_ISREG (stbuf.st_mode))
		{
			PDEBUG ("File %s exist and not a regular file", buffer);
			return NULL;
		}
	}
	return fopen (buffer, "r");
}

int send_mplist_spupdate (struct Channel *pc, void * data)
{
	int iddiff, prev;
	char *buf, buffer[MAX_DATA];
	struct Edge *pedge;
	struct LiveChannelInfo *pcinfo;
	struct SPUpdate s;
	struct logrec lrec;
	time_t slot;

	if (pc == NULL || pc->pcinfo == NULL || pc->pcinfo->mlist == NULL
			|| pc->pcinfo->maxID == 0)
	{
		PDEBUG ("Wrong playlist data.\n");
		return -1;
	}
	pcinfo = pc->pcinfo;
	if (pcinfo->s.maxBlockID == 0)
	{
		fseek (pcinfo->keyfile, 0, SEEK_SET);
		if (fread (&lrec, 1, sizeof(struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		pcinfo->s.minBlockID = (((CurrentTime - FIX_MAGIC ) * 16 + pcinfo->maxID - 1) / pcinfo->maxID) * pcinfo->maxID;
		pcinfo->s.maxBlockID = lrec.id;
		if (pcinfo->s.maxKeySample == 0)
			pcinfo->s.maxKeySample = lrec.keysample;
		if (pcinfo->s.minKeySample == 0)
			pcinfo->s.minKeySample = CurrentTime;
//			pcinfo->s.minKeySample = lrec.keysample;
	}

	s = pcinfo->s;
	s.maxBlockID += s.minBlockID;
	s.maxKeySample = ((long long)CurrentTime) * 10000000;
	s.minKeySample = ((long long)(s.minKeySample)) * 10000000;

	if (pcinfo->updated < CurrentTime && pc->numclient > 0)
	{
		pcinfo->updated = CurrentTime;
		buf = buffer + sizeof (int);
		*(unsigned char *) buf = P2P_SPUPDATE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		memcpy (buf, &s, sizeof (struct SPUpdate));
		buf += sizeof (struct SPUpdate);
		*(int *) buffer = buf - buffer;
		for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
		{
			if (writeMessage (pedge->me, pc, buffer) < 0)
			{
				PDEBUG ("send SPUPDATE err.\n");
			}
		}
		PINFO ("Send spupdate in %d, %d.\n", (int)(pcinfo->s.maxKeySample), (int)(pcinfo->s.minKeySample));
	}

	if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
	{
		fclose (pcinfo->keyfile);
		prev = pcinfo->mlist->m_cursampleid;
		pcinfo->mlist->m_cursampleid ++;
		if (pcinfo->mlist->m_cursampleid >= pcinfo->mlist->m_totalchannel)
			pcinfo->mlist->m_cursampleid = 0;
		if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[pcinfo->mlist->m_cursampleid])) == NULL)
		{
			PDEBUG ("Error in new keysample file\n");
			return -1;
		}
		fseek (pcinfo->keyfile, SEEK_SET, 0);
		if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		iddiff = pcinfo->mlist->m_lists[prev]->pcinfo->maxID + lrec.id - pcinfo->mlist->m_lastmaxid;
		slot = lrec.keysample;
		pcinfo->s.maxKeySample = lrec.keysample;
	} else
	{
		if ((slot = lrec.keysample - pcinfo->s.maxKeySample) < 0)
			slot = 0;
		iddiff = lrec.id - pcinfo->mlist->m_lastmaxid;
		pcinfo->s.maxKeySample += slot;
	}
	pcinfo->mlist->m_lastmaxid = lrec.id;
	pcinfo->s.maxBlockID += iddiff;
	timer_add (CurrentTime+slot, (TimerFunc)send_mplist_spupdate, pc, NULL);
	return 0;
}

int init_mplistchannel (struct Channel *p)
{
	struct LiveChannelInfo *pcinfo = p->pcinfo;
	if (p->maxblocksize == 0) p->maxblocksize = DEFAULT_BLOCK;
	pcinfo->max_queue = BLOCK_PER_FILE;
	if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[0])) == NULL)
	{
		PDEBUG ("Error in new keysample file\n");
		return -1;
	}
	pcinfo->total = 0;
	pcinfo->isSave = 0;
	timer_add (CurrentTime, (TimerFunc)send_mplist_spupdate, p, NULL);
	return 0;
}


struct Channel *newMPListChannel (char *name, char *cmd5, float bitrate, int maxblocksize, int nchannel, struct Channel **pchannel)
{
	int i, id, startID=0;
	struct Channel *p;
	struct LiveChannelInfo *pcinfo;
	if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
	p = (struct Channel *)calloc (sizeof (struct Channel), 1);
	memcpy (p->channel_md5, cmd5, MD5_LEN);
	if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
	p->channel_md5[MD5_LEN] = 0;
	p->upsize = 0;
	p->downsize = 0;
	p->maxblocksize = maxblocksize;
	p->ctime = time (NULL);

	p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
	pcinfo = p->pcinfo;
	pcinfo->bitrate = bitrate;
	pcinfo->mlist = (struct MList *)calloc (sizeof (struct MList), 1);
	pcinfo->mlist->m_totalchannel = nchannel;
	pcinfo->media = calloc (nchannel, sizeof (struct MediaData));

	pcinfo->max_channel = nchannel;
	for (i=0; i<nchannel; i++)
	{
		pchannel[i]->ref ++;
		pcinfo->mlist->m_lists[i] = pchannel[i];
		pcinfo->mlist->m_startID[i] = startID;
		pcinfo->maxID += pchannel[i]->pcinfo->maxID;
		addMedia (p, startID, pchannel[i]->pcinfo->maxID, pchannel[i]->pcinfo->media[0].dlen, pchannel[i]->pcinfo->media[0].data, pchannel[i]->channel_name);
		startID += pchannel[i]->pcinfo->maxID;
	}
	if (init_mplistchannel (p) < 0)
	{
		PDEBUG ("newPlistChannel error for %p.", p);
		free_livechannel (p);
		free (pcinfo);
		free (p);
		return (struct Channel *)0;
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -