📄 collector.c
字号:
} /* Linux has a buffering problem. Maybe mixing fgets() and */ /* getchar() doesn't work? This fixes it anyway. */#ifdef _HARVEST_LINUX_ setbuf(fp, NULL);#endif return (fp);}void COL_Close_Read_Pipe(fp) FILE *fp;{ (void) fclose(fp); /* shutdown pipe to gather */ (void) close(To_Gatherer[0]);}/* ----------------------------------------------------------------- * * COL_Init_Connect() -- initialize connection to Gatherer/Broker * ----------------------------------------------------------------- */char *COL_Init_Connect(hostname, port, type, lupdate, queri) char *hostname; int port; short type; time_t lupdate; char *queri;{ int csock; struct sockaddr_in coll; struct hostent *hp; char pstr[BUFSIZ]; FILE *fp; static char *ret; LOGCOLLECT(hostname, port, lupdate); if (type < BAFULL_U) { /* Do a collection from a Gatherer */ Log("Collecting from the Gatherer at %s:%d.\n", hostname, port); pstr[0] = '\0'; switch (type) { case UFULL_U: Log("Type 0: Full collection each time, without data compression.\n"); sprintf(pstr, "%s -nocompress %s %d 0", Gather, hostname, port); break; case UPARTIAL_U: Log("Type 1: Incremental collection, without data compression (since %d).\n", lupdate); sprintf(pstr, "%s -nocompress %s %d %d", Gather, hostname, port, lupdate); break; case CFULL_U: Log("Type 2: Full collection each time, with data compression.\n"); sprintf(pstr, "%s %s %d 0", Gather, hostname, port); break; case CPARTIAL_U: Log("Type 3: Incremental collections, with data compression (since %d).\n", lupdate); sprintf(pstr, "%s %s %d %d", Gather, hostname, port, lupdate); break; } if ((fp = COL_Create_Read_Pipe(pstr)) == NULL) { errorlog("Collector: Pipe failed.\n"); return NULL; } ret = (char *) fp; return (ret); } /* Do a collection from a Broker */ Log("Collecting from the Broker at %s:%d.\n", hostname, port); if ((csock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { log_errno("socket"); return NULL; } coll.sin_family = AF_INET; if ((hp = gethostbyname(hostname)) == NULL) { log_errno("gethostbyname"); errorlog("%s Unknown host %s\n", COLLECT, hostname); return NULL; } memcpy((char *) &coll.sin_addr, (char *) hp->h_addr, hp->h_length); coll.sin_port = htons(port); if (connect(csock, (struct sockaddr *) &coll, sizeof(coll)) < 0) { log_errno("connect"); return NULL; } ret = COL_Get_BInput(csock, queri, type, lupdate); close(csock); return (ret);}/* ----------------------------------------------------------------- * * COL_Do_Collection -- do a collection from the list in the * Collection.conf file * ----------------------------------------------------------------- */int COL_Do_Collection(){ FILE *g_file; char host[MAXHOSTNAMELEN + 1], queri[BUFSIZ], mspace[BUFSIZ]; char *ret; int port, do_indexing = 0; short type; time_t lupdate = 0; if ((g_file = fopen(ColConfig, "r")) == NULL) { errorlog("Collector: Cannot read collections configuration file: %s\n", ColConfig); return ERROR; } Log("Starting collections...\n"); if (do_IND_Index_Start() == ERROR) { errorlog("Collector: Failed to start indexer.\n"); (void) fclose(g_file); return ERROR; } memset(mspace, '\0', BUFSIZ); while (fgets(mspace, BUFSIZ, g_file) != NULL) { if (mspace[0] == '#') continue; memset(host, '\0', MAXHOSTNAMELEN + 1); memset(queri, '\0', BUFSIZ); /* * We read the Collection.conf file to get the collection * points. For Gatherer collections, queri is: "--". * For Broker collections 6 & 7, queri is: * "--FLAGS the flags --QUERY the query" */ if (sscanf(mspace, "%s %d %hd %[^\n]s\n", host, &port, &type, queri) != 4) continue; new_nobjs = up_nobjs = del_nobjs = ref_nobjs = recv_nobjs = ign_nobjs = 0; lupdate = COL_get_last_update(host, port); max_update_time = lupdate; ret = COL_Init_Connect(host, port, type, lupdate, queri); if (ret != NULL) { (void) P_parse_input(ret, type); } else { (void) COL_reset_update(host, port, lupdate); } /* If we did something, then update the update time */ if (new_nobjs > 0 || up_nobjs > 0 || del_nobjs > 0 || ref_nobjs > 0) { (void) COL_put_last_update(host, port, max_update_time); do_indexing = 1; } Log("Finished collection - received %d objects (added %d, updated %d, deleted %d, refreshed %d, ignored %d).\n", recv_nobjs, new_nobjs, up_nobjs, del_nobjs, ref_nobjs, ign_nobjs); } (void) fclose(g_file); if (do_indexing) { /* Should do any/all indexing now */ if (do_IND_Index_Flush() == ERROR) { errorlog("Collector: Index flush failed.\n"); return ERROR; } /* Compute statistics */ if (AD_do_stats() == ERROR) { errorlog("Collector: Generate stats failed.\n"); return ERROR; } } (void) RG_Sync_Registry(); Log("Finished collections.\n"); return SUCCESS;}/* * parse_bulk_query - Parses the Query in Collection.conf. Looks like * "--FLAGS the flags --QUERY the query" */static void parse_bulk_query(q, qf, qe) char *q, *qf, *qe;{ char *s; if (!strcmp(q, "--")) return; if ((s = strstr(q, "--FLAGS")) != NULL) /* Grab flags */ strcpy(qf, s + strlen("--FLAGS")); if ((s = strstr(q, "--QUERY")) != NULL) /* Grab expresion */ strcpy(qe, s + strlen("--QUERY")); if ((s = strstr(qe, "--FLAGS")) != NULL) /* Strip flags */ *s = '\0'; if ((s = strstr(qf, "--QUERY")) != NULL) /* Strip expres. */ *s = '\0';}/* ----------------------------------------------------------------- * COL_Get_BInput() -- Do a Broker-to-Broker xfer * ----------------------------------------------------------------- */char *COL_Get_BInput(csock, queri, type, when) int csock; char *queri; short type; time_t when;{ static char *tfile; /* temporary file for xfer */ char ibuf[BUFSIZ], command[BUFSIZ]; char qe[BUFSIZ], qf[BUFSIZ]; FILE *fp, *i_file; int n; Debug(71, 1, ("Begin COL_Get_BInput...\n")); memset(command, '\0', BUFSIZ); memset(qe, '\0', BUFSIZ); memset(qf, '\0', BUFSIZ); /* Skip the beginning '--' in the query */ if (strlen(queri) <= 2 && ((type == BQFULL_U) || (type == BQPARTIAL_U))) { errorlog("Collector: Cannot send null query: '%s'.\n", queri); return NULL; } parse_bulk_query(queri, qf, qe); if (type == BAFULL_U) { Log("Type 4: Full collection each time, without data compression.\n"); sprintf(command, "#BULK #SINCE 0 #END #ALLB"); } else if (type == BAPARTIAL_U) { Log("Type 5: Incremental collections, without data compression (since %d).\n", when); sprintf(command, "#BULK #SINCE %d #END #ALLB", when); } else if (type == BQFULL_U) { Log("Type 6: Collection based on a query, without data compression.\n"); sprintf(command, "#BULK #SINCE 0 %s #END %s", qf, qe); } else if (type == BQPARTIAL_U) { Log("Type 7: Incremental based on a query, without data compression (since %d).\n", when); sprintf(command, "#BULK #SINCE %d %s #END %s", when, qf, qe); } else { errorlog("Illegal collection type %d\n", type); return NULL; } if (write(csock, command, strlen(command)) < 0) { log_errno("write"); return NULL; } tfile = xstrdup(tempnam(NULL, "bgat")); if ((i_file = fopen(tfile, "a+")) == NULL) { errorlog("%s%s\n", COLLECT, OPEN_ERR); xfree(tfile); return NULL; } memset(ibuf, '\0', BUFSIZ); if ((fp = fdopen(csock, "r")) == NULL) { log_errno("fdopen"); fclose(i_file); (void) unlink(tfile); xfree(tfile); return NULL; } if (fgets(ibuf, BUFSIZ, fp) == NULL) { errorlog("Collection: Empty response from remote Broker.\n"); fclose(i_file); (void) unlink(tfile); xfree(tfile); return NULL; } /* Check for a version header, and skip it. */ while (strncmp(ibuf, QMVERSION, 3) == 0) { if (fgets(ibuf, BUFSIZ, fp) == NULL) { errorlog("Collection: Null response?\n"); fclose(i_file); (void) unlink(tfile); xfree(tfile); return NULL; } } /* If we don't receive the Bulk succeeded tag, die */ if (strncmp(ibuf, BULK_SUC, 3) != 0) { errorlog("%s\n", ibuf); fclose(i_file); (void) unlink(tfile); xfree(tfile); return NULL; } /* See if they're talking to a Harvest Gatherer, not Broker */ if (!strncmp(ibuf, "000 - HELLO", strlen("000 - HELLO"))) { errorlog("This collection point is to a Harvest GATHERER, not a Broker!!!.\n"); fclose(i_file); (void) unlink(tfile); xfree(tfile); return NULL; } /* Read all of the data into a temporary file */ while ((n = fread(ibuf, 1, BUFSIZ, fp)) > 0) { fwrite(ibuf, 1, n, i_file); } Debug(71, 1, ("Finished COL_Get_BInput...\n")); fclose(fp); fclose(i_file); return (tfile);}/* ----------------------------------------------------------------- * * COL_get_list_update() -- * ----------------------------------------------------------------- */time_t COL_get_last_update(host, port) char *host; int port;{ int lufd; char *filename; time_t ret = 0; int inport, done = 0, n; char inbuf[LUPDATE_SIZE]; filename = UTIL_make_admin_filename("LASTUPDATE"); if ((lufd = open(filename, O_RDONLY | O_CREAT, 0666)) == -1) { errorlog("%s%s :%s:\n", COLLECT, OPEN_ERR, filename); xfree(filename); return (0); } xfree(filename); memset(inbuf, '\0', LUPDATE_SIZE); while (done == 0) { if ((n = read(lufd, inbuf, LUPDATE_SIZE)) < 0) { log_errno("read"); ret = 0; done = 1; } if (n == 0) { ret = 0; done = 1; } if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) { inport = atoi(inbuf + MAXHOSTNAMELEN); if (inport == port) { ret = (time_t) atol(inbuf + MAXHOSTNAMELEN + 10); done = 1; } } } close(lufd); return (ret);}/* ----------------------------------------------------------------- * * COL_put_last_update() -- write the update time * ----------------------------------------------------------------- */static int COL_put_last_update(host, port, when) char *host; int port; time_t when;{ int lufd; char *filename; off_t here = 0; int inport, done = 0, n; char inbuf[LUPDATE_SIZE]; filename = UTIL_make_admin_filename("LASTUPDATE"); if ((lufd = open(filename, O_RDWR | O_CREAT, 0666)) == -1) { fprintf(stderr, "%s%s :%s:\n", COLLECT, OPEN_ERR, filename); xfree(filename); return (0); } xfree(filename); while (done == 0) { here = lseek(lufd, 0, SEEK_CUR); memset(inbuf, '\0', LUPDATE_SIZE); if ((n = read(lufd, inbuf, LUPDATE_SIZE)) < 0) { perror("read"); close(lufd); return 0; } if (n == 0) done = 1; if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) { inport = atoi(inbuf + MAXHOSTNAMELEN); if (inport == port) done = 1; } } memset(inbuf, '\0', LUPDATE_SIZE); strncpy(inbuf, host, MAXHOSTNAMELEN); sprintf(inbuf + MAXHOSTNAMELEN, "%d", port); sprintf(inbuf + MAXHOSTNAMELEN + 10, "%d", when); if (lseek(lufd, here, SEEK_SET) < 0) { perror("lseek"); return 0; } if (write(lufd, inbuf, LUPDATE_SIZE) < 0) { perror("write"); return 0; } close(lufd); return 1;}/* ----------------------------------------------------------------- * COL_reset_update() - reverts the update time for the collection point * ----------------------------------------------------------------- */int COL_reset_update(host, port, lup) char *host; int port; time_t lup;{ int lufd; char *filename; off_t here = 0; int inport, done = 0, n; char inbuf[LUPDATE_SIZE]; filename = UTIL_make_admin_filename("LASTUPDATE"); if ((lufd = open(filename, O_RDWR | O_CREAT, 0666)) == -1) { errorlog("%s%s :%s:\n", COLLECT, OPEN_ERR, filename); xfree(filename); return ERROR; } xfree(filename); while (done == 0) { here = lseek(lufd, 0, SEEK_CUR); if ((n = read(lufd, inbuf, LUPDATE_SIZE)) < 0) { log_errno("read"); close(lufd); return ERROR; } if (n == 0) { done = 1; } if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) { inport = atoi(inbuf + MAXHOSTNAMELEN); if (inport == port) { done = 1; } } } memset(inbuf, '\0', LUPDATE_SIZE); strncpy(inbuf, host, MAXHOSTNAMELEN); sprintf(inbuf + MAXHOSTNAMELEN, "%d", port); sprintf(inbuf + MAXHOSTNAMELEN + 10, "%d", lup); lseek(lufd, here, SEEK_SET); if (write(lufd, inbuf, LUPDATE_SIZE) < 0) { log_errno("write"); } close(lufd); return SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -