/* File: cptest.c */
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
//#define HAVE_MYSQL 1
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <syslog.h>
#include "ProtocolDefine.h"
#include "StructDefine.h"
/* Size in bytes of a message payload area and of each per-socket receive buffer. */
#define MAX_DATA 100000
/* The next three limits are not referenced in the visible part of this file. */
#define MAX_LINE 1024
#define MAX_IDLE 300
#define SILENCE_TIME 2
/* Port number stored (network byte order) in the address records of the HELLO message. */
#define SP4CP_PORT 50001

/*
 * Back-off base set by -D.  When > 0, process_reader sleeps after a
 * non-positive P2P_RESPONSE; a positive response resets the per-socket
 * delay to this value (it is handed to select() as microseconds).
 */
int DELAY=0;

/*
 * Run flag: 1 while the main loop should keep going, cleared by the
 * settimer signal handler.  It is written from a signal handler and polled
 * from normal code, so it must be a volatile sig_atomic_t object.
 */
volatile sig_atomic_t timer=1;

/* Not referenced in the visible part of this file. */
float PROB=1.0;
/*
 * Length-prefixed protocol frame.
 *
 * The code that builds frames sets len to
 *   payload bytes + sizeof (int) + sizeof (char),
 * i.e. len counts the len and type fields as well as the payload.
 * writeMessage() sends exactly len bytes starting at the len field.
 *
 * UDPMsg is the single shared scratch instance used to build every outgoing
 * message; despite the name, the connections it is sent on are SOCK_STREAM
 * sockets opened by my_connect().
 */
struct Message
{
/* Total frame length in bytes, header included. */
int len;
/* Message type (P2P_HELLO, P2P_PUSHLIST, P2P_RESPONSE, P2P_SPUPDATE are used here). */
unsigned char type;
/* Payload bytes. */
char buffer[MAX_DATA];
} UDPMsg;
/* TCP port every test connection is opened to (-p). */
unsigned short P2P_PORT=23;
/* Host the test connections are opened to (-t); also passed to inet_addr() for the HELLO message. */
char *CPSERVER="166.111.118.6";
/* Number of P2P_RESPONSE messages whose second payload int was > 0. */
unsigned long long TOTALDATA;
/* Sum of the frame lengths of those P2P_RESPONSE messages. */
unsigned long long TOTALDATASIZE;
/* Number of P2P_SPUPDATE messages received. */
unsigned long long TOTALR;
/* Number of messages successfully written by writeMessage(). */
unsigned long long TOTALW;
/* Total bytes returned by read() in process_reader(). */
unsigned long long BYTES_READ;
/* Total bytes of messages successfully written by writeMessage(). */
unsigned long long BYTES_WRITE;
/* Number of simultaneous connections opened by process() (-c). */
int MaxClient = 50;
/* Non-zero (-v) makes process() print statistics roughly every two seconds. */
int Verbosity;
/* Index at which process_writer() starts its next scan of the block table. */
int Current;
/* Runs the whole test for the channel identified by md5; defined below. */
int process (char *md5, int latest);
/*
 * Signal handler: clears the global run flag `timer` so that the main loop
 * in process() stops.  main() installs it for SIGINT and, with -T, SIGALRM.
 * The signal number itself is not used.
 */
void settimer (int sig)
{
    (void) sig;
    timer = 0;
}
/*
 * Fill *name with the IPv4 socket address of host:port.
 *
 * name  receives the address; it is zeroed first.
 * host  host name or dotted-quad string, resolved with gethostbyname().
 * port  port number in host byte order; only the low 16 bits are used.
 *
 * Returns the hostent returned by gethostbyname() (static storage owned by
 * the resolver, not to be freed) or NULL when an argument is missing, the
 * lookup fails, or the result is not an IPv4 address.  A diagnostic is
 * written to stderr on failure.
 */
struct hostent * init_sockaddr (struct sockaddr_in *name, char *host, unsigned int port)
{
    struct hostent *h;

    if (name == NULL || host == NULL)
    {
        fprintf (stderr, "init_sockaddr: missing argument\n");
        return NULL;
    }

    memset (name, 0, sizeof (*name));
    name->sin_family = AF_INET;
    name->sin_port = htons ((unsigned short) port);

    h = gethostbyname (host);
    if (h == NULL)
    {
        /* gethostbyname reports through h_errno, not errno, so perror would mislead. */
        fprintf (stderr, "gethostbyname: cannot resolve %s (h_errno %d)\n", host, h_errno);
        return NULL;
    }

    /* Only copy what is really an IPv4 address of the expected size. */
    if (h->h_addrtype != AF_INET
        || h->h_length != (int) sizeof (name->sin_addr)
        || h->h_addr_list == NULL
        || h->h_addr_list[0] == NULL)
    {
        fprintf (stderr, "gethostbyname: %s has no IPv4 address\n", host);
        return NULL;
    }

    /* memcpy avoids dereferencing the resolver's char buffer through a cast. */
    memcpy (&name->sin_addr, h->h_addr_list[0], sizeof (name->sin_addr));
    return h;
}
/*
 * Open a TCP connection to host:port.
 *
 * The host is resolved before the socket is created, and the socket is
 * closed again if connect() fails, so no descriptor is leaked on any error
 * path.
 *
 * Returns the connected socket descriptor, or -1 on failure (a diagnostic
 * has been written to stderr; after a connect() failure errno still holds
 * connect()'s error).
 */
int my_connect (char *host, int port)
{
    struct sockaddr_in server;
    int connection;
    int saved_errno;

    if (host == NULL)
    {
        fprintf (stderr, "my_connect: no host given\n");
        return -1;
    }

    if (init_sockaddr (&server, host, (unsigned int) port) == (struct hostent *)0)
    {
        fprintf (stderr, "my_connect: cannot resolve %s\n", host);
        return -1;
    }

    connection = socket (PF_INET, SOCK_STREAM, 0);
    if (connection < 0)
    {
        perror ("socket");
        return -1;
    }

    if (connect (connection, (struct sockaddr *) &server, sizeof (server)) < 0)
    {
        saved_errno = errno;    /* perror/close may overwrite errno */
        perror ("connect");
        close (connection);
        errno = saved_errno;
        return -1;
    }
    return connection;
}
/*
 * maxID: number of entries in the block table (-m, default 11286).  When it
 * is > 0 the block table below is used to decide what to request; otherwise
 * each connection requests block ids one after another up to maxCurrentID.
 * maxCurrentID: maxBlockID taken from P2P_SPUPDATE messages; only recorded
 * when maxID == 0.
 */
int maxID=11286, maxCurrentID;
/*
 * Per-block state, maxID entries, allocated zeroed in main() when maxID > 0:
 *   0 = not requested yet, or the last response for it was not positive
 *   1 = requested, response pending
 *   2 = positive response received
 */
unsigned int *maxBuf;
/* Per-block index (into the socket pool) of the connection that last requested or delivered the block. */
unsigned int *fdBuf;
/* Write the command-line synopsis to the given stream. */
static void print_usage (FILE *out, const char *progname)
{
    fprintf (out, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]\n", progname);
}

/*
 * Parse the argument of option -opt as a decimal integer in
 * [min_value, max_value].  Unlike atoi(), rejects empty input, trailing
 * garbage and out-of-range values: prints a diagnostic and exits with
 * status 1.
 */
static int parse_int_option (char opt, const char *text, long min_value, long max_value)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol (text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE
        || value < min_value || value > max_value)
    {
        fprintf (stderr, "Invalid value '%s' for option -%c (expected %ld..%ld)\n",
                 text, opt, min_value, max_value);
        exit (1);
    }
    return (int) value;
}

/*
 * Entry point: parse the options, allocate the block table when maxID > 0,
 * and run the test with process().
 *
 * Options:
 *   -D delay    back-off base stored in DELAY
 *   -v          verbose periodic statistics
 *   -s id       channel id (md5 string) sent in the HELLO message
 *   -m maxid    number of blocks in the block table (0 disables the table)
 *   -c n        number of connections
 *   -T seconds  stop after this many seconds (SIGALRM)
 *   -t host     server to connect to
 *   -p port     server port
 *   -h          print usage and exit
 *
 * Returns 0; exits with status 1 on invalid options or allocation failure.
 */
int main(int argc, char **argv)
{
    char *md5="13e7f8397575944c2ca1cb113531d758";
    int c;

    signal (SIGINT, settimer);
    while ((c = getopt (argc, argv, "vD:s:m:hc:T:t:p:")) != -1)
    {
        switch (c)
        {
        case 'D':
            DELAY = parse_int_option ('D', optarg, 0, INT_MAX);
            break;
        case 'v':
            Verbosity = 1;
            break;
        case 's': /* channel id of start */
            md5 = optarg;
            break;
        case 'm':
            maxID = parse_int_option ('m', optarg, 0, INT_MAX);
            break;
        case 'h':
            print_usage (stdout, argv[0]);
            exit (0);
        case 'c':
            /* select() cannot watch descriptors >= FD_SETSIZE, so more clients cannot work. */
            MaxClient = parse_int_option ('c', optarg, 1, FD_SETSIZE);
            break;
        case 'T':
            /* Install the handler before arming the alarm: the default
               SIGALRM action would terminate the process. */
            signal (SIGALRM, settimer);
            alarm ((unsigned int) parse_int_option ('T', optarg, 0, INT_MAX));
            break;
        case 't':
            CPSERVER = optarg;
            break;
        case 'p':
            P2P_PORT = (unsigned short) parse_int_option ('p', optarg, 1, 65535);
            break;
        default:
            /* Unknown option or missing argument is an error, not a help request. */
            print_usage (stderr, argv[0]);
            exit (1);
        }
    }

    if (maxID > 0)
    {
        maxBuf = calloc ((size_t) maxID, sizeof (*maxBuf));
        fdBuf = calloc ((size_t) maxID, sizeof (*fdBuf));
        if (maxBuf == NULL || fdBuf == NULL)
        {
            perror ("calloc");
            exit (1);
        }
    }
    fprintf (stdout, "maxID is %d\n", maxID);

    /* A peer closing its end must surface as a write error, not kill the process. */
    signal (SIGPIPE, SIG_IGN);
    process (md5, 0);

    free (maxBuf);
    maxBuf = NULL;
    free (fdBuf);
    fdBuf = NULL;
    return 0;
}
/*
 * Send one length-prefixed message on sock.
 *
 * ptr points at a message whose first sizeof (int) bytes hold the total
 * number of bytes to send (header included).  The whole message is written,
 * retrying after short writes and EINTR, so a frame is never silently
 * truncated on the stream.
 *
 * Returns the number of bytes sent (the message length) and updates the
 * TOTALW / BYTES_WRITE counters, or -1 if the length is smaller than a
 * message header or the write fails.
 */
int writeMessage (int sock, char *ptr)
{
    int len;
    int sent = 0;
    ssize_t n;

    /* memcpy: ptr is a char pointer and need not be int-aligned. */
    memcpy (&len, ptr, sizeof (len));
    if (len < (int) (sizeof (int) + sizeof (char)))
        return -1;

    while (sent < len)
    {
        n = write (sock, ptr + sent, (size_t) (len - sent));
        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (n == 0)
            return -1;  /* no progress possible */
        sent += (int) n;
    }

    TOTALW ++;
    BYTES_WRITE += len;
    return len;
}
/*
 * State of one test connection.  process() allocates MaxClient of these
 * with calloc(), so every field starts at zero.
 */
struct sockInfo
{
/* Descriptor returned by my_connect(). */
int sockfd;
/* -1 when the connection is unusable (HELLO not sent, failed or closed); 0 once HELLO was written. */
int reged;
/* Set by process_writer() after each request; -1 after HELLO.  Only read when maxID == 0, as "last block id requested". */
int current;
/* Back-off value handed to select() as tv_usec after a non-positive P2P_RESPONSE; doubled after each sleep, reset to DELAY by a positive response. */
unsigned int delay;
/* Only referenced from commented-out code in this file. */
int status;
/* Offset in buffer of the first unprocessed byte. */
int start;
/* Number of unprocessed bytes starting at start. */
int len;
/* Receive buffer holding partially received frames. */
char buffer[MAX_DATA];
};
void process_reader (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
{
struct timeval tm;
struct SPUpdate *s;
struct Message *msg;
int nlen, start, offset;
nlen = read (sockpool[i].sockfd, sockpool[i].buffer+sockpool[i].start+sockpool[i].len, MAX_DATA - sockpool[i].len - sockpool[i].start);
if (nlen < 0)
{
fprintf (stderr, "Error in recv from %d:%d\n", i, sockpool[i].sockfd);
FD_CLR (sockpool[i].sockfd, osock);
close (sockpool[i].sockfd);
sockpool[i].reged = -1;
return;
} else if (nlen == 0)
{
fprintf (stderr, "SP closed %d:%d\n", i, sockpool[i].sockfd);
FD_CLR (sockpool[i].sockfd, osock);
close (sockpool[i].sockfd);
sockpool[i].reged = -1;
return;
}
BYTES_READ += nlen;
sockpool[i].len += nlen;
while (sockpool[i].len > 0)
{
msg = (struct Message *)(sockpool[i].buffer + sockpool[i].start);
if (sockpool[i].len < sizeof (int) || sockpool[i].len < msg->len)
break;
switch (msg->type)
{
case P2P_RESPONSE:
// sockpool[i].status = 0;
if (*(int *)(msg->buffer+sizeof(int)) > 0)
{
TOTALDATA ++;
TOTALDATASIZE += msg->len;
sockpool[i].delay = DELAY;
if (maxBuf)
{
maxBuf[*(int *)(msg->buffer)] = 2;
fdBuf[*(int *)(msg->buffer)] = i;
}
} else
{
if (maxBuf)
maxBuf[*(int *)(msg->buffer)] = 0;
if (DELAY > 0)
{
tm.tv_sec = 0;
tm.tv_usec = sockpool[i].delay;
sockpool[i].delay = sockpool[i].delay * 2;
select (1,NULL, NULL, NULL, &tm);
}
}
break;
case P2P_SPUPDATE:
TOTALR ++;
s = (struct SPUpdate *)(msg->buffer);
if (maxID == 0)
{
maxCurrentID = s->maxBlockID;
// maxBuf = calloc (maxID, sizeof (int));
}
break;
}
sockpool[i].len -= msg->len;
sockpool[i].start += msg->len;
}
if (sockpool[i].len == 0) sockpool[i].start = 0;
else if (sockpool[i].start > 0 && sockpool[i].start +sockpool[i].len > MAX_DATA/2)
{
for (start = 0, offset=sockpool[i].start; start<sockpool[i].len; start++, offset++)
sockpool[i].buffer[start] = sockpool[i].buffer[offset];
sockpool[i].start = 0;
}
}
/* Maximum number of block ids placed in one P2P_PUSHLIST request. */
#define MAX_PUSH 5
/*
 * Build and send one P2P_PUSHLIST request on connection i.
 *
 * sockpool  array of connections
 * i         index of the writable connection
 * isfull    out: set to 1 when the block table holds neither unrequested
 *           (0) nor pending (1) blocks, i.e. every block was received;
 *           set to 0 whenever a request is sent
 * osock     descriptor set the socket is removed from if the write fails
 *
 * Block-table mode (maxID > 0): scans at most maxID table entries starting
 * at Current, wrapping at maxID, and collects up to MAX_PUSH blocks in
 * state 0.  A request is sent whenever at least one block was collected, so
 * the last few outstanding blocks are requested too.  Blocks are marked
 * pending (1) only after the request was written, so a failed write leaves
 * them available to other connections.
 *
 * Sequential mode (maxID <= 0): requests block current + 1 as long as it
 * does not exceed maxCurrentID.
 *
 * Payload layout: byte 0, byte count, count block ids (int each), byte 0.
 */
void process_writer (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
{
    int count;
    int blocks[MAX_PUSH];
    int numb = 0;
    int cur;
    int k;
    int last_requested;
    char *buf;

    if (maxID > 0)
    {
        *isfull = 1;
        for (count = 0, cur = Current; count < maxID && numb < MAX_PUSH; cur++, count++)
        {
            if (cur >= maxID)
                cur = 0;
            switch (maxBuf[cur])
            {
            case 0:
                blocks[numb] = cur;
                numb ++;
                break;
            case 1:
                *isfull = 0;    /* a response is still outstanding */
                break;
            default:
                break;
            }
        }
        if (numb == 0)
            return;     /* nothing left to request */
        last_requested = blocks[numb - 1];
    }
    else
    {
        if (sockpool[i].current + 1 <= maxCurrentID)
            cur = sockpool[i].current + 1;
        else
            return;
        last_requested = cur;
    }

    Current = cur;

    UDPMsg.type = P2P_PUSHLIST;
    buf = UDPMsg.buffer;
    *buf++ = 0;
    if (maxID > 0)
    {
        *buf++ = (char) numb;
        for (k = 0; k < numb; k++)
        {
            /* memcpy: the ids land at odd offsets inside the byte buffer. */
            memcpy (buf, &blocks[k], sizeof (int));
            buf += sizeof (int);
        }
    }
    else
    {
        *buf++ = 1;
        memcpy (buf, &cur, sizeof (int));
        buf += sizeof (int);
    }
    *buf++ = 0;
    UDPMsg.len = (int) (buf - UDPMsg.buffer + sizeof (int) + sizeof (char));

    if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) < 0)
    {
        fprintf (stdout, "Cannot write to sock %d, block id is %d\n", sockpool[i].sockfd, last_requested);
        /* Release the descriptor here: the shutdown code only closes
           connections whose reged is still >= 0. */
        FD_CLR (sockpool[i].sockfd, osock);
        close (sockpool[i].sockfd);
        sockpool[i].reged = -1;
    }
    else if (maxID > 0)
    {
        for (k = 0; k < numb; k++)
        {
            maxBuf[blocks[k]] = 1;
            fdBuf[blocks[k]] = i;
        }
    }
    sockpool[i].current = last_requested;
    *isfull = 0;
}
int process (char *md5, int latest)
{
struct timeval tmnew;
fd_set wsock, rsock, osock;
int nread, i, maxsock = 0;
struct sockInfo *sockpool = calloc (MaxClient, sizeof (struct sockInfo));
char *buf;
int cur;
int isfull = 0;
long long begin, now;
double elapsed=0;
struct NormalAddress SPADDR;
struct PeerInfoWithAddr myaddr;
SPADDR.sin_family = AF_INET;
SPADDR.sin_port = htons (SP4CP_PORT);
SPADDR.sin_addr.s_addr = inet_addr (CPSERVER);
memset (&myaddr, 0, sizeof (myaddr));
myaddr.b.outerIP.sin_family = AF_INET;
myaddr.b.outerIP.sin_port = htons (SP4CP_PORT);
myaddr.b.outerIP.sin_addr.s_addr = inet_addr ("166.111.215.87");
FD_ZERO (&osock);
for (i=0; i<MaxClient; i++)
{
if ((sockpool[i].sockfd = my_connect (CPSERVER, P2P_PORT)) < 0)
{
fprintf (stderr, "Error in my_connect %d.\n", i);
exit (1);
}
if (sockpool[i].sockfd > maxsock)
maxsock = sockpool[i].sockfd;
sockpool[i].reged = -1;
memset (&UDPMsg, 0, sizeof (struct Message));
UDPMsg.type = P2P_HELLO;
buf = UDPMsg.buffer + sizeof (float);
memcpy (buf, md5, MD5_LEN);
buf += MD5_LEN;
*(unsigned char *)buf = 0;
buf += sizeof (char);
memcpy (buf, &myaddr, sizeof (myaddr));
buf += sizeof (myaddr);
*(unsigned char *)buf = 1;
buf += sizeof (char);
memcpy (buf, &SPADDR, sizeof (struct NormalAddress));
buf += sizeof (SPADDR);
UDPMsg.len = buf - UDPMsg.buffer + sizeof (int) + sizeof(char);
if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) >= 0)
{
sockpool[i].current = -1;
sockpool[i].start = 0;
sockpool[i].len = 0;
sockpool[i].reged = 0;
} else
{
fprintf (stdout, "Cannot write register info to sock %d", sockpool[i].sockfd);
}
}
gettimeofday (&tmnew, NULL);
begin = ((long long)tmnew.tv_sec)*1000000l + tmnew.tv_usec;
while (isfull == 0 && timer == 1)
{
FD_ZERO (&rsock);
FD_ZERO (&wsock);
for (i=0; i<MaxClient; i++)
{
if (sockpool[i].reged >= 0)
{
FD_SET (sockpool[i].sockfd, &rsock);
// if (sockpool[i].status == 0)
FD_SET (sockpool[i].sockfd, &wsock);
}
}
if ((nread = select (maxsock+1, &rsock, &wsock, NULL, NULL)) <= 0)
continue;
for (i=0; i<MaxClient; i++)
{
if (FD_ISSET (sockpool[i].sockfd, &rsock))
process_reader (sockpool, i, &isfull, &osock);
if (sockpool[i].reged >= 0 && sockpool[i].sockfd > 0 && FD_ISSET (sockpool[i].sockfd, &wsock))
process_writer (sockpool, i, &isfull, &osock);
}
if (Verbosity)
{
gettimeofday (&tmnew, NULL);
now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
if (now-begin > elapsed + 2000000l)
{
elapsed = now - begin;
fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.\n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %f\navg read Mb/s: %f; avg write Mb/s %f\ntotal packets/s: %f; total Mb/s %f.\n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
}
}
}
gettimeofday (&tmnew, NULL);
now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
elapsed = now - begin;
fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.\n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %f\navg read Mb/s: %f; avg write Mb/s %f\ntotal packets/s: %f; total Mb/s %f.\n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
if (maxID>0)
{
for (cur=0; cur<maxID; cur++)
{
if (maxBuf[cur] == 1 || maxBuf[cur] == 0)
fprintf(stdout, "(%d %d %d)\t", cur, maxBuf[cur], fdBuf[cur]);
}
fprintf(stdout, "\n");
}
for (i=0; i<MaxClient; i++)
{
if (sockpool[i].reged >= 0)
close (sockpool[i].sockfd);
}
return 0;
}
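/*
 * Keyboard shortcuts of the code viewer this listing was copied from
 * (not part of the program):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */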