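/* File: controller.c */
/* (The "font size" control of the code-viewer page was captured here; it is not part of the program.) */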
#include <sys/types.h>
#include <sys/param.h>
#include <netdb.h>
#include "sig.h"
#include "exit.h"
#include "sgetopt.h"
#include "uint16.h"
#include "fmt.h"
#include "scan.h"
#include "str.h"
#include "ip4.h"
#include "uint16.h"
#include "socket.h"
#include "fd.h"
#include "stralloc.h"
#include "buffer.h"
#include "error.h"
#include "strerr.h"
#include "pathexec.h"
#include "timeoutconn.h"
#include "remoteinfo.h"
#include "dns.h"
#include "tcp.h"
#include "/arbor/mysql-5.1.20-beta-linux-i686-glibc23/include/mysql.h"
/* Backlog value passed to listen() in GetsocketAndListen(). */
#define LENGTH_OF_LISTEN_QUEUE 20
/* Not referenced in the visible part of this file. */
#define BUFFER_SIZE 1024
/* Not referenced in the visible part of this file. */
#define THREAD_MAX 5
/* Message prefix used by nomem(). */
#define FATAL "tcpclient: fatal: "
/* Not referenced in the visible part of this file. */
#define CONNECT "tcpclient: unable to connect to "
/* Number of slots in thread_para_array and in the pthread_t table built in main(). */
#define NUM_THREADS 30 /*zsj*/
/* Arguments handed to one worker thread; run() reads every field. */
struct thread_para{
int thread_id; /* index of this slot in thread_para_array; printed by run() */
int collid; /* collector id, forwarded to CallCollector() */
char collip[50]; /* collector address string, forwarded to CallCollector() */
char command[1024]; /* command text, forwarded to CallCollector() */
int agentport; /* collector port, forwarded to CallCollector() */
};
/* One argument slot per worker thread; main() fills slot i before creating thread i. */
struct thread_para thread_para_array[NUM_THREADS];
/* Forward declarations for functions defined further down in this file. */
char * strmov (register char *dst, register const char *src) ;
void *run(void *threadarg);
/*
 * Report an out-of-memory condition by handing the FATAL prefix and the
 * message to strerr_die2x with code 111.
 * NOTE(review): strerr_die2x is presumably terminating (djb strerr) - confirm.
 */
void nomem(void)
{
strerr_die2x(111,FATAL,"out of memory");
}
/*
 * Hand the command-line synopsis to strerr_die1x with code 100.
 * The synopsis is assembled from adjacent string literals; the resulting
 * text is a single line.
 */
void usage(void)
{
  strerr_die1x(100,
    "tcpclient: usage: tcpclient "
    "[ -hHrRdDqQv ] "
    "[ -i localip ] "
    "[ -p localport ] "
    "[ -T timeoutconn ] "
    "[ -l localname ] "
    "[ -t timeoutinfo ] "
    "host port program");
}
/* Verbosity: -q stores 0, -Q stores 1, -v stores 2. When 0, main() sets buffer_2->fd to -1. */
int verbosity = 1;
/* Stored by -d (1) and -D (0); not read anywhere else in the visible part of this file. */
int flagdelay = 1;
/* Stored by -r (1) and -R (0); not read anywhere else in the visible part of this file. */
int flagremoteinfo = 1;
/* Stored by -h (1) and -H (0); not read anywhere else in the visible part of this file. */
int flagremotehost = 1;
/* Value parsed from -t; not read anywhere else in the visible part of this file. */
unsigned long itimeout = 26;
/* Values parsed from -T, written as "a" or "a+b"; not read anywhere else in the visible part of this file. */
unsigned long ctimeout[2] = { 2, 58 };
/* Address parsed from -i by ip4_scan. */
char iplocal[4] = { 0,0,0,0 };
/* Port parsed from -p. */
uint16 portlocal = 0;
/* Argument of -l. */
char *forcelocal = 0;
/* The following three are not referenced in the visible part of this file. */
char ipremote[4];
uint16 portremote;
char *hostname;
/* Scratch strallocs; not referenced in the visible part of this file. */
static stralloc addresses;
static stralloc moreaddresses;
static stralloc tmp;
static stralloc fqdn;
/* Formatting buffers; not referenced in the visible part of this file. */
char strnum[FMT_ULONG];
char ipstr[IP4_FMT];
/* Passed to dns_random_init() at the start of main(). */
char seed[128];
main(int argc,char **argv)
{
unsigned long u;
int opt;
char *x;
int j;
int s;
int cloop;
unsigned long controllerport;
dns_random_init(seed);
close(6);
close(7);
sig_ignore(sig_pipe);
while ((opt = getopt(argc,argv,"dDvqQhHrRi:p:t:T:l:")) != opteof)
switch(opt) {
case 'd': flagdelay = 1; break;
case 'D': flagdelay = 0; break;
case 'v': verbosity = 2; break;
case 'q': verbosity = 0; break;
case 'Q': verbosity = 1; break;
case 'l': forcelocal = optarg; break;
case 'H': flagremotehost = 0; break;
case 'h': flagremotehost = 1; break;
case 'R': flagremoteinfo = 0; break;
case 'r': flagremoteinfo = 1; break;
case 't': scan_ulong(optarg,&itimeout); break;
case 'T': j = scan_ulong(optarg,&ctimeout[0]);
if (optarg[j] == '+') ++j;
scan_ulong(optarg + j,&ctimeout[1]);
break;
case 'i': if (!ip4_scan(optarg,iplocal)) usage(); break;
case 'p': scan_ulong(optarg,&u); portlocal = u; break;
default: usage();
}
if (!verbosity)
buffer_2->fd = -1;
scan_ulong(argv[optind+1],&controllerport);
int servfd=GetsocketAndListen(argv[optind],controllerport,1);
signal(sig_child, SIG_IGN);
while(1) //服务器端要一直运行,WaitFor FrontEnd
{
//定义客户端的socket地址结构client_addr
struct sockaddr_in client_addr;
socklen_t length = sizeof(client_addr);
//接受一个到server_socket代表的socket的一个连接
//如果没有连接请求,就等待到有连接请求--这是accept函数的特性
//accept函数返回一个新的socket,这个socket(new_server_socket)用于同连接到的客户的通信
//new_server_socket代表了服务器和客户端之间的一个通信通道
//accept函数把连接到的客户端信息填写到客户端的socket地址结构client_addr中
sig_unblock(sig_child);
int new_server_socket = accept(servfd,(struct sockaddr*)&client_addr,&length);
sig_block(sig_child);
if ( new_server_socket < 0)
{
printf("Server Accept Failed!\n");
break;
}
else
{printf("accepted\n");}
switch(fork()) {
case 0: //son
printf("son(%d) closeservfd:\n",(int)getpid());
close(servfd);
sig_uncatch(sig_child);
sig_unblock(sig_child);
sig_uncatch(sig_term);
sig_uncatch(sig_pipe);
//read from new_server_socket ,parse request ,and call collectors
char clistr[1024];
readline(new_server_socket,clistr,1023);
printf("clistr is %s----",clistr);
clistr[strlen(clistr)-1]='\0';
printf("after del tail ,clistr is %s----",clistr);
int istrcmp=strcmp(clistr,"whoami");
if(istrcmp!=0){printf("istrcmp is %d\n",istrcmp);break;}
struct collectors *collhead=NULL;
struct collectors *collcur;
printf("son:going to getcollectors\n");
int num_coll= GetCollectors ("0.0.0.0",&collhead);
collcur=collhead;
int i=0,rc;
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;
void *status;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
while(collcur)
{
printf("coll : id,ip:%d,%s\n",collcur->collid,collcur->collip);
thread_para_array[i].thread_id=i;
thread_para_array[i].collid=collcur->collid;
strcpy(thread_para_array[i].collip,collcur->collip);
thread_para_array[i].agentport=collcur->agentport;
strcpy(thread_para_array[i].command,"Gettopip");
printf("OK 11\n");
rc=pthread_create(&threads[i], &attr, run,
(void *) &thread_para_array[i]);
printf("OK 2\n");
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
collcur=collcur->next;
i++;
}
pthread_attr_destroy(&attr);
for(i=0;i<num_coll;i++)
{
rc = pthread_join(threads[i], &status);
if (rc)
{
printf("ERROR; return code from pthread_join() is %d\n",rc);
exit(-1);
}
printf("Completed join with thread %d status= %ld\n",i, (long)status);
}
// run(sock, peer, twin, t_start, report_sequence, subdir_index, sampling_rate, compress);
pthread_exit(NULL);
exit(0);
break;
case -1:
exit(-1);
break;
//default: //father
//printf("father(%d): close new socket\n",(int)getpid());
//close(new_server_socket);
//break;
}
printf("father(%d): close new socket\n",(int)getpid());
close(new_server_socket);
}
//if(collcur!=NULL)
// CallCollector(2,"192.168.6.66",2008,30,"who",result,1999);
}
/*
 * Send one command line to a collector and append its reply to the file
 * "<CollID>_result.txt" (id zero-padded to at least three digits) in the
 * current directory.
 *
 * CollID            collector id; only used for logging and the file name
 * CollIP, CollPort  address handed to GetsocketAndConn()
 * timeoutinseconds  handed to GetsocketAndConn()
 * command           text sent to the collector; a '\n' is appended
 * result, resultmaxlength  currently unused
 *
 * Reply lines are copied to the file until a line equal to "End\n" is read
 * or readline() reports no more data.
 *
 * Returns 0 when the "End" line was received and the file was closed without
 * error; -1 on any failure (command too long, connect, send, file open,
 * reply ended early).
 *
 * NOTE(review): readline()/writen() are assumed to follow the usual
 * convention (readline: > 0 bytes read, <= 0 EOF or error; writen: < 0 on
 * error) - confirm against their definitions.
 */
int CallCollector(int CollID, char* CollIP, int CollPort, int timeoutinseconds, char * command ,void * result, int resultmaxlength)
{
  char line[1024];
  char ResultfileName[1024];
  FILE *fp;
  int servfd;
  int len;
  int rc = -1;

  (void)result;
  (void)resultmaxlength;

  printf("in callcoll->collid,collip,collport:%d,%s,%d\n",CollID,CollIP,CollPort);

  /* Build "command\n" with a bound instead of strcpy + manual append. */
  len = snprintf(line,sizeof line,"%s\n",command);
  if (len < 0 || (size_t)len >= sizeof line) {
    printf("command too long for collector %d\n",CollID);
    return -1;
  }

  servfd = GetsocketAndConn(CollIP,CollPort,timeoutinseconds);
  if (servfd == -1)
    return -1;

  if (writen(servfd,line,strlen(line)) < 0) {
    printf("send to collector %d failed\n",CollID);
    goto out_close;
  }

  /* Format the whole name in one bounded call so ids above 999 keep all
     their digits. */
  snprintf(ResultfileName,sizeof ResultfileName,"%03d_result.txt",CollID);
  fp = fopen(ResultfileName,"a");
  if (fp == NULL) {
    printf("cannot open %s\n",ResultfileName);
    goto out_close;
  }

  while (readline(servfd,line,1023) > 0)
  {
    fprintf(fp,"%s",line);
    if (strcmp(line,"End\n") == 0)
    {
      printf("EndN\n");
      rc = 0;
      break;
    }
  }

  if (fclose(fp) != 0)
    rc = -1;

out_close:
  close(servfd);
  return rc;
}
/*
 * Create a TCP socket, bind it to ipaddr:port and put it into listening
 * state with a backlog of LENGTH_OF_LISTEN_QUEUE.
 *
 * ipaddr   host name or dotted address, resolved with gethostbyname()
 * port     port in host byte order
 * timeout  currently unused; the send timeout is fixed at 3 seconds
 *
 * Returns the listening descriptor, or -1 on failure. The socket is closed
 * on every failure path.
 */
int GetsocketAndListen(char *ipaddr,int port,int timeout)
{
  struct sockaddr_in localaddr;
  struct hostent *servhost = NULL;
  struct timeval tv;
  int servfd;

  (void)timeout;

  if (ipaddr == NULL)
    return -1;

  if (-1 == (servfd = socket(AF_INET, SOCK_STREAM, 0))) {
    myerror("tcpclient", "socket() error!\n");
    return -1;
  }

  /* Send timeout of 3 seconds. Both fields must be set or the kernel sees
     garbage microseconds. Best effort: a failure here is not fatal. */
  tv.tv_sec = 3;
  tv.tv_usec = 0;
  setsockopt(servfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));

  if (NULL == (servhost = gethostbyname(ipaddr))) {
    myerror("tcpclient", "Specified host not exist\n");
    close(servfd);
    return -1;
  }

  bzero(&localaddr, sizeof(localaddr));
  localaddr.sin_family = AF_INET;
  localaddr.sin_port = htons(port);
  localaddr.sin_addr = *(struct in_addr *)servhost->h_addr;

  if (bind(servfd, (struct sockaddr *)&localaddr, sizeof(localaddr)) != 0) {
    printf("bind local addr error\n");
    close(servfd);
    return -1;
  }

  if (listen(servfd, LENGTH_OF_LISTEN_QUEUE)) {
    printf("Server Listen Failed!");
    close(servfd);
    return -1;
  }
  return servfd;
}
/*
 * Create a TCP socket and connect it to ipaddr:port.
 *
 * ipaddr   host name or dotted address, resolved with gethostbyname()
 * port     port in host byte order
 * timeout  currently unused; the send timeout is fixed at 3 seconds
 *
 * The socket is not bound to a specific local address; the system picks
 * the source address and port.
 *
 * Returns the connected descriptor, or -1 on failure. The socket is closed
 * on every failure path.
 */
int GetsocketAndConn(char *ipaddr,int port,int timeout)
{
  struct sockaddr_in servaddr;
  struct hostent *servhost = NULL;
  struct timeval tv;
  int servfd;

  (void)timeout;

  if (ipaddr == NULL)
    return -1;

  if (-1 == (servfd = socket(AF_INET, SOCK_STREAM, 0))) {
    myerror("tcpclient", "socket() error!\n");
    return -1;
  }

  /* Send timeout of 3 seconds. Both fields must be set or the kernel sees
     garbage microseconds. Best effort: a failure here is not fatal. */
  tv.tv_sec = 3;
  tv.tv_usec = 0;
  setsockopt(servfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));

  printf( "before gethostbyname(relay), ipaddr = %s\n" ,ipaddr);
  if (NULL == (servhost = gethostbyname(ipaddr))) {
    myerror("tcpclient", "Specified host not exist\n");
    close(servfd);
    return -1;
  }

  bzero(&servaddr, sizeof(servaddr));
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(port);
  servaddr.sin_addr = *(struct in_addr *)servhost->h_addr;

  if (0 != connect(servfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
    fprintf(stderr, "connect() error\n");
    close(servfd);
    return -1;
  }
  return servfd;
}
/*
 * Query the "flow" database on localhost for collectors and append one
 * malloc'ed struct collectors node per row to the list at *collhead.
 *
 * RouterIP  "0.0.0.0" selects every collector; any other value restricts the
 *           query to devices whose loopaddress equals it. The value is
 *           escaped before it is placed in the SQL text and must be at most
 *           255 bytes long.
 * collhead  in/out list head. New nodes are linked after any nodes already
 *           on the list. The caller owns the nodes.
 *
 * Returns the number of nodes appended (>= 0), or -1 on failure (bad
 * arguments, connection, database selection or query error). Failures are
 * never reported as small positive numbers, so the result can be used as a
 * count.
 *
 * NOTE(review): the database credentials are hard-coded below.
 * NOTE(review): struct collectors is declared outside this file's visible
 * part; collip is assumed to be a char array member - confirm.
 */
int GetCollectors (char* RouterIP,struct collectors **collhead)
{
  static const char query_head[] =
    "SELECT a.collid,a.collip,a.agentport FROM collector a,collassignment b,device c where a.collid=b.collid and b.devitemid=c.devitemid ";
  static const char query_tail[] =
    " group by a.collid,a.collip,a.agentport";
  enum { ROUTER_IP_MAX = 255 };

  MYSQL mysql;
  MYSQL_RES *result;
  MYSQL_ROW row;
  char query_def[1000];
  char escaped_ip[2 * ROUTER_IP_MAX + 1];
  struct collectors *NewNode;
  struct collectors *CurNode;
  size_t iplen;
  int execsqlret;
  int len;
  int rownum = 0;

  printf("GetCollectors\n");

  if (RouterIP == NULL || collhead == NULL)
    return -1;
  iplen = strlen(RouterIP);
  if (iplen > ROUTER_IP_MAX)
    return -1;

  /* Find the current tail so new nodes are appended even when the caller
     passes a non-empty list. */
  CurNode = *collhead;
  while (CurNode != NULL && CurNode->next != NULL)
    CurNode = CurNode->next;

  if (mysql_init(&mysql) == NULL)
  {
    printf("\nFailed to initate MySQL connection");
    return -1;
  }
  if (!mysql_real_connect(&mysql,"localhost","root","root",NULL,0,NULL,0))
  {
    printf( "Failed to connect to localhost: Error: %s\n", mysql_error(&mysql));
    mysql_close(&mysql);
    return -1;
  }
  if (mysql_select_db(&mysql,"flow") != 0)
  {
    printf( "Failed to connect to Database flow: Error: %s\n", mysql_error(&mysql));
    mysql_close(&mysql);
    return -1;
  }
  printf( "Database flow Selected\n");

  if (strcmp(RouterIP,"0.0.0.0") != 0) /* the caller restricts the router address */
  {
    /* Escape the caller-supplied value; the buffer holds the worst case of
       two output bytes per input byte plus the terminator. */
    mysql_real_escape_string(&mysql,escaped_ip,RouterIP,(unsigned long)iplen);
    len = snprintf(query_def,sizeof query_def,"%sand c.loopaddress in ('%s')%s",
                   query_head,escaped_ip,query_tail);
  }
  else
  {
    len = snprintf(query_def,sizeof query_def,"%s%s",query_head,query_tail);
  }
  if (len < 0 || (size_t)len >= sizeof query_def)
  {
    printf( "sql query too long\n");
    mysql_close(&mysql);
    return -1;
  }
  printf( "sql query:%s\n",query_def);

  execsqlret = mysql_exec_sql(&mysql,query_def);
  if (execsqlret != 0)
  {
    printf( "Failed to find any records and caused an error: errret:%d,mysqlerrorstr:%s\n",execsqlret, mysql_error(&mysql));
    mysql_close(&mysql);
    return -1;
  }

  result = mysql_store_result(&mysql);
  if (result == NULL)
  {
    /* No result set: an error only if the statement should have produced one. */
    int failed = mysql_field_count(&mysql) > 0;
    if (failed)
      printf( "Error getting records: %s\n", mysql_error(&mysql));
    mysql_close(&mysql);
    return failed ? -1 : 0;
  }
  printf( "for query_def:%s,%ld Record Found\n",query_def,(long) mysql_affected_rows(&mysql));

  while ((row = mysql_fetch_row(result)))
  {
    /* A NULL column cannot be converted or copied; skip such rows. */
    if (row[0] == NULL || row[1] == NULL || row[2] == NULL)
    {
      printf("skipping row with NULL column\n");
      continue;
    }
    printf("row %d info :collid,collip,agentport-> %d,%s,%s\n",rownum,atoi(row[0]),row[1],row[2]);

    NewNode = (struct collectors *)malloc(sizeof *NewNode);
    if (NewNode == NULL)
    {
      printf("out of memory while building collector list\n");
      break;
    }
    NewNode->next = NULL;
    NewNode->collid = atoi(row[0]);
    snprintf(NewNode->collip,sizeof NewNode->collip,"%s",row[1]);
    NewNode->agentport = atoi(row[2]);

    if (CurNode == NULL)
    {*collhead = NewNode; printf("ok 3\n");}
    else
    {CurNode->next = NewNode; printf("ok 4\n");}
    CurNode = NewNode;
    rownum++;
  }
  mysql_free_result(result);

  mysql_close(&mysql);
  return rownum;
}
/*
 * Run one SQL statement on the given connection.
 * The statement length is taken with strlen(), so the text must be a
 * NUL-terminated string. Returns whatever mysql_real_query() returns.
 */
int mysql_exec_sql(MYSQL *mysql,const char *create_definition)
{
  unsigned long statement_length = strlen(create_definition);

  return mysql_real_query(mysql, create_definition, statement_length);
}
/*
 * Copy the string src (terminator included) to dst.
 * Returns a pointer to the terminating NUL written into dst, so calls can
 * be chained to concatenate.
 */
char *strmov(register char *dst, register const char *src)
{
  for (;;) {
    char ch = *src++;

    *dst = ch;
    if (ch == '\0')
      return dst;
    dst++;
  }
}
void *run(void *threadarg)
{
struct thread_para *my_para;
int thread_id;
int collid;
char collip[50];
char command[1024];
int agentport;
my_para = (struct thread_para *) threadarg;
thread_id = my_para->thread_id;
collid = my_para->collid;
strcpy(collip ,my_para->collip);
strcpy(command,my_para->command);
agentport=my_para->agentport;
printf("thread id is %d\n",thread_id);
char result[2000];
CallCollector(collid,collip,agentport,30,command,result,1999);
pthread_exit((void *) 0);
} /* End of run */
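/*
 * Code-viewer keyboard shortcuts (page residue, not part of the program):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */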