📄 ndbblob.cpp
字号:
setErrorCode(NdbBlobImpl::ErrState); DBUG_RETURN(-1); } if (theNullFlag) DBUG_RETURN(0); if (deleteParts(0, getPartCount()) == -1) DBUG_RETURN(-1); theNullFlag = true; theLength = 0; theHeadInlineUpdateFlag = true; DBUG_RETURN(0);}intNdbBlob::getLength(Uint64& len){ DBUG_ENTER("NdbBlob::getLength"); if (theState == Prepared && theSetFlag) { len = theGetSetBytes; DBUG_RETURN(0); } if (theNullFlag == -1) { setErrorCode(NdbBlobImpl::ErrState); DBUG_RETURN(-1); } len = theLength; DBUG_RETURN(0);}intNdbBlob::truncate(Uint64 length){ DBUG_ENTER("NdbBlob::truncate"); DBUG_PRINT("info", ("length=%llu", length)); if (theNullFlag == -1) { setErrorCode(NdbBlobImpl::ErrState); DBUG_RETURN(-1); } if (theLength > length) { if (length > theInlineSize) { Uint32 part1 = getPartNumber(length - 1); Uint32 part2 = getPartNumber(theLength - 1); assert(part2 >= part1); if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1) DBUG_RETURN(-1); } else { if (deleteParts(0, getPartCount()) == -1) DBUG_RETURN(-1); } theLength = length; theHeadInlineUpdateFlag = true; if (thePos > length) thePos = length; } DBUG_RETURN(0);}intNdbBlob::getPos(Uint64& pos){ DBUG_ENTER("NdbBlob::getPos"); if (theNullFlag == -1) { setErrorCode(NdbBlobImpl::ErrState); DBUG_RETURN(-1); } pos = thePos; DBUG_RETURN(0);}intNdbBlob::setPos(Uint64 pos){ DBUG_ENTER("NdbBlob::setPos"); DBUG_PRINT("info", ("pos=%llu", pos)); if (theNullFlag == -1) { setErrorCode(NdbBlobImpl::ErrState); DBUG_RETURN(-1); } if (pos > theLength) { setErrorCode(NdbBlobImpl::ErrSeek); DBUG_RETURN(-1); } thePos = pos; DBUG_RETURN(0);}// read/writeintNdbBlob::readData(void* data, Uint32& bytes){ if (theState != Active) { setErrorCode(NdbBlobImpl::ErrState); return -1; } char* buf = static_cast<char*>(data); return readDataPrivate(buf, bytes);}intNdbBlob::readDataPrivate(char* buf, Uint32& bytes){ DBUG_ENTER("NdbBlob::readDataPrivate"); DBUG_PRINT("info", ("bytes=%u", bytes)); assert(thePos <= theLength); Uint64 pos = thePos; if (bytes > theLength - pos) bytes = theLength - pos; Uint32 len = bytes; if (len > 0) { // inline part if (pos < theInlineSize) { Uint32 n = theInlineSize - pos; if (n > len) n = len; memcpy(buf, theInlineData + pos, n); pos += n; buf += n; len -= n; } } if (len > 0 && thePartSize == 0) { setErrorCode(NdbBlobImpl::ErrSeek); DBUG_RETURN(-1); } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; // partial first block if (off != 0) { DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len)); Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); // need result now if (executePendingBlobReads() == -1) DBUG_RETURN(-1); Uint32 n = thePartSize - off; if (n > len) n = len; memcpy(buf, thePartBuf.data + off, n); pos += n; buf += n; len -= n; } } if (len > 0) { assert((pos - theInlineSize) % thePartSize == 0); // complete blocks in the middle if (len >= thePartSize) { Uint32 part = (pos - theInlineSize) / thePartSize; Uint32 count = len / thePartSize; if (readParts(buf, part, count) == -1) DBUG_RETURN(-1); Uint32 n = thePartSize * count; pos += n; buf += n; len -= n; } } if (len > 0) { // partial last block DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len)); assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); // need result now if (executePendingBlobReads() == -1) DBUG_RETURN(-1); memcpy(buf, thePartBuf.data, len); Uint32 n = len; pos += n; buf += n; len -= n; } assert(len == 0); thePos = pos; assert(thePos <= theLength); DBUG_RETURN(0);}intNdbBlob::writeData(const void* data, Uint32 bytes){ if (theState != Active) { setErrorCode(NdbBlobImpl::ErrState); return -1; } const char* buf = static_cast<const char*>(data); return writeDataPrivate(buf, bytes);}intNdbBlob::writeDataPrivate(const char* buf, Uint32 bytes){ DBUG_ENTER("NdbBlob::writeDataPrivate"); DBUG_PRINT("info", ("bytes=%u", bytes)); assert(thePos <= theLength); Uint64 pos = thePos; Uint32 len = bytes; // any write makes blob not NULL if (theNullFlag) { theNullFlag = false; theHeadInlineUpdateFlag = true; } if (len > 0) { // inline part if (pos < theInlineSize) { Uint32 n = theInlineSize - pos; if (n > len) n = len; memcpy(theInlineData + pos, buf, n); theHeadInlineUpdateFlag = true; pos += n; buf += n; len -= n; } } if (len > 0 && thePartSize == 0) { setErrorCode(NdbBlobImpl::ErrSeek); DBUG_RETURN(-1); } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; // partial first block if (off != 0) { DBUG_PRINT("info", ("partial first block pos=%llu len=%u", pos, len)); // flush writes to guarantee correct read if (executePendingBlobWrites() == -1) DBUG_RETURN(-1); Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); // need result now if (executePendingBlobReads() == -1) DBUG_RETURN(-1); Uint32 n = thePartSize - off; if (n > len) { memset(thePartBuf.data + off + len, theFillChar, n - len); n = len; } memcpy(thePartBuf.data + off, buf, n); if (updateParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); pos += n; buf += n; len -= n; } } if (len > 0) { assert((pos - theInlineSize) % thePartSize == 0); // complete blocks in the middle if (len >= thePartSize) { Uint32 part = (pos - theInlineSize) / thePartSize; Uint32 count = len / thePartSize; for (unsigned i = 0; i < count; i++) { if (part + i < getPartCount()) { if (updateParts(buf, part + i, 1) == -1) DBUG_RETURN(-1); } else { if (insertParts(buf, part + i, 1) == -1) DBUG_RETURN(-1); } Uint32 n = thePartSize; pos += n; buf += n; len -= n; } } } if (len > 0) { // partial last block DBUG_PRINT("info", ("partial last block pos=%llu len=%u", pos, len)); assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); Uint32 part = (pos - theInlineSize) / thePartSize; if (theLength > pos + len) { // flush writes to guarantee correct read if (executePendingBlobWrites() == -1) DBUG_RETURN(-1); if (readParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); // need result now if (executePendingBlobReads() == -1) DBUG_RETURN(-1); memcpy(thePartBuf.data, buf, len); if (updateParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); } else { memcpy(thePartBuf.data, buf, len); memset(thePartBuf.data + len, theFillChar, thePartSize - len); if (part < getPartCount()) { if (updateParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); } else { if (insertParts(thePartBuf.data, part, 1) == -1) DBUG_RETURN(-1); } } Uint32 n = len; pos += n; buf += n; len -= n; } assert(len == 0); if (theLength < pos) { theLength = pos; theHeadInlineUpdateFlag = true; } thePos = pos; assert(thePos <= theLength); DBUG_RETURN(0);}intNdbBlob::readParts(char* buf, Uint32 part, Uint32 count){ DBUG_ENTER("NdbBlob::readParts"); DBUG_PRINT("info", ("part=%u count=%u", part, count)); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->committedRead() == -1 || setPartKeyValue(tOp, part + n) == -1 || tOp->getValue((Uint32)3, buf) == NULL) { setErrorCode(tOp); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::ReadRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest); } DBUG_RETURN(0);}intNdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count){ DBUG_ENTER("NdbBlob::insertParts"); DBUG_PRINT("info", ("part=%u count=%u", part, count)); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->insertTuple() == -1 || setPartKeyValue(tOp, part + n) == -1 || tOp->setValue((Uint32)3, buf) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::InsertRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest); } DBUG_RETURN(0);}intNdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count){ DBUG_ENTER("NdbBlob::updateParts"); DBUG_PRINT("info", ("part=%u count=%u", part, count)); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->updateTuple() == -1 || setPartKeyValue(tOp, part + n) == -1 || tOp->setValue((Uint32)3, buf) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); } DBUG_RETURN(0);}intNdbBlob::deleteParts(Uint32 part, Uint32 count){ DBUG_ENTER("NdbBlob::deleteParts"); DBUG_PRINT("info", ("part=%u count=%u", part, count)); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->deleteTuple() == -1 || setPartKeyValue(tOp, part + n) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; n++; thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); } DBUG_RETURN(0);}/* * Number of blob parts not known. Used to check for race condition * when writeTuple is used for insert. Deletes all parts found. */intNdbBlob::deletePartsUnknown(Uint32 part){ DBUG_ENTER("NdbBlob::deletePartsUnknown"); DBUG_PRINT("info", ("part=%u count=all", part)); if (thePartSize == 0) // tinyblob DBUG_RETURN(0); static const unsigned maxbat = 256; static const unsigned minbat = 1; unsigned bat = minbat; NdbOperation* tOpList[maxbat]; Uint32 count = 0; while (true) { Uint32 n; n = 0; while (n < bat) { NdbOperation*& tOp = tOpList[n]; // ref tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->deleteTuple() == -1 || setPartKeyValue(tOp, part + count + n) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } tOp->m_abortOption= NdbTransaction::AO_IgnoreError; n++; } DBUG_PRINT("info", ("bat=%u", bat)); if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1) DBUG_RETURN(-1); n = 0; while (n < bat) { NdbOperation* tOp = tOpList[n]; if (tOp->theError.code != 0) { if (tOp->theError.code != 626) { setErrorCode(tOp); DBUG_RETURN(-1); } // first non-existent part DBUG_PRINT("info", ("count=%u", count)); DBUG_RETURN(0); } n++; count++; } bat *= 4; if (bat > maxbat) bat = maxbat; }}// pending opsintNdbBlob::executePendingBlobReads(){ DBUG_ENTER("NdbBlob::executePendingBlobReads"); Uint8 flags = (1 << NdbOperation::ReadRequest); if (thePendingBlobOps & flags) { if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1) DBUG_RETURN(-1); thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0; } DBUG_RETURN(0);}intNdbBlob::executePendingBlobWrites(){ DBUG_ENTER("NdbBlob::executePendingBlobWrites"); Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest); if (thePendingBlobOps & flags) { if (theNdbCon->executeNoBlobs(NdbTransaction::NoCommit) == -1) DBUG_RETURN(-1); thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0; } DBUG_RETURN(0);}// callbacksintNdbBlob::invokeActiveHook(){ DBUG_ENTER("NdbBlob::invokeActiveHook"); assert(theState == Active && theActiveHook != NULL); int ret = (*theActiveHook)(this, theActiveHookArg); if (ret != 0) { // no error is set on blob level DBUG_RETURN(-1); } DBUG_RETURN(0);}// blob handle maintenance/* * Prepare blob handle linked to an operation. Checks blob table. * Allocates buffers. For key operation fetches key data from signal * data. For read operation adds read of head+inline. */intNdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn){ DBUG_ENTER("NdbBlob::atPrepare"); DBUG_PRINT("info", ("this=%p op=%p con=%p", this, anOp, aCon)); assert(theState == Idle); // ndb api stuff theNdb = anOp->theNdb; theNdbCon = aCon; // for scan, this is the real transaction (m_transConnection) theNdbOp = anOp; theTable = anOp->m_currentTable; theAccessTable = anOp->m_accessTable; theColumn = aColumn; NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined; switch (theColumn->getType()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -