📄 pipe.c
字号:
}
/*
 * Returns true when the pipe's read point is sitting on a mark: either the
 * residue buffer is at its mark, or the residue buffer is empty and the
 * current master segment is at its mark.
 */
Boolean
pipeIsAtMark(Pipe pipe)
{
    Buf *residue;
    int atMark;

    ASSERT(pipe->usedBits == 0);

    residue = &pipe->residueBuf;
    if (bufIsAtMark(residue)) {
        atMark = 1;
    } else if (bufIsEmpty(residue)) {
        /* Nothing left over, so the answer is decided by the master segment. */
        atMark = bufIsAtMark(&pipe->masterBufs[pipe->curSeg]) ? 1 : 0;
    } else {
        atMark = 0;
    }
    return Boolean(atMark);
}
/*
 * Reports whether a chunk of `size` bytes can be got/peeked without
 * hitting eof, i.e. whether pipeFillBuf() can supply a buffer for it.
 */
Boolean
pipeIsAvail(Pipe pipe, size_t size)
{
    Buf *filled;

    ASSERT(pipe->usedBits == 0);

    filled = pipeFillBuf(pipe, size);
    return Boolean(filled != NULL);
}
/*
 * Returns true if `chunk` bytes are available (see pipeIsAvail) and at
 * least `chunk` of them are counted as unmarked by pipeAvailUnmarked().
 */
Boolean
pipeIsAvailUnmarked(Pipe pipe, size_t chunk)
{
    if (!pipeIsAvail(pipe, chunk)) {
        /* The unmarked count is only consulted once the data is available. */
        return Boolean(0);
    }
    return Boolean(pipeAvailUnmarked(pipe) >= chunk);
}
/*
 * Returns the count of available-but-unmarked bytes: the residue buffer's
 * (avail - marked), plus the current master segment's (avail - marked)
 * when the residue buffer carries no mark.
 */
size_t
pipeAvailUnmarked(Pipe pipe)
{
    Buf *residue = &pipe->residueBuf;
    Buf *master;
    size_t total = residue->avail - residue->marked;

    if (residue->marked != 0) {
        /* A mark in the residue buffer ends the unmarked run. */
        return total;
    }

    master = &pipe->masterBufs[pipe->curSeg];
    total += master->avail - master->marked;
    return total;
}
/*
 * Returns the position recorded at the head of the position list of the
 * buffer that holds the next byte. A pipe at EOF has no position, so in
 * that case control leaves through longjmp() on the pipe's eofJmpBuf
 * instead of returning.
 */
PipePosition
pipePosition(Pipe pipe)
{
    Buf *bufp = pipeFillBuf(pipe, 1);

    if (bufp == NULL) {
        /* There is no position for a pipe at EOF. */
        longjmp(pipe->eofJmpBuf, 1);
    }

    return bufp->bufPosList[bufp->bplGet].position;
}
/*
 * Return the byte position in slave relative terms (the pipe's own
 * `position` field).
 */
PipePosition
pipeRelativePosition(Pipe pipe)
{
    PipePosition relative;

    ASSERT(pipe->usedBits == 0);

    relative = pipe->position;
    return relative;
}
/*
 * Clear the crc accumulator: reset the running sum to all ones and turn
 * crc accumulation on.
 */
void
pipeCrcClear(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);

    pipe->crcSum = 0xffffffff;
    pipe->doCrc = TRUE;
}
/*
 * Get the crc accumulated since the last crc clear. Accumulation is turned
 * off, and the bitwise complement of the running sum is returned.
 */
u32
pipeCrcGet(Pipe pipe)
{
    u32 crc;

    ASSERT(pipe->usedBits == 0);

    pipe->doCrc = FALSE;
    crc = ~pipe->crcSum;
    return crc;
}
/*
 * Accessor: returns the pid stored in the pipe.
 */
u16
pipeGetPid(Pipe pipe)
{
    u16 pid = pipe->pid;

    return pid;
}
/*
 * Accessor: returns the parser callback stored in the pipe.
 */
PipeParser
pipeGetParser(Pipe pipe)
{
    PipeParser parser = pipe->parser;

    return parser;
}
/*************************************************************************
* Private Methods
*************************************************************************/
/*
* Non-inlinable called from inline
*
* INLINE_PRIVATE void
* pipeInlinePrivate(void)
* {
* }
*/
/*
 * Queues one caller-supplied data segment on the pipe.
 *
 * The segment is recorded in the next free masterBufs[] slot through
 * bufAssign(), which stores the data pointer rather than copying the bytes.
 * pipeFlush() is called when doFlush is set or when the segment table has
 * reached PIPE_MAX_SEGMENTS.
 *
 * pipe        - destination pipe; PIPE_NULL is accepted and is a no-op.
 * buf, len    - segment data and its length in bytes.
 * isMarked    - passed to bufAssign(), which marks the whole segment if set.
 * doFlush     - request a forced flush after the segment is queued.
 * bufPosition - position recorded for the segment.
 *
 * Returns RETCODE_SUCCESS for PIPE_NULL, otherwise the pipe's retCode.
 */
static RetCode
pipePut2(Pipe pipe, void *buf, size_t len, Boolean isMarked,
Boolean doFlush, PipePosition bufPosition)
{
if (pipe == PIPE_NULL) {
return RETCODE_SUCCESS;
}
switch (pipe->threadState) {
case PIPETHREADSTATE_INIT:
/*
 * Thread is waiting in pipeThreadWrapper for master to complete
 * instance initialization; start the thread so it can run until its
 * first pipeFillBuf(). NOTE: It's illegal to detect an error before
 * the first pipeFillBuf().
 */
(void) pipeFlush(pipe, FALSE);
/* The thread may already have finished; if so there is nothing to queue. */
if (pipe->threadState == PIPETHREADSTATE_ZOMBE) {
break;
}
ASSERT(pipe->threadState == PIPETHREADSTATE_RUNNING);
/* FALLTHRU */
case PIPETHREADSTATE_RUNNING:
/* Record the segment in the next slot and bump the segment count. */
bufAssign(&pipe->masterBufs[pipe->nSegs++], (unsigned char *)buf, len, isMarked,
bufPosition);
/* Flush on request, or when the segment table is full. */
if (doFlush || pipe->nSegs >= PIPE_MAX_SEGMENTS) {
(void) pipeFlush(pipe, doFlush);
}
break;
case PIPETHREADSTATE_ZOMBE:
/* Thread has terminated: the data is dropped and only retCode is reported. */
break;
default:
ABORT("Illegal threadState");
}
return pipe->retCode;
}
/*
 * Hands the queued master segments to the slave thread and waits for it
 * to give control back.
 *
 * Nothing is done for PIPE_NULL (returns RETCODE_SUCCESS), when no segments
 * are queued, or when the thread is already in the ZOMBE state. Otherwise
 * `isForced` is stored in pipe->isFlush, slaveSema is posted and masterSema
 * is waited on. Unless the thread became a zombie meanwhile, the segment
 * count is reset to zero afterwards.
 *
 * Returns the pipe's retCode (RETCODE_SUCCESS for PIPE_NULL).
 */
static RetCode
pipeFlush(Pipe pipe, Boolean isForced)
{
    if (pipe == PIPE_NULL) {
        return RETCODE_SUCCESS;
    }

    /* Nothing queued, or nobody left to consume it. */
    if (!(pipe->nSegs > 0) || pipe->threadState == PIPETHREADSTATE_ZOMBE) {
        return pipe->retCode;
    }

    pipe->isFlush = isForced;
    CHECK_IF_ERRNO(sema_post(&pipe->slaveSema));
    CHECK_IF_ERRNO(sema_wait(&pipe->masterSema));

    /* A thread that died during the handshake leaves the segments as they are. */
    if (pipe->threadState == PIPETHREADSTATE_ZOMBE) {
        return pipe->retCode;
    }

    ASSERT(pipe->curSeg == pipe->nSegs);
    pipe->nSegs = 0;
    return pipe->retCode;
}
/*
 * Makes `size` bytes readable from a single Buf and returns that Buf, or
 * returns NULL once end of data has been reached (pipe->isEof is then set
 * and every later call returns NULL immediately).
 *
 * The returned Buf is either the current master segment or residueBuf.
 * When residueBuf plus the current master segment hold fewer than `size`
 * bytes, the remainder of the segment is moved into residueBuf and the next
 * segment is taken; when the segments are exhausted masterSema is posted
 * and slaveSema is waited on until more segments have been queued.
 *
 * `size` must not exceed residueBuf.len (asserted).
 */
INLINE_PRIVATE Buf *
pipeFillBuf2(Pipe pipe, size_t size)
{
Buf *bufp;
if (pipe->isEof) {
return NULL;
}
ASSERT(size <= pipe->residueBuf.len);
ASSERT(pipe->curSeg < pipe->nSegs);
/* Loop until residue + current segment together can satisfy the request. */
while (size > pipe->residueBuf.avail
+ pipe->masterBufs[pipe->curSeg].avail) {
if (pipe->masterBufs[pipe->curSeg].avail > 0) {
/*
 * Save the tail of this segment in the residue buffer before moving on.
 * NOTE(review): the downstream pipe is flushed first, presumably because
 * its queued segments may reference bytes that are about to move — confirm.
 */
(void) pipeFlush(pipe->lastDstPipe, TRUE);
bufTransfer(&pipe->residueBuf, &pipe->masterBufs[pipe->curSeg],
pipe->masterBufs[pipe->curSeg].avail);
}
pipe->curSeg += 1;
if (pipe->curSeg >= pipe->nSegs) {
/* All queued segments consumed: propagate a forced flush downstream. */
if (pipe->isFlush) {
(void) pipeFlush(pipe->lastDstPipe, TRUE);
}
/* Hand control back to the master and sleep until it queues more. */
CHECK_IF_ERRNO(sema_post(&pipe->masterSema));
CHECK_IF_ERRNO(sema_wait(&pipe->slaveSema));
ASSERT(pipe->nSegs > 0);
pipe->curSeg = 0;
}
/* An empty segment is treated as the end-of-data indication. */
if (pipe->masterBufs[pipe->curSeg].avail == 0) {
pipe->curSeg += 1;
ASSERT(pipe->curSeg == pipe->nSegs);
pipe->isEof = TRUE;
return NULL;
}
/* A freshly entered segment must not have been read from yet. */
ASSERT(pipe->masterBufs[pipe->curSeg].outp
== pipe->masterBufs[pipe->curSeg].basep);
}
ASSERT(pipe->residueBuf.avail + pipe->masterBufs[pipe->curSeg].avail
>= size);
/*
 * Serve directly from the master segment when there is no residue, or when
 * pipeIsPushBack() managed to fold the residue back into the segment.
 * Note that pipeIsPushBack() modifies both buffers when it returns TRUE.
 */
if (pipe->residueBuf.avail == 0 || pipeIsPushBack(pipe)) {
bufp = &pipe->masterBufs[pipe->curSeg];
} else {
if (pipe->residueBuf.avail < size) {
/* Top the residue buffer up with just enough bytes from the segment. */
size_t rem = size - pipe->residueBuf.avail;
(void) pipeFlush(pipe->lastDstPipe, TRUE);
bufTransfer(&pipe->residueBuf,
&pipe->masterBufs[pipe->curSeg], rem);
}
bufp = &pipe->residueBuf;
}
return bufp;
}
/*
 * Tries to fold the residue buffer back into the current master segment.
 *
 * This is only attempted when the residue holds no more bytes than have
 * already been consumed from the segment (outp - basep). In that case the
 * residue is emptied, the segment's read pointer is moved back by the same
 * number of bytes, and its avail count, mark and head position entry are
 * adjusted to match.
 * NOTE(review): this presumes the residue bytes are copies of the segment
 * bytes immediately preceding outp — confirm against bufTransfer callers.
 *
 * Returns TRUE if the push-back was performed, FALSE if nothing was changed.
 */
static Boolean
pipeIsPushBack(Pipe pipe)
{
Buf *masterBufp = &pipe->masterBufs[pipe->curSeg];
BufPosList *bplp;
size_t residueAvail;
size_t residueMarked;
/* Not enough consumed room in the segment to take the residue back. */
if (pipe->residueBuf.avail > masterBufp->outp - masterBufp->basep) {
return FALSE;
}
/* Snapshot the residue counters before bufGet() is applied to the buffer. */
residueAvail = pipe->residueBuf.avail;
residueMarked = pipe->residueBuf.marked;
(void) bufGet(&pipe->residueBuf, residueAvail);
if (residueMarked != 0) {
ASSERT(masterBufp->marked == 0);
/* Uses the segment's avail from before the push-back adjustment below. */
masterBufp->marked = masterBufp->avail + residueMarked;
}
/* Rewind the segment so the pushed-back bytes are readable again. */
masterBufp->outp -= residueAvail;
masterBufp->avail += residueAvail;
ASSERT(masterBufp->bplGet < masterBufp->bplPut);
/* Extend the head position entry backwards by the same amount. */
bplp = &masterBufp->bufPosList[masterBufp->bplGet];
bplp->len += residueAvail;
bplp->position -= residueAvail;
return TRUE;
}
/*************************************************************************
* Private Functions
*************************************************************************/
/*
 * Thread entry point for a pipe's slave thread.
 *
 * Waits on slaveSema, marks the thread RUNNING, installs the EOF jump
 * buffer and runs the pipe's parser. The parser's return code (or
 * PIPE_ERROR_EOF if the jump buffer was taken) is stored in pipe->retCode.
 * On exit the optional cleanup callback is called, the thread state becomes
 * ZOMBE and masterSema is posted.
 *
 * arg - the Pipe instance, passed as void *.
 * Always returns NULL; the result is reported through pipe->retCode.
 */
static void *
pipeThreadWrapper(void *arg)
{
Pipe pipe = (Pipe) arg;
jmp_buf jbuf;
RetCode retCode;
/* Block until the master side posts slaveSema for the first time. */
CHECK_IF_ERRNO(sema_wait(&pipe->slaveSema));
pipe->threadState = PIPETHREADSTATE_RUNNING;
/* jbuf lives on this thread's stack; eofJmpBuf refers to it while the parser runs. */
pipe->eofJmpBuf = jbuf;
if (setjmp(jbuf) != 0) {
/* Reached via longjmp(pipe->eofJmpBuf, ...), e.g. from pipePosition() at EOF. */
pipe->retCode = RETCODE_CONS(retCodeId, PIPE_ERROR_EOF);
goto done;
}
pipe->retCode = (*pipe->parser) (pipe->instp, pipe->cop, pipe);
/*
 * Flush any pending sub-pipe data. If there's an error, it takes
 * precedence -- the data was buffered and this error actually occurred
 * earlier in the stream.
 */
if ((retCode = pipeFlush(pipe->lastDstPipe, TRUE)) != RETCODE_SUCCESS) {
pipe->retCode = retCode;
}
/* A successful parse that leaves unread bytes behind is reported as an error. */
if (pipe->retCode == RETCODE_SUCCESS
&& pipe->residueBuf.avail + pipe->masterBufs[pipe->curSeg].avail > 0) {
pipe->retCode = RETCODE_CONS(retCodeId, PIPE_ERROR_EXTRANEOUS_DATA);
}
done:
/*
 * Call parser to cleanup any child pipes, etc.
 */
if (pipe->cleanup != NULL) {
(*pipe->cleanup) (pipe->instp);
}
/* Mark the thread dead before posting masterSema to release the waiting side. */
pipe->threadState = PIPETHREADSTATE_ZOMBE;
CHECK_IF_ERRNO(sema_post(&pipe->masterSema));
return NULL;
}
/*
 * Builds an empty Buf over the storage `buf` of capacity `len`.
 *
 * The read pointer starts at the base, nothing is available or marked, and
 * a position list of `bplSize` entries is allocated with NEW_ZEROED and
 * starts out empty. The Buf is returned by value.
 */
static Buf
bufInit(u8 *buf, size_t len, int bplSize)
{
    Buf fresh;

    /* Data storage: empty, read pointer at the base. */
    fresh.outp = buf;
    fresh.basep = fresh.outp;
    fresh.len = len;
    fresh.marked = 0;
    fresh.avail = 0;

    /* Position list: allocated, no entries in use. */
    fresh.bplSize = bplSize;
    fresh.bufPosList = NEW_ZEROED(BufPosList, bplSize);
    fresh.bplPut = 0;
    fresh.bplGet = 0;

    return fresh;
}
/*
 * Points a Buf at externally owned data without copying it.
 *
 * The Buf must have len == 0 and a one-entry position list (both asserted).
 * After the call the Buf exposes `avail` bytes starting at `datap`, is
 * marked in full when `isMarked` is set, and its single position entry
 * covers the whole range starting at `bufPosition`.
 */
static void
bufAssign(Buf *bufp, u8 *datap, size_t avail, Boolean isMarked,
          PipePosition bufPosition)
{
    BufPosList *entry;

    ASSERT(bufp->len == 0);
    ASSERT(bufp->bplSize == 1);

    bufp->outp = datap;
    bufp->basep = bufp->outp;
    bufp->avail = avail;
    if (isMarked) {
        bufp->marked = avail;
    } else {
        bufp->marked = 0;
    }

    /* The single position entry describes the whole segment. */
    entry = &bufp->bufPosList[0];
    entry->position = bufPosition;
    entry->len = avail;
    bufp->bplGet = 0;
    bufp->bplPut = 1;
}
/*
 * Appends `len` bytes taken from srcBufp (via bufGet) to destBufp.
 *
 * Before copying, the destination's pending data and its position list are
 * moved back to the start of their storage ("justified") so that the free
 * room is at the end. The destination must have room for the extra bytes
 * (asserted).
 *
 * Mark handling: if either buffer carries a mark, the destination's marked
 * count grows by `len`.
 *
 * Position list: the new bytes extend the destination's last entry when
 * they follow it contiguously (last position + last len == source
 * position); otherwise a new entry is appended, doubling the list's
 * capacity with RENEW when it is full.
 */
static void
bufTransfer(Buf *destBufp, Buf *srcBufp, size_t len)
{
    /* Capture the source state before bufGet() is applied to it below. */
    Boolean isSrcMarked = Boolean(srcBufp->marked != 0);
    PipePosition srcPosition = srcBufp->bufPosList[srcBufp->bplGet].position;
    BufPosList *bplp;

    /*
     * "Justify" data in buffer
     */
    if (destBufp->outp != destBufp->basep) {
        if (destBufp->avail != 0) {
            /* Source and target ranges may overlap, hence memmove. */
            (void) memmove(destBufp->basep, destBufp->outp, destBufp->avail);
        }
        destBufp->outp = destBufp->basep;
    }

    /*
     * Justify bufPosList
     */
    if (destBufp->bplGet > 0) {
        if (destBufp->bplPut > destBufp->bplGet) {
            (void) memmove(destBufp->bufPosList,
                           &destBufp->bufPosList[destBufp->bplGet],
                           sizeof(destBufp->bufPosList[0])
                           * (destBufp->bplPut - destBufp->bplGet));
        }
        destBufp->bplPut -= destBufp->bplGet;
        destBufp->bplGet = 0;
    }

    /*
     * Move data from src to dest
     */
    ASSERT(destBufp->outp + destBufp->avail + len
           <= destBufp->basep + destBufp->len);
    (void) memcpy(destBufp->outp + destBufp->avail, bufGet(srcBufp, len), len);
    destBufp->avail += len;
    if (destBufp->marked != 0 || isSrcMarked) {
        destBufp->marked += len;
    }

    /*
     * Update bufPosList
     *
     * The last entry is only looked at when the list is non-empty. With an
     * empty list (bplPut == 0, as left by bufInit or by the justification
     * above) computing &bufPosList[bplPut - 1] would form a pointer before
     * the start of the array, which is undefined behaviour even if it is
     * never dereferenced.
     */
    if (destBufp->bplPut > destBufp->bplGet) {
        bplp = &destBufp->bufPosList[destBufp->bplPut - 1];
        if (bplp->position + bplp->len == srcPosition) {
            /* Contiguous with the previous run: just extend it. */
            bplp->len += len;
            return;
        }
    }

    /* Start a new run, growing the list first if it is full. */
    if (destBufp->bplPut >= destBufp->bplSize) {
        destBufp->bplSize *= 2;
        RENEW(BufPosList, destBufp->bufPosList, destBufp->bplSize);
    }
    bplp = &destBufp->bufPosList[destBufp->bplPut++];
    bplp->len = len;
    bplp->position = srcPosition;
}
/*
 * Returns true when the buffer carries a mark (marked != 0) and the number
 * of available bytes equals the marked count.
 */
static Boolean
bufIsAtMark(Buf *bufp)
{
    if (bufp->marked == 0) {
        /* No mark in this buffer at all. */
        return Boolean(0);
    }
    return Boolean(bufp->avail == bufp->marked);
}
/*
 * Returns true when the buffer has no bytes available.
 */
static Boolean
bufIsEmpty(Buf *bufp)
{
    int isEmpty = (bufp->avail == 0);

    return Boolean(isEmpty);
}
#endif /* !defined(PIPE_HEADER) */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -