📄 pipe.c
字号:
};
/*
 * Return-code identifier for this module.  It is assigned by pipeInit()
 * and used when building this module's error codes.  pipeNew() treats the
 * value 0 as "not registered yet" and calls pipeInit() itself in that case.
 */
static RetCodeId retCodeId;
/*************************************************************************
* Class Methods
*************************************************************************/
/*
 * Register this module's error table under PIPE_CLASSNAME and store the
 * identifier returned by retCodeRegisterWithTable() in retCodeId.
 */
void
pipeInit(void)
{
retCodeId = retCodeRegisterWithTable(PIPE_CLASSNAME, errorTable);
}
/*
 * Create a pipe and start its thread.
 *
 * maxChunk - number of bytes allocated for the pipe's residue buffer
 * parser   - stored in the pipe's parser field
 * cleanup  - stored in the pipe's cleanup field
 * instp    - stored in the pipe's instp field
 * cop      - stored in the pipe's cop field
 *
 * If retCodeId is still 0 the module's return codes are registered first
 * via pipeInit().  Returns the new pipe; failures while creating the
 * semaphores or the thread are handed to ABORT_IF_ERRNO.
 */
Pipe
pipeNew(size_t maxChunk, PipeParser parser, PipeCleanup cleanup,
        void *instp, void *cop)
{
    Pipe newPipe = NEW_ZEROED(struct _Pipe, 1);
    int seg;

    if (retCodeId == 0) {
        pipeInit();
    }

    /* Caller-supplied hooks and context pointers. */
    newPipe->parser = parser;
    newPipe->cleanup = cleanup;
    newPipe->instp = instp;
    newPipe->cop = cop;

    /* Stream and thread bookkeeping start in their idle state. */
    newPipe->position = 0LL;
    newPipe->usedBits = 0;
    newPipe->doCrc = FALSE;
    newPipe->crcSum = 0xffffffff;
    newPipe->eofJmpBuf = NULL;
    newPipe->retCode = RETCODE_SUCCESS;
    newPipe->lastDstPipe = PIPE_NULL;
    newPipe->isFlush = FALSE;
    newPipe->isEof = FALSE;
    newPipe->pid = MMP_PID_NULL;
    newPipe->threadState = PIPETHREADSTATE_INIT;

    /* Buffers: one residue buffer of maxChunk bytes plus the master set. */
    newPipe->curSeg = 0;
    newPipe->nSegs = 0;
    newPipe->residueBuf = bufInit(NEW(u8, maxChunk), maxChunk, N_BUF_POS);
    for (seg = 0; seg < NELEM(newPipe->masterBufs); seg++) {
        newPipe->masterBufs[seg] = bufInit(NULL, 0, 1);
    }

    /*
     * Slave code assumes that masterBufs[curSeg] is always valid, so an
     * empty segment is created here to meet that expectation.
     */
    bufAssign(&newPipe->masterBufs[newPipe->nSegs++], NULL, 0, FALSE, 0);

    /* Every field is set before the thread is created. */
    ABORT_IF_ERRNO(sema_init(&newPipe->masterSema, 0, USYNC_THREAD, NULL));
    ABORT_IF_ERRNO(sema_init(&newPipe->slaveSema, 0, USYNC_THREAD, NULL));
#if THR_BKG_BUG != 0
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, newPipe, THR_NEW_LWP,
                              &newPipe->threadId));
#else /* THR_BKG_BUG */
    ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, newPipe, 0,
                              &newPipe->threadId));
#endif /* THR_BKG_BUG */

    return newPipe;
}
/*************************************************************************
* Instance Methods
*************************************************************************/
/*
* Master-side calls
*/
/*
 * Shut the pipe down and release everything it owns.
 *
 * Sends end-of-file through pipeEof(), joins the pipe's thread, frees the
 * residue buffer storage and every buffer position list, destroys both
 * semaphores and finally frees the pipe itself.
 *
 * Returns the RetCode reported by pipeEof(), or RETCODE_SUCCESS when pipe
 * is PIPE_NULL (nothing is done in that case).
 */
RetCode
pipeFree(Pipe pipe)
{
    RetCode eofStatus;
    int seg;

    if (pipe == PIPE_NULL) {
        return RETCODE_SUCCESS;
    }

    eofStatus = pipeEof(pipe);
    ABORT_IF_ERRNO(thr_join(pipe->threadId, NULL, NULL));

    free(pipe->residueBuf.basep);
    free(pipe->residueBuf.bufPosList);
    for (seg = 0; seg < NELEM(pipe->masterBufs); seg++) {
        free(pipe->masterBufs[seg].bufPosList);
    }

    ABORT_IF_ERRNO(sema_destroy(&pipe->masterSema));
    ABORT_IF_ERRNO(sema_destroy(&pipe->slaveSema));

    free(pipe);
    return eofStatus;
}
/*
 * Puts len bytes from buf into the stream and gives control to the thread.
 * Should ONLY be called by the original data source, not by a parser.
 *
 * Thin wrapper: every argument is forwarded to pipePut2(), with TRUE passed
 * in the slot that pipeTransfer() fills with its doFlush argument.
 * Returns whatever pipePut2() returns.
 */
RetCode
pipePut(Pipe pipe, void *buf, size_t len, Boolean isMarked,
PipePosition putPosition)
{
return pipePut2(pipe, buf, len, isMarked, TRUE, putPosition);
}
/*
 * Move len bytes from srcPipe into dstPipe.
 *
 * When dstPipe differs from the destination srcPipe used last, that
 * previous destination is flushed first; a flush failure is returned
 * immediately.  When dstPipe is PIPE_NULL the bytes are simply skipped on
 * srcPipe.  Otherwise the data is forwarded with pipePut2() one buffer's
 * worth at a time; only the first chunk carries the caller's isMarked
 * value, later chunks are put with FALSE.
 *
 * Returns RETCODE_SUCCESS, or the first failing RetCode from pipeFlush()
 * or pipePut2().  Long-jumps to srcPipe->eofJmpBuf when pipeFillBuf()
 * returns NULL.
 */
RetCode
pipeTransfer(Pipe dstPipe, Pipe srcPipe, size_t len,
             Boolean isMarked, Boolean doFlush)
{
    RetCode retCode = RETCODE_SUCCESS;

    /* Switching destination: push out what the old one still holds. */
    if (srcPipe->lastDstPipe != dstPipe) {
        retCode = pipeFlush(srcPipe->lastDstPipe, TRUE);
        if (retCode != RETCODE_SUCCESS) {
            return retCode;
        }
    }

    /* No destination: discard the bytes. */
    if (dstPipe == PIPE_NULL) {
        pipeSkip(srcPipe, len);
        return retCode;
    }

    srcPipe->lastDstPipe = dstPipe;
    while (len != 0) {
        Buf *bufp = pipeFillBuf(srcPipe, 1);
        size_t nBytes;
        PipePosition startPos;
        u8 *data;

        if (bufp == NULL) {
            longjmp(srcPipe->eofJmpBuf, 1);
        }

        nBytes = MIN(len, bufp->avail);
        ASSERT(bufp->bplGet < bufp->bplPut);
        ASSERT(bufp->bufPosList[bufp->bplGet].len != 0);

        /* The position is read before pipeGet() consumes the bytes. */
        startPos = bufp->bufPosList[bufp->bplGet].position;
        data = pipeGet(srcPipe, nBytes);

        retCode = pipePut2(dstPipe, data, nBytes, isMarked, doFlush, startPos);
        if (retCode != RETCODE_SUCCESS) {
            return retCode;
        }

        isMarked = FALSE;
        len -= nBytes;
    }

    return retCode;
}
/*
 * Temporary variant of pipeTransfer() for the case where the destination
 * pipe is PIPE_NULL: instead of being skipped, the len bytes consumed from
 * srcPipe are written to a file.
 *
 * dstPipe  - unused
 * srcPipe  - pipe the bytes are taken from
 * len      - number of bytes to move
 * isMarked - unused
 * doFlush  - unused
 * file     - stdio stream handle carried in an int; cast back to FILE *
 *
 * Returns RETCODE_SUCCESS.  Long-jumps to srcPipe->eofJmpBuf when
 * pipeFillBuf() returns NULL.
 *
 * NOTE(review): carrying a FILE * through an int loses bits wherever
 * pointers are wider than int; the signature is kept for existing callers.
 * NOTE(review): the fwrite() result is not checked, so a short write goes
 * unreported; no suitable error code is visible in this part of the file.
 */
RetCode
pipeTransfer2(Pipe dstPipe, Pipe srcPipe, size_t len,
              Boolean isMarked, Boolean doFlush, int file)
{
    /* Kept only so the parameter list parallels pipeTransfer(). */
    (void) dstPipe;
    (void) isMarked;
    (void) doFlush;

    while (len != 0) {
        Buf *bufp;
        size_t chunk;
        u8 *p;

        if ((bufp = pipeFillBuf(srcPipe, 1)) == NULL) {
            longjmp(srcPipe->eofJmpBuf, 1);
        }
        chunk = MIN(len, bufp->avail);
        ASSERT(bufp->bplGet < bufp->bplPut);
        ASSERT(bufp->bufPosList[bufp->bplGet].len != 0);

        p = pipeGet(srcPipe, chunk);
        fwrite(p, 1, chunk, (FILE *) file);
        len -= chunk;
    }

    return RETCODE_SUCCESS;
}
/*
 * Record pid in the pipe's pid field.  A PIPE_NULL pipe is ignored.
 */
void
pipeSetPid(Pipe pipe, u16 pid)
{
    if (pipe == PIPE_NULL) {
        return;
    }
    pipe->pid = pid;
}
/*
 * Force all data placed onto child pipes to be parsed.
 * NOTE: This is called on the parent pipe!
 *
 * Flushes the pipe recorded in pipe->lastDstPipe and returns the RetCode
 * from pipeFlush().  pipe is dereferenced unconditionally.
 */
RetCode
pipeSync(Pipe pipe)
{
    Pipe lastDst = pipe->lastDstPipe;

    return pipeFlush(lastDst, TRUE);
}
/*
 * Signal end-of-file on the pipe by putting an empty (NULL, 0) buffer
 * through pipePut2().  Afterwards the pipe's thread state is asserted to
 * be PIPETHREADSTATE_ZOMBE.
 *
 * Returns the RetCode from pipePut2(), or RETCODE_SUCCESS when pipe is
 * PIPE_NULL.
 */
RetCode
pipeEof(Pipe pipe)
{
    RetCode status = RETCODE_SUCCESS;

    if (pipe != PIPE_NULL) {
        status = pipePut2(pipe, NULL, 0, FALSE, TRUE, 0);
        ASSERT(pipe->threadState == PIPETHREADSTATE_ZOMBE);
    }
    return status;
}
/*
 * Reset a pipe so it can be used again: end the current thread, clear the
 * per-stream state and start a fresh thread on the same pipe.
 * A PIPE_NULL pipe is ignored.
 */
void
pipeRecover(Pipe pipe)
{
if (pipe == PIPE_NULL) {
return;
}
/* Send end-of-file (result deliberately discarded) and join the thread. */
(void) pipeEof(pipe);
ABORT_IF_ERRNO(thr_join(pipe->threadId, NULL, NULL));
/* Drop any partially consumed byte, then reset CRC, status and state. */
pipeByteAlign(pipe);
pipe->doCrc = FALSE;
pipe->retCode = RETCODE_SUCCESS;
pipe->threadState = PIPETHREADSTATE_INIT;
/*
 * Both semaphores are expected to be unavailable (EBUSY) at this point.
 * NOTE(review): these calls sit inside ASSERT; if ASSERT compiles to
 * nothing in some builds the checks disappear entirely - confirm that is
 * acceptable.
 */
ASSERT (sema_trywait(&pipe->masterSema) == EBUSY);
ASSERT (sema_trywait(&pipe->slaveSema) == EBUSY);
/*
 * Start a new thread for the pipe.
 * NOTE(review): with THR_BKG_BUG != 0 this passes THR_BOUND whereas
 * pipeNew() passes THR_NEW_LWP - confirm the difference is intentional.
 */
#if THR_BKG_BUG != 0
ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, pipe, THR_BOUND,
&pipe->threadId));
#else /* THR_BKG_BUG */
ABORT_IF_ERRNO(thr_create(NULL, 0, pipeThreadWrapper, pipe, 0,
&pipe->threadId));
#endif /* THR_BKG_BUG */
}
/*
* Thread side calls
*/
/*
 * Consume nStuffBytes bytes from the pipe, checking that each one equals
 * stuffByte.  The pipe must be byte aligned (usedBits == 0).
 *
 * Each chunk taken from the buffer is fed to pipeCrcSum() when the pipe's
 * doCrc flag is set.  pipe->position is advanced by the full requested
 * count only after every byte has been checked.
 *
 * Returns RETCODE_SUCCESS, or the PIPE_ERROR_STUFF_BYTE return code at the
 * first mismatching byte.  Long-jumps to pipe->eofJmpBuf when pipeFillBuf()
 * returns NULL.
 */
RetCode
pipeSkipStuffBytes(Pipe pipe, size_t nStuffBytes, u8 stuffByte)
{
    const size_t requested = nStuffBytes;

    ASSERT(pipe->usedBits == 0);

    while (nStuffBytes != 0) {
        Buf *bufp = pipeFillBuf(pipe, 1);
        size_t chunk;
        size_t k;
        u8 *bytes;

        if (bufp == NULL) {
            longjmp(pipe->eofJmpBuf, 1);
        }

        chunk = MIN(nStuffBytes, bufp->avail);
        nStuffBytes -= chunk;
        bytes = bufGet(bufp, chunk);
        if (pipe->doCrc) {
            pipeCrcSum(pipe, bytes, chunk);
        }

        for (k = 0; k < chunk; k++) {
            if (bytes[k] != stuffByte) {
                return RETCODE_CONS(retCodeId, PIPE_ERROR_STUFF_BYTE);
            }
        }
    }

    pipe->position += requested;
    return RETCODE_SUCCESS;
}
/*
 * Read the next nBits bits from the pipe and consume them: the value is
 * obtained with pipePeekBits() and the bits are then skipped with
 * pipeSkipBits().  Returns the peeked value.
 */
u32
pipeGetBits(Pipe pipe, size_t nBits)
{
    const u32 value = pipePeekBits(pipe, nBits);

    pipeSkipBits(pipe, nBits);
    return value;
}
/*
 * Return the next nBits bits (nBits <= 32) without consuming them.
 *
 * The read starts pipe->usedBits bits into the byte at the buffer's output
 * pointer.  Bytes are accumulated most significant first, and the result
 * is masked with bitMask[nBits].
 *
 * Long-jumps to pipe->eofJmpBuf when pipeFillBuf() returns NULL.
 */
u32
pipePeekBits(Pipe pipe, size_t nBits)
{
    size_t bitsLeft = pipe->usedBits + nBits;
    size_t needBytes = (bitsLeft + 7) / 8;
    u8 *src;
    u32 acc = 0;
    Buf *bufp;

    ASSERT(nBits <= 32);

    bufp = pipeFillBuf(pipe, needBytes);
    if (bufp == NULL) {
        longjmp(pipe->eofJmpBuf, 1);
    }
    src = bufp->outp;

    /* Take whole bytes while more than one byte's worth is still needed. */
    while (bitsLeft > 8) {
        acc = (acc << 8) | *src++;
        bitsLeft -= 8;
    }

    /* Final byte: keep only its top bitsLeft bits. */
    acc = (acc << bitsLeft) | (*src >> (8 - bitsLeft));

    return acc & bitMask[nBits];
}
/*
 * Report whether the upcoming bits of the pipe match a pattern.
 *
 * Peeks pipeBits.nBits bits with pipePeekBits() (nothing is consumed) and
 * compares them with pipeBits.bitPattern.  Returns the comparison result
 * converted to Boolean.
 */
Boolean
pipeIsNextBits(Pipe pipe, PipeBits pipeBits)
{
    /*
     * A C-style cast is used here: the function-style form Boolean(expr)
     * is C++ syntax and is not accepted by a C compiler for a typedef.
     */
    return (Boolean)(pipeBits.bitPattern == pipePeekBits(pipe, pipeBits.nBits));
}
/*
 * Advance the pipe's read position by nBits bits.
 *
 * The bit count is added to pipe->usedBits.  Once a whole byte or more has
 * accumulated, the whole bytes are skipped with pipeSkip() and usedBits
 * keeps only the remaining 0..7 bits.  usedBits is zero while pipeSkip()
 * runs.
 */
void
pipeSkipBits(Pipe pipe, size_t nBits)
{
    size_t wholeBytes;
    size_t leftoverBits;

    pipe->usedBits += nBits;
    if (pipe->usedBits < 8) {
        return;
    }

    wholeBytes = pipe->usedBits / 8;
    leftoverBits = pipe->usedBits & 7;

    pipe->usedBits = 0;
    pipeSkip(pipe, wholeBytes);
    pipe->usedBits = leftoverBits;
}
/*
 * Copy the next nBytes bytes of the stream into buf without consuming
 * them.
 *
 * When the pipe is byte aligned the bytes are copied straight from the
 * pointer returned by pipePeek().  Otherwise nBytes + 1 source bytes are
 * requested and each output byte is assembled from two neighbouring source
 * bytes, shifted by pipe->usedBits.
 *
 * In the unaligned case the function long-jumps to pipe->eofJmpBuf when
 * pipeFillBuf() returns NULL.
 */
void
pipePeekByteBlock(Pipe pipe, u8 *buf, size_t nBytes)
{
    u8 *src;
    Buf *bufp;
    size_t k;
    size_t shift;
    u8 prev;

    if (pipe->usedBits == 0) {
        src = pipePeek(pipe, nBytes);
        for (k = 0; k < nBytes; k++) {
            buf[k] = src[k];
        }
        return;
    }

    bufp = pipeFillBuf(pipe, nBytes + 1);
    if (bufp == NULL) {
        longjmp(pipe->eofJmpBuf, 1);
    }

    shift = pipe->usedBits;
    src = bufp->outp;
    prev = src[0];
    for (k = 0; k < nBytes; k++) {
        u8 next = src[k + 1];

        /* Low bits of the previous byte followed by high bits of the next. */
        buf[k] = (prev << shift) | (next >> (8 - shift));
        prev = next;
    }
}
/*
 * Copy the next nBytes bytes of the stream into buf and consume them:
 * the bytes are peeked with pipePeekByteBlock() and then nBytes * 8 bits
 * are skipped with pipeSkipBits().
 */
void
pipeGetByteBlock(Pipe pipe, u8 *buf, size_t nBytes)
{
    const size_t bitCount = nBytes * 8;

    pipePeekByteBlock(pipe, buf, nBytes);
    pipeSkipBits(pipe, bitCount);
}
/*
 * Move the read position to the next byte boundary.  When some bits of the
 * current byte have already been used, usedBits is cleared and that byte
 * is skipped; an already aligned pipe is left untouched.
 */
void
pipeByteAlign(Pipe pipe)
{
    if (pipe->usedBits == 0) {
        return;
    }
    pipe->usedBits = 0;
    pipeSkip(pipe, 1);
}
void
pipeFindMark(Pipe pipe)
{
ASSERT(pipe->usedBits == 0);
while (!pipeIsAtMark(pipe)) {
if (pipeFillBuf(pipe, 1) == NULL) {
longjmp(pipe->eofJmpBuf, 1);
}
pipeSkip(pipe, pipeAvailUnmarked(pipe));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -