📄 cpnew.c
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include "echo.h"
/* Node type codes. process_P2P_PUSHLIST compares a session's npcp field
 * against TYPE_CP; the other values are not referenced in the visible part
 * of this file. */
#define TYPE_NP 0
#define TYPE_CP 1
#define TYPE_SCP 2
#define TYPE_ECP 3
#define TYPE_GCP 4
/* NOTE(review): MEDIATYPE_FIRST is not referenced in the visible part of this file. */
#define MEDIATYPE_FIRST 0.149
#define MAX_IDLE 90 /* destruct a connection to sp/np after 90s */
//#define P2PS_PORT 50002
#define TS4CP_PORT 22168
#define REQUEST_AHEAD 15 /* Request how many blocks if a block doesn't exist in cache */
/*
 * Global settings. Where a configuration key is named, the variable is wired
 * to that key through the ConfigParameters table, which main() hands to
 * read_config(). The initializers are the values in effect before
 * read_config() runs.
 */
/* Config key "MAX_NP". */
int MAX_P2PS=500;
/* Config key "MAX_SP". */
int MAX_P2PC=512;
/* process_P2P_PUSHLIST returns -2 once a session's numjob reaches this value. */
int MAX_JOB_PER_SESSION=3;
/* CONFIG is used by main() but its definition here is disabled, so it must come from elsewhere. */
//char *CONFIG = "./acp.cfg";
/* Config key "Prefix". */
char *PREFIX="/data/cp/";
/* Config key "SCP". process_P2P_HELLO connects to the SP list directly when this is non-NULL or isGCP is set. */
char *SCP_CHANNEL;
/* Config key "ECP". */
char *ECP_REGION;
/* Config key "GCP". */
int isGCP;
/* Config key "TrackerIP". */
char *SERVERIP;
/* Config key "BindIP". */
char *BINDIP;
/* Config key "Pidfile". */
char *PIDFile="/var/run/cpnew.pid";
/* Config key "BandWidth". */
int MAX_BANDWIDTH;
/* Config key "authmd5". */
char *AUTH_MD5;
/* Config key "authid". */
int AUTH_USERID;
/* Cleared with FD_ZERO in main() before every init_cp() call. */
fd_set osocks;
/* Config key "BINDALL". */
int BINDALL;
#ifdef HAVE_TS
/* Socket handed to sendMessage() when process_P2P_HELLO sends CP2TS_NEED_PEERS. */
int TSSOCK = 0;
//#define RANDOM_PORT 3947
#endif
/* Compared against the last snapshot time in period_process(); it is not assigned in the visible part of this file. */
time_t CurrentTime;
time_t startTime;
/* Config key "SnapShotInterval"; period_process() takes a snapshot once more than this has elapsed. */
int SnapShotInterval;
/* Config key "NearPeerInterval". */
int NearPeerInterval = 30;
int isSet = 0; /* whether TS has returned WELCOME message */
/* Message object that process_P2P_HELLO fills in and passes to sendMessage(). */
struct TSMessage UDPMsg;
extern struct Channel *ChannelHash[MAX_CHANNEL];
extern struct Channel *ChannelList;
/* Destination address passed to sendMessage() together with UDPMsg. */
struct sockaddr_in TSADDR;
socklen_t addrlen = sizeof (struct sockaddr_in);
// calculate avg & cur speed
long long totalDownBytes=0, totalUpBytes=0;
long long tmpDownBytes=0, tmpUpBytes=0;
/* Indexed by connection type; TRACKER[TYPE_P2PS].head is the base used to turn a Session pointer into a list index. */
struct ServerDesc TRACKER[MAX_TYPE];
/* Config key "LogFilePath". */
char *LOGXML;
/* NOTE(review): errno is normally provided by <errno.h> (often as a macro);
 * declaring it by hand may conflict with the C library - confirm. */
extern int errno;
/* Config key "JobHighWater". */
int JobHighWater = 10000;
/* Config key "MaxNPPerChannel"; process_P2P_HELLO rejects a HELLO for an existing channel whose numclient exceeds it. */
int MaxNPPerChannel = 300;
/* Config key "CP4NP_PORT"; main() also accepts a "tcp=<port>" argument that overrides it. */
int cfgP2PS_PORT = 23;
/* Config key "CP2TS_PORT"; main() also accepts a "udp=<port>" argument that overrides it. */
int cfgCP2TS_PORT = CP2TS_PORT;
/*
 * Table of configuration entries passed to read_config() by main().
 * Each entry holds: the key name, the address of the global to fill in, and a
 * one-character type tag. In this table 's' is used with char * variables
 * and 'd' with int variables.
 */
struct NamVal ConfigParameters[]
=
{
{"Prefix", &PREFIX, 's'},
{"MAX_NP", &MAX_P2PS, 'd'},
{"MAX_SP", &MAX_P2PC, 'd'},
{"Pidfile", &PIDFile, 's'},
{"TrackerIP", &SERVERIP, 's'},
{"SCP", &SCP_CHANNEL, 's'},
{"GCP", &isGCP, 'd'},
{"authid", &AUTH_USERID, 'd'},
{"authmd5", &AUTH_MD5, 's'},
{"ECP", &ECP_REGION, 's'},
{"BandWidth", &MAX_BANDWIDTH, 'd'},
{"BindIP", &BINDIP, 's'},
{"SnapShotInterval",&SnapShotInterval,'d'},
{"CP4NP_PORT", &cfgP2PS_PORT, 'd'},
{"CP2TS_PORT", &cfgCP2TS_PORT, 'd'},
{"LogFilePath", &LOGXML, 's'},
{"NearPeerInterval", &NearPeerInterval, 'd'},
{"JobHighWater", &JobHighWater, 'd'},
{"MaxNPPerChannel", &MaxNPPerChannel, 'd'},
{"BINDALL", &BINDALL, 'd'}
};
/*
 * Forward declarations. init_P2PS and period_process are defined further down
 * in this file; the remaining functions are defined outside the visible
 * portion. Declarations written with empty parentheses (register_cp, init_cp,
 * closure_TS, process_TS) do not specify their parameters.
 */
int register_cp ();
int init_cp ();
int handle_new_connection(int sock, int type);
int Clientclosure (int listnum, int type);
void process_child (void);
/* Per-type init / process / closure triples, each taking a list index. */
int init_P2PS (int listnum);
int process_P2PS (int listnum);
int closure_P2PS (int listnum);
int init_P2PC (int listnum);
int process_P2PC (int listnum);
int closure_P2PC (int listnum);
int sendMessage (int sock, char *ptr, struct sockaddr_in *dest);
int process_TS2CP_PEERS (char *buf);
void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks);
int reconnect (struct Session *p);
char *parseECP (char *str, char *buf);
int closure_TS ();
int periodCheck (float KBPSused);
void makeSnapShot(int count, int time_interval);
int send_nearpeers (struct Channel *pc, struct Edge *pme);
int send_nearpeers_toall (struct Channel *pc);
#ifdef HAVE_TS
int process_TS();
#endif
int send_P2P_PUSHLIST (struct Channel *pc, int id);
int period_process (void);
extern char *getJobBuffer (struct JobDes *p, int *max);
extern inline void setblockId (struct JobDes *pj, int id);
#include "sessions.c"
/*
 * INIT_MAXQ(pc, s, maxq) - size the per-channel block tables.
 *
 * Sets maxq to s->maxBlockID, capped at MAX_QUEUE, then:
 *   - if pc->pcinfo->indisk is NULL, records maxq in pc->pcinfo->max_queue and
 *     allocates zeroed tables: indisk (maxq bytes) and bitflag ((maxq+7)/8
 *     bytes);
 *   - otherwise, if maxq is smaller than the recorded max_queue, shrinks both
 *     tables to the new size with realloc;
 *   - otherwise leaves the tables and max_queue as they are.
 *
 * The macro contains `return -1;` statements, so it makes the ENCLOSING
 * function return -1 when s->maxBlockID is 0 or an allocation fails. It can
 * therefore only be used inside a function returning int.
 *
 * Arguments are expanded more than once and without parentheses; pass plain
 * identifiers only.
 *
 * NOTE(review): the realloc results are stored straight into indisk/bitflag,
 * so on realloc failure the previous pointer is overwritten with NULL.
 * NOTE(review): when maxq is larger than the recorded max_queue the tables
 * are not grown - confirm callers index by max_queue rather than maxq.
 */
#define INIT_MAXQ(pc,s,maxq) do\
{\
if (s->maxBlockID == 0) return -1;\
if (s->maxBlockID >= MAX_QUEUE) maxq = MAX_QUEUE;\
else maxq = s->maxBlockID;\
if (pc->pcinfo->indisk == NULL)\
{\
pc->pcinfo->max_queue = maxq;\
pc->pcinfo->indisk = calloc (maxq, 1);\
pc->pcinfo->bitflag = calloc ((maxq+7)/8, 1);\
if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)\
return -1;\
} else if (maxq < pc->pcinfo->max_queue)\
{\
pc->pcinfo->max_queue = maxq;\
pc->pcinfo->indisk = realloc (pc->pcinfo->indisk, maxq);\
pc->pcinfo->bitflag = realloc (pc->pcinfo->bitflag, (maxq+7)/8);\
if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)\
return -1;\
}\
} while (0)
/*
 * Periodic housekeeping.
 *
 * When more than SnapShotInterval has passed (by CurrentTime) since the
 * previous snapshot, calls makeSnapShot() with a running counter and the
 * elapsed time, appends `vmstat -a` output to cp.log in the background, and
 * remembers CurrentTime as the new snapshot time.
 *
 * Always returns 0.
 */
int period_process (void)
{
	static time_t prev_snapshot_at;
	static int snapshots_taken = 0;

	/* Not due yet: nothing to do. */
	if (!(CurrentTime - prev_snapshot_at > SnapShotInterval))
		return 0;

	makeSnapShot (snapshots_taken++, CurrentTime - prev_snapshot_at);
	system ("/usr/bin/vmstat -a >> cp.log 2>&1 &");
	prev_snapshot_at = CurrentTime;
	return 0;
}
/*
 * Program entry point.
 *
 * argv[1] selects the run mode: 0 detaches with daemon(), any other value
 * stays on the console. After the configuration file has been read, any
 * argument of the form "tcp=<port>" or "udp=<port>" (1..65535) overrides
 * cfgP2PS_PORT or cfgCP2TS_PORT respectively.
 *
 * init_cp() followed by process_child() is then run up to 10 times, for as
 * long as IN_LOOP stays positive.
 *
 * Returns 0 on normal completion and -1 on a usage error or when daemon()
 * fails. Calls exit(-1) if init_cp() fails.
 */
int main(int argc, char **argv)
{
	int i, mode, tmp;

	if (argc < 2)
	{
		printf("usage: %s mode(0 for daemon, 1 for console).\n", argv[0]);
		return -1;
	}
	signal (SIGPIPE, SIG_IGN);
	signal (SIGINT, terminate);
	mode = atoi (argv[1]);
	/* daemon() can fail; do not keep running attached as if the detach had worked. */
	if (mode == 0 && daemon(1,1) < 0)
	{
		perror ("daemon");
		return -1;
	}
	read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));

	/* Command-line port overrides win over the configuration file. */
	for(i = 0 ; i < argc; ++i)
	{
		if(strncmp(argv[i], "tcp=", 4) == 0)
		{
			tmp = atoi(argv[i]+4);
			/* 65535 is a valid port number and is accepted. */
			if(tmp > 0 && tmp <= 65535)
				cfgP2PS_PORT = tmp;
		}
		else if(strncmp(argv[i], "udp=", 4) == 0)
		{
			tmp = atoi (argv[i]+4);
			if(tmp > 0 && tmp <= 65535)
				cfgCP2TS_PORT = tmp;
		}
	}

	/*
	 * This loop used to fork a child per iteration and wait for it; that
	 * code was disabled, so each iteration now runs in this process.
	 */
	for (i=0; i<10 && IN_LOOP > 0; i++)
	{
		FD_ZERO(&osocks);
		if (init_cp () < 0)
		{
			PDEBUG ("init_cp error, exit...\n");
			exit (-1);
		}
		process_child ();
	}
	return 0;
}
/*
 * Initialisation step for a P2PS list entry.
 * Currently does nothing with listnum and always returns 0.
 */
int init_P2PS (int listnum)
{
	(void) listnum;	/* intentionally unused */
	return 0;
}
/*
 * Returns non-zero when the part of message m that ends at `end` lies inside
 * the message, using the same length formula this handler has always applied
 * (offset from m->buffer plus NORMAL_HEADER must not exceed m->len).
 */
static int p2p_hello_fits (struct Message *m, char *end)
{
	return !(end - m->buffer + NORMAL_HEADER > m->len);
}

/*
 * Handle a P2P_HELLO message received on session p.
 *
 * Fields read from m->buffer, in order:
 *   float version | MD5_LEN bytes channel id | 1 byte (skipped) |
 *   sizeof (p->addr) bytes copied into p->addr | 1 byte SP count |
 *   count * struct NormalAddress
 *
 * The channel is looked up by id and created with newLiveChannel() when it
 * does not exist; the session is attached to it with newEdge(). If the
 * channel already has a data source, only p->addr is taken from the message
 * and the near-peer list is sent. Otherwise the SP list is stored in the
 * channel and either newChannel() is called (isGCP or SCP_CHANNEL set) or,
 * when built with HAVE_TS, a CP2TS_NEED_PEERS request is sent.
 *
 * Every field is checked to lie inside the message BEFORE it is read, so a
 * truncated HELLO can no longer cause reads past the end of the message or
 * overwrite the channel's SP list with out-of-range bytes.
 *
 * Returns 0 on success; on any failure the client is closed with
 * Clientclosure() and -1 is returned.
 */
int process_P2P_HELLO (struct Session *p, struct Message *m)
{
	struct Edge *pedge = NULL;
	float version;
	char *buf = m->buffer + sizeof (float);
	struct Channel *pc;
	struct LiveChannelInfo *pcinfo;
	int listnum;
	size_t splen;

	PINFO ("RECV P2P_HELLO. \n");
	/* m->buffer has no alignment guarantee for float; copy instead of casting. */
	memcpy (&version, m->buffer, sizeof (version));
	p->version = version;
	listnum = p - TRACKER[TYPE_P2PS].head;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
	{
		if ((pc = newLiveChannel (buf, NULL, buf, 0, 0)) != (struct Channel *)0)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		} else
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		p->pc = pc;
	} else
	{
		if (pc->numclient > MaxNPPerChannel)
		{
			Clientclosure (listnum, TYPE_P2PS);
			return -1;
		}
		/* Reuse this session's existing edge to the channel if there is one. */
		for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
		if (pedge == NULL)
		{
			pedge=newEdge (pc, p);
			pc->numclient ++;
		}
		p->pc = pc;
		if (pc->pcinfo->dataSource)
		{
			/*
			 * A data source already exists (a disabled code path used to
			 * re-send P2P_HELLO to the SP here to check the channel state).
			 * Only the peer address is needed from the message.
			 */
			buf += MD5_LEN + sizeof (char);
			if (!p2p_hello_fits (m, buf + sizeof (p->addr)))
				goto too_short;
			memcpy (&(p->addr), buf, sizeof (p->addr));
			if (pedge) send_nearpeers (pc, pedge);
			return 0;
		}
		PDEBUG("NO connection to SP. try connect.\n");
	}
	buf += MD5_LEN + sizeof (char);
	/* The address and the one-byte SP count must both be present. */
	if (!p2p_hello_fits (m, buf + sizeof (p->addr) + sizeof (char)))
		goto too_short;
	memcpy (&(p->addr), buf, sizeof (p->addr));
	buf += sizeof (p->addr);
	pcinfo = pc->pcinfo;
	pcinfo->numofsp = (unsigned char)*buf;
	buf += sizeof (char);
	if (pcinfo->numofsp == 0 || pcinfo->numofsp > MAX_REPSP)
	{
		Clientclosure (listnum, TYPE_P2PS);
		return -1;
	}
	/* Make sure the whole SP list is inside the message before copying it. */
	splen = pcinfo->numofsp*sizeof(struct NormalAddress);
	if (!p2p_hello_fits (m, buf + splen))
		goto too_short;
	memcpy (&(pcinfo->SPLIST), buf, splen);
	buf += splen;
	if (isGCP || SCP_CHANNEL)
	{
		if ((listnum = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
		{
			PDEBUG("Unable connect to SP.\n");
			/* listnum was just overwritten, so recompute the session index. */
			Clientclosure (p-TRACKER[TYPE_P2PS].head, TYPE_P2PS);
			return -1;
		}
	} else /* ECP, need peers */
	{
#ifdef HAVE_TS
		UDPMsg.type = CP2TS_NEED_PEERS;
		UDPMsg.len = 12+MD5_LEN;
		memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
		/* isSet whether TS has returned WELCOME message */
		if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
		{
			PDEBUG ("exit...\n");
			exit (1);
		}
#endif
	}
	if (pedge) send_nearpeers (pc, pedge);
	return 0;

too_short:
	/* Only reached before listnum is reassigned, so it still names this session. */
	PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
	Clientclosure (listnum, TYPE_P2PS);
	return -1;
}
/*
 * Number of bytes of message m that remain at position pos. Offsets are
 * measured from the start of *m, exactly as in the final length check of
 * process_P2P_MEDIATYPE, so these intermediate checks never reject a message
 * that check would accept.
 */
static long mediatype_bytes_left (struct Message *m, char *pos)
{
	return (long) m->len - (long) (pos - (char *) m);
}

/*
 * Handle a P2P_MEDIATYPE message.
 *
 * Fields read from m->buffer, in order:
 *   MD5_LEN bytes channel id | int start | int length | int size |
 *   size bytes media data | 1 byte proglen | proglen bytes name |
 *   int (skipped, not used here) | 1 byte chnllen | chnllen bytes channel name
 *
 * The parsed fields are handed to addMedia() for the channel found by id.
 * listnum is not used.
 *
 * `size` comes from the peer, so it is rejected when negative or larger than
 * what is left in the message, and each length byte is checked to be inside
 * the message before it is read. The ints are copied out with memcpy because
 * the buffer offsets are not guaranteed to be int-aligned.
 *
 * NOTE(review): this handler measures offsets from (char *)m, whereas
 * process_P2P_HELLO uses m->buffer + NORMAL_HEADER - confirm both agree.
 *
 * Returns 0 on success, -1 when the channel is unknown or the message is
 * malformed or too short.
 */
int process_P2P_MEDIATYPE (int listnum, struct Message *m)
{
	struct Channel *pc;
	char *buf, *media, *name, *channel_name;
	int start, length, size, proglen, chnllen;

	(void) listnum;
	buf = m->buffer;
	/* The channel id and the three fixed ints must be present. */
	if (mediatype_bytes_left (m, buf) < (long) (MD5_LEN + 3 * sizeof (int)))
		return -1;
	if ((pc = findChannel (buf, MD5_LEN)) == NULL)
		return -1;
	buf += MD5_LEN;
	memcpy (&start, buf, sizeof (int));
	buf += sizeof (int);
	memcpy (&length, buf, sizeof (int));
	buf += sizeof (int);
	memcpy (&size, buf, sizeof (int));
	buf += sizeof (int);
	media = buf;
	/* Media bytes plus the proglen byte that follows them. */
	if (size < 0 || (long) size + 1 > mediatype_bytes_left (m, buf))
		return -1;
	buf += size;
	proglen = *(unsigned char *)buf;
	buf += sizeof (char);
	name = buf;
	/* Name, the skipped int, and the chnllen byte. */
	if ((long) proglen + (long) sizeof (int) + 1 > mediatype_bytes_left (m, buf))
		return -1;
	buf += proglen;
	buf += sizeof (int);
	chnllen = *(unsigned char *)buf;
	buf += sizeof (char);
	channel_name = buf;
	buf += chnllen;
	if (buf - (char *)m > m->len)
		return -1;
	addMedia (pc, start, length, size, media, name, channel_name);
	return 0;
}
int process_P2P_PUSHLIST (struct Session *p, struct Message *m)
{
struct Channel *pc;
struct Edge *pedge;
char *buf;
int i, type, listnum, size;
listnum = p - TRACKER[TYPE_P2PS].head;
buf = m->buffer;
if (p->npcp == TYPE_CP)
{
if ((pc = findChannel (buf, MD5_LEN)) == NULL)
{
if ((pc = newLiveChannel (m->buffer, NULL, m->buffer, 0, 0)) != (struct Channel *)0)
{
pedge=newEdge (pc, p);
pc->numclient ++;
}
} else
{
for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
if (pedge == NULL)
{
pedge=newEdge (pc, p);
pc->numclient ++;
}
}
buf += MD5_LEN;
} else
{
pc = p->pc;
}
if (pc == NULL && p->npcp != TYPE_CP)
{
Clientclosure (listnum, TYPE_P2PS);
return -1;
}
if (p->numjob >= MAX_JOB_PER_SESSION)
return -2;
type = *(unsigned char *)buf;
buf += sizeof (char);
size = *(unsigned char *)buf;
buf += sizeof (char);
if (type)
{
deleteChannel (p, pc);
for (i=0; i<size; i++)
if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
return -1;
} else
{
for (i=0; i<size; i++)
if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
return -1;
buf += size*sizeof(int);
size = *(unsigned char *)buf;
buf += sizeof (char);
deleteJob (p, pc, (unsigned int *)buf, size);
}
if (buf - m->buffer + NORMAL_HEADER > m->len)
{
PDEBUG ("Invalid message %d, length %d not enough\n", m->type, m->len);
Clientclosure (listnum, TYPE_P2PS);
return -1;
}
return 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -