📄 dbdict.cpp
字号:
Uint32 chk = computeChecksum((const Uint32*)tmpPagePtr.p, sz); ndbrequire((chk == 0) || !crashInd); if(chk != 0){ jam(); ndbrequire(fsPtr.p->fsState == FsConnectRecord::READ_TAB_FILE1); readTableRef(signal, fsPtr); return; }//if fsPtr.p->fsState = FsConnectRecord::CLOSE_READ_TAB_FILE; closeFile(signal, fsPtr.p->filePtr, fsPtr.i); return;}//Dbdict::readTableConf()void Dbdict::readTableRef(Signal* signal, FsConnectRecordPtr fsPtr){ fsPtr.p->fsState = FsConnectRecord::OPEN_READ_TAB_FILE2; openTableFile(signal, 1, fsPtr.i, c_readTableRecord.tableId, false); return;}//Dbdict::readTableRef()void Dbdict::closeReadTableConf(Signal* signal, FsConnectRecordPtr fsPtr){ c_fsConnectRecordPool.release(fsPtr); c_readTableRecord.inUse = false; execute(signal, c_readTableRecord.m_callback, 0); return;}//Dbdict::closeReadTableConf()/* ---------------------------------------------------------------- */// Routines to handle Read/Write of Schema Files/* ---------------------------------------------------------------- */voidDbdict::updateSchemaState(Signal* signal, Uint32 tableId, SchemaFile::TableEntry* te, Callback* callback){ jam(); ndbrequire(tableId < c_tableRecordPool.getSize()); XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0]; SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId); SchemaFile::TableState newState = (SchemaFile::TableState)te->m_tableState; SchemaFile::TableState oldState = (SchemaFile::TableState)tableEntry->m_tableState; Uint32 newVersion = te->m_tableVersion; Uint32 oldVersion = tableEntry->m_tableVersion; bool ok = false; switch(newState){ case SchemaFile::ADD_STARTED: jam(); ok = true; ndbrequire(create_table_inc_schema_version(oldVersion) == newVersion); ndbrequire(oldState == SchemaFile::INIT || oldState == SchemaFile::DROP_TABLE_COMMITTED); break; case SchemaFile::TABLE_ADD_COMMITTED: jam(); ok = true; ndbrequire(newVersion == oldVersion); ndbrequire(oldState == SchemaFile::ADD_STARTED); break; case SchemaFile::ALTER_TABLE_COMMITTED: jam(); ok = true; ndbrequire(alter_table_inc_schema_version(oldVersion) == newVersion); ndbrequire(oldState == SchemaFile::TABLE_ADD_COMMITTED || oldState == SchemaFile::ALTER_TABLE_COMMITTED); break; case SchemaFile::DROP_TABLE_STARTED: jam(); case SchemaFile::DROP_TABLE_COMMITTED: jam(); ok = true; ndbrequire(false); break; case SchemaFile::INIT: jam(); ok = true; ndbrequire((oldState == SchemaFile::ADD_STARTED)); }//if ndbrequire(ok); * tableEntry = * te; computeChecksum(xsf, tableId / NDB_SF_PAGE_ENTRIES); ndbrequire(c_writeSchemaRecord.inUse == false); c_writeSchemaRecord.inUse = true; c_writeSchemaRecord.pageId = c_schemaRecord.schemaPage; c_writeSchemaRecord.newFile = false; c_writeSchemaRecord.firstPage = tableId / NDB_SF_PAGE_ENTRIES; c_writeSchemaRecord.noOfPages = 1; c_writeSchemaRecord.m_callback = * callback; startWriteSchemaFile(signal);}void Dbdict::startWriteSchemaFile(Signal* signal){ FsConnectRecordPtr fsPtr; c_fsConnectRecordPool.getPtr(fsPtr, getFsConnRecord()); fsPtr.p->fsState = FsConnectRecord::OPEN_WRITE_SCHEMA; openSchemaFile(signal, 0, fsPtr.i, true, c_writeSchemaRecord.newFile); c_writeSchemaRecord.noOfSchemaFilesHandled = 0;}//Dbdict::startWriteSchemaFile()void Dbdict::openSchemaFile(Signal* signal, Uint32 fileNo, Uint32 fsConPtr, bool writeFlag, bool newFile){ FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0]; fsOpenReq->userReference = reference(); fsOpenReq->userPointer = fsConPtr; if (writeFlag) { jam(); fsOpenReq->fileFlags = FsOpenReq::OM_WRITEONLY | FsOpenReq::OM_SYNC; if (newFile) fsOpenReq->fileFlags |= FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE; } else { jam(); fsOpenReq->fileFlags = FsOpenReq::OM_READONLY; }//if fsOpenReq->fileNumber[3] = 0; // Initialise before byte changes FsOpenReq::setVersion(fsOpenReq->fileNumber, 1); FsOpenReq::setSuffix(fsOpenReq->fileNumber, FsOpenReq::S_SCHEMALOG); FsOpenReq::v1_setDisk(fsOpenReq->fileNumber, (fileNo + 1)); FsOpenReq::v1_setTable(fsOpenReq->fileNumber, (Uint32)-1); FsOpenReq::v1_setFragment(fsOpenReq->fileNumber, (Uint32)-1); FsOpenReq::v1_setS(fsOpenReq->fileNumber, (Uint32)-1); FsOpenReq::v1_setP(fsOpenReq->fileNumber, 0);/* ---------------------------------------------------------------- */// File name : D1/DBDICT/P0.SchemaLog// D1 means Disk 1 (set by fileNo + 1). Writes to both D1 and D2// SchemaLog indicates that this is a file giving a list of current tables./* ---------------------------------------------------------------- */ sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);}//openSchemaFile()void Dbdict::writeSchemaFile(Signal* signal, Uint32 filePtr, Uint32 fsConPtr) { FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0]; // check write record WriteSchemaRecord & wr = c_writeSchemaRecord; ndbrequire(wr.pageId == (wr.pageId != 0) * NDB_SF_MAX_PAGES); ndbrequire(wr.noOfPages != 0); ndbrequire(wr.firstPage + wr.noOfPages <= NDB_SF_MAX_PAGES); fsRWReq->filePointer = filePtr; fsRWReq->userReference = reference(); fsRWReq->userPointer = fsConPtr; fsRWReq->operationFlag = 0; // Initialise before bit changes FsReadWriteReq::setSyncFlag(fsRWReq->operationFlag, 1); FsReadWriteReq::setFormatFlag(fsRWReq->operationFlag, FsReadWriteReq::fsFormatArrayOfPages); fsRWReq->varIndex = ZBAT_SCHEMA_FILE; fsRWReq->numberOfPages = wr.noOfPages; // Write from memory page fsRWReq->data.arrayOfPages.varIndex = wr.pageId + wr.firstPage; fsRWReq->data.arrayOfPages.fileOffset = wr.firstPage; sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);}//writeSchemaFile()void Dbdict::writeSchemaConf(Signal* signal, FsConnectRecordPtr fsPtr){ fsPtr.p->fsState = FsConnectRecord::CLOSE_WRITE_SCHEMA; closeFile(signal, fsPtr.p->filePtr, fsPtr.i); return;}//Dbdict::writeSchemaConf()void Dbdict::closeFile(Signal* signal, Uint32 filePtr, Uint32 fsConPtr) { FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0]; fsCloseReq->filePointer = filePtr; fsCloseReq->userReference = reference(); fsCloseReq->userPointer = fsConPtr; FsCloseReq::setRemoveFileFlag(fsCloseReq->fileFlag, false); sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA); return;}//closeFile()void Dbdict::closeWriteSchemaConf(Signal* signal, FsConnectRecordPtr fsPtr){ c_writeSchemaRecord.noOfSchemaFilesHandled++; if (c_writeSchemaRecord.noOfSchemaFilesHandled < 2) { jam(); fsPtr.p->fsState = FsConnectRecord::OPEN_WRITE_SCHEMA; openSchemaFile(signal, 1, fsPtr.i, true, c_writeSchemaRecord.newFile); return; } ndbrequire(c_writeSchemaRecord.noOfSchemaFilesHandled == 2); c_fsConnectRecordPool.release(fsPtr); c_writeSchemaRecord.inUse = false; execute(signal, c_writeSchemaRecord.m_callback, 0); return;}//Dbdict::closeWriteSchemaConf()void Dbdict::startReadSchemaFile(Signal* signal){ //globalSignalLoggers.log(number(), "startReadSchemaFile"); FsConnectRecordPtr fsPtr; c_fsConnectRecordPool.getPtr(fsPtr, getFsConnRecord()); fsPtr.p->fsState = FsConnectRecord::OPEN_READ_SCHEMA1; openSchemaFile(signal, 0, fsPtr.i, false, false);}//Dbdict::startReadSchemaFile()void Dbdict::openReadSchemaRef(Signal* signal, FsConnectRecordPtr fsPtr) { fsPtr.p->fsState = FsConnectRecord::OPEN_READ_SCHEMA2; openSchemaFile(signal, 1, fsPtr.i, false, false);}//Dbdict::openReadSchemaRef()void Dbdict::readSchemaFile(Signal* signal, Uint32 filePtr, Uint32 fsConPtr) { FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0]; // check read record ReadSchemaRecord & rr = c_readSchemaRecord; ndbrequire(rr.pageId == (rr.pageId != 0) * NDB_SF_MAX_PAGES); ndbrequire(rr.noOfPages != 0); ndbrequire(rr.firstPage + rr.noOfPages <= NDB_SF_MAX_PAGES); fsRWReq->filePointer = filePtr; fsRWReq->userReference = reference(); fsRWReq->userPointer = fsConPtr; fsRWReq->operationFlag = 0; // Initialise before bit changes FsReadWriteReq::setSyncFlag(fsRWReq->operationFlag, 0); FsReadWriteReq::setFormatFlag(fsRWReq->operationFlag, FsReadWriteReq::fsFormatArrayOfPages); fsRWReq->varIndex = ZBAT_SCHEMA_FILE; fsRWReq->numberOfPages = rr.noOfPages; fsRWReq->data.arrayOfPages.varIndex = rr.pageId + rr.firstPage; fsRWReq->data.arrayOfPages.fileOffset = rr.firstPage; sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 8, JBA);}//readSchemaFile()void Dbdict::readSchemaConf(Signal* signal, FsConnectRecordPtr fsPtr){/* ---------------------------------------------------------------- */// Verify the data read from disk/* ---------------------------------------------------------------- */ bool crashInd; if (fsPtr.p->fsState == FsConnectRecord::READ_SCHEMA1) { jam(); crashInd = false; } else { jam(); crashInd = true; }//if ReadSchemaRecord & rr = c_readSchemaRecord; XSchemaFile * xsf = &c_schemaFile[rr.pageId != 0]; if (rr.schemaReadState == ReadSchemaRecord::INITIAL_READ_HEAD) { jam(); ndbrequire(rr.firstPage == 0); SchemaFile * sf = &xsf->schemaPage[0]; Uint32 noOfPages; if (sf->NdbVersion < NDB_SF_VERSION_5_0_6) { jam(); const Uint32 pageSize_old = 32 * 1024; noOfPages = pageSize_old / NDB_SF_PAGE_SIZE - 1; } else { noOfPages = sf->FileSize / NDB_SF_PAGE_SIZE - 1; } rr.schemaReadState = ReadSchemaRecord::INITIAL_READ; if (noOfPages != 0) { rr.firstPage = 1; rr.noOfPages = noOfPages; readSchemaFile(signal, fsPtr.p->filePtr, fsPtr.i); return; } } SchemaFile * sf0 = &xsf->schemaPage[0]; xsf->noOfPages = sf0->FileSize / NDB_SF_PAGE_SIZE; if (sf0->NdbVersion < NDB_SF_VERSION_5_0_6 && ! convertSchemaFileTo_5_0_6(xsf)) { jam(); ndbrequire(! crashInd); ndbrequire(fsPtr.p->fsState == FsConnectRecord::READ_SCHEMA1); readSchemaRef(signal, fsPtr); return; } for (Uint32 n = 0; n < xsf->noOfPages; n++) { SchemaFile * sf = &xsf->schemaPage[n]; bool ok = memcmp(sf->Magic, NDB_SF_MAGIC, sizeof(sf->Magic)) == 0 && sf->FileSize != 0 && sf->FileSize % NDB_SF_PAGE_SIZE == 0 && sf->FileSize == sf0->FileSize && sf->PageNumber == n && computeChecksum((Uint32*)sf, NDB_SF_PAGE_SIZE_IN_WORDS) == 0; ndbrequire(ok || !crashInd); if (! ok) { jam(); ndbrequire(fsPtr.p->fsState == FsConnectRecord::READ_SCHEMA1); readSchemaRef(signal, fsPtr); return; } } fsPtr.p->fsState = FsConnectRecord::CLOSE_READ_SCHEMA; closeFile(signal, fsPtr.p->filePtr, fsPtr.i); return;}//Dbdict::readSchemaConf()void Dbdict::readSchemaRef(Signal* signal, FsConnectRecordPtr fsPtr){ /** * First close corrupt file */ fsPtr.p->fsState = FsConnectRecord::OPEN_READ_SCHEMA2; closeFile(signal, fsPtr.p->filePtr, fsPtr.i); return;}void Dbdict::closeReadSchemaConf(Signal* signal, FsConnectRecordPtr fsPtr){ c_fsConnectRecordPool.release(fsPtr); ReadSchemaRecord::SchemaReadState state = c_readSchemaRecord.schemaReadState; c_readSchemaRecord.schemaReadState = ReadSchemaRecord::IDLE; switch(state) { case ReadSchemaRecord::INITIAL_READ : jam(); { // write back both copies ndbrequire(c_writeSchemaRecord.inUse == false); XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0 ]; Uint32 noOfPages = (c_tableRecordPool.getSize() + NDB_SF_PAGE_ENTRIES - 1) / NDB_SF_PAGE_ENTRIES; resizeSchemaFile(xsf, noOfPages); c_writeSchemaRecord.inUse = true; c_writeSchemaRecord.pageId = c_schemaRecord.oldSchemaPage; c_writeSchemaRecord.newFile = true; c_writeSchemaRecord.firstPage = 0; c_writeSchemaRecord.noOfPages = xsf->noOfPages; c_writeSchemaRecord.m_callback.m_callbackFunction = safe_cast(&Dbdict::initSchemaFile_conf); startWriteSchemaFile(signal); } break; default : ndbrequire(false); break; }//switch}//Dbdict::closeReadSchemaConf()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -