📄 gdk_logger.c
字号:
BATclear(lg->snapshots); BATcommit(lg->snapshots); } return bm_commit(lg);}static intcheck_version(logger *lg, FILE *fp){ int version = 0; if (fscanf(fp, "%6d", &version) != 1 || version != lg->version) { GDKerror("Incompatible database version %06d, " "this server supports version %06d\n" "Please move away %s and its corresponding dbfarm.", version, lg->version, lg->dir); return -1; } fgetc(fp); /* skip \n */ fgetc(fp); /* skip \n */ return 0;}staticint bm_subcommit( BAT * list, BAT * catalog ) { BUN p,q; BAT *n = logbat_new(TYPE_void, TYPE_str, BATcount(list)*2); BAT *b = list; int res; BATseqbase(n,0); /* first loop over deleted then over current and new */ for (p = b->batDeleted; p < b->batFirst; p = BUNnext(b, p)) { bat col = *(log_bid*)BUNhead(b,p); str name = BBPname(col); BUNappend(n, name, FALSE); } BATloop(b, p, q) { bat col = *(log_bid*)BUNhead(b,p); str name = BBPname(col); BUNappend(n, name, FALSE); } BUNappend(n, BBPname(catalog->batCacheid), FALSE); /* now commit catalog, so it's also up to date on disk */ BATcommit(catalog); res = TMsubcommit(n); BBPreclaim(n); return res;}static logger *logger_new(int debug, char *fn, char *logdir, char *dbname, int version){ int id = LOG_SID; logger *lg = (struct logger*)GDKmalloc(sizeof(struct logger)); FILE *fp; char filename[BUFSIZ]; char bak[BUFSIZ]; log_bid seqs = 0; bat catalog; snprintf(bak, BUFSIZ, "%s_catalog", fn); catalog = BBPindex(bak); lg->debug = debug; lg->changes = 0; lg->version = version; lg->id = 1; lg->tid = 0; snprintf(filename, BUFSIZ, "%s%c%s%c", logdir, DIR_SEP, dbname, DIR_SEP); lg->fn = GDKstrdup(fn); lg->dir = GDKstrdup(filename); lg->log = NULL; lg->catalog = NULL; if (catalog == 0) { log_bid bid = 0; lg->catalog = logbat_new(TYPE_int, TYPE_str, BATSIZE); if (debug) printf("create %s catalog\n", fn); bid = lg->catalog->batCacheid; /* Make persistent */ BBPincref(bid, TRUE); BATmode(lg->catalog, PERSISTENT); BBPrename(lg->catalog->batCacheid, bak); if (bm_subcommit(lg->catalog, lg->catalog) != 0) return NULL; } else { /* find the persistent catalog. As non persistent bats require a logical reference we also add a logical reference for the persistent bats */ BUN p, q; BAT *b = BATdescriptor(catalog); lg->catalog = b; BATloop(b, p, q) { bat bid = *(log_bid *) BUNhead(b, p); BBPincref(bid, TRUE); } } seqs = logger_find_bat(lg, "seqs"); if (seqs == 0) { lg->seqs = logbat_new(TYPE_int, TYPE_lng, 1); BATmode(lg->seqs, PERSISTENT); snprintf(bak, BUFSIZ, "%s_seqs", fn); BBPrename(lg->seqs->batCacheid, bak); logger_add_bat(lg, lg->seqs, "seqs"); BUNins(lg->seqs, &id, &lg->id, FALSE); lg->snapshots = logbat_new(TYPE_int, TYPE_int, 1); BATmode(lg->snapshots, PERSISTENT); snprintf(bak, BUFSIZ, "%s_snapshots", fn); BBPrename(lg->snapshots->batCacheid, bak); logger_add_bat(lg, lg->snapshots, "snapshots"); bm_subcommit(lg->catalog, lg->catalog); } else { bat snapshots = logger_find_bat(lg, "snapshots"); lg->seqs = BATdescriptor(seqs); if (BATcount(lg->seqs)) { lg->id = *(lng*)BUNtail(lg->seqs, BUNfnd(lg->seqs, &id)); } else { BUNins(lg->seqs, &id, &lg->id, FALSE); } lg->snapshots = BATdescriptor(snapshots); } snprintf(filename, BUFSIZ, "%s%s", lg->dir, LOGFILE); snprintf(bak, BUFSIZ, "%s.bak", filename); if ((fp = fopen(filename, "r")) != NULL) { if (check_version(lg, fp)) return NULL; lg->changes++; logger_readlogs(lg, fp, filename); } else if ((fp = fopen(bak, "r")) != NULL) { fclose(fp); GDKmove(lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL); if ((fp = fopen(filename, "r")) != NULL) { if (check_version(lg, fp)) return NULL; lg->changes++; logger_readlogs(lg, fp, filename); } } else if ((fp = fopen(filename, "w")) == NULL) { if (!GDKcreatedir(filename)) { GDKerror("logger could not create log directory %s\n", lg->dir); return NULL; } else if ((fp = fopen(filename, "w")) == NULL) { GDKerror("logger could not create file %s\n", filename); return NULL; } fprintf(fp, "%06d\n\n", lg->version); lg->id ++; fprintf(fp, LLFMT "\n", lg->id); } else { fprintf(fp, "%06d\n\n", lg->version); lg->id ++; fprintf(fp, LLFMT "\n", lg->id); } fclose(fp); return lg;}logger *logger_create(int debug, char *fn, char *logdir, char *dbname, int version){ logger *lg = logger_new(debug, fn, logdir, dbname, version); if (!lg) return NULL; if (logger_open(lg) == LOG_ERR) { logger_destroy(lg); return NULL; } if (lg->changes && (logger_restart(lg) != LOG_OK || logger_cleanup(lg) != LOG_OK)) { logger_destroy(lg); return NULL; } return lg;}voidlogger_destroy(logger *lg){ if (lg->catalog) { BUN p, q; BAT *b = lg->catalog; logger_cleanup(lg); /* destroy the deleted *//* would be an error .... for (p = b->batDeleted; p < b->batFirst; p = BUNnext(b, p)) { bat bid = *(log_bid *) BUNhead(b, p); BBPdecref(bid, TRUE); }*/ /* free resources */ BATloop(b, p, q) { bat bid = *(log_bid *) BUNhead(b, p); BBPdecref(bid, TRUE); } BBPdecref(lg->catalog->batCacheid, TRUE); logbat_destroy(lg->catalog); } GDKfree(lg->dir); logger_close(lg); GDKfree(lg);}intlogger_exit(logger *lg){ FILE *fp; char filename[BUFSIZ]; logger_close(lg); if (GDKmove(lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak") < 0) { return LOG_ERR; } snprintf(filename, BUFSIZ, "%s%s", lg->dir, LOGFILE); if ((fp = fopen(filename, "w")) != NULL) { char ext[BUFSIZ]; fprintf(fp, "%06d\n\n", lg->version); lg->id ++; if (logger_commit(lg) != LOG_OK) return LOG_ERR; fprintf(fp, LLFMT "\n", lg->id); fclose(fp); /* atomic action, switch to new log, keep old for later cleanup actions */ snprintf(ext, BUFSIZ, "bak-" LLFMT, lg->id); if (GDKmove(lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, ext) < 0) return LOG_ERR; } else { GDKerror("logger could not open %s\n", filename); return LOG_ERR; } return LOG_OK;}intlogger_restart(logger *lg){ int res = 0; if ((res = logger_exit(lg)) == LOG_OK) res = logger_open(lg); return res;}intlogger_cleanup(logger *lg){ char buf[BUFSIZ]; char id[BUFSIZ]; FILE *fp = NULL; snprintf(buf, BUFSIZ, "%s%s.bak-" LLFMT, lg->dir, LOGFILE, lg->id); if (lg->debug & 1) fprintf(stderr, "logger_cleanup %s\n", buf); if ((fp = fopen(buf, "r")) == NULL) return LOG_ERR; /* skip catalog */ while (fgets(id, BUFSIZ, fp) != NULL && id[0] != '\n') ; while (fgets(id, BUFSIZ, fp) != NULL) { char *e = strchr(id, '\n'); if (e) *e = 0; GDKunlink(lg->dir, LOGFILE, id); } fclose(fp); snprintf(buf, BUFSIZ, "bak-" LLFMT, lg->id); GDKunlink(lg->dir, LOGFILE, buf); return LOG_OK;}intlogger_changes(logger *lg){ return lg->changes;}intlogger_sequence(logger *lg, int seq, lng *id){ ptr p = BUNfnd(lg->seqs, &seq); if (p) { *id = *(lng*)BUNtail(lg->seqs, p); return 1; } return 0;}#line 1177 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"intlog_bat_persists(logger *lg, BAT *b, char *name){ char *ha, *ta; int len; char buf[BUFSIZ]; logformat l; int havevoid = 0; int flag = (b->batPersistence == PERSISTENT)?LOG_USE:LOG_CREATE; l.nr = 0; if (flag == LOG_USE) l.nr = b->batCacheid; l.flag = flag; l.tid = lg->tid; lg->changes++; if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; if (flag == LOG_USE) { if (BUNfnd(lg->snapshots, &b->batCacheid)) { BUNdelHead(lg->snapshots, &b->batCacheid, FALSE); } else { BBPincref(b->batCacheid, TRUE); } BUNins(lg->snapshots, &b->batCacheid, &lg->tid, FALSE); return LOG_OK; } ha = ATOMname(b->htype); if (b->htype == TYPE_void && BAThdense(b)) { ha = "vid"; havevoid = 1; } ta = ATOMname(b->ttype); if (!havevoid && b->ttype == TYPE_void && BATtdense(b)) { ta = "vid"; } len = snprintf(buf, BUFSIZ, "%s,%s", ha, ta); len++; /* include EOS */ if (!stream_writeInt(lg->log, len) || stream_write(lg->log, buf, 1, len) != (ssize_t) len) return LOG_ERR; if (lg->debug & 1) fprintf(stderr, "Logged new bat [%s,%s] %s " SZFMT "\n", ha, ta, name, BATcount(b)); return log_bat(lg, b, name);}intlog_bat_transient(logger *lg, char *name){ log_bid bid = logger_find_bat(lg, name); logformat l; l.flag = LOG_DESTROY; l.tid = lg->tid; l.nr = 0; lg->changes++; /* if this is a snapshot bat, we need to skip all changes */ if (BUNfnd(lg->snapshots, &bid)) { BUNdelHead(lg->snapshots, &bid, FALSE); BUNins(lg->snapshots, &bid, &lg->tid, FALSE); } if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; if (lg->debug & 1) fprintf(stderr, "Logged destroyed bat %s\n", name); return LOG_OK;}intlog_delta(logger *lg, BAT *b, char *name){ int ok = GDK_SUCCEED; logformat l; BUN p; if (lg->debug & 128) { /* logging is switched off */ return LOG_OK; } l.tid = lg->tid; l.nr = (int)(BUNindex(b, BUNlast(b)) - BUNindex(b, BUNfirst(b))); lg->changes += l.nr; if (l.nr) { int (*wh) (ptr, stream *, size_t) = b->htype == TYPE_void ? BATatoms[TYPE_oid].atomWrite : BATatoms[b->htype].atomWrite; int (*wt) (ptr, stream *, size_t) = BATatoms[b->ttype].atomWrite; l.flag = LOG_UPDATE; if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; for (p = BUNfirst(b); p < BUNlast(b) && ok == GDK_SUCCEED; p = BUNnext(b, p)) { ptr h = BUNhead(b, p); ptr t = BUNtail(b, p); ok = wh(h, lg->log, 1); ok = (ok == GDK_FAIL) ? ok : wt(t, lg->log, 1); } if (lg->debug) fprintf(stderr, "Logged %s %d inserts\n", name, l.nr); } return (ok == GDK_SUCCEED) ? LOG_OK : LOG_ERR;}intlog_bat(logger *lg, BAT *b, char *name){ int ok = GDK_SUCCEED; logformat l; BUN p; if (lg->debug & 128) { /* logging is switched off */ return LOG_OK; } l.tid = lg->tid; l.nr = (int)(BUNindex(b, BUNlast(b)) - BUNindex(b, b->batInserted)); lg->changes += l.nr; if (l.nr) { int (*wh) (ptr, stream *, size_t) = BATatoms[b->htype].atomWrite; int (*wt) (ptr, stream *, size_t) = BATatoms[b->ttype].atomWrite; l.flag = LOG_INSERT; if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; if (b->htype == TYPE_void && b->ttype < TYPE_str && !VIEWparent(b)) { ptr t = BUNtail(b, b->batInserted); ok = wt(t, lg->log, l.nr); } else { for (p = b->batInserted; p < BUNlast(b) && ok == GDK_SUCCEED; p = BUNnext(b, p)) { ptr h = BUNhead(b, p); ptr t = BUNtail(b, p); ok = wh(h, lg->log, 1); ok = (ok == GDK_FAIL) ? ok : wt(t, lg->log, 1); } } if (lg->debug) fprintf(stderr, "Logged %s %d inserts\n", name, l.nr); } l.nr = (int)(BUNindex(b, b->batFirst) - BUNindex(b, b->batDeleted)); lg->changes += l.nr; if (l.nr && ok == GDK_SUCCEED) { int (*wh) (ptr, stream *, size_t) = BATatoms[b->htype].atomWrite; int (*wt) (ptr, stream *, size_t) = BATatoms[b->ttype].atomWrite; l.flag = LOG_DELETE; if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; for (p = b->batDeleted; p < b->batFirst && ok == GDK_SUCCEED; p = BUNnext(b, p)) { ptr h = BUNhead(b, p); ptr t = BUNtail(b, p); ok = wh(h, lg->log, 1); ok = (ok == GDK_FAIL) ? ok : wt(t, lg->log, 1); } if (lg->debug) fprintf(stderr, "Logged %s %d deletes\n", name, l.nr); } BATcommit(b); return (ok == GDK_SUCCEED) ? LOG_OK : LOG_ERR;}intlog_bat_clear(logger *lg, char *name){ int ok = GDK_SUCCEED; logformat l; if (lg->debug & 128) { /* logging is switched off */ return LOG_OK; } l.nr = 1; l.tid = lg->tid; lg->changes += l.nr; l.flag = LOG_CLEAR; if (log_write_format(lg, &l) == LOG_ERR || log_write_string(lg, name) == LOG_ERR) return LOG_ERR; return (ok == GDK_SUCCEED) ? LOG_OK : LOG_ERR;}intlog_tstart(logger *lg){ logformat l; l.flag = LOG_START; l.tid = ++lg->tid; l.nr = lg->tid; if (lg->debug) fprintf(stderr, "log_tstart %d\n", lg->tid); return log_write_format(lg, &l);}intlog_tend(logger *lg){ logformat l; int res = 0; if (lg->debug) fprintf(stderr, "log_tend %d\n", lg->tid); /* first subcommit the snapshots */ if (DELTAdirty(lg->snapshots)) { /* sub commit all new snapshots */ BAT *b = BATselect(lg->snapshots, &lg->tid, &lg->tid); res = bm_subcommit(b, lg->snapshots); BBPunfix(b->batCacheid); } l.flag = LOG_END; l.tid = lg->tid; l.nr = lg->tid; if (res || log_write_format(lg, &l) == LOG_ERR || stream_flush(lg->log) || stream_fsync(lg->log)) return LOG_ERR; return LOG_OK;}intlog_abort(logger *lg){ logformat l; if (lg->debug) fprintf(stderr, "log_abort %d\n", lg->tid); l.flag = LOG_END; l.tid = lg->tid; l.nr = -1; if (log_write_format(lg, &l) == LOG_ERR) return LOG_ERR; return LOG_OK;}/* a transaction in it self */intlog_sequence(logger *lg, int seq, lng id){ logformat l; l.flag = LOG_SEQ; l.tid = lg->tid; l.nr = seq; if (lg->debug) fprintf(stderr, "log_sequence %d" LLFMT "\n", seq, id); if (BUNfnd(lg->seqs, &seq)) { BUNdelHead(lg->seqs, &seq, FALSE); } BUNins(lg->seqs, &seq, &id, FALSE); if (log_write_format(lg, &l) == LOG_ERR || !stream_writeLng(lg->log, id) || stream_flush(lg->log) || stream_fsync(lg->log)) return LOG_ERR; return LOG_OK;}static intbm_commit(logger *lg){ BUN p; BAT *b = lg->catalog; /* remove the destroyed bats */ for (p = b->batDeleted; p < b->batFirst; p = BUNnext(b, p)) { bat bid = *(log_bid *) BUNhead(b, p); BAT *lb = BATdescriptor(bid); BATmode(lb, TRANSIENT); BBPdecref(bid, TRUE); logbat_destroy(lb); } for (p = b->batInserted; p < BUNlast(b); p = BUNnext(b, p)) { log_bid bid = *(log_bid *) BUNhead(b, p); BAT *lb = BATdescriptor(bid); BATmode(lb, PERSISTENT); if (BATcount(lb) > (size_t) REMAP_PAGE_MAXSIZE) BATmmap(lb, STORE_MMAP, STORE_MMAP, STORE_MMAP); logbat_destroy(lb); } if (bm_subcommit(lg->catalog, lg->catalog) != 0) return LOG_ERR; return LOG_OK;}log_bidlogger_add_bat(logger *lg, BAT *b, char *name){ log_bid bid = logger_find_bat(lg, name); if (bid) { if (bid != b->batCacheid) BUNdelHead(lg->catalog, &bid, FALSE); else return bid; } bid = b->batCacheid; if (lg->debug) printf("create %s\n", name); BUNins(lg->catalog, &bid, name, FALSE); BBPincref(bid, TRUE); return bid;}voidlogger_del_bat(logger *lg, log_bid bid){ BUNdelHead(lg->catalog, &bid, FALSE);}log_bidlogger_find_bat(logger *lg, char *name){ BAT *r_catalog = BATmirror(lg->catalog); log_bid res = 0; BUN p = BUNfnd(r_catalog, name); if (p) res = *(log_bid *) BUNtail(r_catalog, p); return res;}#line 1528 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/gdk/gdk_logger.mx"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -