⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbfs.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>

#include "Ndbfs.hpp"
#include "AsyncFile.hpp"
#include "Filename.hpp"

#include <signaldata/FsOpenReq.hpp>
#include <signaldata/FsCloseReq.hpp>
#include <signaldata/FsReadWriteReq.hpp>
#include <signaldata/FsAppendReq.hpp>
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/FsConf.hpp>
#include <signaldata/FsRef.hpp>
#include <signaldata/NdbfsContinueB.hpp>
#include <signaldata/DumpStateOrd.hpp>

#include <RefConvert.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <Configuration.hpp>

#define DEBUG(x) { ndbout << "FS::" << x << endl; }

/**
 * Compute the page size described by a block-address-table entry.
 *
 * The result is 2^(q + max(v, 3) - 3), where q and v are the entry's
 * bits.q and bits.v fields.
 * NOTE(review): the unit of the result (bytes vs. words) is not visible
 * here -- confirm against the NewVARIABLE definition.
 */
inline
int pageSize( const NewVARIABLE* baseAddrRef )
{
  const int qLog = baseAddrRef->bits.q;
  const int vLogRaw = baseAddrRef->bits.v;
  // v is clamped from below at 3 before it enters the exponent.
  const int vLog = (vLogRaw < 3) ? 3 : vLogRaw;
  return (1 << (qLog + vLog - 3));
}

/**
 * Construct the NDBFS block and register the handler for every signal
 * it receives. The request pool is not allocated here; it is created
 * when READ_CONFIG_REQ arrives.
 */
Ndbfs::Ndbfs(const Configuration & conf) :
  SimulatedBlock(NDBFS, conf),
  scanningInProgress(false),
  theLastId(0),
  m_maxOpenedFiles(0)
{
  BLOCK_CONSTRUCTOR(Ndbfs);

  // Configuration / start-up / diagnostics
  addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
  addRecSignal(GSN_DUMP_STATE_ORD,  &Ndbfs::execDUMP_STATE_ORD);
  addRecSignal(GSN_STTOR,  &Ndbfs::execSTTOR);

  // File operations
  addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
  addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
  addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
  addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
  addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
  addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
  addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
  addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);

  // No pool yet; the destructor copes with a null pointer.
  theRequestPool = NULL;
}

/**
 * Destroy every AsyncFile held in theFiles, then the request pool.
 *
 * Per the original note, the AsyncFile destructor takes care of
 * deleting the thread it has created.
 */
Ndbfs::~Ndbfs()
{
  for (unsigned idx = 0; idx < theFiles.size(); idx++){
    delete theFiles[idx];
    theFiles[idx] = NULL;
  }//for
  theFiles.clear();

  // Deleting a null pointer is a no-op, so no guard is needed.
  delete theRequestPool;
}

/**
 * Handle READ_CONFIG_REQ: cache the file-system and backup paths,
 * allocate the request pool, pre-create idle AsyncFiles and confirm
 * to the sender with READ_CONFIG_CONF.
 */
void 
Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
{
  // Copy the sender identity out of the signal before the reply is
  // built through getDataPtrSend().
  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  const ndb_mgm_configuration_iterator * p = 
    theConfiguration.getOwnConfigIterator();
  ndbrequire(p != 0);

  theFileSystemPath = theConfiguration.fileSystemPath();
  theBackupFilePath = theConfiguration.backupFilePath();

  theRequestPool = new Pool<Request>;

  // 40 is the fallback; the call below overwrites it only through the
  // out-parameter and its return value is not inspected.
  m_maxFiles = 40;
  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);

  // Create idle AsyncFiles: at most 27 of them up front.
  Uint32 noIdleFiles = m_maxFiles;
  if (noIdleFiles > 27) {
    noIdleFiles = 27;
  }
  for (Uint32 created = 0; created < noIdleFiles; created++){
    theIdleFiles.push_back(createAsyncFile());
  }

  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 
	     ReadConfigConf::SignalLength, JBB);
}

/* Received a restart signal.
 * Answer it like any other block
 * PR0  : StartCase
 * DR0  : StartPhase
 * DR1  : ?
 * DR2  : ?
 * DR3  : ?
 * DR4  : ?
* DR5  : SignalKey */voidNdbfs::execSTTOR(Signal* signal){  jamEntry();    if(signal->theData[1] == 0){ // StartPhase 0    jam();    cownref = NDBFS_REF;    // close all open files    ndbrequire(theOpenFiles.size() == 0);        scanningInProgress = false;        signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;    sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1);    signal->theData[3] = 255;    sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);    return;  }  ndbrequire(0);}int Ndbfs::forward( AsyncFile * file, Request* request){  jam();  file->execute(request);  return 1;}void Ndbfs::execFSOPENREQ(Signal* signal){  jamEntry();  const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];  const BlockReference userRef = fsOpenReq->userReference;  AsyncFile* file = getIdleFile();  ndbrequire(file != NULL);  ndbrequire(signal->getLength() == FsOpenReq::SignalLength)  file->theFileName.set( userRef, fsOpenReq->fileNumber);  file->reportTo(&theFromThreads);    Request* request = theRequestPool->get();  request->action = Request::open;  request->error = 0;  request->par.open.flags = fsOpenReq->fileFlags;  request->set(userRef, fsOpenReq->userPointer, newId() );  request->file = file;  request->theTrace = signal->getTrace();    ndbrequire(forward(file, request));}void Ndbfs::execFSREMOVEREQ(Signal* signal){  jamEntry();  const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();  const BlockReference userRef = req->userReference;  AsyncFile* file = getIdleFile();  ndbrequire(file != NULL);  file->theFileName.set( userRef, req->fileNumber, req->directory);  file->reportTo(&theFromThreads);    Request* request = theRequestPool->get();  request->action = Request::rmrf;  request->par.rmrf.directory = req->directory;  request->par.rmrf.own_directory = req->ownDirectory;  request->error = 0;  request->set(userRef, req->userPointer, newId() );  request->file = file;  request->theTrace = signal->getTrace();    
ndbrequire(forward(file, request));}/* * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1 * remove file */void Ndbfs::execFSCLOSEREQ(Signal * signal){  jamEntry();  const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];  const BlockReference userRef = fsCloseReq->userReference;  const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;  const UintR userPointer = fsCloseReq->userPointer;   AsyncFile* openFile = theOpenFiles.find(filePointer);  if (openFile == NULL) {    // The file was not open, send error back to sender    jam();        // Initialise FsRef signal    FsRef * const fsRef = (FsRef *)&signal->theData[0];    fsRef->userPointer  = userPointer;     fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);    fsRef->osErrorCode  = ~0; // Indicate local error    sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);    return;  }  Request *request = theRequestPool->get();  if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {     jam();     request->action = Request::closeRemove;  } else {     jam();     request->action = Request::close;  }  request->set(userRef, fsCloseReq->userPointer, filePointer);  request->file = openFile;  request->error = 0;  request->theTrace = signal->getTrace();  ndbrequire(forward(openFile, request));}void Ndbfs::readWriteRequest(int action, Signal * signal){  const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0];  Uint16 filePointer =  (Uint16)fsRWReq->filePointer;  const UintR userPointer = fsRWReq->userPointer;   const BlockReference userRef = fsRWReq->userReference;  const BlockNumber blockNumber = refToBlock(userRef);  AsyncFile* openFile = theOpenFiles.find(filePointer);  const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];  UintPtr tPageSize;  UintPtr tClusterSize;  UintPtr tNRR;  UintPtr tPageOffset;  char*        tWA;  FsRef::NdbfsErrorCodeType errorCode;  Request *request = theRequestPool->get();  
request->error = 0;  request->set(userRef, userPointer, filePointer);  request->file = openFile;  request->action = (Request::Action) action;  request->theTrace = signal->getTrace();  if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed    jam();    errorCode = FsRef::fsErrInvalidParameters;    goto error;  }  if (fsRWReq->varIndex >= getBatSize(blockNumber)) {    jam();// Ensure that a valid variable is used        errorCode = FsRef::fsErrInvalidParameters;    goto error;  }  if (myBaseAddrRef == NULL) {    jam(); // Ensure that a valid variable is used    errorCode = FsRef::fsErrInvalidParameters;    goto error;  }  if (openFile == NULL) {    jam(); //file not open    errorCode = FsRef::fsErrFileDoesNotExist;    goto error;  }  tPageSize = pageSize(myBaseAddrRef);  tClusterSize = myBaseAddrRef->ClusterSize;  tNRR = myBaseAddrRef->nrr;  tWA = (char*)myBaseAddrRef->WA;   switch (fsRWReq->getFormatFlag(fsRWReq->operationFlag)) {  // List of memory and file pages pairs  case FsReadWriteReq::fsFormatListOfPairs: {     jam();    for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {      jam();      const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;      const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;      if (varIndex >= tNRR) {        jam();        errorCode = FsRef::fsErrInvalidParameters;        goto error;      }//if      request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];      request->par.readWrite.pages[i].size = tPageSize;      request->par.readWrite.pages[i].offset = fileOffset * tPageSize;    }//for    request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;    break;  }//case  // Range of memory page with one file page  case FsReadWriteReq::fsFormatArrayOfPages: {     if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {      jam();      errorCode = FsRef::fsErrInvalidParameters;      goto error;    }//if    const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;  
  const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;        request->par.readWrite.pages[0].offset = fileOffset * tPageSize;    request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;    request->par.readWrite.numberOfPages = 1;    request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];    break;  }//case  // List of memory pages followed by one file page  case FsReadWriteReq::fsFormatListOfMemPages: {     tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];    tPageOffset *= tPageSize;        for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {      jam();      UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];      if (varIndex >= tNRR) {        jam();        errorCode = FsRef::fsErrInvalidParameters;        goto error;      }//if      request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];      request->par.readWrite.pages[i].size = tPageSize;      request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize);    }//for    request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;    break;    // make it a writev or readv  }//case    default: {    jam();    errorCode = FsRef::fsErrInvalidParameters;    goto error;  }//default    }//switch    ndbrequire(forward(openFile, request));  return;error:  theRequestPool->put(request);  FsRef * const fsRef = (FsRef *)&signal->theData[0];  fsRef->userPointer = userPointer;  fsRef->setErrorCode(fsRef->errorCode, errorCode);  fsRef->osErrorCode = ~0; // Indicate local error  switch (action) {  case Request:: write:  case Request:: writeSync: {    jam();    sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);    break;  }//case  case Request:: read: {    jam();    sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);  }//case  }//switch  return;}/*    PR0: File Pointer , theData[0]    DR0: User reference, theData[1]    DR1: User Pointer,   etc.    
DR2: Flag    DR3: Var number    DR4: amount of pages    DR5->: Memory Page id and File page id according to Flag*/void Ndbfs::execFSWRITEREQ(Signal* signal){  jamEntry();  const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];    if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){    jam();    readWriteRequest( Request::writeSync, signal );  } else {    jam();    readWriteRequest( Request::write, signal );  }}/*    PR0: File Pointer    DR0: User reference    DR1: User Pointer    DR2: Flag    DR3: Var number    DR4: amount of pages    DR5->: Memory Page id and File page id according to Flag*/void Ndbfs::execFSREADREQ(Signal* signal){   jamEntry();   readWriteRequest( Request::read, signal );}/* * PR0: File Pointer DR0: User reference DR1: User Pointer */voidNdbfs::execFSSYNCREQ(Signal * signal){  jamEntry();  Uint16 filePointer =  (Uint16)signal->theData[0];  BlockReference userRef = signal->theData[1];  const UintR userPointer = signal->theData[2];   AsyncFile* openFile = theOpenFiles.find(filePointer);  if (openFile == NULL) {     jam(); //file not open     FsRef * const fsRef = (FsRef *)&signal->theData[0];     fsRef->userPointer = userPointer;     fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);     fsRef->osErrorCode = ~0; // Indicate local error     sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);     return;  }    Request *request = theRequestPool->get();  request->error = 0;  request->action = Request::sync;  request->set(userRef, userPointer, filePointer);  request->file = openFile;  request->theTrace = signal->getTrace();    ndbrequire(forward(openFile,request));}void Ndbfs::execFSAPPENDREQ(Signal * signal){  const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];  const Uint16 filePointer =  (Uint16)fsReq->filePointer;  const UintR userPointer = fsReq->userPointer;   const BlockReference userRef = fsReq->userReference;  const BlockNumber blockNumber = refToBlock(userRef);  
FsRef::NdbfsErrorCodeType errorCode;  AsyncFile* openFile = theOpenFiles.find(filePointer);  const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];  const Uint32* tWA   = (const Uint32*)myBaseAddrRef->WA;  const Uint32  tSz   = myBaseAddrRef->nrr;  const Uint32 offset = fsReq->offset;  const Uint32 size   = fsReq->size;  Request *request = theRequestPool->get();  if (openFile == NULL) {    jam();    errorCode = FsRef::fsErrFileDoesNotExist;    goto error;  }  if (myBaseAddrRef == NULL) {    jam(); // Ensure that a valid variable is used    errorCode = FsRef::fsErrInvalidParameters;    goto error;  }    if (fsReq->varIndex >= getBatSize(blockNumber)) {    jam();// Ensure that a valid variable is used        errorCode = FsRef::fsErrInvalidParameters;    goto error;  }    if(offset + size > tSz){    jam(); // Ensure that a valid variable is used    errorCode = FsRef::fsErrInvalidParameters;    goto error;  }  request->error = 0;  request->set(userRef, userPointer, filePointer);  request->file = openFile;  request->action = Request::append;  request->theTrace = signal->getTrace();    request->par.append.buf = (const char *)(tWA + offset);  request->par.append.size = size << 2;    ndbrequire(forward(openFile, request));  return;  error:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -