gdk_logger.mx
来自「这个是内存数据库中的一个管理工具」· MX 代码 · 共 1,528 行 · 第 1/3 页
MX
1,528 行
if (strcmp(ha, "vid") == 0) { ht = -1; } else { ht = ATOMindex(ha); } if (strcmp(ta, "vid") == 0) { tt = -1; } else { tt = ATOMindex(ta); } tr_grow(tr); tr->changes[tr->nr].type = LOG_CREATE; tr->changes[tr->nr].ht = ht; tr->changes[tr->nr].tt = tt; tr->changes[tr->nr].name = GDKstrdup(name); tr->changes[tr->nr].b = NULL; tr->nr++; } if (buf) GDKfree(buf); return LOG_OK;}static voidla_bat_create(logger *lg, logaction *la){ int ht = (la->ht<0)?TYPE_void:la->ht; int tt = (la->tt<0)?TYPE_void:la->tt; BAT *b = BATnew(ht, tt, BATSIZE); if (la->ht<0) BATseqbase(b, 0); if (la->tt<0) BATseqbase(BATmirror(b), 0); logger_add_bat(lg, b, la->name); logbat_destroy(b);}static voidlog_read_use(logger *lg, trans *tr, logformat *l, char *name){ (void)lg; tr_grow(tr); tr->changes[tr->nr].type = LOG_USE; tr->changes[tr->nr].nr = l->nr; tr->changes[tr->nr].name = GDKstrdup(name); tr->changes[tr->nr].b = NULL; tr->nr++;}static voidla_bat_use(logger *lg, logaction *la ){ log_bid bid = la->nr; BAT *b = BATdescriptor(bid); if (!b) { GDKerror("logger could not use bat (" OIDFMT ") for %s\n", bid, la->name); return ; } logger_add_bat(lg, b, la->name); logbat_destroy(b);}#define TR_SIZE 1024static trans*tr_create(trans *tr, int tid){ trans *ntr = (trans*)GDKmalloc(sizeof(trans)); ntr->tid = tid; ntr->sz = TR_SIZE; ntr->nr = 0; ntr->changes = (logaction*)GDKmalloc(sizeof(logaction)*TR_SIZE); ntr->tr = tr; return ntr;}static trans *tr_find(trans *tr, int tid)/* finds the tid and reorders the chain list, puts trans with tid first */{ trans *t = tr, *p = NULL; while(t && t->tid != tid) { p = t; t = t->tr; } if (!t) return NULL; /* BAD missing transaction */ if (t == tr) return tr; if (t->tr) /* get this tid out of the list */ p->tr = t->tr; t->tr = tr; /* and move it to the front */ return t;}static voidla_apply( logger *lg, logaction *c ){ switch(c->type) { case LOG_INSERT: case LOG_DELETE: case LOG_UPDATE: la_bat_updates(lg, c); break; case LOG_CREATE: la_bat_create(lg, c); break; case LOG_USE: la_bat_use(lg, c); break; case LOG_DESTROY: la_bat_destroy(lg, c); break; case LOG_CLEAR: la_bat_clear(lg, c); break; }}static voidla_destroy( logaction *c ) { if (c->name) GDKfree(c->name); if (c->b) logbat_destroy(c->b);}static voidtr_grow(trans *tr){ if (tr->nr == tr->sz) { tr->sz <<= 1; tr->changes = (logaction*) GDKrealloc(tr->changes, tr->sz * sizeof(logaction)); } /* cleanup the next */ tr->changes[tr->nr].name = NULL; tr->changes[tr->nr].b = NULL;}static trans *tr_destroy(trans *tr){ trans *r = tr->tr; GDKfree(tr->changes); GDKfree(tr); return r;}static trans *tr_commit(logger *lg, trans *tr){ int i; if (lg->debug & 1) printf("tr_commit\n"); for(i=0; i<tr->nr; i++) { la_apply(lg, &tr->changes[i]); la_destroy(&tr->changes[i]); } return tr_destroy(tr);}static trans *tr_abort(logger *lg, trans *tr){ int i; if (lg->debug & 1) printf("tr_abort\n"); for(i=0; i<tr->nr; i++) la_destroy(&tr->changes[i]); return tr_destroy(tr);}static intlogger_open(logger *lg){ char filename[BUFSIZ]; snprintf(filename, BUFSIZ, "%s%s." LLFMT, lg->dir, LOGFILE, lg->id); lg->log = open_wstream(filename); if (stream_errnr(lg->log)) return LOG_ERR; return LOG_OK;}static voidlogger_close(logger *lg){ stream *log = lg->log; if (log) { stream_close(log); stream_destroy(log); } lg->log = NULL;}static intlogger_readlog(logger *lg, char *filename){ trans *tr = NULL; logformat l; int err = 0; lg->log = open_rstream(filename); /* if the file doesn't exist, there is nothing to be readback */ if (!lg->log || stream_errnr(lg->log)) { if (lg->log) stream_destroy(lg->log); return 0; } while (!err && log_read_format(lg, &l)) { char *name = NULL; if (l.flag != LOG_START && l.flag != LOG_END && l.flag != LOG_SEQ) { name = log_read_string(lg); if (!name) { err = -1; break; } } /* find proper transaction record */ if (l.flag != LOG_START) tr = tr_find(tr, l.tid); switch (l.flag) { case LOG_START: if (l.nr > lg->tid) lg->tid = l.nr; tr = tr_create(tr, l.nr); if (lg->debug & 1) fprintf(stderr, "logger tstart %d\n", tr->tid); break; case LOG_END: if (l.tid != l.nr) /* abort record */ tr = tr_abort(lg, tr); else tr = tr_commit(lg, tr); break; case LOG_SEQ: err = (log_read_seq(lg, &l) != LOG_OK); break; case LOG_INSERT: case LOG_DELETE: case LOG_UPDATE: err = (log_read_updates(lg, tr, &l, name) !=LOG_OK); break; case LOG_CREATE: err = (log_read_create(lg, tr, name) != LOG_OK); break; case LOG_USE: log_read_use(lg, tr, &l, name); break; case LOG_DESTROY: log_read_destroy(lg, tr, name); break; case LOG_CLEAR: log_read_clear(lg, tr, name); break; default: err = -2; } if (name) GDKfree(name); lg->changes++; } logger_close(lg); /* remaining transactions are not committed, ie abort */ while(tr) tr = tr_abort(lg, tr); return 0;}@-The log files are incrementally numbered. They are processed in thesame sequence.@cstatic intlogger_readlogs(logger *lg, FILE *fp, char *filename){ int res = 0; char id[BUFSIZ]; if (lg->debug & 1) fprintf(stderr, "logger_readlogs %s\n", filename); while (fgets(id, BUFSIZ, fp) != NULL) { char buf[BUFSIZ]; lng lid = strtoll(id, NULL, 10); if (lid >= lg->id) { lg->id = lid; snprintf(buf, BUFSIZ, "%s." LLFMT, filename, lg->id); if ((res = logger_readlog(lg, buf)) != 0) { /* we cannot distinguish errors from incomplete transactions (even if we would log aborts in the logs). So we simply abort and move to the next log file */ (void)res; } } } return res;}static intlogger_commit(logger *lg){ int id = LOG_SID; if (lg->debug & 1) fprintf(stderr, "logger_commit\n"); BUNdelHead(lg->seqs, &id, FALSE); BUNins(lg->seqs, &id, &lg->id, FALSE); /* cleanup old snapshots */ if (BATcount(lg->snapshots)) { BATclear(lg->snapshots); BATcommit(lg->snapshots); } return bm_commit(lg);}static intcheck_version(logger *lg, FILE *fp){ int version = 0; if (fscanf(fp, "%6d", &version) != 1 || version != lg->version) { GDKerror("Incompatible database version %06d, " "this server supports version %06d\n" "Please move away %s and its corresponding dbfarm.", version, lg->version, lg->dir); return -1; } fgetc(fp); /* skip \n */ fgetc(fp); /* skip \n */ return 0;}staticint bm_subcommit( BAT * list, BAT * catalog ) { BUN p,q; BAT *n = logbat_new(TYPE_void, TYPE_str, BATcount(list)*2); BAT *b = list; int res; BATseqbase(n,0); /* first loop over deleted then over current and new */ for (p = b->batDeleted; p < b->batFirst; p = BUNnext(b, p)) { bat col = *(log_bid*)BUNhead(b,p); str name = BBPname(col); BUNappend(n, name, FALSE); } BATloop(b, p, q) { bat col = *(log_bid*)BUNhead(b,p); str name = BBPname(col); BUNappend(n, name, FALSE); } BUNappend(n, BBPname(catalog->batCacheid), FALSE); /* now commit catalog, so it's also up to date on disk */ BATcommit(catalog); res = TMsubcommit(n); BBPreclaim(n); return res;}static logger *logger_new(int debug, char *fn, char *logdir, char *dbname, int version){ int id = LOG_SID; logger *lg = (struct logger*)GDKmalloc(sizeof(struct logger)); FILE *fp; char filename[BUFSIZ]; char bak[BUFSIZ]; log_bid seqs = 0; bat catalog; snprintf(bak, BUFSIZ, "%s_catalog", fn); catalog = BBPindex(bak); lg->debug = debug; lg->changes = 0; lg->version = version; lg->id = 1; lg->tid = 0; snprintf(filename, BUFSIZ, "%s%c%s%c", logdir, DIR_SEP, dbname, DIR_SEP); lg->fn = GDKstrdup(fn); lg->dir = GDKstrdup(filename); lg->log = NULL; lg->catalog = NULL; if (catalog == 0) { log_bid bid = 0; lg->catalog = logbat_new(TYPE_int, TYPE_str, BATSIZE); if (debug) printf("create %s catalog\n", fn); bid = lg->catalog->batCacheid; /* Make persistent */ BBPincref(bid, TRUE); BATmode(lg->catalog, PERSISTENT); BBPrename(lg->catalog->batCacheid, bak); if (bm_subcommit(lg->catalog, lg->catalog) != 0) return NULL; } else { /* find the persistent catalog. As non persistent bats require a logical reference we also add a logical reference for the persistent bats */ BUN p, q; BAT *b = BATdescriptor(catalog); lg->catalog = b; BATloop(b, p, q) { bat bid = *(log_bid *) BUNhead(b, p); BBPincref(bid, TRUE); } } seqs = logger_find_bat(lg, "seqs"); if (seqs == 0) { lg->seqs = logbat_new(TYPE_int, TYPE_lng, 1); BATmode(lg->seqs, PERSISTENT); snprintf(bak, BUFSIZ, "%s_seqs", fn); BBPrename(lg->seqs->batCacheid, bak); logger_add_bat(lg, lg->seqs, "seqs"); BUNins(lg->seqs, &id, &lg->id, FALSE); lg->snapshots = logbat_new(TYPE_int, TYPE_int, 1); BATmode(lg->snapshots, PERSISTENT); snprintf(bak, BUFSIZ, "%s_snapshots", fn); BBPrename(lg->snapshots->batCacheid, bak); logger_add_bat(lg, lg->snapshots, "snapshots"); bm_subcommit(lg->catalog, lg->catalog); } else { bat snapshots = logger_find_bat(lg, "snapshots"); lg->seqs = BATdescriptor(seqs); if (BATcount(lg->seqs)) { lg->id = *(lng*)BUNtail(lg->seqs, BUNfnd(lg->seqs, &id)); } else { BUNins(lg->seqs, &id, &lg->id, FALSE); } lg->snapshots = BATdescriptor(snapshots); } snprintf(filename, BUFSIZ, "%s%s", lg->dir, LOGFILE); snprintf(bak, BUFSIZ, "%s.bak", filename); if ((fp = fopen(filename, "r")) != NULL) { if (check_version(lg, fp)) return NULL; lg->changes++; logger_readlogs(lg, fp, filename); } else if ((fp = fopen(bak, "r")) != NULL) { fclose(fp); GDKmove(lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL); if ((fp = fopen(filename, "r")) != NULL) { if (check_version(lg, fp)) return NULL; lg->changes++; logger_readlogs(lg, fp, filename); } } else if ((fp = fopen(filename, "w")) == NULL) { if (!GDKcreatedir(filename)) { GDKerror("logger could not create log directory %s\n", lg->dir); return NULL; } else if ((fp = fopen(filename, "w")) == NULL) { GDKerror("logger could not create file %s\n", filename); return NULL; } fprintf(fp, "%06d\n\n", lg->version); lg->id ++; fprintf(fp, LLFMT "\n", lg->id); } else { fprintf(fp, "%06d\n\n", lg->version); lg->id ++; fprintf(fp, LLFMT "\n", lg->id); } fclose(fp); return lg;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?