📄 dbtuxscan.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#define DBTUX_SCAN_CPP#include "Dbtux.hpp"#include <my_sys.h>voidDbtux::execACC_SCANREQ(Signal* signal){ jamEntry(); const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr(); const AccScanReq* const req = &reqCopy; ScanOpPtr scanPtr; scanPtr.i = RNIL; do { // get the index IndexPtr indexPtr; c_indexPool.getPtr(indexPtr, req->tableId); // get the fragment FragPtr fragPtr; fragPtr.i = RNIL; for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) { jam(); if (indexPtr.p->m_fragId[i] == req->fragmentNo << 1) { jam(); c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]); break; } } ndbrequire(fragPtr.i != RNIL); Frag& frag = *fragPtr.p; // must be normal DIH/TC fragment TreeHead& tree = frag.m_tree; // check for empty fragment if (tree.m_root == NullTupLoc) { jam(); AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); conf->scanPtr = req->senderData; conf->accPtr = RNIL; conf->flag = AccScanConf::ZEMPTY_FRAGMENT; sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal, AccScanConf::SignalLength, JBB); return; } // seize from pool and link to per-fragment list if (! frag.m_scanList.seize(scanPtr)) { jam(); break; } new (scanPtr.p) ScanOp(c_scanBoundPool); scanPtr.p->m_state = ScanOp::First; scanPtr.p->m_userPtr = req->senderData; scanPtr.p->m_userRef = req->senderRef; scanPtr.p->m_tableId = indexPtr.p->m_tableId; scanPtr.p->m_indexId = indexPtr.i; scanPtr.p->m_fragId = fragPtr.p->m_fragId; scanPtr.p->m_fragPtrI = fragPtr.i; scanPtr.p->m_transId1 = req->transId1; scanPtr.p->m_transId2 = req->transId2; scanPtr.p->m_savePointId = req->savePointId; scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo); scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo); scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo); /* * readCommitted lockMode keyInfo * 1 0 0 - read committed (no lock) * 0 0 0 - read latest (read lock) * 0 1 1 - read exclusive (write lock) */#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl; }#endif // conf AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); conf->scanPtr = req->senderData; conf->accPtr = scanPtr.i; conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT; sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal, AccScanConf::SignalLength, JBB); return; } while (0); if (scanPtr.i != RNIL) { jam(); releaseScanOp(scanPtr); } // LQH does not handle REF signal->theData[0] = 0x313; sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);}/* * Receive bounds for scan in single direct call. The bounds can arrive * in any order. Attribute ids are those of index table. * * Replace EQ by equivalent LE + GE. Check for conflicting bounds. * Check that sets of lower and upper bounds are on initial sequences of * keys and that all but possibly last bound is non-strict. * * Finally save the sets of lower and upper bounds (i.e. start key and * end key). Full bound type is included but only the strict bit is * used since lower and upper have now been separated. */voidDbtux::execTUX_BOUND_INFO(Signal* signal){ jamEntry(); // get records TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend(); const TuxBoundInfo* const req = (const TuxBoundInfo*)sig; ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI); const Index& index = *c_indexPool.getPtr(scan.m_indexId); const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff); // collect normalized lower and upper bounds struct BoundInfo { int type2; // with EQ -> LE/GE Uint32 offset; // offset in xfrmData Uint32 size; }; BoundInfo boundInfo[2][MaxIndexAttributes]; const unsigned dstSize = 1024 * MAX_XFRM_MULTIPLY; Uint32 xfrmData[dstSize]; Uint32 dstPos = 0; // largest attrId seen plus one Uint32 maxAttrId[2] = { 0, 0 }; // walk through entries const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; Uint32 offset = 0; while (offset + 2 <= req->boundAiLength) { jam(); const unsigned type = data[offset]; const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1]; const Uint32 attrId = ah->getAttributeId(); const Uint32 dataSize = ah->getDataSize(); if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } // copy header xfrmData[dstPos + 0] = data[offset + 0]; xfrmData[dstPos + 1] = data[offset + 1]; // copy bound value Uint32 dstWords = 0; if (! ah->isNULL()) { jam(); const DescAttr& descAttr = descEnt.m_descAttr[attrId]; Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc); Uint32 srcWords = (srcBytes + 3) / 4; if (srcWords != dataSize) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2]; const uchar* srcPtr = (const uchar*)&data[offset + 2]; if (descAttr.m_charset == 0) { memcpy(dstPtr, srcPtr, srcWords << 2); dstWords = srcWords; } else { jam(); Uint32 typeId = descAttr.m_typeId; Uint32 lb, len; bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); if (! ok) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidCharFormat; return; } CHARSET_INFO* cs = all_charsets[descAttr.m_charset]; Uint32 xmul = cs->strxfrm_multiply; if (xmul == 0) xmul = 1; // see comment in DbtcMain.cpp Uint32 dstLen = xmul * (srcBytes - lb); if (dstLen > ((dstSize - dstPos) << 2)) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::TooMuchAttrInfo; return; } int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); ndbrequire(n != -1); while ((n & 3) != 0) { dstPtr[n++] = 0; } dstWords = n / 4; } } for (unsigned j = 0; j <= 1; j++) { jam(); // check if lower/upper bit matches const unsigned luBit = (j << 1); if ((type & 0x2) != luBit && type != 4) continue; // EQ -> LE, GE const unsigned type2 = (type & 0x1) | luBit; // fill in any gap while (maxAttrId[j] <= attrId) { jam(); BoundInfo& b = boundInfo[j][maxAttrId[j]++]; b.type2 = -1; } BoundInfo& b = boundInfo[j][attrId]; if (b.type2 != -1) { // compare with previously defined bound if (b.type2 != (int)type2 || b.size != 2 + dstWords || memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidBounds; return; } } else { // fix length AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1]; ah->setDataSize(dstWords); // enter new bound jam(); b.type2 = type2; b.offset = dstPos; b.size = 2 + dstWords; } } // jump to next offset += 2 + dataSize; dstPos += 2 + dstWords; } if (offset != req->boundAiLength) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } for (unsigned j = 0; j <= 1; j++) { // save lower/upper bound in index attribute id order for (unsigned i = 0; i < maxAttrId[j]; i++) { jam(); const BoundInfo& b = boundInfo[j][i]; // check for gap or strict bound before last if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidBounds; return; } bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size); if (! ok) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::OutOfBuffers; return; } } scan.m_boundCnt[j] = maxAttrId[j]; } // no error sig->errorCode = 0;}voidDbtux::execNEXT_SCANREQ(Signal* signal){ jamEntry(); const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr(); const NextScanReq* const req = &reqCopy; ScanOpPtr scanPtr; scanPtr.i = req->accPtr; c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl; }#endif // handle unlock previous and close scan switch (req->scanFlag) { case NextScanReq::ZSCAN_NEXT: jam(); break; case NextScanReq::ZSCAN_NEXT_COMMIT: jam(); case NextScanReq::ZSCAN_COMMIT: jam(); if (! scan.m_readCommitted) { jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Unlock; lockReq->accOpPtr = req->accOperationPtr; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); removeAccLockOp(scan, req->accOperationPtr); } if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) { jam(); NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); return; } break; case NextScanReq::ZSCAN_CLOSE: jam(); // unlink from tree node first to avoid state changes if (scan.m_scanPos.m_loc != NullTupLoc) { jam(); const TupLoc loc = scan.m_scanPos.m_loc; NodeHandle node(frag); selectNode(node, loc); unlinkScan(node, scanPtr); scan.m_scanPos.m_loc = NullTupLoc; } if (scan.m_lockwait) { jam(); ndbrequire(scan.m_accLockOp != RNIL); // use ACC_ABORTCONF to flush out any reply in job buffer AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -