⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbscanoperation.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
#ifdef VM_TRACE    default:      abort();#endif    }  }}intNdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 			       const NdbReceiver* t1, 			       const NdbReceiver* t2){  NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];  NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];  r1 = (skip ? r1->next() : r1);  r2 = (skip ? r2->next() : r2);  const int jdir = 1 - 2 * (int)m_descending;  assert(jdir == 1 || jdir == -1);  while(cols > 0){    Uint32 * d1 = (Uint32*)r1->aRef();    Uint32 * d2 = (Uint32*)r2->aRef();    unsigned r1_null = r1->isNULL();    if((r1_null ^ (unsigned)r2->isNULL())){      return (r1_null ? -1 : 1) * jdir;    }    const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);    Uint32 len = r1->theAttrSize * r1->theArraySize;    if(!r1_null){      const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);      int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);      if(r){	assert(r != NdbSqlUtil::CmpUnknown);	return r * jdir;      }    }    cols--;    r1 = r1->next();    r2 = r2->next();  }  return 0;}intNdbIndexScanOperation::next_result_ordered(bool fetchAllowed,					   bool forceSend){    m_curr_row = 0;  Uint32 u_idx = 0, u_last = 0;  Uint32 s_idx   = m_current_api_receiver; // first sorted  Uint32 s_last  = theParallelism;         // last sorted  NdbReceiver** arr = m_api_receivers;  NdbReceiver* tRec = arr[s_idx];    if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",				 fetchAllowed, 				 (s_idx < s_last ? tRec->nextResult() : 0));    if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 				 u_idx, u_last,				 s_idx, s_last);    bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();    if(fetchNeeded){    if(fetchAllowed){      if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");      TransporterFacade* tp = TransporterFacade::instance();      Guard guard(tp->theMutexPtr);      if(theError.code)	return -1;      Uint32 seq = theNdbCon->theNodeSequence;      Uint32 nodeId = theNdbCon->theDBnode;      if(seq == tp->getNodeSequence(nodeId) &&	 !send_next_scan_ordered(s_idx, forceSend)){	Uint32 tmp = m_sent_receivers_count;	s_idx = m_current_api_receiver; 	while(m_sent_receivers_count > 0 && !theError.code){	  theNdb->theImpl->theWaiter.m_node = nodeId;	  theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;	  int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);	  if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {	    continue;	  }	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");	  setErrorCode(4028);	  return -1;	}		if(theError.code){	  setErrorCode(theError.code);	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");	  return -1;	}		u_idx = 0;	u_last = m_conf_receivers_count;	m_conf_receivers_count = 0;	memcpy(arr, m_conf_receivers, u_last * sizeof(char*));		if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);      } else {	setErrorCode(4028);	return -1;      }    } else {      if(DEBUG_NEXT_RESULT) ndbout_c("return 2");      return 2;    }  } else {    u_idx = s_idx;    u_last = s_idx + 1;    s_idx++;  }    if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 				 u_idx, u_last,				 s_idx, s_last);  Uint32 cols = m_sort_columns + m_read_range_no;  Uint32 skip = m_keyInfo;  while(u_idx < u_last){    u_last--;    tRec = arr[u_last];        // Do binary search instead to find place    Uint32 place = s_idx;    for(; place < s_last; place++){      if(compare(skip, cols, tRec, arr[place]) <= 0){	break;      }    }        if(place != s_idx){      if(DEBUG_NEXT_RESULT) 	ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));      memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));    }        if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);    m_api_receivers[place-1] = tRec;    s_idx--;  }  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 				 u_idx, u_last,				 s_idx, s_last);    m_current_api_receiver = s_idx;    if(DEBUG_NEXT_RESULT)    for(Uint32 i = s_idx; i<s_last; i++)      ndbout_c("%p", arr[i]);    tRec = m_api_receivers[s_idx];      if(s_idx < s_last && tRec->nextResult()){    m_curr_row = tRec->copyout(theReceiver);          if(DEBUG_NEXT_RESULT) ndbout_c("return 0");    return 0;  }  theError.code = -1;  if(DEBUG_NEXT_RESULT) ndbout_c("return 1");  return 1;}intNdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){    if(idx == theParallelism)    return 0;    NdbReceiver* tRec = m_api_receivers[idx];  NdbApiSignal tSignal(theNdb->theMyRef);  tSignal.setSignal(GSN_SCAN_NEXTREQ);    Uint32 last = m_sent_receivers_count;  Uint32* theData = tSignal.getDataPtrSend();  Uint32* prep_array = theData + 4;    m_current_api_receiver = idx + 1;  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)  {    if(DEBUG_NEXT_RESULT)      ndbout_c("receiver completed, don't send");    return 0;  }    theData[0] = theNdbCon->theTCConPtr;  theData[1] = 0;  Uint64 transId = theNdbCon->theTransactionId;  theData[2] = transId;  theData[3] = (Uint32) (transId >> 32);    /**   * Prepare ops   */  m_sent_receivers[last] = tRec;  tRec->m_list_index = last;  tRec->prepareSend();  m_sent_receivers_count = last + 1;    Uint32 nodeId = theNdbCon->theDBnode;  TransporterFacade * tp = TransporterFacade::instance();  tSignal.setLength(4+1);  int ret= tp->sendSignal(&tSignal, nodeId);  if (!ret) checkForceSend(forceSend);  return ret;}intNdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){  Uint32 seq = theNdbCon->theNodeSequence;  Uint32 nodeId = theNdbCon->theDBnode;    if(seq != tp->getNodeSequence(nodeId))  {    theNdbCon->theReleaseOnClose = true;    return -1;  }    /**   * Wait for outstanding   */  while(theError.code == 0 && m_sent_receivers_count)  {    theNdb->theImpl->theWaiter.m_node = nodeId;    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);    switch(return_code){    case 0:      break;    case -1:      setErrorCode(4008);    case -2:      m_api_receivers_count = 0;      m_conf_receivers_count = 0;      m_sent_receivers_count = 0;      theNdbCon->theReleaseOnClose = true;      return -1;    }  }  if(theError.code)  {    m_api_receivers_count = 0;    m_current_api_receiver = m_ordered ? theParallelism : 0;  }  /**   * move all conf'ed into api   *   so that send_next_scan can check if they needs to be closed   */  Uint32 api = m_api_receivers_count;  Uint32 conf = m_conf_receivers_count;  if(m_ordered)  {    /**     * Ordered scan, keep the m_api_receivers "to the right"     */    memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 	    (theParallelism - m_current_api_receiver) * sizeof(char*));    api = (theParallelism - m_current_api_receiver);    m_api_receivers_count = api;  }    if(DEBUG_NEXT_RESULT)    ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",	     m_ordered, api, conf, 	     m_sent_receivers_count, m_current_api_receiver, theParallelism);    if(api+conf)  {    /**     * There's something to close     *   setup m_api_receivers (for send_next_scan)     */    memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));    m_api_receivers_count = api + conf;    m_conf_receivers_count = 0;  }    // Send close scan  if(send_next_scan(api+conf, true, forceSend) == -1)  {    theNdbCon->theReleaseOnClose = true;    return -1;  }    /**   * wait for close scan conf   */  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)  {    theNdb->theImpl->theWaiter.m_node = nodeId;    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);    switch(return_code){    case 0:      break;    case -1:      setErrorCode(4008);    case -2:      m_api_receivers_count = 0;      m_conf_receivers_count = 0;      m_sent_receivers_count = 0;      theNdbCon->theReleaseOnClose = true;      return -1;    }  }    return 0;}voidNdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){  for(Uint32 i = 0; i<parallell; i++){    m_receivers[i]->m_list_index = i;    m_prepared_receivers[i] = m_receivers[i]->getId();    m_sent_receivers[i] = m_receivers[i];    m_conf_receivers[i] = 0;    m_api_receivers[i] = 0;    m_receivers[i]->prepareSend();  }    m_api_receivers_count = 0;  m_current_api_receiver = 0;  m_sent_receivers_count = 0;  m_conf_receivers_count = 0;}intNdbScanOperation::restart(bool forceSend){    TransporterFacade* tp = TransporterFacade::instance();  Guard guard(tp->theMutexPtr);  Uint32 nodeId = theNdbCon->theDBnode;    {    int res;    if((res= close_impl(tp, forceSend)))    {      return res;    }  }    /**   * Reset receivers   */  reset_receivers(theParallelism, m_ordered);    theError.code = 0;  if (doSendScan(nodeId) == -1)    return -1;    return 0;}intNdbIndexScanOperation::reset_bounds(bool forceSend){  int res;    {    TransporterFacade* tp = TransporterFacade::instance();    Guard guard(tp->theMutexPtr);    res= close_impl(tp, forceSend);  }  if(!res)  {    theError.code = 0;    reset_receivers(theParallelism, m_ordered);        theLastKEYINFO = theSCAN_TABREQ->next();    theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;    theTupKeyLen = 0;    theTotalNrOfKeyWordInSignal = 0;    theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;    theDistrKeyIndicator_ = 0;    m_this_bound_start = 0;    m_first_bound_word = theKEYINFOptr;    m_transConnection      ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,		    this);    m_transConnection->define_scan_op(this);    return 0;  }  return res;}intNdbIndexScanOperation::end_of_bound(Uint32 no){  if(no < (1 << 13)) // Only 12-bits no of ranges  {    Uint32 bound_head = * m_first_bound_word;    bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);    * m_first_bound_word = bound_head;        m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;    m_this_bound_start = theTupKeyLen;    return 0;  }  return -1;}intNdbIndexScanOperation::get_range_no(){  NdbRecAttr* tRecAttr = m_curr_row;  if(m_read_range_no && tRecAttr)  {    if(m_keyInfo)      tRecAttr = tRecAttr->next();    Uint32 ret = *(Uint32*)tRecAttr->aRef();    return ret;  }  return -1;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -