📄 backup.cpp
字号:
ndbrequire(ptr.p->pages.empty()); ndbrequire(ptr.p->tables.isEmpty()); ptr.p->m_gsn = 0; ptr.p->errorCode = 0; ptr.p->clientRef = senderRef; ptr.p->clientData = senderData; ptr.p->flags = flags; ptr.p->masterRef = reference(); ptr.p->nodes = c_aliveNodes; ptr.p->backupId = 0; ptr.p->backupKey[0] = 0; ptr.p->backupKey[1] = 0; ptr.p->backupDataLen = 0; ptr.p->masterData.errorCode = 0; ptr.p->masterData.dropTrig.tableId = RNIL; ptr.p->masterData.alterTrig.tableId = RNIL; UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ; utilReq->senderData = ptr.i; utilReq->sequenceId = BACKUP_SEQUENCE; utilReq->requestType = UtilSequenceReq::NextVal; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB);}voidBackup::execUTIL_SEQUENCE_REF(Signal* signal){ BackupRecordPtr ptr; jamEntry(); UtilSequenceRef * utilRef = (UtilSequenceRef*)signal->getDataPtr(); ptr.i = utilRef->senderData; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_SEQUENCE_REQ); sendBackupRef(signal, ptr, BackupRef::SequenceFailure);}//execUTIL_SEQUENCE_REF()voidBackup::sendBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errorCode){ jam(); sendBackupRef(ptr.p->clientRef, ptr.p->flags, signal, ptr.p->clientData, errorCode); cleanup(signal, ptr);}voidBackup::sendBackupRef(BlockReference senderRef, Uint32 flags, Signal *signal, Uint32 senderData, Uint32 errorCode){ jam(); if (SEND_BACKUP_STARTED_FLAG(flags)) { BackupRef* ref = (BackupRef*)signal->getDataPtrSend(); ref->senderData = senderData; ref->errorCode = errorCode; ref->masterRef = numberToRef(BACKUP, getMasterNodeId()); sendSignal(senderRef, GSN_BACKUP_REF, signal, BackupRef::SignalLength, JBB); } if(errorCode != BackupRef::IAmNotMaster){ signal->theData[0] = NDB_LE_BackupFailedToStart; signal->theData[1] = senderRef; signal->theData[2] = errorCode; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB); }}voidBackup::execUTIL_SEQUENCE_CONF(Signal* signal){ jamEntry(); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); if(conf->requestType == UtilSequenceReq::Create) { jam(); sendSTTORRY(signal); // At startup in NDB return; } BackupRecordPtr ptr; ptr.i = conf->senderData; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_SEQUENCE_REQ); if (ptr.p->checkError()) { jam(); sendBackupRef(signal, ptr, ptr.p->errorCode); return; }//if if (ERROR_INSERTED(10023)) { sendBackupRef(signal, ptr, 323); return; }//if { Uint64 backupId; memcpy(&backupId,conf->sequenceValue,8); ptr.p->backupId= (Uint32)backupId; } ptr.p->backupKey[0] = (getOwnNodeId() << 16) | (ptr.p->backupId & 0xFFFF); ptr.p->backupKey[1] = NdbTick_CurrentMillisecond(); ptr.p->masterData.gsn = GSN_UTIL_LOCK_REQ; Mutex mutex(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); Callback c = { safe_cast(&Backup::defineBackupMutex_locked), ptr.i }; ndbrequire(mutex.lock(c)); return;}voidBackup::defineBackupMutex_locked(Signal* signal, Uint32 ptrI, Uint32 retVal){ jamEntry(); ndbrequire(retVal == 0); BackupRecordPtr ptr; ptr.i = ptrI; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_LOCK_REQ); ptr.p->masterData.gsn = GSN_UTIL_LOCK_REQ; Mutex mutex(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); Callback c = { safe_cast(&Backup::dictCommitTableMutex_locked), ptr.i }; ndbrequire(mutex.lock(c));}voidBackup::dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal){ jamEntry(); ndbrequire(retVal == 0); /** * We now have both the mutexes */ BackupRecordPtr ptr; ptr.i = ptrI; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_LOCK_REQ); if (ERROR_INSERTED(10031)) { ptr.p->setErrorCode(331); }//if if (ptr.p->checkError()) { jam(); /** * Unlock mutexes */ jam(); Mutex mutex1(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); jam(); mutex1.unlock(); // ignore response jam(); Mutex mutex2(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); jam(); mutex2.unlock(); // ignore response sendBackupRef(signal, ptr, ptr.p->errorCode); return; }//if sendDefineBackupReq(signal, ptr);}/***************************************************************************** * * Master functionallity - Define backup cont'd (from now on all slaves are in) * *****************************************************************************/boolBackup::haveAllSignals(BackupRecordPtr ptr, Uint32 gsn, Uint32 nodeId){ ndbrequire(ptr.p->masterRef == reference()); ndbrequire(ptr.p->masterData.gsn == gsn); ndbrequire(!ptr.p->masterData.sendCounter.done()); ndbrequire(ptr.p->masterData.sendCounter.isWaitingFor(nodeId)); ptr.p->masterData.sendCounter.clearWaitingFor(nodeId); return ptr.p->masterData.sendCounter.done();}voidBackup::sendDefineBackupReq(Signal *signal, BackupRecordPtr ptr){ /** * Sending define backup to all participants */ DefineBackupReq * req = (DefineBackupReq*)signal->getDataPtrSend(); req->backupId = ptr.p->backupId; req->clientRef = ptr.p->clientRef; req->clientData = ptr.p->clientData; req->senderRef = reference(); req->backupPtr = ptr.i; req->backupKey[0] = ptr.p->backupKey[0]; req->backupKey[1] = ptr.p->backupKey[1]; req->nodes = ptr.p->nodes; req->backupDataLen = ptr.p->backupDataLen; req->flags = ptr.p->flags; ptr.p->masterData.gsn = GSN_DEFINE_BACKUP_REQ; ptr.p->masterData.sendCounter = ptr.p->nodes; NodeReceiverGroup rg(BACKUP, ptr.p->nodes); sendSignal(rg, GSN_DEFINE_BACKUP_REQ, signal, DefineBackupReq::SignalLength, JBB); /** * Now send backup data */ const Uint32 len = ptr.p->backupDataLen; if(len == 0){ /** * No data to send */ jam(); return; }//if /** * Not implemented */ ndbrequire(0);}voidBackup::execDEFINE_BACKUP_REF(Signal* signal){ jamEntry(); DefineBackupRef* ref = (DefineBackupRef*)signal->getDataPtr(); const Uint32 ptrI = ref->backupPtr; //const Uint32 backupId = ref->backupId; const Uint32 nodeId = ref->nodeId; BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); ptr.p->setErrorCode(ref->errorCode); defineBackupReply(signal, ptr, nodeId);}voidBackup::execDEFINE_BACKUP_CONF(Signal* signal){ jamEntry(); DefineBackupConf* conf = (DefineBackupConf*)signal->getDataPtr(); const Uint32 ptrI = conf->backupPtr; //const Uint32 backupId = conf->backupId; const Uint32 nodeId = refToNode(signal->senderBlockRef()); BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); if (ERROR_INSERTED(10024)) { ptr.p->setErrorCode(324); } defineBackupReply(signal, ptr, nodeId);}voidBackup::defineBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId){ if (!haveAllSignals(ptr, GSN_DEFINE_BACKUP_REQ, nodeId)) { jam(); return; } /** * Unlock mutexes */ jam(); Mutex mutex1(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); jam(); mutex1.unlock(); // ignore response jam(); Mutex mutex2(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); jam(); mutex2.unlock(); // ignore response if(ptr.p->checkError()) { jam(); masterAbort(signal, ptr); return; } /** * Reply to client */ CRASH_INSERTION((10034)); if (SEND_BACKUP_STARTED_FLAG(ptr.p->flags)) { BackupConf * conf = (BackupConf*)signal->getDataPtrSend(); conf->backupId = ptr.p->backupId; conf->senderData = ptr.p->clientData; conf->nodes = ptr.p->nodes; sendSignal(ptr.p->clientRef, GSN_BACKUP_CONF, signal, BackupConf::SignalLength, JBB); } signal->theData[0] = NDB_LE_BackupStarted; signal->theData[1] = ptr.p->clientRef; signal->theData[2] = ptr.p->backupId; ptr.p->nodes.copyto(NdbNodeBitmask::Size, signal->theData+3); sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3+NdbNodeBitmask::Size, JBB); /** * Prepare Trig */ TablePtr tabPtr; ndbrequire(ptr.p->tables.first(tabPtr)); sendCreateTrig(signal, ptr, tabPtr);}/***************************************************************************** * * Master functionallity - Prepare triggers * *****************************************************************************/voidBackup::createAttributeMask(TablePtr tabPtr, Bitmask<MAXNROFATTRIBUTESINWORDS> & mask){ mask.clear(); Table & table = * tabPtr.p; for(Uint32 i = 0; i<table.noOfAttributes; i++) { jam(); AttributePtr attr; table.attributes.getPtr(attr, i); mask.set(i); }}voidBackup::sendCreateTrig(Signal* signal, BackupRecordPtr ptr, TablePtr tabPtr){ CreateTrigReq * req =(CreateTrigReq *)signal->getDataPtrSend(); ptr.p->masterData.gsn = GSN_CREATE_TRIG_REQ; ptr.p->masterData.sendCounter = 3; ptr.p->masterData.createTrig.tableId = tabPtr.p->tableId; req->setUserRef(reference()); req->setConnectionPtr(ptr.i); req->setRequestType(CreateTrigReq::RT_USER); Bitmask<MAXNROFATTRIBUTESINWORDS> attrMask; createAttributeMask(tabPtr, attrMask); req->setAttributeMask(attrMask); req->setTableId(tabPtr.p->tableId); req->setIndexId(RNIL); // not used req->setTriggerId(RNIL); // to be created req->setTriggerType(TriggerType::SUBSCRIPTION); req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); req->setMonitorReplicas(true); req->setMonitorAllAttributes(false); req->setOnline(false); // leave trigger offline char triggerName[MAX_TAB_NAME_SIZE]; Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)]; // SP string LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2); LinearSectionPtr lsPtr[3]; for (int i=0; i < 3; i++) { req->setTriggerEvent(triggerEventValues[i]); BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i], ptr.p->backupId, tabPtr.p->tableId); w.reset(); w.add(CreateTrigReq::TriggerNameKey, triggerName); lsPtr[0].p = nameBuffer; lsPtr[0].sz = w.getWordsUsed(); sendSignal(DBDICT_REF, GSN_CREATE_TRIG_REQ, signal, CreateTrigReq::SignalLength, JBB, lsPtr, 1); }}voidBackup::execCREATE_TRIG_CONF(Signal* signal){ jamEntry(); CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtr(); const Uint32 ptrI = conf->getConnectionPtr(); const Uint32 tableId = conf->getTableId(); const TriggerEvent::Value type = conf->getTriggerEvent(); const Uint32 triggerId = conf->getTriggerId(); BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); /** * Verify that I'm waiting for this conf */ ndbrequire(ptr.p->masterRef == reference()); ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ); ndbrequire(ptr.p->masterData.sendCounter.done() == false); ndbrequire(ptr.p->masterData.createTrig.tableId == tableId); TablePtr tabPtr; ndbrequire(findTable(ptr, tabPtr, tableId)); ndbrequire(type < 3); // if some decides to change the enums ndbrequire(tabPtr.p->triggerIds[type] == ILLEGAL_TRIGGER_ID); tabPtr.p->triggerIds[type] = triggerId; createTrigReply(signal, ptr);}voidBackup::execCREATE_TRIG_REF(Signal* signal){ CreateTrigRef* ref = (CreateTrigRef*)signal->getDataPtr(); const Uint32 ptrI = ref->getConnectionPtr(); const Uint32 tableId = ref->getTableId(); BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); /** * Verify that I'm waiting for this ref */ ndbrequire(ptr.p->masterRef == reference()); ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ); ndbrequire(ptr.p->masterData.sendCounter.done() == false); ndbrequire(ptr.p->masterData.createTrig.tableId == tableId); ptr.p->setErrorCode(ref->getErrorCode()); createTrigReply(signal, ptr);}voidBackup::createTrigReply(Signal* signal, BackupRecordPtr ptr){ CRASH_INSERTION(10003); /** * Check finished with table */ ptr.p->masterData.sendCounter--; if(ptr.p->masterData.sendCounter.done() == false){ jam(); return; }//if if (ERROR_INSERTED(10025)) { ptr.p->errorCode = 325; } if(ptr.p->checkError()) { jam(); masterAbort(signal, ptr); return; }//if TablePtr tabPtr; ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.createTrig.tableId)); /** * Next table */ ptr.p->tables.next(tabPtr); if(tabPtr.i != RNIL){ jam(); sendCreateTrig(signal, ptr, tabPtr); return; }//if /** * Finished with all tables, send StartBackupReq */ ptr.p->tables.first(tabPtr);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -