📄 ndbfs.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include <ndb_global.h>

#include "Ndbfs.hpp"
#include "AsyncFile.hpp"
#include "Filename.hpp"

#include <signaldata/FsOpenReq.hpp>
#include <signaldata/FsCloseReq.hpp>
#include <signaldata/FsReadWriteReq.hpp>
#include <signaldata/FsAppendReq.hpp>
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/FsConf.hpp>
#include <signaldata/FsRef.hpp>
#include <signaldata/NdbfsContinueB.hpp>
#include <signaldata/DumpStateOrd.hpp>

#include <RefConvert.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <Configuration.hpp>

#define DEBUG(x) { ndbout << "FS::" << x << endl; }

/**
 * Compute the page size described by a variable descriptor.
 *
 * The result is 2^(q + max(v, 3) - 3), where q and v are the `bits.q`
 * and `bits.v` fields of the descriptor; v is clamped to at least 3.
 * NOTE(review): the unit of the result is not visible here - confirm
 * against the NewVARIABLE declaration.
 *
 * @param baseAddrRef descriptor to read; must not be NULL (dereferenced
 *                    unconditionally)
 * @return 1 shifted left by the computed exponent
 */
inline
int pageSize( const NewVARIABLE* baseAddrRef )
{
  const int log_qsize = baseAddrRef->bits.q;
  int log_vsize = baseAddrRef->bits.v;
  if (log_vsize < 3)
    log_vsize = 3;
  const int log_psize = log_qsize + log_vsize - 3;
  return (1 << log_psize);
}

/**
 * Construct the NDBFS block and register the handler for every signal
 * this block receives.
 *
 * theRequestPool is left at 0 here; it is allocated when
 * READ_CONFIG_REQ arrives (see execREAD_CONFIG_REQ).
 */
Ndbfs::Ndbfs(const Configuration & conf) :
  SimulatedBlock(NDBFS, conf),
  scanningInProgress(false),
  theLastId(0),
  m_maxOpenedFiles(0)
{
  BLOCK_CONSTRUCTOR(Ndbfs);

  // Set received signals
  addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
  addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
  addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
  addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
  addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
  addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
  addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
  addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
  addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
  addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
  addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
  // Set send signals

  // Must be 0 until allocated: the destructor tests it before deleting.
  theRequestPool = 0;
}

/**
 * Destroy every AsyncFile held in theFiles and release the request pool
 * if one was allocated.
 */
Ndbfs::~Ndbfs()
{
  // Delete all files
  // AsyncFile destructor will take care of deleting
  // the thread it has created
  for (unsigned i = 0; i < theFiles.size(); i++){
    AsyncFile* file = theFiles[i];
    delete file;
    theFiles[i] = NULL;
  }//for
  theFiles.clear();
  if (theRequestPool)
    delete theRequestPool;
}

/**
 * Handle READ_CONFIG_REQ.
 *
 * Reads the file system and backup paths from the configuration,
 * allocates the request pool, reads CFG_DB_MAX_OPEN_FILES into
 * m_maxFiles (pre-set to 40 before the lookup), pre-creates idle
 * AsyncFile objects (at most 27, or m_maxFiles if that is smaller) and
 * answers the sender with READ_CONFIG_CONF.
 */
void
Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
{
  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();

  // Copy the reply address out before the signal buffer is reused below.
  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  const ndb_mgm_configuration_iterator * p =
    theConfiguration.getOwnConfigIterator();
  ndbrequire(p != 0);

  theFileSystemPath = theConfiguration.fileSystemPath();
  theBackupFilePath = theConfiguration.backupFilePath();

  theRequestPool = new Pool<Request>;

  // NOTE(review): the return value of the lookup is ignored; presumably
  // m_maxFiles keeps the value 40 when the parameter is absent - confirm.
  m_maxFiles = 40;
  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);

  // Create idle AsyncFiles
  Uint32 noIdleFiles = m_maxFiles > 27 ? 27 : m_maxFiles ;
  for (Uint32 i = 0; i < noIdleFiles; i++){
    theIdleFiles.push_back(createAsyncFile());
  }

  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}

/* Received a restart signal.
 * Answer it like any other block
 * PR0 : StartCase
 * DR0 : StartPhase
 * DR1 : ?
 * DR2 : ?
 * DR3 : ?
 * DR4 : ?
* DR5 : SignalKey */voidNdbfs::execSTTOR(Signal* signal){ jamEntry(); if(signal->theData[1] == 0){ // StartPhase 0 jam(); cownref = NDBFS_REF; // close all open files ndbrequire(theOpenFiles.size() == 0); scanningInProgress = false; signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1); signal->theData[3] = 255; sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB); return; } ndbrequire(0);}int Ndbfs::forward( AsyncFile * file, Request* request){ jam(); file->execute(request); return 1;}void Ndbfs::execFSOPENREQ(Signal* signal){ jamEntry(); const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0]; const BlockReference userRef = fsOpenReq->userReference; AsyncFile* file = getIdleFile(); ndbrequire(file != NULL); ndbrequire(signal->getLength() == FsOpenReq::SignalLength) file->theFileName.set( userRef, fsOpenReq->fileNumber); file->reportTo(&theFromThreads); Request* request = theRequestPool->get(); request->action = Request::open; request->error = 0; request->par.open.flags = fsOpenReq->fileFlags; request->set(userRef, fsOpenReq->userPointer, newId() ); request->file = file; request->theTrace = signal->getTrace(); ndbrequire(forward(file, request));}void Ndbfs::execFSREMOVEREQ(Signal* signal){ jamEntry(); const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr(); const BlockReference userRef = req->userReference; AsyncFile* file = getIdleFile(); ndbrequire(file != NULL); file->theFileName.set( userRef, req->fileNumber, req->directory); file->reportTo(&theFromThreads); Request* request = theRequestPool->get(); request->action = Request::rmrf; request->par.rmrf.directory = req->directory; request->par.rmrf.own_directory = req->ownDirectory; request->error = 0; request->set(userRef, req->userPointer, newId() ); request->file = file; request->theTrace = signal->getTrace(); ndbrequire(forward(file, request));}/* * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag 
bit 0= 1 * remove file */void Ndbfs::execFSCLOSEREQ(Signal * signal){ jamEntry(); const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0]; const BlockReference userRef = fsCloseReq->userReference; const Uint16 filePointer = (Uint16)fsCloseReq->filePointer; const UintR userPointer = fsCloseReq->userPointer; AsyncFile* openFile = theOpenFiles.find(filePointer); if (openFile == NULL) { // The file was not open, send error back to sender jam(); // Initialise FsRef signal FsRef * const fsRef = (FsRef *)&signal->theData[0]; fsRef->userPointer = userPointer; fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist); fsRef->osErrorCode = ~0; // Indicate local error sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB); return; } Request *request = theRequestPool->get(); if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) { jam(); request->action = Request::closeRemove; } else { jam(); request->action = Request::close; } request->set(userRef, fsCloseReq->userPointer, filePointer); request->file = openFile; request->error = 0; request->theTrace = signal->getTrace(); ndbrequire(forward(openFile, request));}void Ndbfs::readWriteRequest(int action, Signal * signal){ const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0]; Uint16 filePointer = (Uint16)fsRWReq->filePointer; const UintR userPointer = fsRWReq->userPointer; const BlockReference userRef = fsRWReq->userReference; const BlockNumber blockNumber = refToBlock(userRef); AsyncFile* openFile = theOpenFiles.find(filePointer); const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex]; UintPtr tPageSize; UintPtr tClusterSize; UintPtr tNRR; UintPtr tPageOffset; char* tWA; FsRef::NdbfsErrorCodeType errorCode; Request *request = theRequestPool->get(); request->error = 0; request->set(userRef, userPointer, filePointer); request->file = openFile; request->action = (Request::Action) action; request->theTrace = signal->getTrace(); if (fsRWReq->numberOfPages == 
0) { //Zero pages not allowed jam(); errorCode = FsRef::fsErrInvalidParameters; goto error; } if (fsRWReq->varIndex >= getBatSize(blockNumber)) { jam();// Ensure that a valid variable is used errorCode = FsRef::fsErrInvalidParameters; goto error; } if (myBaseAddrRef == NULL) { jam(); // Ensure that a valid variable is used errorCode = FsRef::fsErrInvalidParameters; goto error; } if (openFile == NULL) { jam(); //file not open errorCode = FsRef::fsErrFileDoesNotExist; goto error; } tPageSize = pageSize(myBaseAddrRef); tClusterSize = myBaseAddrRef->ClusterSize; tNRR = myBaseAddrRef->nrr; tWA = (char*)myBaseAddrRef->WA; switch (fsRWReq->getFormatFlag(fsRWReq->operationFlag)) { // List of memory and file pages pairs case FsReadWriteReq::fsFormatListOfPairs: { jam(); for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { jam(); const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex; const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset; if (varIndex >= tNRR) { jam(); errorCode = FsRef::fsErrInvalidParameters; goto error; }//if request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; request->par.readWrite.pages[i].size = tPageSize; request->par.readWrite.pages[i].offset = fileOffset * tPageSize; }//for request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; break; }//case // Range of memory page with one file page case FsReadWriteReq::fsFormatArrayOfPages: { if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) { jam(); errorCode = FsRef::fsErrInvalidParameters; goto error; }//if const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex; const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset; request->par.readWrite.pages[0].offset = fileOffset * tPageSize; request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages; request->par.readWrite.numberOfPages = 1; request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize]; break; }//case // List of memory pages followed by one file 
page case FsReadWriteReq::fsFormatListOfMemPages: { tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages]; tPageOffset *= tPageSize; for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { jam(); UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i]; if (varIndex >= tNRR) { jam(); errorCode = FsRef::fsErrInvalidParameters; goto error; }//if request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; request->par.readWrite.pages[i].size = tPageSize; request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize); }//for request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; break; // make it a writev or readv }//case default: { jam(); errorCode = FsRef::fsErrInvalidParameters; goto error; }//default }//switch ndbrequire(forward(openFile, request)); return;error: theRequestPool->put(request); FsRef * const fsRef = (FsRef *)&signal->theData[0]; fsRef->userPointer = userPointer; fsRef->setErrorCode(fsRef->errorCode, errorCode); fsRef->osErrorCode = ~0; // Indicate local error switch (action) { case Request:: write: case Request:: writeSync: { jam(); sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB); break; }//case case Request:: read: { jam(); sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB); }//case }//switch return;}/* PR0: File Pointer , theData[0] DR0: User reference, theData[1] DR1: User Pointer, etc. 
DR2: Flag DR3: Var number DR4: amount of pages DR5->: Memory Page id and File page id according to Flag*/void Ndbfs::execFSWRITEREQ(Signal* signal){ jamEntry(); const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0]; if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){ jam(); readWriteRequest( Request::writeSync, signal ); } else { jam(); readWriteRequest( Request::write, signal ); }}/* PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag DR3: Var number DR4: amount of pages DR5->: Memory Page id and File page id according to Flag*/void Ndbfs::execFSREADREQ(Signal* signal){ jamEntry(); readWriteRequest( Request::read, signal );}/* * PR0: File Pointer DR0: User reference DR1: User Pointer */voidNdbfs::execFSSYNCREQ(Signal * signal){ jamEntry(); Uint16 filePointer = (Uint16)signal->theData[0]; BlockReference userRef = signal->theData[1]; const UintR userPointer = signal->theData[2]; AsyncFile* openFile = theOpenFiles.find(filePointer); if (openFile == NULL) { jam(); //file not open FsRef * const fsRef = (FsRef *)&signal->theData[0]; fsRef->userPointer = userPointer; fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist); fsRef->osErrorCode = ~0; // Indicate local error sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB); return; } Request *request = theRequestPool->get(); request->error = 0; request->action = Request::sync; request->set(userRef, userPointer, filePointer); request->file = openFile; request->theTrace = signal->getTrace(); ndbrequire(forward(openFile,request));}void Ndbfs::execFSAPPENDREQ(Signal * signal){ const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0]; const Uint16 filePointer = (Uint16)fsReq->filePointer; const UintR userPointer = fsReq->userPointer; const BlockReference userRef = fsReq->userReference; const BlockNumber blockNumber = refToBlock(userRef); FsRef::NdbfsErrorCodeType errorCode; AsyncFile* openFile = theOpenFiles.find(filePointer); const NewVARIABLE 
*myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex]; const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA; const Uint32 tSz = myBaseAddrRef->nrr; const Uint32 offset = fsReq->offset; const Uint32 size = fsReq->size; Request *request = theRequestPool->get(); if (openFile == NULL) { jam(); errorCode = FsRef::fsErrFileDoesNotExist; goto error; } if (myBaseAddrRef == NULL) { jam(); // Ensure that a valid variable is used errorCode = FsRef::fsErrInvalidParameters; goto error; } if (fsReq->varIndex >= getBatSize(blockNumber)) { jam();// Ensure that a valid variable is used errorCode = FsRef::fsErrInvalidParameters; goto error; } if(offset + size > tSz){ jam(); // Ensure that a valid variable is used errorCode = FsRef::fsErrInvalidParameters; goto error; } request->error = 0; request->set(userRef, userPointer, filePointer); request->file = openFile; request->action = Request::append; request->theTrace = signal->getTrace(); request->par.append.buf = (const char *)(tWA + offset); request->par.append.size = size << 2; ndbrequire(forward(openFile, request)); return; error:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -