📄 dbtupexecquery.cpp
字号:
if (isUndoLoggingBlocked(regFragPtr)) { TUPKEY_abort(signal, 38); return; }//if/* ---------------------------------------------------------------------- */// WE SET THE CURRENT ACTIVE OPERATION IN THE TUPLE TO POINT TO OUR//OPERATION RECORD. IF SEVERAL OPERATIONS WORK ON THIS TUPLE THEY ARE// LINKED TO OUR OPERATION RECORD. DIRTY READS CAN ACCESS THE COPY// TUPLE THROUGH OUR OPERATION RECORD./* ---------------------------------------------------------------------- */ if (Roptype == ZINSERT) { jam(); if (handleInsertReq(signal, regOperPtr, regFragPtr, regTabPtr, pagePtr.p) == -1) { return; }//if if (!regTabPtr->tuxCustomTriggers.isEmpty()) { jam(); if (executeTuxInsertTriggers(signal, regOperPtr, regTabPtr) != 0) { jam(); tupkeyErrorLab(signal); return; } } checkImmediateTriggersAfterInsert(signal, regOperPtr, regTabPtr); sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize); return; }//if if (regTabPtr->checksumIndicator && (calculateChecksum(pagePtr.p, regOperPtr->pageOffset, regTabPtr->tupheadsize) != 0)) { jam(); terrorCode = ZTUPLE_CORRUPTED_ERROR; tupkeyErrorLab(signal); return; }//if if (Roptype == ZUPDATE) { jam(); if (handleUpdateReq(signal, regOperPtr, regFragPtr, regTabPtr, pagePtr.p) == -1) { return; }//if // If update operation is done on primary, // check any after op triggers terrorCode = 0; if (!regTabPtr->tuxCustomTriggers.isEmpty()) { jam(); if (executeTuxUpdateTriggers(signal, regOperPtr, regTabPtr) != 0) { jam(); tupkeyErrorLab(signal); return; } } checkImmediateTriggersAfterUpdate(signal, regOperPtr, regTabPtr); // XXX use terrorCode for now since all methods are void if (terrorCode != 0) { tupkeyErrorLab(signal); return; } sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize); return; } else if (Roptype == ZDELETE) { jam(); if (handleDeleteReq(signal, regOperPtr, regFragPtr, regTabPtr, pagePtr.p) == -1) { return; }//if // If delete operation is done on primary, // check any after op triggers if (!regTabPtr->tuxCustomTriggers.isEmpty()) { jam(); if (executeTuxDeleteTriggers(signal, regOperPtr, regTabPtr) != 0) { jam(); tupkeyErrorLab(signal); return; } } checkImmediateTriggersAfterDelete(signal, regOperPtr, regTabPtr); sendTUPKEYCONF(signal, regOperPtr, 0); return; } else { ndbrequire(false); }//if}//Dbtup::execTUPKEYREQ()/* ---------------------------------------------------------------- *//* ------------------------ CONFIRM REQUEST ----------------------- *//* ---------------------------------------------------------------- */void Dbtup::sendTUPKEYCONF(Signal* signal, Operationrec * const regOperPtr, Uint32 TlogSize) { TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend(); Uint32 RuserPointer = regOperPtr->userpointer; Uint32 RattroutbufLen = regOperPtr->attroutbufLen; Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers; BlockReference Ruserblockref = regOperPtr->userblockref; Uint32 lastRow = regOperPtr->lastRow; regOperPtr->transstate = STARTED; regOperPtr->tupleState = NO_OTHER_OP; tupKeyConf->userPtr = RuserPointer; tupKeyConf->readLength = RattroutbufLen; tupKeyConf->writeLength = TlogSize; tupKeyConf->noFiredTriggers = RnoFiredTriggers; tupKeyConf->lastRow = lastRow; EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal, TupKeyConf::SignalLength); return;}//Dbtup::sendTUPKEYCONF()#define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))/* ---------------------------------------------------------------- *//* ----------------------------- READ ---------------------------- *//* ---------------------------------------------------------------- */int Dbtup::handleReadReq(Signal* signal, Operationrec* const regOperPtr, Tablerec* const regTabPtr, Page* pagePtr){ Uint32 Ttupheadoffset = regOperPtr->pageOffset; const BlockReference sendBref = regOperPtr->recBlockref; if (regTabPtr->checksumIndicator && (calculateChecksum(pagePtr, Ttupheadoffset, regTabPtr->tupheadsize) != 0)) { jam(); terrorCode = ZTUPLE_CORRUPTED_ERROR; tupkeyErrorLab(signal); return -1; }//if Uint32 * dst = &signal->theData[25]; Uint32 dstLen = (MAX_READ / 4) - 25; const Uint32 node = refToNode(sendBref); if(node != 0 && node != getOwnNodeId()) { ; } else { jam(); /** * execute direct */ dst = &signal->theData[3]; dstLen = (MAX_READ / 4) - 3; } if (regOperPtr->interpretedExec != 1) { jam(); int ret = readAttributes(pagePtr, Ttupheadoffset, &cinBuffer[0], regOperPtr->attrinbufLen, dst, dstLen, false); if (ret != -1) {/* ------------------------------------------------------------------------- */// We have read all data into coutBuffer. Now send it to the API./* ------------------------------------------------------------------------- */ jam(); Uint32 TnoOfDataRead= (Uint32) ret; regOperPtr->attroutbufLen = TnoOfDataRead; sendReadAttrinfo(signal, TnoOfDataRead, regOperPtr); return 0; }//if jam(); tupkeyErrorLab(signal); return -1; } else { jam(); regOperPtr->lastRow = 0; if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) { return 0; }//if return -1; }//if}//Dbtup::handleReadReq()/* ---------------------------------------------------------------- *//* ---------------------------- UPDATE ---------------------------- *//* ---------------------------------------------------------------- */int Dbtup::handleUpdateReq(Signal* signal, Operationrec* const regOperPtr, Fragrecord* const regFragPtr, Tablerec* const regTabPtr, Page* const pagePtr) { PagePtr copyPagePtr; Uint32 tuple_size = regTabPtr->tupheadsize;//---------------------------------------------------/* --- MAKE A COPY OF THIS TUPLE ON A COPY PAGE --- *///--------------------------------------------------- Uint32 RpageOffsetC; if (!allocTh(regFragPtr, regTabPtr, COPY_PAGE, signal, RpageOffsetC, copyPagePtr)) { TUPKEY_abort(signal, 1); return -1; }//if Uint32 RpageIdC = copyPagePtr.i; Uint32 RfragPageIdC = copyPagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS]; Uint32 indexC = ((RpageOffsetC - ZPAGE_HEADER_SIZE) / tuple_size) << 1; regOperPtr->pageIndexC = indexC; regOperPtr->fragPageIdC = RfragPageIdC; regOperPtr->realPageIdC = RpageIdC; regOperPtr->pageOffsetC = RpageOffsetC; /* -------------------------------------------------------------- */ /* IF WE HAVE AN ONGING CHECKPOINT WE HAVE TO LOG THE ALLOCATION */ /* OF THE TUPLE HEADER TO BE ABLE TO DELETE IT UPON RESTART */ /* THE ONLY DATA EXCEPT THE TYPE, PAGE, INDEX IS THE SIZE TO FREE */ /* -------------------------------------------------------------- */ if (isUndoLoggingActive(regFragPtr)) { if (isPageUndoLogged(regFragPtr, RfragPageIdC)) { jam(); regOperPtr->undoLogged = true; cprAddUndoLogRecord(signal, ZLCPR_TYPE_DELETE_TH, RfragPageIdC, indexC, regOperPtr->tableRef, regOperPtr->fragId, regFragPtr->checkpointVersion); }//if if (isPageUndoLogged(regFragPtr, regOperPtr->fragPageId)) { jam(); cprAddUndoLogRecord(signal, ZLCPR_TYPE_UPDATE_TH, regOperPtr->fragPageId, regOperPtr->pageIndex, regOperPtr->tableRef, regOperPtr->fragId, regFragPtr->checkpointVersion); cprAddData(signal, regFragPtr, regOperPtr->realPageId, tuple_size, regOperPtr->pageOffset); }//if }//if Uint32 RwordCount = tuple_size - 1; Uint32 end_dest = RpageOffsetC + tuple_size; Uint32 offset = regOperPtr->pageOffset; Uint32 end_source = offset + tuple_size; ndbrequire(end_dest <= ZWORDS_ON_PAGE && end_source <= ZWORDS_ON_PAGE); void* Tdestination = (void*)©PagePtr.p->pageWord[RpageOffsetC + 1]; const void* Tsource = (void*)&pagePtr->pageWord[offset + 1]; MEMCOPY_NO_WORDS(Tdestination, Tsource, RwordCount); Uint32 prev_tup_version; // nextActiveOp is before this op in event order if (regOperPtr->nextActiveOp == RNIL) { jam(); prev_tup_version = ((const Uint32*)Tsource)[0]; } else { OperationrecPtr prevOperPtr; jam(); prevOperPtr.i = regOperPtr->nextActiveOp; ptrCheckGuard(prevOperPtr, cnoOfOprec, operationrec); prev_tup_version = prevOperPtr.p->tupVersion; }//if regOperPtr->tupVersion = (prev_tup_version + 1) & ((1 << ZTUP_VERSION_BITS) - 1); // global variable alert ndbassert(operationrec + operPtr.i == regOperPtr); copyPagePtr.p->pageWord[RpageOffsetC] = operPtr.i; return updateStartLab(signal, regOperPtr, regTabPtr, pagePtr);}//Dbtup::handleUpdateReq()/* ---------------------------------------------------------------- *//* ----------------------------- INSERT --------------------------- *//* ---------------------------------------------------------------- */int Dbtup::handleInsertReq(Signal* signal, Operationrec* const regOperPtr, Fragrecord* const regFragPtr, Tablerec* const regTabPtr, Page* const pagePtr) { Uint32 ret_value; if (regOperPtr->nextActiveOp != RNIL) { jam(); OperationrecPtr prevExecOpPtr; prevExecOpPtr.i = regOperPtr->nextActiveOp; ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec); if (prevExecOpPtr.p->optype != ZDELETE) { terrorCode = ZINSERT_ERROR; tupkeyErrorLab(signal); return -1; }//if ret_value = handleUpdateReq(signal, regOperPtr, regFragPtr, regTabPtr, pagePtr); } else { jam(); regOperPtr->tupVersion = 0; ret_value = updateStartLab(signal, regOperPtr, regTabPtr, pagePtr); }//if if (ret_value != (Uint32)-1) { if (checkNullAttributes(regOperPtr, regTabPtr)) { jam(); return 0; }//if TUPKEY_abort(signal, 17); }//if return -1;}//Dbtup::handleInsertReq()/* ---------------------------------------------------------------- *//* ---------------------------- DELETE ---------------------------- *//* ---------------------------------------------------------------- */int Dbtup::handleDeleteReq(Signal* signal, Operationrec* const regOperPtr, Fragrecord* const regFragPtr, Tablerec* const regTabPtr, Page* const pagePtr){ // delete must set but not increment tupVersion if (regOperPtr->nextActiveOp != RNIL) { OperationrecPtr prevExecOpPtr; prevExecOpPtr.i = regOperPtr->nextActiveOp; ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec); regOperPtr->tupVersion = prevExecOpPtr.p->tupVersion; } else { jam(); regOperPtr->tupVersion = pagePtr->pageWord[regOperPtr->pageOffset + 1]; } if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) { jam(); cprAddUndoLogRecord(signal, ZINDICATE_NO_OP_ACTIVE, regOperPtr->fragPageId, regOperPtr->pageIndex, regOperPtr->tableRef, regOperPtr->fragId, regFragPtr->checkpointVersion); }//if if (regOperPtr->attrinbufLen == 0) { return 0; }//if/* ------------------------------------------------------------------------ *//* THE APPLICATION WANTS TO READ THE TUPLE BEFORE IT IS DELETED. *//* ------------------------------------------------------------------------ */ return handleReadReq(signal, regOperPtr, regTabPtr, pagePtr);}//Dbtup::handleDeleteReq()intDbtup::updateStartLab(Signal* signal, Operationrec* const regOperPtr, Tablerec* const regTabPtr, Page* const pagePtr){ int retValue; if (regOperPtr->optype == ZINSERT) { jam(); setNullBits(pagePtr, regTabPtr, regOperPtr->pageOffset); } if (regOperPtr->interpretedExec != 1) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -