📄 dbtuxscan.cpp
字号:
scan.m_state = ScanOp::Aborting; return; } if (scan.m_state == ScanOp::Locked) { jam(); ndbrequire(scan.m_accLockOp != RNIL); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Unlock; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; } scan.m_state = ScanOp::Aborting; scanClose(signal, scanPtr); return; case NextScanReq::ZSCAN_NEXT_ABORT: jam(); default: jam(); ndbrequire(false); break; } // start looking for next scan result AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend(); checkReq->accPtr = scanPtr.i; checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP; EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength); jamEntry();}voidDbtux::execACC_CHECK_SCAN(Signal* signal){ jamEntry(); const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr(); const AccCheckScan* const req = &reqCopy; ScanOpPtr scanPtr; scanPtr.i = req->accPtr; c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl; }#endif if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) { jam(); signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); return; // stop } if (scan.m_lockwait) { jam(); // LQH asks if we are waiting for lock and we tell it to ask again const TreeEnt ent = scan.m_scanEnt; NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; conf->accOperationPtr = RNIL; // no tuple returned conf->fragId = frag.m_fragId | ent.m_fragBit; unsigned signalLength = 3; // if TC has ordered scan close, it will be detected here sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); return; // stop } if (scan.m_state == ScanOp::First) { jam(); // search is done only once in single range scan scanFirst(scanPtr);#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "First scan " << scanPtr.i << " " << scan << endl; }#endif } if (scan.m_state == ScanOp::Next) { jam(); // look for next scanNext(scanPtr, false); } // for reading tuple key in Current or Locked state Data pkData = c_dataBuffer; unsigned pkSize = 0; // indicates not yet done if (scan.m_state == ScanOp::Current) { // found an entry to return jam(); ndbrequire(scan.m_accLockOp == RNIL); if (! scan.m_readCommitted) { jam(); const TreeEnt ent = scan.m_scanEnt; // read tuple key readTablePk(frag, ent, pkData, pkSize); // get read lock or exclusive lock AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive; lockReq->accOpPtr = RNIL; lockReq->userPtr = scanPtr.i; lockReq->userRef = reference(); lockReq->tableId = scan.m_tableId; lockReq->fragId = frag.m_fragId | ent.m_fragBit; lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit]; const Uint32* const buf32 = static_cast<Uint32*>(pkData); const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32); lockReq->hashValue = md5_hash(buf64, pkSize); lockReq->tupAddr = getTupAddr(frag, ent); lockReq->transId1 = scan.m_transId1; lockReq->transId2 = scan.m_transId2; // execute EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength); jamEntry(); switch (lockReq->returnCode) { case AccLockReq::Success: jam(); scan.m_state = ScanOp::Locked; scan.m_accLockOp = lockReq->accOpPtr;#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl; }#endif break; case AccLockReq::IsBlocked: jam(); // normal lock wait scan.m_state = ScanOp::Blocked; scan.m_lockwait = true; scan.m_accLockOp = lockReq->accOpPtr;#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl; }#endif // LQH will wake us up signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); return; // stop break; case AccLockReq::Refused: jam(); // we cannot see deleted tuple (assert only) ndbassert(false); // skip it scan.m_state = ScanOp::Next; signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); return; // stop break; case AccLockReq::NoFreeOp: jam(); // max ops should depend on max scans (assert only) ndbassert(false); // stay in Current state scan.m_state = ScanOp::Current; signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); return; // stop break; default: ndbrequire(false); break; } } else { scan.m_state = ScanOp::Locked; } } if (scan.m_state == ScanOp::Locked) { // we have lock or do not need one jam(); // read keys if not already done (uses signal) const TreeEnt ent = scan.m_scanEnt; // conf signal NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; // the lock is passed to LQH Uint32 accLockOp = scan.m_accLockOp; if (accLockOp != RNIL) { scan.m_accLockOp = RNIL; // remember it until LQH unlocks it addAccLockOp(scan, accLockOp); } else { ndbrequire(scan.m_readCommitted); // operation RNIL in LQH would signal no tuple returned accLockOp = (Uint32)-1; } conf->accOperationPtr = accLockOp; conf->fragId = frag.m_fragId | ent.m_fragBit; conf->localKey[0] = getTupAddr(frag, ent); conf->localKey[1] = 0; conf->localKeyLength = 1; unsigned signalLength = 6; // add key info if (! scan.m_readCommitted) { sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); } else { Uint32 blockNo = refToBlock(scan.m_userRef); EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength); } // next time look for next entry scan.m_state = ScanOp::Next; return; } // XXX in ACC this is checked before req->checkLcpStop if (scan.m_state == ScanOp::Last || scan.m_state == ScanOp::Invalid) { jam(); NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); conf->scanPtr = scan.m_userPtr; conf->accOperationPtr = RNIL; conf->fragId = RNIL; unsigned signalLength = 3; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); return; } ndbrequire(false);}/* * Lock succeeded (after delay) in ACC. If the lock is for current * entry, set state to Locked. If the lock is for an entry we were * moved away from, simply unlock it. Finally, if we are closing the * scan, do nothing since we have already sent an abort request. */voidDbtux::execACCKEYCONF(Signal* signal){ jamEntry(); ScanOpPtr scanPtr; scanPtr.i = signal->theData[0]; c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p;#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl; }#endif ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL); scan.m_lockwait = false; if (scan.m_state == ScanOp::Blocked) { // the lock wait was for current entry jam(); scan.m_state = ScanOp::Locked; // LQH has the ball return; } if (scan.m_state != ScanOp::Aborting) { // we were moved, release lock jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Unlock; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; // LQH has the ball return; } // lose the lock scan.m_accLockOp = RNIL; // continue at ACC_ABORTCONF}/* * Lock failed (after delay) in ACC. Probably means somebody ahead of * us in lock queue deleted the tuple. */voidDbtux::execACCKEYREF(Signal* signal){ jamEntry(); ScanOpPtr scanPtr; scanPtr.i = signal->theData[0]; c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p;#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl; }#endif ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL); scan.m_lockwait = false; if (scan.m_state != ScanOp::Aborting) { jam(); // release the operation AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; // scan position should already have been moved (assert only) if (scan.m_state == ScanOp::Blocked) { jam(); ndbassert(false); scan.m_state = ScanOp::Next; } // LQH has the ball return; } // lose the lock scan.m_accLockOp = RNIL; // continue at ACC_ABORTCONF}/* * Received when scan is closing. This signal arrives after any * ACCKEYCON or ACCKEYREF which may have been in job buffer. */voidDbtux::execACC_ABORTCONF(Signal* signal){ jamEntry(); ScanOpPtr scanPtr; scanPtr.i = signal->theData[0]; c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p;#ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl; }#endif ndbrequire(scan.m_state == ScanOp::Aborting); // most likely we are still in lock wait if (scan.m_lockwait) { jam(); scan.m_lockwait = false; scan.m_accLockOp = RNIL; } scanClose(signal, scanPtr);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -