📄 sbcachemanager.cpp
字号:
// when a writer opens the entry, we attempt to open for read // access here before the writer finishes writing the entry, then // the writer invalidates the entry on us. When that happens, if // we're attempting MODE_READ_CREATE we want to get that invalid // entry out of the table and create the entry ourselves returning // in WRITE mode, for MODE_READ we just want to see if another // writer has created a new entry that is valid for us to use // otherwise we bail out with the normal RESULT_NOT_FOUND. if (( rc == VXIcache_RESULT_SUCCESS ) && ( ! entryOpened )) { // If this is a write over an existing entry, wipe the previous entry // accounting from the cache. if (finalMode == CACHE_MODE_WRITE) _curSizeBytes.Decrement(entry.GetSizeBytes(false)); rc = entry.Open (log, moduleName, key, entry.GetPath( ), finalMode, flags, _entryMaxSizeBytes, properties, streamInfo, stream); // Don't let a corrupt entry persist if (( rc != VXIcache_RESULT_SUCCESS ) && ( rc != VXIcache_RESULT_FAILURE )) Delete (log, key); } // Accessed, so move it to the top of the LRU list if ( _entryTableMutex.Lock( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 110, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } else { if (rc == VXIcache_RESULT_SUCCESS) TouchEntry(entry); if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 111, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } } } while ( rc == VXIcache_RESULT_FAILURE ); // Finish up returning the stream information if (( rc == VXIcache_RESULT_SUCCESS ) && ( streamInfo ) && ( VXIMapSetProperty (streamInfo, CACHE_INFO_FINAL_KEY, (VXIValue *) VXIStringCreate(key.c_str( ))) != VXIvalue_RESULT_SUCCESS )) { Error (log, 100, NULL); (*stream)->Close (true); *stream = NULL; rc = VXIcache_RESULT_OUT_OF_MEMORY; } if (( rc == VXIcache_RESULT_SUCCESS ) && ( mode == CACHE_MODE_READ_CREATE ) && ( finalMode == CACHE_MODE_WRITE )) rc = VXIcache_RESULT_ENTRY_CREATED; // Maybe do cleanup, ignore all but fatal errors VXIcacheResult rc2 = Cleanup (false, key); if ( rc2 < VXIcache_RESULT_SUCCESS ) { (*stream)->Close (true); *stream = NULL; rc = rc2; } Diag (log, SBCACHE_MGR_TAGID, L"Open", L"%s: rc = %d", key.c_str( ), rc); return rc; } // Notification of data writes VXIcacheResult SBcacheManager::WriteNotification (VXIlogInterface *log, const SBcacheString &moduleName, VXIulong nwritten, const SBcacheKey &key) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; // moduleName is reserved for future use, may want to allow // configuring the cache with specific cache allocations on a // per-module basis // Note that _curSizeBytes is a thread-safe object, intentionally // ignore cleanup errors unless fatal if ( _curSizeBytes.IncrementTest (nwritten, _maxSizeBytes) > 0 ) { VXIcacheResult rc2 = Cleanup (true, key); if ( rc2 < VXIcache_RESULT_SUCCESS ) rc = rc2; } return rc; } // Unlock an entry VXIcacheResult SBcacheManager::Unlock(VXIlogInterface *log, const SBcacheKey &key) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; if ( _entryTableMutex.StartRead( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 110, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } else { // Find the entry and unlock it, only need read permission SBcacheEntryTable::iterator vi = _entryTable.find (key); if ( vi != _entryTable.end( ) ) { rc = (*vi).second.Unlock (log); } else { rc = VXIcache_RESULT_NOT_FOUND; } if ( _entryTableMutex.EndRead( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 111, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } } Diag (log, SBCACHE_MGR_TAGID, L"Unlock", L"%s: rc = %d", key.c_str( ), rc); return rc; } // Delete an entry VXIcacheResult SBcacheManager::Delete(VXIlogInterface *log, const SBcacheKey &key, bool haveEntryOpen) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; // Find the entry and delete it, only need read permission. Note // that we need to hold a reference to the entry in order to make // sure it only gets deleted after we release our lock here (that // may be a costly operation). The extra braces ensure the actual // deletion is done prior to logging our exit. { SBcacheEntry entry; if ( _entryTableMutex.Lock( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 110, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } else { SBcacheEntryTable::iterator vi = _entryTable.find (key); if ( vi != _entryTable.end( ) ) { entry = (*vi).second; RemoveEntry(entry); } else { rc = VXIcache_RESULT_NOT_FOUND; } if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) { Error (log, 111, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } } // Note that even getting the size can take time since we // may lock on a write mutex until some write is done, and note that // erasing the entries may take time since that involves deleting // on-disk files in the usual case where there are no active streams // for the entry. if ( rc == VXIcache_RESULT_SUCCESS ) _curSizeBytes.Decrement (entry.GetSizeBytes (haveEntryOpen)); } Diag (log, SBCACHE_MGR_TAGID, L"Delete", L"%s: rc = %d", key.c_str( ), rc); return rc; } // Write out the index file, used to handle abnormal termination VXIcacheResult SBcacheManager::WriteIndex( ) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; Diag (SBCACHE_MGR_TAGID, L"WriteIndex", L"entering"); // TBD write the index from the entry table, schedule periodically? Diag (SBCACHE_MGR_TAGID, L"WriteIndex", L"exiting: rc = %d", rc); return rc; } // Read the index file, used at startup VXIcacheResult SBcacheManager::ReadIndex(const SBcacheNString &cacheDir) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; Diag (SBCACHE_MGR_TAGID, L"ReadIndex", L"entering: %S", cacheDir.c_str( )); // TBD, read the index to create the entry table // TBD, purge files with our extension that aren't in the index but // are in the cache, warn about those without our extension Diag (SBCACHE_MGR_TAGID, L"ReadIndex", L"exiting: rc = %d", rc); return rc; } // Get a new path for an entry SBcachePath SBcacheManager::GetNewEntryPath(const SBcacheString &moduleName, const SBcacheKey &key) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; // Get the index number, note that _pathSeqNum is a thread-safe object VXIulong index = _pathSeqNum.IncrementSeqNum( ); // Construct the path, return an empty string on failure. The path // is a directory tree where there is a subdirectory for each module // name, then subdirectories underneath each module to limit each // subdirectory to no more then 256 entries in order to avoid OS // limitations on files per directory for some older OSes #if defined(__GNUC__) && (__GNUC__ == 3) && (__GNUC_MINOR__ < 2) std::ostrstream pathStream; #else std::basic_ostringstream<char> pathStream; #endif const wchar_t *ptr = moduleName.c_str( ); if (( ptr ) && ( *ptr )) { while (*ptr != L'\0') { if ((( *ptr >= L'0' ) && ( *ptr <= L'9' )) || (( *ptr >= L'A' ) && ( *ptr <= L'Z' )) || (( *ptr >= L'a' ) && ( *ptr <= L'z' )) || ( *ptr == L'.' )) pathStream << static_cast<char>(*ptr); else pathStream << '_'; ptr++; } } else { pathStream << "unknown_module"; } pathStream << SBcachePath::PATH_SEPARATOR << index / CACHE_MAX_DIR_ENTRIES << SBcachePath::PATH_SEPARATOR << index % CACHE_MAX_DIR_ENTRIES << CACHE_ENTRY_FILE_EXTENSION; pathStream << ends; SBcachePath path (_cacheDir, pathStream.str( )); #if defined(__GNUC__) && (__GNUC__ == 3) && (__GNUC_MINOR__ < 2) // str() freezes the stream, we unfreeze it here to allow pathStream to // cleanup after itself. pathStream.freeze(false); #else // Not required in earlier GCC or windows? #endif return path; } // Clean up the cache to eliminate expired entries and if neccessary // delete other entries to remain within the allocated size VXIcacheResult SBcacheManager::Cleanup (bool forcedCleanup, const SBcacheKey& writingKey) { VXIcacheResult rc = VXIcache_RESULT_SUCCESS; if (_curSizeBytes.Get() <= _lowWaterBytes) return rc; // Log to Mgr and Cleanup logs, and emit an warning the first time // cleanup is entered so integrations know if they're filling it too // rapidly. Diag (SBCACHE_MGR_TAGID, L"Cleanup", L"cleanup starting: %lu bytes in cache, %lu allocation", _curSizeBytes.Get( ), _maxSizeBytes); Diag (SBCACHE_CLEANUP_TAGID, L"Cleanup", L"cleanup starting: %lu bytes in cache, %lu allocation", _curSizeBytes.Get( ), _maxSizeBytes); { static int once = 0; if (once == 0) { once = 1; Error(302, L"%s%s", L"Cleanup", L"One-time notification: Disk cache " L"filled to maximum for first time, performing cleanup."); } } int bytesToExpire = _curSizeBytes.Get() - _lowWaterBytes; time_t now = time(0); SBcacheEntryTable::iterator vi; SBcacheEntryList::iterator li; SBcacheEntryList expiredEntries; VXIulong totalBytesFreed = 0; time_t oldestAccessed = now; int entriesFreed = 0; // Lock cache and transfer enough entries from the cache to our temporary // list to get us below the cache low water mark. // if ( _entryTableMutex.Lock( ) != VXItrd_RESULT_SUCCESS ) { Error (110, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } else { // Copy entries to our temporary list for (li = _entryLRUList.begin(); li != _entryLRUList.end(); ++li) { if (bytesToExpire <= 0) break; int entrySize = (*li).GetSizeBytes(true); if ((*li).GetKey() != writingKey && 0 < entrySize && (*li).IsExpired(now, &oldestAccessed)) { expiredEntries.push_back(*li); totalBytesFreed += entrySize; bytesToExpire -= entrySize; entriesFreed++; } } // Remove them from cache for (li = expiredEntries.begin(); li != expiredEntries.end(); ++li) RemoveEntry(*li); if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) { Error (111, L"%s%s", L"mutex", L"entry table mutex"); rc = VXIcache_RESULT_SYSTEM_ERROR; } } // Now that we're outside the cache lock, actually remove all the // entries by removing the last reference to them in our list. expiredEntries.erase(expiredEntries.begin(), expiredEntries.end()); _curSizeBytes.Decrement(totalBytesFreed); // Log prior to setting the next cleanup time to ensure we log our // completion prior to another cleanup starting Diag (SBCACHE_MGR_TAGID, L"Cleanup", L"exiting: rc = %d, released %u entries, %lu bytes, %lu bytes in cache", rc, entriesFreed, totalBytesFreed, _curSizeBytes.Get( )); Diag (SBCACHE_CLEANUP_TAGID, L"Cleanup", L"exiting: rc = %d, released %u entries, %lu bytes, %lu bytes in cache", rc, entriesFreed, totalBytesFreed, _curSizeBytes.Get( )); return rc; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -