📄 axpipe.h
字号:
/// not be stored by the receiver, unless it is inherently thread safe - i.e. never written to.
class CSignal : public CCriticalSection {
void *m_Id; ///< A unique identity for the receiver
void *m_Param; ///< An opaque parameter, most likely a pointer
public:
/// \brief Construct with unique identity and opaque pointer for the receiver
CSignal(void *Id = NULL, void *Param = NULL) {
m_Id = Id;
m_Param = Param;
}
/// \return The unique Id
void *Id() {
return m_Id;
}
/// \return The opaque parameter
void *Param() {
return m_Param;
}
};
/// \brief A custom segment for signalling
///
/// Use a CSegSignal segment to send in-band signals downstream. It encapsulates an
/// ClassId and an opaque parameter pointer, to be used as appropriate by the consumer.
/// Typical use is Pump(new CSegSignal(aClass::ClassId(), aClass-instance-pointer));
class CSegSignal : public CSeg {
public:
/// \brief Construct with given ClassId and parameter
/// \param Id A ClassId - typically the pointer to a static variable
/// \param Param An opaque pointer or parameter. Interpreted by the consumer.
CSegSignal(void *Id = NULL, void *Param = NULL) : CSeg(sizeof CSignal, new CSignal(Id, Param)) {
SetType(eSegTypeSignal);
}
/// \brief Destruct, deleting the pointer owned by us
virtual ~CSegSignal() {
delete (CSignal *)Ptr();
}
/// \brief Get a pointer to the CSignal
/// \return Get a properly cast pointer to the CSignal object
CSignal *PtrSignal() {
return (CSignal *)Ptr();
}
};
/// \brief Global initialization for the AxPipe framework.
///
/// It's not clear if GetCurrentFiber() is guaranteed to return NULL
/// before a fiber is created, so to be sure we keep track of it/thread
/// here.
///
/// Can't use __declspec(thread) static as it doesn't work well in delay loaded DLL's
///
/// The TLS index is TlsAlloc()'d once per process during run-time startup by
/// initializing the static. At the same time we register an atexit() function to
/// handle the TlsFee() of the TLS index.
///
/// Can't use static initializers since we sometimes want to use this functionality
/// without the benefit of run time library support for this, thus we require
/// the main thread to create an object that initializes all this.
///
/// Create exactly one object of type CGlobalInit, and let it destruct on program exit.
class CGlobalInit {
private:
HCRYPTPROV m_hCryptProv; ///< The random number generator context
public:
/// \brief Initialized thread local storage and other global data.
CGlobalInit() {
ASSAPI(CryptAcquireContext(&m_hCryptProv, NULL, NULL, PROV_RSA_FULL, CRYPT_VERIFYCONTEXT));
if (InterlockedIncrement(&nGlobalInit) == 1) {
dwTlsIndex = TlsAlloc();
}
}
/// \brief Free thread local storage.
~CGlobalInit() {
ASSAPI(CryptReleaseContext(m_hCryptProv, 0));
if (InterlockedDecrement(&nGlobalInit) == 0 && dwTlsIndex != TLS_OUT_OF_INDEXES) {
TlsFree(dwTlsIndex);
dwTlsIndex = TLS_OUT_OF_INDEXES;
}
}
/// \brief Fill buffer with random data
void Random(void *p, size_t cb) {
ASSAPI(CryptGenRandom(m_hCryptProv, (DWORD)cb, (BYTE *)p));
}
};
/// \brief The base class of all CSink and CPipe derived pipe sections.
///
/// A CSink should be at the terminating end of all pipe lines, if you
/// really don't need one, attach a /dev/null equivalent sink, CSinkNull.
/// (The framework will work anyway, but it's good practice.)
///
/// Convention dictates that all CSink derived classes are named with
/// CSink as a prefix.
///
/// CPipe is a derivation of CSink too, basically with the added logic
/// to send data onwards.
/// \see CSource
/// \see CPipe
/// \see CFilter
class CSink : public CNoThread, public CError {
friend class CPipe; ///< CPipe needs private access
protected:
bool m_fIsOpen; ///< Keep track if this part is open
CSeg *m_pSeg; ///< Next/Current segment to work on
private:
bool DoSegWork(CSeg *pSeg); ///< Send a CSeg onwards for processing, handling special types.
protected:
void Work(); ///< Process one CSeg
CSeg *GetSeg(size_t cb); ///< Allocate a new segment, possibly from the next section of the pipe.
virtual void Signal(void *vId, void *p);///< Out of band signalling down stream place holder.
longlong SizeMax(); ///< Estimate the final sinks size, if limited.
public:
virtual void OutPump(CSeg *pSeg); ///< Hand off a segment to Work()
protected:
virtual longlong OutSizeMax(); ///< Overrideable, Calculate the maximum size of the CSink.
virtual CSeg *OutGetSeg(size_t cb); ///< Overrideable, Allocate a CSeg for this sink.
virtual bool OutSignal(void *vId, void *p); ///< Overrideable, Receive an out of band signal from upstream.
virtual bool OutOpen(); ///< Overrideable, Open the data stream for processing
virtual bool OutFlush(); ///< Overrideable, Handle request for flush of buffered data.
virtual bool OutClose(); ///< Overrideable, Output any final data, close and prepare for new Open().
virtual void OutPlug(); ///< Plug this section.
virtual bool OutSpecial(CSeg *pSeg); ///< Overrideable, Consume a special segment.
/// \brief Overrideable, Consume a segment and Pump() the processed result downstream.
///
/// The provided segment is guaranteed to be non-NULL and non-zero-length
/// A special CSeg with a non-zero AxPipe::eSegType value will be sent to OutSpecial().
/// This is must be implemented in derived classes, there is no default. For CSink
/// directly derived class the actual method of consuming it is up to the CSink, for
/// CPipe derived classes, CPipe::Pump() is the normal method for sending processed
/// data.
/// \param pSeg A memory segment to process or consume. CSeg::Release() it when done with it.
/// \see CPipe::Out()
/// \see CPipe::Pump()
/// \see AxPipe::eSegType
virtual void Out(CSeg *pSeg) = 0;
public:
CSink(); ///< Default constructor.
virtual ~CSink(); ///< Clean up.
virtual void AppendSink(CSink *pSink, bool fAutoDelete); ///< Error catcher.
virtual void DestructSink(); ///< Destruct code place holder for derived classes.
virtual void Sync(); ///< Ensure that all threads downstream are idle
};
/// \brief The generic pipe-segment as an abstract class.
///
/// All non CSink objects are derived from CPipe, push and pull model
/// processing segments as well as CSource.
///
/// The minimum derived class overrides Out() and processes the CSeg provided,
/// using the utility member function Pump() to send processed data downstream.
class CPipe : public CSink {
friend class CSplit; ///< CSplit needs private access.
void DestructSink(); ///< The actual destructor code.
protected:
CSink *m_pSink; ///< Forward pointer to next section downstream.
void Work(); ///< Process one memory segment, possibly propagating.
void AppendSink(CSink *pSink, bool fAutoDelete); ///< Append a section by pointer.
CSeg *GetSeg(size_t cb); ///< Utility function, call if you think the next is a CSink that might give you an efficient segment.
void Signal(void *vId, void *p); ///< Out of band signalling downstream.
longlong OutSizeMax(); ///< Overrideable, Calculate the maximum size of the CSink.
CSeg *OutGetSeg(size_t cb); ///< Overrideable, Allocate a writeable CSeg, possibly optimized for the CSink.
bool OutSignal(void *vId, void *p); ///< Overrideable, Receive an out of band signal from upstream.
bool OutOpen(); ///< Overrideable, Open the data stream for processing
bool OutClose(); ///< Overrideable, Output any final data, close and prepare for new Open().
bool OutSpecial(CSeg *pSeg); ///< Overrideable, Process a special segment and send results downstream with Pump(). CSeg::Release() it when done with it.
/// \brief Overrideable, Consume a segment and Pump() the processed result downstream.
///
/// The provided segment is guaranteed to be non-NULL and non-zero-length
/// Special CSeg's with a non-zero eSegType value will be sent to OutSpecial() instead.
/// Out() must be implemented in derived classes, there is no default. Pump() is the normal
/// method for sending processed data.
/// \param pSeg A memory segment to process or consume.
/// \see Pump()
void Out(CSeg *pSeg) = 0;
public:
CPipe(); ///< Initialize member variables.
~CPipe(); ///< Destruct sink
CPipe *Append(CSink *pSink); ///< Append a section by pointer with auto deletion.
CPipe *Append(CSink& sink); ///< Append a section by reference.
void Sync(); ///< Ensure that all threads downstream are idle
void Open(); ///< Utility function, call to open the pipe downstream for output.
void Pump(CSeg *pSeg); ///< Utility function, call typically from Out(), to send a segment downstream.
void Flush(); ///< Utility function, call to flush the pipe downstream.
void Close(); ///< Utility function, call to close the pipe downstream for output.
};
/// \brief /dev/null or NUL: in Windows parlance
///
/// A dummy dead-end CSink.
class CSinkNull : public CSink {
public:
/// \brief Consume the given segment, guaranteed to be non-NULL by calling CSeg::Release().
/// \param pSeg A segment to consume.
inline void Out(CSeg *pSeg) {
pSeg->Release();
}
};
/// \brief A forward Y-split, divides a stream into two.
///
/// It does nothing more to the data, except pass each segment onwards,
/// but twice, to each of the streams given ('left' and 'right')
/// \see CPipe
class CSplit : public CPipe {
CPipe *m_pLeft; ///< The start of the 'left' side of the split.
CPipe *m_pRight; ///< The start of the 'right' side of the split.
void DestructSink(); ///< Always delete the left and right upon deletion of this part.
void PumpSplit(CSeg *pSeg); ///< Send the same segment down both left and right legs of the split
public:
CSplit(); ///< Construct and initialize the member variables.
void AppendSink(CSink *pSink, bool fAutoDelete); ///< Do not append a section, it's an error here.
void Sync(); ///< Ensure that all threads downstream are idle
CSplit *Init(CPipe *pLeft, CPipe *pRight); ///< Initialize split with left and right pointers to pipes.
void Out(CSeg *pSeg); ///< Send the same segment downstream to both parts of the split.
bool OutSpecial(CSeg *pSeg); ///< Send the same special segment downstream to both parts of the split.
bool OutFlush(); ///< Send a flush signal downstream to both parts of the split.
bool OutClose(); ///< Send a close signal downstream to both parts of the split.
bool OutOpen(); ///< Send an open signal downstream to both parts of the split.
};
/// \brief Accept pushed segments n blocks of m bytes at a time (except last)
///
/// Buffer data and work on them in blocks of m bytes. Each segment may be a multiple
/// of m bytes long. This simplifies working with block oriented data streams or
/// processes, such as block ciphers. If there's a partial block, it'll be available
/// from CPipeBlock::BlockPart() when CPipeBlock::OutClose() is called.
class CPipeBlock : public CPipe {
CSeg *m_pBlockPart; ///< Buffer partial blocks, always m_cbBlockSize in size.
size_t m_cbBlockSize; ///< The size of the blocks in bytes.
public:
CPipeBlock(); ///< Initialize member variables.
virtual ~CPipeBlock(); ///< Destruct additional member data.
CPipeBlock *Init(size_t cbBlockSize); ///< Set the size of the blocks to be provided to CPipeBlock::Out()
void OutPump(CSeg *pSeg); ///< Internal framework override to handle the blocking.
CSeg *PartialBlock(); ///< Get the partial block pointer.
};
/// \brief A generic source, as an abstract class.
///
/// You must override In(), and most likely provide your own constructor as well.
/// The OutOpen() override shoulde be able to handle multiple calls, with OutClose() inbetween
/// of course. Once open, Drain() should be called. This will push data from In() downstream
/// until it signals empty, whereupon a flush is sent. If the source supports it,
/// Drain() may be called multiple times in a row.
///
/// To support usage of multiple sources within a specific pipe, you may
/// implement the OutClose() and OutOpen() methods, typically to close a file,
/// and then open a new one, respectively.
///
/// Shutdown of the pipe occurs by either calling Plug() explicitly, or by calling it implicitly
/// from the destructor of the CSource.
class CSource : public CPipe {
public:
virtual ~CSource(); ///< Ensure Plug() is called.
CSource *Append(CSink *pSink); ///< Append a section by pointer with auto deletion.
CSource *Append(CSink& sink); ///< Append a section by reference.
CSource *Open(); ///< Open the source and possibly propagate downstream
CSource *Close(); ///< Close the source and possible propagate downstream
CSource *Drain(); ///< Drain the pipe until In() says we're empty for now.
CSource *Plug(); ///< Plug this pipe, prepare for exit, cannot reopen after this.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -