📄 ndbblob.cpp
字号:
case NdbDictionary::Column::Blob: partType = NdbDictionary::Column::Binary; theFillChar = 0x0; break; case NdbDictionary::Column::Text: partType = NdbDictionary::Column::Char; theFillChar = 0x20; break; default: setErrorCode(NdbBlobImpl::ErrUsage); DBUG_RETURN(-1); } // sizes theInlineSize = theColumn->getInlineSize(); thePartSize = theColumn->getPartSize(); theStripeSize = theColumn->getStripeSize(); // sanity check assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); if (thePartSize > 0) { const NdbDictionary::Table* bt = NULL; const NdbDictionary::Column* bc = NULL; if (theStripeSize == 0 || (bt = theColumn->getBlobTable()) == NULL || (bc = bt->getColumn("DATA")) == NULL || bc->getType() != partType || bc->getLength() != (int)thePartSize) { setErrorCode(NdbBlobImpl::ErrTable); DBUG_RETURN(-1); } theBlobTable = &NdbTableImpl::getImpl(*bt); } // buffers theKeyBuf.alloc(theTable->m_keyLenInWords << 2); theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize); thePartBuf.alloc(thePartSize); theHead = (Head*)theHeadInlineBuf.data; theInlineData = theHeadInlineBuf.data + sizeof(Head); // handle different operation types bool supportedOp = false; if (isKeyOp()) { if (isTableOp()) { // get table key Uint32* data = (Uint32*)theKeyBuf.data; unsigned size = theTable->m_keyLenInWords; if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { setErrorCode(NdbBlobImpl::ErrUsage); DBUG_RETURN(-1); } } if (isIndexOp()) { // get index key Uint32* data = (Uint32*)theAccessKeyBuf.data; unsigned size = theAccessTable->m_keyLenInWords; if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { setErrorCode(NdbBlobImpl::ErrUsage); DBUG_RETURN(-1); } } if (isReadOp()) { // add read of head+inline in this op if (getHeadInlineValue(theNdbOp) == -1) DBUG_RETURN(-1); } if (isInsertOp()) { // becomes NULL unless set before execute theNullFlag = true; theLength = 0; } if (isWriteOp()) { // becomes NULL unless set before execute theNullFlag = true; theLength = 0; theHeadInlineUpdateFlag = true; } supportedOp = true; } if (isScanOp()) { // add read of head+inline in this op if (getHeadInlineValue(theNdbOp) == -1) DBUG_RETURN(-1); supportedOp = true; } if (! supportedOp) { setErrorCode(NdbBlobImpl::ErrUsage); DBUG_RETURN(-1); } setState(Prepared); DBUG_RETURN(0);}/* * Before execute of prepared operation. May add new operations before * this one. May ask that this operation and all before it (a "batch") * is executed immediately in no-commit mode. In this case remaining * prepared operations are saved in a separate list. They are added * back after postExecute. */intNdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch){ DBUG_ENTER("NdbBlob::preExecute"); DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon)); if (theState == Invalid) DBUG_RETURN(-1); assert(theState == Prepared); // handle different operation types assert(isKeyOp()); if (isReadOp()) { if (theGetFlag && theGetSetBytes > theInlineSize) { // need blob head before proceeding batch = true; } } if (isInsertOp()) { if (theSetFlag && theGetSetBytes > theInlineSize) { // add ops to write rest of a setValue assert(theSetBuf != NULL); const char* buf = theSetBuf + theInlineSize; Uint32 bytes = theGetSetBytes - theInlineSize; assert(thePos == theInlineSize); if (writeDataPrivate(buf, bytes) == -1) DBUG_RETURN(-1); if (theHeadInlineUpdateFlag) { // add an operation to update head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); if (tOp == NULL || tOp->updateTuple() == -1 || setTableKeyValue(tOp) == -1 || setHeadInlineValue(tOp) == -1) { setErrorCode(NdbBlobImpl::ErrAbort); DBUG_RETURN(-1); } DBUG_PRINT("info", ("add op to update head+inline")); } } } if (isTableOp()) { if (isUpdateOp() || isWriteOp() || isDeleteOp()) { // add operation before this one to read head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp); if (tOp == NULL || tOp->readTuple() == -1 || setTableKeyValue(tOp) == -1 || getHeadInlineValue(tOp) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } if (isWriteOp()) { tOp->m_abortOption = NdbTransaction::AO_IgnoreError; } theHeadInlineReadOp = tOp; // execute immediately batch = true; DBUG_PRINT("info", ("add op before to read head+inline")); } } if (isIndexOp()) { // add op before this one to read table key NdbBlob* tFirstBlob = theNdbOp->theBlobList; if (this == tFirstBlob) { // first blob does it for all if (g_ndb_blob_ok_to_read_index_table) { Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1; NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp); if (tOp == NULL || tOp->readTuple() == -1 || setAccessKeyValue(tOp) == -1 || tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) { setErrorCode(tOp); DBUG_RETURN(-1); } } else { NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); if (tOp == NULL || tOp->readTuple() == -1 || setAccessKeyValue(tOp) == -1 || getTableKeyValue(tOp) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } } } DBUG_PRINT("info", ("added op before to read table key")); if (isUpdateOp() || isDeleteOp()) { // add op before this one to read head+inline via index NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); if (tOp == NULL || tOp->readTuple() == -1 || setAccessKeyValue(tOp) == -1 || getHeadInlineValue(tOp) == -1) { setErrorCode(tOp); DBUG_RETURN(-1); } if (isWriteOp()) { tOp->m_abortOption = NdbTransaction::AO_IgnoreError; } theHeadInlineReadOp = tOp; // execute immediately batch = true; DBUG_PRINT("info", ("added index op before to read head+inline")); } if (isWriteOp()) { // XXX until IgnoreError fixed for index op batch = true; } } if (isWriteOp()) { if (theSetFlag) { // write head+inline now theNullFlag = true; theLength = 0; if (theSetBuf != NULL) { Uint32 n = theGetSetBytes; if (n > theInlineSize) n = theInlineSize; assert(thePos == 0); if (writeDataPrivate(theSetBuf, n) == -1) DBUG_RETURN(-1); } if (setHeadInlineValue(theNdbOp) == -1) DBUG_RETURN(-1); // the read op before us may overwrite theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf); } } if (theActiveHook != NULL) { // need blob head for callback batch = true; } DBUG_PRINT("info", ("batch=%u", batch)); DBUG_RETURN(0);}/* * After execute, for any operation. If already Active, this routine * has been done previously. Operations which requested a no-commit * batch can add new operations after this one. They are added before * any remaining prepared operations. */intNdbBlob::postExecute(NdbTransaction::ExecType anExecType){ DBUG_ENTER("NdbBlob::postExecute"); DBUG_PRINT("info", ("this=%p op=%p con=%p anExecType=%u", this, theNdbOp, theNdbCon, anExecType)); if (theState == Invalid) DBUG_RETURN(-1); if (theState == Active) { setState(anExecType == NdbTransaction::NoCommit ? Active : Closed); DBUG_PRINT("info", ("skip active")); DBUG_RETURN(0); } assert(theState == Prepared); setState(anExecType == NdbTransaction::NoCommit ? Active : Closed); assert(isKeyOp()); if (isIndexOp()) { NdbBlob* tFirstBlob = theNdbOp->theBlobList; if (this != tFirstBlob) { // copy key from first blob assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size); memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size); } } if (isReadOp()) { getHeadFromRecAttr(); if (setPos(0) == -1) DBUG_RETURN(-1); if (theGetFlag) { assert(theGetSetBytes == 0 || theGetBuf != 0); assert(theGetSetBytes <= theInlineSize || anExecType == NdbTransaction::NoCommit); Uint32 bytes = theGetSetBytes; if (readDataPrivate(theGetBuf, bytes) == -1) DBUG_RETURN(-1); } } if (isUpdateOp()) { assert(anExecType == NdbTransaction::NoCommit); getHeadFromRecAttr(); if (theSetFlag) { // setValue overwrites everything if (theSetBuf != NULL) { if (truncate(0) == -1) DBUG_RETURN(-1); assert(thePos == 0); if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1) DBUG_RETURN(-1); } else { if (setNull() == -1) DBUG_RETURN(-1); } } } if (isWriteOp() && isTableOp()) { assert(anExecType == NdbTransaction::NoCommit); if (theHeadInlineReadOp->theError.code == 0) { int tNullFlag = theNullFlag; Uint64 tLength = theLength; Uint64 tPos = thePos; getHeadFromRecAttr(); DBUG_PRINT("info", ("tuple found")); if (truncate(0) == -1) DBUG_RETURN(-1); // restore previous head+inline theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf); theNullFlag = tNullFlag; theLength = tLength; thePos = tPos; } else { if (theHeadInlineReadOp->theError.code != 626) { setErrorCode(theHeadInlineReadOp); DBUG_RETURN(-1); } DBUG_PRINT("info", ("tuple not found")); /* * Read found no tuple but it is possible that a tuple was * created after the read by another transaction. Delete all * blob parts which may exist. */ if (deletePartsUnknown(0) == -1) DBUG_RETURN(-1); } if (theSetFlag && theGetSetBytes > theInlineSize) { assert(theSetBuf != NULL); const char* buf = theSetBuf + theInlineSize; Uint32 bytes = theGetSetBytes - theInlineSize; assert(thePos == theInlineSize); if (writeDataPrivate(buf, bytes) == -1) DBUG_RETURN(-1); } } if (isWriteOp() && isIndexOp()) { // XXX until IgnoreError fixed for index op if (deletePartsUnknown(0) == -1) DBUG_RETURN(-1); if (theSetFlag && theGetSetBytes > theInlineSize) { assert(theSetBuf != NULL); const char* buf = theSetBuf + theInlineSize; Uint32 bytes = theGetSetBytes - theInlineSize; assert(thePos == theInlineSize); if (writeDataPrivate(buf, bytes) == -1) DBUG_RETURN(-1); } } if (isDeleteOp()) { assert(anExecType == NdbTransaction::NoCommit); getHeadFromRecAttr(); if (deleteParts(0, getPartCount()) == -1) DBUG_RETURN(-1); } setState(anExecType == NdbTransaction::NoCommit ? Active : Closed); // activation callback if (theActiveHook != NULL) { if (invokeActiveHook() == -1) DBUG_RETURN(-1); } if (anExecType == NdbTransaction::NoCommit && theHeadInlineUpdateFlag) { NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); if (tOp == NULL || tOp->updateTuple() == -1 || setTableKeyValue(tOp) == -1 || setHeadInlineValue(tOp) == -1) { setErrorCode(NdbBlobImpl::ErrAbort); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; DBUG_PRINT("info", ("added op to update head+inline")); } DBUG_RETURN(0);}/* * Before commit of completed operation. For write add operation to * update head+inline. */intNdbBlob::preCommit(){ DBUG_ENTER("NdbBlob::preCommit"); DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon)); if (theState == Invalid) DBUG_RETURN(-1); assert(theState == Active); assert(isKeyOp()); if (isInsertOp() || isUpdateOp() || isWriteOp()) { if (theHeadInlineUpdateFlag) { // add an operation to update head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); if (tOp == NULL || tOp->updateTuple() == -1 || setTableKeyValue(tOp) == -1 || setHeadInlineValue(tOp) == -1) { setErrorCode(NdbBlobImpl::ErrAbort); DBUG_RETURN(-1); } tOp->m_abortOption = NdbTransaction::AbortOnError; DBUG_PRINT("info", ("added op to update head+inline")); } } DBUG_RETURN(0);}/* * After next scan result. Handle like read op above. */intNdbBlob::atNextResult(){ DBUG_ENTER("NdbBlob::atNextResult"); DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon)); if (theState == Invalid) DBUG_RETURN(-1); assert(isScanOp()); // get primary key { Uint32* data = (Uint32*)theKeyBuf.data; unsigned size = theTable->m_keyLenInWords; if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) { setErrorCode(NdbBlobImpl::ErrUsage); DBUG_RETURN(-1); } } getHeadFromRecAttr(); if (setPos(0) == -1) DBUG_RETURN(-1); if (theGetFlag) { assert(theGetSetBytes == 0 || theGetBuf != 0); Uint32 bytes = theGetSetBytes; if (readDataPrivate(theGetBuf, bytes) == -1) DBUG_RETURN(-1); } setState(Active); // activation callback if (theActiveHook != NULL) { if (invokeActiveHook() == -1) DBUG_RETURN(-1); } DBUG_RETURN(0);}// miscconst NdbDictionary::Column*NdbBlob::getColumn(){ return theColumn;}// errorsvoidNdbBlob::setErrorCode(int anErrorCode, bool invalidFlag){ DBUG_ENTER("NdbBlob::setErrorCode"); DBUG_PRINT("info", ("this=%p code=%u", this, anErrorCode)); theError.code = anErrorCode; // conditionally copy error to operation level if (theNdbOp != NULL && theNdbOp->theError.code == 0) theNdbOp->setErrorCode(theError.code); if (invalidFlag) setState(Invalid); DBUG_VOID_RETURN;}voidNdbBlob::setErrorCode(NdbOperation* anOp, bool invalidFlag){ int code = 0; if (anOp != NULL && (code = anOp->theError.code) != 0) ; else if ((code = theNdbCon->theError.code) != 0) ; else if ((code = theNdb->theError.code) != 0) ; else code = NdbBlobImpl::ErrUnknown; setErrorCode(code, invalidFlag);}voidNdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag){ int code = 0; if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0) ; else if ((code = theNdb->theError.code) != 0) ; else code = NdbBlobImpl::ErrUnknown; setErrorCode(code, invalidFlag);}// info about all blobs in this operationNdbBlob*NdbBlob::blobsFirstBlob(){ return theNdbOp->theBlobList;}NdbBlob*NdbBlob::blobsNextBlob(){ return theNext;}// debug#ifdef VM_TRACEinline intNdbBlob::getOperationType() const{ return theNdbOp != NULL ? theNdbOp->theOperationType : -1;}NdbOut&operator<<(NdbOut& out, const NdbBlob& blob){ ndbout << dec << "o=" << blob.getOperationType(); ndbout << dec << " s=" << (Uint32) blob.theState; ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " l=" << blob.theLength; ndbout << dec << " p=" << blob.thePos; ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag; ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes; return out;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -