📄 tsnew.c
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <syslog.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include "ProtocolDefine.h"
#include "StructDefine.h"
#include "ErrorDefine.h"
#include "util.h"
#include "findcp.h"
/* Size limits and protocol constants used throughout the tracker server (TS). */
#define MAX_BIND 10 /* length of the port[]/sock[] arrays in struct SessionCluster */
#define AUTH_HEADER 12 /* presumably the on-wire header size of struct TSMessage (4+1+3+4) -- TODO confirm */
#define NORMAL_HEADER 5 /* presumably the on-wire header size of struct Message (4+1) -- TODO confirm */
#define MAX_NET_NUM 5 /* length of the per-network arrays that are compiled in under SORT_NET */
#define MAX_CLIENT 3072
#define MAX_PEER 0x10 /* max peers returned by NEED_PEERS */
#define IPADDR_LEN 16 /* maximum length of an IP address */
#define MD5_LEN 32 /* channel name/md5 buffers are declared as MD5_LEN+1 bytes */
#define SERVICE_LEN 128
#define TYPE_NP 0 /* presumably a value of Session.type -- TODO confirm */
#define TYPE_CP 1 /* presumably a value of Session.type -- TODO confirm */
/* The next three are used as bit masks (value-1) by hash_str/hash_np/hash_cp,
   so they must remain powers of two. */
#define MAX_CHANNEL 0x400 // 1024; also the length of ChannelHash[]
#define MAX_NP 0x40000 // 262144
#define MAX_CP 0x10000 // 65536
#define MAX_RM 0x4 // 4
//#define TS4CP_PORT 22168
//#define TS4RM_PORT 22169
#define MAX_DATA 20000 /* payload buffer size in struct Message / struct TSMessage */
#define MAX_LINE 1024
#define MAX_IDLE 120 /* seconds; process_child calls periodLOG once this much time has passed */
#define SILENCE_TIME 2
/*
 * BUILD_SOCKADDR(host, port, client): zero `client` (a struct sockaddr_in
 * lvalue) and fill it in as an AF_INET address.
 *
 * NOTE(review): the `host` and `port` parameters are NOT used by the body.
 * The address and port are always read from `pnp->host` / `pnp->port`
 * (both converted from host to network byte order), so a variable named
 * `pnp` must be in scope at every expansion site. Confirm against the
 * callers before changing this to use the parameters.
 *
 * The expansion is a bare { } block, not a do { } while (0) statement.
 */
#define BUILD_SOCKADDR(host,port,client) \
{\
memset ((char *)&client, 0, sizeof (client));\
client.sin_port = htons (pnp->port);\
client.sin_family = AF_INET;\
client.sin_addr.s_addr = htonl (pnp->host);\
}
/*
 * FREE_TRACKER(tracker): tear down one struct SessionCluster.
 *
 *  - runs the cluster's closure callback on every pool entry whose socket
 *    is > 0, then frees the session pool;
 *  - frees the cluster's hash table;
 *  - closes every listening socket up to the first zero entry.
 *
 * The expansion assigns to `max`, `closure`, `head` and `listnum`, which
 * must already be declared in the enclosing scope; it declares its own
 * local `i`. The expansion is a bare { } block (kept that way so existing
 * call sites keep compiling).
 *
 * Fixes over the previous version:
 *  - the hash table freed is the one belonging to `tracker`; it used to be
 *    hard-coded to NPTRACKER.hash, which leaked the hash of any other
 *    cluster and freed NPTRACKER.hash more than once;
 *  - the index bound is tested before sock[i] is read, so a fully used
 *    sock[] array no longer causes a read past its end.
 */
#define FREE_TRACKER(tracker) \
{\
    int i;\
    max = (tracker).max;\
    closure = (tracker).closure;\
    if ((head = (tracker).head) != NULL)\
    {\
        for (listnum=0; listnum<max; listnum++)\
        {\
            if (head[listnum].socket > 0)\
                (*closure) (head+listnum);\
        }\
        free (head);\
    }\
    free ((tracker).hash); /* free(NULL) is a no-op */\
    for (i=0; i<MAX_BIND && (tracker).sock[i] != 0; i++)\
        close ((tracker).sock[i]);\
}
#define MAX_INTERVAL 20 /* capacity of the inter[] arrays in struct ChannelInfo and struct Edge */
#define OBSERVE_LAYER 10
#define BASEDIR "/home/channel/"
/* Counter compared against MAX_LOG_COUNT (see the comment on that macro). */
int current_log_count = 0;
#define MAX_LOG_COUNT 100 // call fflush() when current_log_count reaches MAX_LOG_COUNT
// Describes a block interval held by an NP. An interval is currently
// identified by a start field and a length field.
struct Interval
{
unsigned int start; // first block of the interval
unsigned int len; // length of the interval
};
// Generic format of messages between TS and other peers
// Most often used in login process
// Layout: a length field, a one-byte message type, then up to MAX_DATA
// bytes of payload.
struct Message
{
unsigned int len; // message length (whether it includes the header is not visible here -- TODO confirm)
unsigned char type; // message type code
char buffer[MAX_DATA]; // payload
};
// format of messages between TS and other peers
// with authcodes specified
// Same leading fields as struct Message, followed by two auth codes and
// the payload. This declaration also defines the global instance UDPMsg.
struct TSMessage
{
unsigned int len; // message length
unsigned char type; // message type code
unsigned int authcode1:24; // first auth code, a 24-bit bit-field
unsigned int authcode2; // second auth code
char buffer[MAX_DATA]; // payload
} UDPMsg;
// A channel identifier together with a list of block intervals.
struct ChannelInfo
{
char md5[MD5_LEN+1]; // channel md5 string; the extra byte leaves room for a terminating NUL
unsigned char numinter; // presumably the number of valid entries in inter[] -- TODO confirm
struct Interval inter[MAX_INTERVAL]; // interval storage, at most MAX_INTERVAL entries
};
// Per-channel state. Channels are reachable through the global ChannelHash[]
// table; the layout of the peer list depends on whether SORT_NET is defined.
struct Channel
{
char name[MD5_LEN+1]; // channel name, sized like an md5 string plus NUL
unsigned int numclient; // presumably the current client count -- TODO confirm
unsigned int accumclient; // presumably the accumulated client count -- TODO confirm
unsigned int latest_time;
#ifndef SORT_NET
struct Edge *PeerHead; // head of this channel's Edge list
#else
unsigned int nclient_net[MAX_NET_NUM]; // one counter per network
struct Edge *PeerHead[MAX_NET_NUM]; // one Edge list head per network
#endif
struct Session *SCPhead;
struct Channel *next; // presumably the next channel in the same ChannelHash bucket -- TODO confirm
};
// Associates one Session with one Channel and records the block intervals
// for that pairing. An Edge carries two independent list links (cnext and
// enext), so it can sit on two lists at once.
struct Edge
{
struct Channel *head; // the channel this edge belongs to
struct Session *me; // the session this edge belongs to
struct Edge *cnext; // presumably the next edge on the channel's list -- TODO confirm
struct Edge *enext; // presumably the next edge on the session's list -- TODO confirm
unsigned char numinter; // presumably the number of valid entries in inter[] -- TODO confirm
unsigned int current;
struct Interval inter[MAX_INTERVAL]; // interval storage, at most MAX_INTERVAL entries
};
// NP-specific part of a Session (the `p` member of the union Session.u).
// CorePeerInfo, TransferInfo and StatInfo are declared in project headers.
struct NPInfo
{
struct CorePeerInfo p;
struct TransferInfo t;
struct StatInfo s;
int startBlock; // starting position of NP
int numchannel; // presumably the number of channels this NP is attached to -- TODO confirm
struct Edge *cur;
struct Edge *header; // presumably the head of this NP's Edge list -- TODO confirm
};
// CP-specific part of a Session (the `cp` member of the union Session.u).
struct CPInfo
{
char type;
unsigned char numHeads;
unsigned short connnum; // presumably the current connection count -- TODO confirm
unsigned char maxConn; // presumably the connection limit -- TODO confirm
char parameter[42];
int userid;
int resnum;
float band;
char servicetype[128]; // NOTE(review): literal 128 equals SERVICE_LEN; the macro is not used here
};
// One connected peer. The session pools are the `head` arrays of the
// SessionCluster globals, and sessions are also chained through `hnext`.
// The union `u` holds either NP or CP specific data.
struct Session
{
int type; // presumably TYPE_NP or TYPE_CP, selecting the active member of u -- TODO confirm
int socket; // FREE_TRACKER treats entries with socket > 0 as live
unsigned int host; // BUILD_SOCKADDR passes this through htonl(), i.e. it is kept in host byte order
unsigned int intra;
unsigned short port; // BUILD_SOCKADDR passes this through htons(), i.e. it is kept in host byte order
unsigned short npport;
#ifdef SORT_NET
unsigned int net;
struct Session *cachepeer[MAX_NET_NUM]; // one cached peer per network
#else
struct Session *cachepeer;
#endif
unsigned int auth;
unsigned int time_sec;
unsigned int last_access;
union
{
struct NPInfo p; // NP-specific data
struct CPInfo cp; // CP-specific data
} u;
float clientVer; // client version; the configured minimum is MIN_CLIENT_VERSION
struct Session *hnext; // presumably the next session in the same hash bucket -- TODO confirm
};
FILE *statlog = NULL; // statistics log file
/* errno is provided by <errno.h>, which this file already includes. The
   previous hand-written `extern int errno;` was removed: errno may be a
   macro (e.g. a per-thread lvalue), and declaring it yourself is undefined
   behaviour under POSIX. */
int OUTPUT_STAT;
time_t CurTimeSec; // refreshed with time(NULL) at the top of every process_child iteration
time_t startTime; // set once with time(NULL) when process_child starts
char *CONFIG="./ats.cfg"; // default configuration file path
char *LOCALHOST; // filled in from the "Bind" configuration key
#ifdef HAVE_MYSQL
/* MySQL connection defaults. Each one is registered in ConfigParameters
   ("MysqlAddress", "User", "Password", "Database"). */
char *MYSQL_HOST="localhost";
char *MYSQL_USER="root";
char *MYSQL_PASS="gtv";
char *MYSQL_DB="gtv";
#endif
int CurrentSock; // process_child stores here the index into NPTRACKER.sock[] it is about to service
fd_set osocks; // master descriptor set; process_child copies it into the read and except sets before each select()
int highsock; // process_child passes highsock+1 as the first argument of select()
char *LOGXML; // filled in from the "LogFilePath" configuration key
struct TransferInfo Transfer;
struct sockaddr_in UDPCLIENT;
/* Functions defined in other translation units. The `void *p` parameters
   are opaque at this level; their real types are not visible in this file. */
extern int init();
extern int readconfig(char * filename);
extern const char* find_ip_from_list(unsigned long ip);
extern int findcppeers(unsigned long ip, void *p);
extern int findnettype(const char *servicetype, void* p);
extern void add_cp_to_list(void *p);
extern void remove_cp_from_list(void *p);
extern const char* find_cp_service_type(unsigned long ip);
#define MAX_POLLUTE 1000
int BINDALL; // filled in from the "BINDALL" configuration key
int Polluted; /* should we call periodLOG? if polluted>MAX_POLLUTE call */
/* Port lists read from the "TS4NP_PORT", "TS4CP_PORT" and "TS4RM_PORT"
   configuration keys. */
struct Array cfgTS4NP_PORT;
struct Array cfgTS4CP_PORT;
struct Array cfgTS4RM_PORT;
int SnapShotInterval; // make a snap shot every few seconds
float MIN_CLIENT_VERSION; // client version should be bigger than this one.
/* Message counters. The names suggest one counter per message type, in the
   NP->TS and TS->NP directions. Only ts2npMsgCount is visibly updated in
   this part of the file (by the SEND_NPMSG macro). */
long long np2tsLoginCount = 0;
long long np2tsResListCount = 0;
long long np2tsReqResCount = 0;
long long np2tsDelResCount = 0;
long long np2tsReportCount = 0;
long long np2tsNeedPeerCount = 0;
long long np2tsLogoutCount = 0;
long long ts2npWelcomeCount = 0;
long long ts2npPeersCount = 0;
long long ts2npConnectToCount = 0;
long long ts2npMsgCount = 0;
/***************************************************
* SessionCluster holds the information for one kind *
* of service, including a pointer to the Sessions   *
* that belong to that service.                      *
***************************************************/
struct SessionCluster
{
unsigned int port[MAX_BIND];
unsigned int maxbuf;
int sock[MAX_BIND]; // the sockets in use; a zero entry marks the end of the used slots
int cur; // current number of sessions (clients)
int max; // maximum number of sessions (clients) that can be held
int maxid; // maxid: maximum session index currently in the list. for optimization of search
struct Session *head; /* pointer to the session pool
head[0] is the first Session, head[max-1] is the last one */
struct Session **hash; // session hash table
int (*process) (struct Session *); // message handler for this service
int (*closure) (struct Session *); // destructor (cleanup callback) for this service's sessions
};
struct Session *GCPCHOICE;
struct Channel *ChannelHash[MAX_CHANNEL]; // channel hash table; hash_str() yields indices in [0, MAX_CHANNEL)
struct SessionCluster NPTRACKER, CPTRACKER; // the session clusters for NP and CP peers
#ifdef HAVE_RM
struct SessionCluster RMTRACKER; // session cluster for RM peers, only built with HAVE_RM
#endif
#ifdef SORT_NET
char *NETFN; // filled in from the "Netfile" configuration key
#endif
/*
 * Table mapping configuration keys to the variables that receive them.
 * Each entry is { key name, address of target variable, type code }.
 * Type codes as used in this table: 's' with char * targets, 'a' with
 * struct Array targets, 'd' with int targets, 'f' with float targets.
 * struct NamVal is declared in a project header; presumably the table is
 * consumed by readconfig() -- TODO confirm.
 */
struct NamVal ConfigParameters[]
=
{
#ifdef HAVE_MYSQL
{"MysqlAddress", &MYSQL_HOST, 's'},
{"User", &MYSQL_USER, 's'},
{"Password", &MYSQL_PASS, 's'},
{"Database", &MYSQL_DB, 's'},
#endif
#ifdef SORT_NET
{"Netfile", &NETFN, 's'},
#endif
{"Bind", &LOCALHOST, 's'},
{"TS4NP_PORT", &cfgTS4NP_PORT, 'a'},
{"TS4CP_PORT", &cfgTS4CP_PORT, 'a'},
{"TS4RM_PORT", &cfgTS4RM_PORT, 'a'},
{"SnapShotInterval", &SnapShotInterval, 'd'},
{"ClientVersion", &MIN_CLIENT_VERSION, 'f'},
{"LogFilePath", &LOGXML, 's'},
{"BINDALL", &BINDALL, 'd'}
};
/*
 * PDEBUG(fmt, ...): printf-style debug trace.
 * With DEBUG defined it writes to stderr, prefixed with "TS: (file,line)".
 * Without DEBUG it expands to an empty statement and the arguments are not
 * evaluated. Uses the GNU named-variadic macro syntax (`args...`, `## args`).
 */
#ifdef DEBUG
#define PDEBUG(fmt, args...) fprintf(stderr, "TS: (%s,%d)" fmt, __FILE__, __LINE__, ## args)
#else
#define PDEBUG(fmt, args...) do {} while (0)
#endif
#ifdef SORT_NET
/* Network classification support, only built with SORT_NET. */
#define MAX_NET 2048 /* capacity of NETBLOCKS[] */
/* One network block: an address, its mask, and the id of the network it
   belongs to. This declaration also defines the global table NETBLOCKS. */
struct networks
{
unsigned int host;
unsigned int mask;
int net;
} NETBLOCKS[MAX_NET];
/* MASKS[n] is the 32-bit mask with the top n bits set, for n = 0..32. */
unsigned int MASKS[33]={0x0,0x80000000,0xc0000000,0xe0000000,0xf0000000,0xf8000000,0xfc000000,0xfe000000,0xff000000,0xff800000,0xffc00000,0xffe00000,0xfff00000,0xfff80000,0xfffc0000,0xfffe0000,0xffff0000,0xffff8000,0xffffc000,0xffffe000,0xfffff000,0xfffff800,0xfffffc00,0xfffffe00,0xffffff00,0xffffff80,0xffffffc0,0xffffffe0,0xfffffff0,0xfffffff8,0xfffffffc,0xfffffffe,0xffffffff};
// globals
int maxNet; // max id
int readNETBLOCK (char *fname); // reads the network configuration file
struct networks *getnetwork (unsigned int host, struct networks *head, int n);
int compareNet (const void *a, const void *b); // signature matches a qsort()/bsearch() comparator
#endif
/* Scratch message shared by every SEND_NPMSG expansion. */
struct Message ErrMSG;
/*
 * SEND_NPMSG(sock, msg, code, quit, client): build a short message in the
 * global ErrMSG and send it.
 *   - len is set to 8 and type to `msg`;
 *   - `code` is stored as an unsigned short at the start of the payload;
 *   - `quit` is stored as one byte immediately after it;
 *   - ts2npMsgCount is incremented;
 *   - the message is handed to sendMessage(sock, (char *)&ErrMSG, client).
 * The expansion is a bare { } block, not a do { } while (0) statement, and
 * because ErrMSG is a single global the macro is not reentrant.
 */
#define SEND_NPMSG(sock,msg,code,quit,client) \
{\
ErrMSG.len = 8;\
ErrMSG.type = msg;\
++ts2npMsgCount;\
*(unsigned short *)(ErrMSG.buffer) = code;\
*(unsigned char *)(ErrMSG.buffer+sizeof(short)) = quit;\
sendMessage(sock,(char *)&ErrMSG,client);\
}
/* ---- Forward declarations for functions defined later in this file ---- */

/* Start-up, shutdown and the main loop. my_exit is marked noreturn and is
   also registered as a destructor, so it runs automatically at program exit. */
int init_ts ();
void my_exit () __attribute__((noreturn, destructor));
void process_child (void);

/* NP peers: session init/teardown and one handler per NP->TS message type.
   process_NP receives an index into NPTRACKER.sock[] (see process_child). */
int init_NP (struct Session *);
int process_NP (int idsock);
int closure_NP (struct Session *);
int process_NP2TS_LOGIN (struct Message *);
int process_NP2TS_REPORT (struct Session *, struct TSMessage *);
int process_NP2TS_REPORT2 (struct Session *, struct TSMessage *);
int process_NP2TS_RES_LIST (struct Session *, struct TSMessage *);
int process_NP2TS_REQ_RES (struct Session *, struct TSMessage *);
int process_NP2TS_DEL_RES (struct Session *, struct TSMessage *);
int process_NP2TS_NEED_PEERS (struct Session *, struct TSMessage *m);
int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m);
int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer);
int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1);
int findCPPeers (unsigned long host, char *md5, char **buffer);

/* CP peers: session init/teardown and one handler per CP->TS message type. */
int init_CP (struct Session *);
int process_CP (int idsock);
int closure_CP (struct Session *);
int process_CP2TS_UPDATE (struct Session *, struct TSMessage *m);
int process_CP2TS_REGISTER (struct Message *m);
int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m);

/* Periodic maintenance. process_child calls periodLOG(1) to close timed-out
   peers and makeSnapShot(count) with a running snapshot counter. */
void periodLOG (int s);
void makeSnapShot(int count);
//int memlog(char *pwd,char *cmd);

/* Channel and interval helpers. */
void freeChannel (struct Channel *p);
int check_valid (struct Edge *e, int play);
int merge (struct Interval *head, int total, struct Interval *_new, int num);
int delete_interval (struct Interval *head, int total, struct Interval *_new, int num);
#ifdef HAVE_RM
/* RM peers, only built with HAVE_RM. */
int init_RM (struct Session *p);
int process_RM (int idsock);
int closure_RM (struct Session *p);
#endif
int logto_xml (long channelcount, unsigned int totalclient, long long totalstay);
/*
 * hash_np: map a (host, port) pair to a slot index for NP sessions.
 *
 * The bits of `h` above the MAX_NP range are shifted down by 14 and XORed
 * with the low bits of `h` and with the low 16 bits of `p`; the result is
 * reduced to the range of MAX_NP. A result of 0 is remapped to MAX_NP-1,
 * so the function never returns 0.
 *
 * Returns a value in [1, MAX_NP-1].
 *
 * Declared `static inline`: a plain `inline` definition at file scope
 * provides no external definition under C99/C11, so any call the compiler
 * chooses not to inline would fail at link time.
 */
static inline int hash_np (int h, int p)
{
    const unsigned int low_mask = MAX_NP - 1;
    const unsigned int high_mask = 0xffffffff - MAX_NP + 1;
    unsigned int host = (unsigned int) h;
    unsigned int id;

    id = ((host & high_mask) >> 14) ^ (host & low_mask) ^ ((unsigned int) p & 0xffff);
    id &= low_mask;
    return id ? (int) id : (MAX_NP - 1);
}
/*
 * hash_cp: map a (host, port) pair to a slot index for CP sessions.
 *
 * Same scheme as hash_np, with the table size MAX_CP and a shift of 12 for
 * the high bits of `h`. A result of 0 is remapped to MAX_CP-1, so the
 * function never returns 0.
 *
 * Returns a value in [1, MAX_CP-1].
 *
 * Declared `static inline`: a plain `inline` definition at file scope
 * provides no external definition under C99/C11, so any call the compiler
 * chooses not to inline would fail at link time.
 */
static inline int hash_cp (int h, int p)
{
    const unsigned int low_mask = MAX_CP - 1;
    const unsigned int high_mask = 0xffffffff - MAX_CP + 1;
    unsigned int host = (unsigned int) h;
    unsigned int id;

    id = ((host & high_mask) >> 12) ^ (host & low_mask) ^ ((unsigned int) p & 0xffff);
    id &= low_mask;
    return id ? (int) id : (MAX_CP - 1);
}
/*
 * hash_str: hash `len` bytes starting at `str` into a ChannelHash index.
 *
 * Each step computes hash = hash*32 + byte, and the final value is reduced
 * to [0, MAX_CHANNEL-1].
 *
 * The accumulator is unsigned so that wrap-around is well defined. The
 * previous signed accumulator overflowed (undefined behaviour) on keys
 * longer than a handful of bytes, e.g. 32-byte md5 names. The low bits, and
 * therefore the returned index, are the same as the two's-complement
 * result the old code produced in practice.
 *
 * A `len` of zero or less yields 0; a negative `len` used to walk far past
 * the end of the buffer.
 *
 * Declared `static inline`: a plain `inline` definition at file scope
 * provides no external definition under C99/C11, so any call the compiler
 * chooses not to inline would fail at link time.
 */
static inline int hash_str (unsigned char *str, int len)
{
    unsigned int hash = 0;

    for (; len > 0; len--, str++)
        hash = (hash << 5) + *str;
    return (int) (hash & (MAX_CHANNEL-1));
}
// process_child: 主要函数, 这一函数主要用来设置socks和wsocks.
void process_child (void)
{
time_t last_update=0, last_snapshot=0;
int snapCount = 0;
int i, readsocks;
struct timeval tm;
fd_set socks, esocks;
startTime = time(NULL);
for (i=0; ; i++)
{
CurTimeSec = time (NULL);
if (CurTimeSec-last_update > MAX_IDLE)
{
periodLOG (1); // close timeout peers
last_update = CurTimeSec;
Polluted = 0; // clear dirty flag
}
if(CurTimeSec - last_snapshot > SnapShotInterval)
{
makeSnapShot(snapCount++); // read it!
system("/usr/bin/vmstat >> ts.log 2>&1 &");
last_snapshot = CurTimeSec;
}
socks = osocks;
esocks = osocks;
tm.tv_sec = 1;
tm.tv_usec = 0;
readsocks = select(highsock+1, &socks, (fd_set *) 0, &esocks, &tm);
if (readsocks <= 0)
continue;
for (i=0; i<MAX_BIND && NPTRACKER.sock[i] != 0; i++)
{
if (FD_ISSET (NPTRACKER.sock[i], &socks))
{
CurrentSock = i;
process_NP (i);
}
// should have a 'break' here to cut off unnecessary check
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -