📄 asyncfile.cpp
字号:
while(write_not_complete) { int totsize = 0; off_t offset = request->par.readWrite.pages[page_num].offset; char* bufptr = theWriteBuffer; write_not_complete = false; if (request->par.readWrite.numberOfPages > 1) { off_t page_offset = offset; // Multiple page write, copy to buffer for one write for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) { memcpy(bufptr, request->par.readWrite.pages[i].buf, request->par.readWrite.pages[i].size); bufptr += request->par.readWrite.pages[i].size; totsize += request->par.readWrite.pages[i].size; if (((i + 1) < request->par.readWrite.numberOfPages)) { // There are more pages to write // Check that offsets are consequtive off_t tmp = page_offset + request->par.readWrite.pages[i].size; if (tmp != request->par.readWrite.pages[i+1].offset) { // Next page is not aligned with previous, not allowed DEBUG(ndbout_c("Page offsets are not aligned")); request->error = EINVAL; return; } if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) { // We are not finished and the buffer is full write_not_complete = true; // Start again with next page page_num = i + 1; break; } } page_offset += request->par.readWrite.pages[i].size; } bufptr = theWriteBuffer; } else { // One page write, write page directly bufptr = request->par.readWrite.pages[0].buf; totsize = request->par.readWrite.pages[0].size; } int err = writeBuffer(bufptr, totsize, offset); if(err != 0){ request->error = err; return; } } // while(write_not_complete)}intAsyncFile::writeBuffer(const char * buf, size_t size, off_t offset, size_t chunk_size){ size_t bytes_to_write = chunk_size; int return_value;#ifdef NDB_WIN32 DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN); if(dwSFP != offset) { return GetLastError(); }#elif ! defined(HAVE_PWRITE) off_t seek_val; while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1 && errno == EINTR); if(seek_val == (off_t)-1) { return errno; }#endif while (size > 0) { if (size < bytes_to_write){ // We are at the last chunk bytes_to_write = size; } size_t bytes_written = 0; #ifdef NDB_WIN32 DWORD dwWritten; BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0); if(!bWrite) { return GetLastError(); } bytes_written = dwWritten; if (bytes_written != bytes_to_write) { DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write)); } #elif ! defined(HAVE_PWRITE) return_value = ::write(theFd, buf, bytes_to_write);#else // UNIX return_value = ::pwrite(theFd, buf, bytes_to_write, offset);#endif#ifndef NDB_WIN32 if (return_value == -1 && errno == EINTR) { bytes_written = 0; DEBUG(ndbout_c("EINTR in write")); } else if (return_value == -1){ return errno; } else { bytes_written = return_value; if(bytes_written == 0){ abort(); } if(bytes_written != bytes_to_write){ DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write)); } }#endif m_syncCount+= bytes_written; buf += bytes_written; size -= bytes_written; offset += bytes_written; } return 0;}voidAsyncFile::writevReq( Request * request){ // WriteFileGather on WIN32? writeReq(request);}voidAsyncFile::closeReq(Request * request){ syncReq(request);#ifdef NDB_WIN32 if(!CloseHandle(hFile)) { request->error = GetLastError(); } hFile = INVALID_HANDLE_VALUE;#else if (-1 == ::close(theFd)) {#ifndef DBUG_OFF if (theFd == -1) abort();#endif request->error = errno; } theFd = -1;#endif}bool AsyncFile::isOpen(){#ifdef NDB_WIN32 return (hFile != INVALID_HANDLE_VALUE);#else return (theFd != -1);#endif}voidAsyncFile::syncReq(Request * request){ if(m_openedWithSync || m_syncCount == 0){ return; }#ifdef NDB_WIN32 if(!FlushFileBuffers(hFile)) { request->error = GetLastError(); return; }#else if (-1 == ::fsync(theFd)){ request->error = errno; return; }#endif m_syncCount = 0;}voidAsyncFile::appendReq(Request * request){ const char * buf = request->par.append.buf; Uint32 size = request->par.append.size; m_syncCount += size;#ifdef NDB_WIN32 DWORD dwWritten = 0; while(size > 0){ if(!WriteFile(hFile, buf, size, &dwWritten, 0)){ request->error = GetLastError(); return ; } buf += dwWritten; size -= dwWritten; }#else while(size > 0){ const int n = write(theFd, buf, size); if(n == -1 && errno == EINTR){ continue; } if(n == -1){ request->error = errno; return; } if(n == 0){ abort(); } size -= n; buf += n; }#endif if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){ syncReq(request); }}voidAsyncFile::removeReq(Request * request){#ifdef NDB_WIN32 if(!DeleteFile(theFileName.c_str())) { request->error = GetLastError(); }#else if (-1 == ::remove(theFileName.c_str())) { request->error = errno; }#endif}voidAsyncFile::rmrfReq(Request * request, char * path, bool removePath){ Uint32 path_len = strlen(path); Uint32 path_max_copy = PATH_MAX - path_len; char* path_add = &path[path_len];#ifndef NDB_WIN32 if(!request->par.rmrf.directory){ // Remove file if(unlink((const char *)path) != 0 && errno != ENOENT) request->error = errno; return; } // Remove directory DIR* dirp = opendir((const char *)path); if(dirp == 0){ if(errno != ENOENT) request->error = errno; return; } struct dirent * dp; while ((dp = readdir(dirp)) != NULL){ if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) { BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s", DIR_SEPARATOR, dp->d_name); if(remove((const char*)path) == 0){ path[path_len] = 0; continue; } rmrfReq(request, path, true); path[path_len] = 0; if(request->error != 0){ closedir(dirp); return; } } } closedir(dirp); if(removePath && rmdir((const char *)path) != 0){ request->error = errno; } return;#else if(!request->par.rmrf.directory){ // Remove file if(!DeleteFile(path)){ DWORD dwError = GetLastError(); if(dwError!=ERROR_FILE_NOT_FOUND) request->error = dwError; } return; } strcat(path, "\\*"); WIN32_FIND_DATA ffd; HANDLE hFindFile = FindFirstFile(path, &ffd); path[path_len] = 0; if(INVALID_HANDLE_VALUE==hFindFile){ DWORD dwError = GetLastError(); if(dwError!=ERROR_PATH_NOT_FOUND) request->error = dwError; return; } do { if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){ strcat(path, "\\"); strcat(path, ffd.cFileName); if(DeleteFile(path)) { path[path_len] = 0; continue; }//if rmrfReq(request, path, true); path[path_len] = 0; if(request->error != 0){ FindClose(hFindFile); return; } } } while(FindNextFile(hFindFile, &ffd)); FindClose(hFindFile); if(removePath && !RemoveDirectory(path)) request->error = GetLastError(); #endif}void AsyncFile::endReq(){ // Thread is ended with return if (theWriteBuffer) ndbd_free(theWriteBuffer, theWriteBufferSize);}void AsyncFile::createDirectories(){ for (int i = 0; i < theFileName.levels(); i++) {#ifdef NDB_WIN32 CreateDirectory(theFileName.directory(i), 0);#else //printf("AsyncFile::createDirectories : \"%s\"\n", theFileName.directory(i)); mkdir(theFileName.directory(i), S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);#endif }}#ifdef DEBUG_ASYNCFILEvoid printErrorAndFlags(Uint32 used_flags) { char buf[255]; sprintf(buf, "PEAF: errno=%d \"", errno); switch(errno) { case EACCES: strcat(buf, "EACCES"); break; case EDQUOT: strcat(buf, "EDQUOT"); break; case EEXIST : strcat(buf, "EEXIST"); break; case EINTR : strcat(buf, "EINTR"); break; case EFAULT : strcat(buf, "EFAULT"); break; case EIO : strcat(buf, "EIO"); break; case EISDIR : strcat(buf, "EISDIR"); break; case ELOOP : strcat(buf, "ELOOP"); break; case EMFILE : strcat(buf, "EMFILE"); break; case ENFILE : strcat(buf, "ENFILE"); break; case ENOENT : strcat(buf, "ENOENT "); break; case ENOSPC : strcat(buf, "ENOSPC"); break; case ENOTDIR : strcat(buf, "ENOTDIR"); break; case ENXIO : strcat(buf, "ENXIO"); break; case EOPNOTSUPP: strcat(buf, "EOPNOTSUPP"); break;#if !defined NDB_OSE && !defined NDB_SOFTOSE case EMULTIHOP : strcat(buf, "EMULTIHOP"); break; case ENOLINK : strcat(buf, "ENOLINK"); break; case ENOSR : strcat(buf, "ENOSR"); break; case EOVERFLOW : strcat(buf, "EOVERFLOW"); break;#endif case EROFS : strcat(buf, "EROFS"); break; case EAGAIN : strcat(buf, "EAGAIN"); break; case EINVAL : strcat(buf, "EINVAL"); break; case ENOMEM : strcat(buf, "ENOMEM"); break; case ETXTBSY : strcat(buf, "ETXTBSY"); break; case ENAMETOOLONG: strcat(buf, "ENAMETOOLONG"); break; case EBADF: strcat(buf, "EBADF"); break; case ESPIPE: strcat(buf, "ESPIPE"); break; case ESTALE: strcat(buf, "ESTALE"); break; default: strcat(buf, "EOTHER"); break; } strcat(buf, "\" ");#if defined NDB_OSE strcat(buf, strerror(errno) << " ");#endif strcat(buf, " flags: "); switch(used_flags & 3){ case O_RDONLY: strcat(buf, "O_RDONLY, "); break; case O_WRONLY: strcat(buf, "O_WRONLY, "); break; case O_RDWR: strcat(buf, "O_RDWR, "); break; default: strcat(buf, "Unknown!!, "); } if((used_flags & O_APPEND)==O_APPEND) strcat(buf, "O_APPEND, "); if((used_flags & O_CREAT)==O_CREAT) strcat(buf, "O_CREAT, "); if((used_flags & O_EXCL)==O_EXCL) strcat(buf, "O_EXCL, "); if((used_flags & O_NOCTTY) == O_NOCTTY) strcat(buf, "O_NOCTTY, "); if((used_flags & O_NONBLOCK)==O_NONBLOCK) strcat(buf, "O_NONBLOCK, "); if((used_flags & O_TRUNC)==O_TRUNC) strcat(buf, "O_TRUNC, ");#if !defined NDB_OSE && !defined NDB_SOFTOSE if((used_flags & O_DSYNC)==O_DSYNC) strcat(buf, "O_DSYNC, "); if((used_flags & O_NDELAY)==O_NDELAY) strcat(buf, "O_NDELAY, "); if((used_flags & O_RSYNC)==O_RSYNC) strcat(buf, "O_RSYNC, "); if((used_flags & O_SYNC)==O_SYNC) strcat(buf, "O_SYNC, "); DEBUG(ndbout_c(buf));#endif}#endifNdbOut&operator<<(NdbOut& out, const Request& req){ out << "[ Request: file: " << hex << req.file << " userRef: " << hex << req.theUserReference << " userData: " << dec << req.theUserPointer << " theFilePointer: " << req.theFilePointer << " action: "; switch(req.action){ case Request::open: out << "open"; break; case Request::close: out << "close"; break; case Request::closeRemove: out << "closeRemove"; break; case Request::read: // Allways leave readv directly after out << "read"; break; case Request::readv: out << "readv"; break; case Request::write:// Allways leave writev directly after out << "write"; break; case Request::writev: out << "writev"; break; case Request::writeSync:// Allways leave writevSync directly after out << "writeSync"; break; // writeSync because SimblockAsyncFileSystem depends on it case Request::writevSync: out << "writevSync"; break; case Request::sync: out << "sync"; break; case Request::end: out << "end"; break; case Request::append: out << "append"; break; case Request::rmrf: out << "rmrf"; break; default: out << (Uint32)req.action; break; } out << " ]"; return out;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -