📄 dbtupscan.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#define DBTUP_C#include "Dbtup.hpp"#include <signaldata/AccScan.hpp>#include <signaldata/NextScan.hpp>#undef jam#undef jamEntry#define jam() { jamLine(32000 + __LINE__); }#define jamEntry() { jamEntryLine(32000 + __LINE__); }voidDbtup::execACC_SCANREQ(Signal* signal){ jamEntry(); const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr(); const AccScanReq* const req = &reqCopy; ScanOpPtr scanPtr; scanPtr.i = RNIL; do { // find table and fragments TablerecPtr tablePtr; tablePtr.i = req->tableId; ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); FragrecordPtr fragPtr[2]; Uint32 fragId = req->fragmentNo << 1; fragPtr[0].i = fragPtr[1].i = RNIL; getFragmentrec(fragPtr[0], fragId | 0, tablePtr.p); getFragmentrec(fragPtr[1], fragId | 1, tablePtr.p); ndbrequire(fragPtr[0].i != RNIL && fragPtr[1].i != RNIL); Fragrecord& frag = *fragPtr[0].p; // seize from pool and link to per-fragment list if (! frag.m_scanList.seize(scanPtr)) { jam(); break; } new (scanPtr.p) ScanOp(); ScanOp& scan = *scanPtr.p; scan.m_state = ScanOp::First; scan.m_userPtr = req->senderData; scan.m_userRef = req->senderRef; scan.m_tableId = tablePtr.i; scan.m_fragId = frag.fragmentId; scan.m_fragPtrI[0] = fragPtr[0].i; scan.m_fragPtrI[1] = fragPtr[1].i; scan.m_transId1 = req->transId1; scan.m_transId2 = req->transId2; // conf AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); conf->scanPtr = req->senderData; conf->accPtr = scanPtr.i; conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT; sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal, AccScanConf::SignalLength, JBB); return; } while (0); if (scanPtr.i != RNIL) { jam(); releaseScanOp(scanPtr); } // LQH does not handle REF signal->theData[0] = 0x313; sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);}voidDbtup::execNEXT_SCANREQ(Signal* signal){ jamEntry(); const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr(); const NextScanReq* const req = &reqCopy; ScanOpPtr scanPtr; c_scanOpPool.getPtr(scanPtr, req->accPtr); ScanOp& scan = *scanPtr.p; FragrecordPtr fragPtr; fragPtr.i = scan.m_fragPtrI[0]; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); Fragrecord& frag = *fragPtr.p; switch (req->scanFlag) { case NextScanReq::ZSCAN_NEXT: jam(); break; case NextScanReq::ZSCAN_NEXT_COMMIT: jam(); break; case NextScanReq::ZSCAN_COMMIT: jam(); { NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); return; } break; case NextScanReq::ZSCAN_CLOSE: jam(); scanClose(signal, scanPtr); return; case NextScanReq::ZSCAN_NEXT_ABORT: jam(); default: jam(); ndbrequire(false); break; } // start looking for next scan result AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend(); checkReq->accPtr = scanPtr.i; checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP; EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength); jamEntry();}voidDbtup::execACC_CHECK_SCAN(Signal* signal){ jamEntry(); const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr(); const AccCheckScan* const req = &reqCopy; ScanOpPtr scanPtr; c_scanOpPool.getPtr(scanPtr, req->accPtr); ScanOp& scan = *scanPtr.p; FragrecordPtr fragPtr; fragPtr.i = scan.m_fragPtrI[0]; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); Fragrecord& frag = *fragPtr.p; if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) { jam(); signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); return; } if (scan.m_state == ScanOp::First) { jam(); scanFirst(signal, scanPtr); } if (scan.m_state == ScanOp::Next) { jam(); scanNext(signal, scanPtr); } if (scan.m_state == ScanOp::Locked) { jam(); const PagePos& pos = scan.m_scanPos; NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; conf->accOperationPtr = (Uint32)-1; // no lock returned conf->fragId = frag.fragmentId | pos.m_fragBit; conf->localKey[0] = (pos.m_pageId << MAX_TUPLES_BITS) | (pos.m_tupleNo << 1); conf->localKey[1] = 0; conf->localKeyLength = 1; unsigned signalLength = 6; Uint32 blockNo = refToBlock(scan.m_userRef); EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength); jamEntry(); // next time look for next entry scan.m_state = ScanOp::Next; return; } if (scan.m_state == ScanOp::Last || scan.m_state == ScanOp::Invalid) { jam(); NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; conf->accOperationPtr = RNIL; conf->fragId = RNIL; unsigned signalLength = 3; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); return; } ndbrequire(false);}voidDbtup::scanFirst(Signal* signal, ScanOpPtr scanPtr){ ScanOp& scan = *scanPtr.p; // set to first fragment, first page, first tuple PagePos& pos = scan.m_scanPos; pos.m_fragId = scan.m_fragId; pos.m_fragBit = 0; pos.m_pageId = 0; pos.m_tupleNo = 0; // just before pos.m_match = false; // let scanNext() do the work scan.m_state = ScanOp::Next;}// TODO optimize this + index buildvoidDbtup::scanNext(Signal* signal, ScanOpPtr scanPtr){ ScanOp& scan = *scanPtr.p; PagePos& pos = scan.m_scanPos; TablerecPtr tablePtr; tablePtr.i = scan.m_tableId; ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); while (true) { // TODO time-slice here after X loops jam(); // get fragment if (pos.m_fragBit == 2) { jam(); scan.m_state = ScanOp::Last; break; } ndbrequire(pos.m_fragBit <= 1); FragrecordPtr fragPtr; fragPtr.i = scan.m_fragPtrI[pos.m_fragBit]; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); Fragrecord& frag = *fragPtr.p; // get page PagePtr pagePtr; if (pos.m_pageId >= frag.noOfPages) { jam(); pos.m_fragBit++; pos.m_pageId = 0; pos.m_tupleNo = 0; pos.m_match = false; continue; } Uint32 realPageId = getRealpid(fragPtr.p, pos.m_pageId); pagePtr.i = realPageId; ptrCheckGuard(pagePtr, cnoOfPage, page); const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS]; if (pageState != ZTH_MM_FREE && pageState != ZTH_MM_FULL) { jam(); pos.m_pageId++; pos.m_tupleNo = 0; pos.m_match = false; continue; } // get next tuple if (pos.m_match) pos.m_tupleNo++; pos.m_match = true; const Uint32 tupheadsize = tablePtr.p->tupheadsize; Uint32 pageOffset = ZPAGE_HEADER_SIZE + pos.m_tupleNo * tupheadsize; if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) { jam(); pos.m_pageId++; pos.m_tupleNo = 0; pos.m_match = false; continue; } // skip over free tuple bool isFree = false; if (pageState == ZTH_MM_FREE) { jam(); if ((pagePtr.p->pageWord[pageOffset] >> 16) == tupheadsize) { Uint32 nextTuple = pagePtr.p->pageWord[ZFREELIST_HEADER_POS] >> 16; while (nextTuple != 0) { jam(); if (nextTuple == pageOffset) { jam(); isFree = true; break; } nextTuple = pagePtr.p->pageWord[nextTuple] & 0xffff; } } } if (isFree) { jam(); continue; } // TODO check for operation and return latest in own tx scan.m_state = ScanOp::Locked; break; }}voidDbtup::scanClose(Signal* signal, ScanOpPtr scanPtr){ NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scanPtr.p->m_userPtr; conf->accOperationPtr = RNIL; conf->fragId = RNIL; unsigned signalLength = 3; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); releaseScanOp(scanPtr);}voidDbtup::releaseScanOp(ScanOpPtr& scanPtr){ FragrecordPtr fragPtr; fragPtr.i = scanPtr.p->m_fragPtrI[0]; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); fragPtr.p->m_scanList.release(scanPtr);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -