📄 ndbscanoperation.cpp
字号:
last = tSignal; tSignal = tSignal->next(); } while(last != theLastKEYINFO); } tSignal = theFirstATTRINFO; while (tSignal != NULL) { AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend()); attrInfo->connectPtr = aTC_ConnectPtr; attrInfo->transId[0] = Uint32(transId); attrInfo->transId[1] = Uint32(transId >> 32); if (tp->sendSignal(tSignal,aProcessorId) == -1){ setErrorCode(4002); return -1; } tSignalCount++; tSignal = tSignal->next(); } theStatus = WaitResponse; m_curr_row = 0; m_sent_receivers_count = theParallelism; if(m_ordered) { m_current_api_receiver = theParallelism; m_api_receivers_count = theParallelism; } return tSignalCount;}//NdbOperation::doSendScan()/***************************************************************************** * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans); * * Parameters: The update transactions NdbTransaction pointer. * Return Value: A reference to the transferred operation object * or NULL if no success. * Remark: Take over the scanning transactions NdbOperation * object for a tuple to an update transaction, * which is the last operation read in nextScanResult() * (theNdbCon->thePreviousScanRec) * * FUTURE IMPLEMENTATION: (This note was moved from header file.) * In the future, it will even be possible to transfer * to a NdbTransaction on another Ndb-object. * In this case the receiving NdbTransaction-object must call * a method receiveOpFromScan to actually receive the information. * This means that the updating transactions can be placed * in separate threads and thus increasing the parallelism during * the scan process. ****************************************************************************/intNdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size){ NdbRecAttr * tRecAttr = m_curr_row; if(tRecAttr) { const Uint32 * src = (Uint32*)tRecAttr->aRef(); memcpy(data, src, 4*size); return 0; } return -1;}NdbOperation*NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans){ NdbRecAttr * tRecAttr = m_curr_row; if(tRecAttr) { NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable); if (newOp == NULL){ return NULL; } pTrans->theSimpleState = 0; const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; newOp->theTupKeyLen = len; newOp->theOperationType = opType; if (opType == DeleteRequest) { newOp->theStatus = GetValue; } else { newOp->theStatus = SetValue; } const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 tScanInfo = src[len] & 0x3FFFF; const Uint32 tTakeOverFragment = src[len] >> 20; { UintR scanInfo = 0; TcKeyReq::setTakeOverScanFlag(scanInfo, 1); TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment); TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo); newOp->theScanInfo = scanInfo; newOp->theDistrKeyIndicator_ = 1; newOp->theDistributionKey = tTakeOverFragment; } // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend()); Uint32 i = 0; for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) { tcKeyReq->keyInfo[i] = * src++; } if(i < len){ NdbApiSignal* tSignal = theNdb->getSignal(); newOp->theTCREQ->next(tSignal); Uint32 left = len - i; while(tSignal && left > KeyInfo::DataLength){ tSignal->setSignal(GSN_KEYINFO); KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength); src += KeyInfo::DataLength; left -= KeyInfo::DataLength; tSignal->next(theNdb->getSignal()); tSignal = tSignal->next(); } if(tSignal && left > 0){ tSignal->setSignal(GSN_KEYINFO); KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); memcpy(keyInfo->keyData, src, 4 * left); } } // create blob handles automatically if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) { for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) { NdbColumnImpl* c = m_currentTable->m_columns[i]; assert(c != 0); if (c->getBlobType()) { if (newOp->getBlobHandle(pTrans, c) == NULL) return NULL; } } } return newOp; } return 0;}NdbBlob*NdbScanOperation::getBlobHandle(const char* anAttrName){ m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrName));}NdbBlob*NdbScanOperation::getBlobHandle(Uint32 anAttrId){ m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrId));}NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb) : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan){}NdbIndexScanOperation::~NdbIndexScanOperation(){}intNdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len){ return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);}intNdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len){ return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);}intNdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, const char* aValue, Uint32 len){ return setBound(anAttrObject, BoundEQ, aValue, len);}NdbRecAttr*NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, char* aValue){ if(!m_ordered){ return NdbScanOperation::getValue_impl(attrInfo, aValue); } int id = attrInfo->m_attrId; // In "real" table assert(m_accessTable->m_index); int sz = (int)m_accessTable->m_index->m_key_ids.size(); if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){ return NdbScanOperation::getValue_impl(attrInfo, aValue); } assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); Uint32 marker = theTupleKeyDefined[id][0]; if(marker == SETBOUND_EQ){ return NdbScanOperation::getValue_impl(attrInfo, aValue); } else if(marker == API_PTR){ return NdbScanOperation::getValue_impl(attrInfo, aValue); } assert(marker == FAKE_PTR); UintPtr oldVal; oldVal = theTupleKeyDefined[id][1];#if (SIZEOF_CHARP == 8) oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);#endif theTupleKeyDefined[id][0] = API_PTR; NdbRecAttr* tmp = (NdbRecAttr*)oldVal; tmp->setup(attrInfo, aValue); return tmp;}#include <AttributeHeader.hpp>/* * Define bound on index column in range scan. */intNdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len){ if (!tAttrInfo) { setErrorCodeAbort(4318); // Invalid attribute return -1; } if (theOperationType == OpenRangeScanRequest && (0 <= type && type <= 4) && len <= 8000) { // insert bound type Uint32 currLen = theTotalNrOfKeyWordInSignal; Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; bool tDistrKey = tAttrInfo->m_distributionKey; len = aValue != NULL ? sizeInBytes : 0; if (len != sizeInBytes && (len != 0)) { setErrorCodeAbort(4209); return -1; } // insert attribute header Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); const Uint32 ahValue = ah.m_value; const Uint32 align = (UintPtr(aValue) & 7); const bool aligned = (tDistrKey && type == BoundEQ) ? (align == 0) : (align & 3) == 0; const bool nobytes = (len & 0x3) == 0; const Uint32 totalLen = 2 + sizeInWords; Uint32 tupKeyLen = theTupKeyLen; if(remaining > totalLen && aligned && nobytes){ Uint32 * dst = theKEYINFOptr + currLen; * dst ++ = type; * dst ++ = ahValue; memcpy(dst, aValue, 4 * sizeInWords); theTotalNrOfKeyWordInSignal = currLen + totalLen; } else { if(!aligned || !nobytes){ Uint32 tempData[2000]; tempData[0] = type; tempData[1] = ahValue; tempData[2 + (len >> 2)] = 0; memcpy(tempData+2, aValue, len); insertBOUNDS(tempData, 2+sizeInWords); } else { Uint32 buf[2] = { type, ahValue }; insertBOUNDS(buf, 2); insertBOUNDS((Uint32*)aValue, sizeInWords); } } theTupKeyLen = tupKeyLen + totalLen; /** * Do sorted stuff */ /** * The primary keys for an ordered index is defined in the beginning * so it's safe to use [tIndexAttrId] * (instead of looping as is NdbOperation::equal_impl) */ if(type == BoundEQ && tDistrKey) { theNoOfTupKeyLeft--; return handle_distribution_key((Uint64*)aValue, sizeInWords); } return 0; } else { setErrorCodeAbort(4228); // XXX wrong code return -1; }}intNdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ Uint32 len; Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal; Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal; do { len = (sz < remaining ? sz : remaining); memcpy(dst, data, 4 * len); if(sz >= remaining){ NdbApiSignal* tCurr = theLastKEYINFO; tCurr->setLength(KeyInfo::MaxSignalLength); NdbApiSignal* tSignal = tCurr->next(); if(tSignal) ; else if((tSignal = theNdb->getSignal()) != 0) { tCurr->next(tSignal); tSignal->setSignal(GSN_KEYINFO); } else { goto error; } theLastKEYINFO = tSignal; theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; remaining = KeyInfo::DataLength; sz -= len; data += len; } else { len = (KeyInfo::DataLength - remaining) + len; break; } } while(true); theTotalNrOfKeyWordInSignal = len; return 0;error: setErrorCodeAbort(4228); // XXX wrong code return -1;}intNdbIndexScanOperation::readTuples(LockMode lm, Uint32 scan_flags, Uint32 parallel){ const bool order_by = scan_flags & SF_OrderBy; const bool order_desc = scan_flags & SF_Descending; const bool read_range_no = scan_flags & SF_ReadRangeNo; int res = NdbScanOperation::readTuples(lm, scan_flags, 0); if(!res && read_range_no) { m_read_range_no = 1; Uint32 word = 0; AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0); if(insertATTRINFO(word) == -1) res = -1; } if(!res && order_by){ m_ordered = true; if (order_desc) { m_descending = true; ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); ScanTabReq::setDescendingFlag(req->requestInfo, true); } Uint32 cnt = m_accessTable->getNoOfColumns() - 1; m_sort_columns = cnt; // -1 for NDB$NODE m_current_api_receiver = m_sent_receivers_count; m_api_receivers_count = m_sent_receivers_count; m_sort_columns = cnt; for(Uint32 i = 0; i<cnt; i++){ const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); UintPtr newVal = UintPtr(tmp); theTupleKeyDefined[i][0] = FAKE_PTR; theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);#if (SIZEOF_CHARP == 8) theTupleKeyDefined[i][2] = (newVal >> 32);#endif } } m_this_bound_start = 0; m_first_bound_word = theKEYINFOptr; return res;}voidNdbIndexScanOperation::fix_get_values(){ /** * Loop through all getValues and set buffer pointer to "API" pointer */ NdbRecAttr * curr = theReceiver.theFirstRecAttr; Uint32 cnt = m_accessTable->getNoOfColumns() - 1; assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); const NdbIndexImpl * idx = m_accessTable->m_index; const NdbTableImpl * tab = m_currentTable; for(Uint32 i = 0; i<cnt; i++){ Uint32 val = theTupleKeyDefined[i][0]; switch(val){ case FAKE_PTR: curr->setup(curr->m_column, 0); case API_PTR: curr = curr->next(); break; case SETBOUND_EQ: break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -