📄 axpipe.h
字号:
void Out(CSeg *pSeg); ///< Send data to an attached CSink.
protected:
/// \brief The basic source of segments
///
/// Must override in all CSource derived classes. Should return a memory segment CSeg with
/// new data as long as there is data available.
/// \return NULL on error, zero-length on empty/eof, otherwise a CSeg with data.
virtual CSeg *In() = 0;
};
/// \brief The always-empty source, comparable to /dev/null (or NUL: in Windows).
///
/// A minimal CSource implementation: every read reports empty/eof by
/// handing out a default-constructed segment.
class CSourceNull : public CSource {
protected:
    /// \brief Produce the empty/eof indication.
    /// \return A pointer to a newly allocated, default-constructed (zero-length) CSeg.
    CSeg *In() {
        CSeg *pSegEmpty = new CSeg;
        return pSegEmpty;
    }
};
/// \brief A memory buffer based source
class CSourceMem : public CSource {
CSeg *m_pSegSave; ///< The one and only segment provided by this class
protected:
/// \brief Get the one and only memory buffer the first time, then eof.
/// \return A CSeg with data or empty to indicate eof, or NULL for error.
CSeg *In() {
if (m_pSegSave) {
m_pSeg = m_pSegSave;
m_pSegSave = NULL;
return m_pSeg;
} else {
return new CSeg;
}
}
public:
/// \brief Initalize member variables
CSourceMem() {
m_pSegSave = NULL;
}
/// \brief Initialize with a buffer to read from
///
/// We do not take over ownership of the buffer! Keep track of it yourself!
/// \return A pointer to 'this' CSourceMem
CSourceMem *Init(size_t cb, const void *p) {
m_pSegSave = new CSeg(cb, p);
return this;
}
};
/// \brief A buffering filter enabling a pull programming-model.
///
/// There are some differences in the handling of pull model CFilter based
/// classes. Flush() has no effect in the pull-model filter.
///
/// Instead of overriding Out(), you should override InFilter().
/// There you use Open(), Read(), Pump() and Close() to perform opening, reading,
/// writing and closing respectively.
///
/// When run in the threading version, InFilter() will execute in
/// a separate thread (as will downstream processing, until a new threaded
/// section is encountered).
///
/// All CFilter derived classes use a co-routine context to handle the reversal
/// from push to pull model. Essentially we initialize two co-routine contexts,
/// one for the caller Work(), which calls Out(), and one for InFilter() and Read().
/// Thus, when a segment arrives (is pushed), we switch to the InFilter context,
/// which will then use Read() to pick up the waiting segment. When Read() is then
/// called to get the next segment, it switches back to the Work co-routine, which
/// will wait for the next segment to arrive before switching back, and so on.
class CFilter : public CPipe {
private:
bool m_fFirstWork; ///< true until the first call of Work()
CCoContext m_ctxFilter; ///< The InFilter() co-routine context, a newly created context.
static void CoFilter(void *pvThis); ///< Static helper passed as the StartProc to the CCoContext m_ctxFilter.
void CoStartFilter(void *pvThis); ///< The in-class start routine of the filter co-routine context.
void Out(CSeg *pSeg); ///< Overridden Out() to handle switching to the Filter co-routine context.
public:
CFilter(); ///< Initialize member variables.
~CFilter(); ///< Destructor. NOTE(review): the cleanup performed is not visible in this header.
protected:
CCoContext m_ctxWork; ///< The Work() co-routine context, actually the caller's current context.
bool OutOpen(); ///< Prepare for processing.
bool OutClose(); ///< Send a NULL segment close signal to InFilter() and Read().
bool OutFlush(); ///< Send a flush-request as a zero-length segment to Read().
void Work(); ///< Send the m_pSeg segment to the Filter.
CSeg *Read(); ///< Get a segment, call from InFilter().
/// \brief The main override in a CFilter derived class.
///
/// Override and perform all processing functions here. Use Read() to get
/// data, checking for NULL, which indicates that this (sub)stream is empty,
/// and for zero-length segments, which indicate a flush request.
///
/// Always ensure that Open() gets called before getting any data with Read(),
/// and that Close() gets called after the last data is read. Also be prepared
/// to be called multiple times.
virtual void InFilter() = 0;
};
/// \brief A byte-wise filter class, enabling the caller to retrieve one byte at a time.
///
/// Use ReadByte() in your implementation of InFilter() to get a byte at a time as an int.
/// It'll return -1 on eos (end of stream).
class CFilterByte : public CFilter {
protected:
bool GetNextSeg(); ///< Helper routine to get the next segment.
protected:
int ReadByte(); ///< Read a byte from the stream, returned as an int; -1 on eos.
CSeg *Read(); ///< Error catcher, Read() can't be called from CFilterByte derived classes.
size_t Skip(size_t cb); ///< Skip bytes in the stream. NOTE(review): presumably returns the number of bytes actually skipped - confirm.
};
/// \brief A buffering filter class returning chunks of a requested size.
///
/// Derives from CFilterByte and adds ReadBlock() for use in InFilter()
/// implementations that want data in blocks rather than byte by byte.
class CFilterBlock : public CFilterByte {
protected:
/// \brief Attempt to get a segment of a requested size.
/// \param cb The requested size. NOTE(review): presumably in bytes - confirm.
/// \return A segment. NOTE(review): the behavior when fewer than cb bytes
///         remain is not visible in this header - confirm against the implementation.
CSeg *ReadBlock(size_t cb);
};
/// \brief A Y join, taking any number of streams and joining them.
///
/// Build any number of streams, with CSource's at the start
/// and any number of CPipe sections, but do not terminate them
/// with a CSink.
///
/// Call Init(int) specifying how many streams you wish to attach to the join.
///
/// Get a CSink to terminate them with by calling the GetSink(int) member, with
/// a sink index as argument. This attaches the stream to the CJoin. If a
/// stream is started from a separate CSource, use CThreadSource<> to set up
/// a thread in which to run it.
///
/// Override the In() member function to perform custom merging
/// of many streams into one. They are indexed 0 to n-1. Use
/// StreamSeg(int) to call the appropriate stream fiber
/// context, StreamIx(int) to ensure an index is valid, StreamNum()
/// to get the number of streams and StreamEmpty(int) to check if
/// an input stream is marked as empty.
///
/// The class supports merging of any number of streams.
class CJoin : public CSource {
/// \brief A helper class for the merge, each in stream gets a CTSinkJoin like this.
///
/// The Out() method is overridden to communicate with CJoin::In() via thread sync.
/// CJoin::In() calls CJoin::m_ppInSinks[i]->GetSeg(), which waits for
/// a new segment to arrive from the indexed source, and then provides it.
class CTSinkJoin : public CThread<CSink> {
CThreadSync m_Sync; ///< Synchronize in streams with the worker join thread.
CSeg *m_pNextSeg; ///< Communicates the next segment to In().
bool m_fEmpty; ///< Set when a NULL is output, to indicate that it's empty.
protected:
void Out(CSeg *pSeg); ///< Make data available to the CJoin.
bool OutClose(); ///< Mark input as empty.
bool OutFlush(); ///< Forward a flush request to the CJoin.
public:
CTSinkJoin(); ///< Basic init of members.
CSeg *GetSeg(); ///< Get the current segment pointer.
bool IsEmpty(); ///< True if empty. Obviously.
void SinkWorkWait(); ///< Wait for this sink to make a segment ready via GetSeg().
void SinkWorkEnd(); ///< Signal this sink that you've accepted the segment via GetSeg().
};
CTSinkJoin **m_ppInSinks; ///< The array of in-stream control objects.
int m_nMaxStreams; ///< The max number of streams we're prepared for with Init().
public:
CJoin(); ///< Construct the CJoin, but Init() must also be called.
virtual ~CJoin(); ///< Also destruct all the in stream objects.
void OutPlug(); ///< NOTE(review): undocumented in this header - confirm its purpose against the implementation.
CSink &GetSink(int ix); ///< Get the CSink to terminate stream ix with; per the class description this attaches the stream to the CJoin.
CJoin *Init(int nMaxStreams = 2); ///< Define how many streams you want here.
//
// Utility routines for In()
//
CSeg *StreamSeg(int ix); ///< Get pointer to segment from given stream.
int StreamIx(int ix); ///< Reduce ix % StreamNum().
int StreamNum(); ///< Get the current number of streams. NOTE(review): the class description calls this the maximum number - confirm which.
bool StreamEmpty(int ix); ///< Tell if an indexed stream is marked as empty.
};
/*! \page PageIntro Introduction
AxPipe is suitable when one or more streams of data are to be processed,
producing one or more streams of output. A pipeline of independent sections
is built at run-time, with each stage optionally running in its own thread
at the programmer's discretion.
\section Background
AxPipe is useful in all situations where streams of data are
to be transformed. It grew out of many needs, mostly centered around
encryption and compression, but it should be useful in many other
similar cases, such as sound and video codecs and players, hashing,
splitting, joining, backup, restore, archives and hopefully many
uses I have not thought about.
\section Portability
AxPipe is currently optimized and centered around the Win32 platform. There are
no fundamental reasons why the framework could not be implemented for Linux as
well, and I'd appreciate any such contributions and would be glad to incorporate
them into the main stream code.
\section Examples
There's a small first example to study in \ref PageSample1, and a more complex one
in \ref PageSample2. It may also be helpful to study the AxPipe::Stock transformations,
as they are some samples of derived filters and pipes, as well as AxPipe::CSourceFileMap
and AxPipe::CSinkFileMap.
For a complete real life example, please see the AxDecrypt program, part of the <A HREF="http://axcrypt.sf.net">AxCrypt</A>
package. Download the source code and examine AxDecrypt.cpp.
\see \ref PageIntro "Introduction", \ref PageInstall "Installation", \ref PageSample1 "A First Example",
\ref PageSample2 "A Second Example",
\ref PageDef "Definitions of Terms", \ref PageStock "Stock Transformations", \ref PageUtil "Utilities and Overrides"
*/
/*! \page PageInstall Installation
The easiest way to include the library is to include the AxPipe
project into your solution or workspace. It's configured to work
as a statically linked library. There is currently no DLL interface,
and there won't likely be one, it's not that kind of library.
You should include AxPipe.h at the least; it should be fairly obvious
what other includes are necessary. Your project should define the
path to wherever you placed the AxPipe source code as an extra
include directory.
You should normally compile your project with the Multi Threaded
Run Time Library. AxPipe is configured to do so, and you'll get a linker
error message otherwise. If you're sure you won't be using the multi-
threaded capability you can of course change this to single-threaded,
but there's no real gain in doing so in most cases.
AxPipe has one external dependency currently, and that's ZLib. If you
don't need the AxPipe::Stock::CPipeInflate class, you can mark
CPipeInflate.cpp as not part of the compile. Optionally, you may also
choose to include AxPipe on a file-by-file basis directly into your
own project, and only include those parts that are relevant to you.
\section ZLib
The external dependency is to ZLib 1.2.1, you can pick your very own copy
at http://www.gzip.org/zlib/. The standard project is setup to expect
a directory named 'Contrib' at the same level as 'AxPipe', and for this
case a subdirectory named zlib121. In other words, the file CPipeInflate.cpp
has an additional include directory setup as ../Contrib/zlib121. If there's
nothing there, and you include CPipeInflate.cpp in the build, you'll get
a compilation error about zlib.h not found.
\see \ref PageIntro "Introduction", \ref PageInstall "Installation", \ref PageSample1 "A First Example",
\ref PageSample2 "A Second Example",
\ref PageDef "Definitions of Terms", \ref PageStock "Stock Transformations", \ref PageUtil "Utilities and Overrides"
*/
/*! \page PageStock Stock Transformations
The idea in the long term is to collect a number of transformations that
are wrapped by this framework, so that authors may combine them in new
ways. Look to the project site, http://axpipe.sourceforge.net for
available AxPipe::Stock transforms, and do please submit your own!
\see \ref PageIntro "Introduction", \ref PageInstall "Installation", \ref PageSample1 "A First Example",
\ref PageSample2 "A Second Example",
\ref PageDef "Definitions of Terms", \ref PageStock "Stock Transformations", \ref PageUtil "Utilities and Overrides"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -