📄 ndbfs.cpp
字号:
jam(); theRequestPool->put(request); FsRef * const fsRef = (FsRef *)&signal->theData[0]; fsRef->userPointer = userPointer; fsRef->setErrorCode(fsRef->errorCode, errorCode); fsRef->osErrorCode = ~0; // Indicate local error jam(); sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB); return;}Uint16Ndbfs::newId(){ // finds a new key, eg a new filepointer for (int i = 1; i < SHRT_MAX; i++) { if (theLastId == SHRT_MAX) { jam(); theLastId = 1; } else { jam(); theLastId++; } if(theOpenFiles.find(theLastId) == NULL) { jam(); return theLastId; } } ndbrequire(1 == 0); // The program will not reach this point return 0;}AsyncFile*Ndbfs::createAsyncFile(){ // Check limit of open files if (theFiles.size()+1 == m_maxFiles) { // Print info about all open files for (unsigned i = 0; i < theFiles.size(); i++){ AsyncFile* file = theFiles[i]; ndbout_c("%2d (0x%x): %s", i, file, file->isOpen()?"OPEN":"CLOSED"); } ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile"); } AsyncFile* file = new AsyncFile; file->doStart(getOwnNodeId(), theFileSystemPath, theBackupFilePath); // Put the file in list of all files theFiles.push_back(file);#ifdef VM_TRACE infoEvent("NDBFS: Created new file thread %d", theFiles.size());#endif return file;}AsyncFile*Ndbfs::getIdleFile(){ AsyncFile* file; if (theIdleFiles.size() > 0){ file = theIdleFiles[0]; theIdleFiles.erase(0); } else { file = createAsyncFile(); } return file;}voidNdbfs::report(Request * request, Signal* signal){ const Uint32 orgTrace = signal->getTrace(); signal->setTrace(request->theTrace); const BlockReference ref = request->theUserReference; if (request->error) { jam(); // Initialise FsRef signal FsRef * const fsRef = (FsRef *)&signal->theData[0]; fsRef->userPointer = request->theUserPointer; fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error)); fsRef->osErrorCode = request->error; switch (request->action) { case Request:: open: { jam(); // Put the file back in idle files list theIdleFiles.push_back(request->file); sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB); break; } case Request:: closeRemove: case Request:: close: { jam(); sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB); break; } case Request:: writeSync: case Request:: writevSync: case Request:: write: case Request:: writev: { jam(); sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB); break; } case Request:: read: case Request:: readv: { jam(); sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB); break; } case Request:: sync: { jam(); sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB); break; } case Request::append: { jam(); sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB); break; } case Request::rmrf: { jam(); // Put the file back in idle files list theIdleFiles.push_back(request->file); sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB); break; } case Request:: end: { // Report nothing break; } }//switch } else { jam(); FsConf * const fsConf = (FsConf *)&signal->theData[0]; fsConf->userPointer = request->theUserPointer; switch (request->action) { case Request:: open: { jam(); theOpenFiles.insert(request->file, request->theFilePointer); // Keep track on max number of opened files if (theOpenFiles.size() > m_maxOpenedFiles) m_maxOpenedFiles = theOpenFiles.size(); fsConf->filePointer = request->theFilePointer; sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB); break; } case Request:: closeRemove: case Request:: close: { jam(); // removes the file from OpenFiles list theOpenFiles.erase(request->theFilePointer); // Put the file in idle files list theIdleFiles.push_back(request->file); sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB); break; } case Request:: writeSync: case Request:: writevSync: case Request:: write: case Request:: writev: { jam(); sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB); break; } case Request:: read: case Request:: readv: { jam(); sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB); break; } case Request:: sync: { jam(); sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB); break; }//case case Request::append: { jam(); signal->theData[1] = request->par.append.size; sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB); break; } case Request::rmrf: { jam(); // Put the file in idle files list theIdleFiles.push_back(request->file); sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB); break; } case Request:: end: { // Report nothing break; } } }//if signal->setTrace(orgTrace);}boolNdbfs::scanIPC(Signal* signal){ Request* request = theFromThreads.tryReadChannel(); jam(); if (request) { jam(); report(request, signal); theRequestPool->put(request); return true; } return false;}#if defined NDB_WIN32int Ndbfs::translateErrno(int aErrno){ switch (aErrno) { //permission denied case ERROR_ACCESS_DENIED: return FsRef::fsErrPermissionDenied; //temporary not accessible case ERROR_PATH_BUSY: case ERROR_NO_MORE_SEARCH_HANDLES: return FsRef::fsErrTemporaryNotAccessible; //no space left on device case ERROR_HANDLE_DISK_FULL: case ERROR_DISK_FULL: return FsRef::fsErrNoSpaceLeftOnDevice; //none valid parameters case ERROR_INVALID_HANDLE: case ERROR_INVALID_DRIVE: case ERROR_INVALID_ACCESS: case ERROR_HANDLE_EOF: case ERROR_BUFFER_OVERFLOW: return FsRef::fsErrInvalidParameters; //environment error case ERROR_CRC: case ERROR_ARENA_TRASHED: case ERROR_BAD_ENVIRONMENT: case ERROR_INVALID_BLOCK: case ERROR_WRITE_FAULT: case ERROR_READ_FAULT: case ERROR_OPEN_FAILED: return FsRef::fsErrEnvironmentError; //no more process resources case ERROR_TOO_MANY_OPEN_FILES: case ERROR_NOT_ENOUGH_MEMORY: case ERROR_OUTOFMEMORY: return FsRef::fsErrNoMoreResources; //no file case ERROR_FILE_NOT_FOUND: return FsRef::fsErrFileDoesNotExist; case ERR_ReadUnderflow: return FsRef::fsErrReadUnderflow; default: return FsRef::fsErrUnknown; }}#elif defined NDB_OSE || defined NDB_SOFTOSEint Ndbfs::translateErrno(int aErrno){ switch (aErrno) { //permission denied case EACCES: case EROFS: case ENXIO: return FsRef::fsErrPermissionDenied; //temporary not accessible case EAGAIN: case ETIMEDOUT: case ENOLCK: return FsRef::fsErrTemporaryNotAccessible; //no space left on device case ENFILE: case EDQUOT: case ENOSPC: return FsRef::fsErrNoSpaceLeftOnDevice; //none valid parameters case EINVAL: case EFBIG: case EBADF: case ENAMETOOLONG: case EFAULT: case EISDIR: return FsRef::fsErrInvalidParameters; //environment error case EMLINK: case ELOOP: return FsRef::fsErrEnvironmentError; //no more process resources case EMFILE: case ENOMEM: return FsRef::fsErrNoMoreResources; //no file case ENOENT: return FsRef::fsErrFileDoesNotExist; case ERR_ReadUnderflow: return FsRef::fsErrReadUnderflow; default: return FsRef::fsErrUnknown; }}#elseint Ndbfs::translateErrno(int aErrno){ switch (aErrno) { //permission denied case EACCES: case EROFS: case ENXIO: return FsRef::fsErrPermissionDenied; //temporary not accessible case EAGAIN: case ETIMEDOUT: case ENOLCK: case EINTR: case EIO: return FsRef::fsErrTemporaryNotAccessible; //no space left on device case ENFILE: case EDQUOT:#ifdef ENOSR case ENOSR:#endif case ENOSPC: case EFBIG: return FsRef::fsErrNoSpaceLeftOnDevice; //none valid parameters case EINVAL: case EBADF: case ENAMETOOLONG: case EFAULT: case EISDIR: case ENOTDIR: case EEXIST: case ETXTBSY: return FsRef::fsErrInvalidParameters; //environment error case ELOOP:#ifdef ENOLINK case ENOLINK:#endif#ifdef EMULTIHOP case EMULTIHOP:#endif#ifdef EOPNOTSUPP case EOPNOTSUPP:#endif#ifdef ESPIPE case ESPIPE:#endif case EPIPE: return FsRef::fsErrEnvironmentError; //no more process resources case EMFILE: case ENOMEM: return FsRef::fsErrNoMoreResources; //no file case ENOENT: return FsRef::fsErrFileDoesNotExist; case ERR_ReadUnderflow: return FsRef::fsErrReadUnderflow; default: return FsRef::fsErrUnknown; }}#endifvoid Ndbfs::execCONTINUEB(Signal* signal){ jamEntry(); if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) { jam(); // Also send CONTINUEB to ourself in order to scan for // incoming answers from AsyncFile on MemoryChannel theFromThreads signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1); if (scanningInProgress == true) { jam(); return; } } if (scanIPC(signal)) { jam(); scanningInProgress = true; signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY; sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB); } else { jam(); scanningInProgress = false; } return;}bool Global_useO_SYNC = false;bool Global_useO_DIRECT = false;bool Global_unlinkO_CREAT = false;Uint32 Global_syncFreq = 1024 * 1024;voidNdbfs::execDUMP_STATE_ORD(Signal* signal){ if(signal->theData[0] == 19){ if(signal->length() > 1){ Global_useO_SYNC = signal->theData[1]; } if(signal->length() > 2){ Global_syncFreq = signal->theData[2] * 1024 * 1024; } if(signal->length() > 3){ Global_unlinkO_CREAT = signal->theData[3]; } if(signal->length() > 4){ Global_useO_DIRECT = signal->theData[4]; } ndbout_c("useO_SYNC = %d syncFreq = %d unlinkO_CREATE = %d O_DIRECT = %d", Global_useO_SYNC, Global_syncFreq, Global_unlinkO_CREAT, Global_useO_DIRECT); return; } if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){ infoEvent("NDBFS: Files: %d Open files: %d", theFiles.size(), theOpenFiles.size()); infoEvent(" Idle files: %d Max opened files: %d", theIdleFiles.size(), m_maxOpenedFiles); infoEvent(" Max files: %d", m_maxFiles); infoEvent(" Requests: %d", theRequestPool->size()); return; } if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){ infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size()); for (unsigned i = 0; i < theOpenFiles.size(); i++){ AsyncFile* file = theOpenFiles.getFile(i); infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str()); } return; } if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){ infoEvent("NDBFS: Dump all files: %d", theFiles.size()); for (unsigned i = 0; i < theFiles.size(); i++){ AsyncFile* file = theFiles[i]; infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); } return; } if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){ infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size()); for (unsigned i = 0; i < theIdleFiles.size(); i++){ AsyncFile* file = theIdleFiles[i]; infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); } return; } if(signal->theData[0] == 404) { ndbrequire(signal->getLength() == 2); Uint32 file= signal->theData[1]; AsyncFile* openFile = theOpenFiles.find(file); ndbrequire(openFile); ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile); Request* curr = openFile->m_current_request; Request* last = openFile->m_last_request; if(curr) ndbout << "Current request: " << *curr << endl; if(last) ndbout << "Last request: " << *last << endl; ndbout << "theReportTo " << *openFile->theReportTo << endl; ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl; ndbout << "All files: " << endl; for (unsigned i = 0; i < theFiles.size(); i++){ AsyncFile* file = theFiles[i]; ndbout_c("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); } }}//Ndbfs::execDUMP_STATE_ORD()BLOCK_FUNCTIONS(Ndbfs)template class Vector<AsyncFile*>;template class Vector<OpenFiles::OpenFileItem>;template class MemoryChannel<Request>;template class Pool<Request>;template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -