⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbscanoperation.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <Ndb.hpp>
#include <NdbScanOperation.hpp>
#include <NdbIndexScanOperation.hpp>
#include <NdbTransaction.hpp>
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include "NdbDictionaryImpl.hpp"
#include <NdbBlob.hpp>
#include <NdbRecAttr.hpp>
#include <NdbReceiver.hpp>

#include <stdlib.h>
#include <NdbSqlUtil.hpp>

#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>

#define DEBUG_NEXT_RESULT 0

/**
 * Construct a scan operation with no receivers allocated.
 *
 * m_array starts out as a one-element dummy allocation so that
 * fix_receivers() and the destructor can delete[] it unconditionally.
 */
NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
  NdbOperation(aNdb, aType),
  m_transConnection(NULL)
{
  theParallelism = 0;
  m_allocated_receivers = 0;
  m_prepared_receivers = 0;
  m_api_receivers = 0;
  m_conf_receivers = 0;
  m_sent_receivers = 0;
  m_receivers = 0;
  m_array = new Uint32[1]; // skip if on delete in fix_receivers
  theSCAN_TABREQ = 0;
}

/**
 * Release every receiver counted in m_allocated_receivers back to the Ndb
 * object and free the backing array.
 */
NdbScanOperation::~NdbScanOperation()
{
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
    m_receivers[i]->release();
    theNdb->releaseNdbScanRec(m_receivers[i]);
  }
  delete[] m_array;
}

/**
 * Delegate to NdbOperation::setErrorCode() with theNdbCon temporarily
 * pointing at m_transConnection; theNdbCon is restored afterwards.
 */
void
NdbScanOperation::setErrorCode(int aErrorCode)
{
  NdbTransaction* tmp = theNdbCon;
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCode(aErrorCode);
  theNdbCon = tmp;
}

/**
 * Delegate to NdbOperation::setErrorCodeAbort() with theNdbCon temporarily
 * pointing at m_transConnection; theNdbCon is restored afterwards.
 */
void
NdbScanOperation::setErrorCodeAbort(int aErrorCode)
{
  NdbTransaction* tmp = theNdbCon;
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCodeAbort(aErrorCode);
  theNdbCon = tmp;
}

/*****************************************************************************
 * int init(const NdbTableImpl* tab, NdbTransaction* myConnection);
 *
 * Return Value:  Return 0 : init was successful.
 *                Return -1: In all other case.
 * Remark:        Initiates operation record after allocation.
 *                myConnection is remembered in m_transConnection; the
 *                operation itself is initialised on the transaction
 *                returned by Ndb::hupp(myConnection).
 *****************************************************************************/
int
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
{
  m_transConnection = myConnection;
  //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
  if (!aScanConnection){
    theNdb->theRemainingStartTransactions--;
    setErrorCodeAbort(theNdb->getNdbError().code);
    return -1;
  }

  // NOTE! The hupped trans becomes the owner of the operation
  if(NdbOperation::init(tab, aScanConnection) != 0){
    theNdb->theRemainingStartTransactions--;
    return -1;
  }

  initInterpreter();

  theStatus = GetValue;
  theOperationType = OpenScanRequest;
  theNdbCon->theMagicNumber = 0xFE11DF;
  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
  m_read_range_no = 0;
  return 0;
}

/**
 * Define the scan: lock mode, flags and parallelism.
 *
 * Fills in the SCAN_TABREQ signal, makes sure a KEYINFO signal is chained
 * after it and allocates the first ATTRINFO signal.
 *
 * @param lm          Lock mode; any value other than LM_Read, LM_Exclusive
 *                    or LM_CommittedRead fails with error 4003.
 * @param scan_flags  Bitmask; only SF_TupScan is examined here, and it is
 *                    ignored when the access table is an ordered index.
 * @param parallel    Requested parallelism; 0 or a value above the table's
 *                    fragment count is replaced by the fragment count.
 * @return 0 on success, -1 on failure (an error code has then been set).
 */
int
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
                             Uint32 scan_flags,
                             Uint32 parallel)
{
  m_ordered = m_descending = false;
  Uint32 fragCount = m_currentTable->m_fragmentCount;

  if (parallel > fragCount || parallel == 0) {
    parallel = fragCount;
  }

  // It is only possible to call openScan if
  //  1. this transaction doesn't already contain another scan operation
  //  2. this transaction doesn't already contain other operations
  //  3. theScanOp contains a NdbScanOperation
  if (theNdbCon->theScanningOp != NULL){
    setErrorCode(4605);
    return -1;
  }

  theNdbCon->theScanningOp = this;
  theLockMode = lm;

  bool lockExcl, lockHoldMode, readCommitted;
  switch(lm){
  case NdbScanOperation::LM_Read:
    lockExcl = false;
    lockHoldMode = true;
    readCommitted = false;
    break;
  case NdbScanOperation::LM_Exclusive:
    lockExcl = true;
    lockHoldMode = true;
    readCommitted = false;
    break;
  case NdbScanOperation::LM_CommittedRead:
    lockExcl = false;
    lockHoldMode = false;
    readCommitted = true;
    break;
  default:
    setErrorCode(4003);
    return -1;
  }

  m_keyInfo = lockExcl ? 1 : 0;

  bool rangeScan = false;
  if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
  {
    if (m_currentTable == m_accessTable){
      // Old way of scanning indexes, should not be allowed
      m_currentTable = theNdb->theDictionary->
        getTable(m_currentTable->m_primaryTable.c_str());
      assert(m_currentTable != NULL);
    }
    assert (m_currentTable != m_accessTable);
    // Modify operation state
    theStatus = GetValue;
    theOperationType  = OpenRangeScanRequest;
    rangeScan = true;
  }

  // A tup scan and a range scan are mutually exclusive; range scan wins.
  bool tupScan = (scan_flags & SF_TupScan);
  if (tupScan && rangeScan)
    tupScan = false;

  theParallelism = parallel;

  if(fix_receivers(parallel) == -1){
    setErrorCodeAbort(4000);
    return -1;
  }

  // Reuse the SCAN_TABREQ signal from an earlier call if there is one.
  theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
  if (theSCAN_TABREQ == NULL) {
    setErrorCodeAbort(4000);
    return -1;
  }//if

  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  req->apiConnectPtr = theNdbCon->theTCConPtr;
  req->tableId = m_accessTable->m_tableId;
  req->tableSchemaVersion = m_accessTable->m_version;
  req->storedProcId = 0xFFFF;
  req->buddyConPtr = theNdbCon->theBuddyConPtr;

  Uint32 reqInfo = 0;
  ScanTabReq::setParallelism(reqInfo, parallel);
  ScanTabReq::setScanBatch(reqInfo, 0);
  ScanTabReq::setLockMode(reqInfo, lockExcl);
  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
  req->requestInfo = reqInfo;

  Uint64 transId = theNdbCon->getTransactionId();
  req->transId1 = (Uint32) transId;
  req->transId2 = (Uint32) (transId >> 32);

  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
  if(!tSignal)
  {
    tSignal = theNdb->getSignal();
    if (tSignal == NULL) {
      // Out of signals: fail like the other allocation failures in this
      // function instead of dereferencing a NULL pointer below.
      setErrorCodeAbort(4000);
      return -1;
    }//if
    theSCAN_TABREQ->next(tSignal);
  }
  theLastKEYINFO = tSignal;

  tSignal->setSignal(GSN_KEYINFO);
  theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  theTotalNrOfKeyWordInSignal= 0;

  // getFirstATTRINFOScan() has already set the error code when it fails;
  // do not report success on top of it.
  if (getFirstATTRINFOScan() == -1)
    return -1;
  return 0;
}

/**
 * Make sure at least `parallel` receivers are allocated, then reset the
 * receiver bookkeeping via reset_receivers(parallel, 0).
 *
 * When the arrays have to grow, one allocation holds four pointer arrays
 * of `parallel` entries each (m_receivers, m_api_receivers,
 * m_conf_receivers, m_sent_receivers) followed by `parallel` Uint32 values
 * (m_prepared_receivers). Existing receivers are carried over; only the
 * additional ones are fetched from the Ndb object.
 *
 * @param parallel  Number of receivers needed; must be > 0.
 * @return 0 on success, -1 if a receiver could not be obtained (the error
 *         code has then been set).
 */
int
NdbScanOperation::fix_receivers(Uint32 parallel)
{
  assert(parallel > 0);
  if(parallel > m_allocated_receivers){
    const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));

    Uint64 * tmp = new Uint64[(sz+7)/8];
    // Save old receivers. m_receivers is NULL until the first allocation,
    // and memcpy must not be handed a NULL source even for zero bytes.
    if (m_allocated_receivers > 0)
      memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
    delete[] m_array;
    m_array = (Uint32*)tmp;

    m_receivers = (NdbReceiver**)tmp;
    m_api_receivers = m_receivers + parallel;
    m_conf_receivers = m_api_receivers + parallel;
    m_sent_receivers = m_conf_receivers + parallel;
    m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);

    // Only get/init "new" receivers
    NdbReceiver* tScanRec;
    for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
      tScanRec = theNdb->getNdbScanRec();
      if (tScanRec == NULL) {
        // Count the receivers obtained so far so that the destructor
        // releases them; otherwise they would be leaked.
        m_allocated_receivers = i;
        setErrorCodeAbort(4000);
        return -1;
      }//if
      m_receivers[i] = tScanRec;
      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
    }
    m_allocated_receivers = parallel;
  }

  reset_receivers(parallel, 0);
  return 0;
}

/**
 * Move receiver from sent array to conf:ed array.
 *
 * The receiver is removed from m_sent_receivers by moving the last entry
 * into its slot, then appended to m_conf_receivers with its current row
 * reset to 0. Does nothing if an error code is already set.
 */
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec)
{
  if(theError.code == 0){
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_delivered");

    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;

    last = m_conf_receivers_count;
    m_conf_receivers[last] = tRec;
    m_conf_receivers_count = last + 1;
    tRec->m_list_index = last;
    tRec->m_current_row = 0;
  }
}

/**
 * Remove receiver as it's completed.
 *
 * The receiver is removed from m_sent_receivers by moving the last entry
 * into its slot. Does nothing if an error code is already set.
 */
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec)
{
  if(theError.code == 0){
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_completed");

    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
  }
}

/*****************************************************************************
 * int getFirstATTRINFOScan()
 *
 * Return Value:  Return 0:   Successful
 *                Return -1:  All other cases
 * Parameters:    None:       Only allocate the first signal.
 * Remark:        When a scan is defined we need to use this method instead
 *                of insertATTRINFO for the first signal.
 *                This is because we need not to mess up the code in
 *                insertATTRINFO with if statements since we are not
 *                interested in the TCKEYREQ signal.
 *****************************************************************************/
int
NdbScanOperation::getFirstATTRINFOScan()
{
  NdbApiSignal* tSignal;

  tSignal = theNdb->getSignal();
  if (tSignal == NULL){
    setErrorCodeAbort(4000);
    return -1;
  }
  tSignal->setSignal(m_attrInfoGSN);
  // The first 8 words of the signal are accounted for up front; attribute
  // data is written starting at word 8.
  theAI_LenInCurrAI = 8;
  theATTRINFOptr = &tSignal->getDataPtrSend()[8];
  theFirstATTRINFO = tSignal;
  theCurrentATTRINFO = tSignal;
  theCurrentATTRINFO->next(NULL);

  return 0;
}

/**
 * Constants for theTupleKeyDefined[][0]
 */
#define SETBOUND_EQ 1
#define FAKE_PTR 2
#define API_PTR 3

#define WAITFOR_SCAN_TIMEOUT 120000

/**
 * Prepare (first time only) and send the scan to the given node.
 *
 * A Guard on the transporter facade's mutex pointer is held for the whole
 * call.
 *
 * @param nodeId  Node to send the scan to.
 * @return 0 if the scan was sent; -1 if preparing or sending failed, or if
 *         the node is not alive with the expected node sequence (error
 *         4029 or 4030 is then set and the transaction is marked Aborted).
 */
int
NdbScanOperation::executeCursor(int nodeId)
{
  NdbTransaction * tCon = theNdbCon;
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);

  Uint32 magic = tCon->theMagicNumber;
  Uint32 seq = tCon->theNodeSequence;

  if (tp->get_node_alive(nodeId) &&
      (tp->getNodeSequence(nodeId) == seq)) {

    /**
     * Only call prepareSendScan first time (in case of restarts)
     *   - check with theMagicNumber
     */
    tCon->theMagicNumber = 0x37412619;
    if(magic != 0x37412619 &&
       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
      return -1;

    if (doSendScan(nodeId) == -1)
      return -1;

    return 0;
  } else {
    if (!(tp->get_node_stopping(nodeId) &&
          (tp->getNodeSequence(nodeId) == seq))){
      TRACE_DEBUG("The node is hard dead when attempting to start a scan");
      setErrorCode(4029);
      tCon->theReleaseOnClose = true;
    } else {
      TRACE_DEBUG("The node is stopping when attempting to start a scan");
      setErrorCode(4030);
    }//if
    tCon->theCommitStatus = NdbTransaction::Aborted;
  }//if
  return -1;
}

/**
 * Advance to the next row and bring attached blob handles up to date.
 *
 * @return The result of nextResultImpl() when that is non-zero; otherwise
 *         0 on success, or -1 if a blob's atNextResult() or the pending
 *         blob operations fail.
 */
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
  int res;
  if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
    // handle blobs
    NdbBlob* tBlob = theBlobList;
    while (tBlob != 0) {
      if (tBlob->atNextResult() == -1)
        return -1;
      tBlob = tBlob->theNext;
    }
    /*
     * Flush blob part ops on behalf of user because
     * - nextResult is analogous to execute(NoCommit)
     * - user is likely to want blob value before next execute
     */
    if (m_transConnection->executePendingBlobOps() == -1)
      return -1;
    return 0;
  }
  return res;
}

int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend){
  if(m_ordered)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -