📄 asyncfile.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h>#include <my_sys.h>#include <my_pthread.h>#include "AsyncFile.hpp"#include <ErrorHandlingMacros.hpp>#include <kernel_types.h>#include <ndbd_malloc.hpp>#include <NdbThread.h>#include <signaldata/FsOpenReq.hpp>// use this to test broken pread code//#define HAVE_BROKEN_PREAD #ifdef HAVE_BROKEN_PREAD#undef HAVE_PWRITE#undef HAVE_PREAD#endif#if defined NDB_WIN32 || defined NDB_OSE || defined NDB_SOFTOSE#else// For readv and writev#include <sys/uio.h> #endif#ifndef NDB_WIN32#include <dirent.h>#endif// Use this define if you want printouts from AsyncFile class//#define DEBUG_ASYNCFILE#ifdef DEBUG_ASYNCFILE#include <NdbOut.hpp>#define DEBUG(x) x#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)void printErrorAndFlags(Uint32 used_flags);#else#define DEBUG(x) #define PRINT_ERRORANDFLAGS(f)#endif// Define the size of the write buffer (for each thread)#if defined NDB_SOFTOSE || defined NDB_OSE#define WRITEBUFFERSIZE 65536#else #define WRITEBUFFERSIZE 262144#endifconst char *actionName[] = { "open", "close", "closeRemove", "read", "readv", "write", "writev", "writeSync", "writevSync", "sync", "end" };static int numAsyncFiles = 0;extern "C" void * runAsyncFile(void* arg){ ((AsyncFile*)arg)->run(); return (NULL);}AsyncFile::AsyncFile() : theFileName(),#ifdef NDB_WIN32 hFile(INVALID_HANDLE_VALUE),#else theFd(-1),#endif theReportTo(0), theMemoryChannelPtr(NULL){ m_current_request= m_last_request= 0;}voidAsyncFile::doStart(Uint32 nodeId, const char * filesystemPath, const char * backup_path) { theFileName.init(nodeId, filesystemPath, backup_path); // Stacksize for filesystem threads // An 8k stack should be enough const NDB_THREAD_STACKSIZE stackSize = 8192; char buf[16]; numAsyncFiles++; BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles); theStartMutexPtr = NdbMutex_Create(); theStartConditionPtr = NdbCondition_Create(); NdbMutex_Lock(theStartMutexPtr); theStartFlag = false; theThreadPtr = NdbThread_Create(runAsyncFile, (void**)this, stackSize, (char*)&buf, NDB_THREAD_PRIO_MEAN); NdbCondition_Wait(theStartConditionPtr, theStartMutexPtr); NdbMutex_Unlock(theStartMutexPtr); NdbMutex_Destroy(theStartMutexPtr); NdbCondition_Destroy(theStartConditionPtr);}AsyncFile::~AsyncFile() { void *status; Request request; request.action = Request::end; theMemoryChannelPtr->writeChannel( &request ); NdbThread_WaitFor(theThreadPtr, &status); NdbThread_Destroy(&theThreadPtr); delete theMemoryChannelPtr;}voidAsyncFile::reportTo( MemoryChannel<Request> *reportTo ){ theReportTo = reportTo;}void AsyncFile::execute(Request* request) { theMemoryChannelPtr->writeChannel( request );}voidAsyncFile::run(){ Request *request; // Create theMemoryChannel in the thread that will wait for it NdbMutex_Lock(theStartMutexPtr); theMemoryChannelPtr = new MemoryChannel<Request>(); theStartFlag = true; // Create write buffer for bigger writes theWriteBufferSize = WRITEBUFFERSIZE; theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize); NdbMutex_Unlock(theStartMutexPtr); NdbCondition_Signal(theStartConditionPtr); if (!theWriteBuffer) { DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer")); return; }//if while (1) { request = theMemoryChannelPtr->readChannel(); if (!request) { DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile")); endReq(); return; }//if m_current_request= request; switch (request->action) { case Request:: open: openReq(request); break; case Request:: close: closeReq(request); break; case Request:: closeRemove: closeReq(request); removeReq(request); break; case Request:: read: readReq(request); break; case Request:: readv: readvReq(request); break; case Request:: write: writeReq(request); break; case Request:: writev: writevReq(request); break; case Request:: writeSync: writeReq(request); syncReq(request); break; case Request:: writevSync: writevReq(request); syncReq(request); break; case Request:: sync: syncReq(request); break; case Request:: append: appendReq(request); break; case Request::rmrf: rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory); break; case Request:: end: if (theFd > 0) closeReq(request); endReq(); return; default: abort(); break; }//switch m_last_request= request; m_current_request= 0; // No need to signal as ndbfs only uses tryRead theReportTo->writeChannelNoSignal(request); }//while}//AsyncFile::run()extern bool Global_useO_SYNC;extern bool Global_useO_DIRECT;extern bool Global_unlinkO_CREAT;extern Uint32 Global_syncFreq;void AsyncFile::openReq(Request* request){ m_openedWithSync = false; m_syncFrequency = 0; m_syncCount= 0; // for open.flags, see signal FSOPENREQ#ifdef NDB_WIN32 DWORD dwCreationDisposition; DWORD dwDesiredAccess = 0; DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING; const Uint32 flags = request->par.open.flags; // Convert file open flags from Solaris to Windows if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){ dwCreationDisposition = CREATE_ALWAYS; } else if (flags & FsOpenReq::OM_TRUNCATE){ dwCreationDisposition = TRUNCATE_EXISTING; } else if (flags & FsOpenReq::OM_CREATE){ dwCreationDisposition = CREATE_NEW; } else { dwCreationDisposition = OPEN_EXISTING; } switch(flags & 3){ case FsOpenReq::OM_READONLY: dwDesiredAccess = GENERIC_READ; break; case FsOpenReq::OM_WRITEONLY: dwDesiredAccess = GENERIC_WRITE; break; case FsOpenReq::OM_READWRITE: dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; break; default: request->error = 1000; break; return; } hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, 0, dwCreationDisposition, dwFlagsAndAttributes, 0); if(INVALID_HANDLE_VALUE == hFile) { request->error = GetLastError(); if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error)) && (flags & FsOpenReq::OM_CREATE)) { createDirectories(); hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, 0, dwCreationDisposition, dwFlagsAndAttributes, 0); if(INVALID_HANDLE_VALUE == hFile) request->error = GetLastError(); else request->error = 0; return; } } else { request->error = 0; return; }#else const Uint32 flags = request->par.open.flags; Uint32 new_flags = 0; // Convert file open flags from Solaris to Liux if(flags & FsOpenReq::OM_CREATE){ new_flags |= O_CREAT; } if(flags & FsOpenReq::OM_TRUNCATE){#if 0 if(Global_unlinkO_CREAT){ unlink(theFileName.c_str()); } else #endif new_flags |= O_TRUNC; } if(flags & FsOpenReq::OM_APPEND){ new_flags |= O_APPEND; } if(flags & FsOpenReq::OM_SYNC){#if 0 if(Global_useO_SYNC){ new_flags |= O_SYNC; m_openedWithSync = true; m_syncFrequency = 0; } else {#endif m_openedWithSync = false; m_syncFrequency = Global_syncFreq;#if 0 }#endif } else { m_openedWithSync = false; m_syncFrequency = 0; }#if 0 //#if NDB_LINUX if(Global_useO_DIRECT){ new_flags |= O_DIRECT; }#endif switch(flags & 0x3){ case FsOpenReq::OM_READONLY: new_flags |= O_RDONLY; break; case FsOpenReq::OM_WRITEONLY: new_flags |= O_WRONLY; break; case FsOpenReq::OM_READWRITE: new_flags |= O_RDWR; break; default: request->error = 1000; break; return; } const int mode = S_IRUSR | S_IWUSR | S_IRGRP; if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { PRINT_ERRORANDFLAGS(new_flags); if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) { createDirectories(); if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { PRINT_ERRORANDFLAGS(new_flags); request->error = errno; } } else { request->error = errno; } }#endif}intAsyncFile::readBuffer(char * buf, size_t size, off_t offset){ int return_value; #ifdef NDB_WIN32 DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN); if(dwSFP != offset) { return GetLastError(); }#elif ! defined(HAVE_PREAD) off_t seek_val; while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1 && errno == EINTR); if(seek_val == (off_t)-1) { return errno; }#endif while (size > 0) { size_t bytes_read = 0; #ifdef NDB_WIN32 DWORD dwBytesRead; BOOL bRead = ReadFile(hFile, buf, size, &dwBytesRead, 0); if(!bRead){ return GetLastError(); } bytes_read = dwBytesRead;#elif ! defined(HAVE_PREAD) return_value = ::read(theFd, buf, size);#else // UNIX return_value = ::pread(theFd, buf, size, offset);#endif#ifndef NDB_WIN32 if (return_value == -1 && errno == EINTR) { DEBUG(ndbout_c("EINTR in read")); continue; } else if (return_value == -1){ return errno; } else { bytes_read = return_value; }#endif if(bytes_read == 0){ DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d", size, offset, buf, bytes_read, return_value)); return ERR_ReadUnderflow; } if(bytes_read != size){ DEBUG(ndbout_c("Warning partial read %d != %d", bytes_read, size)); } buf += bytes_read; size -= bytes_read; offset += bytes_read; } return 0;}voidAsyncFile::readReq( Request * request){ for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) { off_t offset = request->par.readWrite.pages[i].offset; size_t size = request->par.readWrite.pages[i].size; char * buf = request->par.readWrite.pages[i].buf; int err = readBuffer(buf, size, offset); if(err != 0){ request->error = err; return; } }}voidAsyncFile::readvReq( Request * request){#if ! defined(HAVE_PREAD) readReq(request); return;#elif defined NDB_WIN32 // ReadFileScatter? readReq(request); return;#else int return_value; int length = 0; struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep for(int i=0; i < request->par.readWrite.numberOfPages ; i++) { iov[i].iov_base= request->par.readWrite.pages[i].buf; iov[i].iov_len= request->par.readWrite.pages[i].size; length = length + iov[i].iov_len; } lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET ); return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages); if (return_value == -1) { request->error = errno; return; } else if (return_value != length) { request->error = 1011; return; }#endif}int AsyncFile::extendfile(Request* request) {#if ! defined(HAVE_PWRITE) // Find max size of this file in this request int maxOffset = 0; int maxSize = 0; for(int i=0; i < request->par.readWrite.numberOfPages ; i++) { if (request->par.readWrite.pages[i].offset > maxOffset) { maxOffset = request->par.readWrite.pages[i].offset; maxSize = request->par.readWrite.pages[i].size; } } DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize)); // Allocate a buffer and fill it with zeros void* pbuf = ndbd_malloc(maxSize); memset(pbuf, 0, maxSize); for (int p = 0; p <= maxOffset; p = p + maxSize) { int return_value; return_value = lseek(theFd, p, SEEK_SET); if((return_value == -1 ) || (return_value != p)) { ndbd_free(pbuf,maxSize); return -1; } return_value = ::write(theFd, pbuf, maxSize); if ((return_value == -1) || (return_value != maxSize)) { ndbd_free(pbuf,maxSize); return -1; } } ndbd_free(pbuf,maxSize); DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str())); return 0;#else request = request; abort(); return -1;#endif}voidAsyncFile::writeReq( Request * request){ int page_num = 0; bool write_not_complete = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -