📄 utiltransactions.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include "UtilTransactions.hpp"
#include <NdbSleep.h>
#include <NdbScanFilter.hpp>

#define VERBOSE 0

/**
 * Construct a helper bound to an already resolved table and an optional
 * index. No transaction is open after construction (pTrans is 0) and the
 * default clear method is set to 3.
 *
 * @param _tab  table the helper operates on
 * @param _idx  index stored in idx; may be 0
 */
UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
                                   const NdbDictionary::Index* _idx)
  : tab(_tab),
    idx(_idx),
    pTrans(0)
{
  m_defaultClearMethod = 3;
}

/**
 * Construct a helper by looking the table (and, when a name is given, the
 * index) up in the dictionary of the supplied Ndb object.
 *
 * NOTE(review): the result of getTable(name) is dereferenced without a
 * null check - confirm that callers only pass names of existing tables.
 *
 * @param ndb    Ndb object whose dictionary is queried
 * @param name   name of the table
 * @param index  name of an index on that table, or 0 for none
 */
UtilTransactions::UtilTransactions(Ndb* ndb,
                                   const char * name,
                                   const char * index)
  : tab(* ndb->getDictionary()->getTable(name)),
    idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
    pTrans(0)
{
  m_defaultClearMethod = 3;
}

#define RESTART_SCAN 99

/* Return the NDB error code when there is one, NDBT_FAILED otherwise. */
#define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED)

/**
 * Delete the rows of the table using the method selected by
 * m_defaultClearMethod (1 -> clearTable1, 2 -> clearTable2, any other
 * value -> clearTable3).
 *
 * @return whatever the selected clearTableN function returns
 */
int
UtilTransactions::clearTable(Ndb* pNdb,
                             int records,
                             int parallelism){
  if(m_defaultClearMethod == 1){
    return clearTable1(pNdb, records, parallelism);
  } else if(m_defaultClearMethod == 2){
    return clearTable2(pNdb, records, parallelism);
  } else {
    return clearTable3(pNdb, records, parallelism);
  }
}

/**
 * Same as clearTable3 but always with a parallelism of 1; the
 * parallelism argument is ignored.
 */
int
UtilTransactions::clearTable1(Ndb* pNdb,
                              int records,
                              int parallelism){
  return clearTable3(pNdb, records, 1);
}

/**
 * Forwards unchanged to clearTable3.
 */
int
UtilTransactions::clearTable2(Ndb* pNdb,
                              int records,
                              int parallelism){
  return clearTable3(pNdb, records, parallelism);
}

/**
 * Scan the table with an exclusive-read scan and delete every tuple the
 * scan returns, committing after each group of rows.
 *
 * Each pass through the outer loop counts as one attempt; once
 * retryAttempt has reached retryMax (10) the function gives up. Temporary
 * errors close the transaction, sleep 50 ms and start a new attempt.
 * After a temporary error from getScanOperation, from the commit or from
 * nextResult, the scan parallelism is lowered to 1 for the remaining
 * attempts.
 *
 * @param pNdb         Ndb object used to start the transaction
 * @param records      not used by this function
 * @param parallelism  parallelism passed to readTuplesExclusive on the
 *                     first attempt
 * @return NDBT_OK on success; NDBT_FAILED when the retry limit is hit;
 *         otherwise the NDB error code, or NDBT_FAILED if that code is 0
 */
int
UtilTransactions::clearTable3(Ndb* pNdb,
                              int records,
                              int parallelism){
  // Scan all records exclusive and delete
  // them one by one
  // All locals are declared up front so that the gotos below never jump
  // over an initialization.
  int retryAttempt = 0;
  const int retryMax = 10;
  int deletedRows = 0;
  int check;
  NdbScanOperation *pOp;
  NdbError err;

  int par = parallelism;
  while (true){
  restart:
    if (retryAttempt++ >= retryMax){
      g_info << "ERROR: has retried this operation "
             << retryAttempt << " times, failing!" << endl;
      return NDBT_FAILED;
    }

    pTrans = pNdb->startTransaction();
    if (pTrans == NULL) {
      err = pNdb->getNdbError();
      if (err.status == NdbError::TemporaryError){
        ERR(err);
        NdbSleep_MilliSleep(50);
        continue;
      }
      goto failed;
    }

    pOp = getScanOperation(pTrans);
    if (pOp == NULL) {
      err = pTrans->getNdbError();
      if(err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        par = 1;
        goto restart;
      }
      goto failed;
    }

    if( pOp->readTuplesExclusive(par) ) {
      err = pTrans->getNdbError();
      goto failed;
    }

    if(pTrans->execute(NoCommit) != 0){
      err = pTrans->getNdbError();
      if(err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        continue;
      }
      goto failed;
    }

    // Outer loop: nextResult(true); inner loop: nextResult(false).
    // Presumably "true" allows fetching a new batch while "false" only
    // walks rows already received - see the NDB API documentation.
    while((check = pOp->nextResult(true)) == 0){
      do {
        if (pOp->deleteCurrentTuple() != 0){
          // Record the failure before bailing out; otherwise the
          // epilogue would log and return a stale or empty error.
          err = pTrans->getNdbError();
          goto failed;
        }
        deletedRows++;
      } while((check = pOp->nextResult(false)) == 0);

      // Only commit when the inner loop did not end with a scan error.
      if(check != -1){
        check = pTrans->execute(Commit);
        pTrans->restart();
      }

      err = pTrans->getNdbError();
      if(check == -1){
        if(err.status == NdbError::TemporaryError){
          ERR(err);
          closeTransaction(pNdb);
          NdbSleep_MilliSleep(50);
          par = 1;
          goto restart;
        }
        goto failed;
      }
    }

    if(check == -1){
      err = pTrans->getNdbError();
      if(err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        par = 1;
        goto restart;
      }
      goto failed;
    }

    closeTransaction(pNdb);
    return NDBT_OK;
  }
  return NDBT_FAILED;

 failed:
  closeTransaction(pNdb);
  ERR(err);
  RETURN_FAIL(err);
}

/**
 * Scan every row of the table and insert a copy of it into the table
 * named destName, using the scanning transaction for the inserts as well.
 * The transaction is committed and restarted after each group of rows.
 *
 * A temporary error from startTransaction or from nextResult triggers a
 * new attempt after a 50 ms sleep. At most retryMax (10) counted retries
 * are made; error code 488 from the scan is retried without being
 * counted. Every other failure is reported with ERR and ends the copy.
 *
 * @param pNdb      Ndb object used to start the transaction
 * @param destName  name of the table that receives the rows
 * @return NDBT_OK when the whole table has been copied, else NDBT_FAILED
 */
int
UtilTransactions::copyTableData(Ndb* pNdb,
                                const char* destName){
  // Scan all records and copy
  // them to destName table
  int retryAttempt = 0;
  const int retryMax = 10;
  int insertedRows = 0;
  int parallelism = 240;
  int check;
  NdbScanOperation *pOp;
  NDBT_ResultRow row(tab);

  while (true){

    if (retryAttempt >= retryMax){
      g_info << "ERROR: has retried this operation "
             << retryAttempt << " times, failing!" << endl;
      return NDBT_FAILED;
    }

    pTrans = pNdb->startTransaction();
    if (pTrans == NULL) {
      const NdbError err = pNdb->getNdbError();

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        NdbSleep_MilliSleep(50);
        retryAttempt++;
        continue;
      }
      ERR(err);
      return NDBT_FAILED;
    }

    pOp = pTrans->getNdbScanOperation(tab.getName());
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    check = pOp->interpret_exit_ok();
    if( check == -1 ) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    // Read all attributes
    for (int a = 0; a < tab.getNoOfColumns(); a++){
      if ((row.attributeStore(a) =
           pOp->getValue(tab.getColumn(a)->getName())) == 0) {
        ERR(pTrans->getNdbError());
        closeTransaction(pNdb);
        return NDBT_FAILED;
      }
    }

    check = pTrans->execute(NoCommit);
    if( check == -1 ) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    int eof;
    while((eof = pOp->nextResult(true)) == 0){
      do {
        insertedRows++;
        if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
          closeTransaction(pNdb);
          return NDBT_FAILED;
        }
      } while((eof = pOp->nextResult(false)) == 0);

      // Commit the inserts defined for this group of rows, then make the
      // transaction usable for the next group.
      check = pTrans->execute(Commit);
      pTrans->restart();
      if( check == -1 ) {
        const NdbError err = pTrans->getNdbError();
        ERR(err);
        closeTransaction(pNdb);
        return NDBT_FAILED;
      }
    }
    if (eof == -1) {
      const NdbError err = pTrans->getNdbError();

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        // If error = 488 there should be no limit on number of retry attempts
        if (err.code != 488)
          retryAttempt++;
        continue;
      }
      ERR(err);
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }
    closeTransaction(pNdb);

    g_info << insertedRows << " rows copied" << endl;

    return NDBT_OK;
  }
  return NDBT_FAILED;
}

/**
 * Define (but do not execute) an insert of one row into insertTabName.
 * Every column of the helper's table is set from the corresponding
 * attribute stored in row, using attrSize() * arraySize() as the length.
 *
 * @param pNdb           not used by this function
 * @param pInsTrans      transaction the insert operation is added to
 * @param row            row whose stored attributes supply the values
 * @param insertTabName  name of the table to insert into
 * @return NDBT_OK when the operation was defined, else NDBT_FAILED
 */
int
UtilTransactions::addRowToInsert(Ndb* pNdb,
                                 NdbConnection* pInsTrans,
                                 NDBT_ResultRow & row,
                                 const char *insertTabName){

  int check;
  NdbOperation* pInsOp;

  pInsOp = pInsTrans->getNdbOperation(insertTabName);
  if (pInsOp == NULL) {
    ERR(pInsTrans->getNdbError());
    return NDBT_FAILED;
  }

  check = pInsOp->insertTuple();
  if( check == -1 ) {
    ERR(pInsTrans->getNdbError());
    return NDBT_FAILED;
  }

  // Set all attributes
  for (int a = 0; a < tab.getNoOfColumns(); a++){
    NdbRecAttr* r = row.attributeStore(a);
    int sz = r->attrSize() * r->arraySize();
    if (pInsOp->setValue(tab.getColumn(a)->getName(),
                         r->aRef(),
                         sz) != 0) {
      ERR(pInsTrans->getNdbError());
      return NDBT_FAILED;
    }
  }

  return NDBT_OK;
}

/**
 * Scan the table, optionally invoking a callback for every row, and
 * optionally check the number of rows read.
 *
 * Temporary errors from startTransaction, getScanOperation,
 * execute(NoCommit) and nextResult close the transaction (when one is
 * open), sleep 50 ms and retry, up to retryMax (100) times.
 *
 * @param pNdb         Ndb object used to start the transaction
 * @param parallelism  passed as third argument to readTuples
 * @param lm           lock mode passed to readTuples
 * @param records      expected row count; 0 disables the check
 * @param noAttribs    number of entries in attrib_list
 * @param attrib_list  column numbers to fetch; entries that are not
 *                     smaller than the table's column count are skipped
 * @param fn           called with the result row for each row read;
 *                     may be NULL
 * @return NDBT_OK on success, NDBT_FAILED on error, when the retry limit
 *         is reached, or when the row count does not match records
 */
int
UtilTransactions::scanReadRecords(Ndb* pNdb,
                                  int parallelism,
                                  NdbOperation::LockMode lm,
                                  int records,
                                  int noAttribs,
                                  int *attrib_list,
                                  ReadCallBackFn* fn){

  int retryAttempt = 0;
  const int retryMax = 100;
  int check;
  NdbScanOperation *pOp;
  NDBT_ResultRow row(tab);

  while (true){

    if (retryAttempt >= retryMax){
      g_info << "ERROR: has retried this operation "
             << retryAttempt << " times, failing!" << endl;
      return NDBT_FAILED;
    }

    pTrans = pNdb->startTransaction();
    if (pTrans == NULL) {
      const NdbError err = pNdb->getNdbError();

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        NdbSleep_MilliSleep(50);
        retryAttempt++;
        continue;
      }
      ERR(err);
      return NDBT_FAILED;
    }

    pOp = getScanOperation(pTrans);
    if (pOp == NULL) {
      // The operation was requested from the transaction, so the error is
      // taken from the transaction (as the other scans in this file do).
      // It must be copied before the transaction is closed.
      const NdbError err = pTrans->getNdbError();
      closeTransaction(pNdb);

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        NdbSleep_MilliSleep(50);
        retryAttempt++;
        continue;
      }
      ERR(err);
      return NDBT_FAILED;
    }

    if( pOp->readTuples(lm, 0, parallelism) ) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    check = pOp->interpret_exit_ok();
    if( check == -1 ) {
      ERR(pTrans->getNdbError());
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    // Call getValue for all the attributes supplied in attrib_list
    // ************************************************
    for (int a = 0; a < noAttribs; a++){
      if (attrib_list[a] < tab.getNoOfColumns()){
        g_info << "getValue(" << attrib_list[a] << ")" << endl;
        if ((row.attributeStore(attrib_list[a]) =
             pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
          ERR(pTrans->getNdbError());
          closeTransaction(pNdb);
          return NDBT_FAILED;
        }
      }
    }
    // *************************************************

    check = pTrans->execute(NoCommit);
    if( check == -1 ) {
      const NdbError err = pTrans->getNdbError();

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        retryAttempt++;
        continue;
      }
      ERR(err);
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    int eof;
    int rows = 0;

    while((eof = pOp->nextResult()) == 0){
      rows++;

      // Call callback for each record returned
      if(fn != NULL)
        fn(&row);
    }
    if (eof == -1) {
      const NdbError err = pTrans->getNdbError();

      if (err.status == NdbError::TemporaryError){
        ERR(err);
        closeTransaction(pNdb);
        NdbSleep_MilliSleep(50);
        retryAttempt++;
        continue;
      }
      ERR(err);
      closeTransaction(pNdb);
      return NDBT_FAILED;
    }

    closeTransaction(pNdb);
    g_info << rows << " rows have been read" << endl;
    if (records != 0 && rows != records){
      g_info << "Check expected number of records failed" << endl
             << " expected=" << records <<", " << endl
             << " read=" << rows << endl;
      return NDBT_FAILED;
    }

    return NDBT_OK;
  }
  return NDBT_FAILED;
}

int
UtilTransactions::selectCount(Ndb* pNdb,
                              int parallelism,
                              int* count_rows,
                              NdbOperation::LockMode lm,
                              NdbConnection* pTrans){

  int retryAttempt = 0;
  const int retryMax = 100;
  int check;
  NdbScanOperation *pOp;

  if(!pTrans)
    pTrans = pNdb->startTransaction();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -