📄 sbcachemanager.cpp
字号:
// the writer invalidates the entry on us. When that happens, if
// we're attempting MODE_READ_CREATE we want to get that invalid
// entry out of the table and create the entry ourselves returning
// in WRITE mode, for MODE_READ we just want to see if another
// writer has created a new entry that is valid for us to use
// otherwise we bail out with the normal RESULT_NOT_FOUND.
if (( rc == VXIcache_RESULT_SUCCESS ) && ( ! entryOpened )) {
// If this is a write over an existing entry, wipe the previous entry
// accounting from the cache.
if (finalMode == CACHE_MODE_WRITE)
_curSizeBytes.Decrement(entry.GetSizeBytes(false));
rc = entry.Open (log, moduleName, key, entry.GetPath( ), finalMode,
flags, _entryMaxSizeBytes, properties, streamInfo,
stream);
// Don't let a corrupt entry persist
if (( rc != VXIcache_RESULT_SUCCESS ) &&
( rc != VXIcache_RESULT_FAILURE ))
Delete (log, key);
}
// Accessed, so move it to the top of the LRU list
if ( _entryTableMutex.Lock( ) != VXItrd_RESULT_SUCCESS ) {
Error (log, 110, L"%s%s", L"mutex", L"entry table mutex");
rc = VXIcache_RESULT_SYSTEM_ERROR;
} else {
if (rc == VXIcache_RESULT_SUCCESS)
TouchEntry(entry);
if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) {
Error (log, 111, L"%s%s", L"mutex", L"entry table mutex");
rc = VXIcache_RESULT_SYSTEM_ERROR;
}
}
} while ( rc == VXIcache_RESULT_FAILURE );
// Finish up returning the stream information
if (( rc == VXIcache_RESULT_SUCCESS ) && ( streamInfo ) &&
( VXIMapSetProperty (streamInfo, CACHE_INFO_FINAL_KEY,
(VXIValue *) VXIStringCreate(key.c_str( ))) !=
VXIvalue_RESULT_SUCCESS )) {
Error (log, 100, NULL);
(*stream)->Close (true);
*stream = NULL;
rc = VXIcache_RESULT_OUT_OF_MEMORY;
}
if (( rc == VXIcache_RESULT_SUCCESS ) &&
( mode == CACHE_MODE_READ_CREATE ) && ( finalMode == CACHE_MODE_WRITE ))
rc = VXIcache_RESULT_ENTRY_CREATED;
// Maybe do cleanup, ignore all but fatal errors
VXIcacheResult rc2 = Cleanup (false, key);
if ( rc2 < VXIcache_RESULT_SUCCESS ) {
(*stream)->Close (true);
*stream = NULL;
rc = rc2;
}
Diag (log, SBCACHE_MGR_TAGID, L"Open", L"%s: rc = %d", key.c_str( ), rc);
return rc;
}
// Notification of data writes
//
// Adds nwritten bytes to the running cache size (_curSizeBytes) and,
// when IncrementTest() reports a positive result against
// _maxSizeBytes, runs a forced Cleanup() that is told which key is
// currently being written.
//
// Returns VXIcache_RESULT_SUCCESS unless Cleanup() returns a result
// below VXIcache_RESULT_SUCCESS, in which case that result is
// returned; all other cleanup results are intentionally ignored.
VXIcacheResult
SBcacheManager::WriteNotification (VXIlogInterface *log,
                                   const SBcacheString &moduleName,
                                   VXIulong nwritten,
                                   const SBcacheKey &key)
{
  // log is not used here. moduleName is reserved for future use: the
  // cache may later be configured with specific allocations on a
  // per-module basis.

  // _curSizeBytes is a thread-safe object, so no extra locking is
  // done around the increment.
  if ( ! ( _curSizeBytes.IncrementTest (nwritten, _maxSizeBytes) > 0 ) )
    return VXIcache_RESULT_SUCCESS;

  const VXIcacheResult cleanupRc = Cleanup (true, key);
  if ( cleanupRc < VXIcache_RESULT_SUCCESS )
    return cleanupRc;

  return VXIcache_RESULT_SUCCESS;
}
// Unlock an entry
//
// Looks up key in the entry table while holding the entry table mutex
// through StartRead()/EndRead(), and forwards to the entry's own
// Unlock().
//
// Returns the entry's Unlock() result, VXIcache_RESULT_NOT_FOUND when
// the key is not in the table, or VXIcache_RESULT_SYSTEM_ERROR when
// the mutex cannot be acquired or released.
VXIcacheResult SBcacheManager::Unlock(VXIlogInterface *log,
                                      const SBcacheKey &key)
{
  VXIcacheResult rc = VXIcache_RESULT_SUCCESS;

  if ( _entryTableMutex.StartRead( ) == VXItrd_RESULT_SUCCESS ) {
    // The table itself is not modified, so read access is sufficient
    SBcacheEntryTable::iterator found = _entryTable.find (key);
    if ( found == _entryTable.end( ) ) {
      rc = VXIcache_RESULT_NOT_FOUND;
    } else {
      rc = found->second.Unlock (log);
    }

    // A failed release overrides whatever the lookup produced
    if ( _entryTableMutex.EndRead( ) != VXItrd_RESULT_SUCCESS ) {
      Error (log, 111, L"%s%s", L"mutex", L"entry table mutex");
      rc = VXIcache_RESULT_SYSTEM_ERROR;
    }
  } else {
    Error (log, 110, L"%s%s", L"mutex", L"entry table mutex");
    rc = VXIcache_RESULT_SYSTEM_ERROR;
  }

  Diag (log, SBCACHE_MGR_TAGID, L"Unlock", L"%s: rc = %d", key.c_str( ), rc);
  return rc;
}
// Delete an entry
//
// Removes the entry for key from the cache via RemoveEntry() and
// subtracts its size from _curSizeBytes. haveEntryOpen is passed
// through unchanged to the entry's GetSizeBytes().
//
// Returns VXIcache_RESULT_SUCCESS, VXIcache_RESULT_NOT_FOUND when the
// key is not in the table, or VXIcache_RESULT_SYSTEM_ERROR when the
// entry table mutex cannot be acquired or released. The size
// accounting is only adjusted on success.
VXIcacheResult SBcacheManager::Delete(VXIlogInterface *log,
                                      const SBcacheKey &key,
                                      bool haveEntryOpen)
{
  VXIcacheResult rc = VXIcache_RESULT_SUCCESS;

  // A local copy of the entry is held so that the entry is only
  // really destroyed after the table mutex has been released (the
  // destruction may be costly). The extra scope makes that
  // destruction happen before the exit Diag() below.
  {
    SBcacheEntry doomed;

    if ( _entryTableMutex.Lock( ) == VXItrd_RESULT_SUCCESS ) {
      SBcacheEntryTable::iterator found = _entryTable.find (key);
      if ( found == _entryTable.end( ) ) {
        rc = VXIcache_RESULT_NOT_FOUND;
      } else {
        doomed = found->second;
        RemoveEntry (doomed);
      }

      // A failed release overrides the lookup result
      if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) {
        Error (log, 111, L"%s%s", L"mutex", L"entry table mutex");
        rc = VXIcache_RESULT_SYSTEM_ERROR;
      }
    } else {
      Error (log, 110, L"%s%s", L"mutex", L"entry table mutex");
      rc = VXIcache_RESULT_SYSTEM_ERROR;
    }

    // Done outside the table mutex on purpose: fetching the size may
    // block on a write mutex until some write finishes, and dropping
    // the entry usually means deleting on-disk files when no streams
    // are active for it.
    if ( rc == VXIcache_RESULT_SUCCESS )
      _curSizeBytes.Decrement (doomed.GetSizeBytes (haveEntryOpen));
  }

  Diag (log, SBCACHE_MGR_TAGID, L"Delete", L"%s: rc = %d", key.c_str( ), rc);
  return rc;
}
// Write out the index file, used to handle abnormal termination
//
// Currently a stub: it only logs entry and exit and always returns
// VXIcache_RESULT_SUCCESS.
VXIcacheResult SBcacheManager::WriteIndex( )
{
  Diag (SBCACHE_MGR_TAGID, L"WriteIndex", L"entering");

  // TBD write the index from the entry table, schedule periodically?
  const VXIcacheResult rc = VXIcache_RESULT_SUCCESS;

  Diag (SBCACHE_MGR_TAGID, L"WriteIndex", L"exiting: rc = %d", rc);
  return rc;
}
// Read the index file, used at startup
//
// Currently a stub: it only logs entry (with the cache directory) and
// exit, and always returns VXIcache_RESULT_SUCCESS.
VXIcacheResult SBcacheManager::ReadIndex(const SBcacheNString &cacheDir)
{
  Diag (SBCACHE_MGR_TAGID, L"ReadIndex", L"entering: %S", cacheDir.c_str( ));

  // TBD, read the index to create the entry table
  // TBD, purge files with our extension that aren't in the index but
  // are in the cache, warn about those without our extension
  const VXIcacheResult rc = VXIcache_RESULT_SUCCESS;

  Diag (SBCACHE_MGR_TAGID, L"ReadIndex", L"exiting: rc = %d", rc);
  return rc;
}
// Get a new path for an entry
//
// Builds a fresh path below _cacheDir of the form
//
//   <module><SEP><index / CACHE_MAX_DIR_ENTRIES><SEP>
//       <index % CACHE_MAX_DIR_ENTRIES><CACHE_ENTRY_FILE_EXTENSION>
//
// where <module> is moduleName with every character other than
// [0-9A-Za-z.] replaced by '_' (or "unknown_module" when moduleName
// is empty), <SEP> is SBcachePath::PATH_SEPARATOR and index is the
// next value of the _pathSeqNum sequence. The key parameter is not
// used when building the path.
SBcachePath SBcacheManager::GetNewEntryPath(const SBcacheString &moduleName,
                                            const SBcacheKey &key)
{
  // Get the index number, note that _pathSeqNum is a thread-safe object
  VXIulong index = _pathSeqNum.IncrementSeqNum( );

  // Construct the path. The path is a directory tree where there is a
  // subdirectory for each module name, then subdirectories underneath
  // each module to limit each subdirectory to no more than
  // CACHE_MAX_DIR_ENTRIES entries in order to avoid OS limitations on
  // files per directory for some older OSes
#if defined(__GNUC__) && (__GNUC__ == 3) && (__GNUC_MINOR__ < 2)
  std::ostrstream pathStream;
#else
  std::basic_ostringstream<char> pathStream;
#endif

  const wchar_t *ptr = moduleName.c_str( );
  if (( ptr ) && ( *ptr )) {
    for ( ; *ptr != L'\0'; ++ptr) {
      const bool isSafeChar =
        (( *ptr >= L'0' ) && ( *ptr <= L'9' )) ||
        (( *ptr >= L'A' ) && ( *ptr <= L'Z' )) ||
        (( *ptr >= L'a' ) && ( *ptr <= L'z' )) ||
        ( *ptr == L'.' );

      // Safe characters are all ASCII, so the narrowing cast is lossless
      if (isSafeChar)
        pathStream << static_cast<char>(*ptr);
      else
        pathStream << '_';
    }
  } else {
    pathStream << "unknown_module";
  }

  pathStream << SBcachePath::PATH_SEPARATOR
             << index / CACHE_MAX_DIR_ENTRIES << SBcachePath::PATH_SEPARATOR
             << index % CACHE_MAX_DIR_ENTRIES << CACHE_ENTRY_FILE_EXTENSION;

#if defined(__GNUC__) && (__GNUC__ == 3) && (__GNUC_MINOR__ < 2)
  // ostrstream::str() hands back a raw character buffer, so it has to
  // be NUL terminated explicitly. This must NOT be done for
  // ostringstream: there str() returns a std::string and the NUL
  // would become a real trailing character of the path.
  pathStream << ends;
#endif

  SBcachePath path (_cacheDir, pathStream.str( ));

#if defined(__GNUC__) && (__GNUC__ == 3) && (__GNUC_MINOR__ < 2)
  // str() freezes the stream, we unfreeze it here to allow pathStream to
  // cleanup after itself.
  pathStream.freeze(false);
#endif

  return path;
}
// Clean up the cache to eliminate expired entries and if necessary
// delete other entries to remain within the allocated size
//
// Does nothing when the current cache size is at or below
// _lowWaterBytes. Otherwise walks _entryLRUList under the entry table
// mutex and removes entries for which IsExpired() is true, skipping
// the entry identified by writingKey and entries reporting a size of
// zero, until enough bytes have been collected to reach the low water
// mark or the list is exhausted. The removed entries are released
// after the mutex has been dropped, and _curSizeBytes is reduced by
// the total size removed.
//
// forcedCleanup is currently not consulted.
//
// Returns VXIcache_RESULT_SUCCESS, or VXIcache_RESULT_SYSTEM_ERROR
// when the entry table mutex cannot be acquired or released.
VXIcacheResult SBcacheManager::Cleanup (bool forcedCleanup,
                                        const SBcacheKey& writingKey)
{
  VXIcacheResult rc = VXIcache_RESULT_SUCCESS;
  if (_curSizeBytes.Get() <= _lowWaterBytes)
    return rc;

  // Log to Mgr and Cleanup logs, and emit a warning the first time
  // cleanup is entered so integrations know if they're filling it too
  // rapidly.
  Diag (SBCACHE_MGR_TAGID, L"Cleanup",
        L"cleanup starting: %lu bytes in cache, %lu allocation",
        _curSizeBytes.Get( ), _maxSizeBytes);
  Diag (SBCACHE_CLEANUP_TAGID, L"Cleanup",
        L"cleanup starting: %lu bytes in cache, %lu allocation",
        _curSizeBytes.Get( ), _maxSizeBytes);

  // NOTE(review): this flag is a plain static with no synchronization
  // visible here, so two threads entering for the first time could
  // both log the notification.
  { static int once = 0;
    if (once == 0) {
      once = 1;
      Error(302, L"%s%s", L"Cleanup", L"One-time notification: Disk cache "
            L"filled to maximum for first time, performing cleanup.");
    }
  }

  // Byte counts are kept unsigned and full width: squeezing them into
  // an int turns sizes above INT_MAX negative, which would silently
  // disable the cleanup or make it skip large entries. The size is
  // re-read here and may have dropped since the check above, so the
  // subtraction is clamped at zero.
  const VXIulong curBytes = _curSizeBytes.Get();
  VXIulong bytesToExpire =
    (curBytes > _lowWaterBytes) ? (curBytes - _lowWaterBytes) : 0;

  time_t now = time(0);
  SBcacheEntryList::iterator li;
  SBcacheEntryList expiredEntries;
  VXIulong totalBytesFreed = 0;
  time_t oldestAccessed = now;
  unsigned int entriesFreed = 0;

  // Lock cache and transfer enough entries from the cache to our temporary
  // list to get us below the cache low water mark.
  //
  if ( _entryTableMutex.Lock( ) != VXItrd_RESULT_SUCCESS ) {
    Error (110, L"%s%s", L"mutex", L"entry table mutex");
    rc = VXIcache_RESULT_SYSTEM_ERROR;
  } else {
    // Copy entries to our temporary list
    for (li = _entryLRUList.begin(); li != _entryLRUList.end(); ++li) {
      if (bytesToExpire == 0) break;

      // NOTE(review): assumes GetSizeBytes() yields an unsigned byte
      // count (its result is handed straight to Decrement() elsewhere
      // in this file) -- confirm against SBcacheEntry.
      const VXIulong entrySize = (*li).GetSizeBytes(true);
      if ((*li).GetKey() != writingKey &&
          entrySize > 0 &&
          (*li).IsExpired(now, &oldestAccessed)) {
        expiredEntries.push_back(*li);
        totalBytesFreed += entrySize;
        // Clamp instead of wrapping when the entry is larger than
        // what is still left to free
        bytesToExpire =
          (entrySize < bytesToExpire) ? (bytesToExpire - entrySize) : 0;
        ++entriesFreed;
      }
    }

    // Remove them from cache
    for (li = expiredEntries.begin(); li != expiredEntries.end(); ++li)
      RemoveEntry(*li);

    if ( _entryTableMutex.Unlock( ) != VXItrd_RESULT_SUCCESS ) {
      Error (111, L"%s%s", L"mutex", L"entry table mutex");
      rc = VXIcache_RESULT_SYSTEM_ERROR;
    }
  }

  // Now that we're outside the cache lock, actually remove all the
  // entries by removing the last reference to them in our list.
  expiredEntries.erase(expiredEntries.begin(), expiredEntries.end());
  _curSizeBytes.Decrement(totalBytesFreed);

  // Log prior to setting the next cleanup time to ensure we log our
  // completion prior to another cleanup starting
  Diag (SBCACHE_MGR_TAGID, L"Cleanup",
        L"exiting: rc = %d, released %u entries, %lu bytes, %lu bytes in cache",
        rc, entriesFreed, totalBytesFreed, _curSizeBytes.Get( ));
  Diag (SBCACHE_CLEANUP_TAGID, L"Cleanup",
        L"exiting: rc = %d, released %u entries, %lu bytes, %lu bytes in cache",
        rc, entriesFreed, totalBytesFreed, _curSizeBytes.Get( ));
  return rc;
}
#endif // P_VXI
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -