⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gdk_logger.c

📁 这个是内存数据库中的一个管理工具
💻 C
📖 第 1 页 / 共 2 页
字号:
#line 158 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"#include "monetdb_config.h"#include "gdk_logger.h"#include <string.h>#line 169 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"#define LOG_START	1#define LOG_END		2#define LOG_INSERT	3#define LOG_DELETE	4#define LOG_UPDATE	5#define LOG_CREATE	6#define LOG_DESTROY	7#define LOG_USE		8#define LOG_CLEAR	9#define LOG_SEQ		10typedef struct logformat_t {	char flag;	int tid;	int nr;} logformat;#define LOGFILE "log"static int bm_commit(logger *lg);static void tr_grow(trans *tr);static voidlogbat_destroy(BAT *b){	if (b)		BBPunfix(b->batCacheid);}static BAT *logbat_new(int ht, int tt, size_t size){	BAT *nb = BATnew(ht, tt, size);	if (ht == TYPE_void)		BATseqbase(nb, 0);	nb->batDirty |= 2;	return nb;}static intlog_read_format(logger *l, logformat *data){	int res = 1;	if (stream_read(l->log, &data->flag, 1, 1) != 1)		return 0;	res = stream_readInt(l->log, &data->nr);	if (res)		res = stream_readInt(l->log, &data->tid);	return res;}static intlog_write_format(logger *l, logformat *data){	if (stream_write(l->log, &data->flag, 1, 1) != 1 || !stream_writeInt(l->log, data->nr) || !stream_writeInt(l->log, data->tid))		return LOG_ERR;	return LOG_OK;}static char *log_read_string(logger *l){	int len;	ssize_t nr;	char *buf;	if (!stream_readInt(l->log, &len))		return NULL;	if (len == 0)		return NULL;	buf = (char*)GDKmalloc(len);	if ((nr = stream_read(l->log, buf, 1, len)) != (ssize_t) len) {		buf[len-1] = 0;		printf("!ERROR: couldn't read name (%s) " SSZFMT "\n", buf, nr);		GDKfree(buf);		return NULL;	}	buf[len-1] = 0;	return buf;}static intlog_write_string(logger *l, char *n){	int len = (int) strlen(n) + 1;	/* log including EOS */	assert(len > 1);	if (!stream_writeInt(l->log, len) ||	    stream_write(l->log, n, 1, len) != (ssize_t) len)		return LOG_ERR;	return LOG_OK;}static voidlog_read_clear(logger *lg, trans *tr, char *name){	if (lg->debug & 1)		fprintf(stderr, "logger found log_read_clear %s\n", name);	tr_grow(tr);	tr->changes[tr->nr].type = LOG_CLEAR;	tr->changes[tr->nr].name = GDKstrdup(name);	tr->nr++;}static voidla_bat_clear(logger *lg, logaction *la){	log_bid bid = logger_find_bat(lg, la->name);	BAT *b;	/* do we need to skip these old updates */	if (BATcount(lg->snapshots)) {		BUN p = BUNfnd(lg->snapshots, &bid);		if (p) {			int tid = *(int*)BUNtloc(lg->snapshots, p);			if (lg->tid <= tid) 				return ;		}	} 	b = BATdescriptor(bid);	if (b) {		BATclear(b);		logbat_destroy(b);	}}static intlog_read_seq(logger *lg, logformat *l){	int seq = l->nr;	lng id;	if (!stream_readLng(lg->log, &id))		return LOG_ERR;	if (BUNfnd(lg->seqs, &seq)) {		BUNdelHead(lg->seqs, &seq, FALSE);	}	BUNins(lg->seqs, &seq, &id, FALSE);	return LOG_OK;}static intlog_read_updates(logger *lg, trans *tr, logformat *l, char *name){	log_bid bid = logger_find_bat(lg, name);	BAT *b = BATdescriptor(bid);	int res = LOG_OK;	int ht = -1, tt = -1, hseq = 0, tseq = 0;	if (lg->debug & 1)		fprintf(stderr, "logger found log_read_updates %s %s %d\n", name, l->flag == LOG_INSERT ? "insert" : l->flag == LOG_DELETE ? "delete" : "update", l->nr);	if (b) { 		ht = b->htype;		if (ht == TYPE_void && b->hseqbase != oid_nil)			hseq = 1;		tt = b->ttype;		if (tt == TYPE_void && b->tseqbase != oid_nil)			tseq = 1;	} else { /* search trans action for create statement */		int i;		for (i=0; i<tr->nr; i++) {			if (tr->changes[i].type == LOG_CREATE &&			    strcmp(tr->changes[i].name, name) == 0) {				ht = tr->changes[i].ht;				if (ht < 0) {					hseq = 1;					ht = TYPE_void;				}				tt = tr->changes[i].tt;				if (tt < 0) {					tseq = 1;					tt = TYPE_void;				}				break;			}		}	}	if (ht>=0 && tt>=0) {		BAT *r;		void *(*rt) (ptr, stream *, size_t) = BATatoms[tt].atomRead;		void *tv = ATOMnil(tt);		r = BATnew(ht, tt, l->nr);		if (hseq)			BATseqbase(r, 0);		if (tseq)			BATseqbase(BATmirror(r), 0);		if (ht == TYPE_void && l->flag == LOG_INSERT) { 			for (; l->nr > 0; l->nr--) {				void *t = rt(tv, lg->log, 1);				if (!t) {					res = LOG_ERR;					break;				}				if (l->flag == LOG_INSERT)					BUNappend(r, t, TRUE);				if (t != tv)					GDKfree(t);			}		} else {			void *(*rh) (ptr, stream *, size_t) = ht == TYPE_void ? BATatoms[TYPE_oid].atomRead : BATatoms[ht].atomRead;			void *hv = ATOMnil(ht);			for (; l->nr > 0; l->nr--) {				void *h = rh(hv, lg->log, 1);				void *t = rt(tv, lg->log, 1);				if (!h || !t) {					res = LOG_ERR;					break;				}				BUNins(r, h, t, TRUE);				if (h != hv)					GDKfree(h);				if (t != tv)					GDKfree(t);			}			GDKfree(hv);		}		GDKfree(tv);		logbat_destroy(b);				tr_grow(tr);		tr->changes[tr->nr].type = l->flag;		tr->changes[tr->nr].nr = l->nr;		tr->changes[tr->nr].ht = ht;		tr->changes[tr->nr].tt = tt;		tr->changes[tr->nr].name = GDKstrdup(name);		tr->changes[tr->nr].b = r;		tr->nr++;	} else { /* bat missing ERROR or ignore ? currently error. */		res = LOG_ERR;	}	return res;}static voidla_bat_updates(logger *lg, logaction *la){	log_bid bid = logger_find_bat(lg, la->name);	BAT *b;        if (bid == 0) return; /* ignore bats no longer in the catalog */	/* do we need to skip these old updates */	if (BATcount(lg->snapshots)) {		BUN p = BUNfnd(lg->snapshots, &bid);		if (p) {			int tid = *(int*)BUNtloc(lg->snapshots, p);			if (lg->tid <= tid) 				return ;		}	}	b = BATdescriptor(bid);	assert(b);	if (b) {		if (b->htype == TYPE_void && la->type == LOG_INSERT) { 			BATappend(b, la->b, TRUE);		} else {			if (la->type == LOG_INSERT)				BATins(b, la->b, TRUE);			else if (la->type == LOG_DELETE)				BATdel(b, la->b, TRUE);			else if (la->type == LOG_UPDATE) { 				BUN p, q;				BATloop(la->b, p, q) {					ptr h = BUNhead(la->b, p);					ptr t = BUNtail(la->b, p);					if (BUNfnd(b, h) == NULL) {						/* if value doesn't exist, insert it					   	   if b void headed, maintain that by inserting nils */						if (b->htype == TYPE_void) {							if (b->batCount == 0 && * (oid *) h != oid_nil)								b->hseqbase = * (oid *) h;							if (b->hseqbase != oid_nil && * (oid *) h != oid_nil) {								void *tv = ATOMnilptr(b->ttype);								while (b->hseqbase + b->batCount < * (oid *) h)									BUNappend(b, tv, TRUE);							}							BUNappend(b, t, TRUE);						} else {							BUNins(b, h, t, TRUE);						}					} else {						BUNreplace(b, h, t, TRUE);					}				}			}		}		logbat_destroy(b);	}}static voidlog_read_destroy(logger *lg, trans *tr, char *name){	(void)lg;	tr_grow(tr);	tr->changes[tr->nr].type = LOG_DESTROY;	tr->changes[tr->nr].name = GDKstrdup(name);	tr->nr++;}static voidla_bat_destroy(logger *lg, logaction *la){	log_bid bid = logger_find_bat(lg, la->name);	if (bid)		logger_del_bat(lg, bid);}static intlog_read_create(logger *lg, trans *tr, char *name){	char *buf = log_read_string(lg);	if (lg->debug & 1)		fprintf(stderr, "log_read_create %s\n", name);	if (!buf) {		return LOG_ERR;	} else {		int ht, tt;		char *ha = buf, *ta = strchr(buf, ',');		if (!ta) 			return LOG_ERR;		*ta = 0;		ta++;		/* skip over , */		if (strcmp(ha, "vid") == 0) {			ht = -1;		} else {			ht = ATOMindex(ha);		}		if (strcmp(ta, "vid") == 0) {			tt = -1;		} else {			tt = ATOMindex(ta);		}		tr_grow(tr);		tr->changes[tr->nr].type = LOG_CREATE;		tr->changes[tr->nr].ht = ht;		tr->changes[tr->nr].tt = tt;		tr->changes[tr->nr].name = GDKstrdup(name);		tr->changes[tr->nr].b = NULL;		tr->nr++;	}	if (buf)		GDKfree(buf);	return LOG_OK;}static voidla_bat_create(logger *lg, logaction *la){	int ht = (la->ht<0)?TYPE_void:la->ht;	int tt = (la->tt<0)?TYPE_void:la->tt;	BAT *b = BATnew(ht, tt, BATSIZE);	if (la->ht<0)		BATseqbase(b, 0);	if (la->tt<0)		BATseqbase(BATmirror(b), 0);	logger_add_bat(lg, b, la->name);	logbat_destroy(b);}static voidlog_read_use(logger *lg, trans *tr, logformat *l, char *name){	(void)lg;	tr_grow(tr);	tr->changes[tr->nr].type = LOG_USE;	tr->changes[tr->nr].nr = l->nr;	tr->changes[tr->nr].name = GDKstrdup(name);	tr->changes[tr->nr].b = NULL;	tr->nr++;}static voidla_bat_use(logger *lg, logaction *la ){	log_bid bid = la->nr;	BAT *b = BATdescriptor(bid);	if (!b) {		GDKerror("logger could not use bat (" OIDFMT ") for %s\n", 			bid, la->name);		return ;	}	logger_add_bat(lg, b, la->name);	logbat_destroy(b);}#define TR_SIZE 	1024static trans*tr_create(trans *tr, int tid){	trans *ntr = (trans*)GDKmalloc(sizeof(trans));	ntr->tid = tid;	ntr->sz = TR_SIZE;	ntr->nr = 0;	ntr->changes = (logaction*)GDKmalloc(sizeof(logaction)*TR_SIZE);	ntr->tr = tr;	return ntr;}static trans *tr_find(trans *tr, int tid)/* finds the tid and reorders the chain list, puts trans with tid first */{	trans *t = tr, *p = NULL;	while(t && t->tid != tid) {		p = t;		t = t->tr;	}	if (!t)		return NULL; /* BAD missing transaction */	if (t == tr) 		return tr;	if (t->tr)  /* get this tid out of the list */		p->tr = t->tr; 	t->tr = tr; /* and move it to the front */ 	return t;}static voidla_apply( logger *lg, logaction *c ){	switch(c->type) {	case LOG_INSERT:	case LOG_DELETE:	case LOG_UPDATE:		la_bat_updates(lg, c);		break;	case LOG_CREATE:		la_bat_create(lg, c);		break;	case LOG_USE:		la_bat_use(lg, c);		break;	case LOG_DESTROY:		la_bat_destroy(lg, c);		break;	case LOG_CLEAR:		la_bat_clear(lg, c);		break;	}}static voidla_destroy( logaction *c ) {	if (c->name)		GDKfree(c->name);	if (c->b)		logbat_destroy(c->b);}static voidtr_grow(trans *tr){	if (tr->nr == tr->sz) {		tr->sz <<= 1;		tr->changes = (logaction*)			GDKrealloc(tr->changes, tr->sz * sizeof(logaction));	}	/* cleanup the next */	tr->changes[tr->nr].name = NULL;	tr->changes[tr->nr].b = NULL;}static trans *tr_destroy(trans *tr){	trans *r = tr->tr;	GDKfree(tr->changes);	GDKfree(tr);	return r;}static trans *tr_commit(logger *lg, trans *tr){	int i;	if (lg->debug & 1)		printf("tr_commit\n");	for(i=0; i<tr->nr; i++) {		la_apply(lg, &tr->changes[i]);		la_destroy(&tr->changes[i]);	}	return tr_destroy(tr);}static trans *tr_abort(logger *lg, trans *tr){	int i;	if (lg->debug & 1)		printf("tr_abort\n");	for(i=0; i<tr->nr; i++) 		la_destroy(&tr->changes[i]);	return tr_destroy(tr);}static intlogger_open(logger *lg){	char filename[BUFSIZ];	snprintf(filename, BUFSIZ, "%s%s." LLFMT, lg->dir, LOGFILE, lg->id);	lg->log = open_wstream(filename);	if (stream_errnr(lg->log))		 return LOG_ERR;	return LOG_OK;}static voidlogger_close(logger *lg){	stream *log = lg->log;	if (log) {		stream_close(log);		stream_destroy(log);	}	lg->log = NULL;}static intlogger_readlog(logger *lg, char *filename){	trans *tr = NULL;	logformat l;	int err = 0;	lg->log = open_rstream(filename);	/* if the file doesn't exist, there is nothing to be readback */	if (!lg->log || stream_errnr(lg->log)) {		if (lg->log)			stream_destroy(lg->log);		return 0;	}	while (!err && log_read_format(lg, &l)) {		char *name = NULL;		if (l.flag != LOG_START && l.flag != LOG_END && 		    l.flag != LOG_SEQ) {			name = log_read_string(lg);			if (!name) {				err = -1;				break;			}		}		/* find proper transaction record */		if (l.flag != LOG_START)			tr = tr_find(tr, l.tid);		switch (l.flag) {		case LOG_START:			if (l.nr > lg->tid)				lg->tid = l.nr;			tr = tr_create(tr, l.nr);			if (lg->debug & 1)				fprintf(stderr, "logger tstart %d\n", 					tr->tid);			break;		case LOG_END:			if (l.tid != l.nr) /* abort record */				tr = tr_abort(lg, tr);			else				tr = tr_commit(lg, tr);			break;		case LOG_SEQ:			err = (log_read_seq(lg, &l) != LOG_OK);			break;		case LOG_INSERT:		case LOG_DELETE:		case LOG_UPDATE:			err = (log_read_updates(lg, tr, &l, name) !=LOG_OK);			break;		case LOG_CREATE:			err = (log_read_create(lg, tr, name) != LOG_OK);			break;		case LOG_USE:			log_read_use(lg, tr, &l, name);			break;		case LOG_DESTROY:			log_read_destroy(lg, tr, name);			break;		case LOG_CLEAR:			log_read_clear(lg, tr, name);			break;		default:			err = -2;		}		if (name)			GDKfree(name);		lg->changes++;	}	logger_close(lg);	/* remaining transactions are not committed, ie abort */	while(tr)		tr = tr_abort(lg, tr);	return 0;}#line 803 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"static intlogger_readlogs(logger *lg, FILE *fp, char *filename){	int res = 0;	char id[BUFSIZ];	if (lg->debug & 1)		fprintf(stderr, "logger_readlogs %s\n", filename);	while (fgets(id, BUFSIZ, fp) != NULL) {		char buf[BUFSIZ];		lng lid = strtoll(id, NULL, 10);				if (lid >= lg->id) {			lg->id = lid;			snprintf(buf, BUFSIZ, "%s." LLFMT, filename, lg->id);			if ((res = logger_readlog(lg, buf)) != 0) {				/* we cannot distinguish errors from 				   incomplete transactions (even if we 				   would log aborts in the logs). 				   So we simply abort and move to the next 				   log file */				(void)res;			}		}	}	return res;}static intlogger_commit(logger *lg){	int id = LOG_SID;	if (lg->debug & 1)		fprintf(stderr, "logger_commit\n");	BUNdelHead(lg->seqs, &id, FALSE);	BUNins(lg->seqs, &id, &lg->id, FALSE);	/* cleanup old snapshots */	if (BATcount(lg->snapshots)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -