⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sessions.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
/* Extra bytes allocated past MAX_DATA for every buffer created by NEW(). */
#define FENCE_DATA		1024
/*
 * Main-loop run flag: process_child() keeps looping while this is > 0 and
 * terminate() decrements it.
 * NOTE(review): terminate() looks like a signal handler; if it is, this
 * should probably be a volatile sig_atomic_t - confirm against its users.
 */
int IN_LOOP = 1;
/*
 * Head of the singly linked list of released buffers: FREE() pushes onto it,
 * NEW() pops from it and closure_cache() frees every entry.
 */
struct cachetype *BufferCacheHead;
/*
 * Hand out a message buffer.
 *
 * A buffer previously released through the BufferCacheHead list is reused
 * when one is available (it is returned as-is, without being cleared);
 * otherwise a zero-filled buffer of MAX_DATA+FENCE_DATA bytes is allocated.
 * A failed allocation trips the assert.
 */
inline char *NEW ()
{
	char *buf = (char *) BufferCacheHead;

	if (buf != NULL)
		BufferCacheHead = BufferCacheHead->next;	/* pop the cached buffer */
	else
		buf = calloc (1, MAX_DATA+FENCE_DATA);
	assert (buf != NULL);
	return buf;
}

/*
 * Release a buffer: instead of being freed, it is pushed onto the front of
 * the BufferCacheHead list.  A NULL pointer is ignored.
 */
inline void FREE (void *p)
{
	struct cachetype *node = (struct cachetype *) p;

	if (node != NULL)
	{
		node->next = BufferCacheHead;
		BufferCacheHead = node;
	}
}

/*
 * Free every buffer currently parked on the BufferCacheHead list and leave
 * the list empty.
 */
inline void closure_cache ()
{
	struct cachetype *node = BufferCacheHead;

	while (node != NULL)
	{
		/* read the link before the node is released */
		struct cachetype *following = node->next;
		free (node);
		node = following;
	}
	BufferCacheHead = NULL;
}

/*
 * Decrement IN_LOOP by one; `sig` is not used.
 * Presumably installed as a signal handler elsewhere - the registration is
 * not part of this file section.
 */
void terminate (int sig)
{
	(void) sig;
	IN_LOOP -= 1;
}

/*
 * Accept one pending connection on the listening socket `sock` and bind it
 * to a free Session slot of tracker `type`.
 *
 * The accepted socket gets SO_LINGER (enabled, timeout 0, when the option
 * exists), SO_KEEPALIVE and O_NONBLOCK.  When the tracker already holds its
 * maximum number of sessions, an ERR_CONNECTION_FULL message is written to
 * the peer and the connection is closed.
 *
 * Returns -1 when the connection could not be accepted or registered,
 * otherwise the result of TRACKER[type].init for the chosen slot.
 */
int handle_new_connection(int sock, int type)
{
	struct sockaddr_in addr;
	socklen_t addrlen = sizeof (struct sockaddr_in);
	int listnum, max, flags;
	struct Session *head;
#ifdef SO_LINGER
	struct linger ling;
#endif
	int newconn = accept(sock, (struct sockaddr *)(&addr), &addrlen);
	int keepalive = 1;

	if (newconn < 0)
	{
		perror("accept");
		return -1;
	}
	/*
	 * The descriptor is later handed to FD_SET(); a value outside
	 * [0, FD_SETSIZE) would write past the end of the fd_set.
	 */
	if (newconn >= FD_SETSIZE)
	{
		PDEBUG ("descriptor %d is beyond FD_SETSIZE, dropping client type %d.\n", newconn, type);
		close (newconn);
		return -1;
	}
#ifdef SO_LINGER
	ling.l_onoff = 1;
	ling.l_linger = 0;
	setsockopt (newconn, SOL_SOCKET, SO_LINGER, &ling, sizeof (struct linger));
#endif
	setsockopt (newconn, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof (keepalive));

	/*
	 * Switch to non-blocking mode.  A failed F_GETFL returns -1, which must
	 * not be OR-ed with O_NONBLOCK and passed back to F_SETFL.
	 */
	flags = fcntl(newconn, F_GETFL, 0);
	if (flags < 0 || fcntl(newconn, F_SETFL, flags | O_NONBLOCK) < 0)
	{
		perror("fcntl");
		close (newconn);
		return -1;
	}

	if (TRACKER[type].cur >= TRACKER[type].max)
	{
		/*
		 * Tracker is full.  Reply layout: unsigned int total length,
		 * unsigned char P2P_MSG, unsigned short ERR_CONNECTION_FULL,
		 * unsigned char 1.
		 */
		char *buf, buffer[MAX_DATA];
		buf = buffer+sizeof (int);
		*(unsigned char *)buf = P2P_MSG;
		buf += sizeof (char);
		*(unsigned short *)buf = ERR_CONNECTION_FULL;
		buf += sizeof (short);
		*(unsigned char *)buf = 1;
		buf += sizeof (char);
		*(unsigned int *)buffer = buf - buffer;
		/* best effort: the connection is closed whether or not this succeeds */
		write (newconn, buffer, *(unsigned int *)buffer);
		close (newconn);
		return -1;
	}

	/* a slot whose socket field is 0 is free */
	head = TRACKER[type].head;
	max = TRACKER[type].max;
	for (listnum = 0; listnum < max; listnum ++)
	{
		if(head[listnum].socket == 0)
		{
			head[listnum].socket = newconn;
			head[listnum].type = type;
			getpeername (newconn, (struct sockaddr *)(&addr), &addrlen);
			head[listnum].host = ntohl(addr.sin_addr.s_addr);
			head[listnum].port = ntohs(addr.sin_port);
			head[listnum].buf = NEW ();
#ifdef __CP_SOURCE
			head[listnum].totalup = 0;
#endif
			head[listnum].time_sec = CurrentTime;
			head[listnum].last_transferblock = CurrentTime;
			FD_SET(newconn, &osocks);
			/* keep maxid at the highest slot index in use */
			if (listnum > TRACKER[type].maxid)
				TRACKER[type].maxid = listnum;
			break;
		}
	}
	if (listnum >= max)
	{
		PDEBUG ("no space left for incoming client type %d.", type);
		close (newconn);
		return -1;
	}
	TRACKER[type].cur ++;
	return (*(TRACKER[type].init)) (listnum);
}

/*
 * Close session slot `listnum` of tracker `type` through the tracker's
 * closure callback.
 *
 * If the slot was the highest one in use, maxid is walked down past every
 * trailing slot whose socket is 0 (never below slot 0).  Returns the
 * tracker's session count after it has been decremented.
 */
int Clientclosure (int listnum, int type)
{
	int slot = listnum;

	(*(TRACKER[type].closure)) (listnum);
	for (; TRACKER[type].maxid == slot && TRACKER[type].head[slot].socket == 0 && slot > 0; slot --)
		TRACKER[type].maxid --;
	TRACKER[type].cur -= 1;
	return TRACKER[type].cur;
}

/*
 * my_exit never returns and is additionally registered as a destructor.
 * NOTE(review): destructors are invoked without arguments, while the
 * definition takes `int err` and itself calls exit() - confirm that running
 * it as a destructor is intended.
 */
void my_exit() __attribute__((noreturn, destructor));
#ifdef __SP_SOURCE
/* Defined elsewhere; not referenced in this section of the file. */
extern struct Channel *ProgramHash[MAX_CHANNEL];
#endif
/* Defined elsewhere; called from my_exit(). */
extern void freeJobCache ();

/*
 * Tear everything down and leave the process with exit status `err`.
 *
 * For every tracker type (highest index first) each session whose socket is
 * > 0 is passed to the tracker's closure callback, the session table is
 * freed and the listening socket is closed.  Afterwards the remaining
 * subsystems are released, the PID file is removed and exit() is called.
 *
 * The prototype also registers this function as a destructor, so it can be
 * entered a second time while exit() runs.  The session table pointer is
 * therefore cleared after it is freed, so a second pass skips it instead of
 * reading and freeing released memory.
 * NOTE(review): when entered as a destructor `err` is not supplied by any
 * caller - confirm the intended exit status for that path.
 */
void my_exit (int err)
{
	int max, listnum, type;
	struct Session *head;
	int (*closure) (int);
	for (type=MAX_TYPE-1; type >=0; type --)
	{
		max = TRACKER[type].maxid+1;
		closure = TRACKER[type].closure;
		if ((head = TRACKER[type].head) != NULL)
		{
			for (listnum=0; listnum<max; listnum++)
			{
				if (head[listnum].socket > 0)
					(*closure) (listnum);
			}
			free (head);
			/* do not leave a dangling table behind for a re-entrant call */
			TRACKER[type].head = NULL;
		}
#ifdef __CP_SOURCE
		if (TRACKER[type].flag == FLAG_SERVER) close (TRACKER[type].sock);
#endif
#ifdef __SP_SOURCE
		close (TRACKER[type].sock);
#endif
	}
#ifdef __CP_SOURCE
#ifdef HAVE_TS
	closure_TS ();
	close (TSSOCK);
#endif
#endif
	PDEBUG ("exit...\n");
	freeAllChannel ();
#ifdef __SP_SOURCE
	db_end ();
	timer_free ();
	freeAllProgram ();
	freeAllOrder ();
#endif
	free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
	closure_cache ();
	freeJobCache ();
	unlink (PIDFile);
	CLOSELOG;
	exit (err);
}
/*
 * Check whether the message at p->buf+p->start is complete.
 *
 * A message starts with an unsigned int holding its total length (the
 * length field itself included), so nothing is examined until at least
 * sizeof(int)+sizeof(char) bytes are buffered.
 *
 * Returns:
 *   > 0  the message length, when the whole message is buffered;
 *   0    when the session is closed or more data is still needed;
 *   -1   when the length field is invalid: smaller than the minimum header
 *        (such a message could never be consumed correctly) or
 *        >= MAX_BLOCK_SIZE.  The length is compared as unsigned so a huge
 *        value cannot slip through as a negative int.
 */
inline int checkComplete (struct Session *p)
{
	unsigned int raw;
	int len;

	if (p->socket == 0 || p->off < sizeof (int)+sizeof(char)) return 0;
	raw = *(unsigned int *)(p->buf+p->start);
	if (raw < sizeof (int)+sizeof(char) || raw >= MAX_BLOCK_SIZE) return -1;
	len = (int) raw;
	return (p->off >= len?len:0);
}

/*
 * Copy `len` bytes from `src` to `dst`; the two regions may overlap.
 *
 * The copy direction depends on the relative position of the regions:
 * front-to-back when dst lies before src, back-to-front otherwise, so the
 * source bytes are always read before they are overwritten.  Nothing is
 * done when len <= 0 or when dst and src are the same address.
 *
 * Defined without `inline`: under C99/C11 rules a plain `inline` definition
 * provides no external definition, so a call that is not inlined would fail
 * to link.
 */
void my_memmove (char *dst, char *src, int len)
{
	int i;

	if (len <= 0 || dst == src) return;
	if (dst < src)
	{
		for (i=0; i<len; i++)
			dst[i] = src[i];
	} else
	{
		for (i=len-1; i>=0; i--)
			dst[i] = src[i];
	}
}
/*
 * Consume `len` bytes of p's receive buffer after a message has been
 * processed: the read position moves forward and the count of buffered
 * bytes shrinks by the same amount.  A session whose socket is 0 is left
 * untouched.  Always returns 0.
 */
inline int updateBuf (struct Session *p, int len)
{
	if (p->socket != 0)
	{
		p->start = p->start + len;
		p->off = p->off - len;
	}
	return 0;
}


/*
 * Service every session slot (0..maxid) of tracker `type` after select().
 *
 * Per slot, in this order:
 *   - (__CP_SOURCE) a session with flag > 0 that is older than MAX_CONN is
 *     handed to reconnect(); the slot is closed when that fails;
 *   - a socket set in `wsocks` has its queued jobs written by processJobs();
 *   - a socket set in `socks` is read into p->buf;
 *   - every complete message in p->buf is passed to the tracker's process
 *     callback;
 *   - the session is closed when the last length check reported an invalid
 *     message.
 *
 * `esocks` is accepted but not used in this function.
 */
void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks)
{
	struct ServerDesc *ser = &(TRACKER[type]);
	struct Session *p;
	int (*process)(int listnum);
	int listnum, max, this_read, this_write;

	max = ser->maxid + 1;
	process = ser->process;	/* per-type message handler (original note: process_P2PC / P2PS) */
	for (listnum = 0; listnum < max; listnum++)
	{
		p = &(ser->head[listnum]);
#ifdef __CP_SOURCE
		/* flag > 0 and older than MAX_CONN: try to reconnect, then move on to the next slot */
		if (p->flag > 0 && (CurrentTime - p->time_sec) > MAX_CONN)
		{
			if (reconnect (p) < 0)
			{
				PDEBUG("Reconnect Error.\n");
				Clientclosure (listnum, type);
			}
			continue;
		}
#endif
		/* ---- writable: flush queued jobs ---- */
		if (p->socket != 0 && FD_ISSET (p->socket, wsocks))
		{
#ifdef __CP_SOURCE
			struct sockaddr_in addr;
			socklen_t addrlen = sizeof (struct sockaddr_in);
			/* first writable event while flag is set: clear the flag and refresh the peer port */
			if (p->flag != 0)
			{
				p->flag = 0;
				getpeername (p->socket, (struct sockaddr *)(&addr), &addrlen);
				p->port = ntohs(addr.sin_port);
				/* restore file status flags */
				fcntl(p->socket, F_SETFL, p->sock_flag);
				
			}
#endif
			/* a negative result from processJobs() closes the session */
			if ((this_write = processJobs (p)) < 0)
			{
				perror ("PP: Write");
				Clientclosure (listnum, type);
			} else
			{
//				PDEBUG ("Send %d to %d\n", this_write, listnum);
				tmpUpBytes += this_write;
#ifdef __CP_SOURCE
				p->totalup += this_write;
#endif
			}
//			continue;
		}
		/* ---- readable: append incoming bytes to p->buf ---- */
		if (p->socket != 0 && FD_ISSET (p->socket, socks))
		{
			/* buffer (almost) exhausted: slide the unread bytes to the front */
			if (p->start + p->off  >= MAX_DATA-1)
			{
				my_memmove (p->buf, p->buf+p->start, p->off);
				p->start = 0;
			} else if (p->off == 0)
				p->start = 0;
			/* read() returning 0 or a negative value closes the session */
			if ((this_read = read(p->socket, p->buf+p->start+p->off, MAX_DATA -p->start- p->off)) <= 0)
			{
				perror("PP: read err");
				PDEBUG("socket: %d, p->start: %d. p->off: %d. \n", p->socket, p->start, p->off);
				Clientclosure (listnum, type);
			} else
			{
				p->off += this_read;
			}
		}
		/*
		 * Dispatch every complete message.  this_read is reused here for the
		 * message length reported by checkComplete().  A handler result of -2
		 * leaves the loop without consuming the message.
		 */
		while (p->socket != 0 && (this_read = checkComplete (p)) > 0)
		{
			if ((*process) (listnum) == -2) break;
			updateBuf (p, this_read);
		}
		if (p->socket != 0)
		{
			/* checkComplete() reported an invalid length, or the length exceeds MAX_DATA */
			if (this_read < 0 || this_read > MAX_DATA)
			{
				PDEBUG ("Too long message, cut off, length is %d, and p->off is %d, p->start is %d.\n", this_read, p->off, p->start);
				Clientclosure (listnum, type);
			}
		}
	}
}


/*
 * Main event loop; runs until IN_LOOP drops to 0 or below.
 *
 * Each round rebuilds the select() sets from the listening sockets and from
 * every live session, waits up to 10 ms, accepts new connections, lets
 * process_type() service the sessions of each tracker type and finally
 * calls period_process() (also when select() reported nothing).
 *
 * NOTE(review): nothing in this function ever adds a descriptor to `esocks`,
 * so the FD_ISSET(..., &esocks) exit paths below look unreachable - confirm.
 * NOTE(review): `highsock` is only assigned under __CP_SOURCE or
 * __SP_SOURCE; the code appears to assume exactly one of them is defined.
 */
void process_child (void)
{
	int readsocks;	/* result of select() */
	struct Session *head;
	int highsock;
	fd_set socks, wsocks, esocks;
	int type, listnum, max;
	struct timeval tm;
#ifdef __CP_SOURCE
	/* only ever assigned in this function (see the loop below) */
	time_t last_handle_conn = 0;
#endif

	startTime = time(NULL);

	while (IN_LOOP > 0)
	{
		FD_ZERO(&socks);
		FD_ZERO(&esocks);
		FD_ZERO(&wsocks);
#ifdef __CP_SOURCE
#ifdef HAVE_TS
		highsock = TSSOCK;
		FD_SET(TSSOCK, &socks);
#else
		highsock = 0;
#endif
#endif
		CurrentTime = time (NULL);
#ifdef __SP_SOURCE
		/* watch every open tsSock[] descriptor for input */
		highsock = 0;
		for(type=0; type<MAX_TS; type++)
		{
			if (tsSock[type] <= 0) continue;
			if (tsSock[type] > highsock)
				highsock = tsSock[type];
			FD_SET(tsSock[type], &socks);
		}
#endif
		for (type=0; type<MAX_TYPE; type++)//type is P2PC and P2PS 
		{
#ifdef __CP_SOURCE
			/* only trackers flagged FLAG_SERVER have a listening socket to watch */
			if (TRACKER[type].flag == FLAG_SERVER && highsock < TRACKER[type].sock)
				highsock = TRACKER[type].sock;
			if(CurrentTime - last_handle_conn >= 1) 
			{
				last_handle_conn = CurrentTime;
			}
			if (TRACKER[type].flag == FLAG_SERVER) FD_SET(TRACKER[type].sock, &socks);
#endif
#ifdef __SP_SOURCE
			if (highsock < TRACKER[type].sock)
				highsock = TRACKER[type].sock;
			FD_SET(TRACKER[type].sock, &socks);
#endif


			max = TRACKER[type].maxid + 1;
			head = TRACKER[type].head;
			for (listnum = 0; listnum < max; listnum++)
			{
				if (head[listnum].socket != 0)
				{
					/*
					 * A session whose `head` field is set is watched for
					 * writability only; otherwise it is watched for input.
					 * (presumably `head` is the pending job list - confirm)
					 */
					if (head[listnum].head)
						FD_SET(head[listnum].socket, &wsocks);
					else
						FD_SET(head[listnum].socket, &socks);
					if (head[listnum].socket > highsock)
						highsock = head[listnum].socket;
				}
			}
		}

		/* wait at most 10 ms so period_process() keeps running regularly */
		tm.tv_sec = 0;
		tm.tv_usec = 10000;
		readsocks = select(highsock+1, &socks, &wsocks, &esocks, &tm);
		/* timeout or error: skip straight to the periodic work */
		if (readsocks <= 0)
			goto NEXTROUND;
#ifdef __CP_SOURCE
#ifdef HAVE_TS
		if (FD_ISSET(TSSOCK, &socks))
			process_TS ();
		if (FD_ISSET(TSSOCK, &esocks))
		{
			PDEBUG ("exit...\n");
			exit (errno);
		}
#endif
#endif
#ifdef __SP_SOURCE
		for(type=0; type < MAX_TS; ++type)
		{
			if(tsSock[type] > 0 && FD_ISSET(tsSock[type], &socks))
				process_TS2RM(type);    //

		}
#endif
		for (type=0; type<MAX_TYPE; type++)
		{
			/*
			 * The preprocessor selects which `if` guards the next statement:
			 * a readable listening socket means a connection is pending.
			 */
#ifdef __CP_SOURCE
			if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &socks))
#endif
#ifdef __SP_SOURCE
			if (FD_ISSET(TRACKER[type].sock, &socks))
#endif
				handle_new_connection (TRACKER[type].sock, type);
			/* same construction: an exceptional condition on the listening socket ends the process */
#ifdef __CP_SOURCE
			if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &esocks))
#endif
#ifdef __SP_SOURCE
			if (FD_ISSET(TRACKER[type].sock, &esocks))
#endif
			{
				PDEBUG ("exit...\n");
				exit (errno);
			}

			process_type (type, &socks, &wsocks, &esocks);
		}
NEXTROUND:
		period_process ();
	}
}

/*
 * Queue a job whose message is already stored in ptr->buffer.
 *
 * The int at the start of ptr->buffer is added to ptr->len, then the job is
 * handed to addJob() for session p / channel pc.
 */
inline void writeDATAMessage (struct Session *p, struct Channel *pc, struct JobDes *ptr)
{
	int msg_len = *(int *)(ptr->buffer);

	ptr->len += msg_len;
	addJob (p, pc, ptr);
}

/*
 * Copy the message at `ptr` into a job buffer for session p / channel pc.
 *
 * The message length is the int stored at the start of `ptr`.  The job
 * returned by findEnoughBuffer() is reused when there is one; otherwise a
 * new job is created and, once filled, queued with addJob().
 *
 * Returns 0 on success, -1 when no new job could be created.
 *
 * NOTE(review): the value getJobBuffer() stores through its second argument
 * is never compared with the message length - confirm that a fresh job can
 * always hold the message.
 */
int writeMessage (struct Session *p, struct Channel *pc, char *ptr)
{
	int msg_len = *(int *)ptr;
	int capacity = 0;
	int fresh = 0;
	char *dest;
	struct JobDes *job = findEnoughBuffer (p, pc, msg_len);

	if (job == NULL)
	{
		job = newJob ();
		if (job == NULL)
			return -1;
		fresh = 1;
	}
	dest = getJobBuffer (job, &capacity);
	memcpy (dest, ptr, msg_len);
	job->len += msg_len;
	if (fresh)
		addJob (p, pc, job);
	return 0;
}

/*
 * Create an edge linking channel `head` with session `me`.
 *
 * The new edge is pushed onto the front of both lists it belongs to: the
 * channel's peer list (head->PeerHead, chained through cnext) and the
 * session's edge list (me->header, chained through enext).
 *
 * Returns the new edge, or NULL when memory could not be allocated; in that
 * case neither list is modified.
 */
struct Edge *newEdge (struct Channel *head, struct Session *me)
{
	struct Edge *result = malloc (sizeof *result);

	if (result == NULL)
		return NULL;
	result->head = head;
	result->me = me;
	result->cnext = head->PeerHead;
	head->PeerHead = result;
	result->enext = me->header;
	me->header = result;
	return result;
}

/*
 * Unlink edge `e` from its channel's peer list and from its session's edge
 * list, then free it.
 *
 * Either end of the edge may be NULL and is then skipped.  If the session's
 * `pc` field points at this edge's channel, it is reset to NULL; this is
 * done only when the session exists, since the session pointer is otherwise
 * NULL and must not be dereferenced.  A NULL edge is ignored.
 *
 * Always returns 0.
 */
int delEdge (struct Edge *e)
{
	struct Channel *pchannel;
	struct Session *psession;
	struct Edge **link;

	if (e == NULL)
		return 0;
	pchannel = e->head;
	psession = e->me;

	if (pchannel)
	{
		/* walk the links themselves so the list head needs no special case */
		for (link = &pchannel->PeerHead; *link != NULL; link = &(*link)->cnext)
		{
			if (*link == e)
			{
				*link = e->cnext;
				break;
			}
		}
	}
	if (psession)
	{
		for (link = &psession->header; *link != NULL; link = &(*link)->enext)
		{
			if (*link == e)
			{
				*link = e->enext;
				break;
			}
		}
		if (psession->pc == pchannel) psession->pc = NULL;
	}
	free (e);
	return 0;
}

void apply_session (struct Session *head, int size, void apply(struct Session *, void *), void *p)
{
	int i;
	for (i = 0; i < size; i++)
	{
		if (head[i].socket > 0) apply (&(head[i]), p);
	}
}

/* Transfer rate in KB per second; 0 when no time has elapsed (avoids inf/nan in the report). */
static float xml_rate_kb (float bytes, unsigned int seconds)
{
	return seconds == 0 ? 0.0f : bytes/1024/seconds;
}

/*
 * Rewrite the XML status file named by LOGXML.
 *
 * time_interval  seconds covered by the "current" byte counters
 * tmpTime        seconds covered by the "total" byte counters
 * channelcount   value reported as ActiveChannel
 * totalclient    value reported as OnlineUser
 *
 * The root element is <SP> or <CP> depending on which of __SP_SOURCE /
 * __CP_SOURCE is defined; the closing tag is selected by the same macro as
 * the opening tag.
 *
 * Returns 0 on success, -1 when LOGXML is unset or empty, or when the file
 * cannot be opened or written.
 */
int logto_xml (unsigned int time_interval, unsigned int tmpTime, unsigned int channelcount, unsigned int totalclient)
{
	FILE *logf;

	if (LOGXML == NULL || LOGXML[0] == 0 || (logf = fopen(LOGXML,"w")) == NULL)
	{
		/* never hand a NULL pointer to %s */
		PDEBUG("Couldn't open xml log %s!.\n", LOGXML == NULL ? "(null)" : LOGXML);
		return -1;
	}
	fprintf(logf, "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>\n");
#ifdef __SP_SOURCE
	fprintf(logf, "<SP>\n");
#endif
#ifdef __CP_SOURCE
	fprintf(logf, "<CP>\n");
#endif
	fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>\n", (long) (CurrentTime-startTime));
	fprintf(logf, "<CurrentIncoming>%.4f</CurrentIncoming>\n", xml_rate_kb ((float)tmpDownBytes, time_interval));
	fprintf(logf, "<CurrentOutgoing>%.4f</CurrentOutgoing>\n", xml_rate_kb ((float)tmpUpBytes, time_interval));
	fprintf(logf, "<AverageIncoming>%.4f</AverageIncoming>\n", xml_rate_kb ((float)totalDownBytes, tmpTime));
	fprintf(logf, "<AverageOutgoing>%.4f</AverageOutgoing>\n", xml_rate_kb ((float)totalUpBytes, tmpTime));
	fprintf(logf, "<ActiveChannel>%u</ActiveChannel>\n", channelcount);
	fprintf(logf, "<OnlineUser>%u</OnlineUser>\n", totalclient);
	fprintf(logf, "<TotalIncoming>%lld</TotalIncoming>\n", (long long) totalDownBytes);
	fprintf(logf, "<TotalOutgoing>%lld</TotalOutgoing>\n", (long long) totalUpBytes);
#ifdef __SP_SOURCE
	fprintf(logf, "</SP>\n");
#endif
#ifdef __CP_SOURCE
	fprintf(logf, "</CP>\n");
#endif
	/* fclose() flushes the buffered report; a failure means it was not fully written */
	if (fclose(logf) != 0)
	{
		PDEBUG("Couldn't write xml log %s!.\n", LOGXML);
		return -1;
	}
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -