📄 file_messaging.cpp
字号:
/* * =========================================================================== * PRODUCTION $Log: file_messaging.cpp,v $ * PRODUCTION Revision 1000.2 2004/06/01 18:28:37 gouriano * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.11 * PRODUCTION * =========================================================================== *//* $Id: file_messaging.cpp,v 1000.2 2004/06/01 18:28:37 gouriano Exp $* ===========================================================================** PUBLIC DOMAIN NOTICE* National Center for Biotechnology Information** This software/database is a "United States Government Work" under the* terms of the United States Copyright Act. It was written as part of* the author's official duties as a United States Government employee and* thus cannot be copyrighted. This software/database is freely available* to the public for use. The National Library of Medicine and the U.S.* Government have not placed any restriction on its use or reproduction.** Although all reasonable efforts have been taken to ensure the accuracy* and reliability of the software and data, the NLM and the U.S.* Government do not and cannot warrant the performance or results that* may be obtained by using this software or data. The NLM and the U.S.* Government disclaim all warranties, express or implied, including* warranties of performance, merchantability or fitness for any particular* purpose.** Please cite the author in any work or product based on this material.** ===========================================================================** Authors: Paul Thiessen** File Description:* file-based messaging system** ===========================================================================*/#ifdef _MSC_VER#pragma warning(disable:4018) // disable signed/unsigned mismatch warning in MSVC#endif#include <ncbi_pch.hpp>#include <corelib/ncbistd.hpp>#include <corelib/ncbidiag.hpp>#include <corelib/ncbi_system.hpp>#include <util/stream_utils.hpp>#include <memory>#include "file_messaging.hpp"BEGIN_NCBI_SCOPE// diagnostic streams#define TRACEMSG(stream) ERR_POST(Trace << stream)#define INFOMSG(stream) ERR_POST(Info << stream)#define WARNINGMSG(stream) ERR_POST(Warning << stream)#define ERRORMSG(stream) ERR_POST(Error << stream)#define FATALMSG(stream) ERR_POST(Fatal << stream)FileMessenger::FileMessenger(FileMessagingManager *parentManager, const std::string& messageFilename, MessageResponder *responderObject, bool isReadOnly) : manager(parentManager), responder(responderObject), messageFile(messageFilename), lockFile(string(messageFilename) + ".lock"), lastKnownSize(0), readOnly(isReadOnly){ TRACEMSG("monitoring message file " << messageFilename);}static fstream * CreateLock(const CDirEntry& lockFile){ if (lockFile.Exists()) { TRACEMSG("unable to establish a lock - lock file exists already"); return NULL; } auto_ptr<fstream> lockStream(CFile::CreateTmpFile(lockFile.GetPath(), CFile::eText, CFile::eAllowRead)); if (lockStream.get() == NULL || !(*lockStream)) { TRACEMSG("unable to establish a lock - cannot create lock file"); return NULL; } char lockWord[4]; lockStream->seekg(0); if (CStreamUtils::Readsome(*lockStream, lockWord, 4) == 4 && lockWord[0] == 'L' && lockWord[1] == 'O' && lockWord[2] == 'C' && lockWord[3] == 'K') { ERRORMSG("lock file opened for writing but apparently already LOCKed!"); return NULL; } lockStream->seekg(0); lockStream->write("LOCK", 4); lockStream->flush(); TRACEMSG("lock file established: " << lockFile.GetPath()); return lockStream.release();}FileMessenger::~FileMessenger(void){ // sanity check to make sure each command issued received a reply bool okay = false; CommandOriginators::const_iterator c, ce = commandsSent.end(); TargetApp2Command::const_iterator a, ae; if (commandsSent.size() == repliesReceived.size()) { for (c=commandsSent.begin(); c!=ce; ++c) { CommandReplies::const_iterator r = repliesReceived.find(c->first); if (r == repliesReceived.end()) break; for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) { if (r->second.find(a->first) == r->second.end()) break; } if (a != ae) break; } if (c == ce) okay = true; } if (!okay) WARNINGMSG("FileMessenger: did not receive a reply to all commands sent!"); // last-minute attempt to write any pending commands to the file if (pendingCommands.size() > 0) { auto_ptr<fstream> lockStream(CreateLock(lockFile.GetPath())); if (lockStream.get() == NULL) { int nTries = 1; do { SleepSec(1); lockStream.reset(CreateLock(lockFile)); ++nTries; } while (lockStream.get() == NULL && nTries <= 30); } if (lockStream.get() != NULL) SendPendingCommands(); else ERRORMSG("Timeout occurred when attempting to flush pending commands to file"); } // sanity check to make sure each command received was sent a reply okay = false; ce = commandsReceived.end(); if (commandsReceived.size() == repliesSent.size()) { for (c=commandsReceived.begin(); c!=ce; ++c) { CommandReplies::const_iterator r = repliesSent.find(c->first); if (r == repliesSent.end()) break; for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) { if (r->second.find(a->first) == r->second.end()) break; } if (a != ae) break; } if (c == ce) okay = true; } if (!okay) ERRORMSG("FileMessenger: did not send a reply to all commands received!");}void FileMessenger::SendCommand(const std::string& targetApp, unsigned long id, const std::string& command, const std::string& data){ if (readOnly) { WARNINGMSG("command '" << command << "' to " << targetApp << " received but not written to read-only message file"); return; } // check against record of commands already sent CommandOriginators::iterator c = commandsSent.find(id); if (c != commandsSent.end() && c->second.find(targetApp) != c->second.end()) { ERRORMSG("Already sent command " << id << " to " << targetApp << '!'); return; } commandsSent[id][targetApp] = command; // create a new CommandInfo on the queue - will actually be sent later pendingCommands.resize(pendingCommands.size() + 1); pendingCommands.back().to = targetApp; pendingCommands.back().id = id; pendingCommands.back().command = command; pendingCommands.back().data = data;}void FileMessenger::SendReply(const std::string& targetApp, unsigned long id, MessageResponder::ReplyStatus status, const std::string& data){ // check against record of commands already received and replies already sent CommandOriginators::iterator c = commandsReceived.find(id); if (c == commandsReceived.end() || c->second.find(targetApp) == c->second.end()) { ERRORMSG("Can't reply; have not received command " << id << " from " << targetApp << '!'); return; } CommandReplies::iterator r = repliesSent.find(id); if (r != repliesSent.end() && r->second.find(targetApp) != r->second.end()) { ERRORMSG("Already sent reply " << id << " to " << targetApp << '!'); return; } repliesSent[id][targetApp] = status; if (readOnly) { TRACEMSG("reply " << id << " to " << targetApp << " logged but not written to read-only message file"); } else { // create a new CommandInfo on the queue - will actually be sent later pendingCommands.resize(pendingCommands.size() + 1); pendingCommands.back().to = targetApp; pendingCommands.back().id = id; switch (status) { case MessageResponder::REPLY_OKAY: pendingCommands.back().command = "OKAY"; break; case MessageResponder::REPLY_ERROR: pendingCommands.back().command = "ERROR"; break; default: ERRORMSG("Unknown reply status " << status << '!'); pendingCommands.back().command = "ERROR"; break; } pendingCommands.back().data = data; }}void FileMessenger::PollMessageFile(void){ // skip all checking if message file doesn't exist or lock file is already present if (!messageFile.Exists() || !messageFile.IsFile() || lockFile.Exists()) return; // check to see if we need to read file's contents CFile mf(messageFile.GetPath()); Int8 messageFileSize = mf.GetLength(); if (messageFileSize < 0) { ERRORMSG("Couldn't get message file size!"); return; } bool needToRead = (messageFileSize > lastKnownSize); // only continue if have new commands to receive, or have pending commands to send if (!needToRead && pendingCommands.size() == 0) return; TRACEMSG("message file: " << messageFile.GetPath());// TRACEMSG("current size: " << (long) messageFileSize);// TRACEMSG("last known size: " << (long) lastKnownSize); if (needToRead) TRACEMSG("message file has grown since last read"); if (pendingCommands.size() > 0) TRACEMSG("has pending commands to send"); // since we're going to read or write the file, establish a lock now auto_ptr<fstream> lockStream(CreateLock(lockFile)); if (lockStream.get() == NULL) return; // try again later, so program isn't locked during wait // first read any new commands from the file if (needToRead) ReceiveCommands(); // then send any pending commands if (pendingCommands.size() > 0) SendPendingCommands(); // now update the size stamp to current size so we don't unnecessarily read in any commands just sent lastKnownSize = mf.GetLength(); if (lastKnownSize < 0) { ERRORMSG("Couldn't get message file size!"); lastKnownSize = 0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -