📄 ndbscanoperation.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h>#include <Ndb.hpp>#include <NdbScanOperation.hpp>#include <NdbIndexScanOperation.hpp>#include <NdbTransaction.hpp>#include "NdbApiSignal.hpp"#include <NdbOut.hpp>#include "NdbDictionaryImpl.hpp"#include <NdbBlob.hpp>#include <NdbRecAttr.hpp>#include <NdbReceiver.hpp>#include <stdlib.h>#include <NdbSqlUtil.hpp>#include <signaldata/ScanTab.hpp>#include <signaldata/KeyInfo.hpp>#include <signaldata/AttrInfo.hpp>#include <signaldata/TcKeyReq.hpp>#define DEBUG_NEXT_RESULT 0NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) : NdbOperation(aNdb, aType), m_transConnection(NULL){ theParallelism = 0; m_allocated_receivers = 0; m_prepared_receivers = 0; m_api_receivers = 0; m_conf_receivers = 0; m_sent_receivers = 0; m_receivers = 0; m_array = new Uint32[1]; // skip if on delete in fix_receivers theSCAN_TABREQ = 0;}NdbScanOperation::~NdbScanOperation(){ for(Uint32 i = 0; i<m_allocated_receivers; i++){ m_receivers[i]->release(); theNdb->releaseNdbScanRec(m_receivers[i]); } delete[] m_array;}voidNdbScanOperation::setErrorCode(int aErrorCode){ NdbTransaction* tmp = theNdbCon; theNdbCon = m_transConnection; NdbOperation::setErrorCode(aErrorCode); theNdbCon = tmp;}voidNdbScanOperation::setErrorCodeAbort(int aErrorCode){ NdbTransaction* tmp = theNdbCon; theNdbCon = m_transConnection; NdbOperation::setErrorCodeAbort(aErrorCode); theNdbCon = tmp;} /***************************************************************************** * int init(); * * Return Value: Return 0 : init was successful. * Return -1: In all other case. * Remark: Initiates operation record after allocation. *****************************************************************************/intNdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection){ m_transConnection = myConnection; //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection); theNdb->theRemainingStartTransactions++; // will be checked in hupp... NdbTransaction* aScanConnection = theNdb->hupp(myConnection); if (!aScanConnection){ theNdb->theRemainingStartTransactions--; setErrorCodeAbort(theNdb->getNdbError().code); return -1; } // NOTE! The hupped trans becomes the owner of the operation if(NdbOperation::init(tab, aScanConnection) != 0){ theNdb->theRemainingStartTransactions--; return -1; } initInterpreter(); theStatus = GetValue; theOperationType = OpenScanRequest; theNdbCon->theMagicNumber = 0xFE11DF; theNoOfTupKeyLeft = tab->m_noOfDistributionKeys; m_read_range_no = 0; return 0;}int NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 scan_flags, Uint32 parallel){ m_ordered = m_descending = false; Uint32 fragCount = m_currentTable->m_fragmentCount; if (parallel > fragCount || parallel == 0) { parallel = fragCount; } // It is only possible to call openScan if // 1. this transcation don't already contain another scan operation // 2. this transaction don't already contain other operations // 3. theScanOp contains a NdbScanOperation if (theNdbCon->theScanningOp != NULL){ setErrorCode(4605); return -1; } theNdbCon->theScanningOp = this; theLockMode = lm; bool lockExcl, lockHoldMode, readCommitted; switch(lm){ case NdbScanOperation::LM_Read: lockExcl = false; lockHoldMode = true; readCommitted = false; break; case NdbScanOperation::LM_Exclusive: lockExcl = true; lockHoldMode = true; readCommitted = false; break; case NdbScanOperation::LM_CommittedRead: lockExcl = false; lockHoldMode = false; readCommitted = true; break; default: setErrorCode(4003); return -1; } m_keyInfo = lockExcl ? 1 : 0; bool rangeScan = false; if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex) { if (m_currentTable == m_accessTable){ // Old way of scanning indexes, should not be allowed m_currentTable = theNdb->theDictionary-> getTable(m_currentTable->m_primaryTable.c_str()); assert(m_currentTable != NULL); } assert (m_currentTable != m_accessTable); // Modify operation state theStatus = GetValue; theOperationType = OpenRangeScanRequest; rangeScan = true; } bool tupScan = (scan_flags & SF_TupScan); if (tupScan && rangeScan) tupScan = false; theParallelism = parallel; if(fix_receivers(parallel) == -1){ setErrorCodeAbort(4000); return -1; } theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ); if (theSCAN_TABREQ == NULL) { setErrorCodeAbort(4000); return -1; }//if theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ); ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); req->apiConnectPtr = theNdbCon->theTCConPtr; req->tableId = m_accessTable->m_tableId; req->tableSchemaVersion = m_accessTable->m_version; req->storedProcId = 0xFFFF; req->buddyConPtr = theNdbCon->theBuddyConPtr; Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); ScanTabReq::setScanBatch(reqInfo, 0); ScanTabReq::setLockMode(reqInfo, lockExcl); ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); ScanTabReq::setRangeScanFlag(reqInfo, rangeScan); ScanTabReq::setTupScanFlag(reqInfo, tupScan); req->requestInfo = reqInfo; Uint64 transId = theNdbCon->getTransactionId(); req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); NdbApiSignal* tSignal = theSCAN_TABREQ->next(); if(!tSignal) { theSCAN_TABREQ->next(tSignal = theNdb->getSignal()); } theLastKEYINFO = tSignal; tSignal->setSignal(GSN_KEYINFO); theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; theTotalNrOfKeyWordInSignal= 0; getFirstATTRINFOScan(); return 0;}intNdbScanOperation::fix_receivers(Uint32 parallel){ assert(parallel > 0); if(parallel > m_allocated_receivers){ const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32)); Uint64 * tmp = new Uint64[(sz+7)/8]; // Save old receivers memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*)); delete[] m_array; m_array = (Uint32*)tmp; m_receivers = (NdbReceiver**)tmp; m_api_receivers = m_receivers + parallel; m_conf_receivers = m_api_receivers + parallel; m_sent_receivers = m_conf_receivers + parallel; m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel); // Only get/init "new" receivers NdbReceiver* tScanRec; for (Uint32 i = m_allocated_receivers; i < parallel; i ++) { tScanRec = theNdb->getNdbScanRec(); if (tScanRec == NULL) { setErrorCodeAbort(4000); return -1; }//if m_receivers[i] = tScanRec; tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this); } m_allocated_receivers = parallel; } reset_receivers(parallel, 0); return 0;}/** * Move receiver from send array to conf:ed array */voidNdbScanOperation::receiver_delivered(NdbReceiver* tRec){ if(theError.code == 0){ if(DEBUG_NEXT_RESULT) ndbout_c("receiver_delivered"); Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ NdbReceiver * move = m_sent_receivers[last]; m_sent_receivers[idx] = move; move->m_list_index = idx; } m_sent_receivers_count = last; last = m_conf_receivers_count; m_conf_receivers[last] = tRec; m_conf_receivers_count = last + 1; tRec->m_list_index = last; tRec->m_current_row = 0; }}/** * Remove receiver as it's completed */voidNdbScanOperation::receiver_completed(NdbReceiver* tRec){ if(theError.code == 0){ if(DEBUG_NEXT_RESULT) ndbout_c("receiver_completed"); Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ NdbReceiver * move = m_sent_receivers[last]; m_sent_receivers[idx] = move; move->m_list_index = idx; } m_sent_receivers_count = last; }}/***************************************************************************** * int getFirstATTRINFOScan( U_int32 aData ) * * Return Value: Return 0: Successful * Return -1: All other cases * Parameters: None: Only allocate the first signal. * Remark: When a scan is defined we need to use this method instead * of insertATTRINFO for the first signal. * This is because we need not to mess up the code in * insertATTRINFO with if statements since we are not * interested in the TCKEYREQ signal. *****************************************************************************/intNdbScanOperation::getFirstATTRINFOScan(){ NdbApiSignal* tSignal; tSignal = theNdb->getSignal(); if (tSignal == NULL){ setErrorCodeAbort(4000); return -1; } tSignal->setSignal(m_attrInfoGSN); theAI_LenInCurrAI = 8; theATTRINFOptr = &tSignal->getDataPtrSend()[8]; theFirstATTRINFO = tSignal; theCurrentATTRINFO = tSignal; theCurrentATTRINFO->next(NULL); return 0;}/** * Constats for theTupleKeyDefined[][0] */#define SETBOUND_EQ 1#define FAKE_PTR 2#define API_PTR 3#define WAITFOR_SCAN_TIMEOUT 120000intNdbScanOperation::executeCursor(int nodeId){ NdbTransaction * tCon = theNdbCon; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 magic = tCon->theMagicNumber; Uint32 seq = tCon->theNodeSequence; if (tp->get_node_alive(nodeId) && (tp->getNodeSequence(nodeId) == seq)) { /** * Only call prepareSendScan first time (incase of restarts) * - check with theMagicNumber */ tCon->theMagicNumber = 0x37412619; if(magic != 0x37412619 && prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) return -1; if (doSendScan(nodeId) == -1) return -1; return 0; } else { if (!(tp->get_node_stopping(nodeId) && (tp->getNodeSequence(nodeId) == seq))){ TRACE_DEBUG("The node is hard dead when attempting to start a scan"); setErrorCode(4029); tCon->theReleaseOnClose = true; } else { TRACE_DEBUG("The node is stopping when attempting to start a scan"); setErrorCode(4030); }//if tCon->theCommitStatus = NdbTransaction::Aborted; }//if return -1;}int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend){ int res; if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) { // handle blobs NdbBlob* tBlob = theBlobList; while (tBlob != 0) { if (tBlob->atNextResult() == -1) return -1; tBlob = tBlob->theNext; } /* * Flush blob part ops on behalf of user because * - nextResult is analogous to execute(NoCommit) * - user is likely to want blob value before next execute */ if (m_transConnection->executePendingBlobOps() == -1) return -1; return 0; } return res;}int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend){ if(m_ordered)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -