bdb_blobcache.cpp

来自「ncbi源码」· C++ 代码 · 共 1,387 行 · 第 1/3 页

CPP
1,387
字号
    // Check if bdb env. files are in place and try to join    CDir dir(m_Path);    CDir::TEntries fl = dir.GetEntries("__db.*", CDir::eIgnoreRecursive);    if (fl.empty()) {        if (cache_ram_size) {            m_Env->SetCacheSize(cache_ram_size);        }        m_Env->OpenWithTrans(cache_path);    } else {        if (cache_ram_size) {            m_Env->SetCacheSize(cache_ram_size);        }        try {            m_Env->JoinEnv(cache_path, CBDB_Env::eThreaded);            if (!m_Env->IsTransactional()) {                LOG_POST(Warning <<                          "LC: Warning: Joined non-transactional environment ");            }        }         catch (CBDB_Exception&)        {            m_Env->OpenWithTrans(cache_path);        }    }    m_Env->SetDirectDB(true);    m_Env->SetDirectLog(true);    m_CacheDB = new SCacheDB();    m_CacheAttrDB = new SCache_AttrDB();    m_CacheDB->SetEnv(*m_Env);    m_CacheAttrDB->SetEnv(*m_Env);    m_CacheDB->SetPageSize(32 * 1024);    string cache_db_name =        string("lcs_") + string(cache_name) + string(".db");    string attr_db_name =        string("lcs_") + string(cache_name) + string("_attr") + string(".db");    m_CacheDB->Open(cache_db_name.c_str(),    CBDB_RawFile::eReadWriteCreate);    m_CacheAttrDB->Open(attr_db_name.c_str(), CBDB_RawFile::eReadWriteCreate);        }}    if (m_TimeStampFlag & fPurgeOnStartup) {        Purge(GetTimeout());    }    m_Env->TransactionCheckpoint();}void CBDB_Cache::Close(){    delete m_PidGuard;    m_PidGuard = 0;    delete m_CacheDB;     m_CacheDB = 0;    delete m_CacheAttrDB; m_CacheAttrDB = 0;    delete m_Env;         m_Env = 0;}void CBDB_Cache::SetTimeStampPolicy(TTimeStampFlags policy,                                     int             timeout){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    m_TimeStampFlag = policy;    m_Timeout = timeout;}CBDB_Cache::TTimeStampFlags CBDB_Cache::GetTimeStampPolicy() const{    return m_TimeStampFlag;}int CBDB_Cache::GetTimeout(){    return m_Timeout;}void CBDB_Cache::SetVersionRetention(EKeepVersions policy){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    m_VersionFlag = policy;}CBDB_Cache::EKeepVersions CBDB_Cache::GetVersionRetention() const{    return m_VersionFlag;}void CBDB_Cache::Store(const string&  key,                       int            version,                       const string&  subkey,                       const void*    data,                       size_t         size){    if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) {        Purge(key, subkey, 0, m_VersionFlag);    }    CBDB_Transaction trans(*m_Env);    m_CacheDB->SetTransaction(&trans);    m_CacheAttrDB->SetTransaction(&trans);    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    unsigned overflow = 0;    if (size < s_WriterBufferSize) {  // inline BLOB        m_CacheDB->key = key;        m_CacheDB->version = version;        m_CacheDB->subkey = subkey;        m_CacheDB->Insert(data, size);        overflow = 0;    } else { // overflow BLOB        string path;        s_MakeOverflowFileName(path, m_Path, key, version, subkey);        _TRACE("LC: Making overflow file " << path);        CNcbiOfstream oveflow_file(path.c_str(),                                    IOS_BASE::out |                                    IOS_BASE::trunc |                                    IOS_BASE::binary);        if (!oveflow_file.is_open()) {            ERR_POST("LC Error:Cannot create overflow file " << path);            return;        }        oveflow_file.write((char*)data, size);        overflow = 1;    }    //    // Update cache element's attributes    //    CTime time_stamp(CTime::eCurrent);    m_CacheAttrDB->key = key;    m_CacheAttrDB->version = version;    m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : "";    m_CacheAttrDB->time_stamp = (unsigned)time_stamp.GetTimeT();    m_CacheAttrDB->overflow = overflow;    m_CacheAttrDB->UpdateInsert();    trans.Commit();    m_CacheAttrDB->GetEnv()->TransactionCheckpoint();}size_t CBDB_Cache::GetSize(const string&  key,                           int            version,                           const string&  subkey){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    m_CacheAttrDB->key = key;    m_CacheAttrDB->version = version;    m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : "";    EBDB_ErrCode ret = m_CacheAttrDB->Fetch();    if (ret != eBDB_Ok) {        return 0;    }    // check expiration here    if (m_TimeStampFlag & fCheckExpirationAlways) {        if (x_CheckTimestampExpired()) {            return 0;        }    }    int overflow = m_CacheAttrDB->overflow;    if (overflow) {        string path;        s_MakeOverflowFileName(path, m_Path, key, version, subkey);        CFile entry(path);        if (entry.Exists()) {            return (size_t) entry.GetLength();        }    }    // Regular inline BLOB    m_CacheDB->key = key;    m_CacheDB->version = version;    m_CacheDB->subkey = subkey;    ret = m_CacheDB->Fetch();    if (ret != eBDB_Ok) {        return 0;    }    return m_CacheDB->LobSize();}bool CBDB_Cache::Read(const string& key,                       int           version,                       const string& subkey,                      void*         buf,                       size_t        buf_size){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    m_CacheAttrDB->key = key;    m_CacheAttrDB->version = version;    m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : "";    EBDB_ErrCode ret = m_CacheAttrDB->Fetch();    if (ret != eBDB_Ok) {        return false;    }    // check expiration    if (m_TimeStampFlag & fCheckExpirationAlways) {        if (x_CheckTimestampExpired()) {            return false;        }    }    int overflow = m_CacheAttrDB->overflow;    if (overflow) {        string path;        s_MakeOverflowFileName(path, m_Path, key, version, subkey);        auto_ptr<CNcbiIfstream>            overflow_file(new CNcbiIfstream(path.c_str(),                                            IOS_BASE::in | IOS_BASE::binary));        if (!overflow_file->is_open()) {            return false;        }        overflow_file->read((char*)buf, buf_size);        if (!*overflow_file) {            return false;        }    }    else {        m_CacheDB->key = key;        m_CacheDB->version = version;        m_CacheDB->subkey = subkey;        ret = m_CacheDB->Fetch();        if (ret != eBDB_Ok) {            return false;        }        ret = m_CacheDB->GetData(buf, buf_size);        if (ret != eBDB_Ok) {            return false;        }    }    if ( m_TimeStampFlag & fTimeStampOnRead ) {        x_UpdateAccessTime(key, version, subkey);    }    return true;}IReader* CBDB_Cache::GetReadStream(const string&  key,                                    int            version,                                   const string&  subkey){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    m_CacheAttrDB->key = key;    m_CacheAttrDB->version = version;    m_CacheAttrDB->subkey = (m_TimeStampFlag & fTrackSubKey) ? subkey : "";    EBDB_ErrCode ret = m_CacheAttrDB->Fetch();    if (ret != eBDB_Ok) {        return 0;    }    // check expiration    if (m_TimeStampFlag & fCheckExpirationAlways) {        if (x_CheckTimestampExpired()) {            return 0;        }    }    int overflow = m_CacheAttrDB->overflow;    // Check if it's an overflow BLOB (external file)    if (overflow) {        string path;        s_MakeOverflowFileName(path, m_Path, key, version, subkey);        auto_ptr<CNcbiIfstream>             overflow_file(new CNcbiIfstream(path.c_str(),                                            IOS_BASE::in | IOS_BASE::binary));        if (!overflow_file->is_open()) {            return 0;        }        if ( m_TimeStampFlag & fTimeStampOnRead ) {            x_UpdateAccessTime(key, version, subkey);        }        return new CBDB_CacheIReader(overflow_file.release());    }    // Inline BLOB, reading from BDB storage    m_CacheDB->key = key;    m_CacheDB->version = version;    m_CacheDB->subkey = subkey;        ret = m_CacheDB->Fetch();    if (ret != eBDB_Ok) {        return 0;    }    size_t bsize = m_CacheDB->LobSize();    unsigned char* buf = new unsigned char[bsize+1];    ret = m_CacheDB->GetData(buf, bsize);    if (ret != eBDB_Ok) {        return 0;    }    if ( m_TimeStampFlag & fTimeStampOnRead ) {        x_UpdateAccessTime(key, version, subkey);    }    return new CBDB_CacheIReader(buf, bsize);/*    CBDB_BLobStream* bstream = m_BlobDB.CreateStream();    return new CBDB_BLOB_CacheIReader(bstream);*/}IWriter* CBDB_Cache::GetWriteStream(const string&    key,                                    int              version,                                    const string&    subkey){    if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) {        Purge(key, subkey, 0, m_VersionFlag);    }    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    {        CBDB_Transaction trans(*m_Env);        m_CacheDB->SetTransaction(&trans);        m_CacheAttrDB->SetTransaction(&trans);        x_DropBlob(key.c_str(), version, subkey.c_str(), 1);        trans.Commit();    }    m_CacheDB->key = key;    m_CacheDB->version = version;    m_CacheDB->subkey = subkey;    CBDB_BLobStream* bstream = m_CacheDB->CreateStream();    return         new CBDB_CacheIWriter(m_Path.c_str(),                               key,                               version,                              subkey,                              bstream,                               *m_CacheDB,                              *m_CacheAttrDB,                              m_TimeStampFlag & fTrackSubKey);}void CBDB_Cache::Remove(const string& key){    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);    vector<SCacheDescr>  cache_elements;    // Search the records to delete    {{    CBDB_FileCursor cur(*m_CacheAttrDB);    cur.SetCondition(CBDB_FileCursor::eEQ);    cur.From << key;    while (cur.Fetch() == eBDB_Ok) {        int version = m_CacheAttrDB->version;        const char* subkey = m_CacheAttrDB->subkey;        int overflow = m_CacheAttrDB->overflow;        cache_elements.push_back(SCacheDescr(key, version, subkey, overflow));    }    }}    CBDB_Transaction trans(*m_Env);    m_CacheDB->SetTransaction(&trans);    m_CacheAttrDB->SetTransaction(&trans);    // Now delete all objects    ITERATE(vector<SCacheDescr>, it, cache_elements) {        x_DropBlob(it->key.c_str(),                    it->version,                    it->subkey.c_str(),                    it->overflow);    }	trans.Commit();    // Second pass scan if for some resons some cache elements are     // still in the database    cache_elements.resize(0);    {{    CBDB_FileCursor cur(*m_CacheDB);    cur.SetCondition(CBDB_FileCursor::eEQ);    cur.From << key;    while (cur.Fetch() == eBDB_Ok) {        int version = m_CacheDB->version;        const char* subkey = m_CacheDB->subkey;        cache_elements.push_back(SCacheDescr(key, version, subkey, 0));    }    }}    ITERATE(vector<SCacheDescr>, it, cache_elements) {        x_DropBlob(it->key.c_str(),                    it->version,                    it->subkey.c_str(),                    it->overflow);    }    trans.Commit();    m_CacheAttrDB->GetEnv()->TransactionCheckpoint();}time_t CBDB_Cache::GetAccessTime(const string&  key,                                 int            version,                                 const string&  subkey){    _ASSERT(m_CacheAttrDB);    CFastMutexGuard guard(x_BDB_BLOB_CacheMutex);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?