📄 dbutil.cpp
字号:
ndbrequire(transP->operations.first(opPtr)); ndbrequire(transP->operations.hasNext(opPtr) == false); if(transP->errorCode == 0){ jam(); // OK UtilSequenceConf * ret = (UtilSequenceConf *)signal->getDataPtrSend(); ret->senderData = transP->clientData; ret->sequenceId = transP->sequence.sequenceId; ret->requestType = transP->sequence.requestType; bool ok = false; switch(transP->sequence.requestType){ case UtilSequenceReq::CurrVal: case UtilSequenceReq::NextVal:{ ok = true; ndbrequire(opPtr.p->rsRecv == 3); ResultSetBuffer::DataBufferIterator rsit; ndbrequire(opPtr.p->rs.first(rsit)); ret->sequenceValue[0] = rsit.data[1]; ret->sequenceValue[1] = rsit.data[2]; break; } case UtilSequenceReq::Create: ok = true; ret->sequenceValue[0] = 0; ret->sequenceValue[1] = 0; break; } ndbrequire(ok); sendSignal(transP->clientRef, GSN_UTIL_SEQUENCE_CONF, signal, UtilSequenceConf::SignalLength, JBB); return; } UtilSequenceRef::ErrorCode errCode = UtilSequenceRef::TCError; switch(transP->sequence.requestType) { case UtilSequenceReq::CurrVal: case UtilSequenceReq::NextVal:{ if (transP->errorCode == 626) errCode = UtilSequenceRef::NoSuchSequence; break; } case UtilSequenceReq::Create: break; } UtilSequenceRef * ret = (UtilSequenceRef *)signal->getDataPtrSend(); ret->senderData = transP->clientData; ret->sequenceId = transP->sequence.sequenceId; ret->requestType = transP->sequence.requestType; ret->errorCode = (Uint32)errCode; sendSignal(transP->clientRef, GSN_UTIL_SEQUENCE_REF, signal, UtilSequenceRef::SignalLength, JBB);}#if 0 Ndb ndb("ndb","def"); NdbConnection* tConnection = ndb.startTransaction(); NdbOperation* tOperation = tConnection->getNdbOperation("SYSTAB_0"); //#if 0 && API_CODE if( tOperation != NULL ) { tOperation->interpretedUpdateTuple(); tOperation->equal((U_Int32)0, keyValue ); tNextId_Result = tOperation->getValue((U_Int32)1); tOperation->incValue((U_Int32)1, (U_Int32)8192); if (tConnection->execute( Commit ) != -1 ) { U_Int64 tValue = tNextId_Result->u_64_value(); // Read result value theFirstTransId = tValue; theLastTransId = tValue + 8191; closeTransaction(tConnection); return startTransactionLocal(aPriority, nodeId); } } /** * IntialReadSize = 0; * InterpretedSize = incValue(1); * FinalUpdateSize = 0; * FinalReadSize = 1; // Read value * SubroutineSize = 0; */#endif/************************************************************************** * ------------------------------------------------------------------------ * MODULE: Transaction execution request * ------------------------------------------------------------------------ * * Handle requests to execute a prepared transaction **************************************************************************/void DbUtil::execUTIL_EXECUTE_REQ(Signal* signal) { jamEntry(); UtilExecuteReq * req = (UtilExecuteReq *)signal->getDataPtr(); const Uint32 clientRef = req->senderRef; const Uint32 clientData = req->senderData; const Uint32 prepareId = req->getPrepareId(); const bool releaseFlag = req->getReleaseFlag(); if(signal->getNoOfSections() == 0) { // Missing prepare data jam(); releaseSections(signal); sendUtilExecuteRef(signal, UtilExecuteRef::MissingDataSection, 0, clientRef, clientData); return; } /******************************* * Get PreparedOperation struct *******************************/ PreparedOperationPtr prepOpPtr; c_runningPreparedOperations.first(prepOpPtr); while (!prepOpPtr.isNull() && prepOpPtr.i != prepareId) c_runningPreparedOperations.next(prepOpPtr); if (prepOpPtr.i != prepareId) { jam(); releaseSections(signal); sendUtilExecuteRef(signal, UtilExecuteRef::IllegalPrepareId, 0, clientRef, clientData); return; } prepOpPtr.p->releaseFlag = releaseFlag; TransactionPtr transPtr; OperationPtr opPtr; SegmentedSectionPtr headerPtr, dataPtr; signal->getSection(headerPtr, UtilExecuteReq::HEADER_SECTION); SectionReader headerReader(headerPtr, getSectionSegmentPool()); signal->getSection(dataPtr, UtilExecuteReq::DATA_SECTION); SectionReader dataReader(dataPtr, getSectionSegmentPool());#if 0 //def EVENT_DEBUG // Debugging printf("DbUtil::execUTIL_EXECUTEL_REQ: Headers (%u): ", headerPtr.sz); Uint32 word; while(headerReader.getWord(&word)) printf("H'%.8x ", word); printf("\n"); printf("DbUtil::execUTIL_EXECUTEL_REQ: Data (%u): ", dataPtr.sz); headerReader.reset(); while(dataReader.getWord(&word)) printf("H'%.8x ", word); printf("\n"); dataReader.reset();#endif // Uint32 totalDataLen = headerPtr.sz + dataPtr.sz; /************************************************************ * Seize Transaction record ************************************************************/ ndbrequire(c_runningTransactions.seize(transPtr)); transPtr.p->gsn = GSN_UTIL_EXECUTE_REQ; transPtr.p->clientRef = clientRef; transPtr.p->clientData = clientData; ndbrequire(transPtr.p->operations.seize(opPtr)); opPtr.p->prepOp = prepOpPtr.p; opPtr.p->prepOp_i = prepOpPtr.i; #if 0 //def EVENT_DEBUG printf("opPtr.p->rs.seize( %u )\n", prepOpPtr.p->rsLen);#endif ndbrequire(opPtr.p->rs.seize(prepOpPtr.p->rsLen)); /*********************************************************** * Store signal data on linear memory in Transaction record ***********************************************************/ KeyInfoBuffer* keyInfo = &opPtr.p->keyInfo; AttrInfoBuffer* attrInfo = &opPtr.p->attrInfo; AttributeHeader header; Uint32* tempBuf = signal->theData + 25; bool dataComplete = true; while(headerReader.getWord((Uint32 *)&header)) { Uint32* bufStart = tempBuf; header.insertHeader(tempBuf++); for(unsigned int i = 0; i < header.getDataSize(); i++) { if (!dataReader.getWord(tempBuf++)) { dataComplete = false; break; } } bool res = true;#if 0 //def EVENT_DEBUG if (TcKeyReq::getOperationType(prepOpPtr.p->tckey.requestInfo) == TcKeyReq::Read) { if(prepOpPtr.p->pkBitmask.get(header.getAttributeId())) printf("PrimaryKey\n"); } printf("AttrId %u Hdrsz %d Datasz %u \n", header.getAttributeId(), header.getHeaderSize(), header.getDataSize());#endif if(prepOpPtr.p->pkBitmask.get(header.getAttributeId())) // A primary key attribute res = keyInfo->append(bufStart + header.getHeaderSize(), header.getDataSize()); switch (TcKeyReq::getOperationType(prepOpPtr.p->tckey.requestInfo)) { case ZREAD: res &= attrInfo->append(bufStart, header.getHeaderSize()); break; case ZDELETE: // no attrinfo for Delete break; default: res &= attrInfo->append(bufStart, header.getHeaderSize() + header.getDataSize()); } if (!res) { // Failed to allocate buffer data jam(); releaseSections(signal); sendUtilExecuteRef(signal, UtilExecuteRef::AllocationError, 0, clientRef, clientData); releaseTransaction(transPtr); return; } } if (!dataComplete) { // Missing data in data section jam(); releaseSections(signal); sendUtilExecuteRef(signal, UtilExecuteRef::MissingData, 0, clientRef, clientData); releaseTransaction(transPtr); return; } const Uint32 l1 = prepOpPtr.p->tckey.attrLen; const Uint32 l2 = prepOpPtr.p->attrInfo.getSize() + opPtr.p->attrInfo.getSize(); if (TcKeyReq::getOperationType(prepOpPtr.p->tckey.requestInfo) != ZREAD){ ndbrequire(l1 == l2); } else {#if 0 ndbout_c("TcKeyReq::Read");#endif } releaseSections(signal); transPtr.p->noOfRetries = 3; runTransaction(signal, transPtr);}/************************************************************************** * ------------------------------------------------------------------------ * MODULE: General transaction machinery * ------------------------------------------------------------------------ * Executes a prepared transaction **************************************************************************/voidDbUtil::runTransaction(Signal* signal, TransactionPtr transPtr){ /* Init transaction */ transPtr.p->sent = 0; transPtr.p->recv = 0; transPtr.p->errorCode = 0; getTransId(transPtr.p); OperationPtr opPtr; ndbrequire(transPtr.p->operations.first(opPtr)); /* First operation */ Uint32 start = 0; TcKeyReq::setStartFlag(start, 1); runOperation(signal, transPtr, opPtr, start); transPtr.p->sent ++; /* Rest of operations */ start = 0; while(opPtr.i != RNIL){ runOperation(signal, transPtr, opPtr, start); transPtr.p->sent ++; } //transPtr.p->print();}voidDbUtil::runOperation(Signal* signal, TransactionPtr & transPtr, OperationPtr & opPtr, Uint32 start) { Uint32 opI = opPtr.i; Operation * op = opPtr.p; const PreparedOperation * pop = op->prepOp; if(!transPtr.p->operations.next(opPtr)){ TcKeyReq::setCommitFlag(start, 1); // Last operation TcKeyReq::setExecuteFlag(start, 1); } #if 0 //def EVENT_DEBUG if (TcKeyReq::getOperationType(pop->tckey.requestInfo) == TcKeyReq::Read) { printf("TcKeyReq::Read runOperation\n"); }#endif /** * Init operation w.r.t result set */ initResultSet(op->rs, pop->rsInfo); op->rs.first(op->rsIterator); op->rsRecv = 0;#if 0 //def EVENT_DEBUG printf("pop->rsLen %u\n", pop->rsLen);#endif op->rsExpect = 0; op->transPtrI = transPtr.i; TcKeyReq * tcKey = (TcKeyReq*)signal->getDataPtrSend(); //ndbout << "*** 6 ***"<< endl; pop->print(); memcpy(tcKey, &pop->tckey, pop->tckeyLenInBytes); //ndbout << "*** 6b ***"<< endl; //printTCKEYREQ(stdout, signal->getDataPtrSend(), // pop->tckeyLenInBytes >> 2, 0); tcKey->apiConnectPtr = transPtr.p->connectPtr; tcKey->senderData = opI; tcKey->transId1 = transPtr.p->transId[0]; tcKey->transId2 = transPtr.p->transId[1]; tcKey->requestInfo |= start; #if 0 //def EVENT_DEBUG // Debugging printf("DbUtil::runOperation: KEYINFO\n"); op->keyInfo.print(stdout); printf("DbUtil::runOperation: ATTRINFO\n"); op->attrInfo.print(stdout);#endif /** * Key Info */ //KeyInfoBuffer::DataBufferIterator kit; KeyInfoIterator kit; op->keyInfo.first(kit); Uint32 *keyDst = ((Uint32*)tcKey) + pop->keyDataPos; for(Uint32 i = 0; i<8 && kit.curr.i != RNIL; i++, op->keyInfo.next(kit)){ keyDst[i] = * kit.data; } //ndbout << "*** 7 ***" << endl; //printTCKEYREQ(stdout, signal->getDataPtrSend(), // pop->tckeyLenInBytes >> 2, 0);#if 0 //def EVENT_DEBUG printf("DbUtil::runOperation: sendSignal(DBTC_REF, GSN_TCKEYREQ, signal, %d , JBB)\n", pop->tckeyLenInBytes >> 2); printTCKEYREQ(stdout, signal->getDataPtr(), pop->tckeyLenInBytes >> 2,0);#endif sendSignal(DBTC_REF, GSN_TCKEYREQ, signal, pop->tckeyLenInBytes >> 2, JBB); /** * More the 8 words of key info not implemented */ // ndbrequire(kit.curr.i == RNIL); // Yes it is /** * KeyInfo */ KeyInfo* keyInfo = (KeyInfo *)signal->getDataPtrSend(); keyInfo->connectPtr = transPtr.p->connectPtr; keyInfo->transId[0] = transPtr.p->transId[0]; keyInfo->transId[1] = transPtr.p->transId[1]; sendKeyInfo(signal, keyInfo, op->keyInfo, kit); /** * AttrInfo */ AttrInfo* attrInfo = (AttrInfo *)signal->getDataPtrSend(); attrInfo->connectPtr = transPtr.p->connectPtr; attrInfo->transId[0] = transPtr.p->transId[0]; attrInfo->transId[1] = transPtr.p->transId[1]; AttrInfoIterator ait; pop->attrInfo.first(ait); sendAttrInfo(signal, attrInfo, pop->attrInfo, ait); op->attrInfo.first(ait); sendAttrInfo(signal, attrInfo, op->attrInfo, ait);}voidDbUtil::sendKeyInfo(Signal* signal, KeyInfo* keyInfo, const KeyInfoBuffer & keyBuf, KeyInfoIterator & kit){ while(kit.curr.i != RNIL) { Uint32 *keyDst = keyInf
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -