📄 iothread.cpp
字号:
// IOThread.cpp: implementation of the IOThread class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "MultiBuffer.h"
#include "IOThread.h"
#ifdef WIN32
#include <Process.h>
#include <afxmt.h>
#else
#include <pthread.h>
#endif
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
/*
#ifdef WIN32
CCriticalSection runningSync;
#else
pthread_mutext_t runningSync;
#endif
volatile bool running = true;
*/
static unsigned int _stdcall InputFunc(void *pVoid); //input thread function
static unsigned int _stdcall OutputFunc(void *pVoid); //output thread function
typedef struct
{
InputStream * in;
BufferQueue *bufQueue;
}InputThreadParam;
typedef struct
{
OutputStream * out;
BufferQueue *bufQueue;
}OutputThreadParam;
//static char *content ="abc";
IOThread::IOThread(InputStream *in, OutputStream *out):input(in),output(out)
{
}
IOThread::~IOThread()
{
}
bool IOThread::start()
{
BufferQueue bufQ;
HANDLE ThreadHandle[2];
unsigned inputThreadID;
unsigned outputThreadID;
InputThreadParam *inputParam = new InputThreadParam;
inputParam->in = input;
inputParam->bufQueue = &bufQ;
ThreadHandle[0] = (HANDLE)_beginthreadex(NULL,0,InputFunc,(void *)inputParam,0,&inputThreadID);
if(ThreadHandle[0]==0)
{
printf("Create Input Thread Fail\n");
return false;
}
OutputThreadParam *outputParam = new OutputThreadParam;
outputParam->out = output;
outputParam->bufQueue = &bufQ;
ThreadHandle[1] = (HANDLE)_beginthreadex(NULL,0,OutputFunc,(void *)outputParam,0,&outputThreadID);
if(ThreadHandle[1]==0)
{
printf("Create Output Thread Fail\n");
return false;
}
while(WaitForMultipleObjects(2,ThreadHandle,TRUE,INFINITE)!=WAIT_OBJECT_0);
printf("Input and Output Thread exit\n");
CloseHandle(ThreadHandle[0]);
CloseHandle(ThreadHandle[1]);
return true;
}
static unsigned int _stdcall InputFunc(void *pVoid)
{
InputThreadParam *inputParam = (InputThreadParam *)pVoid;
InputStream *in = inputParam->in;
BufferQueue *bufQ = inputParam->bufQueue;
delete inputParam;
bool IOStop;
bool needToMoveWriteBuffer;
//char *p = content; //for test
int totalBytesRead = 0;
do
{
/////////////////////////
IOStop = false;
needToMoveWriteBuffer = true;
Buffer *buf = bufQ->getWriteBuffer();
if(!buf)break;
buf->lockBuf();
int availableSpace = buf->getAvailableSpace();
int size = buf->getSize();
void *buffer = buf->getBuf();
int readSize = in->read((void *)((char *)buffer+size*sizeof(char)),availableSpace); //read data from inputstream and put it to buffer
///////////for test///////////////////////
/*
int readSize = 0;
if(!*p)p=content;
char *to = (char *)((char *)buffer+size);
while(*p&&availableSpace>0)
{readSize++;availableSpace--;*to++=*p++;}
if(totalTimes<3)
{
IOStop = true;
buf->setTag(STOP_ON_IO_END);
}
*/
///////////for test///////////////////////
printf("InputFunc readSize:%d\n",readSize);
if(readSize==0)
{
IOStop = true;
buf->setTag(STOP_ON_IO_END);
}else if(readSize<0)
{
IOStop = true;
buf->setTag(STOP_ON_IO_ERROR);
}
else
{
buf->setSize(size+readSize);
needToMoveWriteBuffer = true;
totalBytesRead+=readSize;
}
buf->unlockBuf();
/////////////////////////
if(IOStop)
{
//reach the end of input stream or an error occurs,notify output thread and exit
break;
}
bufQ->moveWriteBuffer(needToMoveWriteBuffer);
//Sleep(200);
}while(true);
//check if it is possible that output thread is waiting
bufQ->invalidate();
bufQ->moveWriteBuffer(false);
printf("Read %d bytes in total\n",totalBytesRead);
printf("Input Thread exits\n");
return 0;
}
static unsigned int _stdcall OutputFunc(void *pVoid)
{
OutputThreadParam *outputParam = (OutputThreadParam *)pVoid;
OutputStream *out = outputParam->out;
BufferQueue *bufQ = outputParam->bufQueue;
delete outputParam;
bool IOStop;
bool quitReadIO;
bool needToMoveReadBuffer;
int totalBytesWritten = 0;
do{
/////////////////////////
IOStop = false;
quitReadIO = false;
needToMoveReadBuffer = true;
Buffer *buf = bufQ->getReadBuffer();
if(!buf)break;
buf->lockBuf();
if(buf->getTag()!=PROCEED_IO)
{
IOStop = true;
break;
}
int size = buf->getSize();
void *buffer = buf->getBuf();
printf("Read buffer size:%d\n",size);
int writeSize = 0;
if(size>0)
writeSize = out->write(buffer,size); //write data from buffer to outputstream
///////////for test///////////////////////
/*
char* p = (char *)buffer;
for(int i=0;i<size;i++,p++)
printf("%c",*p);
printf("\n");
*/
///////////for test///////////////////////
printf("OutputFunc writeSize=%d\n",writeSize);
if(writeSize<0)
{
IOStop = true;
buf->setTag(STOP_ON_IO_ERROR);
}
else
{
buf->setSize(0);
totalBytesWritten+=writeSize;
}
buf->unlockBuf();
bufQ->moveReadBuffer(needToMoveReadBuffer);
/////////////////////////
if(IOStop)
{
//reach the end of input stream or an error occurs,notify output thread and exit
break;
}
//Sleep(2);
}while(true);
bufQ->invalidate();
bufQ->moveReadBuffer(false);
printf("Write %d bytes in total\n",totalBytesWritten);
printf("Output Thread exits\n");
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -