⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 controller.c

📁 拥有分布式控制编程
💻 C
字号:
#include <sys/types.h>
#include <sys/param.h>
#include <netdb.h>
#include "sig.h"
#include "exit.h"
#include "sgetopt.h"
#include "uint16.h"
#include "fmt.h"
#include "scan.h"
#include "str.h"
#include "ip4.h"
#include "uint16.h"
#include "socket.h"
#include "fd.h"
#include "stralloc.h"
#include "buffer.h"
#include "error.h"
#include "strerr.h"
#include "pathexec.h"
#include "timeoutconn.h"
#include "remoteinfo.h"
#include "dns.h"
#include "tcp.h"

#include "/arbor/mysql-5.1.20-beta-linux-i686-glibc23/include/mysql.h"
#define LENGTH_OF_LISTEN_QUEUE 20
#define BUFFER_SIZE 1024
#define THREAD_MAX    5

#define FATAL "tcpclient: fatal: "
#define CONNECT "tcpclient: unable to connect to "
#define NUM_THREADS 30 /*zsj*/
struct thread_para{
   int  thread_id;
   int collid;
   char collip[50];
   char command[1024];
   int agentport;
   
};
struct thread_para thread_para_array[NUM_THREADS];
char *  strmov (register char *dst, register const char *src) ;
void *run(void *threadarg);
void nomem(void)
{
  strerr_die2x(111,FATAL,"out of memory");
}
void usage(void)
{
  strerr_die1x(100,"tcpclient: usage: tcpclient \
[ -hHrRdDqQv ] \
[ -i localip ] \
[ -p localport ] \
[ -T timeoutconn ] \
[ -l localname ] \
[ -t timeoutinfo ] \
host port program");
}

int verbosity = 1;
int flagdelay = 1;
int flagremoteinfo = 1;
int flagremotehost = 1;
unsigned long itimeout = 26;
unsigned long ctimeout[2] = { 2, 58 };

char iplocal[4] = { 0,0,0,0 };
uint16 portlocal = 0;
char *forcelocal = 0;

char ipremote[4];
uint16 portremote;

char *hostname;
static stralloc addresses;
static stralloc moreaddresses;

static stralloc tmp;
static stralloc fqdn;
char strnum[FMT_ULONG];
char ipstr[IP4_FMT];

char seed[128];

main(int argc,char **argv)
{
  unsigned long u;
  int opt;
  char *x;
  int j;
  int s;
  int cloop;
	unsigned long controllerport;
  dns_random_init(seed);

  close(6);
  close(7);
  sig_ignore(sig_pipe);
 
  while ((opt = getopt(argc,argv,"dDvqQhHrRi:p:t:T:l:")) != opteof)
    switch(opt) {
      case 'd': flagdelay = 1; break;
      case 'D': flagdelay = 0; break;
      case 'v': verbosity = 2; break;
      case 'q': verbosity = 0; break;
      case 'Q': verbosity = 1; break;
      case 'l': forcelocal = optarg; break;
      case 'H': flagremotehost = 0; break;
      case 'h': flagremotehost = 1; break;
      case 'R': flagremoteinfo = 0; break;
      case 'r': flagremoteinfo = 1; break;
      case 't': scan_ulong(optarg,&itimeout); break;
      case 'T': j = scan_ulong(optarg,&ctimeout[0]);
		if (optarg[j] == '+') ++j;
		scan_ulong(optarg + j,&ctimeout[1]);
		break;
      case 'i': if (!ip4_scan(optarg,iplocal)) usage(); break;
      case 'p': scan_ulong(optarg,&u); portlocal = u; break;
      default: usage();
    }
 
  if (!verbosity)
    buffer_2->fd = -1;
    scan_ulong(argv[optind+1],&controllerport); 
  int servfd=GetsocketAndListen(argv[optind],controllerport,1);
  signal(sig_child, SIG_IGN);
	  while(1) //服务器端要一直运行,WaitFor FrontEnd
    {

        //定义客户端的socket地址结构client_addr
        struct sockaddr_in client_addr;
        socklen_t length = sizeof(client_addr);

        //接受一个到server_socket代表的socket的一个连接
        //如果没有连接请求,就等待到有连接请求--这是accept函数的特性
        //accept函数返回一个新的socket,这个socket(new_server_socket)用于同连接到的客户的通信
        //new_server_socket代表了服务器和客户端之间的一个通信通道
        //accept函数把连接到的客户端信息填写到客户端的socket地址结构client_addr中
          sig_unblock(sig_child);
        int new_server_socket = accept(servfd,(struct sockaddr*)&client_addr,&length);
          sig_block(sig_child);
        if ( new_server_socket < 0)
        {
            printf("Server Accept Failed!\n");
            break;
        }
        else
        	{printf("accepted\n");}
         switch(fork()) {
     		 	case 0: //son
     		 		printf("son(%d) closeservfd:\n",(int)getpid());
        	close(servfd);
        	 sig_uncatch(sig_child);
        sig_unblock(sig_child);
        
        sig_uncatch(sig_term);
        sig_uncatch(sig_pipe);
        	//read from new_server_socket ,parse request ,and call collectors
        	char clistr[1024];
        readline(new_server_socket,clistr,1023);
        printf("clistr is %s----",clistr);
        clistr[strlen(clistr)-1]='\0';
         printf("after del tail ,clistr is %s----",clistr);
        int istrcmp=strcmp(clistr,"whoami");
        if(istrcmp!=0){printf("istrcmp is %d\n",istrcmp);break;}
        	struct collectors *collhead=NULL;
					struct collectors *collcur;
				   printf("son:going to getcollectors\n");
						int num_coll= GetCollectors ("0.0.0.0",&collhead);
						collcur=collhead;
						int i=0,rc;
						pthread_t threads[NUM_THREADS];
						pthread_attr_t attr;
						void *status;
						pthread_attr_init(&attr);
					   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
						while(collcur)
						{
							printf("coll : id,ip:%d,%s\n",collcur->collid,collcur->collip);
							thread_para_array[i].thread_id=i;
							thread_para_array[i].collid=collcur->collid;
							strcpy(thread_para_array[i].collip,collcur->collip);
							thread_para_array[i].agentport=collcur->agentport;
							strcpy(thread_para_array[i].command,"Gettopip");
							
							printf("OK 11\n");
							rc=pthread_create(&threads[i], &attr, run, 
					        (void *) &thread_para_array[i]);
					        printf("OK 2\n");
							if (rc){
					         printf("ERROR; return code from pthread_create() is %d\n", rc);
					         exit(-1);
					      }
					      collcur=collcur->next;
							
					      i++;
						}
						pthread_attr_destroy(&attr);
						
						for(i=0;i<num_coll;i++)
						{
							rc = pthread_join(threads[i], &status);
					      if (rc)
					      {
					         printf("ERROR; return code from pthread_join() is %d\n",rc);
					         exit(-1);
					      }
					      printf("Completed join with thread %d status= %ld\n",i, (long)status);
					      	
						}
						// run(sock, peer, twin, t_start, report_sequence, subdir_index, sampling_rate, compress);
											
						pthread_exit(NULL);
						exit(0);
        	break;
        	case -1:
        		exit(-1);
        		break;
        	//default: //father
        		//printf("father(%d): close new socket\n",(int)getpid());
        		//close(new_server_socket);
        		//break;
      	}
      	printf("father(%d): close new socket\n",(int)getpid());
      	close(new_server_socket);
		}


		//if(collcur!=NULL)
	

// CallCollector(2,"192.168.6.66",2008,30,"who",result,1999); 
		



 
}


int CallCollector(int CollID, char* CollIP, int CollPort, int timeoutinseconds, char * command ,void * result, int resultmaxlength)
{
		printf("in callcoll->collid,collip,collport:%d,%s,%d\n",CollID,CollIP,CollPort);
		int servfd=GetsocketAndConn(CollIP,CollPort,30);
		if(servfd==-1)
		return -1;
 		char line[1024];
		strcpy(line,command);
		int strlength=strlen(line);
		line[strlength]='\n';
		line[strlength+1]='\0';
		int ret=writen(servfd,line,strlen(line));
		
		 char ResultfileName[1024];
		 snprintf(ResultfileName,4,"%03d",CollID);
		 strcat(ResultfileName,"_result.txt");
	 	int nBytes;
		FILE *fp;
		fp=fopen(ResultfileName,"a");
	 while(readline(servfd,line,1023))
	 {
	 	 //printf("read line ret is : %s----\n",line);
	 	fprintf(fp,"%s",line);
	 	if(strcmp(line,"End\n")==0)
	 	{
	 			printf("EndN\n");break;
	 	}
	 
	 }
	 fclose(fp);
	
}
int GetsocketAndListen(char *ipaddr,int port,int timeout)
{
	  struct sockaddr_in servaddr;
	  struct sockaddr_in localaddr;
 		struct hostent  *servhost = NULL;
 		int servfd ;
	  if(-1 == (servfd = socket(AF_INET, SOCK_STREAM, 0))) {
		  myerror("tcpclient", "socket() error!\n");
		  return -1;
		 }
		 struct timeval tv;
	tv.tv_sec = 3;  /* send  timeout: 30 Secs Timeout */
	setsockopt(servfd, SOL_SOCKET, SO_SNDTIMEO,(struct timeval *)&tv,sizeof(struct timeval));
		  if (NULL == (servhost = gethostbyname(ipaddr))) {
		  myerror("tcpclient", "Specified host not exist\n");
		  return -1;
		 }
		 bzero(&localaddr, sizeof(localaddr));
		 localaddr.sin_family = AF_INET;
		 localaddr.sin_port = htons(port);
		 localaddr.sin_addr = *(struct in_addr *)servhost->h_addr;
		 int ret=bind (servfd, (struct sockaddr *)&localaddr, sizeof(servaddr));
		 if(ret!=0)
		 {printf("bind local addr error\n");return -1;}
		   if ( listen(servfd, LENGTH_OF_LISTEN_QUEUE) )
    {
        printf("Server Listen Failed!"); 
        return -1;
    }
    	 return servfd;
   
	
	
	
}
int GetsocketAndConn(char *ipaddr,int port,int timeout)
{
	  struct sockaddr_in servaddr;
	  struct sockaddr_in localaddr;
 		struct hostent  *servhost = NULL;
 		int servfd ;
	  if(-1 == (servfd = socket(AF_INET, SOCK_STREAM, 0))) {
		  myerror("tcpclient", "socket() error!\n");
		  return -1;
		 }
		 struct timeval tv;
	tv.tv_sec = 3;  /* 30 Secs Timeout */
	setsockopt(servfd, SOL_SOCKET, SO_SNDTIMEO,(struct timeval *)&tv,sizeof(struct timeval));
		  if (NULL == (servhost = gethostbyname("192.168.6.66"))) {
		  myerror("tcpclient", "Specified host not exist\n");
		  return -1;
		 }
		 bzero(&localaddr, sizeof(localaddr));
		 localaddr.sin_family = AF_INET;
		 localaddr.sin_port = 0;
		 localaddr.sin_addr = *(struct in_addr *)servhost->h_addr;
		 //int ret=bind (servfd, (struct sockaddr *)&localaddr, sizeof(servaddr));
		 //if(ret!=0)
		 //{printf("bind local addr error\n");}

		 printf( "before gethostbyname(relay), ipaddr = %s\n" ,ipaddr);
		 if (NULL == (servhost = gethostbyname(ipaddr))) {
		  myerror("tcpclient", "Specified host not exist\n");
		  return -1;
		 }
		
		 bzero(&servaddr, sizeof(servaddr));
		 servaddr.sin_family = AF_INET;
		 servaddr.sin_port = htons(port);
		 servaddr.sin_addr = *(struct in_addr *)servhost->h_addr;
		 
		 if (0 != connect(servfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
		  fprintf(stderr, "connect() error\n");
		  return -1;
		 }
		 return servfd;
}


int GetCollectors (char* RouterIP,struct collectors **collhead)

{
	MYSQL mysql;
	MYSQL_RES *result;
	MYSQL_ROW row;
	
	unsigned int num_fields;
	unsigned int i;
	char query_def[1000];
	char record[1000];
	struct collectors *NewNode,*CurNode;
	int rownum=0;
	printf("GetCollectors\n");
	if(mysql_init(&mysql)==NULL)
	{
	  printf("\nFailed to initate MySQL connection");
	  return 1;
	}
	
	if (!mysql_real_connect(&mysql,"localhost","root","root",NULL,0,NULL,0)) 
	{
	    printf( "Failed to connect to localhost: Error: %s\n", mysql_error(&mysql));
	    return 2;
	}
	if(mysql_select_db(&mysql,"flow")==0)
	    printf( "Database flow Selected\n");
	else
	    printf( "Failed to connect to Database flow: Error: %s\n", mysql_error(&mysql));
	  strcpy(query_def,"SELECT a.collid,a.collip,a.agentport FROM collector a,collassignment b,device c where a.collid=b.collid and b.devitemid=c.devitemid ");
	 if(strcmp(RouterIP,"0.0.0.0")!=0)// if user gives router ip constraints ,
	 {
	 	strcat(query_def,"and c.loopaddress in ('");
	 	strcat(query_def,RouterIP);
	 	strcat(query_def,"')");
	}
		strcat(query_def," group by a.collid,a.collip,a.agentport");
		 printf( "sql query:%s\n",query_def);
	int execsqlret=mysql_exec_sql(&mysql,query_def);
		if(execsqlret==0)
	{
	    printf( "for query_def:%s,%ld Record Found\n",query_def,(long) mysql_affected_rows(&mysql));
	    result = mysql_store_result(&mysql);
	    if (result)  // there are rows
	    {
	        num_fields = mysql_num_fields(result);
	       // printf("fields : %d\n",num_fields);
	        //unsigned long *lengths;
					
	        while ((row = mysql_fetch_row(result))) 
	        { 
	           // lengths = mysql_fetch_lengths(result);
	           printf("row %d info :collid,collname,collIP-> %d,%s,%s",rownum++,atoi(row[0]),row[1]);
	          /* for(i = 0; i < num_fields; i++) 
	           { 
	             printf("[%d,%s] ", (int) lengths[i], row[i] ? row[i] : "NULL"); 
	            }*/ 
	          NewNode=(struct collectors *)malloc(sizeof(struct collectors));
	          NewNode->next=NULL;
	          NewNode->collid=atoi(row[0]);
	          strcpy(NewNode->collip,row[1]);
	           NewNode->agentport=atoi(row[2]);
	           if(*collhead==NULL)
	          {*collhead=NewNode;CurNode=NewNode;printf("ok 3\n");}
	          else
	          {CurNode->next=NewNode;CurNode=NewNode;printf("ok 4\n");}
	           
	          }
	        mysql_free_result(result);
	     }
	     else  // mysql_store_result() returned nothing
	     {
	        if(mysql_field_count(&mysql) > 0)
	        // mysql_store_result() should have returned data
	        {
	            printf( "Error getting records: %s\n", mysql_error(&mysql));
	        }
	    }
	}
	else
	{
	    printf( "Failed to find any records and caused an error: errret:%d,mysqlerrorstr:%s\n",execsqlret, mysql_error(&mysql));
	}
	
	mysql_close(&mysql);
	return rownum;
}

int mysql_exec_sql(MYSQL *mysql,const char *create_definition)
{
   return mysql_real_query(mysql,create_definition,strlen(create_definition));
}

char *strmov(register char *dst, register const char *src)
{
  while ((*dst++ = *src++)) ;
  return dst-1;
}
void *run(void *threadarg)
{
   struct thread_para *my_para;
   int  thread_id;
   int collid;
   char collip[50];
   char command[1024];
   int agentport;
   my_para = (struct thread_para *) threadarg;
   thread_id = my_para->thread_id;
   collid = my_para->collid;
   strcpy(collip ,my_para->collip);
  	strcpy(command,my_para->command);
  	agentport=my_para->agentport;
   printf("thread id is %d\n",thread_id);
   char result[2000];
  CallCollector(collid,collip,agentport,30,command,result,1999); 
		
	pthread_exit((void *) 0);


} /* End of run */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -