⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cptest.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
//#define HAVE_MYSQL	1

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <syslog.h>
#include <string.h>
#include <time.h>

#include "ProtocolDefine.h"
#include "StructDefine.h"

#define MAX_DATA		100000
#define MAX_LINE		1024
#define MAX_IDLE		300
#define SILENCE_TIME		2
#define SP4CP_PORT		50001
int DELAY=0;
int timer=1;
float PROB=1.0;

struct Message
{
	int len;
	unsigned char type;
	char buffer[MAX_DATA];
} UDPMsg;


unsigned short P2P_PORT=23;
char *CPSERVER="166.111.118.6";
unsigned long long TOTALDATA;
unsigned long long TOTALDATASIZE;
unsigned long long TOTALR;
unsigned long long TOTALW;
unsigned long long BYTES_READ;
unsigned long long BYTES_WRITE;
int MaxClient = 50;
int Verbosity;
int Current;

int process (char *md5, int latest);

void settimer (int sig)
{
	timer = 0;
}

struct hostent * init_sockaddr (struct sockaddr_in *name, char *host, unsigned int port)
{
	struct hostent *h;
	unsigned short sp = port;
	memset (name, 0, sizeof (*name));
	name->sin_family = PF_INET;
	name->sin_port = htons (sp);
	h = gethostbyname (host);
	if (h == (struct hostent *)0)
	{
		perror ("gethostbyname");
		return (struct hostent *)0;
	}
	name->sin_addr = *(struct in_addr *)h->h_addr;
	return h;
}

int my_connect (char *host, int port)
{
	struct sockaddr_in client;
	struct hostent *h;
	int connection = socket (PF_INET, SOCK_STREAM, 0);
	if ((connection < 0) || ((h = init_sockaddr (&client, host, port)) == (struct hostent *)0))
	{
		perror ("socket||gethostbyname");
		return -1;
	}
	if (connect (connection, (struct sockaddr *) &client, sizeof (client)) < 0)
	{
		perror ("connect");
		return -1;
	}
	return connection;
}

int maxID=11286, maxCurrentID;
unsigned int *maxBuf;
unsigned int *fdBuf;

int main(int argc, char **argv)
{
	char *md5="13e7f8397575944c2ca1cb113531d758";
	int c;

	signal (SIGINT, settimer);
	while ((c = getopt (argc, argv, "vD:s:m:hc:T:t:p:")) != -1)
	{
		switch (c)
		{
			case 'D':
				DELAY = atoi (optarg);
				break;
			case 'v':
				Verbosity = 1;
				break;
			case 's':	// channel id of start
				md5 = optarg;
				break;
			case 'm':
				maxID = atoi(optarg);
				break;
			case 'h':
				fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]\n", argv[0]);
				exit (0);
			case 'c':
				MaxClient = atoi (optarg);
				break;
			case 'T':
				alarm (atoi (optarg));
				signal (SIGALRM, settimer);
				break;
			case 't':
				CPSERVER = optarg;
				break;
			case 'p':
				P2P_PORT = atoi (optarg);
				break;
			default:
				fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]\n", argv[0]);
				exit (0);
		}
	}
	if (maxID > 0)
	{
		maxBuf = calloc (maxID, sizeof (int));
		fdBuf = calloc (maxID, sizeof (int));
	}
	fprintf (stdout, "maxID is %d\n", maxID);
	signal (SIGPIPE, SIG_IGN);
	process (md5, 0);
	return 0;
}

int writeMessage (int sock, char *ptr)
{
	int len = *(int *)ptr;
	if (write (sock, ptr, len) != len)
		return -1;
	TOTALW ++;
	BYTES_WRITE += len;
	return len;
}



struct sockInfo
{
	int sockfd;
	int reged;
	int current;
	unsigned int delay;
	int status;
	int start;
	int len;
	char buffer[MAX_DATA];
};


void process_reader (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
{
	struct timeval tm;
	struct SPUpdate *s;
	struct Message *msg;
	int nlen, start, offset;
	nlen = read (sockpool[i].sockfd, sockpool[i].buffer+sockpool[i].start+sockpool[i].len, MAX_DATA - sockpool[i].len - sockpool[i].start);
	if (nlen < 0)
	{
		fprintf (stderr, "Error in recv from %d:%d\n", i, sockpool[i].sockfd);
		FD_CLR (sockpool[i].sockfd, osock);
		close (sockpool[i].sockfd);
		sockpool[i].reged = -1;
		return;
	} else if (nlen == 0)
	{
		fprintf (stderr, "SP closed %d:%d\n", i, sockpool[i].sockfd);
		FD_CLR (sockpool[i].sockfd, osock);
		close (sockpool[i].sockfd);
		sockpool[i].reged = -1;
		return;
	}
	BYTES_READ += nlen;
	sockpool[i].len += nlen;
	while (sockpool[i].len > 0)
	{
		msg = (struct Message *)(sockpool[i].buffer + sockpool[i].start);
		if (sockpool[i].len < sizeof (int) || sockpool[i].len < msg->len)
			break;


		switch (msg->type)
		{
			case P2P_RESPONSE:
//				sockpool[i].status = 0;
				if (*(int *)(msg->buffer+sizeof(int)) > 0)
				{
					TOTALDATA ++;
					TOTALDATASIZE += msg->len;
					sockpool[i].delay = DELAY;
					if (maxBuf)
					{
						maxBuf[*(int *)(msg->buffer)] = 2;
						fdBuf[*(int *)(msg->buffer)] = i;
					}
				} else
				{
					if (maxBuf)
						maxBuf[*(int *)(msg->buffer)] = 0;
					if (DELAY > 0)
					{
						tm.tv_sec = 0;
						tm.tv_usec = sockpool[i].delay;
						sockpool[i].delay = sockpool[i].delay * 2;
						select (1,NULL, NULL, NULL, &tm);
					}
				}
				break;
			case P2P_SPUPDATE:
				TOTALR ++;
				s = (struct SPUpdate *)(msg->buffer);
				if (maxID == 0)
				{
					maxCurrentID = s->maxBlockID;
//					maxBuf = calloc (maxID, sizeof (int));
				}
				break;
		}
		sockpool[i].len -= msg->len;
		sockpool[i].start += msg->len;
	}
	if (sockpool[i].len == 0) sockpool[i].start = 0;
	else if (sockpool[i].start > 0 && sockpool[i].start +sockpool[i].len > MAX_DATA/2)
	{
		for (start = 0, offset=sockpool[i].start; start<sockpool[i].len; start++, offset++)
			sockpool[i].buffer[start] = sockpool[i].buffer[offset];
		sockpool[i].start = 0;
	}
}

#define MAX_PUSH	5
void process_writer (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
{
	int count;
	int blocks[MAX_PUSH];
	int numb = 0;
	int cur;
	char *buf;
//	if (sockpool[i].status != 0) return;
	if (maxID>0)
	{
		*isfull = 1;
		for (count=0,cur=Current; count<maxID && numb < MAX_PUSH; cur++, count++)
		{
			if (cur >= maxID) cur = 0;
			switch (maxBuf[cur])
			{
				case 0:
					blocks[numb] = cur;
					numb ++;
					break;
				case 1:
					*isfull = 0;
					break;
			}
		}
		if (count >= maxID)
		{
//			if (TOTALDATA == 9960) fprintf (stdout, "No request now\n");
			return;
		}
	} else
	{
		if (sockpool[i].current + 1 <= maxCurrentID)
			cur=sockpool[i].current + 1;
		else return;
	}
//	sockpool[i].status = 1;
	Current = cur;
//	fprintf (stdout, "Request %d %d block\n", maxID, cur);
	UDPMsg.type = P2P_PUSHLIST;
	buf = UDPMsg.buffer;
	*(char *)buf = 0;
	buf += sizeof (char);
	if (maxID > 0)
	{
		*(char *)buf = numb;
		buf += sizeof (char);
		for (cur=0; cur<numb; cur++)
		{
			*(int *)buf = blocks[cur];
			buf += sizeof (int);
			maxBuf[blocks[cur]] = 1;
			fdBuf[blocks[cur]] = i;
		}
	} else
	{
		*(char *)buf = 1;
		buf += sizeof (char);
		*(int *)buf = cur;
		buf += sizeof (int);
	}
	*(char *)buf = 0;
	buf += sizeof (char);
	UDPMsg.len = buf - UDPMsg.buffer + sizeof (int)+sizeof(char);
	if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) < 0)
	{
		fprintf (stdout, "Cannot write to sock %d, block id is %d\n", sockpool[i].sockfd, cur);

		sockpool[i].reged = -1;
	}
	sockpool[i].current = cur;
	*isfull = 0;
}

int process (char *md5, int latest)
{
	struct timeval tmnew;
	fd_set wsock, rsock, osock;
	int nread, i, maxsock = 0;
	struct sockInfo *sockpool = calloc (MaxClient, sizeof (struct sockInfo));
	char *buf;
	int cur;
	int isfull = 0;
	long long begin, now;
	double elapsed=0;
	struct NormalAddress SPADDR;
	struct PeerInfoWithAddr myaddr;

	SPADDR.sin_family = AF_INET;
	SPADDR.sin_port = htons (SP4CP_PORT);
	SPADDR.sin_addr.s_addr = inet_addr (CPSERVER);
	memset (&myaddr, 0, sizeof (myaddr));
	myaddr.b.outerIP.sin_family = AF_INET;
	myaddr.b.outerIP.sin_port = htons (SP4CP_PORT);
	myaddr.b.outerIP.sin_addr.s_addr = inet_addr ("166.111.215.87");
	FD_ZERO (&osock);
	for (i=0; i<MaxClient; i++)
	{
		if ((sockpool[i].sockfd = my_connect (CPSERVER, P2P_PORT)) < 0)
		{
			fprintf (stderr, "Error in my_connect %d.\n", i);
			exit (1);
		}
		if (sockpool[i].sockfd > maxsock)
			maxsock = sockpool[i].sockfd;
		sockpool[i].reged = -1;
		memset (&UDPMsg, 0, sizeof (struct Message));
		UDPMsg.type = P2P_HELLO;
		buf = UDPMsg.buffer + sizeof (float);
		memcpy (buf, md5, MD5_LEN);
		buf += MD5_LEN;
		*(unsigned char *)buf = 0;
		buf += sizeof (char);
		memcpy (buf, &myaddr, sizeof (myaddr));
		buf += sizeof (myaddr);
		*(unsigned char *)buf = 1;
		buf += sizeof (char);
		memcpy (buf, &SPADDR, sizeof (struct NormalAddress));
		buf += sizeof (SPADDR);
		UDPMsg.len = buf - UDPMsg.buffer + sizeof (int) + sizeof(char);
		if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) >= 0)
		{
			sockpool[i].current = -1;
			sockpool[i].start = 0;
			sockpool[i].len = 0;
			sockpool[i].reged = 0;
		} else
		{
			fprintf (stdout, "Cannot write register info to sock %d", sockpool[i].sockfd);
		}
	}

	gettimeofday (&tmnew, NULL);
	begin = ((long long)tmnew.tv_sec)*1000000l + tmnew.tv_usec;
	while (isfull == 0 && timer == 1)
	{
		FD_ZERO (&rsock);
		FD_ZERO (&wsock);
		for (i=0; i<MaxClient; i++)
		{
			if (sockpool[i].reged >= 0)
			{
				FD_SET (sockpool[i].sockfd, &rsock);
//				if (sockpool[i].status == 0)
					FD_SET (sockpool[i].sockfd, &wsock);
			}
		}
		if ((nread = select (maxsock+1, &rsock, &wsock, NULL, NULL)) <= 0)
			continue;
		for (i=0; i<MaxClient; i++)
		{
			if (FD_ISSET (sockpool[i].sockfd, &rsock))
				process_reader (sockpool, i, &isfull, &osock);
			if (sockpool[i].reged >= 0 && sockpool[i].sockfd > 0 && FD_ISSET (sockpool[i].sockfd, &wsock))
				process_writer (sockpool, i, &isfull, &osock);
		}
		if (Verbosity)
		{
			gettimeofday (&tmnew, NULL);
			now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
			if (now-begin > elapsed + 2000000l)
			{
				elapsed = now - begin;
				fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.\n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
				fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %f\navg read Mb/s: %f; avg write Mb/s %f\ntotal packets/s: %f; total Mb/s %f.\n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
			}
		}

	}
	gettimeofday (&tmnew, NULL);
	now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
	elapsed = now - begin;
	fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.\n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
	fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %f\navg read Mb/s: %f; avg write Mb/s %f\ntotal packets/s: %f; total Mb/s %f.\n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
	if (maxID>0)
	{
		for (cur=0; cur<maxID; cur++)
		{
			if (maxBuf[cur] == 1 || maxBuf[cur] == 0)
				fprintf(stdout, "(%d %d %d)\t", cur, maxBuf[cur], fdBuf[cur]);
		}
		fprintf(stdout, "\n");
	}
	for (i=0; i<MaxClient; i++)
	{
		if (sockpool[i].reged >= 0)
			close (sockpool[i].sockfd);
	}
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -