⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbscanoperation.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,							       forceSend);    /**   * Check current receiver   */  int retVal = 2;  Uint32 idx = m_current_api_receiver;  Uint32 last = m_api_receivers_count;  m_curr_row = 0;  if(DEBUG_NEXT_RESULT)    ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);    /**   * Check next buckets   */  for(; idx < last; idx++){    NdbReceiver* tRec = m_api_receivers[idx];    if(tRec->nextResult()){      m_curr_row = tRec->copyout(theReceiver);      retVal = 0;      break;    }  }      /**   * We have advanced atleast one bucket   */  if(!fetchAllowed || !retVal){    m_current_api_receiver = idx;    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);    return retVal;  }    Uint32 nodeId = theNdbCon->theDBnode;  TransporterFacade* tp = TransporterFacade::instance();  Guard guard(tp->theMutexPtr);  if(theError.code)    return -1;  Uint32 seq = theNdbCon->theNodeSequence;  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,							  forceSend) == 0){          idx = m_current_api_receiver;    last = m_api_receivers_count;          do {      if(theError.code){	setErrorCode(theError.code);	if(DEBUG_NEXT_RESULT) ndbout_c("return -1");	return -1;      }            Uint32 cnt = m_conf_receivers_count;      Uint32 sent = m_sent_receivers_count;      if(DEBUG_NEXT_RESULT)	ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);	      if(cnt > 0){	/**	 * Just move completed receivers	 */	memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));	last += cnt;	m_conf_receivers_count = 0;      } else if(retVal == 2 && sent > 0){	/**	 * No completed...	 */	theNdb->theImpl->theWaiter.m_node = nodeId;	theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;	int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);	if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {	  continue;	} else {	  idx = last;	  retVal = -2; //return_code;	}      } else if(retVal == 2){	/**	 * No completed & no sent -> EndOfData	 */	theError.code = -1; // make sure user gets error if he tries again	if(DEBUG_NEXT_RESULT) ndbout_c("return 1");	return 1;      }	      if(retVal == 0)	break;	      for(; idx < last; idx++){	NdbReceiver* tRec = m_api_receivers[idx];	if(tRec->nextResult()){	  m_curr_row = tRec->copyout(theReceiver);      	  retVal = 0;	  break;	}      }    } while(retVal == 2);  } else {    retVal = -3;  }      m_api_receivers_count = last;  m_current_api_receiver = idx;      switch(retVal){  case 0:  case 1:  case 2:    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);    return retVal;  case -1:    setErrorCode(4008); // Timeout    break;  case -2:    setErrorCode(4028); // Node fail    break;  case -3: // send_next_scan -> return fail (set error-code self)    if(theError.code == 0)      setErrorCode(4028); // seq changed = Node fail    break;  }      theNdbCon->theTransactionIsStarted = false;  theNdbCon->theReleaseOnClose = true;  if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);  return -1;}intNdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,				 bool forceSend){    if(cnt > 0){    NdbApiSignal tSignal(theNdb->theMyRef);    tSignal.setSignal(GSN_SCAN_NEXTREQ);        Uint32* theData = tSignal.getDataPtrSend();    theData[0] = theNdbCon->theTCConPtr;    theData[1] = stopScanFlag == true ? 1 : 0;    Uint64 transId = theNdbCon->theTransactionId;    theData[2] = transId;    theData[3] = (Uint32) (transId >> 32);        /**     * Prepare ops     */    Uint32 last = m_sent_receivers_count;    Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);    Uint32 sent = 0;    for(Uint32 i = 0; i<cnt; i++){      NdbReceiver * tRec = m_api_receivers[i];      if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)      {	m_sent_receivers[last+sent] = tRec;	tRec->m_list_index = last+sent;	tRec->prepareSend();	sent++;      }    }    memmove(m_api_receivers, m_api_receivers+cnt, 	    (theParallelism-cnt) * sizeof(char*));        int ret = 0;    if(sent)    {      Uint32 nodeId = theNdbCon->theDBnode;      TransporterFacade * tp = TransporterFacade::instance();      if(cnt > 21){	tSignal.setLength(4);	LinearSectionPtr ptr[3];	ptr[0].p = prep_array;	ptr[0].sz = sent;	ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);      } else {	tSignal.setLength(4+sent);	ret = tp->sendSignal(&tSignal, nodeId);      }    }        if (!ret) checkForceSend(forceSend);    m_sent_receivers_count = last + sent;    m_api_receivers_count -= cnt;    m_current_api_receiver = 0;        return ret;  }  return 0;}void NdbScanOperation::checkForceSend(bool forceSend){  if (forceSend) {    TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);  } else {    TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);  }//if}int NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId){  printf("NdbScanOperation::prepareSend\n");  abort();  return 0;}int NdbScanOperation::doSend(int ProcessorId){  printf("NdbScanOperation::doSend\n");  return 0;}void NdbScanOperation::close(bool forceSend, bool releaseOp){  DBUG_ENTER("NdbScanOperation::close");  DBUG_PRINT("enter", ("this=%x tcon=%x con=%x force=%d release=%d",                       (UintPtr)this,                       (UintPtr)m_transConnection, (UintPtr)theNdbCon,                       forceSend, releaseOp));  if(m_transConnection){    if(DEBUG_NEXT_RESULT)      ndbout_c("close() theError.code = %d "	       "m_api_receivers_count = %d "	       "m_conf_receivers_count = %d "	       "m_sent_receivers_count = %d",	       theError.code, 	       m_api_receivers_count,	       m_conf_receivers_count,	       m_sent_receivers_count);        TransporterFacade* tp = TransporterFacade::instance();    Guard guard(tp->theMutexPtr);    close_impl(tp, forceSend);      }  NdbConnection* tCon = theNdbCon;  NdbConnection* tTransCon = m_transConnection;  theNdbCon = NULL;  m_transConnection = NULL;  if (releaseOp && tTransCon) {    NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;    tTransCon->releaseExecutedScanOperation(tOp);  }    tCon->theScanningOp = 0;  theNdb->closeTransaction(tCon);  theNdb->theRemainingStartTransactions--;  DBUG_VOID_RETURN;}voidNdbScanOperation::execCLOSE_SCAN_REP(){  m_conf_receivers_count = 0;  m_sent_receivers_count = 0;}void NdbScanOperation::release(){  if(theNdbCon != 0 || m_transConnection != 0){    close();  }  for(Uint32 i = 0; i<m_allocated_receivers; i++){    m_receivers[i]->release();  }  NdbOperation::release();    if(theSCAN_TABREQ)  {    theNdb->releaseSignal(theSCAN_TABREQ);    theSCAN_TABREQ = 0;  }}/***************************************************************************int prepareSendScan(Uint32 aTC_ConnectPtr,                    Uint64 aTransactionId)Return Value:   Return 0 : preparation of send was succesful.                Return -1: In all other case.   Parameters:     aTC_ConnectPtr: the Connect pointer to TC.		aTransactionId:	the Transaction identity of the transaction.Remark:         Puts the the final data into ATTRINFO signal(s)  after this                 we know the how many signal to send and their sizes***************************************************************************/int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,				      Uint64 aTransactionId){  if (theInterpretIndicator != 1 ||      (theOperationType != OpenScanRequest &&       theOperationType != OpenRangeScanRequest)) {    setErrorCodeAbort(4005);    return -1;  }  theErrorLine = 0;  // In preapareSendInterpreted we set the sizes (word 4-8) in the  // first ATTRINFO signal.  if (prepareSendInterpreted() == -1)    return -1;    if(m_ordered){    ((NdbIndexScanOperation*)this)->fix_get_values();  }    theCurrentATTRINFO->setLength(theAI_LenInCurrAI);  /**   * Prepare all receivers   */  theReceiver.prepareSend();  bool keyInfo = m_keyInfo;  Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;  /**   * The number of records sent by each LQH is calculated and the kernel   * is informed of this number by updating the SCAN_TABREQ signal   */  Uint32 batch_size, batch_byte_size, first_batch_size;  theReceiver.calculate_batch_size(key_size,                                   theParallelism,                                   batch_size,                                   batch_byte_size,                                   first_batch_size);  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());  ScanTabReq::setScanBatch(req->requestInfo, batch_size);  req->batch_byte_size= batch_byte_size;  req->first_batch_size= first_batch_size;  /**   * Set keyinfo flag   *  (Always keyinfo when using blobs)   */  Uint32 reqInfo = req->requestInfo;  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);  req->requestInfo = reqInfo;    for(Uint32 i = 0; i<theParallelism; i++){    m_receivers[i]->do_get_value(&theReceiver, batch_size, 				 key_size, 				 m_read_range_no);  }  return 0;}/*****************************************************************************int doSend()Return Value:   Return >0 : send was succesful, returns number of signals sent                Return -1: In all other case.   Parameters:     aProcessorId: Receiving processor nodeRemark:         Sends the ATTRINFO signal(s)*****************************************************************************/intNdbScanOperation::doSendScan(int aProcessorId){  Uint32 tSignalCount = 0;  NdbApiSignal* tSignal;   if (theInterpretIndicator != 1 ||      (theOperationType != OpenScanRequest &&       theOperationType != OpenRangeScanRequest)) {      setErrorCodeAbort(4005);      return -1;  }    assert(theSCAN_TABREQ != NULL);  tSignal = theSCAN_TABREQ;    Uint32 tupKeyLen = theTupKeyLen;  Uint32 len = theTotalNrOfKeyWordInSignal;  Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;  Uint64 transId = theNdbCon->theTransactionId;    // Update the "attribute info length in words" in SCAN_TABREQ before   // sending it. This could not be done in openScan because   // we created the ATTRINFO signals after the SCAN_TABREQ signal.  ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());  req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;  Uint32 tmp = req->requestInfo;  ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);  req->distributionKey = theDistributionKey;  req->requestInfo = tmp;  tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);  TransporterFacade *tp = TransporterFacade::instance();  LinearSectionPtr ptr[3];  ptr[0].p = m_prepared_receivers;  ptr[0].sz = theParallelism;  if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {    setErrorCode(4002);    return -1;  }   if (tupKeyLen > 0){    // must have at least one signal since it contains attrLen for bounds    assert(theLastKEYINFO != NULL);    tSignal = theLastKEYINFO;    tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);        assert(theSCAN_TABREQ->next() != NULL);    tSignal = theSCAN_TABREQ->next();        NdbApiSignal* last;    do {      KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());      keyInfo->connectPtr = aTC_ConnectPtr;      keyInfo->transId[0] = Uint32(transId);      keyInfo->transId[1] = Uint32(transId >> 32);            if (tp->sendSignal(tSignal,aProcessorId) == -1){	setErrorCode(4002);	return -1;      }            tSignalCount++;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -