📄 trix.cpp
字号:
buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);}void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal){ SubSyncContinueReq * subSyncContinueReq = (SubSyncContinueReq *) signal->getDataPtr(); SubscriptionRecPtr subRecPtr; SubscriptionRecord* subRec; subRecPtr.i = subSyncContinueReq->subscriberData; if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i); return; } subRecPtr.p = subRec; subRec->pendingSubSyncContinueConf = true; checkParallelism(signal, subRec);}void Trix::execSUB_META_DATA(Signal* signal){ jamEntry();}void Trix::execSUB_TABLE_DATA(Signal* signal){ jamEntry(); SubTableData * subTableData = (SubTableData *)signal->getDataPtr(); SubscriptionRecPtr subRecPtr; SubscriptionRecord* subRec; subRecPtr.i = subTableData->subscriberData; if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i); return; } subRecPtr.p = subRec; SegmentedSectionPtr headerPtr, dataPtr; if (!signal->getSection(headerPtr, 0)) { printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n"); } if (!signal->getSection(dataPtr, 1)) { printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n"); } executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);}void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr){ Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2]; SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend(); SubscriptionRecord* subRec = subRecPtr.p;// Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns; AttrOrderBuffer::DataBufferIterator iter; Uint32 i = 0; jam(); bool moreAttributes = subRec->attributeOrder.first(iter); while (moreAttributes) { attributeList[i++] = *iter.data; moreAttributes = subRec->attributeOrder.next(iter); } // Merge index and key column segments struct LinearSectionPtr orderPtr[3]; orderPtr[0].p = attributeList; orderPtr[0].sz = subRec->attributeOrder.getSize(); subCreateReq->subscriberRef = reference(); subCreateReq->subscriberData = subRecPtr.i; subCreateReq->subscriptionId = subRec->subscriptionId; subCreateReq->subscriptionKey = subRec->subscriptionKey; subCreateReq->tableId = subRec->sourceTableId; subCreateReq->subscriptionType = SubCreateReq::SingleTableScan; sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);}void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr){ SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend(); jam(); subSyncReq->subscriptionId = subRecPtr.i; subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey; subSyncReq->part = SubscriptionData::MetaData; sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, SubSyncReq::SignalLength, JBB);}void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr){ jam(); subRecPtr.p->expectedConf = 1; SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend(); subSyncReq->subscriptionId = subRecPtr.i; subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey; subSyncReq->part = SubscriptionData::TableData; sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, SubSyncReq::SignalLength, JBB);}void Trix::prepareInsertTransactions(Signal* signal, SubscriptionRecPtr subRecPtr){ SubscriptionRecord* subRec = subRecPtr.p; UtilPrepareReq * utilPrepareReq = (UtilPrepareReq *)signal->getDataPtrSend(); jam(); utilPrepareReq->senderRef = reference(); utilPrepareReq->senderData = subRecPtr.i; const Uint32 pageSizeInWords = 128; Uint32 propPage[pageSizeInWords]; LinearWriter w(&propPage[0],128); w.first(); w.add(UtilPrepareReq::NoOfOperations, 1); w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write); w.add(UtilPrepareReq::TableId, subRec->targetTableId); // Add index attributes in increasing order and one PK attribute for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++) w.add(UtilPrepareReq::AttributeId, i);#if 0 // Debugging SimplePropertiesLinearReader reader(propPage, w.getWordsUsed()); printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n"); reader.printAll(ndbout);#endif struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections]; sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage; sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed(); sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal, UtilPrepareReq::SignalLength, JBB, sectionsPtr, UtilPrepareReq::NoOfSections);}void Trix::executeInsertTransaction(Signal* signal, SubscriptionRecPtr subRecPtr, SegmentedSectionPtr headerPtr, SegmentedSectionPtr dataPtr){ jam(); SubscriptionRecord* subRec = subRecPtr.p; UtilExecuteReq * utilExecuteReq = (UtilExecuteReq *)signal->getDataPtrSend(); Uint32* headerBuffer = signal->theData + 25; Uint32* dataBuffer = headerBuffer + headerPtr.sz; utilExecuteReq->senderRef = reference(); utilExecuteReq->senderData = subRecPtr.i; utilExecuteReq->prepareId = subRec->prepareId;#if 0 printf("Header size %u\n", headerPtr.sz); for(int i = 0; i < headerPtr.sz; i++) printf("H'%.8x ", headerBuffer[i]); printf("\n"); printf("Data size %u\n", dataPtr.sz); for(int i = 0; i < dataPtr.sz; i++) printf("H'%.8x ", dataBuffer[i]); printf("\n");#endif // Save scan result in linear buffers copy(headerBuffer, headerPtr); copy(dataBuffer, dataPtr); // Calculate packed key size Uint32 noOfKeyData = 0; for(Uint32 i = 0; i < headerPtr.sz; i++) { AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i; // Filter out NULL attributes if (keyAttrHead->isNULL()) return; if (i < subRec->noOfIndexColumns) // Renumber index attributes in consequtive order keyAttrHead->setAttributeId(i); else // Calculate total size of PK attribute noOfKeyData += keyAttrHead->getDataSize(); } // Increase expected CONF count subRec->expectedConf++; // Pack key attributes AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns, subRec->noOfIndexColumns, noOfKeyData); struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections]; sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer; sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = subRec->noOfIndexColumns + 1; sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer; sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz; sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal, UtilExecuteReq::SignalLength, JBB, sectionsPtr, UtilExecuteReq::NoOfSections);}void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr){ SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = subRecPtr.i; req->subscriptionId = subRecPtr.p->subscriptionId; req->subscriptionKey = subRecPtr.p->subscriptionKey; sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, SubRemoveReq::SignalLength, JBB);}void Trix::buildFailed(Signal* signal, SubscriptionRecPtr subRecPtr, BuildIndxRef::ErrorCode errorCode){ SubscriptionRecord* subRec = subRecPtr.p; subRec->errorCode = errorCode; // Continue accumulating since we currently cannot stop SUMA subRec->expectedConf--; checkParallelism(signal, subRec); if (subRec->expectedConf == 0) buildComplete(signal, subRecPtr);}voidTrix::execSUB_REMOVE_REF(Signal* signal){ jamEntry(); //@todo ndbrequire(false);}voidTrix::execSUB_REMOVE_CONF(Signal* signal){ jamEntry(); SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); SubscriptionRecPtr subRecPtr; c_theSubscriptions.getPtr(subRecPtr, conf->senderData); if(subRecPtr.p->prepareId != RNIL){ jam(); UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend(); req->prepareId = subRecPtr.p->prepareId; req->senderData = subRecPtr.i; sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, UtilReleaseReq::SignalLength , JBB); return; } { UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); conf->senderData = subRecPtr.i; execUTIL_RELEASE_CONF(signal); }}voidTrix::execUTIL_RELEASE_REF(Signal* signal){ jamEntry(); ndbrequire(false);}voidTrix::execUTIL_RELEASE_CONF(Signal* signal){ UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); SubscriptionRecPtr subRecPtr; c_theSubscriptions.getPtr(subRecPtr, conf->senderData); if(subRecPtr.p->errorCode == BuildIndxRef::NoError){ // Build is complete, reply to original sender BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend(); buildIndxConf->setUserRef(subRecPtr.p->userReference); buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr); buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX); buildIndxConf->setIndexType(subRecPtr.p->indexType); buildIndxConf->setTableId(subRecPtr.p->sourceTableId); buildIndxConf->setIndexId(subRecPtr.p->targetTableId); sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, BuildIndxConf::SignalLength , JBB); } else { // Build failed, reply to original sender BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend(); buildIndxRef->setUserRef(subRecPtr.p->userReference); buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr); buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX); buildIndxRef->setIndexType(subRecPtr.p->indexType); buildIndxRef->setTableId(subRecPtr.p->sourceTableId); buildIndxRef->setIndexId(subRecPtr.p->targetTableId); buildIndxRef->setErrorCode(subRecPtr.p->errorCode); sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength , JBB); } // Release subscription record subRecPtr.p->attributeOrder.release(); c_theSubscriptions.release(subRecPtr.i);}void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec){ if ((subRec->pendingSubSyncContinueConf) && (subRec->expectedConf < subRec->parallelism)) { SubSyncContinueConf * subSyncContinueConf = (SubSyncContinueConf *) signal->getDataPtrSend(); subSyncContinueConf->subscriptionId = subRec->subscriptionId; subSyncContinueConf->subscriptionKey = subRec->subscriptionKey; sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, SubSyncContinueConf::SignalLength , JBB); subRec->pendingSubSyncContinueConf = false; }}BLOCK_FUNCTIONS(Trix)template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -