⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipe.c

📁 Sun公司Dream项目
💻 C
📖 第 1 页 / 共 3 页
字号:
};

/*
 * Return-code id for this module.  Assigned by pipeInit() from
 * retCodeRegisterWithTable(); pipeNew() treats the value 0 as
 * "not registered yet" and calls pipeInit() itself in that case.
 */
static RetCodeId    retCodeId;

/*************************************************************************
 * Class Methods
 *************************************************************************/

void
pipeInit(void)
{
    retCodeId = retCodeRegisterWithTable(PIPE_CLASSNAME, errorTable);
}

/*
 * Allocate a zeroed pipe, initialise all of its fields and start the
 * pipe's thread running pipeThreadWrapper.
 *
 * maxChunk  size in bytes of the residue buffer allocated here
 * parser    stored in the pipe's parser field
 * cleanup   stored in the pipe's cleanup field
 * instp     stored in the pipe's instp field
 * cop       stored in the pipe's cop field
 *
 * Returns the new pipe.  Failures of sema_init()/thr_create() are handed
 * to ABORT_IF_ERRNO.
 */
Pipe
pipeNew(size_t maxChunk, PipeParser parser, PipeCleanup cleanup,
        void *instp, void *cop)
{
    Pipe newPipe = NEW_ZEROED(struct _Pipe, 1);
    int  seg;

    /* Make sure the module's return-code id exists before first use. */
    if (retCodeId == 0) {
        pipeInit();
    }

    /* Buffers and segment bookkeeping. */
    newPipe->curSeg = 0;
    newPipe->nSegs = 0;
    newPipe->residueBuf = bufInit(NEW(u8, maxChunk), maxChunk, N_BUF_POS);
    for (seg = 0; seg < NELEM(newPipe->masterBufs); seg++) {
        newPipe->masterBufs[seg] = bufInit(NULL, 0, 1);
    }
    /*
     * The slave side relies on masterBufs[curSeg] always being valid,
     * so an empty segment is installed up front (this also bumps nSegs).
     */
    bufAssign(&newPipe->masterBufs[newPipe->nSegs++], NULL, 0, FALSE, 0);

    /* Caller-supplied hooks and context. */
    newPipe->parser = parser;
    newPipe->cleanup = cleanup;
    newPipe->instp = instp;
    newPipe->cop = cop;

    /* Stream state. */
    newPipe->position = 0LL;
    newPipe->usedBits = 0;
    newPipe->doCrc = FALSE;
    newPipe->crcSum = 0xffffffff;
    newPipe->isFlush = FALSE;
    newPipe->isEof = FALSE;
    newPipe->pid = MMP_PID_NULL;

    /* Error / control state. */
    newPipe->eofJmpBuf = NULL;
    newPipe->retCode = RETCODE_SUCCESS;
    newPipe->lastDstPipe = PIPE_NULL;
    newPipe->threadState = PIPETHREADSTATE_INIT;

    /* Both semaphores start with a count of zero. */
    ABORT_IF_ERRNO(sema_init(&newPipe->masterSema, 0, USYNC_THREAD, NULL));
    ABORT_IF_ERRNO(sema_init(&newPipe->slaveSema, 0, USYNC_THREAD, NULL));

    /* The thread is created last, once the pipe is fully initialised. */
#if THR_BKG_BUG != 0
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, newPipe, THR_NEW_LWP,
                              &newPipe->threadId));
#else                                      /* THR_BKG_BUG */
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, newPipe, 0,
                              &newPipe->threadId));
#endif                                     /* THR_BKG_BUG */

    return newPipe;
}

/*************************************************************************
 * Instance Methods
 *************************************************************************/


/*
 * Master-side calls
 */

/*
 * Shut a pipe down and release everything it owns.
 *
 * pipeEof() is called first, then the pipe's thread is joined, and only
 * after that are the buffers, position lists, semaphores and the pipe
 * itself released.
 *
 * Returns the RetCode produced by pipeEof(), or RETCODE_SUCCESS when
 * pipe is PIPE_NULL (in which case nothing is done).
 */
RetCode
pipeFree(Pipe pipe)
{
    RetCode eofRetCode;
    int     seg;

    if (pipe == PIPE_NULL) {
        return RETCODE_SUCCESS;
    }

    eofRetCode = pipeEof(pipe);
    ABORT_IF_ERRNO(thr_join(pipe->threadId, NULL, NULL));

    /* Residue buffer: both its storage and its position list are freed. */
    free(pipe->residueBuf.basep);
    free(pipe->residueBuf.bufPosList);

    /* Master buffers: only their position lists are freed here. */
    for (seg = 0; seg < NELEM(pipe->masterBufs); seg++) {
        free(pipe->masterBufs[seg].bufPosList);
    }

    ABORT_IF_ERRNO(sema_destroy(&pipe->masterSema));
    ABORT_IF_ERRNO(sema_destroy(&pipe->slaveSema));
    free(pipe);

    return eofRetCode;
}

/*
 * Puts len bytes from buf into the stream and gives control to the thread.
 * Should ONLY be called by the original data source, not by a parser.
 *
 * Forwards to pipePut2() with its flush argument set to TRUE and returns
 * whatever pipePut2() returns.
 */
RetCode
pipePut(Pipe pipe, void *buf, size_t len, Boolean isMarked,
        PipePosition putPosition)
{
    const Boolean doFlush = TRUE;

    return pipePut2(pipe, buf, len, isMarked, doFlush, putPosition);
}

/*
 * Move len bytes from srcPipe to dstPipe.
 *
 * If srcPipe last fed a different destination, that destination is
 * flushed first; a failing flush is returned immediately.  When dstPipe
 * is PIPE_NULL the bytes are simply skipped on srcPipe.  Otherwise the
 * data is forwarded chunk by chunk through pipePut2(), each chunk being
 * limited by what the current source buffer has available.  Only the
 * first chunk carries the caller's isMarked value; later chunks are put
 * unmarked.
 *
 * If the source runs dry (pipeFillBuf() returns NULL) control leaves via
 * longjmp() on srcPipe->eofJmpBuf.
 *
 * Returns RETCODE_SUCCESS, or the first non-success code from
 * pipeFlush()/pipePut2().
 */
RetCode
pipeTransfer(Pipe dstPipe, Pipe srcPipe, size_t len,
             Boolean isMarked, Boolean doFlush)
{
    RetCode retCode = RETCODE_SUCCESS;

    if (srcPipe->lastDstPipe != dstPipe) {
        retCode = pipeFlush(srcPipe->lastDstPipe, TRUE);
        if (retCode != RETCODE_SUCCESS) {
            return retCode;
        }
    }

    if (dstPipe == PIPE_NULL) {
        pipeSkip(srcPipe, len);
        return retCode;
    }

    srcPipe->lastDstPipe = dstPipe;
    while (len != 0) {
        Buf          *bufp = pipeFillBuf(srcPipe, 1);
        size_t        chunk;
        PipePosition  bufPosition;
        void         *data;

        if (bufp == NULL) {
            longjmp(srcPipe->eofJmpBuf, 1);
        }

        chunk = MIN(len, bufp->avail);
        ASSERT(bufp->bplGet < bufp->bplPut);
        ASSERT(bufp->bufPosList[bufp->bplGet].len != 0);

        /* Capture the position before pipeGet() consumes the bytes. */
        bufPosition = bufp->bufPosList[bufp->bplGet].position;
        data = pipeGet(srcPipe, chunk);

        retCode = pipePut2(dstPipe, data, chunk, isMarked, doFlush,
                           bufPosition);
        if (retCode != RETCODE_SUCCESS) {
            break;
        }

        isMarked = FALSE;
        len -= chunk;
    }

    return retCode;
}

/*
 * Temporary variant of pipeTransfer() for the case where the destination
 * pipe is PIPE_NULL: instead of skipping the data, len bytes read from
 * srcPipe are written to a file.
 *
 * dstPipe, isMarked and doFlush are accepted for signature compatibility
 * but do not influence what is written.
 *
 * file is cast to FILE * and passed to fwrite().
 * NOTE(review): carrying a FILE * in an int only round-trips where a
 * pointer fits in an int; the parameter type cannot be changed here
 * without breaking callers.
 * NOTE(review): the fwrite() result is not checked, so short writes go
 * unreported; no suitable error code is visible in this part of the file.
 *
 * If the source runs dry (pipeFillBuf() returns NULL) control leaves via
 * longjmp() on srcPipe->eofJmpBuf.
 *
 * Always returns RETCODE_SUCCESS when it returns normally.
 */
RetCode
pipeTransfer2(Pipe dstPipe, Pipe srcPipe, size_t len,
              Boolean isMarked, Boolean doFlush, int file)
{
    RetCode retCode = RETCODE_SUCCESS;

    while (len != 0) {
        Buf    *bufp;
        size_t  chunk;
        u8     *p;

        if ((bufp = pipeFillBuf(srcPipe, 1)) == NULL) {
            longjmp(srcPipe->eofJmpBuf, 1);
        }
        chunk = MIN(len, bufp->avail);
        ASSERT(bufp->bplGet < bufp->bplPut);
        ASSERT(bufp->bufPosList[bufp->bplGet].len != 0);

        p = pipeGet(srcPipe, chunk);
        fwrite(p, 1, chunk, (FILE *) file);

        isMarked = FALSE;
        len -= chunk;
    }
    return retCode;
}

/*
 * Store pid in the pipe's pid field.  A PIPE_NULL pipe is silently
 * ignored.
 */
void
pipeSetPid(Pipe pipe, u16 pid)
{
    if (pipe == PIPE_NULL) {
        return;
    }
    pipe->pid = pid;
}

/*
 * Force all data placed onto child pipes to be parsed.
 * NOTE: This is called on the parent pipe!
 *
 * Flushes the pipe recorded in pipe->lastDstPipe and returns the result
 * of that pipeFlush() call.
 */
RetCode
pipeSync(Pipe pipe)
{
    Pipe lastChild = pipe->lastDstPipe;

    return pipeFlush(lastChild, TRUE);
}

/*
 * Issue a zero-length put with a NULL buffer on the pipe (presumably how
 * end-of-file is signalled to the thread -- confirm against pipePut2())
 * and return its RetCode.  Afterwards the pipe's thread state is
 * asserted to be PIPETHREADSTATE_ZOMBE.
 *
 * A PIPE_NULL pipe yields RETCODE_SUCCESS with no other effect.
 */
RetCode
pipeEof(Pipe pipe)
{
    RetCode retCode = RETCODE_SUCCESS;

    if (pipe != PIPE_NULL) {
        retCode = pipePut2(pipe, NULL, 0, FALSE, TRUE, 0);
        ASSERT(pipe->threadState == PIPETHREADSTATE_ZOMBE);
    }
    return retCode;
}

/*
 * Reset a pipe so it can be used again: end the current thread via
 * pipeEof(), join it, clear per-stream state and start a fresh thread
 * running pipeThreadWrapper.  A PIPE_NULL pipe is ignored.
 *
 * The RetCode from pipeEof() is deliberately discarded.
 */
void
pipeRecover(Pipe pipe)
{
    if (pipe == PIPE_NULL) {
	return;
    }
    (void) pipeEof(pipe);

    /* Wait for the old thread to finish before touching shared state. */
    ABORT_IF_ERRNO(thr_join(pipe->threadId, NULL, NULL));

    /* Drop any partially consumed byte and reset error/CRC state. */
    pipeByteAlign(pipe);
    pipe->doCrc = FALSE;
    pipe->retCode = RETCODE_SUCCESS;
    pipe->threadState = PIPETHREADSTATE_INIT;
    /*
     * Both semaphores are expected to report EBUSY from sema_trywait().
     * NOTE(review): the call sits inside ASSERT, so if ASSERT is compiled
     * out the probe disappears, and if it is compiled in a non-EBUSY
     * result means the call has changed the semaphore -- confirm intent.
     */
    ASSERT (sema_trywait(&pipe->masterSema) == EBUSY);
    ASSERT (sema_trywait(&pipe->slaveSema) == EBUSY);
    /*
     * NOTE(review): under THR_BKG_BUG this passes THR_BOUND, whereas
     * pipeNew() passes THR_NEW_LWP in the same configuration -- confirm
     * the difference is intentional.
     */
#if	THR_BKG_BUG != 0
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, pipe, THR_BOUND,
			  &pipe->threadId));
#else					   /* THR_BKG_BUG */
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, pipe, 0,
			  &pipe->threadId));
#endif					   /* THR_BKG_BUG */
}

/*
 * Thread side calls
 */

/*
 * Consume nStuffBytes bytes from the pipe, checking that every one of
 * them equals stuffByte.  The pipe must be byte aligned (usedBits == 0).
 *
 * Bytes are taken buffer by buffer; when doCrc is set each chunk is fed
 * to pipeCrcSum() before it is checked.  If the pipe runs dry
 * (pipeFillBuf() returns NULL) control leaves via longjmp() on
 * pipe->eofJmpBuf.
 *
 * Returns RETCODE_SUCCESS after advancing pipe->position by nStuffBytes,
 * or a PIPE_ERROR_STUFF_BYTE code at the first mismatching byte.
 * NOTE(review): on the mismatch path pipe->position is not advanced even
 * though bytes have been taken from the buffer -- confirm callers expect
 * this.
 */
RetCode
pipeSkipStuffBytes(Pipe pipe, size_t nStuffBytes, u8 stuffByte)
{
    const size_t total = nStuffBytes;
    size_t       remaining = nStuffBytes;

    ASSERT(pipe->usedBits == 0);
    while (remaining != 0) {
        Buf    *bufp = pipeFillBuf(pipe, 1);
        size_t  chunk;
        size_t  idx;
        u8     *bytes;

        if (bufp == NULL) {
            longjmp(pipe->eofJmpBuf, 1);
        }

        chunk = MIN(remaining, bufp->avail);
        remaining -= chunk;

        bytes = bufGet(bufp, chunk);
        if (pipe->doCrc) {
            pipeCrcSum(pipe, bytes, chunk);
        }

        for (idx = 0; idx < chunk; idx++) {
            if (bytes[idx] != stuffByte) {
                return RETCODE_CONS(retCodeId, PIPE_ERROR_STUFF_BYTE);
            }
        }
    }
    pipe->position += total;
    return RETCODE_SUCCESS;
}

/*
 * Read the next nBits bits from the pipe and consume them.
 *
 * Implemented as pipePeekBits() followed by pipeSkipBits(); returns the
 * value that pipePeekBits() produced.
 */
u32
pipeGetBits(Pipe pipe, size_t nBits)
{
    u32 value;

    value = pipePeekBits(pipe, nBits);
    pipeSkipBits(pipe, nBits);
    return value;
}

/*
 * Return the next nBits bits (at most 32) of the pipe without consuming
 * them.  The bits start pipe->usedBits bits into the current byte and
 * are assembled most-significant first.
 *
 * If not enough bytes can be buffered (pipeFillBuf() returns NULL)
 * control leaves via longjmp() on pipe->eofJmpBuf.
 */
u32
pipePeekBits(Pipe pipe, size_t nBits)
{
    size_t  bitsLeft = pipe->usedBits + nBits;  /* used bits + wanted bits */
    size_t  nBytes = (bitsLeft + 7) / 8;        /* bytes spanning them     */
    u8     *cursor;
    u32     acc = 0;
    Buf    *bufp;

    ASSERT(nBits <= 32);
    bufp = pipeFillBuf(pipe, nBytes);
    if (bufp == NULL) {
        longjmp(pipe->eofJmpBuf, 1);
    }
    cursor = bufp->outp;

    /* Shift in whole bytes while more than one byte's worth remains. */
    while (bitsLeft > 8) {
        acc = (acc << 8) | *cursor++;
        bitsLeft -= 8;
    }

    /* The last byte contributes only its top bitsLeft bits. */
    acc = (acc << bitsLeft) | (*cursor >> (8 - bitsLeft));

    /* Mask away the already-used leading bits. */
    return acc & bitMask[nBits];
}

/*
 * Report whether the next pipeBits.nBits bits of the pipe equal
 * pipeBits.bitPattern.  The bits are only peeked, not consumed.
 */
Boolean
pipeIsNextBits(Pipe pipe, PipeBits pipeBits)
{
    u32 upcoming = pipePeekBits(pipe, pipeBits.nBits);

    return Boolean(pipeBits.bitPattern == upcoming);
}

/*
 * Advance the pipe's read position by nBits bits.
 *
 * The bit offset within the current byte lives in pipe->usedBits.  Once
 * it reaches a full byte or more, the whole bytes are consumed through
 * pipeSkip() and only the leftover bit count is kept.
 */
void
pipeSkipBits(Pipe pipe, size_t nBits)
{
    size_t wholeBytes;
    size_t leftoverBits;

    pipe->usedBits += nBits;
    if (pipe->usedBits < 8) {
        return;
    }

    wholeBytes = pipe->usedBits / 8;
    leftoverBits = pipe->usedBits & 7;

    /* usedBits is cleared for the duration of the pipeSkip() call. */
    pipe->usedBits = 0;
    pipeSkip(pipe, wholeBytes);
    pipe->usedBits = leftoverBits;
}

/*
 * Copy the next nBytes bytes of the pipe into buf without consuming
 * them.
 *
 * When the pipe is byte aligned the bytes come straight from
 * pipePeek().  Otherwise nBytes + 1 source bytes are buffered and each
 * output byte is stitched together from two neighbouring source bytes,
 * shifted by pipe->usedBits.  In that case, if the data cannot be
 * buffered (pipeFillBuf() returns NULL), control leaves via longjmp()
 * on pipe->eofJmpBuf.
 */
void
pipePeekByteBlock(Pipe pipe, u8 *buf, size_t nBytes)
{
    u8     *src;
    size_t  idx;

    if (pipe->usedBits == 0) {
        src = pipePeek(pipe, nBytes);
        for (idx = 0; idx < nBytes; idx++) {
            buf[idx] = src[idx];
        }
        return;
    }

    {
        Buf *bufp = pipeFillBuf(pipe, nBytes + 1);
        u8   prevByte;

        if (bufp == NULL) {
            longjmp(pipe->eofJmpBuf, 1);
        }
        src = bufp->outp;
        prevByte = src[0];
        for (idx = 0; idx < nBytes; idx++) {
            u8 nextByte = src[idx + 1];

            /* Low bits of the previous byte, then high bits of the next. */
            buf[idx] = (prevByte << (pipe->usedBits))
              | (nextByte >> (8 - pipe->usedBits));
            prevByte = nextByte;
        }
    }
}

/*
 * Copy the next nBytes bytes of the pipe into buf and consume them:
 * pipePeekByteBlock() followed by skipping nBytes * 8 bits.
 */
void
pipeGetByteBlock(Pipe pipe, u8 *buf, size_t nBytes)
{
    const size_t nBits = nBytes * 8;

    pipePeekByteBlock(pipe, buf, nBytes);
    pipeSkipBits(pipe, nBits);
}

/*
 * Move the read position to the next byte boundary.  If some bits of
 * the current byte have been used, the bit offset is cleared and that
 * byte is skipped; an already aligned pipe is left untouched.
 */
void
pipeByteAlign(Pipe pipe)
{
    if (pipe->usedBits == 0) {
        return;
    }
    pipe->usedBits = 0;
    pipeSkip(pipe, 1);
}

void
pipeFindMark(Pipe pipe)
{
    ASSERT(pipe->usedBits == 0);
    while (!pipeIsAtMark(pipe)) {
	if (pipeFillBuf(pipe, 1) == NULL) {
	    longjmp(pipe->eofJmpBuf, 1);
	}
	pipeSkip(pipe, pipeAvailUnmarked(pipe));
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -