📄 utiltransactions.cpp
字号:
} /** * Compare the two rows */ if(!null_found){ if (pScanOp) { if (pScanOp->nextResult() != 0){ const NdbError err = pTrans1->getNdbError(); ERR(err); ndbout << "Error when comparing records - index op next_result missing" << endl; ndbout << "row: " << row.c_str().c_str() << endl; goto close_all; } } if (!(tabRow.c_str() == indexRow.c_str())){ ndbout << "Error when comapring records" << endl; ndbout << " tabRow: \n" << tabRow.c_str().c_str() << endl; ndbout << " indexRow: \n" << indexRow.c_str().c_str() << endl; goto close_all; } if (pScanOp) { if (pScanOp->nextResult() == 0){ ndbout << "Error when comparing records - index op next_result to many" << endl; ndbout << "row: " << row.c_str().c_str() << endl; goto close_all; } } } return_code= NDBT_OK; goto close_all; } close_all: if (pTrans1) pNdb->closeTransaction(pTrans1); return return_code;}int UtilTransactions::verifyOrderedIndex(Ndb* pNdb, const NdbDictionary::Index* pIndex, int parallelism, bool transactional){ int retryAttempt = 0; const int retryMax = 100; int check; NdbScanOperation *pOp; NdbIndexScanOperation * iop = 0; NDBT_ResultRow scanRow(tab); NDBT_ResultRow pkRow(tab); NDBT_ResultRow indexRow(tab); const char * indexName = pIndex->getName(); int res; parallelism = 1; while (true){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbScanOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } if( pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism) ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->interpret_exit_ok(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } if(get_values(pOp, scanRow)) { abort(); } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } int eof; int rows = 0; while(check == 0 && (eof = pOp->nextResult()) == 0){ rows++; bool null_found= false; for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){ const NdbDictionary::Column * col = pIndex->getColumn(a); if (scanRow.attributeStore(col->getName())->isNULL()) { null_found= true; break; } } // Do pk lookup NdbOperation * pk = pTrans->getNdbOperation(tab.getName()); if(!pk || pk->readTuple()) goto error; if(equal(&tab, pk, scanRow) || get_values(pk, pkRow)) goto error; if(!null_found) { if(!iop && (iop= pTrans->getNdbIndexScanOperation(indexName, tab.getName()))) { if(iop->readTuples(NdbScanOperation::LM_CommittedRead, parallelism)) goto error; iop->interpret_exit_ok(); if(get_values(iop, indexRow)) goto error; } else if(!iop || iop->reset_bounds()) { goto error; } if(equal(pIndex, iop, scanRow)) goto error; } check = pTrans->execute(NoCommit); if(check) goto error; if(scanRow.c_str() != pkRow.c_str()){ g_err << "Error when comapring records" << endl; g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl; g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl; closeTransaction(pNdb); return NDBT_FAILED; } if(!null_found) { if((res= iop->nextResult()) != 0){ g_err << "Failed to find row using index: " << res << endl; ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } if(scanRow.c_str() != indexRow.c_str()){ g_err << "Error when comapring records" << endl; g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl; g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl; closeTransaction(pNdb); return NDBT_FAILED; } if(iop->nextResult() == 0){ g_err << "Found extra row!!" << endl; g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl; closeTransaction(pNdb); return NDBT_FAILED; } } } if (eof == -1 || check == -1) { error: const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); iop = 0; closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; rows--; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); return NDBT_OK; } return NDBT_FAILED;}intUtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst){ for (int a = 0; a < tab.getNoOfColumns(); a++){ NdbRecAttr*& ref= dst.attributeStore(a); if ((ref= op->getValue(a)) == 0) { return NDBT_FAILED; } } return 0;}intUtilTransactions::equal(const NdbDictionary::Index* pIndex, NdbOperation* op, const NDBT_ResultRow& src){ for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){ const NdbDictionary::Column * col = pIndex->getColumn(a); if(op->equal(col->getName(), src.attributeStore(col->getName())->aRef()) != 0){ return NDBT_FAILED; } } return 0;}intUtilTransactions::equal(const NdbDictionary::Table* pTable, NdbOperation* op, const NDBT_ResultRow& src){ for(Uint32 a = 0; a<tab.getNoOfColumns(); a++){ const NdbDictionary::Column* attr = tab.getColumn(a); if (attr->getPrimaryKey() == true){ if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){ return NDBT_FAILED; } } } return 0;}NdbScanOperation*UtilTransactions::getScanOperation(NdbConnection* pTrans){ return (NdbScanOperation*) getOperation(pTrans, NdbOperation::OpenScanRequest);}NdbOperation*UtilTransactions::getOperation(NdbConnection* pTrans, NdbOperation::OperationType type){ switch(type){ case NdbOperation::ReadRequest: case NdbOperation::ReadExclusive: if(idx) { switch(idx->getType()){ case NdbDictionary::Index::UniqueHashIndex: return pTrans->getNdbIndexOperation(idx->getName(), tab.getName()); case NdbDictionary::Index::OrderedIndex: return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName()); } } case NdbOperation::InsertRequest: case NdbOperation::WriteRequest: return pTrans->getNdbOperation(tab.getName()); case NdbOperation::UpdateRequest: case NdbOperation::DeleteRequest: if(idx) { switch(idx->getType()){ case NdbDictionary::Index::UniqueHashIndex: return pTrans->getNdbIndexOperation(idx->getName(), tab.getName()); } } return pTrans->getNdbOperation(tab.getName()); case NdbOperation::OpenScanRequest: if(idx) { switch(idx->getType()){ case NdbDictionary::Index::OrderedIndex: return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName()); } } return pTrans->getNdbScanOperation(tab.getName()); case NdbOperation::OpenRangeScanRequest: if(idx) { switch(idx->getType()){ case NdbDictionary::Index::OrderedIndex: return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName()); } } return 0; }}#include <HugoOperations.hpp>intUtilTransactions::closeTransaction(Ndb* pNdb){ if (pTrans != NULL){ pNdb->closeTransaction(pTrans); pTrans = NULL; } return 0;}int UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){ NdbError err; int return_code= -1, row_count= 0; int retryAttempt = 0, retryMax = 10; HugoCalculator calc(tab); NDBT_ResultRow row(tab); const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2); if(tmp == 0) { g_err << "Unable to lookup table: " << tab_name2 << endl << pNdb->getDictionary()->getNdbError() << endl; return -1; } const NdbDictionary::Table& tab2= *tmp; HugoOperations cmp(tab2); UtilTransactions count(tab2); while (true){ if (retryAttempt++ >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return -1; } NdbScanOperation *pOp= 0; pTrans = pNdb->startTransaction(); if (pTrans == NULL) { err = pNdb->getNdbError(); goto error; } pOp= pTrans->getNdbScanOperation(tab.getName()); if (pOp == NULL) { ERR(err= pTrans->getNdbError()); goto error; } if( pOp->readTuples(NdbScanOperation::LM_Read) ) { ERR(err= pTrans->getNdbError()); goto error; } if( pOp->interpret_exit_ok() == -1 ) { ERR(err= pTrans->getNdbError()); goto error; } // Read all attributes { for (int a = 0; a < tab.getNoOfColumns(); a++){ if ((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(err= pTrans->getNdbError()); goto error; } } } if( pTrans->execute(NoCommit) == -1 ) { ERR(err= pTrans->getNdbError()); goto error; } { int eof; while((eof = pOp->nextResult(true)) == 0) { do { row_count++; if(cmp.startTransaction(pNdb) != NDBT_OK) { ERR(err= pNdb->getNdbError()); goto error; } int rowNo= calc.getIdValue(&row); if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK) { ERR(err= cmp.getTransaction()->getNdbError()); goto error; } if(cmp.execute_Commit(pNdb) != NDBT_OK) { ERR(err= cmp.getTransaction()->getNdbError()); goto error; } if(row != cmp.get_row(0)) { g_err << "COMPARE FAILED" << endl; g_err << row << endl; g_err << cmp.get_row(0) << endl; return_code= 1; goto close; } retryAttempt= 0; cmp.closeTransaction(pNdb); } while((eof = pOp->nextResult(false)) == 0); } if (eof == -1) { err = pTrans->getNdbError(); goto error; } } pTrans->close(); pTrans= 0; g_info << row_count << " rows compared" << endl; { int row_count2; if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK) { g_err << "Failed to count rows in tab_name2" << endl; return -1; } g_info << row_count2 << " rows in tab_name2" << endl; return (row_count == row_count2 ? 0 : 1); }error: if(err.status == NdbError::TemporaryError) { NdbSleep_MilliSleep(50); if(pTrans != 0) { pTrans->close(); pTrans= 0; } if(cmp.getTransaction()) cmp.closeTransaction(pNdb); continue; } break; }close: if(pTrans != 0) pTrans->close(); return return_code;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -