⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hugotransactions.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
intHugoTransactions::loadTable(Ndb* pNdb, 			    int records,			    int batch,			    bool allowConstraintViolation,			    int doSleep,                            bool oneTrans){  int             check, a;  int             retryAttempt = 0;  int             retryMax = 5;  NdbOperation	  *pOp;  bool            first_batch = true;  const int org = batch;  const int cols = tab.getNoOfColumns();  const int brow = tab.getRowSizeInBytes();  const int bytes = 12 + brow + 4 * cols;  batch = (batch * 256); // -> 512 -> 65536k per commit  batch = batch/bytes;   //   batch = batch == 0 ? 1 : batch;   if(batch != org){    g_info << "batch = " << org << " rowsize = " << bytes	   << " -> rows/commit = " << batch << endl;  }    g_info << "|- Inserting records..." << endl;  for (int c=0 ; c<records ; ){    bool closeTrans = true;    if(c + batch > records)      batch = records - c;        if (retryAttempt >= retryMax){      g_info << "Record " << c << " could not be inserted, has retried "	     << retryAttempt << " times " << endl;      // Reset retry counters and continue with next record      retryAttempt = 0;      c++;    }    if (doSleep > 0)      NdbSleep_MilliSleep(doSleep);    //    if (first_batch || !oneTrans) {    if (first_batch || !pTrans) {      first_batch = false;      pTrans = pNdb->startTransaction();      if (pTrans == NULL) {        const NdbError err = pNdb->getNdbError();        if (err.status == NdbError::TemporaryError){          ERR(err);	  NdbSleep_MilliSleep(50);	  retryAttempt++;	  continue;        }        ERR(err);        return NDBT_FAILED;      }    }    if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)    {       ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }        // Execute the transaction and insert the record    if (!oneTrans || (c + batch) >= records) {      //      closeTrans = true;      closeTrans = false;      check = pTrans->execute( Commit );      pTrans->restart();    } else {      closeTrans = false;      check = pTrans->execute( NoCommit );    }    if(check == -1 ) {      const NdbError err = pTrans->getNdbError();      closeTransaction(pNdb);      pTrans= 0;      switch(err.status){      case NdbError::Success:	ERR(err);	g_info << "ERROR: NdbError reports success when transcaction failed"	       << endl;	return NDBT_FAILED;	break;	      case NdbError::TemporaryError:      	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;	break;	      case NdbError::UnknownResult:	ERR(err);	return NDBT_FAILED;	break;	      case NdbError::PermanentError:	if (allowConstraintViolation == true){	  switch (err.classification){	  case NdbError::ConstraintViolation:	    // Tuple already existed, OK but should be reported	    g_info << c << ": " << err.code << " " << err.message << endl;	    c++;	    continue;	    break;	  default:	    	    break;	  }	}	ERR(err);	return err.code;	break;      }    }    else{      if (closeTrans) {        closeTransaction(pNdb);	pTrans= 0;      }    }        // Step to next record    c = c+batch;     retryAttempt = 0;  }  if(pTrans)    closeTransaction(pNdb);  return NDBT_OK;}intHugoTransactions::fillTable(Ndb* pNdb, 			    int batch){  int             check, a, b;  int             retryAttempt = 0;  int             retryMax = 5;  NdbOperation	  *pOp;  const int org = batch;  const int cols = tab.getNoOfColumns();  const int brow = tab.getRowSizeInBytes();  const int bytes = 12 + brow + 4 * cols;  batch = (batch * 256); // -> 512 -> 65536k per commit  batch = batch/bytes;   //   batch = batch == 0 ? 1 : batch;   if(batch != org){    g_info << "batch = " << org << " rowsize = " << bytes	   << " -> rows/commit = " << batch << endl;  }    for (int c=0 ; ; ){    if (retryAttempt >= retryMax){      g_info << "Record " << c << " could not be inserted, has retried "	     << retryAttempt << " times " << endl;      // Reset retry counters and continue with next record      retryAttempt = 0;      c++;    }        pTrans = pNdb->startTransaction();    if (pTrans == NULL) {      const NdbError err = pNdb->getNdbError();      if (err.status == NdbError::TemporaryError){	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;      }      ERR(err);      return NDBT_FAILED;    }    if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)    {      ERR(pTrans->getNdbError());      closeTransaction(pNdb);      return NDBT_FAILED;    }        // Execute the transaction and insert the record    check = pTrans->execute( Commit, CommitAsMuchAsPossible );     if(check == -1 ) {      const NdbError err = pTrans->getNdbError();      closeTransaction(pNdb);            switch(err.status){      case NdbError::Success:	ERR(err);	g_info << "ERROR: NdbError reports success when transcaction failed"	       << endl;	return NDBT_FAILED;	break;	      case NdbError::TemporaryError:      	ERR(err);	NdbSleep_MilliSleep(50);	retryAttempt++;	continue;	break;	      case NdbError::UnknownResult:	ERR(err);	return NDBT_FAILED;	break;	      case NdbError::PermanentError:	//  if (allowConstraintViolation == true){	//    switch (err.classification){	//    case NdbError::ConstraintViolation:	//      // Tuple already existed, OK but should be reported	//      g_info << c << ": " << err.code << " " << err.message << endl;	//      c++;	//      continue;	//      break;	//    default:	    	//      break;es	//     }	//   }	// Check if this is the "db full" error 	if (err.classification==NdbError::InsufficientSpace){	  ERR(err);	  return NDBT_OK;	}	if (err.classification == NdbError::ConstraintViolation){	  ERR(err);	  break;	}	ERR(err);	return NDBT_FAILED;	break;      }    }    else{            closeTransaction(pNdb);    }        // Step to next record    c = c+batch;     retryAttempt = 0;  }  return NDBT_OK;}int HugoTransactions::createEvent(Ndb* pNdb){  char eventName[1024];  sprintf(eventName,"%s_EVENT",tab.getName());  NdbDictionary::Dictionary *myDict = pNdb->getDictionary();  if (!myDict) {    g_err << "Dictionary not found " 	  << pNdb->getNdbError().code << " "	  << pNdb->getNdbError().message << endl;    return NDBT_FAILED;  }  NdbDictionary::Event myEvent(eventName);  myEvent.setTable(tab.getName());  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);   //  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);   //  myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);   //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);  //  const NdbDictionary::Table *_table = myDict->getTable(tab.getName());  for(int a = 0; a < tab.getNoOfColumns(); a++){    //    myEvent.addEventColumn(_table->getColumn(a)->getName());    myEvent.addEventColumn(a);  }  int res = myDict->createEvent(myEvent); // Add event to database  if (res == 0)    myEvent.print();  else if (myDict->getNdbError().classification ==	   NdbError::SchemaObjectExists)   {    g_info << "Event creation failed event exists\n";    res = myDict->dropEvent(eventName);    if (res) {      g_err << "Failed to drop event: " 	    << myDict->getNdbError().code << " : "	    << myDict->getNdbError().message << endl;      return NDBT_FAILED;    }    // try again    res = myDict->createEvent(myEvent); // Add event to database    if (res) {      g_err << "Failed to create event (1): " 	    << myDict->getNdbError().code << " : "	    << myDict->getNdbError().message << endl;      return NDBT_FAILED;    }  }  else   {    g_err << "Failed to create event (2): " 	  << myDict->getNdbError().code << " : "	  << myDict->getNdbError().message << endl;    return NDBT_FAILED;  }  return NDBT_OK;}#include <NdbEventOperation.hpp>#include "TestNdbEventOperation.hpp"#include <NdbAutoPtr.hpp>struct receivedEvent {  Uint32 pk;  Uint32 count;  Uint32 event;};int XXXXX = 0;int HugoTransactions::eventOperation(Ndb* pNdb, void* pstats,				 int records) {  int myXXXXX = XXXXX++;  Uint32 i;  const char function[] = "HugoTransactions::eventOperation: ";  struct receivedEvent* recInsertEvent;  NdbAutoObjArrayPtr<struct receivedEvent>    p00( recInsertEvent = new struct receivedEvent[3*records] );  struct receivedEvent* recUpdateEvent = &recInsertEvent[records];  struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];  EventOperationStats &stats = *(EventOperationStats*)pstats;  stats.n_inserts = 0;  stats.n_deletes = 0;  stats.n_updates = 0;  stats.n_consecutive = 0;  stats.n_duplicates = 0;  stats.n_inconsistent_gcis = 0;  for (i = 0; i < records; i++) {    recInsertEvent[i].pk    = 0xFFFFFFFF;    recInsertEvent[i].count = 0;    recInsertEvent[i].event = 0xFFFFFFFF;    recUpdateEvent[i].pk    = 0xFFFFFFFF;    recUpdateEvent[i].count = 0;    recUpdateEvent[i].event = 0xFFFFFFFF;    recDeleteEvent[i].pk    = 0xFFFFFFFF;    recDeleteEvent[i].count = 0;    recDeleteEvent[i].event = 0xFFFFFFFF;  }  NdbDictionary::Dictionary *myDict = pNdb->getDictionary();  if (!myDict) {    g_err << function << "Event Creation failedDictionary not found\n";    return NDBT_FAILED;  }  int                  r = 0;  NdbEventOperation    *pOp;  char eventName[1024];  sprintf(eventName,"%s_EVENT",tab.getName());  int noEventColumnName = tab.getNoOfColumns();  g_info << function << "create EventOperation\n";  pOp = pNdb->createEventOperation(eventName, 100);  if ( pOp == NULL ) {    g_err << function << "Event operation creation failed\n";    return NDBT_FAILED;  }  g_info << function << "get values\n";  NdbRecAttr* recAttr[1024];  NdbRecAttr* recAttrPre[1024];  const NdbDictionary::Table *_table = myDict->getTable(tab.getName());  for (int a = 0; a < noEventColumnName; a++) {    recAttr[a]    = pOp->getValue(_table->getColumn(a)->getName());    recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());  }    // set up the callbacks  g_info << function << "execute\n";  if (pOp->execute()) { // This starts changes to "start flowing"    g_err << function << "operation execution failed: \n";    g_err << pOp->getNdbError().code << " "	  << pOp->getNdbError().message << endl;    return NDBT_FAILED;  }  g_info << function << "ok\n";  int count = 0;  Uint32 last_inconsitant_gci = 0xEFFFFFF0;  while (r < records){    //printf("now waiting for event...\n");    int res = pNdb->pollEvents(1000); // wait for event or 1000 ms    if (res > 0) {      //printf("got data! %d\n", r);      int overrun;      while (pOp->next(&overrun) > 0) {	r++;	r += overrun;	count++;	Uint32 gci = pOp->getGCI();	Uint32 pk = recAttr[0]->u_32_value();        if (!pOp->isConsistent()) {	  if (last_inconsitant_gci != gci) {	    last_inconsitant_gci = gci;	    stats.n_inconsistent_gcis++;	  }	  g_warning << "A node failure has occured and events might be missing\n";		}	g_info << function << "GCI " << gci << ": " << count;	struct receivedEvent* recEvent;	switch (pOp->getEventType()) {	case NdbDictionary::Event::TE_INSERT:	  stats.n_inserts++;	  g_info << " INSERT: ";	  recEvent = recInsertEvent;	  break;	case NdbDictionary::Event::TE_DELETE:	  stats.n_deletes++;	  g_info << " DELETE: ";	  recEvent = recDeleteEvent;	  break;	case NdbDictionary::Event::TE_UPDATE:	  stats.n_updates++;	  g_info << " UPDATE: ";	  recEvent = recUpdateEvent;	  break;	case NdbDictionary::Event::TE_ALL:	  abort();	}	if ((int)pk < records) {	  recEvent[pk].pk = pk;	  recEvent[pk].count++;	}	g_info << "overrun " << overrun << " pk " << pk;	for (i = 1; i < noEventColumnName; i++) {	  if (recAttr[i]->isNULL() >= 0) { // we have a value	    g_info << " post[" << i << "]=";	    if (recAttr[i]->isNULL() == 0) // we have a non-null value	      g_info << recAttr[i]->u_32_value();	    else                           // we have a null value	      g_info << "NULL";	  }	  if (recAttrPre[i]->isNULL() >= 0) { // we have a value	    g_info << " pre[" << i << "]=";	    if (recAttrPre[i]->isNULL() == 0) // we have a non-null value	      g_info << recAttrPre[i]->u_32_value();	    else                              // we have a null value	      g_info << "NULL";	  }	}	g_info << endl;      }    } else      ;//printf("timed out\n");  }  //  sleep ((XXXXX-myXXXXX)*2);  g_info << myXXXXX << "dropping event operation" << endl;  int res = pNdb->dropEventOperation(pOp);  if (res != 0) {    g_err << "operation execution failed\n";    return NDBT_FAILED;  }  g_info << myXXXXX << " ok" << endl;  if (stats.n_inserts > 0) {    stats.n_consecutive++;  }  if (stats.n_deletes > 0) {    stats.n_consecutive++;  }  if (stats.n_updates > 0) {    stats.n_consecutive++;  }  for (i = 0; i < (Uint32)records/3; i++) {    if (recInsertEvent[i].pk != i) {      stats.n_consecutive ++;      ndbout << "missing insert pk " << i << endl;    } else if (recInsertEvent[i].count > 1) {      ndbout << "duplicates insert pk " << i	     << " count " << recInsertEvent[i].count << endl;      stats.n_duplicates += recInsertEvent[i].count-1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -