📄 dblqhmain.cpp
字号:
jamEntry(); ReadNodesConf * const readNodes = (ReadNodesConf *)&signal->theData[0]; cnoOfNodes = readNodes->noOfNodes; unsigned ind = 0; unsigned i = 0; for (i = 1; i < MAX_NDB_NODES; i++) { jam(); if (NodeBitmask::get(readNodes->allNodes, i)) { jam(); cnodeData[ind] = i; cnodeStatus[ind] = NodeBitmask::get(readNodes->inactiveNodes, i); //readNodes->getVersionId(i, readNodes->theVersionIds) not used ind++; }//if }//for ndbrequire(ind == cnoOfNodes); ndbrequire(cnoOfNodes >= 1 && cnoOfNodes < MAX_NDB_NODES); ndbrequire(!(cnoOfNodes == 1 && cstartType == NodeState::ST_NODE_RESTART)); caddNodeState = ZFALSE; if (cstartType == NodeState::ST_SYSTEM_RESTART) { jam(); sendNdbSttorryLab(signal); return; }//if checkStartCompletedLab(signal); return;}//Dblqh::execREAD_NODESCONF()void Dblqh::checkStartCompletedLab(Signal* signal) { if (caddNodeState == ZFALSE) { if (cinitialStartOngoing == ZFALSE) { jam(); sendNdbSttorryLab(signal); return; }//if }//if return;}//Dblqh::checkStartCompletedLab()void Dblqh::startphase4Lab(Signal* signal) { sendNdbSttorryLab(signal); return;}//Dblqh::startphase4Lab()/* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ *//* SET CONCURRENCY OF LOCAL CHECKPOINTS TO BE USED AFTER SYSTEM RESTART. *//* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */void Dblqh::startphase6Lab(Signal* signal) { cstartPhase = ZNIL; cstartType = ZNIL; sendNdbSttorryLab(signal); return;}//Dblqh::startphase6Lab()void Dblqh::sendNdbSttorryLab(Signal* signal) { signal->theData[0] = cownref; sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY, signal, 1, JBB); return;}//Dblqh::sendNdbSttorryLab()void Dblqh::sendsttorryLab(Signal* signal) {/* *********<< *//* STTORRY < *//* *********<< */ signal->theData[0] = csignalKey; /* SIGNAL KEY */ signal->theData[1] = 3; /* BLOCK CATEGORY */ signal->theData[2] = 2; /* SIGNAL VERSION NUMBER */ signal->theData[3] = ZSTART_PHASE1; signal->theData[4] = 255; sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB); return;}//Dblqh::sendsttorryLab()/* ***************>> *//* READ_NODESREF > *//* ***************>> */void Dblqh::execREAD_NODESREF(Signal* signal) { jamEntry(); ndbrequire(false);}//Dblqh::execREAD_NODESREF()/* *************** *//* SIZEALT_REP > *//* *************** */void Dblqh::execREAD_CONFIG_REQ(Signal* signal) { const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; ndbrequire(req->noOfParameters == 0); jamEntry(); const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); ndbrequire(p != 0); cnoLogFiles = 8; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_FILES, &cnoLogFiles)); ndbrequire(cnoLogFiles > 0); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &cfragrecFileSize)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT, &ctcConnectrecFileSize)); clogFileFileSize = 4 * cnoLogFiles; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize)); cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_diskless)); initRecords(); initialiseRecordsLab(signal, 0, ref, senderData); return;}//Dblqh::execSIZEALT_REP()/* ########################################################################## *//* ####### ADD/DELETE FRAGMENT MODULE ####### *//* THIS MODULE IS USED BY DICTIONARY TO CREATE NEW FRAGMENTS AND DELETE *//* OLD FRAGMENTS. *//* *//* ########################################################################## *//* -------------------------------------------------------------- *//* FRAG REQ *//* -------------------------------------------------------------- *//* *********************************************************> *//* LQHFRAGREQ: Create new fragments for a table. Sender DICT *//* *********************************************************> */// this unbelievable mess could be replaced by one signal to LQH// and execute direct to local DICT to get everything at oncevoid Dblqh::execLQHFRAGREQ(Signal* signal) { jamEntry(); LqhFragReq * req = (LqhFragReq*)signal->getDataPtr(); Uint32 retPtr = req->senderData; BlockReference retRef = req->senderRef; Uint32 fragId = req->fragmentId; Uint32 reqinfo = req->requestInfo; tabptr.i = req->tableId; Uint16 tlocalKeylen = req->localKeyLength; Uint32 tmaxLoadFactor = req->maxLoadFactor; Uint32 tminLoadFactor = req->minLoadFactor; Uint8 tk = req->kValue; Uint8 tlhstar = req->lh3DistrBits; Uint8 tlh = req->lh3PageBits; Uint32 tnoOfAttr = req->noOfAttributes; Uint32 tnoOfNull = req->noOfNullAttributes; Uint32 noOfAlloc = req->noOfPagesToPreAllocate; Uint32 tschemaVersion = req->schemaVersion; Uint32 ttupKeyLength = req->keyLength; Uint32 nextLcp = req->nextLCP; Uint32 noOfKeyAttr = req->noOfKeyAttr; Uint32 noOfNewAttr = req->noOfNewAttr; Uint32 checksumIndicator = req->checksumIndicator; Uint32 noOfAttributeGroups = req->noOfAttributeGroups; Uint32 gcpIndicator = req->GCPIndicator; Uint32 startGci = req->startGci; Uint32 tableType = req->tableType; Uint32 primaryTableId = req->primaryTableId; ptrCheckGuard(tabptr, ctabrecFileSize, tablerec); bool tempTable = ((reqinfo & LqhFragReq::TemporaryTable) != 0); /* Temporary tables set to defined in system restart */ if (tabptr.p->tableStatus == Tablerec::NOT_DEFINED){ tabptr.p->tableStatus = Tablerec::ADD_TABLE_ONGOING; tabptr.p->tableType = tableType; tabptr.p->primaryTableId = primaryTableId; tabptr.p->schemaVersion = tschemaVersion; }//if if (tabptr.p->tableStatus != Tablerec::ADD_TABLE_ONGOING){ jam(); fragrefLab(signal, retRef, retPtr, ZTAB_STATE_ERROR); return; }//if //-------------------------------------------------------------------- // We could arrive here if we create the fragment as part of a take // over by a hot spare node. The table is then is already created // and bit 31 is set, thus indicating that we are creating a fragment // by copy creation. Also since the node has already been started we // know that it is not a node restart ongoing. //-------------------------------------------------------------------- if (getFragmentrec(signal, fragId)) { jam(); fragrefLab(signal, retRef, retPtr, terrorCode); return; }//if if (!insertFragrec(signal, fragId)) { jam(); fragrefLab(signal, retRef, retPtr, terrorCode); return; }//if Uint32 copyType = reqinfo & 3; initFragrec(signal, tabptr.i, fragId, copyType); fragptr.p->startGci = startGci; fragptr.p->newestGci = startGci; fragptr.p->tableType = tableType; if (DictTabInfo::isOrderedIndex(tableType)) { jam(); // find corresponding primary table fragment TablerecPtr tTablePtr; tTablePtr.i = primaryTableId; ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec); FragrecordPtr tFragPtr; tFragPtr.i = RNIL; for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) { if (tTablePtr.p->fragid[i] == fragptr.p->fragId) { jam(); tFragPtr.i = tTablePtr.p->fragrec[i]; break; } } ndbrequire(tFragPtr.i != RNIL); // store it fragptr.p->tableFragptr = tFragPtr.i; } else { fragptr.p->tableFragptr = fragptr.i; } if (tempTable) {//--------------------------------------------// reqinfo bit 3-4 = 2 means temporary table// without logging or checkpointing.//-------------------------------------------- jam(); fragptr.p->logFlag = Fragrecord::STATE_FALSE; fragptr.p->lcpFlag = Fragrecord::LCP_STATE_FALSE; }//if fragptr.p->nextLcp = nextLcp; //----------------------------------------------// For node restarts it is not necessarily zero //---------------------------------------------- if (cfirstfreeAddfragrec == RNIL) { jam(); deleteFragrec(fragId); fragrefLab(signal, retRef, retPtr, ZNO_ADD_FRAGREC); return; }//if seizeAddfragrec(signal); addfragptr.p->addFragid = fragId; addfragptr.p->fragmentPtr = fragptr.i; addfragptr.p->dictBlockref = retRef; addfragptr.p->dictConnectptr = retPtr; addfragptr.p->m_senderAttrPtr = RNIL; addfragptr.p->noOfAttr = tnoOfAttr; addfragptr.p->noOfNull = tnoOfNull; addfragptr.p->noOfAllocPages = noOfAlloc; addfragptr.p->tabId = tabptr.i; addfragptr.p->totalAttrReceived = 0; addfragptr.p->attrSentToTup = ZNIL;/* TO FIND PROGRAMMING ERRORS QUICKLY */ addfragptr.p->schemaVer = tschemaVersion; Uint32 tmp = (reqinfo & LqhFragReq::CreateInRunning); addfragptr.p->fragCopyCreation = (tmp == 0 ? 0 : 1); addfragptr.p->addfragErrorCode = 0; addfragptr.p->noOfKeyAttr = noOfKeyAttr; addfragptr.p->noOfNewAttr = noOfNewAttr; addfragptr.p->checksumIndicator = checksumIndicator; addfragptr.p->noOfAttributeGroups = noOfAttributeGroups; addfragptr.p->GCPIndicator = gcpIndicator; addfragptr.p->lh3DistrBits = tlhstar; addfragptr.p->tableType = tableType; addfragptr.p->primaryTableId = primaryTableId; // addfragptr.p->tup1Connectptr = RNIL; addfragptr.p->tup2Connectptr = RNIL; addfragptr.p->tux1Connectptr = RNIL; addfragptr.p->tux2Connectptr = RNIL; if (DictTabInfo::isTable(tableType) || DictTabInfo::isHashIndex(tableType)) { jam(); AccFragReq* const accreq = (AccFragReq*)signal->getDataPtrSend(); accreq->userPtr = addfragptr.i; accreq->userRef = cownref; accreq->tableId = tabptr.i; accreq->reqInfo = copyType << 4; accreq->fragId = fragId; accreq->localKeyLen = tlocalKeylen; accreq->maxLoadFactor = tmaxLoadFactor; accreq->minLoadFactor = tminLoadFactor; accreq->kValue = tk; accreq->lhFragBits = tlhstar; accreq->lhDirBits = tlh; accreq->keyLength = ttupKeyLength; /* ----------------------------------------------------------------------- */ /* Send ACCFRAGREQ, when confirmation is received send 2 * TUPFRAGREQ to */ /* create 2 tuple fragments on this node. */ /* ----------------------------------------------------------------------- */ addfragptr.p->addfragStatus = AddFragRecord::ACC_ADDFRAG; sendSignal(fragptr.p->accBlockref, GSN_ACCFRAGREQ, signal, AccFragReq::SignalLength, JBB); return; } if (DictTabInfo::isOrderedIndex(tableType)) { jam(); // NOTE: next 2 lines stolen from ACC addfragptr.p->fragid1 = (fragId << 1) | 0; addfragptr.p->fragid2 = (fragId << 1) | 1; addfragptr.p->addfragStatus = AddFragRecord::WAIT_TWO_TUP; sendAddFragReq(signal); return; } ndbrequire(false);}//Dblqh::execLQHFRAGREQ()/* *************** *//* ACCFRAGCONF > *//* *************** */void Dblqh::execACCFRAGCONF(Signal* signal) { jamEntry(); addfragptr.i = signal->theData[0]; Uint32 taccConnectptr = signal->theData[1]; Uint32 fragId1 = signal->theData[2]; Uint32 fragId2 = signal->theData[3]; Uint32 accFragPtr1 = signal->theData[4]; Uint32 accFragPtr2 = signal->theData[5]; ptrCheckGuard(addfragptr, caddfragrecFileSize, addFragRecord); ndbrequire(addfragptr.p->addfragStatus == AddFragRecord::ACC_ADDFRAG); addfragptr.p->accConnectptr = taccConnectptr; addfragptr.p->fragid1 = fragId1; addfragptr.p->fragid2 = fragId2; fragptr.i = addfragptr.p->fragmentPtr; ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); fragptr.p->accFragptr[0] = accFragPtr1; fragptr.p->accFragptr[1] = accFragPtr2; addfragptr.p->addfragStatus = AddFragRecord::WAIT_TWO_TUP; sendAddFragReq(signal);}//Dblqh::execACCFRAGCONF()/* *************** *//* TUPFRAGCONF > *//* *************** */void Dblqh::execTUPFRAGCONF(Signal* signal) { jamEntry(); addfragptr.i = signal->theData[0]; Uint32 tupConnectptr = signal->theData[1]; Uint32 tupFragPtr = signal->theData[2]; /* TUP FRAGMENT POINTER */ Uint32 localFragId = signal->theData[3]; /* LOCAL FRAGMENT ID */ ptrCheckGuard(addfragptr, caddfragrecFileSize, addFragRecord); fragptr.i = addfragptr.p->fragmentPtr; ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); if (localFragId == addfragptr.p->fragid1) { jam(); fragptr.p->tupFragptr[0] = tupFragPtr; } else if (localFragId == addfragptr.p->fragid2) { jam(); fragptr.p->tupFragptr[1] = tupFragPtr; } else { ndbrequire(false); return; }//if switch (addfragptr.p->addfragStatus) { case AddFragRecord::WAIT_TWO_TUP: jam(); fragptr.p->tupFragptr[0] = tupFragPtr; addfragptr.p->tup1Connectptr = tupConnectptr; addfragptr.p->addfragStatus = AddFragRecord::WAIT_ONE_TUP; sendAddFragReq(signal); break; case AddFragRecord::WAIT_ONE_TUP: jam(); fragptr.p->tupFragptr[1] = tupFragPtr; addfragptr.p->tup2Connectptr = tupConnectptr; if (DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) { addfragptr.p->addfragStatus = AddFragRecord::WAIT_TWO_TUX; sendAddFragReq(signal); break; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -