message_queue.cpp

来自「ncbi源码」· C++ 代码 · 共 284 行

CPP
284
字号
/* * =========================================================================== * PRODUCTION $Log: message_queue.cpp,v $ * PRODUCTION Revision 1000.5  2004/06/01 20:44:07  gouriano * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.17 * PRODUCTION * =========================================================================== *//*  $Id: message_queue.cpp,v 1000.5 2004/06/01 20:44:07 gouriano Exp $ * =========================================================================== * *                            PUBLIC DOMAIN NOTICE *               National Center for Biotechnology Information * *  This software/database is a "United States Government Work" under the *  terms of the United States Copyright Act.  It was written as part of *  the author's official duties as a United States Government employee and *  thus cannot be copyrighted.  This software/database is freely available *  to the public for use. The National Library of Medicine and the U.S. *  Government have not placed any restriction on its use or reproduction. * *  Although all reasonable efforts have been taken to ensure the accuracy *  and reliability of the software and data, the NLM and the U.S. *  Government do not and cannot warrant the performance or results that *  may be obtained by using this software or data. The NLM and the U.S. *  Government disclaim all warranties, express or implied, including *  warranties of performance, merchantability or fitness for any particular *  purpose. * *  Please cite the author in any work or product based on this material. * * =========================================================================== * * Authors:  Mati Shomrat * * File Description: *    CMessageQueue - GBench message queue */#include <ncbi_pch.hpp>#include <corelib/ncbistd.hpp>#include <corelib/ncbifile.hpp>#include <util/thread_pool.hpp>#include <gui/core/plugin_handle.hpp>#include <gui/core/plugin_registry.hpp>#include <gui/core/obj_convert.hpp>#include <gui/core/message_queue.hpp>#include <gui/plugin/PluginMessage.hpp>#include <gui/plugin/PluginReply.hpp>#include <gui/plugin/PluginReplyAction.hpp>#include <gui/core/message_history.hpp>#include <gui/core/plugin_utils.hpp>#include <gui/utils/message_box.hpp>#include <objects/seqset/Seq_entry.hpp>#include <serial/serial.hpp>#include <serial/objostrasn.hpp>#include "plugin_thread.hpp"BEGIN_NCBI_SCOPEUSING_SCOPE(objects);// static variablesstatic const SIZE_TYPE          QUEUE_SIZE = 100;// static mutex guarding access to the message queueDEFINE_STATIC_MUTEX(s_MessageQueueLock);// the one and only message queueCPluginMessageQueue::TMessageQueue CPluginMessageQueue::sm_Queue(QUEUE_SIZE);// a parallel task queue to hold messages in progressCPluginMessageQueue::TTasks CPluginMessageQueue::sm_TaskList;SIZE_TYPE CPluginMessageQueue::Size(void) THROWS_NONE{    return sm_Queue.GetSize();}SIZE_TYPE CPluginMessageQueue::Capacity(void) THROWS_NONE{    return sm_Queue.GetMaxSize();}bool CPluginMessageQueue::IsEmpty(void) THROWS_NONE{    return sm_Queue.IsEmpty();}bool CPluginMessageQueue::IsFull(void) THROWS_NONE{    return sm_Queue.IsFull();}// clear the message queuevoid CPluginMessageQueue::Clear(void) THROWS_NONE{    CMutexGuard GUARD(s_MessageQueueLock);    while ( !sm_Queue.IsEmpty() ) {        Get();    }    CPluginMessageHistory::Instance().Clear();}//// process a single message off of the queue//void CPluginMessageQueue::ProcessMessage(size_t timeout_sec,                                         size_t timeout_nsec) THROWS_NONE{    CMutexGuard GUARD(s_MessageQueueLock);    // first, see if we can get a message off of the queue    CRef<CPluginMessage> msg(Get(timeout_sec, timeout_nsec));    if (msg) {        ProcessMessage(*msg);    }    // next, reclaim our finished tasks    if (sm_TaskList.size() != 0) {        TTasks::iterator iter;        for (iter = sm_TaskList.begin();  iter != sm_TaskList.end();  ) {            if ( (*iter)->IsCompleted() ) {                (*iter)->Finalize();                sm_TaskList.erase(iter++);            } else {                ++iter;            }        }    }}void CPluginMessageQueue::ProcessMessage(CPluginMessage& msg){    CMutexGuard GUARD(s_MessageQueueLock);    // write to log and history    CPluginMessageHistory::Instance().AddMessage(msg);    const CPluginMessage::TDestination* dest = 0;    // Resolve the destination of the message:    //  - if destination specified use it, otherwise    //  - if it's a reply return to sender.    //  - in other cases drop the message    if ( msg.CanGetDestination() ) {        dest = &msg.GetDestination();    }     CPluginHandle handle;    if ( dest != 0 ) {        handle = CPluginRegistry::GetPlugin(*dest);        if ( !handle ) {            x_DropMessage(msg);            return;        }    }    // call the plugin    try {        CRef<CPluginTask> new_task(new CPluginTask(handle, msg));        if (new_task->IsCompleted() ) {            new_task->Finalize();        } else {            sm_TaskList.push_back(new_task);        }    }    catch (CException& e) {        string str("Error running plugin ");        str += *dest;        str += "\n";        str += e.GetMsg();        LOG_POST(Error << str);        NcbiMessageBox(str);    }}void CPluginMessageQueue::x_DropMessage(const CPluginMessage& msg){    LOG_POST(Info << "Unhandled message: "  << msg.ToString());}CPluginMessage*CPluginMessageQueue::Get(size_t timeout_sec,                         size_t timeout_nsec) THROWS_NONE{    if ( !IsEmpty() ) {        try {            CRef<CPluginMessage> msg = sm_Queue.Get(timeout_sec, timeout_nsec);            // !!! HISTORY: time message left queue            return msg.Release();        }        catch ( exception& ) {            //          }    }    return NULL;}bool CPluginMessageQueue::Add(CPluginMessage& msg) THROWS_NONE{    try {        while ( sm_Queue.IsFull() ) {            sm_Queue.WaitForRoom();        }        CRef<CPluginMessage> ref(&msg);        sm_Queue.Put(ref);        return true;    }    catch (exception&) {    }        return false;}END_NCBI_SCOPE/* * =========================================================================== * $Log: message_queue.cpp,v $ * Revision 1000.5  2004/06/01 20:44:07  gouriano * PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.17 * * Revision 1.17  2004/05/21 22:27:40  gorelenk * Added PCH ncbi_pch.hpp * * Revision 1.16  2004/04/07 12:45:38  dicuccio * Dropped unnecessary _TRACE().  Don't add a task to the task list if it has * completed immediately * * Revision 1.15  2004/03/23 13:37:14  dicuccio * Always use the task queue for processing messages, whether they are threaded or * not * * Revision 1.14  2004/01/21 12:35:13  dicuccio * Deleted unused variables.  Changed to no object converter API. * * Revision 1.13  2004/01/13 21:03:34  ucko * Tweak to fix compilation when set::iterator yields only const access * to elements (as GCC 3 does to ensure that they're always in correct order). * * Revision 1.12  2004/01/13 20:33:33  dicuccio * Be smarted about calling UpdateAllViews() - call only once per document instead * of once per object * * Revision 1.11  2003/12/31 20:27:07  dicuccio * Code clean-up - deleted dead code * * Revision 1.10  2003/12/22 19:19:40  dicuccio * Removed internal singleton * * Revision 1.9  2003/12/09 15:44:04  dicuccio * Use CExpcetion::GetMsg() instead of what() * * Revision 1.8  2003/11/18 17:40:44  dicuccio * Added default processing of plugin replies * * =========================================================================== */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?