📄 ndbscanoperation.cpp
字号:
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, forceSend); /** * Check current receiver */ int retVal = 2; Uint32 idx = m_current_api_receiver; Uint32 last = m_api_receivers_count; m_curr_row = 0; if(DEBUG_NEXT_RESULT) ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last); /** * Check next buckets */ for(; idx < last; idx++){ NdbReceiver* tRec = m_api_receivers[idx]; if(tRec->nextResult()){ m_curr_row = tRec->copyout(theReceiver); retVal = 0; break; } } /** * We have advanced atleast one bucket */ if(!fetchAllowed || !retVal){ m_current_api_receiver = idx; if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal); return retVal; } Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); if(theError.code) return -1; Uint32 seq = theNdbCon->theNodeSequence; if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; do { if(theError.code){ setErrorCode(theError.code); if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); return -1; } Uint32 cnt = m_conf_receivers_count; Uint32 sent = m_sent_receivers_count; if(DEBUG_NEXT_RESULT) ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent); if(cnt > 0){ /** * Just move completed receivers */ memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*)); last += cnt; m_conf_receivers_count = 0; } else if(retVal == 2 && sent > 0){ /** * No completed... */ theNdb->theImpl->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { continue; } else { idx = last; retVal = -2; //return_code; } } else if(retVal == 2){ /** * No completed & no sent -> EndOfData */ theError.code = -1; // make sure user gets error if he tries again if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); return 1; } if(retVal == 0) break; for(; idx < last; idx++){ NdbReceiver* tRec = m_api_receivers[idx]; if(tRec->nextResult()){ m_curr_row = tRec->copyout(theReceiver); retVal = 0; break; } } } while(retVal == 2); } else { retVal = -3; } m_api_receivers_count = last; m_current_api_receiver = idx; switch(retVal){ case 0: case 1: case 2: if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal); return retVal; case -1: setErrorCode(4008); // Timeout break; case -2: setErrorCode(4028); // Node fail break; case -3: // send_next_scan -> return fail (set error-code self) if(theError.code == 0) setErrorCode(4028); // seq changed = Node fail break; } theNdbCon->theTransactionIsStarted = false; theNdbCon->theReleaseOnClose = true; if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal); return -1;}intNdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, bool forceSend){ if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); Uint32* theData = tSignal.getDataPtrSend(); theData[0] = theNdbCon->theTCConPtr; theData[1] = stopScanFlag == true ? 1 : 0; Uint64 transId = theNdbCon->theTransactionId; theData[2] = transId; theData[3] = (Uint32) (transId >> 32); /** * Prepare ops */ Uint32 last = m_sent_receivers_count; Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); Uint32 sent = 0; for(Uint32 i = 0; i<cnt; i++){ NdbReceiver * tRec = m_api_receivers[i]; if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) { m_sent_receivers[last+sent] = tRec; tRec->m_list_index = last+sent; tRec->prepareSend(); sent++; } } memmove(m_api_receivers, m_api_receivers+cnt, (theParallelism-cnt) * sizeof(char*)); int ret = 0; if(sent) { Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); if(cnt > 21){ tSignal.setLength(4); LinearSectionPtr ptr[3]; ptr[0].p = prep_array; ptr[0].sz = sent; ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); } else { tSignal.setLength(4+sent); ret = tp->sendSignal(&tSignal, nodeId); } } if (!ret) checkForceSend(forceSend); m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; return ret; } return 0;}void NdbScanOperation::checkForceSend(bool forceSend){ if (forceSend) { TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); } else { TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); }//if}int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId){ printf("NdbScanOperation::prepareSend\n"); abort(); return 0;}int NdbScanOperation::doSend(int ProcessorId){ printf("NdbScanOperation::doSend\n"); return 0;}void NdbScanOperation::close(bool forceSend, bool releaseOp){ DBUG_ENTER("NdbScanOperation::close"); DBUG_PRINT("enter", ("this=%x tcon=%x con=%x force=%d release=%d", (UintPtr)this, (UintPtr)m_transConnection, (UintPtr)theNdbCon, forceSend, releaseOp)); if(m_transConnection){ if(DEBUG_NEXT_RESULT) ndbout_c("close() theError.code = %d " "m_api_receivers_count = %d " "m_conf_receivers_count = %d " "m_sent_receivers_count = %d", theError.code, m_api_receivers_count, m_conf_receivers_count, m_sent_receivers_count); TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); close_impl(tp, forceSend); } NdbConnection* tCon = theNdbCon; NdbConnection* tTransCon = m_transConnection; theNdbCon = NULL; m_transConnection = NULL; if (releaseOp && tTransCon) { NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this; tTransCon->releaseExecutedScanOperation(tOp); } tCon->theScanningOp = 0; theNdb->closeTransaction(tCon); theNdb->theRemainingStartTransactions--; DBUG_VOID_RETURN;}voidNdbScanOperation::execCLOSE_SCAN_REP(){ m_conf_receivers_count = 0; m_sent_receivers_count = 0;}void NdbScanOperation::release(){ if(theNdbCon != 0 || m_transConnection != 0){ close(); } for(Uint32 i = 0; i<m_allocated_receivers; i++){ m_receivers[i]->release(); } NdbOperation::release(); if(theSCAN_TABREQ) { theNdb->releaseSignal(theSCAN_TABREQ); theSCAN_TABREQ = 0; }}/***************************************************************************int prepareSendScan(Uint32 aTC_ConnectPtr, Uint64 aTransactionId)Return Value: Return 0 : preparation of send was succesful. Return -1: In all other case. Parameters: aTC_ConnectPtr: the Connect pointer to TC. aTransactionId: the Transaction identity of the transaction.Remark: Puts the the final data into ATTRINFO signal(s) after this we know the how many signal to send and their sizes***************************************************************************/int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, Uint64 aTransactionId){ if (theInterpretIndicator != 1 || (theOperationType != OpenScanRequest && theOperationType != OpenRangeScanRequest)) { setErrorCodeAbort(4005); return -1; } theErrorLine = 0; // In preapareSendInterpreted we set the sizes (word 4-8) in the // first ATTRINFO signal. if (prepareSendInterpreted() == -1) return -1; if(m_ordered){ ((NdbIndexScanOperation*)this)->fix_get_values(); } theCurrentATTRINFO->setLength(theAI_LenInCurrAI); /** * Prepare all receivers */ theReceiver.prepareSend(); bool keyInfo = m_keyInfo; Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; /** * The number of records sent by each LQH is calculated and the kernel * is informed of this number by updating the SCAN_TABREQ signal */ Uint32 batch_size, batch_byte_size, first_batch_size; theReceiver.calculate_batch_size(key_size, theParallelism, batch_size, batch_byte_size, first_batch_size); ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); ScanTabReq::setScanBatch(req->requestInfo, batch_size); req->batch_byte_size= batch_byte_size; req->first_batch_size= first_batch_size; /** * Set keyinfo flag * (Always keyinfo when using blobs) */ Uint32 reqInfo = req->requestInfo; ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo); req->requestInfo = reqInfo; for(Uint32 i = 0; i<theParallelism; i++){ m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size, m_read_range_no); } return 0;}/*****************************************************************************int doSend()Return Value: Return >0 : send was succesful, returns number of signals sent Return -1: In all other case. Parameters: aProcessorId: Receiving processor nodeRemark: Sends the ATTRINFO signal(s)*****************************************************************************/intNdbScanOperation::doSendScan(int aProcessorId){ Uint32 tSignalCount = 0; NdbApiSignal* tSignal; if (theInterpretIndicator != 1 || (theOperationType != OpenScanRequest && theOperationType != OpenRangeScanRequest)) { setErrorCodeAbort(4005); return -1; } assert(theSCAN_TABREQ != NULL); tSignal = theSCAN_TABREQ; Uint32 tupKeyLen = theTupKeyLen; Uint32 len = theTotalNrOfKeyWordInSignal; Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; Uint64 transId = theNdbCon->theTransactionId; // Update the "attribute info length in words" in SCAN_TABREQ before // sending it. This could not be done in openScan because // we created the ATTRINFO signals after the SCAN_TABREQ signal. ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; Uint32 tmp = req->requestInfo; ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_); req->distributionKey = theDistributionKey; req->requestInfo = tmp; tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_); TransporterFacade *tp = TransporterFacade::instance(); LinearSectionPtr ptr[3]; ptr[0].p = m_prepared_receivers; ptr[0].sz = theParallelism; if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) { setErrorCode(4002); return -1; } if (tupKeyLen > 0){ // must have at least one signal since it contains attrLen for bounds assert(theLastKEYINFO != NULL); tSignal = theLastKEYINFO; tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); assert(theSCAN_TABREQ->next() != NULL); tSignal = theSCAN_TABREQ->next(); NdbApiSignal* last; do { KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); keyInfo->connectPtr = aTC_ConnectPtr; keyInfo->transId[0] = Uint32(transId); keyInfo->transId[1] = Uint32(transId >> 32); if (tp->sendSignal(tSignal,aProcessorId) == -1){ setErrorCode(4002); return -1; } tSignalCount++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -