⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 asyncfile.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <ndb_global.h>#include <my_sys.h>#include <my_pthread.h>#include "AsyncFile.hpp"#include <ErrorHandlingMacros.hpp>#include <kernel_types.h>#include <ndbd_malloc.hpp>#include <NdbThread.h>#include <signaldata/FsOpenReq.hpp>// use this to test broken pread code//#define HAVE_BROKEN_PREAD #ifdef HAVE_BROKEN_PREAD#undef HAVE_PWRITE#undef HAVE_PREAD#endif#if defined NDB_WIN32 || defined NDB_OSE || defined NDB_SOFTOSE#else// For readv and writev#include <sys/uio.h> #endif#ifndef NDB_WIN32#include <dirent.h>#endif// Use this define if you want printouts from AsyncFile class//#define DEBUG_ASYNCFILE#ifdef DEBUG_ASYNCFILE#include <NdbOut.hpp>#define DEBUG(x) x#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)void printErrorAndFlags(Uint32 used_flags);#else#define DEBUG(x) #define PRINT_ERRORANDFLAGS(f)#endif// Define the size of the write buffer (for each thread)#if defined NDB_SOFTOSE || defined NDB_OSE#define WRITEBUFFERSIZE 65536#else #define WRITEBUFFERSIZE 262144#endifconst char *actionName[] = {  "open",  "close",  "closeRemove",  "read",  "readv",  "write",  "writev",  "writeSync",  "writevSync",  "sync",  "end" };static int numAsyncFiles = 0;extern "C" void * runAsyncFile(void* arg){  ((AsyncFile*)arg)->run();  return (NULL);}AsyncFile::AsyncFile() :  theFileName(),#ifdef NDB_WIN32  hFile(INVALID_HANDLE_VALUE),#else  theFd(-1),#endif  theReportTo(0),  theMemoryChannelPtr(NULL){  m_current_request= m_last_request= 0;}voidAsyncFile::doStart(Uint32 nodeId,		   const char * filesystemPath,		   const char * backup_path) {  theFileName.init(nodeId, filesystemPath, backup_path);  // Stacksize for filesystem threads  // An 8k stack should be enough  const NDB_THREAD_STACKSIZE stackSize = 8192;  char buf[16];  numAsyncFiles++;  BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);  theStartMutexPtr = NdbMutex_Create();  theStartConditionPtr = NdbCondition_Create();  NdbMutex_Lock(theStartMutexPtr);  theStartFlag = false;  theThreadPtr = NdbThread_Create(runAsyncFile,                                  (void**)this,                                  stackSize,                                  (char*)&buf,                                  NDB_THREAD_PRIO_MEAN);  NdbCondition_Wait(theStartConditionPtr,                    theStartMutexPtr);      NdbMutex_Unlock(theStartMutexPtr);  NdbMutex_Destroy(theStartMutexPtr);  NdbCondition_Destroy(theStartConditionPtr);}AsyncFile::~AsyncFile() {  void *status;  Request request;  request.action = Request::end;  theMemoryChannelPtr->writeChannel( &request );  NdbThread_WaitFor(theThreadPtr, &status);  NdbThread_Destroy(&theThreadPtr);  delete theMemoryChannelPtr;}voidAsyncFile::reportTo( MemoryChannel<Request> *reportTo ){  theReportTo = reportTo;}void AsyncFile::execute(Request* request) {  theMemoryChannelPtr->writeChannel( request );}voidAsyncFile::run(){  Request *request;  // Create theMemoryChannel in the thread that will wait for it  NdbMutex_Lock(theStartMutexPtr);  theMemoryChannelPtr = new MemoryChannel<Request>();  theStartFlag = true;  // Create write buffer for bigger writes  theWriteBufferSize = WRITEBUFFERSIZE;  theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);   NdbMutex_Unlock(theStartMutexPtr);  NdbCondition_Signal(theStartConditionPtr);    if (!theWriteBuffer) {    DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));    return;  }//if  while (1) {    request = theMemoryChannelPtr->readChannel();    if (!request) {      DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));      endReq();      return;    }//if    m_current_request= request;    switch (request->action) {    case Request:: open:      openReq(request);      break;    case Request:: close:      closeReq(request);      break;    case Request:: closeRemove:      closeReq(request);      removeReq(request);      break;    case Request:: read:      readReq(request);      break;    case Request:: readv:      readvReq(request);      break;    case Request:: write:      writeReq(request);      break;    case Request:: writev:      writevReq(request);      break;    case Request:: writeSync:      writeReq(request);      syncReq(request);      break;    case Request:: writevSync:      writevReq(request);      syncReq(request);      break;    case Request:: sync:      syncReq(request);      break;    case Request:: append:      appendReq(request);      break;    case Request::rmrf:      rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);      break;    case Request:: end:      if (theFd > 0)        closeReq(request);      endReq();      return;    default:      abort();      break;    }//switch    m_last_request= request;    m_current_request= 0;        // No need to signal as ndbfs only uses tryRead    theReportTo->writeChannelNoSignal(request);  }//while}//AsyncFile::run()extern bool Global_useO_SYNC;extern bool Global_useO_DIRECT;extern bool Global_unlinkO_CREAT;extern Uint32 Global_syncFreq;void AsyncFile::openReq(Request* request){    m_openedWithSync = false;  m_syncFrequency = 0;  m_syncCount= 0;  // for open.flags, see signal FSOPENREQ#ifdef NDB_WIN32  DWORD dwCreationDisposition;  DWORD dwDesiredAccess = 0;  DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;  DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING;  const Uint32 flags = request->par.open.flags;        // Convert file open flags from Solaris to Windows  if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){    dwCreationDisposition = CREATE_ALWAYS;  } else if (flags & FsOpenReq::OM_TRUNCATE){    dwCreationDisposition = TRUNCATE_EXISTING;  } else if (flags & FsOpenReq::OM_CREATE){    dwCreationDisposition = CREATE_NEW;  } else {    dwCreationDisposition = OPEN_EXISTING;  }    switch(flags & 3){  case FsOpenReq::OM_READONLY:    dwDesiredAccess = GENERIC_READ;    break;  case FsOpenReq::OM_WRITEONLY:    dwDesiredAccess = GENERIC_WRITE;    break;  case FsOpenReq::OM_READWRITE:    dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;    break;  default:    request->error = 1000;    break;    return;  }  hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,                      0, dwCreationDisposition, dwFlagsAndAttributes, 0);      if(INVALID_HANDLE_VALUE == hFile) {    request->error = GetLastError();    if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error))       && (flags & FsOpenReq::OM_CREATE)) {      createDirectories();      hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,                          0, dwCreationDisposition, dwFlagsAndAttributes, 0);                  if(INVALID_HANDLE_VALUE == hFile)        request->error = GetLastError();      else        request->error = 0;                  return;    }  }   else {    request->error = 0;    return;  }#else  const Uint32 flags = request->par.open.flags;  Uint32 new_flags = 0;  // Convert file open flags from Solaris to Liux  if(flags & FsOpenReq::OM_CREATE){    new_flags |= O_CREAT;  }  if(flags & FsOpenReq::OM_TRUNCATE){#if 0    if(Global_unlinkO_CREAT){      unlink(theFileName.c_str());    } else #endif      new_flags |= O_TRUNC;  }    if(flags & FsOpenReq::OM_APPEND){    new_flags |= O_APPEND;  }  if(flags & FsOpenReq::OM_SYNC){#if 0    if(Global_useO_SYNC){      new_flags |= O_SYNC;      m_openedWithSync = true;      m_syncFrequency = 0;    } else {#endif      m_openedWithSync = false;      m_syncFrequency = Global_syncFreq;#if 0    }#endif  } else {    m_openedWithSync = false;    m_syncFrequency = 0;  }#if 0  //#if NDB_LINUX  if(Global_useO_DIRECT){    new_flags |= O_DIRECT;  }#endif  switch(flags & 0x3){  case FsOpenReq::OM_READONLY:    new_flags |= O_RDONLY;    break;  case FsOpenReq::OM_WRITEONLY:    new_flags |= O_WRONLY;    break;  case FsOpenReq::OM_READWRITE:    new_flags |= O_RDWR;    break;  default:    request->error = 1000;    break;    return;  }  const int mode = S_IRUSR | S_IWUSR | S_IRGRP;    if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {    PRINT_ERRORANDFLAGS(new_flags);    if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) {      createDirectories();      if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {        PRINT_ERRORANDFLAGS(new_flags);        request->error = errno;      }    } else {      request->error = errno;    }  }#endif}intAsyncFile::readBuffer(char * buf, size_t size, off_t offset){  int return_value;  #ifdef NDB_WIN32  DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);  if(dwSFP != offset) {    return GetLastError();  }#elif ! defined(HAVE_PREAD)  off_t seek_val;  while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1 	&& errno == EINTR);  if(seek_val == (off_t)-1)  {    return errno;  }#endif      while (size > 0) {    size_t bytes_read = 0;    #ifdef NDB_WIN32    DWORD dwBytesRead;    BOOL bRead = ReadFile(hFile,                           buf,                          size,                          &dwBytesRead,                          0);    if(!bRead){      return GetLastError();    }     bytes_read = dwBytesRead;#elif  ! defined(HAVE_PREAD)    return_value = ::read(theFd, buf, size);#else // UNIX    return_value = ::pread(theFd, buf, size, offset);#endif#ifndef NDB_WIN32    if (return_value == -1 && errno == EINTR) {      DEBUG(ndbout_c("EINTR in read"));      continue;    } else if (return_value == -1){      return errno;    } else {      bytes_read = return_value;    }#endif    if(bytes_read == 0){      DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",             size, offset, buf, bytes_read, return_value));      return ERR_ReadUnderflow;    }        if(bytes_read != size){      DEBUG(ndbout_c("Warning partial read %d != %d",             bytes_read, size));    }    buf += bytes_read;    size -= bytes_read;    offset += bytes_read;  }  return 0;}voidAsyncFile::readReq( Request * request){  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {    off_t offset = request->par.readWrite.pages[i].offset;    size_t size  = request->par.readWrite.pages[i].size;    char * buf   = request->par.readWrite.pages[i].buf;        int err = readBuffer(buf, size, offset);    if(err != 0){      request->error = err;      return;    }  }}voidAsyncFile::readvReq( Request * request){#if ! defined(HAVE_PREAD)  readReq(request);  return;#elif defined NDB_WIN32  // ReadFileScatter?  readReq(request);  return;#else  int return_value;  int length = 0;  struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep  for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {    iov[i].iov_base= request->par.readWrite.pages[i].buf;    iov[i].iov_len= request->par.readWrite.pages[i].size;    length = length + iov[i].iov_len;  }  lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );  return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);  if (return_value == -1) {    request->error = errno;    return;  } else if (return_value != length) {    request->error = 1011;    return;  }#endif}int AsyncFile::extendfile(Request* request) {#if ! defined(HAVE_PWRITE)  // Find max size of this file in this request  int maxOffset = 0;  int maxSize = 0;  for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {    if (request->par.readWrite.pages[i].offset > maxOffset) {      maxOffset = request->par.readWrite.pages[i].offset;      maxSize = request->par.readWrite.pages[i].size;    }  }  DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize));  // Allocate a buffer and fill it with zeros  void* pbuf = ndbd_malloc(maxSize);  memset(pbuf, 0, maxSize);  for (int p = 0; p <= maxOffset; p = p + maxSize) {    int return_value;    return_value = lseek(theFd,                          p,                         SEEK_SET);    if((return_value == -1 ) || (return_value != p)) {      ndbd_free(pbuf,maxSize);      return -1;    }    return_value = ::write(theFd,                            pbuf,                           maxSize);    if ((return_value == -1) || (return_value != maxSize)) {      ndbd_free(pbuf,maxSize);      return -1;    }  }  ndbd_free(pbuf,maxSize);    DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str()));  return 0;#else  request = request;  abort();  return -1;#endif}voidAsyncFile::writeReq( Request * request){  int page_num = 0;  bool write_not_complete = true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -