⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iothread.cpp

📁 缓冲 缓冲 缓冲 缓冲 缓冲 缓冲
💻 CPP
字号:
// IOThread.cpp: implementation of the IOThread class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "MultiBuffer.h"
#include "IOThread.h"
#ifdef WIN32
#include <Process.h>
#include <afxmt.h>
#else
#include <pthread.h>
#endif

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
/*
#ifdef WIN32
CCriticalSection runningSync;
#else
pthread_mutex_t runningSync;
#endif
volatile bool running = true;
*/
// Thread entry points passed to _beginthreadex(); both are defined later in
// this file. pVoid carries a heap-allocated InputThreadParam / OutputThreadParam
// respectively, which the thread function deletes after copying its fields.
static unsigned int _stdcall InputFunc(void *pVoid); //input thread function
static unsigned int _stdcall OutputFunc(void *pVoid); //output thread function

// Argument bundle for the input thread (InputFunc).
struct InputThreadParam
{
	InputStream *in;        // stream the input thread reads from
	BufferQueue *bufQueue;  // queue whose write buffers the input thread fills
};

// Argument bundle for the output thread (OutputFunc).
struct OutputThreadParam
{
	OutputStream *out;      // stream the output thread writes to
	BufferQueue *bufQueue;  // queue whose read buffers the output thread drains
};

//static char *content ="abc";


// Records the input and output streams; no thread is created until start().
IOThread::IOThread(InputStream *in, OutputStream *out)
	: input(in),
	  output(out)
{
}

// Empty destructor: this class releases nothing here.
IOThread::~IOThread()
{
}

bool IOThread::start()
{
	BufferQueue bufQ;
	HANDLE ThreadHandle[2];
	unsigned inputThreadID;
	unsigned outputThreadID;
	InputThreadParam *inputParam = new InputThreadParam;
	inputParam->in = input;
	inputParam->bufQueue = &bufQ;
	ThreadHandle[0] = (HANDLE)_beginthreadex(NULL,0,InputFunc,(void *)inputParam,0,&inputThreadID);
	if(ThreadHandle[0]==0)
	{
		printf("Create Input Thread Fail\n");
		return false;
	}
	OutputThreadParam *outputParam = new OutputThreadParam;
	outputParam->out = output;
	outputParam->bufQueue = &bufQ;
	ThreadHandle[1] = (HANDLE)_beginthreadex(NULL,0,OutputFunc,(void *)outputParam,0,&outputThreadID);
	if(ThreadHandle[1]==0)
	{
		printf("Create Output Thread Fail\n");
		return false;
	}
	while(WaitForMultipleObjects(2,ThreadHandle,TRUE,INFINITE)!=WAIT_OBJECT_0);
	printf("Input and Output Thread exit\n");
	CloseHandle(ThreadHandle[0]);
	CloseHandle(ThreadHandle[1]);
	return true;
}

// Input thread entry point.
//
// pVoid is a heap-allocated InputThreadParam; its fields are copied out and
// the block is deleted immediately. The thread then repeatedly takes the
// queue's current write buffer, appends data read from the input stream after
// the bytes already stored in it, and advances the queue. It stops when no
// write buffer is returned, when read() reports 0 (buffer tagged
// STOP_ON_IO_END) or when read() reports a negative value (buffer tagged
// STOP_ON_IO_ERROR). Always returns 0.
static unsigned int _stdcall InputFunc(void *pVoid)
{
	InputThreadParam *param = (InputThreadParam *)pVoid;
	InputStream *source = param->in;
	BufferQueue *queue = param->bufQueue;
	delete param;

	int totalBytesRead = 0;
	for(;;)
	{
		Buffer *slot = queue->getWriteBuffer();
		if(!slot)
			break;

		slot->lockBuf();
		int room = slot->getAvailableSpace();
		int used = slot->getSize();
		char *base = (char *)slot->getBuf();
		// Append after the bytes that are already stored in this buffer.
		int readSize = source->read((void *)(base+used),room);
		printf("InputFunc readSize:%d\n",readSize);

		bool finished = (readSize<=0);
		if(readSize>0)
		{
			slot->setSize(used+readSize);
			totalBytesRead += readSize;
		}
		else if(readSize==0)
		{
			slot->setTag(STOP_ON_IO_END);
		}
		else
		{
			slot->setTag(STOP_ON_IO_ERROR);
		}
		slot->unlockBuf();

		// End of input or a read error: leave the loop without advancing.
		if(finished)
			break;

		queue->moveWriteBuffer(true);
	}

	// The output thread may be waiting on the queue; shut the queue down.
	queue->invalidate();
	queue->moveWriteBuffer(false);
	printf("Read %d bytes in total\n",totalBytesRead);
	printf("Input Thread exits\n");
	return 0;
}

// Output thread entry point.
//
// pVoid is a heap-allocated OutputThreadParam; its fields are copied out and
// the block is deleted immediately. The thread then repeatedly takes the
// queue's current read buffer, writes its contents to the output stream,
// resets the buffer size to 0 and advances the queue. It stops when no read
// buffer is returned, when a buffer carries a tag other than PROCEED_IO, or
// when write() reports a negative value (buffer tagged STOP_ON_IO_ERROR).
// Always returns 0.
static unsigned int _stdcall OutputFunc(void *pVoid)
{
	OutputThreadParam *outputParam = (OutputThreadParam *)pVoid;
	OutputStream *out = outputParam->out;
	BufferQueue *bufQ = outputParam->bufQueue;
	delete outputParam;

	int totalBytesWritten = 0;
	do{
		Buffer *buf = bufQ->getReadBuffer();
		if(!buf)break;

		buf->lockBuf();
		if(buf->getTag()!=PROCEED_IO)
		{
			// The buffer carries a stop tag. Release the buffer lock before
			// leaving the loop; breaking out with it held would leave the
			// buffer locked after this thread exits.
			buf->unlockBuf();
			break;
		}
		int size = buf->getSize();
		void *buffer = buf->getBuf();
		printf("Read buffer size:%d\n",size);
		int writeSize = 0;
		if(size>0)
			writeSize = out->write(buffer,size); //write data from buffer to outputstream
		printf("OutputFunc writeSize=%d\n",writeSize);

		bool IOStop = false;
		if(writeSize<0)
		{
			IOStop = true;
			buf->setTag(STOP_ON_IO_ERROR);
		}
		else
		{
			// NOTE(review): the buffer is emptied even if writeSize < size;
			// confirm OutputStream::write never performs a partial write.
			buf->setSize(0);
			totalBytesWritten+=writeSize;
		}

		buf->unlockBuf();
		bufQ->moveReadBuffer(true);

		if(IOStop)
		{
			//a write error occurred: stop consuming and fall through to the shutdown below
			break;
		}

	}while(true);

	// The input thread may be waiting on the queue; shut the queue down.
	bufQ->invalidate();
	bufQ->moveReadBuffer(false);
	printf("Write %d bytes in total\n",totalBytesWritten);
	printf("Output Thread exits\n");
	return 0;

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -