📄 hugotransactions.cpp
字号:
} if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); pTrans= 0; switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: if (allowConstraintViolation == true){ switch (err.classification){ case NdbError::ConstraintViolation: // Tuple already existed, OK but should be reported g_info << c << ": " << err.code << " " << err.message << endl; c++; continue; break; default: break; } } ERR(err); return err.code; break; } } else{ if (closeTrans) { closeTransaction(pNdb); pTrans= 0; } } // Step to next record c = c+batch; retryAttempt = 0; } if(pTrans) closeTransaction(pNdb); return NDBT_OK;}intHugoTransactions::fillTable(Ndb* pNdb, int batch){ int check, a, b; int retryAttempt = 0; int retryMax = 5; NdbOperation *pOp; const int org = batch; const int cols = tab.getNoOfColumns(); const int brow = tab.getRowSizeInBytes(); const int bytes = 12 + brow + 4 * cols; batch = (batch * 256); // -> 512 -> 65536k per commit batch = batch/bytes; // batch = batch == 0 ? 1 : batch; if(batch != org){ g_info << "batch = " << org << " rowsize = " << bytes << " -> rows/commit = " << batch << endl; } for (int c=0 ; ; ){ if (retryAttempt >= retryMax){ g_info << "Record " << c << " could not be inserted, has retried " << retryAttempt << " times " << endl; // Reset retry counters and continue with next record retryAttempt = 0; c++; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkInsertRecord(pNdb, c, batch) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Execute the transaction and insert the record check = pTrans->execute( Commit, CommitAsMuchAsPossible ); if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: // if (allowConstraintViolation == true){ // switch (err.classification){ // case NdbError::ConstraintViolation: // // Tuple already existed, OK but should be reported // g_info << c << ": " << err.code << " " << err.message << endl; // c++; // continue; // break; // default: // break;es // } // } // Check if this is the "db full" error if (err.classification==NdbError::InsufficientSpace){ ERR(err); return NDBT_OK; } if (err.classification == NdbError::ConstraintViolation){ ERR(err); break; } ERR(err); return NDBT_FAILED; break; } } else{ closeTransaction(pNdb); } // Step to next record c = c+batch; retryAttempt = 0; } return NDBT_OK;}int HugoTransactions::pkReadRecords(Ndb* pNdb, int records, int batch, NdbOperation::LockMode lm){ int reads = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; if (batch == 0) { g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl; return NDBT_FAILED; } while (r < records){ if(r + batch > records) batch = records - r; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } switch(err.code){ case 626: // Tuple did not exist g_info << r << ": " << err.code << " " << err.message << endl; r++; break; default: ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } } else { if(pIndexScanOp) { int rows_found = 0; while((check = pIndexScanOp->nextResult()) == 0) { rows_found++; if (calc.verifyRowValues(rows[0]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } } if(check != 1 || rows_found > batch) { closeTransaction(pNdb); return NDBT_FAILED; } else if(rows_found < batch) { if(batch == 1){ g_info << r << ": not found" << endl; abort(); } else g_info << "Found " << rows_found << " of " << batch << " rows" << endl; } r += batch; reads += rows_found; } else { for (int b=0; (b<batch) && (r+b<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } reads++; r++; } } } closeTransaction(pNdb); } deallocRows(); g_info << reads << " records read" << endl; return NDBT_OK;}int HugoTransactions::pkUpdateRecords(Ndb* pNdb, int records, int batch, int doSleep){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a, b; NdbOperation *pOp; allocRows(batch); g_info << "|- Updating records (batch=" << batch << ")..." << endl; while (r < records){ if(r + batch > records) batch = records - r; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } if(pIndexScanOp) { int rows_found = 0; while((check = pIndexScanOp->nextResult(true)) == 0) { do { if (calc.verifyRowValues(rows[0]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[0]) + 1; if(pkUpdateRecord(pNdb, r+rows_found, 1, updates) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } rows_found++; } while((check = pIndexScanOp->nextResult(false)) == 0); if(check != 2) break; if((check = pTrans->execute(NoCommit)) != 0) break; } if(check != 1 || rows_found != batch) { closeTransaction(pNdb); return NDBT_FAILED; } } else { for(b = 0; b<batch && (b+r)<records; b++) { if (calc.verifyRowValues(rows[b]) != 0) { closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[b]) + 1; if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->execute(Commit); } if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); ndbout << "r = " << r << endl; closeTransaction(pNdb); return NDBT_FAILED; } else{ updated += batch; } closeTransaction(pNdb); r += batch; // Read next record } deallocRows(); g_info << "|- " << updated << " records updated" << endl; return NDBT_OK;}int HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, int records, int batch){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } NdbOperation* pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Read update value for(a = 0; a<tab.getNoOfColumns(); a++){ if (calc.isUpdateCol(a) == true){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(&row) + 1; NdbOperation* pUpdOp; pUpdOp = pTrans->getNdbOperation(tab.getName()); if (pUpdOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pUpdOp->interpretedUpdateTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // PKs for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pUpdOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Update col for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == true)){ // TODO switch for 32/64 bit const NdbDictionary::Column* attr = tab.getColumn(a); Uint32 valToIncWith = 1; check = pUpdOp->incValue(attr->getName(), valToIncWith); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Remaining attributes for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == false)){ if(setValueForAttr(pUpdOp, a, r, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -