gdk_logger.mx

来自「这个是内存数据库中的一个管理工具」· MX 代码 · 共 1,528 行 · 第 1/3 页

MX
1,528
字号
		if (strcmp(ha, "vid") == 0) {			ht = -1;		} else {			ht = ATOMindex(ha);		}		if (strcmp(ta, "vid") == 0) {			tt = -1;		} else {			tt = ATOMindex(ta);		}		tr_grow(tr);		tr->changes[tr->nr].type = LOG_CREATE;		tr->changes[tr->nr].ht = ht;		tr->changes[tr->nr].tt = tt;		tr->changes[tr->nr].name = GDKstrdup(name);		tr->changes[tr->nr].b = NULL;		tr->nr++;	}	if (buf)		GDKfree(buf);	return LOG_OK;}static voidla_bat_create(logger *lg, logaction *la){	int ht = (la->ht<0)?TYPE_void:la->ht;	int tt = (la->tt<0)?TYPE_void:la->tt;	BAT *b = BATnew(ht, tt, BATSIZE);	if (la->ht<0)		BATseqbase(b, 0);	if (la->tt<0)		BATseqbase(BATmirror(b), 0);	logger_add_bat(lg, b, la->name);	logbat_destroy(b);}static voidlog_read_use(logger *lg, trans *tr, logformat *l, char *name){	(void)lg;	tr_grow(tr);	tr->changes[tr->nr].type = LOG_USE;	tr->changes[tr->nr].nr = l->nr;	tr->changes[tr->nr].name = GDKstrdup(name);	tr->changes[tr->nr].b = NULL;	tr->nr++;}static voidla_bat_use(logger *lg, logaction *la ){	log_bid bid = la->nr;	BAT *b = BATdescriptor(bid);	if (!b) {		GDKerror("logger could not use bat (" OIDFMT ") for %s\n", 			bid, la->name);		return ;	}	logger_add_bat(lg, b, la->name);	logbat_destroy(b);}#define TR_SIZE 	1024static trans*tr_create(trans *tr, int tid){	trans *ntr = (trans*)GDKmalloc(sizeof(trans));	ntr->tid = tid;	ntr->sz = TR_SIZE;	ntr->nr = 0;	ntr->changes = (logaction*)GDKmalloc(sizeof(logaction)*TR_SIZE);	ntr->tr = tr;	return ntr;}static trans *tr_find(trans *tr, int tid)/* finds the tid and reorders the chain list, puts trans with tid first */{	trans *t = tr, *p = NULL;	while(t && t->tid != tid) {		p = t;		t = t->tr;	}	if (!t)		return NULL; /* BAD missing transaction */	if (t == tr) 		return tr;	if (t->tr)  /* get this tid out of the list */		p->tr = t->tr; 	t->tr = tr; /* and move it to the front */ 	return t;}static voidla_apply( logger *lg, logaction *c ){	switch(c->type) {	case LOG_INSERT:	case LOG_DELETE:	case LOG_UPDATE:		la_bat_updates(lg, c);		break;	case LOG_CREATE:		la_bat_create(lg, c);		break;	case LOG_USE:		la_bat_use(lg, c);		break;	case LOG_DESTROY:		la_bat_destroy(lg, c);		break;	case LOG_CLEAR:		la_bat_clear(lg, c);		break;	}}static voidla_destroy( logaction *c ) {	if (c->name)		GDKfree(c->name);	if (c->b)		logbat_destroy(c->b);}static voidtr_grow(trans *tr){	if (tr->nr == tr->sz) {		tr->sz <<= 1;		tr->changes = (logaction*)			GDKrealloc(tr->changes, tr->sz * sizeof(logaction));	}	/* cleanup the next */	tr->changes[tr->nr].name = NULL;	tr->changes[tr->nr].b = NULL;}static trans *tr_destroy(trans *tr){	trans *r = tr->tr;	GDKfree(tr->changes);	GDKfree(tr);	return r;}static trans *tr_commit(logger *lg, trans *tr){	int i;	if (lg->debug & 1)		printf("tr_commit\n");	for(i=0; i<tr->nr; i++) {		la_apply(lg, &tr->changes[i]);		la_destroy(&tr->changes[i]);	}	return tr_destroy(tr);}static trans *tr_abort(logger *lg, trans *tr){	int i;	if (lg->debug & 1)		printf("tr_abort\n");	for(i=0; i<tr->nr; i++) 		la_destroy(&tr->changes[i]);	return tr_destroy(tr);}static intlogger_open(logger *lg){	char filename[BUFSIZ];	snprintf(filename, BUFSIZ, "%s%s." LLFMT, lg->dir, LOGFILE, lg->id);	lg->log = open_wstream(filename);	if (stream_errnr(lg->log))		 return LOG_ERR;	return LOG_OK;}static voidlogger_close(logger *lg){	stream *log = lg->log;	if (log) {		stream_close(log);		stream_destroy(log);	}	lg->log = NULL;}static intlogger_readlog(logger *lg, char *filename){	trans *tr = NULL;	logformat l;	int err = 0;	lg->log = open_rstream(filename);	/* if the file doesn't exist, there is nothing to be readback */	if (!lg->log || stream_errnr(lg->log)) {		if (lg->log)			stream_destroy(lg->log);		return 0;	}	while (!err && log_read_format(lg, &l)) {		char *name = NULL;		if (l.flag != LOG_START && l.flag != LOG_END && 		    l.flag != LOG_SEQ) {			name = log_read_string(lg);			if (!name) {				err = -1;				break;			}		}		/* find proper transaction record */		if (l.flag != LOG_START)			tr = tr_find(tr, l.tid);		switch (l.flag) {		case LOG_START:			if (l.nr > lg->tid)				lg->tid = l.nr;			tr = tr_create(tr, l.nr);			if (lg->debug & 1)				fprintf(stderr, "logger tstart %d\n", 					tr->tid);			break;		case LOG_END:			if (l.tid != l.nr) /* abort record */				tr = tr_abort(lg, tr);			else				tr = tr_commit(lg, tr);			break;		case LOG_SEQ:			err = (log_read_seq(lg, &l) != LOG_OK);			break;		case LOG_INSERT:		case LOG_DELETE:		case LOG_UPDATE:			err = (log_read_updates(lg, tr, &l, name) !=LOG_OK);			break;		case LOG_CREATE:			err = (log_read_create(lg, tr, name) != LOG_OK);			break;		case LOG_USE:			log_read_use(lg, tr, &l, name);			break;		case LOG_DESTROY:			log_read_destroy(lg, tr, name);			break;		case LOG_CLEAR:			log_read_clear(lg, tr, name);			break;		default:			err = -2;		}		if (name)			GDKfree(name);		lg->changes++;	}	logger_close(lg);	/* remaining transactions are not committed, ie abort */	while(tr)		tr = tr_abort(lg, tr);	return 0;}@-The log files are incrementally numbered. They are processed in thesame sequence.@cstatic intlogger_readlogs(logger *lg, FILE *fp, char *filename){	int res = 0;	char id[BUFSIZ];	if (lg->debug & 1)		fprintf(stderr, "logger_readlogs %s\n", filename);	while (fgets(id, BUFSIZ, fp) != NULL) {		char buf[BUFSIZ];		lng lid = strtoll(id, NULL, 10);				if (lid >= lg->id) {			lg->id = lid;			snprintf(buf, BUFSIZ, "%s." LLFMT, filename, lg->id);			if ((res = logger_readlog(lg, buf)) != 0) {				/* we cannot distinguish errors from 				   incomplete transactions (even if we 				   would log aborts in the logs). 				   So we simply abort and move to the next 				   log file */				(void)res;			}		}	}	return res;}static intlogger_commit(logger *lg){	int id = LOG_SID;	if (lg->debug & 1)		fprintf(stderr, "logger_commit\n");	BUNdelHead(lg->seqs, &id, FALSE);	BUNins(lg->seqs, &id, &lg->id, FALSE);	/* cleanup old snapshots */	if (BATcount(lg->snapshots)) {		BATclear(lg->snapshots);		BATcommit(lg->snapshots);	}	return bm_commit(lg);}static intcheck_version(logger *lg, FILE *fp){	int version = 0;	if (fscanf(fp, "%6d", &version) != 1 || version != lg->version) {		GDKerror("Incompatible database version %06d, "			 "this server supports version %06d\n"			 "Please move away %s and its corresponding dbfarm.",			 version, lg->version, lg->dir);		return -1;	}	fgetc(fp);		/* skip \n */	fgetc(fp);		/* skip \n */	return 0;}staticint bm_subcommit( BAT * list, BAT * catalog ) {	BUN p,q;	BAT *n = logbat_new(TYPE_void, TYPE_str, BATcount(list)*2);	BAT *b = list;	int res;	BATseqbase(n,0);	/* first loop over deleted then over current and new */	for (p = b->batDeleted; p < b->batFirst; p = BUNnext(b, p)) {		bat col = *(log_bid*)BUNhead(b,p);		str name = BBPname(col);		BUNappend(n, name, FALSE);	}	BATloop(b, p, q) {		bat col = *(log_bid*)BUNhead(b,p);		str name = BBPname(col);		BUNappend(n, name, FALSE);	}	BUNappend(n, BBPname(catalog->batCacheid), FALSE);	/* now commit catalog, so it's also up to date on disk */	BATcommit(catalog); 	res = TMsubcommit(n);	BBPreclaim(n);	return res;}static logger *logger_new(int debug, char *fn, char *logdir, char *dbname, int version){	int id = LOG_SID;	logger *lg = (struct logger*)GDKmalloc(sizeof(struct logger));	FILE *fp;	char filename[BUFSIZ];	char bak[BUFSIZ];	log_bid seqs = 0;	bat catalog;	snprintf(bak, BUFSIZ, "%s_catalog", fn); 	catalog = BBPindex(bak);	lg->debug = debug;	lg->changes = 0;	lg->version = version;	lg->id = 1;	lg->tid = 0;	snprintf(filename, BUFSIZ, "%s%c%s%c", logdir, DIR_SEP, dbname, DIR_SEP);	lg->fn = GDKstrdup(fn);	lg->dir = GDKstrdup(filename);	lg->log = NULL;	lg->catalog = NULL;	if (catalog == 0) {		log_bid bid = 0;		lg->catalog = logbat_new(TYPE_int, TYPE_str, BATSIZE);		if (debug)			printf("create %s catalog\n", fn);		bid = lg->catalog->batCacheid;		/* Make persistent */		BBPincref(bid, TRUE);		BATmode(lg->catalog, PERSISTENT);		BBPrename(lg->catalog->batCacheid, bak);		if (bm_subcommit(lg->catalog, lg->catalog) != 0)  			return NULL;	} else { /* find the persistent catalog. As non persistent bats		    require a logical reference we also add a logical 		    reference for the persistent bats */		BUN p, q;		BAT *b = BATdescriptor(catalog);		lg->catalog = b;		BATloop(b, p, q) {			bat bid = *(log_bid *) BUNhead(b, p);			BBPincref(bid, TRUE);		}	}	seqs = logger_find_bat(lg, "seqs");	if (seqs == 0) {		lg->seqs = logbat_new(TYPE_int, TYPE_lng, 1);		BATmode(lg->seqs, PERSISTENT);	        snprintf(bak, BUFSIZ, "%s_seqs", fn);		BBPrename(lg->seqs->batCacheid, bak);				logger_add_bat(lg, lg->seqs, "seqs");		BUNins(lg->seqs, &id, &lg->id, FALSE);		lg->snapshots = logbat_new(TYPE_int, TYPE_int, 1);		BATmode(lg->snapshots, PERSISTENT);	        snprintf(bak, BUFSIZ, "%s_snapshots", fn);		BBPrename(lg->snapshots->batCacheid, bak);		logger_add_bat(lg, lg->snapshots, "snapshots");		bm_subcommit(lg->catalog, lg->catalog);	} else {		bat snapshots = logger_find_bat(lg, "snapshots");		lg->seqs = BATdescriptor(seqs);		if (BATcount(lg->seqs)) {			lg->id = *(lng*)BUNtail(lg->seqs, BUNfnd(lg->seqs, &id));		} else {			BUNins(lg->seqs, &id, &lg->id, FALSE);		}		lg->snapshots = BATdescriptor(snapshots);	}	snprintf(filename, BUFSIZ, "%s%s", lg->dir, LOGFILE);	snprintf(bak, BUFSIZ, "%s.bak", filename);	if ((fp = fopen(filename, "r")) != NULL) {		if (check_version(lg, fp))			 return NULL;		lg->changes++;		logger_readlogs(lg, fp, filename);	} else if ((fp = fopen(bak, "r")) != NULL) {		fclose(fp);		GDKmove(lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL);		if ((fp = fopen(filename, "r")) != NULL) {			if (check_version(lg, fp))				 return NULL;			lg->changes++;			logger_readlogs(lg, fp, filename);		}	} else if ((fp = fopen(filename, "w")) == NULL) {		if (!GDKcreatedir(filename)) {			GDKerror("logger could not create log directory %s\n", lg->dir);			return NULL;		} else if ((fp = fopen(filename, "w")) == NULL) {			GDKerror("logger could not create file %s\n", filename);			return NULL;		}		fprintf(fp, "%06d\n\n", lg->version);		lg->id ++;		fprintf(fp, LLFMT "\n", lg->id);	} else {		fprintf(fp, "%06d\n\n", lg->version);		lg->id ++;		fprintf(fp, LLFMT "\n", lg->id);	}	fclose(fp);	return lg;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?