⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hugotransactions.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
    }    if (recUpdateEvent[i].pk != i) {      stats.n_consecutive ++;      ndbout << "missing update pk " << i << endl;    } else if (recUpdateEvent[i].count > 1) {      ndbout << "duplicates update pk " << i	     << " count " << recUpdateEvent[i].count << endl;      stats.n_duplicates += recUpdateEvent[i].count-1;    }    if (recDeleteEvent[i].pk != i) {      stats.n_consecutive ++;      ndbout << "missing delete pk " << i << endl;    } else if (recDeleteEvent[i].count > 1) {      ndbout << "duplicates delete pk " << i	     << " count " << recDeleteEvent[i].count << endl;      stats.n_duplicates += recDeleteEvent[i].count-1;    }  }    return NDBT_OK;}int HugoTransactions::pkReadRecords(Ndb* pNdb, 				int records,				int batch,				NdbOperation::LockMode lm){  int                  reads = 0;  int                  r = 0;  int                  retryAttempt = 0;  const int            retryMax = 100;  int                  check, a;  if (batch == 0) {    g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl;    return NDBT_FAILED;  }  while (r < records){    if(r + batch > records)      batch = records - r;    if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }        pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)    {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }        check = pTrans->execute(Commit);       if( check == -1 ) {      const NdbError err = pTrans->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      switch(err.code){      case 626: // Tuple did not exist	g_info << r << ": " << err.code << " " << err.message << endl;	r++;	break;      default:	ERR(err);	closeTransaction(pNdb);	return NDBT_FAILED;      }    } else {      if(pIndexScanOp)      {	int rows_found = 0;	while((check = pIndexScanOp->nextResult()) == 0)	{	  rows_found++;	  if (calc.verifyRowValues(rows[0]) != 0){	    closeTransaction(pNdb);	    return NDBT_FAILED;	  }	}	if(check != 1 || rows_found > batch)	{	  closeTransaction(pNdb);	  return NDBT_FAILED;	}	else if(rows_found < batch)	{	  if(batch == 1){	    g_info << r << ": not found" << endl; abort(); }	  else	    g_info << "Found " << rows_found << " of " 		   << batch << " rows" << endl;	}	r += batch;	reads += rows_found;      }      else       {	for (int b=0; (b<batch) && (r+b<records); b++){ 	  if (calc.verifyRowValues(rows[b]) != 0){	    closeTransaction(pNdb);	    return NDBT_FAILED;	  }	  reads++;	  r++;	}      }    }        closeTransaction(pNdb);  }  deallocRows();  g_info << reads << " records read" << endl;  return NDBT_OK;}int HugoTransactions::pkUpdateRecords(Ndb* pNdb, 				  int records,				  int batch,				  int doSleep){  int updated = 0;  int                  r = 0;  int                  retryAttempt = 0;  const int            retryMax = 100;  int                  check, a, b;  NdbOperation	       *pOp;  allocRows(batch);  g_info << "|- Updating records (batch=" << batch << ")..." << endl;  while (r < records){    if(r + batch > records)      batch = records - r;        if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }        if (doSleep > 0)      NdbSleep_MilliSleep(doSleep);    pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK)    {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }        check = pTrans->execute(NoCommit);       if( check == -1 ) {      const NdbError err = pTrans->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      closeTransaction(pNdb);      return NDBT_FAILED;    }    if(pIndexScanOp)    {      int rows_found = 0;      while((check = pIndexScanOp->nextResult(true)) == 0)      {	do {	  	  if (calc.verifyRowValues(rows[0]) != 0){	    closeTransaction(pNdb);	    return NDBT_FAILED;	  }	  	  int updates = calc.getUpdatesValue(rows[0]) + 1;	  	  if(pkUpdateRecord(pNdb, r+rows_found, 1, updates) != NDBT_OK)	  {	    ERR(pTrans->getNdbError());	    closeTransaction(pNdb);	    return NDBT_FAILED;	  }	  rows_found++;	} while((check = pIndexScanOp->nextResult(false)) == 0);		if(check != 2)	  break;	if((check = pTrans->execute(NoCommit)) != 0)	  break;      }      if(check != 1 || rows_found != batch)      {	closeTransaction(pNdb);	return NDBT_FAILED;      }    }    else    {      for(b = 0; b<batch && (b+r)<records; b++)      {	if (calc.verifyRowValues(rows[b]) != 0)	{	  closeTransaction(pNdb);	  return NDBT_FAILED;	}		int updates = calc.getUpdatesValue(rows[b]) + 1;		if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK)	{	  ERR(pTrans->getNdbError());	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      }      check = pTrans->execute(Commit);       }    if( check == -1 ) {      const NdbError err = pTrans->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      ndbout << "r = " << r << endl;      closeTransaction(pNdb);      return NDBT_FAILED;    }    else{      updated += batch;    }        closeTransaction(pNdb);        r += batch; // Read next record  }    deallocRows();  g_info << "|- " << updated << " records updated" << endl;  return NDBT_OK;}int HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, 					     int records,					     int batch){  int updated = 0;  int r = 0;  int retryAttempt = 0;  const int retryMax = 100;  int check, a;  while (r < records){        if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }        pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();            if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }   NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());	   if (pOp == NULL) {     ERR(pTrans->getNdbError());     closeTransaction(pNdb);     return NDBT_FAILED;   }      check = pOp->readTupleExclusive();   if( check == -1 ) {     ERR(pTrans->getNdbError());     closeTransaction(pNdb);     return NDBT_FAILED;   }      // Define primary keys   for(a = 0; a<tab.getNoOfColumns(); a++){     if (tab.getColumn(a)->getPrimaryKey() == true){       if(equalForAttr(pOp, a, r) != 0){	 ERR(pTrans->getNdbError());	 closeTransaction(pNdb);	 return NDBT_FAILED;       }     }   }      // Read update value   for(a = 0; a<tab.getNoOfColumns(); a++){     if (calc.isUpdateCol(a) == true){       if((row.attributeStore(a) = 	   pOp->getValue(tab.getColumn(a)->getName())) == 0) {	 ERR(pTrans->getNdbError());	 closeTransaction(pNdb);	 return NDBT_FAILED;       }     }   }       check = pTrans->execute(NoCommit);       if( check == -1 ) {      const NdbError err = pTrans->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      closeTransaction(pNdb);      return NDBT_FAILED;    }    int updates = calc.getUpdatesValue(&row) + 1;    NdbOperation* pUpdOp;    pUpdOp = pTrans->getNdbOperation(tab.getName());	    if (pUpdOp == NULL) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }    check = pUpdOp->interpretedUpdateTuple();    if( check == -1 ) {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }    // PKs    for(a = 0; a<tab.getNoOfColumns(); a++){      if (tab.getColumn(a)->getPrimaryKey() == true){	if(equalForAttr(pUpdOp, a, r) != 0){	  ERR(pTrans->getNdbError());	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      }    }    // Update col    for(a = 0; a<tab.getNoOfColumns(); a++){      if ((tab.getColumn(a)->getPrimaryKey() == false) && 	  (calc.isUpdateCol(a) == true)){		// TODO switch for 32/64 bit	const NdbDictionary::Column* attr = tab.getColumn(a);	Uint32 valToIncWith = 1;	check = pUpdOp->incValue(attr->getName(), valToIncWith);	if( check == -1 ) {	  ERR(pTrans->getNdbError());	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      }    }    // Remaining attributes    for(a = 0; a<tab.getNoOfColumns(); a++){      if ((tab.getColumn(a)->getPrimaryKey() == false) && 	  (calc.isUpdateCol(a) == false)){	if(setValueForAttr(pUpdOp, a, r, updates ) != 0){	  ERR(pTrans->getNdbError());	  closeTransaction(pNdb);	  return NDBT_FAILED;	}      }    }        check = pTrans->execute(Commit);       if( check == -1 ) {      const NdbError err = pTrans->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	closeTransaction(pNdb);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      ndbout << "r = " << r << endl;      closeTransaction(pNdb);      return NDBT_FAILED;    }    else{      updated++;    }    closeTransaction(pNdb);    r++; // Read next record  }  g_info << "|- " << updated << " records updated" << endl;  return NDBT_OK;}int HugoTransactions::pkDelRecords(Ndb* pNdb, 			       int records,			       int batch,			       bool allowConstraintViolation,			       int doSleep){  // TODO Batch is not implemented  int deleted = 0;  int                  r = 0;  int                  retryAttempt = 0;  const int            retryMax = 100;  int                  check, a;  NdbOperation	       *pOp;  g_info << "|- Deleting records..." << endl;  while (r < records){    if(r + batch > records)      batch = records - r;    if (retryAttempt >= retryMax){      g_info << "ERROR: has retried this operation " << retryAttempt 	     << " times, failing!" << endl;      return NDBT_FAILED;    }    if (doSleep > 0)      NdbSleep_MilliSleep(doSleep);    pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)    {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -