📄 consumer_restore.cpp
字号:
if (j == 1) continue; ret = op->equal(i, dataPtr, length); } else { if (j == 0) continue; if (attr_data->null) ret = op->setValue(i, NULL, 0); else ret = op->setValue(i, dataPtr, length); } if (ret < 0) { ndbout_c("Column: %d type %d %d %d %d",i, attr_desc->m_column->getType(), size, arraySize, attr_data->size); break; } } if (ret < 0) break; } if (ret < 0) { if (errorHandler(cb)) continue; exitHandler(); } // Prepare transaction (the transaction is NOT yet sent to NDB) cb->connection->executeAsynchPrepare(NdbTransaction::Commit, &callback, cb); m_transactions++; return; } err << "Retried transaction " << cb->retries << " times.\nLast error" << m_ndb->getNdbError(cb->error_code) << endl << "...Unable to recover from errors. Exiting..." << endl; exitHandler();}void BackupRestore::cback(int result, restore_callback_t *cb){ m_transactions--; if (result < 0) { /** * Error. temporary or permanent? */ if (errorHandler(cb)) tuple_a(cb); // retry else { err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl; exitHandler(); } } else { /** * OK! close transaction */ m_ndb->closeTransaction(cb->connection); cb->connection= 0; cb->next= m_free_callback; m_free_callback= cb; m_dataCount++; }}/** * returns true if is recoverable, * Error handling based on hugo * false if it is an error that generates an abort. */bool BackupRestore::errorHandler(restore_callback_t *cb) { NdbError error; if(cb->connection) { error= cb->connection->getNdbError(); m_ndb->closeTransaction(cb->connection); cb->connection= 0; } else { error= m_ndb->getNdbError(); } Uint32 sleepTime = 100 + cb->retries * 300; cb->retries++; cb->error_code = error.code; switch(error.status) { case NdbError::Success: return false; // ERROR! break; case NdbError::TemporaryError: err << "Temporary error: " << error << endl; NdbSleep_MilliSleep(sleepTime); return true; // RETRY break; case NdbError::UnknownResult: err << error << endl; return false; // ERROR! break; default: case NdbError::PermanentError: //ERROR err << error << endl; return false; break; } return false;}void BackupRestore::exitHandler() { release(); NDBT_ProgramExit(NDBT_FAILED); if (opt_core) abort(); else exit(NDBT_FAILED);}voidBackupRestore::tuple_free(){ if (!m_restore) return; // Poll all transactions while (m_transactions) { m_ndb->sendPollNdb(3000); }}voidBackupRestore::endOfTuples(){ tuple_free();}voidBackupRestore::logEntry(const LogEntry & tup){ if (!m_restore) return; NdbTransaction * trans = m_ndb->startTransaction(); if (trans == NULL) { // Deep shit, TODO: handle the error err << "Cannot start transaction" << endl; exitHandler(); } // if const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable); NdbOperation * op = trans->getNdbOperation(table); if (op == NULL) { err << "Cannot get operation: " << trans->getNdbError() << endl; exitHandler(); } // if int check = 0; switch(tup.m_type) { case LogEntry::LE_INSERT: check = op->insertTuple(); break; case LogEntry::LE_UPDATE: check = op->updateTuple(); break; case LogEntry::LE_DELETE: check = op->deleteTuple(); break; default: err << "Log entry has wrong operation type." << " Exiting..."; exitHandler(); } if (check != 0) { err << "Error defining op: " << trans->getNdbError() << endl; exitHandler(); } // if Bitmask<4096> keys; for (Uint32 i= 0; i < tup.size(); i++) { const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data.string_value; if (tup.m_table->have_auto_inc(attr->Desc->attrId)) tup.m_table->update_max_auto_val(dataPtr,size); const Uint32 length = (size / 8) * arraySize; if (attr->Desc->m_column->getPrimaryKey()) { if(!keys.get(attr->Desc->attrId)) { keys.set(attr->Desc->attrId); check= op->equal(attr->Desc->attrId, dataPtr, length); } } else check= op->setValue(attr->Desc->attrId, dataPtr, length); if (check != 0) { err << "Error defining op: " << trans->getNdbError() << endl; exitHandler(); } // if } const int ret = trans->execute(NdbTransaction::Commit); if (ret != 0) { // Both insert update and delete can fail during log running // and it's ok // TODO: check that the error is either tuple exists or tuple does not exist? bool ok= false; NdbError errobj= trans->getNdbError(); switch(tup.m_type) { case LogEntry::LE_INSERT: if(errobj.status == NdbError::PermanentError && errobj.classification == NdbError::ConstraintViolation) ok= true; break; case LogEntry::LE_UPDATE: case LogEntry::LE_DELETE: if(errobj.status == NdbError::PermanentError && errobj.classification == NdbError::NoDataFound) ok= true; break; } if (!ok) { err << "execute failed: " << errobj << endl; exitHandler(); } } m_ndb->closeTransaction(trans); m_logCount++;}voidBackupRestore::endOfLogEntrys(){ if (!m_restore) return; info << "Restored " << m_dataCount << " tuples and " << m_logCount << " log entries" << endl;}/* * callback : This is called when the transaction is polled * * (This function must have three arguments: * - The result of the transaction, * - The NdbTransaction object, and * - A pointer to an arbitrary object.) */static voidcallback(int result, NdbTransaction* trans, void* aObject){ restore_callback_t *cb = (restore_callback_t *)aObject; (cb->restore)->cback(result, cb);}#if 0 // old tuple implvoidBackupRestore::tuple(const TupleS & tup){ if (!m_restore) return; while (1) { NdbTransaction * trans = m_ndb->startTransaction(); if (trans == NULL) { // Deep shit, TODO: handle the error ndbout << "Cannot start transaction" << endl; exitHandler(); } // if const TableS * table = tup.getTable(); NdbOperation * op = trans->getNdbOperation(table->getTableName()); if (op == NULL) { ndbout << "Cannot get operation: "; ndbout << trans->getNdbError() << endl; exitHandler(); } // if // TODO: check return value and handle error if (op->writeTuple() == -1) { ndbout << "writeTuple call failed: "; ndbout << trans->getNdbError() << endl; exitHandler(); } // if for (int i = 0; i < tup.getNoOfAttributes(); i++) { const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (attr->Desc->m_column->getPrimaryKey()) op->equal(i, dataPtr, length); } for (int i = 0; i < tup.getNoOfAttributes(); i++) { const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (!attr->Desc->m_column->getPrimaryKey()) if (attr->Data.null) op->setValue(i, NULL, 0); else op->setValue(i, dataPtr, length); } int ret = trans->execute(NdbTransaction::Commit); if (ret != 0) { ndbout << "execute failed: "; ndbout << trans->getNdbError() << endl; exitHandler(); } m_ndb->closeTransaction(trans); if (ret == 0) break; } m_dataCount++;}#endiftemplate class Vector<NdbDictionary::Table*>;template class Vector<const NdbDictionary::Table*>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -