⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tsnew.c

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 C
📖 第 1 页 / 共 5 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <syslog.h>
#include <string.h>
#include <time.h>
#include <assert.h>

#include "ProtocolDefine.h"
#include "StructDefine.h"
#include "ErrorDefine.h"
#include "util.h"
#include "findcp.h"

/* Compile-time limits and protocol constants for the tracker server. */
#define MAX_BIND		10		/* capacity of the per-cluster port[] and sock[] arrays */
/* NOTE(review): AUTH_HEADER / NORMAL_HEADER are presumably the header sizes in
 * bytes of struct TSMessage (len + type + authcode1 + authcode2) and of
 * struct Message (len + type) -- confirm against the send/receive code. */
#define AUTH_HEADER             12
#define NORMAL_HEADER           5
#define MAX_NET_NUM		5		/* size of the per-network arrays compiled in under SORT_NET */

#define MAX_CLIENT		3072

#define MAX_PEER		0x10		/* max peers returned by NEED_PEERS */

#define IPADDR_LEN		16		/* maximum length of an IP address string */
#define MD5_LEN			32		/* md5/name buffers are declared as MD5_LEN+1 chars */
#define SERVICE_LEN		128

/* NOTE(review): presumably the two values stored in Session.type -- confirm. */
#define TYPE_NP			0
#define TYPE_CP			1


/* The first three are used as bit masks (value-1) by hash_str / hash_np /
 * hash_cp, so they have to remain powers of two. */
#define MAX_CHANNEL		0x400		//1024
#define MAX_NP			0x40000		//262144
#define MAX_CP			0x10000		//65536
#define MAX_RM			0x4		//4

//#define TS4CP_PORT		22168
//#define TS4RM_PORT		22169

#define MAX_DATA		20000		/* payload buffer size in struct Message / struct TSMessage */
#define MAX_LINE		1024
#define MAX_IDLE		120		/* seconds between periodLOG() runs in process_child() */
#define SILENCE_TIME		2
/*
 * BUILD_SOCKADDR(host, port, client): zero the struct sockaddr_in `client`
 * and fill it in as an AF_INET address.
 *
 * NOTE(review): the body reads `pnp->port` and `pnp->host`.  Because `host`
 * and `port` are macro parameters, the preprocessor substitutes the caller's
 * arguments into those member positions.  The first two arguments are
 * therefore pasted in as *member names* of a variable called `pnp`, which
 * must be in scope at the expansion site; they are not evaluated as
 * stand-alone expressions.  Passing anything other than a plain member name
 * will not compile.  Confirm against the call sites before changing this.
 *
 * The selected members go through htons()/htonl(), i.e. they are taken to be
 * in host byte order.
 */
#define BUILD_SOCKADDR(host,port,client)	\
{\
	memset ((char *)&client, 0, sizeof (client));\
	client.sin_port = htons (pnp->port);\
	client.sin_family = AF_INET;\
	client.sin_addr.s_addr = htonl (pnp->host);\
}

/*
 * FREE_TRACKER(tracker): tear down one session cluster.
 *
 *   1. Runs the cluster's closure callback on every pool slot whose socket
 *      is > 0, then frees the session pool.
 *   2. Frees the cluster's own hash table.
 *   3. Closes the listening sockets, stopping at the first 0 entry or after
 *      MAX_BIND entries, whichever comes first.
 *
 * The freed pointers inside `tracker` are reset to NULL so a second
 * expansion on the same cluster is harmless.
 *
 * `tracker` must be an lvalue with max, closure, head, hash and sock[]
 * members.  The expansion site must have the variables `max`, `closure`,
 * `head` and `listnum` in scope: the macro assigns to them without
 * declaring them.
 *
 * Fixes relative to the previous version:
 *   - the hash table freed is tracker.hash, not always NPTRACKER.hash
 *     (which double-freed NPTRACKER.hash and leaked every other table);
 *   - the bound i<MAX_BIND is tested before sock[i] is read, so a fully
 *     populated sock[] array is no longer read one element past its end.
 */
#define FREE_TRACKER(tracker)		\
{\
	int i;\
	max = (tracker).max;\
	closure = (tracker).closure;\
	if ((head = (tracker).head) != NULL)\
	{\
		for (listnum=0; listnum<max; listnum++)\
		{\
			if (head[listnum].socket > 0)\
				(*closure) (head+listnum);\
		}\
		free (head);\
		(tracker).head = NULL;\
	}\
	free ((tracker).hash);		/* free(NULL) is a no-op */\
	(tracker).hash = NULL;\
	for (i=0; i<MAX_BIND && (tracker).sock[i] != 0; i++)\
		close ((tracker).sock[i]);\
}
#define MAX_INTERVAL	20		/* capacity of the inter[] arrays in struct ChannelInfo and struct Edge */
#define OBSERVE_LAYER	10
#define BASEDIR		"/home/channel/"

/* Log-call counter compared against MAX_LOG_COUNT (see the comment below). */
int current_log_count = 0;
#define MAX_LOG_COUNT 100 // call fflush() when current_log_count reaches MAX_LOG_COUNT

// Describes one block interval held by an NP. An interval is currently
// identified by a start field and a length field.
struct Interval
{
	unsigned int start;	// first block of the interval
	unsigned int len;	// length of the interval
};
// Generic format of messages between TS and other peers
// Most often used in login process
struct Message
{
	// NOTE(review): SEND_NPMSG sets len to 8 for a 3-byte payload, which
	// suggests len counts the 5 header bytes as well -- confirm.
	unsigned int len;
	unsigned char type;		// message type code
	char buffer[MAX_DATA];		// payload
};
// format of messages between TS and other peers
// with authcodes specified
// NOTE(review): authcode1 is a 24-bit bit-field placed right after the 1-byte
// type. How the compiler packs it is implementation-defined; the layout
// presumably has to yield an AUTH_HEADER (12) byte header -- confirm on every
// target compiler/ABI.
// UDPMsg is a single global instance of this struct, defined here.
struct TSMessage
{
	unsigned int len;
	unsigned char type;
	unsigned int authcode1:24;
	unsigned int authcode2;
	char buffer[MAX_DATA];		// payload
} UDPMsg;

// A channel identifier plus a list of block intervals.
struct ChannelInfo
{
	char md5[MD5_LEN+1];			// MD5_LEN characters plus one spare byte (presumably the NUL terminator)
	unsigned char numinter;			// presumably the number of used entries in inter[] -- confirm
	struct Interval inter[MAX_INTERVAL];
};


// Per-channel bookkeeping record. ChannelHash[] holds pointers to these.
// With SORT_NET defined, the peer list and client count are kept per network
// (MAX_NET_NUM entries each) instead of as a single list.
struct Channel
{
	char name[MD5_LEN+1];
	unsigned int numclient;
	unsigned int accumclient;
	unsigned int latest_time;
#ifndef SORT_NET
	struct Edge *PeerHead;
#else
	unsigned int nclient_net[MAX_NET_NUM];
	struct Edge *PeerHead[MAX_NET_NUM];
#endif
	struct Session *SCPhead;
	struct Channel *next;		// presumably the next channel in the same ChannelHash bucket -- confirm
};

// Links one Session to one Channel and carries the block intervals for that
// pairing.
// NOTE(review): cnext / enext presumably chain edges per channel and per
// session respectively -- confirm against the list-manipulation code.
struct Edge
{
	struct Channel *head;		// channel side of the link
	struct Session *me;		// session side of the link
	struct Edge *cnext;
	struct Edge *enext;
	unsigned char numinter;		// presumably the number of used entries in inter[] -- confirm
	unsigned int current;
	struct Interval inter[MAX_INTERVAL];
};

// NP-specific part of a Session (the `p` member of the Session.u union).
// CorePeerInfo, TransferInfo and StatInfo are declared outside this file.
struct NPInfo
{
	struct CorePeerInfo p;
	struct TransferInfo t;
	struct StatInfo	 s;
	int    startBlock; // starting position of NP
	int numchannel;
	struct Edge *cur;
	struct Edge *header;
};

// CP-specific part of a Session (the `cp` member of the Session.u union).
struct CPInfo
{
	char type;
	unsigned char numHeads;
	unsigned short connnum;
	unsigned char maxConn;
	char parameter[42];
	int userid;
	int resnum;
	float band;
	char servicetype[128];		// same size as SERVICE_LEN, but written as a literal here
};

// One connected peer. SessionCluster.head points at a pool of these and
// SessionCluster.hash at a table of pointers to them.
struct Session
{
	int type;			// presumably TYPE_NP or TYPE_CP -- confirm
	int socket;			// FREE_TRACKER treats a value > 0 as "slot in use"
	unsigned int host;
	unsigned int intra;
	unsigned short port;
	unsigned short npport;
#ifdef SORT_NET
	unsigned int net;
	struct Session *cachepeer[MAX_NET_NUM];
#else
	struct Session *cachepeer;
#endif
	unsigned int auth;
	unsigned int time_sec;
	unsigned int last_access;
	// Per-type payload: NP data or CP data.
	union
	{
		struct NPInfo p;
		struct CPInfo	cp;
	} u;
	float clientVer;
	struct Session *hnext;		// presumably the next session in the same hash bucket -- confirm
};

FILE *statlog = NULL; // statistics log file

/*
 * errno is supplied by <errno.h>, which this file already includes.  It is
 * deliberately not redeclared here: ISO C and POSIX leave it unspecified
 * whether errno is a macro or an object, and declaring an identifier named
 * errno yourself is undefined behaviour.
 */
int OUTPUT_STAT;
time_t CurTimeSec;		// refreshed with time(NULL) at the top of every process_child() iteration
time_t startTime;		// set once with time(NULL) when process_child() starts
char *CONFIG="./ats.cfg";	// default configuration file path

char *LOCALHOST;		// filled from the "Bind" configuration key
#ifdef HAVE_MYSQL
// Built-in defaults; each can be overridden from the configuration file
// (keys "MysqlAddress", "User", "Password", "Database").
// NOTE(review): a default password is compiled into the binary here.
char *MYSQL_HOST="localhost";
char *MYSQL_USER="root";
char *MYSQL_PASS="gtv";
char *MYSQL_DB="gtv";
#endif

int CurrentSock;		// process_child() stores here the index into NPTRACKER.sock[] being serviced

fd_set osocks;			// master descriptor set; copied into the select() sets on every loop iteration
int highsock;			// select() is called with highsock+1 as its nfds argument

char *LOGXML;			// filled from the "LogFilePath" configuration key

struct TransferInfo Transfer;
struct sockaddr_in UDPCLIENT;

// Functions implemented in other translation units.
extern int init();
extern int readconfig(char * filename);
extern const char* find_ip_from_list(unsigned long ip);
extern int findcppeers(unsigned long ip, void *p);
extern int findnettype(const char *servicetype, void* p);
extern void add_cp_to_list(void *p);
extern void remove_cp_from_list(void *p);
extern const char* find_cp_service_type(unsigned long ip);

#define MAX_POLLUTE	1000
int BINDALL;			// filled from the "BINDALL" configuration key
int Polluted;			/* should we call periodLOG? if polluted>MAX_POLLUTE call */
// Port lists filled from the "TS4NP_PORT" / "TS4CP_PORT" / "TS4RM_PORT"
// configuration keys.
struct Array cfgTS4NP_PORT;
struct Array cfgTS4CP_PORT;
struct Array cfgTS4RM_PORT;
int SnapShotInterval;		// make a snap shot every few seconds
float MIN_CLIENT_VERSION;	// client version should be bigger than this one.

// Message counters, all starting at zero.
// NOTE(review): going by the names, np2ts* count messages received from NPs
// and ts2np* count messages sent to NPs, one counter per message type --
// confirm. Only ts2npMsgCount's update is visible nearby (SEND_NPMSG
// increments it).
long long np2tsLoginCount = 0;
long long np2tsResListCount = 0;
long long np2tsReqResCount = 0;
long long np2tsDelResCount = 0;
long long np2tsReportCount = 0;
long long np2tsNeedPeerCount = 0;
long long np2tsLogoutCount = 0;

long long ts2npWelcomeCount = 0;
long long ts2npPeersCount = 0;
long long ts2npConnectToCount = 0;
long long ts2npMsgCount = 0;

/***************************************************
 * A SessionCluster holds the information for one  *
 * kind of service. It contains a pointer to the   *
 * Sessions that belong to that service.           *
 ***************************************************/
struct SessionCluster
{
	unsigned int port[MAX_BIND];	// ports for this service
	unsigned int maxbuf;
	int sock[MAX_BIND]; // the sockets in use; scanned until the first 0 entry
	int cur; // current number of sessions (clients)
	int max; // maximum number of sessions (clients) that can be held
	int maxid; // maxid: maximum session index currently in the list. for optimization of search
	struct Session *head; /* pointer to the session pool
			         head[0] is the first Session, head[max-1] is the last one */
	struct Session **hash; // session hash table
	int (*process) (struct Session *); // message handler for this service
	int (*closure) (struct Session *); // destructor (per-session cleanup) for this service
};

struct Session *GCPCHOICE;
struct Channel *ChannelHash[MAX_CHANNEL];	// channel table; hash_str() returns indexes in 0..MAX_CHANNEL-1
struct SessionCluster NPTRACKER, CPTRACKER;	// one cluster per service kind
#ifdef HAVE_RM
struct SessionCluster RMTRACKER;
#endif
#ifdef SORT_NET
char *NETFN;					// filled from the "Netfile" configuration key
#endif

// Table mapping configuration-file keys to the global variables that receive
// their values. Each entry is { key, address of target variable, type tag }.
// Judging by the targets' declared types, the tags are: 's' = char * string,
// 'a' = struct Array, 'd' = int, 'f' = float.
// NOTE(review): struct NamVal and the code that consumes this table are
// declared elsewhere -- confirm the tag meanings there.
struct NamVal ConfigParameters[]
= 
{
#ifdef HAVE_MYSQL
	{"MysqlAddress", &MYSQL_HOST, 's'},	
	{"User", &MYSQL_USER, 's'},
	{"Password", &MYSQL_PASS, 's'},
	{"Database", &MYSQL_DB, 's'},
#endif
#ifdef SORT_NET
	{"Netfile", &NETFN, 's'},
#endif
	{"Bind", &LOCALHOST, 's'},
	{"TS4NP_PORT", &cfgTS4NP_PORT, 'a'},
	{"TS4CP_PORT", &cfgTS4CP_PORT, 'a'},
	{"TS4RM_PORT", &cfgTS4RM_PORT, 'a'},
	{"SnapShotInterval", &SnapShotInterval, 'd'},
	{"ClientVersion", &MIN_CLIENT_VERSION, 'f'},
	{"LogFilePath", &LOGXML, 's'},
	{"BINDALL", &BINDALL, 'd'}
};

/*
 * PDEBUG(fmt, ...): printf-style trace macro.
 *
 * Without DEBUG it expands to an empty statement, so its arguments are never
 * evaluated.  With DEBUG it writes the message to stderr, prefixed with
 * "TS: (file,line)".  `fmt` must be a string literal because it is
 * concatenated with the prefix at compile time.
 */
#ifndef DEBUG
#define PDEBUG(fmt, ...)		do {} while (0)
#else
#define PDEBUG(fmt, ...)		fprintf(stderr, "TS: (%s,%d)" fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#endif

#ifdef SORT_NET
#define MAX_NET		2048		/* capacity of NETBLOCKS[] */
// One network block: an address, a mask and the id of the network it belongs to.
struct networks
{
	unsigned int host;
	unsigned int mask;
	int net;
} NETBLOCKS[MAX_NET];

// MASKS[n] is the 32-bit value with the top n bits set (n = 0..32), i.e. the
// netmask for prefix length n.
unsigned int MASKS[33]={0x0,0x80000000,0xc0000000,0xe0000000,0xf0000000,0xf8000000,0xfc000000,0xfe000000,0xff000000,0xff800000,0xffc00000,0xffe00000,0xfff00000,0xfff80000,0xfffc0000,0xfffe0000,0xffff0000,0xffff8000,0xffffc000,0xffffe000,0xfffff000,0xfffff800,0xfffffc00,0xfffffe00,0xffffff00,0xffffff80,0xffffffc0,0xffffffe0,0xfffffff0,0xfffffff8,0xfffffffc,0xfffffffe,0xffffffff};

// globals
int maxNet;		// max id


int readNETBLOCK (char *fname); // reads the network configuration file
struct networks *getnetwork (unsigned int host, struct networks *head, int n);
int compareNet (const void *a, const void *b);	// NOTE(review): signature matches a qsort()/bsearch() comparator -- confirm usage
#endif

// Shared scratch message: SEND_NPMSG fills it in and passes it to sendMessage().
struct Message ErrMSG;

/*
 * SEND_NPMSG(sock, msg, code, quit, client): build a short status message in
 * the global ErrMSG and hand it to sendMessage().
 *
 *   ErrMSG.len       = 8
 *   ErrMSG.type      = msg
 *   ErrMSG.buffer[0..1] = code, as an unsigned short in host byte order
 *   ErrMSG.buffer[2]    = quit, as an unsigned char
 *
 * It also increments ts2npMsgCount.  `sock` and `client` are passed through
 * to sendMessage() untouched.
 *
 * The code is stored with memcpy() rather than through a cast pointer:
 * ErrMSG.buffer is a char array that begins at an odd offset inside
 * struct Message, so writing an unsigned short through a cast was a
 * misaligned, aliasing-violating store.  The bytes produced are the same.
 */
#define SEND_NPMSG(sock,msg,code,quit,client)		\
{\
	unsigned short npmsg_code_;\
	ErrMSG.len = 8;\
	ErrMSG.type = (msg);\
	++ts2npMsgCount;\
	npmsg_code_ = (unsigned short)(code);\
	memcpy (ErrMSG.buffer, &npmsg_code_, sizeof npmsg_code_);\
	*(unsigned char *)(ErrMSG.buffer + sizeof npmsg_code_) = (quit);\
	sendMessage ((sock), (char *)&ErrMSG, (client));\
}

/* ---- Forward declarations for functions defined later in this file ---- */

int init_ts ();
// Declared both noreturn and as a GCC destructor, so the runtime also calls
// it automatically when the program exits.
void my_exit () __attribute__((noreturn, destructor));
void process_child (void);

/* NP service */
int init_NP (struct Session *);
int process_NP (int idsock);	// process_child() passes an index into NPTRACKER.sock[]
int closure_NP (struct Session *);
int process_NP2TS_LOGIN (struct Message *);
int process_NP2TS_REPORT (struct Session *, struct TSMessage *);
int process_NP2TS_REPORT2 (struct Session *, struct TSMessage *);
int process_NP2TS_RES_LIST (struct Session *, struct TSMessage *);
int process_NP2TS_REQ_RES (struct Session *, struct TSMessage *);
int process_NP2TS_DEL_RES (struct Session *, struct TSMessage *);
int process_NP2TS_NEED_PEERS (struct Session *, struct TSMessage *m);
int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m);
int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer);

/* Peer lookup */
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1);
int findCPPeers (unsigned long host, char *md5, char **buffer);

/* CP service */
int init_CP (struct Session *);
int process_CP (int idsock);
int closure_CP (struct Session *);
int process_CP2TS_UPDATE (struct Session *, struct TSMessage *m);
int process_CP2TS_REGISTER (struct Message *m);
int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m);

/* Housekeeping, logging and interval helpers */
void periodLOG (int s);		// called from process_child() when more than MAX_IDLE seconds have passed
void makeSnapShot(int count);	// called from process_child() when more than SnapShotInterval seconds have passed
//int memlog(char *pwd,char *cmd);
void freeChannel (struct Channel *p);
int check_valid (struct Edge *e, int play);
int merge (struct Interval *head, int total, struct Interval *_new, int num);
int delete_interval (struct Interval *head, int total, struct Interval *_new, int num);

#ifdef HAVE_RM
/* RM service */
int init_RM (struct Session *p);
int process_RM (int idsock);
int closure_RM (struct Session *p);
#endif
int logto_xml (long channelcount, unsigned int totalclient, long long totalstay);

/*
 * hash_np: map a (h, p) pair to a bucket index for the NP session table.
 *
 * The bits of h above the table mask are shifted right by 14 and XORed with
 * the bits of h inside the mask and with the low 16 bits of p; the result is
 * reduced to MAX_NP-1 bits.  A result of 0 is remapped to MAX_NP-1, so the
 * return value is always in 1..MAX_NP-1.
 *
 * NOTE(review): h and p are presumably the peer's IPv4 address and port --
 * confirm against the callers.
 *
 * Declared `static inline`: a plain `inline` definition provides no external
 * definition under C99/C11 rules, so any call the compiler chose not to
 * inline failed to link.
 */
static inline int hash_np (int h, int p)
{
	const unsigned int mask = MAX_NP - 1;
	const unsigned int host = (unsigned int) h;
	const unsigned int high = (host & (0xffffffffu - MAX_NP + 1)) >> 14;
	const unsigned int id = (high ^ (host & mask) ^ ((unsigned int) p & 0xffff)) & mask;

	return id ? (int) id : (MAX_NP - 1);
}

/*
 * hash_cp: map a (h, p) pair to a bucket index for the CP session table.
 *
 * The bits of h above the table mask are shifted right by 12 and XORed with
 * the bits of h inside the mask and with the low 16 bits of p; the result is
 * reduced to MAX_CP-1 bits.  A result of 0 is remapped to MAX_CP-1, so the
 * return value is always in 1..MAX_CP-1.
 *
 * NOTE(review): h and p are presumably the peer's IPv4 address and port --
 * confirm against the callers.
 *
 * Declared `static inline`: a plain `inline` definition provides no external
 * definition under C99/C11 rules, so any call the compiler chose not to
 * inline failed to link.
 */
static inline int hash_cp (int h, int p)
{
	const unsigned int mask = MAX_CP - 1;
	const unsigned int host = (unsigned int) h;
	const unsigned int high = (host & (0xffffffffu - MAX_CP + 1)) >> 12;
	const unsigned int id = (high ^ (host & mask) ^ ((unsigned int) p & 0xffff)) & mask;

	return id ? (int) id : (MAX_CP - 1);
}

/*
 * hash_str: hash the first `len` bytes of `str` to an index in
 * 0..MAX_CHANNEL-1.
 *
 * Returns 0 when len <= 0.  (Previously a negative len made the loop run
 * until the counter wrapped, reading far past the buffer.)
 *
 * The accumulator is unsigned so that wrap-around is well defined; with a
 * signed int, both the addition and the left shift overflowed on inputs of
 * ordinary length.  The returned values are unchanged.
 *
 * NOTE(review): the update `hash += (hash << 5) - hash + c` nets out to
 * hash*32 + c.  After masking with MAX_CHANNEL-1 (10 bits) only the last two
 * bytes of the input influence the result.  The author may have meant
 * `hash = (hash << 5) - hash + c` (hash*31 + c).  It is left as is because
 * changing it moves every key to a different bucket -- confirm that nothing
 * depends on the current values before fixing.
 *
 * Declared `static inline`: a plain `inline` definition provides no external
 * definition under C99/C11 rules, so any call the compiler chose not to
 * inline failed to link.
 */
static inline int hash_str (unsigned char *str, int len)
{
	unsigned int hash = 0;

	for (; len > 0; len--, str++)
		hash += (hash << 5) - hash + (*str);
	return (int) (hash & (MAX_CHANNEL-1));
}

// process_child: 主要函数, 这一函数主要用来设置socks和wsocks.
void process_child (void)
{
	time_t last_update=0, last_snapshot=0;
	int snapCount = 0;
	int i, readsocks;
	struct timeval tm;
	fd_set socks, esocks;

	startTime = time(NULL);
	for (i=0; ; i++) 
	{
		CurTimeSec = time (NULL);
		if (CurTimeSec-last_update > MAX_IDLE)
		{
			periodLOG (1); // close timeout peers
			last_update = CurTimeSec;
			Polluted = 0; // clear dirty flag
		}
		if(CurTimeSec - last_snapshot > SnapShotInterval)
		{
			makeSnapShot(snapCount++); // read it!
			system("/usr/bin/vmstat >> ts.log 2>&1 &");
			last_snapshot = CurTimeSec;
		}
		
		socks = osocks;
		esocks = osocks;
		
		tm.tv_sec = 1;
		tm.tv_usec = 0;
		readsocks = select(highsock+1, &socks, (fd_set *) 0, &esocks, &tm);
		if (readsocks <= 0)
			continue;
		for (i=0; i<MAX_BIND && NPTRACKER.sock[i] != 0; i++)
		{
			if (FD_ISSET (NPTRACKER.sock[i], &socks))
			{
				CurrentSock = i;
				process_NP (i);
			}
			// should have a 'break' here to cut off unnecessary check
		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -