⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 collector.c

📁 harvest是一个下载html网页的机器人
💻 C
📖 第 1 页 / 共 2 页
字号:
	}	/* Linux has a buffering problem.  Maybe mixing fgets() and     */	/* getchar() doesn't work?  This fixes it anyway.               */#ifdef _HARVEST_LINUX_	setbuf(fp, NULL);#endif	return (fp);}void COL_Close_Read_Pipe(fp)     FILE *fp;{	(void) fclose(fp);	/* shutdown pipe to gather */	(void) close(To_Gatherer[0]);}/* ----------------------------------------------------------------- * * COL_Init_Connect() -- initialize connection to Gatherer/Broker * ----------------------------------------------------------------- */char *COL_Init_Connect(hostname, port, type, lupdate, queri)     char *hostname;     int port;     short type;     time_t lupdate;     char *queri;{	int csock;	struct sockaddr_in coll;	struct hostent *hp;	char pstr[BUFSIZ];	FILE *fp;	static char *ret;	LOGCOLLECT(hostname, port, lupdate);	if (type < BAFULL_U) {		/* Do a collection from a Gatherer */		Log("Collecting from the Gatherer at %s:%d.\n", hostname, port);		pstr[0] = '\0';		switch (type) {		case UFULL_U:			Log("Type 0: Full collection each time, without data compression.\n");			sprintf(pstr, "%s -nocompress %s %d 0",			    Gather, hostname, port);			break;		case UPARTIAL_U:			Log("Type 1: Incremental collection, without data compression (since %d).\n", lupdate);			sprintf(pstr, "%s -nocompress %s %d %d",			    Gather, hostname, port, lupdate);			break;		case CFULL_U:			Log("Type 2: Full collection each time, with data compression.\n");			sprintf(pstr, "%s %s %d 0", Gather, hostname, port);			break;		case CPARTIAL_U:			Log("Type 3: Incremental collections, with data compression (since %d).\n", lupdate);			sprintf(pstr, "%s %s %d %d", Gather, hostname, port, lupdate);			break;		}		if ((fp = COL_Create_Read_Pipe(pstr)) == NULL) {			errorlog("Collector: Pipe failed.\n");			return NULL;		}		ret = (char *) fp;		return (ret);	}	/* Do a collection from a Broker */	Log("Collecting from the Broker at %s:%d.\n", hostname, port);	if ((csock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {		log_errno("socket");		
return NULL;	}	coll.sin_family = AF_INET;	if ((hp = gethostbyname(hostname)) == NULL) {		log_errno("gethostbyname");		errorlog("%s Unknown host %s\n", COLLECT, hostname);		return NULL;	}	memcpy((char *) &coll.sin_addr, (char *) hp->h_addr, hp->h_length);	coll.sin_port = htons(port);	if (connect(csock, (struct sockaddr *) &coll, sizeof(coll)) < 0) {		log_errno("connect");		return NULL;	}	ret = COL_Get_BInput(csock, queri, type, lupdate);	close(csock);	return (ret);}/* ----------------------------------------------------------------- * * COL_Do_Collection -- do a collection from the list in the * Collection.conf file * ----------------------------------------------------------------- */int COL_Do_Collection(){	FILE *g_file;	char host[MAXHOSTNAMELEN + 1], queri[BUFSIZ], mspace[BUFSIZ];	char *ret;	int port, do_indexing = 0;	short type;	time_t lupdate = 0;	if ((g_file = fopen(ColConfig, "r")) == NULL) {		errorlog("Collector: Cannot read collections configuration file: %s\n", ColConfig);		return ERROR;	}	Log("Starting collections...\n");	if (do_IND_Index_Start() == ERROR) {		errorlog("Collector: Failed to start indexer.\n");		(void) fclose(g_file);		return ERROR;	}	memset(mspace, '\0', BUFSIZ);	while (fgets(mspace, BUFSIZ, g_file) != NULL) {		if (mspace[0] == '#')			continue;		memset(host, '\0', MAXHOSTNAMELEN + 1);		memset(queri, '\0', BUFSIZ);		/*		 *  We read the Collection.conf file to get the collection		 *  points.  For Gatherer collections, queri is: "--".		 
*  For Broker collections 6 & 7, queri is:		 *  "--FLAGS the flags --QUERY the query"		 */		if (sscanf(mspace, "%s %d %hd %[^\n]s\n",			host, &port, &type, queri) != 4)			continue;		new_nobjs = up_nobjs = del_nobjs = ref_nobjs = recv_nobjs = ign_nobjs = 0;		lupdate = COL_get_last_update(host, port);		max_update_time = lupdate;		ret = COL_Init_Connect(host, port, type, lupdate, queri);		if (ret != NULL) {			(void) P_parse_input(ret, type);		} else {			(void) COL_reset_update(host, port, lupdate);		}		/* If we did something, then update the update time */		if (new_nobjs > 0 || up_nobjs > 0 || del_nobjs > 0 || ref_nobjs > 0) {			(void) COL_put_last_update(host, port, max_update_time);			do_indexing = 1;		}		Log("Finished collection - received %d objects (added %d, updated %d, deleted %d, refreshed %d, ignored %d).\n", recv_nobjs, new_nobjs, up_nobjs, del_nobjs, ref_nobjs, ign_nobjs);	}	(void) fclose(g_file);	if (do_indexing) {		/* Should do any/all indexing now */		if (do_IND_Index_Flush() == ERROR) {			errorlog("Collector: Index flush failed.\n");			return ERROR;		}		/* Compute statistics */		if (AD_do_stats() == ERROR) {			errorlog("Collector: Generate stats failed.\n");			return ERROR;		}	}	(void) RG_Sync_Registry();	Log("Finished collections.\n");	return SUCCESS;}/* *  parse_bulk_query - Parses the Query in Collection.conf.  Looks like *  "--FLAGS the flags --QUERY the query" */static void parse_bulk_query(q, qf, qe)     char *q, *qf, *qe;{	char *s;	if (!strcmp(q, "--"))		return;	if ((s = strstr(q, "--FLAGS")) != NULL)		/* Grab flags */		strcpy(qf, s + strlen("--FLAGS"));	if ((s = strstr(q, "--QUERY")) != NULL)		/* Grab expresion */		strcpy(qe, s + strlen("--QUERY"));	if ((s = strstr(qe, "--FLAGS")) != NULL)	/* Strip flags */		*s = '\0';	if ((s = strstr(qf, "--QUERY")) != NULL)	/* Strip expres. 
*/		*s = '\0';}/* ----------------------------------------------------------------- * COL_Get_BInput() -- Do a Broker-to-Broker xfer * ----------------------------------------------------------------- */char *COL_Get_BInput(csock, queri, type, when)     int csock;     char *queri;     short type;     time_t when;{	static char *tfile;	/* temporary file for xfer */	char ibuf[BUFSIZ], command[BUFSIZ];	char qe[BUFSIZ], qf[BUFSIZ];	FILE *fp, *i_file;	int n;	Debug(71, 1, ("Begin COL_Get_BInput...\n"));	memset(command, '\0', BUFSIZ);	memset(qe, '\0', BUFSIZ);	memset(qf, '\0', BUFSIZ);	/* Skip the beginning '--' in the query */	if (strlen(queri) <= 2 &&	    ((type == BQFULL_U) || (type == BQPARTIAL_U))) {		errorlog("Collector: Cannot send null query: '%s'.\n", queri);		return NULL;	}	parse_bulk_query(queri, qf, qe);	if (type == BAFULL_U) {		Log("Type 4: Full collection each time, without data compression.\n");		sprintf(command, "#BULK #SINCE 0 #END #ALLB");	} else if (type == BAPARTIAL_U) {		Log("Type 5: Incremental collections, without data compression (since %d).\n", when);		sprintf(command, "#BULK #SINCE %d #END #ALLB", when);	} else if (type == BQFULL_U) {		Log("Type 6: Collection based on a query, without data compression.\n");		sprintf(command, "#BULK #SINCE 0 %s #END %s", qf, qe);	} else if (type == BQPARTIAL_U) {		Log("Type 7: Incremental based on a query, without data compression (since %d).\n", when);		sprintf(command, "#BULK #SINCE %d %s #END %s", when, qf, qe);	} else {		errorlog("Illegal collection type %d\n", type);		return NULL;	}	if (write(csock, command, strlen(command)) < 0) {		log_errno("write");		return NULL;	}	tfile = xstrdup(tempnam(NULL, "bgat"));	if ((i_file = fopen(tfile, "a+")) == NULL) {		errorlog("%s%s\n", COLLECT, OPEN_ERR);		xfree(tfile);		return NULL;	}	memset(ibuf, '\0', BUFSIZ);	if ((fp = fdopen(csock, "r")) == NULL) {		log_errno("fdopen");		fclose(i_file);		(void) unlink(tfile);		xfree(tfile);		return NULL;	}	if (fgets(ibuf, BUFSIZ, fp) == 
NULL) {		errorlog("Collection: Empty response from remote Broker.\n");		fclose(i_file);		(void) unlink(tfile);		xfree(tfile);		return NULL;	}	/* Check for a version header, and skip it. */	while (strncmp(ibuf, QMVERSION, 3) == 0) {		if (fgets(ibuf, BUFSIZ, fp) == NULL) {			errorlog("Collection: Null response?\n");			fclose(i_file);			(void) unlink(tfile);			xfree(tfile);			return NULL;		}	}	/* If we don't receive the Bulk succeeded tag, die */	if (strncmp(ibuf, BULK_SUC, 3) != 0) {		errorlog("%s\n", ibuf);		fclose(i_file);		(void) unlink(tfile);		xfree(tfile);		return NULL;	}	/* See if they're talking to a Harvest Gatherer, not Broker */	if (!strncmp(ibuf, "000 - HELLO", strlen("000 - HELLO"))) {		errorlog("This collection point is to a Harvest GATHERER, not a Broker!!!.\n");		fclose(i_file);		(void) unlink(tfile);		xfree(tfile);		return NULL;	}	/* Read all of the data into a temporary file */	while ((n = fread(ibuf, 1, BUFSIZ, fp)) > 0) {		fwrite(ibuf, 1, n, i_file);	}	Debug(71, 1, ("Finished COL_Get_BInput...\n"));	fclose(fp);	fclose(i_file);	return (tfile);}/* ----------------------------------------------------------------- * * COL_get_list_update() -- * ----------------------------------------------------------------- */time_t COL_get_last_update(host, port)     char *host;     int port;{	int lufd;	char *filename;	time_t ret = 0;	int inport, done = 0, n;	char inbuf[LUPDATE_SIZE];	filename = UTIL_make_admin_filename("LASTUPDATE");	if ((lufd = open(filename, O_RDONLY | O_CREAT, 0666)) == -1) {		errorlog("%s%s :%s:\n", COLLECT, OPEN_ERR, filename);		xfree(filename);		return (0);	}	xfree(filename);	memset(inbuf, '\0', LUPDATE_SIZE);	while (done == 0) {		if ((n = read(lufd, inbuf, LUPDATE_SIZE)) < 0) {			log_errno("read");			ret = 0;			done = 1;		}		if (n == 0) {			ret = 0;			done = 1;		}		if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) {			inport = atoi(inbuf + MAXHOSTNAMELEN);			if (inport == port) {				ret = (time_t) atol(inbuf + MAXHOSTNAMELEN + 10);				done = 
1;			}		}	}	close(lufd);	return (ret);}/* ----------------------------------------------------------------- * * COL_put_last_update() -- write the update time * ----------------------------------------------------------------- */static int COL_put_last_update(host, port, when)     char *host;     int port;     time_t when;{	int lufd;	char *filename;	off_t here = 0;	int inport, done = 0, n;	char inbuf[LUPDATE_SIZE];	filename = UTIL_make_admin_filename("LASTUPDATE");	if ((lufd = open(filename, O_RDWR | O_CREAT, 0666)) == -1) {		fprintf(stderr, "%s%s :%s:\n", COLLECT, OPEN_ERR, filename);		xfree(filename);		return (0);	}	xfree(filename);	while (done == 0) {		here = lseek(lufd, 0, SEEK_CUR);		memset(inbuf, '\0', LUPDATE_SIZE);		if ((n = read(lufd, inbuf, LUPDATE_SIZE)) < 0) {			perror("read");			close(lufd);			return 0;		}		if (n == 0)			done = 1;		if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) {			inport = atoi(inbuf + MAXHOSTNAMELEN);			if (inport == port)				done = 1;		}	}	memset(inbuf, '\0', LUPDATE_SIZE);	strncpy(inbuf, host, MAXHOSTNAMELEN);	sprintf(inbuf + MAXHOSTNAMELEN, "%d", port);	sprintf(inbuf + MAXHOSTNAMELEN + 10, "%d", when);	if (lseek(lufd, here, SEEK_SET) < 0) {		perror("lseek");		return 0;	}	if (write(lufd, inbuf, LUPDATE_SIZE) < 0) {		perror("write");		return 0;	}	close(lufd);	return 1;}/* ----------------------------------------------------------------- * COL_reset_update()  - reverts the update time for the collection point * ----------------------------------------------------------------- */int COL_reset_update(host, port, lup)     char *host;     int port;     time_t lup;{	int lufd;	char *filename;	off_t here = 0;	int inport, done = 0, n;	char inbuf[LUPDATE_SIZE];	filename = UTIL_make_admin_filename("LASTUPDATE");	if ((lufd = open(filename, O_RDWR | O_CREAT, 0666)) == -1) {		errorlog("%s%s :%s:\n", COLLECT, OPEN_ERR, filename);		xfree(filename);		return ERROR;	}	xfree(filename);	while (done == 0) {		here = lseek(lufd, 0, SEEK_CUR);		if ((n = 
read(lufd, inbuf, LUPDATE_SIZE)) < 0) {			log_errno("read");			close(lufd);			return ERROR;		}		if (n == 0) {			done = 1;		}		if (strncmp(host, inbuf, MAXHOSTNAMELEN) == 0) {			inport = atoi(inbuf + MAXHOSTNAMELEN);			if (inport == port) {				done = 1;			}		}	}	memset(inbuf, '\0', LUPDATE_SIZE);	strncpy(inbuf, host, MAXHOSTNAMELEN);	sprintf(inbuf + MAXHOSTNAMELEN, "%d", port);	sprintf(inbuf + MAXHOSTNAMELEN + 10, "%d", lup);	lseek(lufd, here, SEEK_SET);	if (write(lufd, inbuf, LUPDATE_SIZE) < 0) {		log_errno("write");	}	close(lufd);	return SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -