📄 hugotransactions.cpp
字号:
intHugoTransactions::loadTable(Ndb* pNdb, int records, int batch, bool allowConstraintViolation, int doSleep, bool oneTrans){ int check, a; int retryAttempt = 0; int retryMax = 5; NdbOperation *pOp; bool first_batch = true; const int org = batch; const int cols = tab.getNoOfColumns(); const int brow = tab.getRowSizeInBytes(); const int bytes = 12 + brow + 4 * cols; batch = (batch * 256); // -> 512 -> 65536k per commit batch = batch/bytes; // batch = batch == 0 ? 1 : batch; if(batch != org){ g_info << "batch = " << org << " rowsize = " << bytes << " -> rows/commit = " << batch << endl; } g_info << "|- Inserting records..." << endl; for (int c=0 ; c<records ; ){ bool closeTrans = true; if(c + batch > records) batch = records - c; if (retryAttempt >= retryMax){ g_info << "Record " << c << " could not be inserted, has retried " << retryAttempt << " times " << endl; // Reset retry counters and continue with next record retryAttempt = 0; c++; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); // if (first_batch || !oneTrans) { if (first_batch || !pTrans) { first_batch = false; pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } } if(pkInsertRecord(pNdb, c, batch) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Execute the transaction and insert the record if (!oneTrans || (c + batch) >= records) { // closeTrans = true; closeTrans = false; check = pTrans->execute( Commit ); pTrans->restart(); } else { closeTrans = false; check = pTrans->execute( NoCommit ); } if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); pTrans= 0; switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: if (allowConstraintViolation == true){ switch (err.classification){ case NdbError::ConstraintViolation: // Tuple already existed, OK but should be reported g_info << c << ": " << err.code << " " << err.message << endl; c++; continue; break; default: break; } } ERR(err); return err.code; break; } } else{ if (closeTrans) { closeTransaction(pNdb); pTrans= 0; } } // Step to next record c = c+batch; retryAttempt = 0; } if(pTrans) closeTransaction(pNdb); return NDBT_OK;}intHugoTransactions::fillTable(Ndb* pNdb, int batch){ int check, a, b; int retryAttempt = 0; int retryMax = 5; NdbOperation *pOp; const int org = batch; const int cols = tab.getNoOfColumns(); const int brow = tab.getRowSizeInBytes(); const int bytes = 12 + brow + 4 * cols; batch = (batch * 256); // -> 512 -> 65536k per commit batch = batch/bytes; // batch = batch == 0 ? 1 : batch; if(batch != org){ g_info << "batch = " << org << " rowsize = " << bytes << " -> rows/commit = " << batch << endl; } for (int c=0 ; ; ){ if (retryAttempt >= retryMax){ g_info << "Record " << c << " could not be inserted, has retried " << retryAttempt << " times " << endl; // Reset retry counters and continue with next record retryAttempt = 0; c++; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkInsertRecord(pNdb, c, batch) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Execute the transaction and insert the record check = pTrans->execute( Commit, CommitAsMuchAsPossible ); if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: // if (allowConstraintViolation == true){ // switch (err.classification){ // case NdbError::ConstraintViolation: // // Tuple already existed, OK but should be reported // g_info << c << ": " << err.code << " " << err.message << endl; // c++; // continue; // break; // default: // break;es // } // } // Check if this is the "db full" error if (err.classification==NdbError::InsufficientSpace){ ERR(err); return NDBT_OK; } if (err.classification == NdbError::ConstraintViolation){ ERR(err); break; } ERR(err); return NDBT_FAILED; break; } } else{ closeTransaction(pNdb); } // Step to next record c = c+batch; retryAttempt = 0; } return NDBT_OK;}int HugoTransactions::createEvent(Ndb* pNdb){ char eventName[1024]; sprintf(eventName,"%s_EVENT",tab.getName()); NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); if (!myDict) { g_err << "Dictionary not found " << pNdb->getNdbError().code << " " << pNdb->getNdbError().message << endl; return NDBT_FAILED; } NdbDictionary::Event myEvent(eventName); myEvent.setTable(tab.getName()); myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); // const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); for(int a = 0; a < tab.getNoOfColumns(); a++){ // myEvent.addEventColumn(_table->getColumn(a)->getName()); myEvent.addEventColumn(a); } int res = myDict->createEvent(myEvent); // Add event to database if (res == 0) myEvent.print(); else if (myDict->getNdbError().classification == NdbError::SchemaObjectExists) { g_info << "Event creation failed event exists\n"; res = myDict->dropEvent(eventName); if (res) { g_err << "Failed to drop event: " << myDict->getNdbError().code << " : " << myDict->getNdbError().message << endl; return NDBT_FAILED; } // try again res = myDict->createEvent(myEvent); // Add event to database if (res) { g_err << "Failed to create event (1): " << myDict->getNdbError().code << " : " << myDict->getNdbError().message << endl; return NDBT_FAILED; } } else { g_err << "Failed to create event (2): " << myDict->getNdbError().code << " : " << myDict->getNdbError().message << endl; return NDBT_FAILED; } return NDBT_OK;}#include <NdbEventOperation.hpp>#include "TestNdbEventOperation.hpp"#include <NdbAutoPtr.hpp>struct receivedEvent { Uint32 pk; Uint32 count; Uint32 event;};int XXXXX = 0;int HugoTransactions::eventOperation(Ndb* pNdb, void* pstats, int records) { int myXXXXX = XXXXX++; Uint32 i; const char function[] = "HugoTransactions::eventOperation: "; struct receivedEvent* recInsertEvent; NdbAutoObjArrayPtr<struct receivedEvent> p00( recInsertEvent = new struct receivedEvent[3*records] ); struct receivedEvent* recUpdateEvent = &recInsertEvent[records]; struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records]; EventOperationStats &stats = *(EventOperationStats*)pstats; stats.n_inserts = 0; stats.n_deletes = 0; stats.n_updates = 0; stats.n_consecutive = 0; stats.n_duplicates = 0; stats.n_inconsistent_gcis = 0; for (i = 0; i < records; i++) { recInsertEvent[i].pk = 0xFFFFFFFF; recInsertEvent[i].count = 0; recInsertEvent[i].event = 0xFFFFFFFF; recUpdateEvent[i].pk = 0xFFFFFFFF; recUpdateEvent[i].count = 0; recUpdateEvent[i].event = 0xFFFFFFFF; recDeleteEvent[i].pk = 0xFFFFFFFF; recDeleteEvent[i].count = 0; recDeleteEvent[i].event = 0xFFFFFFFF; } NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); if (!myDict) { g_err << function << "Event Creation failedDictionary not found\n"; return NDBT_FAILED; } int r = 0; NdbEventOperation *pOp; char eventName[1024]; sprintf(eventName,"%s_EVENT",tab.getName()); int noEventColumnName = tab.getNoOfColumns(); g_info << function << "create EventOperation\n"; pOp = pNdb->createEventOperation(eventName, 100); if ( pOp == NULL ) { g_err << function << "Event operation creation failed\n"; return NDBT_FAILED; } g_info << function << "get values\n"; NdbRecAttr* recAttr[1024]; NdbRecAttr* recAttrPre[1024]; const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); for (int a = 0; a < noEventColumnName; a++) { recAttr[a] = pOp->getValue(_table->getColumn(a)->getName()); recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName()); } // set up the callbacks g_info << function << "execute\n"; if (pOp->execute()) { // This starts changes to "start flowing" g_err << function << "operation execution failed: \n"; g_err << pOp->getNdbError().code << " " << pOp->getNdbError().message << endl; return NDBT_FAILED; } g_info << function << "ok\n"; int count = 0; Uint32 last_inconsitant_gci = 0xEFFFFFF0; while (r < records){ //printf("now waiting for event...\n"); int res = pNdb->pollEvents(1000); // wait for event or 1000 ms if (res > 0) { //printf("got data! %d\n", r); int overrun; while (pOp->next(&overrun) > 0) { r++; r += overrun; count++; Uint32 gci = pOp->getGCI(); Uint32 pk = recAttr[0]->u_32_value(); if (!pOp->isConsistent()) { if (last_inconsitant_gci != gci) { last_inconsitant_gci = gci; stats.n_inconsistent_gcis++; } g_warning << "A node failure has occured and events might be missing\n"; } g_info << function << "GCI " << gci << ": " << count; struct receivedEvent* recEvent; switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: stats.n_inserts++; g_info << " INSERT: "; recEvent = recInsertEvent; break; case NdbDictionary::Event::TE_DELETE: stats.n_deletes++; g_info << " DELETE: "; recEvent = recDeleteEvent; break; case NdbDictionary::Event::TE_UPDATE: stats.n_updates++; g_info << " UPDATE: "; recEvent = recUpdateEvent; break; case NdbDictionary::Event::TE_ALL: abort(); } if ((int)pk < records) { recEvent[pk].pk = pk; recEvent[pk].count++; } g_info << "overrun " << overrun << " pk " << pk; for (i = 1; i < noEventColumnName; i++) { if (recAttr[i]->isNULL() >= 0) { // we have a value g_info << " post[" << i << "]="; if (recAttr[i]->isNULL() == 0) // we have a non-null value g_info << recAttr[i]->u_32_value(); else // we have a null value g_info << "NULL"; } if (recAttrPre[i]->isNULL() >= 0) { // we have a value g_info << " pre[" << i << "]="; if (recAttrPre[i]->isNULL() == 0) // we have a non-null value g_info << recAttrPre[i]->u_32_value(); else // we have a null value g_info << "NULL"; } } g_info << endl; } } else ;//printf("timed out\n"); } // sleep ((XXXXX-myXXXXX)*2); g_info << myXXXXX << "dropping event operation" << endl; int res = pNdb->dropEventOperation(pOp); if (res != 0) { g_err << "operation execution failed\n"; return NDBT_FAILED; } g_info << myXXXXX << " ok" << endl; if (stats.n_inserts > 0) { stats.n_consecutive++; } if (stats.n_deletes > 0) { stats.n_consecutive++; } if (stats.n_updates > 0) { stats.n_consecutive++; } for (i = 0; i < (Uint32)records/3; i++) { if (recInsertEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing insert pk " << i << endl; } else if (recInsertEvent[i].count > 1) { ndbout << "duplicates insert pk " << i << " count " << recInsertEvent[i].count << endl; stats.n_duplicates += recInsertEvent[i].count-1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -