📄 test_event_merge.cpp
字号:
// copy to gci level copyop(com_op, gci_op); tot_op->num_com += 1; } ll1("makeops: used ops = " << g_usedops);}static intaddndbop(Op* op){ chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0); switch (op->type) { case Op::INS: chkdb(g_op->insertTuple() == 0); break; case Op::DEL: chkdb(g_op->deleteTuple() == 0); break; case Op::UPD: chkdb(g_op->updateTuple() == 0); break; default: assert(false); break; } uint i; for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); const Data& d = op->data[0]; if (! c.pk) continue; chkdb(g_op->equal(c.name, (const char*)d.ptr[i].v) == 0); } if (op->type != Op::DEL) { for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); const Data& d = op->data[0]; if (c.pk) continue; if (d.noop & (1 << i)) continue; assert(d.ind[i] >= 0); if (! c.isblob()) { if (d.ind[i] == 0) chkdb(g_op->setValue(c.name, (const char*)d.ptr[i].v) == 0); else chkdb(g_op->setValue(c.name, (const char*)0) == 0); } else { const Data::Txt& t = *d.ptr[i].txt; g_bh = g_op->getBlobHandle(c.name); if (d.ind[i] == 0) chkdb(g_bh->setValue(t.val, t.len) == 0); else chkdb(g_bh->setValue(0, 0) == 0); g_bh = 0; } } } g_op = 0; return 0;}static intrunops(){ ll1("runops"); Uint32 pk1; Op* gci_op[g_maxpk]; uint left = 0; // number of pks with ops for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) { gci_op[pk1] = 0; // total op on the pk Op* tot_op = g_pk_op[pk1]; if (tot_op == 0) continue; // first commit chain assert(tot_op->next_gci != 0); gci_op[pk1] = tot_op->next_gci; left++; } while (left != 0) { pk1 = urandom(g_opts.maxpk); if (gci_op[pk1] == 0) continue; // do the ops in one transaction chkdb((g_con = g_ndb->startTransaction()) != 0); Op* com_op = gci_op[pk1]->next_com; assert(com_op != 0); // first op in chain Op* op = com_op->next_op; assert(op != 0); while (op != 0) { ll2("runops:" << *op); chkrc(addndbop(op) == 0); op = op->next_op; } chkdb(g_con->execute(Commit) == 0); gci_op[pk1]->gci = com_op->gci = g_con->getGCI(); ll2("commit: gci=" << com_op->gci); g_ndb->closeTransaction(g_con); g_con = 0; // next chain gci_op[pk1] = gci_op[pk1]->next_gci; if (gci_op[pk1] == 0) { assert(left != 0); left--; } } assert(left == 0); return 0;}// move com chains with same gci under same gci entrystatic intmergeops(){ ll1("mergeops"); uint mergecnt = 0; Uint32 pk1; for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) { Op* tot_op = g_pk_op[pk1]; if (tot_op == 0) continue; Op* gci_op = tot_op->next_gci; assert(gci_op != 0); while (gci_op != 0) { Op* com_op = gci_op->next_com; assert(com_op != 0 && com_op->next_com == 0); assert(gci_op->gci == com_op->gci); Op* last_com = com_op; Op* gci_op2 = gci_op->next_gci; while (gci_op2 != 0 && gci_op->gci == gci_op2->gci) { // move link to com level last_com = last_com->next_com = gci_op2->next_com; // merge to gci reqrc(compop(gci_op, gci_op2, gci_op) == 0); // move to next and discard Op* tmp_op = gci_op2; gci_op2 = gci_op2->next_gci; freeop(tmp_op); mergecnt++; } gci_op = gci_op->next_gci = gci_op2; } } ll1("mergeops: used ops = " << g_usedops); ll1("mergeops: merged " << mergecnt << " gci entries"); return 0;}// set bit for equal post/pre data in UPD, for use in event matchstatic voidcmppostpre(){ ll1("cmppostpre"); Uint32 pk1; for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) { Op* tot_op = g_pk_op[pk1]; Op* gci_op = tot_op ? tot_op->next_gci : 0; while (gci_op != 0) { if (gci_op->type == Op::UPD) { Data (&d)[2] = gci_op->data; uint i; for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); bool eq = d[0].ind[i] == 1 && d[1].ind[i] == 1 || d[0].ind[i] == 0 && d[1].ind[i] == 0 && cmpcol(c, d[0], d[1]) == 0; if (eq) { d[0].ppeq |= (1 << i); d[1].ppeq |= (1 << i); } } } gci_op = gci_op->next_gci; } }}static intcmpopevdata(const Data& d1, const Data& d2){ uint i; for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); if (cmpcol(c, d1, d2) != 0) { if ((d1.ppeq & (1 << i)) && d2.ind[i] == -1) ; // post/pre data equal and no event data returned is OK else return 1; } } return 0;}// compare operation to event datastatic intcmpopevdata(const Data (&d1)[2], const Data (&d2)[2]){ if (cmpopevdata(d1[0], d2[0]) != 0) return 1; if (cmpopevdata(d1[1], d2[1]) != 0) return 1; return 0;}static intmatchevent(Op* ev){ Op::Type t = ev->type; Data (&d2)[2] = ev->data; // get PK Uint32 pk1 = d2[0].pk1; chkrc(pk1 < g_opts.maxpk); // on error repeat and print details uint loop = 0; while (loop <= 1) { uint g_loglevel = loop == 0 ? g_opts.loglevel : 2; ll1("matchevent: pk1=" << pk1 << " type=" << t); ll2("EVT: " << *ev); Op* tot_op = g_pk_op[pk1]; Op* gci_op = tot_op ? tot_op->next_gci : 0; uint pos = 0; bool ok = false; while (gci_op != 0) { ll2("GCI: " << *gci_op); // print details Op* com_op = gci_op->next_com; assert(com_op != 0); while (com_op != 0) { ll2("COM: " << *com_op); Op* op = com_op->next_op; assert(op != 0); while (op != 0) { ll2("OP : " << *op); op = op->next_op; } com_op = com_op->next_com; } // match agains GCI op if (gci_op->type != Op::NUL) { const Data (&d1)[2] = gci_op->data; if (cmpopevdata(d1, d2) == 0) { bool tmpok = true; if (gci_op->type != t) { ll2("***: wrong type " << gci_op->type << " != " << t); tmpok = false; } if (gci_op->match) { ll2("***: duplicate match"); tmpok = false; } if (pos != g_ev_pos[pk1]) { ll2("***: wrong pos " << pos << " != " << g_ev_pos[pk1]); tmpok = false; } if (gci_op->gci != ev->gci) { ll2("***: wrong gci " << gci_op->gci << " != " << ev->gci); tmpok = false; } if (tmpok) { ok = gci_op->match = true; ll2("===: match"); } } pos++; } gci_op = gci_op->next_gci; } if (ok) { ll1("matchevent: match"); return 0; } ll1("matchevent: ERROR: no match"); if (g_loglevel >= 2) return -1; loop++; } return 0;}static intmatchevents(){ uint nomatch = 0; Uint32 pk1; for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) { Op* tot_ev = g_pk_ev[pk1]; if (tot_ev == 0) continue; Op* ev = tot_ev->next_ev; while (ev != 0) { if (matchevent(ev) < 0) nomatch++; g_ev_pos[pk1]++; ev = ev->next_ev; } } chkrc(nomatch == 0); return 0;}static intmatchops(){ Uint32 pk1; for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) { Op* tot_op = g_pk_op[pk1]; if (tot_op == 0) continue; Op* com_op = tot_op->next_com; while (com_op != 0) { if (com_op->type != Op::NUL && ! com_op->match) { ll0("COM: " << *com_op); Op* op = com_op->next_op; assert(op != 0); while (op != 0) { ll0("---: " << *op); op = op->next_op; } ll0("no matching event"); return -1; } com_op = com_op->next_com; } } return 0;}static voidgeteventdata(){ Data (&d)[2] = g_rec_ev->data; int i, j; for (j = 0; j < 2; j++) { for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); int ind, ret; if (! c.isblob()) { NdbRecAttr* ra = g_ev_ra[j][i]; ind = ra->isNULL(); } else {#ifdef version51rbr NdbBlob* bh = g_ev_bh[j][i]; ret = bh->getDefined(ind); assert(ret == 0); if (ind == 0) { // value was returned and is not NULL Data::Txt& t = *d[j].ptr[i].txt; Uint64 len64; ret = bh->getLength(len64); assert(ret == 0); t.len = (uint)len64; delete [] t.val; t.val = new char [t.len]; memset(t.val, 'X', t.len); Uint32 len = t.len; ret = bh->readData(t.val, len); assert(ret == 0 && len == t.len); }#endif } d[j].ind[i] = ind; } }}static intrunevents(){ ll1("runevents"); uint mspoll = 1000; uint npoll = 6; // strangely long delay while (npoll != 0) { npoll--; int ret; ll1("poll"); ret = g_ndb->pollEvents(mspoll); if (ret <= 0) continue; while (1) { g_rec_ev->init(Op::EV);#ifdef version50 int overrun = g_opts.maxops; chkdb((ret = g_evt_op->next(&overrun)) >= 0); chkrc(overrun == 0); if (ret == 0) break;#else NdbEventOperation* tmp_op = g_ndb->nextEvent(); if (tmp_op == 0) break; reqrc(g_evt_op == tmp_op);#endif chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0); geteventdata(); g_rec_ev->gci = g_evt_op->getGCI();#ifdef version50 // fix to match 5.1 if (g_rec_ev->type == Op::UPD) { Uint32 pk1 = g_rec_ev->data[0].pk1; makedata(getcol("pk1"), g_rec_ev->data[1], pk1, Op::UPD); makedata(getcol("pk2"), g_rec_ev->data[1], pk1, Op::UPD); }#endif // get indicators and blob value ll2("runevents: EVT: " << *g_rec_ev); // check basic sanity Uint32 pk1 = ~(Uint32)0; chkrc(checkop(g_rec_ev, pk1) == 0); // add to events Op* tot_ev = g_pk_ev[pk1]; if (tot_ev == 0) tot_ev = g_pk_ev[pk1] = getop(Op::EV); Op* last_ev = tot_ev; while (last_ev->next_ev != 0) last_ev = last_ev->next_ev; // copy and add Op* ev = getop(Op::EV); copyop(g_rec_ev, ev); last_ev->next_ev = ev; } } ll1("runevents: used ops = " << g_usedops); return 0;}static voidsetseed(int n){ uint seed; if (n == -1) { if (g_opts.seed == 0) return; if (g_opts.seed != -1) seed = (uint)g_opts.seed; else seed = 1 + (ushort)getpid(); } else { if (g_opts.seed != 0) return; seed = n; } ll0("seed=" << seed); srandom(seed);}static intruntest(){ setseed(-1); chkrc(createtable() == 0); chkrc(createevent() == 0); for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) { ll0("loop " << g_loop); setseed(g_loop); resetmem(); chkrc(scantab() == 0); // alternative: save tot_op for loop > 0 makeops(); g_rec_ev = getop(Op::EV); chkrc(createeventop() == 0); chkdb(g_evt_op->execute() == 0); chkrc(waitgci() == 0); chkrc(runops() == 0); if (! g_opts.separate_events) chkrc(mergeops() == 0); cmppostpre(); chkrc(runevents() == 0); chkrc(matchevents() == 0); chkrc(matchops() == 0); chkrc(dropeventop() == 0); } chkrc(dropevent() == 0); chkrc(droptable() == 0); return 0;}NDB_STD_OPTS_VARS;static struct my_optionmy_long_options[] ={ NDB_STD_OPTS("test_event_merge"), { "abort-on-error", 1001, "Do abort() on any error", (gptr*)&g_opts.abort_on_error, (gptr*)&g_opts.abort_on_error, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "loglevel", 1002, "Logging level in this program (default 0)", (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "loop", 1003, "Number of test loops (default 2, 0=forever)", (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0, GET_INT, REQUIRED_ARG, 2, 0, 0, 0, 0, 0 }, { "maxops", 1004, "Approx number of PK operations (default 1000)", (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0, GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 }, { "maxpk", 1005, "Number of different PK values (default 10)", (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0, GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 }, { "no-blobs", 1006, "Omit blob attributes (5.0: true)", (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "no-multiops", 1007, "Allow only 1 operation per commit", (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "one-blob", 1008, "Only one blob attribute (defautt 2)", (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "opstring", 1009, "Operations to run e.g. idiucdc (c is commit) or" " iuuc:uudc (the : separates loops)", (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "seed", 1010, "Random seed (0=loop number, default -1=random)", (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0, GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 }, { "separate-events", 1011, "Do not combine events per GCI (5.0: true)", (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "use-table", 1012, "Use existing table 'tem1'", (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }};static voidusage(){ my_print_help(my_long_options);}static intcheckopts(){#ifdef version50 g_opts.separate_events = true;#endif if (g_opts.separate_events) { g_opts.no_blobs = true; } if (g_opts.no_multiops) { g_maxcom = 1; } if (g_opts.opstring != 0) { uint len = strlen(g_opts.opstring); char* str = new char [len + 1]; memcpy(str, g_opts.opstring, len + 1); char* s = str; while (1) { g_opstringpart[g_opstringparts++] = s; s = strchr(s, ':'); if (s == 0) break; *s++ = 0; } uint i; for (i = 0; i < g_opstringparts; i++) { const char* s = g_opstringpart[i]; while (*s != 0) if (strchr("iduc", *s++) == 0) return -1; if (s == g_opstringpart[i] || s[-1] != 'c') return -1; } } return 0;}intmain(int argc, char** argv){ ndb_init(); const char* progname = strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0]; uint i; ndbout << progname; for (i = 1; i < argc; i++) ndbout << " " << argv[i]; ndbout << endl; int ret; ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option); if (ret != 0 || argc != 0 || checkopts() != 0) return NDBT_ProgramExit(NDBT_WRONGARGS); g_ncc = new Ndb_cluster_connection(); if (g_ncc->connect(30) == 0) { g_ndb = new Ndb(g_ncc, "TEST_DB"); if (g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0) { if (runtest() == 0) return NDBT_ProgramExit(NDBT_OK); } } if (g_evt_op != 0) { (void)dropeventop(); g_evt_op = 0; } delete g_ndb; delete g_ncc; return NDBT_ProgramExit(NDBT_FAILED);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -