⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipe.c

📁 Sun公司Dream项目
💻 C
📖 第 1 页 / 共 3 页
字号:
}

Boolean
pipeIsAtMark(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);
    return Boolean(bufIsAtMark(&pipe->residueBuf)
      || (bufIsEmpty(&pipe->residueBuf)
	  && bufIsAtMark(&pipe->masterBufs[pipe->curSeg])));
}

/*
 * Returns true if chunk can be got/peeked without hitting eof
 */
Boolean
pipeIsAvail(Pipe pipe, size_t size)
{
    ASSERT(pipe->usedBits == 0);
    return Boolean(pipeFillBuf(pipe, size) != NULL);
}

Boolean
pipeIsAvailUnmarked(Pipe pipe, size_t chunk)
{
    return Boolean(pipeIsAvail(pipe, chunk) && pipeAvailUnmarked(pipe) >= chunk);
}

size_t
pipeAvailUnmarked(Pipe pipe)
{
    size_t              unmarked;

    unmarked = pipe->residueBuf.avail - pipe->residueBuf.marked;
    if (pipe->residueBuf.marked == 0) {
	unmarked += pipe->masterBufs[pipe->curSeg].avail
	    - pipe->masterBufs[pipe->curSeg].marked;
    }
    return unmarked;
}

PipePosition
pipePosition(Pipe pipe)
{
    Buf *bufp;

    if ((bufp = pipeFillBuf(pipe, 1)) == NULL) {
	/*
	 * There is no position for a pipe at EOF
	 */
	longjmp(pipe->eofJmpBuf, 1);
    }
    return bufp->bufPosList[bufp->bplGet].position;
}

/*
 * Return the byte position in slave relative terms.
 */
PipePosition
pipeRelativePosition(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);
    return pipe->position;
}

/*
 * Clear crc accumulator.
 */
void
pipeCrcClear(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);
    pipe->doCrc = TRUE;
    pipe->crcSum = 0xffffffff;
}

/*
 * Get accumulated crc since last crc clear.
 */
u32
pipeCrcGet(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);
    pipe->doCrc = FALSE;
    return ~pipe->crcSum;
}

u16
pipeGetPid(Pipe pipe)
{
    return pipe->pid;
}

PipeParser
pipeGetParser(Pipe pipe)
{
    return pipe->parser;
}

/*************************************************************************
 * Private Methods
 *************************************************************************/

/*
 * Non-inlinable called from inline
 *
 * INLINE_PRIVATE void
 * pipeInlinePrivate(void)
 * {
 * }
 */

static RetCode
pipePut2(Pipe pipe, void *buf, size_t len, Boolean isMarked,
	Boolean doFlush, PipePosition bufPosition)
{
    if (pipe == PIPE_NULL) {
	return RETCODE_SUCCESS;
    }
    switch (pipe->threadState) {
    case PIPETHREADSTATE_INIT:
	/*
	 * Thread is waiting in pipeThreadWrapper for master to complete
	 * instance initialization, start thread so it can run until it's
	 * first pipeFillBuf(). NOTE: It's illegal to detect an error before
	 * the first pipeFillBuf().
	 */
	(void) pipeFlush(pipe, FALSE);
	if (pipe->threadState == PIPETHREADSTATE_ZOMBE) {
	    break;
	}
	ASSERT(pipe->threadState == PIPETHREADSTATE_RUNNING);
	/* FALLTHRU */
    case PIPETHREADSTATE_RUNNING:
	bufAssign(&pipe->masterBufs[pipe->nSegs++], (unsigned char *)buf, len, isMarked,
		bufPosition);
	if (doFlush || pipe->nSegs >= PIPE_MAX_SEGMENTS) {
	    (void) pipeFlush(pipe, doFlush);
	}
	break;
    case PIPETHREADSTATE_ZOMBE:
	break;
    default:
	ABORT("Illegal threadState");
    }
    return pipe->retCode;
}

static RetCode
pipeFlush(Pipe pipe, Boolean isForced)
{
    if (pipe == PIPE_NULL) {
	return RETCODE_SUCCESS;
    }
    if (pipe->nSegs > 0 && pipe->threadState != PIPETHREADSTATE_ZOMBE) {
	pipe->isFlush = isForced;
        CHECK_IF_ERRNO(sema_post(&pipe->slaveSema));
        CHECK_IF_ERRNO(sema_wait(&pipe->masterSema));
	if (pipe->threadState != PIPETHREADSTATE_ZOMBE) {
	    ASSERT(pipe->curSeg == pipe->nSegs);
            pipe->nSegs = 0;
	}
    }
    return pipe->retCode;
}

INLINE_PRIVATE Buf         *
pipeFillBuf2(Pipe pipe, size_t size)
{
    Buf                *bufp;

    if (pipe->isEof) {
	return NULL;
    }

    ASSERT(size <= pipe->residueBuf.len);
    ASSERT(pipe->curSeg < pipe->nSegs);
    while (size > pipe->residueBuf.avail
	   + pipe->masterBufs[pipe->curSeg].avail) {
	if (pipe->masterBufs[pipe->curSeg].avail > 0) {
	    (void) pipeFlush(pipe->lastDstPipe, TRUE);
	    bufTransfer(&pipe->residueBuf, &pipe->masterBufs[pipe->curSeg],
			pipe->masterBufs[pipe->curSeg].avail);
	}
	pipe->curSeg += 1;
	if (pipe->curSeg >= pipe->nSegs) {
	    if (pipe->isFlush) {
		(void) pipeFlush(pipe->lastDstPipe, TRUE);
	    }
	    CHECK_IF_ERRNO(sema_post(&pipe->masterSema));
	    CHECK_IF_ERRNO(sema_wait(&pipe->slaveSema));
	    ASSERT(pipe->nSegs > 0);
	    pipe->curSeg = 0;
	}
	if (pipe->masterBufs[pipe->curSeg].avail == 0) {
	    pipe->curSeg += 1;
	    ASSERT(pipe->curSeg == pipe->nSegs);
	    pipe->isEof = TRUE;
	    return NULL;
	}
	ASSERT(pipe->masterBufs[pipe->curSeg].outp
	       == pipe->masterBufs[pipe->curSeg].basep);
    }

    ASSERT(pipe->residueBuf.avail + pipe->masterBufs[pipe->curSeg].avail
	   >= size);
    if (pipe->residueBuf.avail == 0 || pipeIsPushBack(pipe)) {
	bufp = &pipe->masterBufs[pipe->curSeg];
    } else {
	if (pipe->residueBuf.avail < size) {
	    size_t              rem = size - pipe->residueBuf.avail;

	    (void) pipeFlush(pipe->lastDstPipe, TRUE);
	    bufTransfer(&pipe->residueBuf,
			&pipe->masterBufs[pipe->curSeg], rem);
	}
	bufp = &pipe->residueBuf;
    }
    return bufp;
}

static Boolean
pipeIsPushBack(Pipe pipe)
{
    Buf *masterBufp = &pipe->masterBufs[pipe->curSeg];
    BufPosList *bplp;
    size_t residueAvail;
    size_t residueMarked;

    if (pipe->residueBuf.avail > masterBufp->outp - masterBufp->basep) {
	return FALSE;
    }
    residueAvail = pipe->residueBuf.avail;
    residueMarked = pipe->residueBuf.marked;
    (void) bufGet(&pipe->residueBuf, residueAvail);
    if (residueMarked != 0) {
	ASSERT(masterBufp->marked == 0);
	masterBufp->marked = masterBufp->avail + residueMarked;
    }
    masterBufp->outp -= residueAvail;
    masterBufp->avail += residueAvail;
    ASSERT(masterBufp->bplGet < masterBufp->bplPut);
    bplp = &masterBufp->bufPosList[masterBufp->bplGet];
    bplp->len += residueAvail;
    bplp->position -= residueAvail;
    return TRUE;
}

/*************************************************************************
 * Private Functions
 *************************************************************************/

static void        *
pipeThreadWrapper(void *arg)
{
    Pipe                pipe = (Pipe) arg;
    jmp_buf             jbuf;
    RetCode		retCode;

    CHECK_IF_ERRNO(sema_wait(&pipe->slaveSema));
    pipe->threadState = PIPETHREADSTATE_RUNNING;
    pipe->eofJmpBuf = jbuf;
    if (setjmp(jbuf) != 0) {
	pipe->retCode = RETCODE_CONS(retCodeId, PIPE_ERROR_EOF);
	goto done;
    }
    pipe->retCode = (*pipe->parser) (pipe->instp, pipe->cop, pipe);
    /*
     * Flush any pending sub-pipe data.  If there's an error, it takes
     * precedence -- the data was buffered and this error actually occurred
     * earlier in the stream.
     */
    if ((retCode = pipeFlush(pipe->lastDstPipe, TRUE)) != RETCODE_SUCCESS) {
	pipe->retCode = retCode;
    }
    if (pipe->retCode == RETCODE_SUCCESS
        && pipe->residueBuf.avail + pipe->masterBufs[pipe->curSeg].avail > 0) {
	pipe->retCode = RETCODE_CONS(retCodeId, PIPE_ERROR_EXTRANEOUS_DATA);
    }
done:
    /*
     * Call parser to cleanup any child pipes, etc.
     */
    if (pipe->cleanup != NULL) {
	(*pipe->cleanup) (pipe->instp);
    }
    pipe->threadState = PIPETHREADSTATE_ZOMBE;
    CHECK_IF_ERRNO(sema_post(&pipe->masterSema));
    return NULL;
}

static Buf
bufInit(u8 *buf, size_t len, int bplSize)
{
    Buf                 aBuf;

    aBuf.basep = aBuf.outp = buf;
    aBuf.len = len;
    aBuf.avail = 0;
    aBuf.marked = 0;
    aBuf.bufPosList = NEW_ZEROED(BufPosList, bplSize);
    aBuf.bplSize = bplSize;
    aBuf.bplGet = 0;
    aBuf.bplPut = 0;
    return aBuf;
}

static void
bufAssign(Buf *bufp, u8 *datap, size_t avail, Boolean isMarked,
	PipePosition bufPosition)
{
    ASSERT(bufp->len == 0);
    ASSERT(bufp->bplSize == 1);
    bufp->basep = bufp->outp = datap;
    bufp->avail = avail;
    bufp->marked = isMarked ? avail : 0;
    bufp->bufPosList[0].position = bufPosition;
    bufp->bufPosList[0].len = avail;
    bufp->bplGet = 0;
    bufp->bplPut = 1;
}

static void
bufTransfer(Buf *destBufp, Buf *srcBufp, size_t len)
{
    Boolean isSrcMarked = Boolean(srcBufp->marked != 0);
    PipePosition srcPosition = srcBufp->bufPosList[srcBufp->bplGet].position;
    BufPosList *bplp;

    /*
     * "Justify" data in buffer
     */
    if (destBufp->outp != destBufp->basep) {
	if (destBufp->avail != 0) {
	    (void) memmove(destBufp->basep, destBufp->outp, destBufp->avail);
	}
	destBufp->outp = destBufp->basep;
    }
    /*
     * Justify bufPosList
     */
    if (destBufp->bplGet > 0) {
	if (destBufp->bplPut > destBufp->bplGet) {
	    (void) memmove(destBufp->bufPosList,
		       &destBufp->bufPosList[destBufp->bplGet],
		       sizeof(destBufp->bufPosList[0])
		       * (destBufp->bplPut - destBufp->bplGet));
	}
	destBufp->bplPut -= destBufp->bplGet;
	destBufp->bplGet = 0;
    }
    /*
     * Move data from src to dest
     */
    ASSERT(destBufp->outp + destBufp->avail + len
	    <= destBufp->basep + destBufp->len);
    (void) memcpy(destBufp->outp + destBufp->avail, bufGet(srcBufp, len), len);
    destBufp->avail += len;
    if (destBufp->marked != 0 || isSrcMarked) {
	destBufp->marked += len;
    }
    /*
     * Update bufPosList
     */
    bplp = &destBufp->bufPosList[destBufp->bplPut - 1];
    if (destBufp->bplPut > destBufp->bplGet
	&& bplp->position + bplp->len == srcPosition) {
	bplp->len += len;
    } else {
	if (destBufp->bplPut >= destBufp->bplSize) {
	    destBufp->bplSize *= 2;
	    RENEW(BufPosList, destBufp->bufPosList, destBufp->bplSize);
	}
	bplp = &destBufp->bufPosList[destBufp->bplPut++];
	bplp->len = len;
	bplp->position = srcPosition;
    }
}

static Boolean
bufIsAtMark(Buf *bufp)
{
    return Boolean(bufp->marked != 0 && bufp->avail == bufp->marked);
}

static Boolean
bufIsEmpty(Buf *bufp)
{
    return Boolean(bufp->avail == 0);
}

#endif					   /* !defined(PIPE_HEADER) */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -