📄 gdk_logger.c
字号:
#line 158 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"#include "monetdb_config.h"#include "gdk_logger.h"#include <string.h>#line 169 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"#define LOG_START 1#define LOG_END 2#define LOG_INSERT 3#define LOG_DELETE 4#define LOG_UPDATE 5#define LOG_CREATE 6#define LOG_DESTROY 7#define LOG_USE 8#define LOG_CLEAR 9#define LOG_SEQ 10typedef struct logformat_t { char flag; int tid; int nr;} logformat;#define LOGFILE "log"static int bm_commit(logger *lg);static void tr_grow(trans *tr);static voidlogbat_destroy(BAT *b){ if (b) BBPunfix(b->batCacheid);}static BAT *logbat_new(int ht, int tt, size_t size){ BAT *nb = BATnew(ht, tt, size); if (ht == TYPE_void) BATseqbase(nb, 0); nb->batDirty |= 2; return nb;}static intlog_read_format(logger *l, logformat *data){ int res = 1; if (stream_read(l->log, &data->flag, 1, 1) != 1) return 0; res = stream_readInt(l->log, &data->nr); if (res) res = stream_readInt(l->log, &data->tid); return res;}static intlog_write_format(logger *l, logformat *data){ if (stream_write(l->log, &data->flag, 1, 1) != 1 || !stream_writeInt(l->log, data->nr) || !stream_writeInt(l->log, data->tid)) return LOG_ERR; return LOG_OK;}static char *log_read_string(logger *l){ int len; ssize_t nr; char *buf; if (!stream_readInt(l->log, &len)) return NULL; if (len == 0) return NULL; buf = (char*)GDKmalloc(len); if ((nr = stream_read(l->log, buf, 1, len)) != (ssize_t) len) { buf[len-1] = 0; printf("!ERROR: couldn't read name (%s) " SSZFMT "\n", buf, nr); GDKfree(buf); return NULL; } buf[len-1] = 0; return buf;}static intlog_write_string(logger *l, char *n){ int len = (int) strlen(n) + 1; /* log including EOS */ assert(len > 1); if (!stream_writeInt(l->log, len) || stream_write(l->log, n, 1, len) != (ssize_t) len) return LOG_ERR; return LOG_OK;}static voidlog_read_clear(logger *lg, trans *tr, char *name){ if (lg->debug & 1) fprintf(stderr, "logger found log_read_clear %s\n", name); tr_grow(tr); tr->changes[tr->nr].type = LOG_CLEAR; tr->changes[tr->nr].name = GDKstrdup(name); tr->nr++;}static voidla_bat_clear(logger *lg, logaction *la){ log_bid bid = logger_find_bat(lg, la->name); BAT *b; /* do we need to skip these old updates */ if (BATcount(lg->snapshots)) { BUN p = BUNfnd(lg->snapshots, &bid); if (p) { int tid = *(int*)BUNtloc(lg->snapshots, p); if (lg->tid <= tid) return ; } } b = BATdescriptor(bid); if (b) { BATclear(b); logbat_destroy(b); }}static intlog_read_seq(logger *lg, logformat *l){ int seq = l->nr; lng id; if (!stream_readLng(lg->log, &id)) return LOG_ERR; if (BUNfnd(lg->seqs, &seq)) { BUNdelHead(lg->seqs, &seq, FALSE); } BUNins(lg->seqs, &seq, &id, FALSE); return LOG_OK;}static intlog_read_updates(logger *lg, trans *tr, logformat *l, char *name){ log_bid bid = logger_find_bat(lg, name); BAT *b = BATdescriptor(bid); int res = LOG_OK; int ht = -1, tt = -1, hseq = 0, tseq = 0; if (lg->debug & 1) fprintf(stderr, "logger found log_read_updates %s %s %d\n", name, l->flag == LOG_INSERT ? "insert" : l->flag == LOG_DELETE ? "delete" : "update", l->nr); if (b) { ht = b->htype; if (ht == TYPE_void && b->hseqbase != oid_nil) hseq = 1; tt = b->ttype; if (tt == TYPE_void && b->tseqbase != oid_nil) tseq = 1; } else { /* search trans action for create statement */ int i; for (i=0; i<tr->nr; i++) { if (tr->changes[i].type == LOG_CREATE && strcmp(tr->changes[i].name, name) == 0) { ht = tr->changes[i].ht; if (ht < 0) { hseq = 1; ht = TYPE_void; } tt = tr->changes[i].tt; if (tt < 0) { tseq = 1; tt = TYPE_void; } break; } } } if (ht>=0 && tt>=0) { BAT *r; void *(*rt) (ptr, stream *, size_t) = BATatoms[tt].atomRead; void *tv = ATOMnil(tt); r = BATnew(ht, tt, l->nr); if (hseq) BATseqbase(r, 0); if (tseq) BATseqbase(BATmirror(r), 0); if (ht == TYPE_void && l->flag == LOG_INSERT) { for (; l->nr > 0; l->nr--) { void *t = rt(tv, lg->log, 1); if (!t) { res = LOG_ERR; break; } if (l->flag == LOG_INSERT) BUNappend(r, t, TRUE); if (t != tv) GDKfree(t); } } else { void *(*rh) (ptr, stream *, size_t) = ht == TYPE_void ? BATatoms[TYPE_oid].atomRead : BATatoms[ht].atomRead; void *hv = ATOMnil(ht); for (; l->nr > 0; l->nr--) { void *h = rh(hv, lg->log, 1); void *t = rt(tv, lg->log, 1); if (!h || !t) { res = LOG_ERR; break; } BUNins(r, h, t, TRUE); if (h != hv) GDKfree(h); if (t != tv) GDKfree(t); } GDKfree(hv); } GDKfree(tv); logbat_destroy(b); tr_grow(tr); tr->changes[tr->nr].type = l->flag; tr->changes[tr->nr].nr = l->nr; tr->changes[tr->nr].ht = ht; tr->changes[tr->nr].tt = tt; tr->changes[tr->nr].name = GDKstrdup(name); tr->changes[tr->nr].b = r; tr->nr++; } else { /* bat missing ERROR or ignore ? currently error. */ res = LOG_ERR; } return res;}static voidla_bat_updates(logger *lg, logaction *la){ log_bid bid = logger_find_bat(lg, la->name); BAT *b; if (bid == 0) return; /* ignore bats no longer in the catalog */ /* do we need to skip these old updates */ if (BATcount(lg->snapshots)) { BUN p = BUNfnd(lg->snapshots, &bid); if (p) { int tid = *(int*)BUNtloc(lg->snapshots, p); if (lg->tid <= tid) return ; } } b = BATdescriptor(bid); assert(b); if (b) { if (b->htype == TYPE_void && la->type == LOG_INSERT) { BATappend(b, la->b, TRUE); } else { if (la->type == LOG_INSERT) BATins(b, la->b, TRUE); else if (la->type == LOG_DELETE) BATdel(b, la->b, TRUE); else if (la->type == LOG_UPDATE) { BUN p, q; BATloop(la->b, p, q) { ptr h = BUNhead(la->b, p); ptr t = BUNtail(la->b, p); if (BUNfnd(b, h) == NULL) { /* if value doesn't exist, insert it if b void headed, maintain that by inserting nils */ if (b->htype == TYPE_void) { if (b->batCount == 0 && * (oid *) h != oid_nil) b->hseqbase = * (oid *) h; if (b->hseqbase != oid_nil && * (oid *) h != oid_nil) { void *tv = ATOMnilptr(b->ttype); while (b->hseqbase + b->batCount < * (oid *) h) BUNappend(b, tv, TRUE); } BUNappend(b, t, TRUE); } else { BUNins(b, h, t, TRUE); } } else { BUNreplace(b, h, t, TRUE); } } } } logbat_destroy(b); }}static voidlog_read_destroy(logger *lg, trans *tr, char *name){ (void)lg; tr_grow(tr); tr->changes[tr->nr].type = LOG_DESTROY; tr->changes[tr->nr].name = GDKstrdup(name); tr->nr++;}static voidla_bat_destroy(logger *lg, logaction *la){ log_bid bid = logger_find_bat(lg, la->name); if (bid) logger_del_bat(lg, bid);}static intlog_read_create(logger *lg, trans *tr, char *name){ char *buf = log_read_string(lg); if (lg->debug & 1) fprintf(stderr, "log_read_create %s\n", name); if (!buf) { return LOG_ERR; } else { int ht, tt; char *ha = buf, *ta = strchr(buf, ','); if (!ta) return LOG_ERR; *ta = 0; ta++; /* skip over , */ if (strcmp(ha, "vid") == 0) { ht = -1; } else { ht = ATOMindex(ha); } if (strcmp(ta, "vid") == 0) { tt = -1; } else { tt = ATOMindex(ta); } tr_grow(tr); tr->changes[tr->nr].type = LOG_CREATE; tr->changes[tr->nr].ht = ht; tr->changes[tr->nr].tt = tt; tr->changes[tr->nr].name = GDKstrdup(name); tr->changes[tr->nr].b = NULL; tr->nr++; } if (buf) GDKfree(buf); return LOG_OK;}static voidla_bat_create(logger *lg, logaction *la){ int ht = (la->ht<0)?TYPE_void:la->ht; int tt = (la->tt<0)?TYPE_void:la->tt; BAT *b = BATnew(ht, tt, BATSIZE); if (la->ht<0) BATseqbase(b, 0); if (la->tt<0) BATseqbase(BATmirror(b), 0); logger_add_bat(lg, b, la->name); logbat_destroy(b);}static voidlog_read_use(logger *lg, trans *tr, logformat *l, char *name){ (void)lg; tr_grow(tr); tr->changes[tr->nr].type = LOG_USE; tr->changes[tr->nr].nr = l->nr; tr->changes[tr->nr].name = GDKstrdup(name); tr->changes[tr->nr].b = NULL; tr->nr++;}static voidla_bat_use(logger *lg, logaction *la ){ log_bid bid = la->nr; BAT *b = BATdescriptor(bid); if (!b) { GDKerror("logger could not use bat (" OIDFMT ") for %s\n", bid, la->name); return ; } logger_add_bat(lg, b, la->name); logbat_destroy(b);}#define TR_SIZE 1024static trans*tr_create(trans *tr, int tid){ trans *ntr = (trans*)GDKmalloc(sizeof(trans)); ntr->tid = tid; ntr->sz = TR_SIZE; ntr->nr = 0; ntr->changes = (logaction*)GDKmalloc(sizeof(logaction)*TR_SIZE); ntr->tr = tr; return ntr;}static trans *tr_find(trans *tr, int tid)/* finds the tid and reorders the chain list, puts trans with tid first */{ trans *t = tr, *p = NULL; while(t && t->tid != tid) { p = t; t = t->tr; } if (!t) return NULL; /* BAD missing transaction */ if (t == tr) return tr; if (t->tr) /* get this tid out of the list */ p->tr = t->tr; t->tr = tr; /* and move it to the front */ return t;}static voidla_apply( logger *lg, logaction *c ){ switch(c->type) { case LOG_INSERT: case LOG_DELETE: case LOG_UPDATE: la_bat_updates(lg, c); break; case LOG_CREATE: la_bat_create(lg, c); break; case LOG_USE: la_bat_use(lg, c); break; case LOG_DESTROY: la_bat_destroy(lg, c); break; case LOG_CLEAR: la_bat_clear(lg, c); break; }}static voidla_destroy( logaction *c ) { if (c->name) GDKfree(c->name); if (c->b) logbat_destroy(c->b);}static voidtr_grow(trans *tr){ if (tr->nr == tr->sz) { tr->sz <<= 1; tr->changes = (logaction*) GDKrealloc(tr->changes, tr->sz * sizeof(logaction)); } /* cleanup the next */ tr->changes[tr->nr].name = NULL; tr->changes[tr->nr].b = NULL;}static trans *tr_destroy(trans *tr){ trans *r = tr->tr; GDKfree(tr->changes); GDKfree(tr); return r;}static trans *tr_commit(logger *lg, trans *tr){ int i; if (lg->debug & 1) printf("tr_commit\n"); for(i=0; i<tr->nr; i++) { la_apply(lg, &tr->changes[i]); la_destroy(&tr->changes[i]); } return tr_destroy(tr);}static trans *tr_abort(logger *lg, trans *tr){ int i; if (lg->debug & 1) printf("tr_abort\n"); for(i=0; i<tr->nr; i++) la_destroy(&tr->changes[i]); return tr_destroy(tr);}static intlogger_open(logger *lg){ char filename[BUFSIZ]; snprintf(filename, BUFSIZ, "%s%s." LLFMT, lg->dir, LOGFILE, lg->id); lg->log = open_wstream(filename); if (stream_errnr(lg->log)) return LOG_ERR; return LOG_OK;}static voidlogger_close(logger *lg){ stream *log = lg->log; if (log) { stream_close(log); stream_destroy(log); } lg->log = NULL;}static intlogger_readlog(logger *lg, char *filename){ trans *tr = NULL; logformat l; int err = 0; lg->log = open_rstream(filename); /* if the file doesn't exist, there is nothing to be readback */ if (!lg->log || stream_errnr(lg->log)) { if (lg->log) stream_destroy(lg->log); return 0; } while (!err && log_read_format(lg, &l)) { char *name = NULL; if (l.flag != LOG_START && l.flag != LOG_END && l.flag != LOG_SEQ) { name = log_read_string(lg); if (!name) { err = -1; break; } } /* find proper transaction record */ if (l.flag != LOG_START) tr = tr_find(tr, l.tid); switch (l.flag) { case LOG_START: if (l.nr > lg->tid) lg->tid = l.nr; tr = tr_create(tr, l.nr); if (lg->debug & 1) fprintf(stderr, "logger tstart %d\n", tr->tid); break; case LOG_END: if (l.tid != l.nr) /* abort record */ tr = tr_abort(lg, tr); else tr = tr_commit(lg, tr); break; case LOG_SEQ: err = (log_read_seq(lg, &l) != LOG_OK); break; case LOG_INSERT: case LOG_DELETE: case LOG_UPDATE: err = (log_read_updates(lg, tr, &l, name) !=LOG_OK); break; case LOG_CREATE: err = (log_read_create(lg, tr, name) != LOG_OK); break; case LOG_USE: log_read_use(lg, tr, &l, name); break; case LOG_DESTROY: log_read_destroy(lg, tr, name); break; case LOG_CLEAR: log_read_clear(lg, tr, name); break; default: err = -2; } if (name) GDKfree(name); lg->changes++; } logger_close(lg); /* remaining transactions are not committed, ie abort */ while(tr) tr = tr_abort(lg, tr); return 0;}#line 803 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"static intlogger_readlogs(logger *lg, FILE *fp, char *filename){ int res = 0; char id[BUFSIZ]; if (lg->debug & 1) fprintf(stderr, "logger_readlogs %s\n", filename); while (fgets(id, BUFSIZ, fp) != NULL) { char buf[BUFSIZ]; lng lid = strtoll(id, NULL, 10); if (lid >= lg->id) { lg->id = lid; snprintf(buf, BUFSIZ, "%s." LLFMT, filename, lg->id); if ((res = logger_readlog(lg, buf)) != 0) { /* we cannot distinguish errors from incomplete transactions (even if we would log aborts in the logs). So we simply abort and move to the next log file */ (void)res; } } } return res;}static intlogger_commit(logger *lg){ int id = LOG_SID; if (lg->debug & 1) fprintf(stderr, "logger_commit\n"); BUNdelHead(lg->seqs, &id, FALSE); BUNins(lg->seqs, &id, &lg->id, FALSE); /* cleanup old snapshots */ if (BATcount(lg->snapshots)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -