bdb_blobcache.cpp
来自「ncbi源码」· C++ 代码 · 共 1,387 行 · 第 1/3 页
CPP
1,387 行
// Check if bdb env. files are in place and try to join CDir dir(m_Path); CDir::TEntries fl = dir.GetEntries("__db.*", CDir::eIgnoreRecursive); if (fl.empty()) { if (cache_ram_size) { m_Env->SetCacheSize(cache_ram_size); } m_Env->OpenWithTrans(cache_path); } else { if (cache_ram_size) { m_Env->SetCacheSize(cache_ram_size); } try { m_Env->JoinEnv(cache_path, CBDB_Env::eThreaded); if (!m_Env->IsTransactional()) { LOG_POST(Warning << "LC: Warning: Joined non-transactional environment "); } } catch (CBDB_Exception&) { m_Env->OpenWithTrans(cache_path); } } m_Env->SetDirectDB(true); m_Env->SetDirectLog(true); m_CacheDB = new SCacheDB(); m_CacheAttrDB = new SCache_AttrDB(); m_CacheDB->SetEnv(*m_Env); m_CacheAttrDB->SetEnv(*m_Env); m_CacheDB->SetPageSize(32 * 1024); string cache_db_name = string("lcs_") + string(cache_name) + string(".db"); string attr_db_name = string("lcs_") + string(cache_name) + string("_attr") + string(".db"); m_CacheDB->Open(cache_db_name.c_str(), CBDB_RawFile::eReadWriteCreate); m_CacheAttrDB->Open(attr_db_name.c_str(), CBDB_RawFile::eReadWriteCreate); }} if (m_TimeStampFlag & fPurgeOnStartup) { Purge(GetTimeout()); } m_Env->TransactionCheckpoint();}void CBDB_Cache::Close(){ delete m_PidGuard; m_PidGuard = 0; delete m_CacheDB; m_CacheDB = 0; delete m_CacheAttrDB; m_CacheAttrDB = 0; delete m_Env; m_Env = 0;}void CBDB_Cache::SetTimeStampPolicy(TTimeStampFlags policy, int timeout){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); m_TimeStampFlag = policy; m_Timeout = timeout;}CBDB_Cache::TTimeStampFlags CBDB_Cache::GetTimeStampPolicy() const{ return m_TimeStampFlag;}int CBDB_Cache::GetTimeout(){ return m_Timeout;}void CBDB_Cache::SetVersionRetention(EKeepVersions policy){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); m_VersionFlag = policy;}CBDB_Cache::EKeepVersions CBDB_Cache::GetVersionRetention() const{ return m_VersionFlag;}void CBDB_Cache::Store(const string& key, int version, const string& subkey, const void* data, size_t size){ if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) { Purge(key, subkey, 0, m_VersionFlag); } CBDB_Transaction trans(*m_Env); m_CacheDB->SetTransaction(&trans); m_CacheAttrDB->SetTransaction(&trans); CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); unsigned overflow = 0; if (size < s_WriterBufferSize) { // inline BLOB m_CacheDB->key = key; m_CacheDB->version = version; m_CacheDB->subkey = subkey; m_CacheDB->Insert(data, size); overflow = 0; } else { // overflow BLOB string path; s_MakeOverflowFileName(path, m_Path, key, version, subkey); _TRACE("LC: Making overflow file " << path); CNcbiOfstream oveflow_file(path.c_str(), IOS_BASE::out | IOS_BASE::trunc | IOS_BASE::binary); if (!oveflow_file.is_open()) { ERR_POST("LC Error:Cannot create overflow file " << path); return; } oveflow_file.write((char*)data, size); overflow = 1; } // // Update cache element's attributes // CTime time_stamp(CTime::eCurrent); m_CacheAttrDB->key = key; m_CacheAttrDB->version = version; m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : ""; m_CacheAttrDB->time_stamp = (unsigned)time_stamp.GetTimeT(); m_CacheAttrDB->overflow = overflow; m_CacheAttrDB->UpdateInsert(); trans.Commit(); m_CacheAttrDB->GetEnv()->TransactionCheckpoint();}size_t CBDB_Cache::GetSize(const string& key, int version, const string& subkey){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); m_CacheAttrDB->key = key; m_CacheAttrDB->version = version; m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : ""; EBDB_ErrCode ret = m_CacheAttrDB->Fetch(); if (ret != eBDB_Ok) { return 0; } // check expiration here if (m_TimeStampFlag & fCheckExpirationAlways) { if (x_CheckTimestampExpired()) { return 0; } } int overflow = m_CacheAttrDB->overflow; if (overflow) { string path; s_MakeOverflowFileName(path, m_Path, key, version, subkey); CFile entry(path); if (entry.Exists()) { return (size_t) entry.GetLength(); } } // Regular inline BLOB m_CacheDB->key = key; m_CacheDB->version = version; m_CacheDB->subkey = subkey; ret = m_CacheDB->Fetch(); if (ret != eBDB_Ok) { return 0; } return m_CacheDB->LobSize();}bool CBDB_Cache::Read(const string& key, int version, const string& subkey, void* buf, size_t buf_size){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); m_CacheAttrDB->key = key; m_CacheAttrDB->version = version; m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : ""; EBDB_ErrCode ret = m_CacheAttrDB->Fetch(); if (ret != eBDB_Ok) { return false; } // check expiration if (m_TimeStampFlag & fCheckExpirationAlways) { if (x_CheckTimestampExpired()) { return false; } } int overflow = m_CacheAttrDB->overflow; if (overflow) { string path; s_MakeOverflowFileName(path, m_Path, key, version, subkey); auto_ptr<CNcbiIfstream> overflow_file(new CNcbiIfstream(path.c_str(), IOS_BASE::in | IOS_BASE::binary)); if (!overflow_file->is_open()) { return false; } overflow_file->read((char*)buf, buf_size); if (!*overflow_file) { return false; } } else { m_CacheDB->key = key; m_CacheDB->version = version; m_CacheDB->subkey = subkey; ret = m_CacheDB->Fetch(); if (ret != eBDB_Ok) { return false; } ret = m_CacheDB->GetData(buf, buf_size); if (ret != eBDB_Ok) { return false; } } if ( m_TimeStampFlag & fTimeStampOnRead ) { x_UpdateAccessTime(key, version, subkey); } return true;}IReader* CBDB_Cache::GetReadStream(const string& key, int version, const string& subkey){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); m_CacheAttrDB->key = key; m_CacheAttrDB->version = version; m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : ""; EBDB_ErrCode ret = m_CacheAttrDB->Fetch(); if (ret != eBDB_Ok) { return 0; } // check expiration if (m_TimeStampFlag & fCheckExpirationAlways) { if (x_CheckTimestampExpired()) { return 0; } } int overflow = m_CacheAttrDB->overflow; // Check if it's an overflow BLOB (external file) if (overflow) { string path; s_MakeOverflowFileName(path, m_Path, key, version, subkey); auto_ptr<CNcbiIfstream> overflow_file(new CNcbiIfstream(path.c_str(), IOS_BASE::in | IOS_BASE::binary)); if (!overflow_file->is_open()) { return 0; } if ( m_TimeStampFlag & fTimeStampOnRead ) { x_UpdateAccessTime(key, version, subkey); } return new CBDB_CacheIReader(overflow_file.release()); } // Inline BLOB, reading from BDB storage m_CacheDB->key = key; m_CacheDB->version = version; m_CacheDB->subkey = subkey; ret = m_CacheDB->Fetch(); if (ret != eBDB_Ok) { return 0; } size_t bsize = m_CacheDB->LobSize(); unsigned char* buf = new unsigned char[bsize+1]; ret = m_CacheDB->GetData(buf, bsize); if (ret != eBDB_Ok) { return 0; } if ( m_TimeStampFlag & fTimeStampOnRead ) { x_UpdateAccessTime(key, version, subkey); } return new CBDB_CacheIReader(buf, bsize);/* CBDB_BLobStream* bstream = m_BlobDB.CreateStream(); return new CBDB_BLOB_CacheIReader(bstream);*/}IWriter* CBDB_Cache::GetWriteStream(const string& key, int version, const string& subkey){ if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) { Purge(key, subkey, 0, m_VersionFlag); } CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); { CBDB_Transaction trans(*m_Env); m_CacheDB->SetTransaction(&trans); m_CacheAttrDB->SetTransaction(&trans); x_DropBlob(key.c_str(), version, subkey.c_str(), 1); trans.Commit(); } m_CacheDB->key = key; m_CacheDB->version = version; m_CacheDB->subkey = subkey; CBDB_BLobStream* bstream = m_CacheDB->CreateStream(); return new CBDB_CacheIWriter(m_Path.c_str(), key, version, subkey, bstream, *m_CacheDB, *m_CacheAttrDB, m_TimeStampFlag & fTrackSubKey);}void CBDB_Cache::Remove(const string& key){ CFastMutexGuard guard(x_BDB_BLOB_CacheMutex); vector<SCacheDescr> cache_elements; // Search the records to delete {{ CBDB_FileCursor cur(*m_CacheAttrDB); cur.SetCondition(CBDB_FileCursor::eEQ); cur.From << key; while (cur.Fetch() == eBDB_Ok) { int version = m_CacheAttrDB->version; const char* subkey = m_CacheAttrDB->subkey; int overflow = m_CacheAttrDB->overflow; cache_elements.push_back(SCacheDescr(key, version, subkey, overflow)); } }} CBDB_Transaction trans(*m_Env); m_CacheDB->SetTransaction(&trans); m_CacheAttrDB->SetTransaction(&trans); // Now delete all objects ITERATE(vector<SCacheDescr>, it, cache_elements) { x_DropBlob(it->key.c_str(), it->version, it->subkey.c_str(), it->overflow); } trans.Commit(); // Second pass scan if for some resons some cache elements are // still in the database cache_elements.resize(0); {{ CBDB_FileCursor cur(*m_CacheDB); cur.SetCondition(CBDB_FileCursor::eEQ); cur.From << key; while (cur.Fetch() == eBDB_Ok) { int version = m_CacheDB->version; const char* subkey = m_CacheDB->subkey; cache_elements.push_back(SCacheDescr(key, version, subkey, 0)); } }} ITERATE(vector<SCacheDescr>, it, cache_elements) { x_DropBlob(it->key.c_str(), it->version, it->subkey.c_str(), it->overflow); } trans.Commit(); m_CacheAttrDB->GetEnv()->TransactionCheckpoint();}time_t CBDB_Cache::GetAccessTime(const string& key, int version, const string& subkey){ _ASSERT(m_CacheAttrDB); CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?