📄 test_event_merge.cpp
字号:
/* Copyright (C) 2005 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <ndb_opts.h>
#include <NdbApi.hpp>
#include <NdbTest.hpp>
#include <my_sys.h>
#include <ndb_version.h>

// version50 is defined only when building against an NDB version < 5.1.0
#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0)
#define version50
#else
#undef version50
#endif

// until rbr in 5.1
#undef version51rbr

// Provide min/max only where missing; each macro has its own guard so an
// already existing definition of one of them is never redefined.
#ifndef min
#define min(x, y) ((x) < (y) ? (x) : (y))
#endif
#ifndef max
#define max(x, y) ((x) > (y) ? (x) : (y))
#endif

/*
 * Test composite operations on same PK via events.  The merge of event
 * data can happen in 2 places:
 *
 * 1) In TUP at commit, the detached triggers report a single composite
 * operation and its post/pre data
 *
 * 2) In event API version >= 5.1 separate commits within same GCI are
 * by default merged.  This is required to read blob data via NdbBlob.
 *
 * Option --separate-events disables GCI merge and implies --no-blobs.
 * This is used to test basic events functionality.
 *
 * Option --no-blobs omits blob attributes.  This is used to test GCI
 * merge without getting into blob bugs.
 *
 * Option --no-multiops allows 1 operation per commit.  This avoids TUP
 * and blob multi-operation bugs.
* * There are 5 ways (ignoring NUL operand) to compose 2 ops: * 5.0 bugs 5.1 bugs * INS o DEL = NUL * INS o UPD = INS type=INS * DEL o INS = UPD type=INS type=INS * UPD o DEL = DEL no event * UPD o UPD = UPD */struct Opts { my_bool abort_on_error; int loglevel; uint loop; uint maxops; uint maxpk; my_bool no_blobs; my_bool no_multiops; my_bool one_blob; const char* opstring; uint seed; my_bool separate_events; my_bool use_table;};static Opts g_opts;static const uint g_maxpk = 100;static const uint g_maxopstringpart = 100;static const char* g_opstringpart[g_maxopstringpart];static uint g_opstringparts = 0;static uint g_loop = 0;static Ndb_cluster_connection* g_ncc = 0;static Ndb* g_ndb = 0;static NdbDictionary::Dictionary* g_dic = 0;static NdbTransaction* g_con = 0;static NdbOperation* g_op = 0;static NdbScanOperation* g_scan_op = 0;static const char* g_tabname = "tem1";static const char* g_evtname = "tem1ev1";static const uint g_charlen = 5;static const char* g_charval = "abcdefgh";static const char* g_csname = "latin1_swedish_ci";static uint g_blobinlinesize = 256;static uint g_blobpartsize = 2000;static uint g_blobstripesize = 2;static const uint g_maxblobsize = 100000;static const NdbDictionary::Table* g_tab = 0;static const NdbDictionary::Event* g_evt = 0;static NdbEventOperation* g_evt_op = 0;static NdbBlob* g_bh = 0;static uinturandom(){ uint r = (uint)random(); return r;}static uinturandom(uint m){ if (m == 0) return 0; uint r = urandom(); r = r % m; return r;}static boolurandom(uint per, uint cent){ return urandom(cent) < per;}static int& g_loglevel = g_opts.loglevel; // default log level#define chkdb(x) \ do { if (x) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; errdb(); if (g_opts.abort_on_error) abort(); return -1; } while (0)#define chkrc(x) \ do { if (x) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; if (g_opts.abort_on_error) abort(); return -1; } while (0)#define reqrc(x) \ do { if (x) break; ndbout << "line " << 
__LINE__ << " ASSERT " << #x << endl; abort(); } while (0)#define ll0(x) \ do { if (g_loglevel < 0) break; ndbout << x << endl; } while (0)#define ll1(x) \ do { if (g_loglevel < 1) break; ndbout << x << endl; } while (0)#define ll2(x) \ do { if (g_loglevel < 2) break; ndbout << x << endl; } while (0)static voiderrdb(){ uint any = 0; if (g_ndb != 0) { const NdbError& e = g_ndb->getNdbError(); if (e.code != 0) ll0(++any << " ndb: error " << e); } if (g_dic != 0) { const NdbError& e = g_dic->getNdbError(); if (e.code != 0) ll0(++any << " dic: error " << e); } if (g_con != 0) { const NdbError& e = g_con->getNdbError(); if (e.code != 0) ll0(++any << " con: error " << e); } if (g_op != 0) { const NdbError& e = g_op->getNdbError(); if (e.code != 0) ll0(++any << " op: error " << e); } if (g_scan_op != 0) { const NdbError& e = g_scan_op->getNdbError(); if (e.code != 0) ll0(++any << " scan_op: error " << e); } if (g_evt_op != 0) { const NdbError& e = g_evt_op->getNdbError(); if (e.code != 0) ll0(++any << " evt_op: error " << e); } if (g_bh != 0) { const NdbError& e = g_bh->getNdbError(); if (e.code != 0) ll0(++any << " evt_op: error " << e); } if (! 
any) ll0("unknown db error");}struct Col { uint no; const char* name; NdbDictionary::Column::Type type; bool pk; bool nullable; uint length; uint size; bool isblob() const { return type == NdbDictionary::Column::Text; }};static Col g_col[] = { { 0, "pk1", NdbDictionary::Column::Unsigned, true, false, 1, 4 }, { 1, "pk2", NdbDictionary::Column::Char, true, false, g_charlen, g_charlen }, { 2, "seq", NdbDictionary::Column::Unsigned, false, false, 1, 4 }, { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen }, { 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 }, { 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 }};static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]);static uintncol(){ uint n = g_maxcol; if (g_opts.no_blobs) n -= 2; else if (g_opts.one_blob) n -= 1; return n;}static const Col&getcol(uint i){ if (i < ncol()) return g_col[i]; assert(false); return g_col[0];}static const Col&getcol(const char* name){ uint i; for (i = 0; i < ncol(); i++) if (strcmp(g_col[i].name, name) == 0) break; return getcol(i);}static intcreatetable(){ g_tab = 0; NdbDictionary::Table tab(g_tabname); tab.setLogging(false); CHARSET_INFO* cs; chkrc((cs = get_charset_by_name(g_csname, MYF(0))) != 0); uint i; for (i = 0; i < ncol(); i++) { const Col& c = g_col[i]; NdbDictionary::Column col(c.name); col.setType(c.type); col.setPrimaryKey(c.pk); if (! c.pk) col.setNullable(true); col.setLength(c.length); switch (c.type) { case NdbDictionary::Column::Unsigned: break; case NdbDictionary::Column::Char: col.setLength(c.length); col.setCharset(cs); break; case NdbDictionary::Column::Text: col.setInlineSize(g_blobinlinesize); col.setPartSize(g_blobpartsize); col.setStripeSize(g_blobstripesize); col.setCharset(cs); break; default: assert(false); break; } tab.addColumn(col); } g_dic = g_ndb->getDictionary(); if (! 
g_opts.use_table) { if (g_dic->getTable(g_tabname) != 0) chkdb(g_dic->dropTable(g_tabname) == 0); chkdb(g_dic->createTable(tab) == 0); } chkdb((g_tab = g_dic->getTable(g_tabname)) != 0); g_dic = 0; if (! g_opts.use_table) { // extra row for GCI probe chkdb((g_con = g_ndb->startTransaction()) != 0); chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0); chkdb(g_op->insertTuple() == 0); Uint32 pk1; char pk2[g_charlen + 1]; pk1 = g_maxpk; sprintf(pk2, "%-*u", g_charlen, pk1); chkdb(g_op->equal("pk1", (char*)&pk1) == 0); chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0); chkdb(g_con->execute(Commit) == 0); g_ndb->closeTransaction(g_con); g_op = 0; g_con = 0; } return 0;}static intdroptable(){ if (! g_opts.use_table) { g_dic = g_ndb->getDictionary(); chkdb(g_dic->dropTable(g_tab->getName()) == 0); g_tab = 0; g_dic = 0; } return 0;}struct Data { struct Txt { char* val; uint len; }; union Ptr { Uint32* u32; char* ch; Txt* txt; void* v; }; Uint32 pk1; char pk2[g_charlen + 1]; Uint32 seq; char cc1[g_charlen + 1]; Txt tx1; Txt tx2; Ptr ptr[g_maxcol]; int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD) uint ppeq; // bit: post/pre data value equal in GCI data[0]/data[1] void init() { uint i; pk1 = 0; memset(pk2, 0, sizeof(pk2)); seq = 0; memset(cc1, 0, sizeof(cc1)); tx1.val = tx2.val = 0; tx1.len = tx2.len = 0; ptr[0].u32 = &pk1; ptr[1].ch = pk2; ptr[2].u32 = &seq; ptr[3].ch = cc1; ptr[4].txt = &tx1; ptr[5].txt = &tx2; for (i = 0; i < g_maxcol; i++) ind[i] = -1; noop = 0; ppeq = 0; } void free() { delete [] tx1.val; delete [] tx2.val; init(); }};static intcmpcol(const Col& c, const Data& d1, const Data& d2){ uint i = c.no; if (d1.ind[i] != d2.ind[i]) return 1; if (d1.ind[i] == 0) { switch (c.type) { case NdbDictionary::Column::Unsigned: if (*d1.ptr[i].u32 != *d2.ptr[i].u32) return 1; break; case NdbDictionary::Column::Char: if (memcmp(d1.ptr[i].ch, d2.ptr[i].ch, c.size) != 0) return 1; break; case 
NdbDictionary::Column::Text: { const Data::Txt& t1 = *d1.ptr[i].txt; const Data::Txt& t2 = *d2.ptr[i].txt; if (t1.len != t2.len) return 1; if (memcmp(t1.val, t2.val, t1.len) != 0) return 1; } break; default: assert(false); break; } } return 0;}static NdbOut&operator<<(NdbOut& out, const Data& d){ uint i; for (i = 0; i < ncol(); i++) { const Col& c = getcol(i); out << (i == 0 ? "" : " ") << c.name; out << (! (d.noop & (1 << i)) ? "=" : ":"); if (d.ind[i] == -1) continue; if (d.ind[i] == 1) { out << "NULL"; continue; } switch (c.type) { case NdbDictionary::Column::Unsigned: out << *d.ptr[i].u32; break; case NdbDictionary::Column::Char: { char buf[g_charlen + 1]; memcpy(buf, d.ptr[i].ch, g_charlen); uint n = g_charlen; while (1) { buf[n] = 0; if (n == 0 || buf[n - 1] != 0x20) break; n--; } out << "'" << buf << "'"; } break; case NdbDictionary::Column::Text: { Data::Txt& t = *d.ptr[i].txt; bool first = true; uint j = 0; while (j < t.len) { char c[2]; c[0] = t.val[j++]; c[1] = 0; uint m = 1; while (j < t.len && t.val[j] == c[0]) j++, m++; if (! first) out << "+"; first = false; out << m << c; } } break; default: assert(false); break; } } return out;}static const uint g_optypes = 3; // real ops 0-2/* * Represents single or composite operation or received event. The * post/pre data is either computed here for operations or received from * the event. 
*/struct Op { // single or composite enum Kind { OP = 1, EV = 2 }; enum Type { NUL = -1, INS, DEL, UPD }; Kind kind; Type type; Op* next_op; // within one commit Op* next_com; // next commit chain or next event Op* next_gci; // groups commit chains (unless --separate-events) Op* next_ev; Op* next_free; // free list bool free; // on free list uint num_op; uint num_com; Data data[2]; // 0-post 1-pre bool match; // matched to event Uint32 gci; // defined for com op and event void init(Kind a_kind) { kind = a_kind; assert(kind == OP || kind == EV); type = NUL; next_op = next_com = next_gci = next_ev = next_free = 0; free = false; num_op = num_com = 0; data[0].init(); data[1].init(); match = false; gci = 0; }};static NdbOut&operator<<(NdbOut& out, Op::Type t){ switch (t) { case Op::NUL: out << "NUL"; break; case Op::INS: out << "INS"; break; case Op::DEL: out << "DEL"; break; case Op::UPD: out << "UPD"; break; default: out << (int)t; break; } return out;}static NdbOut&operator<<(NdbOut& out, const Op& op){ out << op.type; out << " " << op.data[0]; out << " [" << op.data[1] << "]"; if (op.gci != 0) out << " gci:" << op.gci; return out;}static intseteventtype(Op* ev, NdbDictionary::Event::TableEvent te){ Op::Type t = Op::NUL; switch (te) { case NdbDictionary::Event::TE_INSERT: t = Op::INS; break; case NdbDictionary::Event::TE_DELETE: t = Op::DEL; break; case NdbDictionary::Event::TE_UPDATE: t = Op::UPD; break; default: ll0("EVT: " << *ev << ": bad event type" << (int)te); return -1; } ev->type = t; return 0;}static Op* g_opfree = 0;static uint g_freeops = 0;static uint g_usedops = 0;static uint g_maxcom = 10; // max ops per commitstatic Op* g_pk_op[g_maxpk];static Op* g_pk_ev[g_maxpk];static uint g_seq = 0;static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-prestatic NdbBlob* g_ev_bh[2][g_maxcol]; // 0-post 1-prestatic Op* g_rec_ev;static uint g_ev_pos[g_maxpk];static Op*getop(Op::Kind a_kind){ if (g_opfree == 0) { assert(g_freeops == 0); Op* op = new Op; assert(op != 
0); op->next_free = g_opfree; g_opfree = op; op->free = true; g_freeops++; } Op* op = g_opfree; g_opfree = op->next_free; assert(g_freeops != 0); g_freeops--; g_usedops++; op->init(a_kind); return op;}static voidfreeop(Op* op){ assert(! op->free); op->data[0].free(); op->data[1].free(); op->free = true; op->next_free = g_opfree; g_opfree = op; g_freeops++; assert(g_usedops != 0); g_usedops--;}static void
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -