📄 backup.cpp
字号:
Backup::execINCL_NODEREQ(Signal* signal){ jamEntry(); const Uint32 senderRef = signal->theData[0]; const Uint32 inclNode = signal->theData[1]; NodePtr node; for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)) { jam(); const Uint32 nodeId = node.p->nodeId; if(inclNode == nodeId){ jam(); ndbrequire(node.p->alive == 0); ndbrequire(!c_aliveNodes.get(nodeId)); node.p->alive = 1; c_aliveNodes.set(nodeId); break; }//if }//for signal->theData[0] = reference(); sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);}/***************************************************************************** * * Master functionallity - Define backup * *****************************************************************************/voidBackup::execBACKUP_REQ(Signal* signal){ jamEntry(); BackupReq * req = (BackupReq*)signal->getDataPtr(); const Uint32 senderData = req->senderData; const BlockReference senderRef = signal->senderBlockRef(); const Uint32 dataLen32 = req->backupDataLen; // In 32 bit words const Uint32 flags = signal->getLength() > 2 ? req->flags : 2; if(getOwnNodeId() != getMasterNodeId()) { jam(); sendBackupRef(senderRef, flags, signal, senderData, BackupRef::IAmNotMaster); return; }//if if (m_diskless) { sendBackupRef(senderRef, flags, signal, senderData, BackupRef::CannotBackupDiskless); return; } if(dataLen32 != 0) { jam(); sendBackupRef(senderRef, flags, signal, senderData, BackupRef::BackupDefinitionNotImplemented); return; }//if #ifdef DEBUG_ABORT dumpUsedResources();#endif /** * Seize a backup record */ BackupRecordPtr ptr; c_backups.seize(ptr); if(ptr.i == RNIL) { jam(); sendBackupRef(senderRef, flags, signal, senderData, BackupRef::OutOfBackupRecord); return; }//if ndbrequire(ptr.p->pages.empty()); ndbrequire(ptr.p->tables.isEmpty()); ptr.p->m_gsn = 0; ptr.p->errorCode = 0; ptr.p->clientRef = senderRef; ptr.p->clientData = senderData; ptr.p->flags = flags; ptr.p->masterRef = reference(); ptr.p->nodes = c_aliveNodes; ptr.p->backupId = 0; ptr.p->backupKey[0] = 0; ptr.p->backupKey[1] = 0; ptr.p->backupDataLen = 0; ptr.p->masterData.errorCode = 0; ptr.p->masterData.dropTrig.tableId = RNIL; ptr.p->masterData.alterTrig.tableId = RNIL; UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ; utilReq->senderData = ptr.i; utilReq->sequenceId = BACKUP_SEQUENCE; utilReq->requestType = UtilSequenceReq::NextVal; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB);}voidBackup::execUTIL_SEQUENCE_REF(Signal* signal){ BackupRecordPtr ptr; jamEntry(); UtilSequenceRef * utilRef = (UtilSequenceRef*)signal->getDataPtr(); ptr.i = utilRef->senderData; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_SEQUENCE_REQ); sendBackupRef(signal, ptr, BackupRef::SequenceFailure);}//execUTIL_SEQUENCE_REF()voidBackup::sendBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errorCode){ jam(); sendBackupRef(ptr.p->clientRef, ptr.p->flags, signal, ptr.p->clientData, errorCode); cleanup(signal, ptr);}voidBackup::sendBackupRef(BlockReference senderRef, Uint32 flags, Signal *signal, Uint32 senderData, Uint32 errorCode){ jam(); if (SEND_BACKUP_STARTED_FLAG(flags)) { BackupRef* ref = (BackupRef*)signal->getDataPtrSend(); ref->senderData = senderData; ref->errorCode = errorCode; ref->masterRef = numberToRef(BACKUP, getMasterNodeId()); sendSignal(senderRef, GSN_BACKUP_REF, signal, BackupRef::SignalLength, JBB); } if(errorCode != BackupRef::IAmNotMaster){ signal->theData[0] = NDB_LE_BackupFailedToStart; signal->theData[1] = senderRef; signal->theData[2] = errorCode; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB); }}voidBackup::execUTIL_SEQUENCE_CONF(Signal* signal){ jamEntry(); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); if(conf->requestType == UtilSequenceReq::Create) { jam(); sendSTTORRY(signal); // At startup in NDB return; } BackupRecordPtr ptr; ptr.i = conf->senderData; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_SEQUENCE_REQ); if (ptr.p->checkError()) { jam(); sendBackupRef(signal, ptr, ptr.p->errorCode); return; }//if if (ERROR_INSERTED(10023)) { sendBackupRef(signal, ptr, 323); return; }//if { Uint64 backupId; memcpy(&backupId,conf->sequenceValue,8); ptr.p->backupId= (Uint32)backupId; } ptr.p->backupKey[0] = (getOwnNodeId() << 16) | (ptr.p->backupId & 0xFFFF); ptr.p->backupKey[1] = NdbTick_CurrentMillisecond(); ptr.p->masterData.gsn = GSN_UTIL_LOCK_REQ; Mutex mutex(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); Callback c = { safe_cast(&Backup::defineBackupMutex_locked), ptr.i }; ndbrequire(mutex.lock(c)); return;}voidBackup::defineBackupMutex_locked(Signal* signal, Uint32 ptrI, Uint32 retVal){ jamEntry(); ndbrequire(retVal == 0); BackupRecordPtr ptr; ptr.i = ptrI; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_LOCK_REQ); ptr.p->masterData.gsn = GSN_UTIL_LOCK_REQ; Mutex mutex(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); Callback c = { safe_cast(&Backup::dictCommitTableMutex_locked), ptr.i }; ndbrequire(mutex.lock(c));}voidBackup::dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal){ jamEntry(); ndbrequire(retVal == 0); /** * We now have both the mutexes */ BackupRecordPtr ptr; ptr.i = ptrI; c_backupPool.getPtr(ptr); ndbrequire(ptr.p->masterData.gsn == GSN_UTIL_LOCK_REQ); if (ERROR_INSERTED(10031)) { ptr.p->setErrorCode(331); }//if if (ptr.p->checkError()) { jam(); /** * Unlock mutexes */ jam(); Mutex mutex1(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); jam(); mutex1.unlock(); // ignore response jam(); Mutex mutex2(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); jam(); mutex2.unlock(); // ignore response sendBackupRef(signal, ptr, ptr.p->errorCode); return; }//if sendDefineBackupReq(signal, ptr);}/***************************************************************************** * * Master functionallity - Define backup cont'd (from now on all slaves are in) * *****************************************************************************/boolBackup::haveAllSignals(BackupRecordPtr ptr, Uint32 gsn, Uint32 nodeId){ ndbrequire(ptr.p->masterRef == reference()); ndbrequire(ptr.p->masterData.gsn == gsn); ndbrequire(!ptr.p->masterData.sendCounter.done()); ndbrequire(ptr.p->masterData.sendCounter.isWaitingFor(nodeId)); ptr.p->masterData.sendCounter.clearWaitingFor(nodeId); return ptr.p->masterData.sendCounter.done();}voidBackup::sendDefineBackupReq(Signal *signal, BackupRecordPtr ptr){ /** * Sending define backup to all participants */ DefineBackupReq * req = (DefineBackupReq*)signal->getDataPtrSend(); req->backupId = ptr.p->backupId; req->clientRef = ptr.p->clientRef; req->clientData = ptr.p->clientData; req->senderRef = reference(); req->backupPtr = ptr.i; req->backupKey[0] = ptr.p->backupKey[0]; req->backupKey[1] = ptr.p->backupKey[1]; req->nodes = ptr.p->nodes; req->backupDataLen = ptr.p->backupDataLen; req->flags = ptr.p->flags; ptr.p->masterData.gsn = GSN_DEFINE_BACKUP_REQ; ptr.p->masterData.sendCounter = ptr.p->nodes; NodeReceiverGroup rg(BACKUP, ptr.p->nodes); sendSignal(rg, GSN_DEFINE_BACKUP_REQ, signal, DefineBackupReq::SignalLength, JBB); /** * Now send backup data */ const Uint32 len = ptr.p->backupDataLen; if(len == 0){ /** * No data to send */ jam(); return; }//if /** * Not implemented */ ndbrequire(0);}voidBackup::execDEFINE_BACKUP_REF(Signal* signal){ jamEntry(); DefineBackupRef* ref = (DefineBackupRef*)signal->getDataPtr(); const Uint32 ptrI = ref->backupPtr; //const Uint32 backupId = ref->backupId; const Uint32 nodeId = ref->nodeId; BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); ptr.p->setErrorCode(ref->errorCode); defineBackupReply(signal, ptr, nodeId);}voidBackup::execDEFINE_BACKUP_CONF(Signal* signal){ jamEntry(); DefineBackupConf* conf = (DefineBackupConf*)signal->getDataPtr(); const Uint32 ptrI = conf->backupPtr; //const Uint32 backupId = conf->backupId; const Uint32 nodeId = refToNode(signal->senderBlockRef()); BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptrI); if (ERROR_INSERTED(10024)) { ptr.p->setErrorCode(324); } defineBackupReply(signal, ptr, nodeId);}voidBackup::defineBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId){ if (!haveAllSignals(ptr, GSN_DEFINE_BACKUP_REQ, nodeId)) { jam(); return; } /** * Unlock mutexes */ jam(); Mutex mutex1(signal, c_mutexMgr, ptr.p->masterData.m_dictCommitTableMutex); jam(); mutex1.unlock(); // ignore response jam(); Mutex mutex2(signal, c_mutexMgr, ptr.p->masterData.m_defineBackupMutex); jam(); mutex2.unlock(); // ignore response if(ptr.p->checkError()) { jam(); masterAbort(signal, ptr); return; } /** * Reply to client */ CRASH_INSERTION((10034)); if (SEND_BACKUP_STARTED_FLAG(ptr.p->flags)) { BackupConf * conf = (BackupConf*)signal->getDataPtrSend(); conf->backupId = ptr.p->backupId; conf->senderData = ptr.p->clientData; conf->nodes = ptr.p->nodes; sendSignal(ptr.p->clientRef, GSN_BACKUP_CONF, signal, BackupConf::SignalLength, JBB); } signal->theData[0] = NDB_LE_BackupStarted; signal->theData[1] = ptr.p->clientRef; signal->theData[2] = ptr.p->backupId; ptr.p->nodes.copyto(NdbNodeBitmask::Size, signal->theData+3); sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3+NdbNodeBitmask::Size, JBB); /** * Prepare Trig */ TablePtr tabPtr; ndbrequire(ptr.p->tables.first(tabPtr)); sendCreateTrig(signal, ptr, tabPtr);}/***************************************************************************** * * Master functionallity - Prepare triggers * *****************************************************************************/voidBackup::createAttributeMask(TablePtr tabPtr, Bitmask<MAXNROFATTRIBUTESINWORDS> & mask){ mask.clear(); Table & table = * tabPtr.p; for(Uint32 i = 0; i<table.noOfAttributes; i++) { jam(); AttributePtr attr; table.attributes.getPtr(attr, i); mask.set(i); }}voidBackup::sendCreateTrig(Signal* signal, BackupRecordPtr ptr, TablePtr tabPtr){ CreateTrigReq * req =(CreateTrigReq *)signal->getDataPtrSend(); ptr.p->masterData.gsn = GSN_CREATE_TRIG_REQ; ptr.p->masterData.sendCounter = 3; ptr.p->masterData.createTrig.tableId = tabPtr.p->tableId; req->setUserRef(reference()); req->setConnectionPtr(ptr.i); req->setRequestType(CreateTrigReq::RT_USER); Bitmask<MAXNROFATTRIBUTESINWORDS> attrMask; createAttributeMask(tabPtr, attrMask); req->setAttributeMask(attrMask); req->setTableId(tabPtr.p->tableId); req->setIndexId(RNIL); // not used req->setTriggerId(RNIL); // to be created req->setTriggerType(TriggerType::SUBSCRIPTION); req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); req->setMonitorReplicas(true); req->setMonitorAllAttributes(false); req->setOnline(false); // leave trigger offline char triggerName[MAX_TAB_NAME_SIZE]; Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)]; // SP string LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2); LinearSectionPtr lsPtr[3]; for (int i=0; i < 3; i++) { req->setTriggerEvent(triggerEventValues[i]); BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i], ptr.p->backupId, tabPtr.p->tableId); w.reset(); w.add(CreateTrigReq::TriggerNameKey, triggerName); lsPtr[0].p = nameBuffer; lsPtr[0].sz = w.getWordsUsed(); sendSignal(DBDICT_REF, GSN_CREATE_TRIG_REQ, signal, CreateTrigReq::SignalLength, JBB, lsPtr, 1); }}voidBackup::execCREATE_TRIG_CONF(Signal* signal){ jamEntry(); CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtr(); const Uint32 ptrI = conf->getConnectionPtr(); const Uint32 tableId = conf->getTableId(); const TriggerEvent::Value type = conf->getTriggerEvent(); const Uint32 triggerId = conf->getTriggerId(); BackupRecordPtr ptr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -