📄 hugotransactions.cpp
字号:
} if (recUpdateEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing update pk " << i << endl; } else if (recUpdateEvent[i].count > 1) { ndbout << "duplicates update pk " << i << " count " << recUpdateEvent[i].count << endl; stats.n_duplicates += recUpdateEvent[i].count-1; } if (recDeleteEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing delete pk " << i << endl; } else if (recDeleteEvent[i].count > 1) { ndbout << "duplicates delete pk " << i << " count " << recDeleteEvent[i].count << endl; stats.n_duplicates += recDeleteEvent[i].count-1; } } return NDBT_OK;}int HugoTransactions::pkReadRecords(Ndb* pNdb, int records, int batch, NdbOperation::LockMode lm){ int reads = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; if (batch == 0) { g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl; return NDBT_FAILED; } while (r < records){ if(r + batch > records) batch = records - r; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } switch(err.code){ case 626: // Tuple did not exist g_info << r << ": " << err.code << " " << err.message << endl; r++; break; default: ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } } else { if(pIndexScanOp) { int rows_found = 0; while((check = pIndexScanOp->nextResult()) == 0) { rows_found++; if (calc.verifyRowValues(rows[0]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } } if(check != 1 || rows_found > batch) { closeTransaction(pNdb); return NDBT_FAILED; } else if(rows_found < batch) { if(batch == 1){ g_info << r << ": not found" << endl; abort(); } else g_info << "Found " << rows_found << " of " << batch << " rows" << endl; } r += batch; reads += rows_found; } else { for (int b=0; (b<batch) && (r+b<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } reads++; r++; } } } closeTransaction(pNdb); } deallocRows(); g_info << reads << " records read" << endl; return NDBT_OK;}int HugoTransactions::pkUpdateRecords(Ndb* pNdb, int records, int batch, int doSleep){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a, b; NdbOperation *pOp; allocRows(batch); g_info << "|- Updating records (batch=" << batch << ")..." << endl; while (r < records){ if(r + batch > records) batch = records - r; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } if(pIndexScanOp) { int rows_found = 0; while((check = pIndexScanOp->nextResult(true)) == 0) { do { if (calc.verifyRowValues(rows[0]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[0]) + 1; if(pkUpdateRecord(pNdb, r+rows_found, 1, updates) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } rows_found++; } while((check = pIndexScanOp->nextResult(false)) == 0); if(check != 2) break; if((check = pTrans->execute(NoCommit)) != 0) break; } if(check != 1 || rows_found != batch) { closeTransaction(pNdb); return NDBT_FAILED; } } else { for(b = 0; b<batch && (b+r)<records; b++) { if (calc.verifyRowValues(rows[b]) != 0) { closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[b]) + 1; if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->execute(Commit); } if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); ndbout << "r = " << r << endl; closeTransaction(pNdb); return NDBT_FAILED; } else{ updated += batch; } closeTransaction(pNdb); r += batch; // Read next record } deallocRows(); g_info << "|- " << updated << " records updated" << endl; return NDBT_OK;}int HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, int records, int batch){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } NdbOperation* pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Read update value for(a = 0; a<tab.getNoOfColumns(); a++){ if (calc.isUpdateCol(a) == true){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(&row) + 1; NdbOperation* pUpdOp; pUpdOp = pTrans->getNdbOperation(tab.getName()); if (pUpdOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pUpdOp->interpretedUpdateTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // PKs for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pUpdOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Update col for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == true)){ // TODO switch for 32/64 bit const NdbDictionary::Column* attr = tab.getColumn(a); Uint32 valToIncWith = 1; check = pUpdOp->incValue(attr->getName(), valToIncWith); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Remaining attributes for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == false)){ if(setValueForAttr(pUpdOp, a, r, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); ndbout << "r = " << r << endl; closeTransaction(pNdb); return NDBT_FAILED; } else{ updated++; } closeTransaction(pNdb); r++; // Read next record } g_info << "|- " << updated << " records updated" << endl; return NDBT_OK;}int HugoTransactions::pkDelRecords(Ndb* pNdb, int records, int batch, bool allowConstraintViolation, int doSleep){ // TODO Batch is not implemented int deleted = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; NdbOperation *pOp; g_info << "|- Deleting records..." << endl; while (r < records){ if(r + batch > records) batch = records - r; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -