⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 utiltransactions.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include "UtilTransactions.hpp"#include <NdbSleep.h>#include <NdbScanFilter.hpp>#define VERBOSE 0UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,				   const NdbDictionary::Index* _idx):  tab(_tab), idx(_idx), pTrans(0){  m_defaultClearMethod = 3;}UtilTransactions::UtilTransactions(Ndb* ndb, 				   const char * name,				   const char * index) :  tab(* ndb->getDictionary()->getTable(name)),  idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),  pTrans(0){  m_defaultClearMethod = 3;}#define RESTART_SCAN 99#define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED) int UtilTransactions::clearTable(Ndb* pNdb, 			     int records,			     int parallelism){  if(m_defaultClearMethod == 1){    return clearTable1(pNdb, records, parallelism);  } else if(m_defaultClearMethod == 2){    return clearTable2(pNdb, records, parallelism);  } else {    return clearTable3(pNdb, records, parallelism);  }}int UtilTransactions::clearTable1(Ndb* pNdb, 			     int records,			     int parallelism){  return clearTable3(pNdb, records, 1);}int UtilTransactions::clearTable2(Ndb* pNdb, 			      int records,			      int parallelism){  return clearTable3(pNdb, records, parallelism);}int UtilTransactions::clearTable3(Ndb* pNdb, 			      int records,			      int parallelism){  // Scan all records exclusive and delete   // them one by one  int                  retryAttempt = 0;  const int            retryMax = 10;  int deletedRows = 0;  int check;  NdbScanOperation *pOp;  NdbError err;  int par = parallelism;  while (true){  restart:    if (retryAttempt++ >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }        pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      err = pNdb->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	continue;      }      goto failed;    }    pOp = getScanOperation(pTrans);    if (pOp == NULL) {      err = pTrans->getNdbError();      if(err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	par = 1;	goto restart;      }      goto failed;    }        if( pOp->readTuplesExclusive(par) ) {      err = pTrans->getNdbError();      goto failed;    }        if(pTrans->execute(NoCommit) != 0){      err = pTrans->getNdbError();          if(err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	continue;      }      goto failed;    }        while((check = pOp->nextResult(true)) == 0){      do {	if (pOp->deleteCurrentTuple() != 0){	  goto failed;	}	deletedRows++;      } while((check = pOp->nextResult(false)) == 0);            if(check != -1){	check = pTrans->execute(Commit);   	pTrans->restart();      }            err = pTrans->getNdbError();          if(check == -1){	if(err.status == NdbError::TemporaryError){	  ERR(err);	  closeTransaction(pNdb);	  NdbSleep_MilliSleep(50);	  par = 1;	  goto restart;	}	goto failed;      }    }    if(check == -1){      err = pTrans->getNdbError();          if(err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	par = 1;	goto restart;      }      goto failed;    }    closeTransaction(pNdb);    return NDBT_OK;  }  return NDBT_FAILED;   failed:  closeTransaction(pNdb);  ERR(err);  return (err.code != 0 ? err.code : NDBT_FAILED);}int UtilTransactions::copyTableData(Ndb* pNdb,			    const char* destName){  // Scan all records and copy   // them to destName table  int                  retryAttempt = 0;  const int            retryMax = 10;  int insertedRows = 0;  int parallelism = 240;  int check;  NdbScanOperation		*pOp;  NDBT_ResultRow       row(tab);    while (true){        if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }    pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    pOp = pTrans->getNdbScanOperation(tab.getName());	    if (pOp == NULL) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }    if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }    check = pOp->interpret_exit_ok();    if( check == -1 ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }      // Read all attributes    for (int a = 0; a < tab.getNoOfColumns(); a++){      if ((row.attributeStore(a) =  	   pOp->getValue(tab.getColumn(a)->getName())) == 0) {	ERR(pTrans->getNdbError());	closeTransaction(pNdb);	return NDBT_FAILED;      }    }        check = pTrans->execute(NoCommit);    if( check == -1 ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }      int eof;    while((eof = pOp->nextResult(true)) == 0){      do {	insertedRows++;	if (addRowToInsert(pNdb, pTrans, row, destName) != 0){	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      } while((eof = pOp->nextResult(false)) == 0);            check = pTrans->execute(Commit);         pTrans->restart();      if( check == -1 ) {	const NdbError err = pTrans->getNdbError();    	ERR(err);	closeTransaction(pNdb);	return NDBT_FAILED;      }    }      if (eof == -1) {      const NdbError err = pTrans->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	// If error = 488 there should be no limit on number of retry attempts	if (err.code != 488) 	  retryAttempt++;	continue;      }      ERR(err);      closeTransaction(pNdb);      return NDBT_FAILED;    }        closeTransaction(pNdb);        g_info << insertedRows << " rows copied" << endl;        return NDBT_OK;  }  return NDBT_FAILED;}int UtilTransactions::addRowToInsert(Ndb* pNdb,				 NdbConnection* pInsTrans,				 NDBT_ResultRow & row,				 const char *insertTabName){  int check;  NdbOperation* pInsOp;  pInsOp = pInsTrans->getNdbOperation(insertTabName);	  if (pInsOp == NULL) {    ERR(pInsTrans->getNdbError());    return NDBT_FAILED;  }    check = pInsOp->insertTuple();  if( check == -1 ) {    ERR(pInsTrans->getNdbError());    return NDBT_FAILED;  }  // Set all attributes  for (int a = 0; a < tab.getNoOfColumns(); a++){    NdbRecAttr* r =  row.attributeStore(a);    int	 sz = r->attrSize() * r->arraySize();    if (pInsOp->setValue(tab.getColumn(a)->getName(),			 r->aRef(),			 sz) != 0) {      ERR(pInsTrans->getNdbError());      return NDBT_FAILED;    }  }    return NDBT_OK;}int UtilTransactions::scanReadRecords(Ndb* pNdb,				  int parallelism,				  NdbOperation::LockMode lm,				  int records,				  int noAttribs,				  int *attrib_list,				  ReadCallBackFn* fn){    int                  retryAttempt = 0;  const int            retryMax = 100;  int                  check;  NdbScanOperation	       *pOp;  NDBT_ResultRow       row(tab);  while (true){    if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }    pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    pOp = getScanOperation(pTrans);    if (pOp == NULL) {      const NdbError err = pNdb->getNdbError();      closeTransaction(pNdb);      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    if( pOp->readTuples(lm, 0, parallelism) ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }    check = pOp->interpret_exit_ok();    if( check == -1 ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }        // Call getValue for all the attributes supplied in attrib_list    // ************************************************    for (int a = 0; a < noAttribs; a++){      if (attrib_list[a] < tab.getNoOfColumns()){ 	g_info << "getValue(" << attrib_list[a] << ")" << endl;	if ((row.attributeStore(attrib_list[a]) =  	     pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {	  ERR(pTrans->getNdbError());	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      }    }    // *************************************************        check = pTrans->execute(NoCommit);    if( check == -1 ) {      const NdbError err = pTrans->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      closeTransaction(pNdb);      return NDBT_FAILED;    }        int eof;    int rows = 0;            while((eof = pOp->nextResult()) == 0){      rows++;            // Call callback for each record returned      if(fn != NULL)	fn(&row);    }    if (eof == -1) {      const NdbError err = pTrans->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      closeTransaction(pNdb);      return NDBT_FAILED;    }        closeTransaction(pNdb);    g_info << rows << " rows have been read" << endl;    if (records != 0 && rows != records){      g_info << "Check expected number of records failed" << endl 	     << "  expected=" << records <<", " << endl	     << "  read=" << rows << endl;      return NDBT_FAILED;    }        return NDBT_OK;  }  return NDBT_FAILED;}int UtilTransactions::selectCount(Ndb* pNdb, 			      int parallelism,			      int* count_rows,			      NdbOperation::LockMode lm,			      NdbConnection* pTrans){    int                  retryAttempt = 0;  const int            retryMax = 100;  int                  check;  NdbScanOperation     *pOp;  if(!pTrans)    pTrans = pNdb->startTransaction();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -