⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 procpool.cpp

📁 自动调整大小的进程池类的实现。 一个运用进程池的server类。 一个socket类。
💻 CPP
字号:
#include "procpool.h"

//全局变量,应用程序需用extern声明此值进行程序控制
volatile sig_atomic_t  QuitFlag=0;

SimpleLog *ProcPool::__logfile=NULL;

ProcPool::ProcPool(SimpleLog &logfile,int& max,int& init,int &low,int& high,int& inc,int& dec)
{
	__logfile= &logfile;
	if(max<=0||max<init||low<high||inc<=0||dec<=0)
		warn();
	MaxNum=max;
	InitNum=init;
	FreeLowNum=low;
	FreeHighNum=high;
	IncNum=inc;
	DecNum=dec;
	chld_num=0;
	chld_avail=0;
	selectTimeOut=0;
	if(MaxNum>0)
	{
		CldInfo=new ChildInfo[MaxNum];
		assert(CldInfo!=NULL);
	}
	else
	{
		CldInfo=NULL;
	}
	_acceptlock=new semaphore( );
	assert(_acceptlock!=NULL);
	__logfile->Userlog(DEBUG,"ProcPool Constructor with 7 param,Max[%d],Init[%d],FreeLow[%d],FreeHigh[%d],Inc[%d], \
	Dec[%d]",MaxNum,InitNum,FreeLowNum,FreeHighNum,IncNum,DecNum);
	
}
ProcPool::ProcPool(SimpleLog &logfile,int& max,int& init,int &low,int& high,int& inc,int& dec,int &timeout)
{
	__logfile= &logfile;
	if(max<=0||max<init||low<high||inc<=0||dec<=0)
		warn();
	MaxNum=max;
	InitNum=init;
	FreeLowNum=low;
	FreeHighNum=high;
	IncNum=inc;
	DecNum=dec;
	chld_num=0;
	chld_avail=0;
	selectTimeOut=timeout;
	if(MaxNum>0)
	{
		CldInfo=new ChildInfo[MaxNum];
		assert(CldInfo!=NULL);
	}
	else
	{
		CldInfo=NULL;
	}
	_acceptlock=new semaphore( );
	assert(_acceptlock!=NULL);
	__logfile->Userlog(DEBUG,"ProcPool Constructor with 8 param,Max[%d],Init[%d],FreeLow[%d],FreeHigh[%d],Inc[%d], \
	Dec[%d],TimeOut[%d]",MaxNum,InitNum,FreeLowNum,FreeHighNum,IncNum,DecNum,selectTimeOut);
}

ProcPool::ProcPool(SimpleLog &logfile)
{
	__logfile= &logfile;
	MaxNum=DEFAULT_MAXNUM;
	InitNum=DEFAULT_INITNUM;
	FreeLowNum=DEFAULT_FREELOWNUM;
	FreeHighNum=DEFAULT_FREEHIGHNUM;
	IncNum=DEFAULT_INCNUM;
	DecNum=DEFAULT_DECNUM;
	chld_num=0;
	chld_avail=0;
	selectTimeOut=0;
	if(MaxNum>0)
	{
		CldInfo=new ChildInfo[MaxNum];
		assert(CldInfo!=NULL);
	}
	else
	{
		CldInfo=NULL;
	}
	_acceptlock=new semaphore( );
	assert(_acceptlock!=NULL);
	__logfile->Userlog(DEBUG,"ProcPool Constructor with default param");
}

ProcPool::~ProcPool()
{
	KillAll();
	sleep(2);
	delete CldInfo;
	CldInfo=NULL;
	__logfile->Userlog(DEBUG,"ProcPool destructor,PID[%ld]",getpid());
}

int ProcPool::ScanChild()
{
	fd_set rds;
	int maxfd=0;
	struct timeval timeout;
	struct timeval *TV=NULL;
	
	FD_ZERO(&rds);
	timeout.tv_sec  = 0;
  timeout.tv_usec = 0;
	if(selectTimeOut>0)
	{
		timeout.tv_usec = selectTimeOut*1000;
		TV=&timeout;
	}
	//cout<<"IN scan,chld_avail="<<chld_avail<<endl;
	//遍历子进程状态类,重置读集合
	for(int i=0,j=0;i<MaxNum&&j<chld_num;i++)
	{
		if( (CldInfo+i)->pid>0 )
		{
			j++;
			FD_SET( (CldInfo+i)->pfd,&rds);
    	maxfd=maxfd>(CldInfo+i)->pfd?maxfd:(CldInfo+i)->pfd;
		}
	}
	
	int result=select(maxfd+1,&rds,NULL,NULL,TV);
	
	if(result==0)
	{
		__logfile->Userlog(INFO,"select timeout");
		return 0;//超时
	}
	if(result<0)
	{
		__logfile->Userlog(ERROR,"select error[%s]",strerror(errno));
		return -1;
	}
		
	for(int i=0,j=0;i<MaxNum&&j<chld_num;i++)
  {
    if( (CldInfo+i)->pid>0&&FD_ISSET( (CldInfo+i)->pfd,&rds ) )
    {
    	CHILD_STATUS status=CS_UNKNOWN;
    	int n=read( (CldInfo+i)->pfd,&status,sizeof(status));
    	if(n==0)//如果空闲子进程退出
    	{
    		if( (CldInfo+i)->state==CS_WAITING )
    		{
    			//cout<<"n==0,avail --"<<endl;
					chld_avail--;
				}
				__logfile->Userlog(INFO,"child proc [%ld] exit",(CldInfo+i)->pid);
				(CldInfo+i)->pid=-1;
				close((CldInfo+i)->pfd);
				(CldInfo+i)->state=CS_UNKNOWN;
				chld_num--;
    	}
    	else if(n>0)
    	{
    		if(status==CS_PROCESSING)
    		{
    			//cout<<"status==PRO,avail --"<<endl;
    			(CldInfo+i)->state=CS_PROCESSING;
    			chld_avail--;
    			__logfile->Userlog(DEBUG,"child proc [%ld] busy",(CldInfo+i)->pid);
    		}
    		else
    		{
    			(CldInfo+i)->state=CS_WAITING;
    			chld_avail++;
    			__logfile->Userlog(DEBUG,"child proc [%ld] free",(CldInfo+i)->pid);
    		}
    	}
    	else/*如果出错,则程序退出*/
    	{
    		__logfile->Userlog(FATAL,"pipe read error[%s]",strerror(errno));
    		return -2;
    	}
    }
    if( (CldInfo+i)->pid>0 )
    	j++;
   }
   //判断空闲进程数是多还是少,并由此增加或是减少进程
   if( chld_avail > FreeHighNum )
   	{
   		__logfile->Userlog(INFO,"kill free child");
   		KillIdle();
   	}
   else if( chld_avail < FreeLowNum )
   	{
   		//cout<<"chld_avail="<<chld_avail<<endl;
   		__logfile->Userlog(INFO,"increase child");
   		IncreaseChld(IncNum);
   	}
		return 0;
}

bool ProcPool::IncreaseChld(int &num)
{
	//cout<<"will create "<<num<<" child"<<endl;
	int pfd[2];
	for(int i=0,idx=0;i<num;i++)
	{
		for( ; (CldInfo+i)->pid > 0;idx++ )
			if( idx == MaxNum-1 )
				return true;
		int result=pipe(pfd);
		//if(result <0 )
		//	cout<<"创建管道失败"<<endl;
		
		(CldInfo+i)->pid=fork();
		
		if( (CldInfo+i)->pid > 0 )/*父进程*/
		{
			(CldInfo+i)->state=CS_WAITING;
			(CldInfo+i)->pfd=pfd[0];
			chld_num++;
			chld_avail++;
			//cout<<"chld_avail= "<<chld_avail<<endl;
			close(pfd[1]);
			__logfile->Userlog(INFO,"increase child [%ld]",(CldInfo+i)->pid);
		}
		else if( (CldInfo+i)->pid == 0 )/*子进程*/
		{
			//修改子进程中相应类的信息
			//子进程继承了该类的所有信息,但是一些信息需要修改
			close(pfd[0]);
			(CldInfo+i)->pid=getpid();
			(CldInfo+i)->pfd=pfd[1];
			ChildProc(i);
		}
		else
		{
			__logfile->Userlog(FATAL,"fork error[%s]",strerror(errno));
    	return false;
		}
	}
	return true;
}

void ProcPool::KillChld(pid_t &pid,int idx)
{
	kill(pid,SIGINT);
	pid_t cpid=waitpid( pid,NULL,0 );
	cout<<cpid<<"exit"<<endl;
	__logfile->Userlog(INFO,"kill pid [%ld]",pid);
	if( idx >= 0 )
	{
		if( (CldInfo+idx)->pid == pid )
		{
			chld_avail--;
			chld_num--;
			close( (CldInfo+idx)->pfd );
			(CldInfo+idx)->pid = -1;
			(CldInfo+idx)->pfd = -1;
			(CldInfo+idx)->state = CS_UNKNOWN;
		}
	}
	else
	{
		for(int i=0;i < MaxNum;i++)
		{
			if( (CldInfo+i)->pid == pid )
			{
				chld_avail--;
				chld_num--;
				close( (CldInfo+i)->pfd );
				(CldInfo+i)->pid = -1;
				(CldInfo+i)->pfd = -1;
				(CldInfo+i)->state = CS_UNKNOWN;
				break;
			}
		}
	}
	return;
}

void ProcPool::KillIdle()
{
	//cout<<"in killidle"<<endl;
	for(int i=0;i<MaxNum;i++)
	{
		if( (CldInfo+i)->state == CS_WAITING&&(CldInfo+i)->pid >1&&chld_avail>FreeLowNum )
		{
			__logfile->Userlog(INFO,"pid [%ld] idle,will kill it",(CldInfo+i)->pid);
			KillChld((CldInfo+i)->pid,i);
		}
		if( chld_avail==FreeLowNum )
			break;
	}
	return;
}

void ProcPool::KillAll()
{
	__logfile->Userlog(DEBUG,"in kill all");
	for(int i=0;i<MaxNum;i++)
	{
		if( (CldInfo+i)->pid >1 )
		{
			KillChld((CldInfo+i)->pid,i);
		}
	}
	return;
}

int ProcPool::ClearUp()
{
	__logfile->Userlog(DEBUG,"in clear up");
	KillAll();
	return 0;
}

bool ProcPool::Init()
{
	__logfile->Userlog(DEBUG,"in ProcPool init");
	return IncreaseChld(InitNum);
}

void ProcPool::warn()
{
	__logfile->Userlog(ERROR,"初始化参数不满足下列条件之一[最大进程数>0,最大进程数>=初始进程数,最大空闲数>最小空闲数,最大空闲数>0,最小空闲数>0]");
}


void ProcPool::ChildProc(int& idx)
{
	__logfile->Userlog(DEBUG,"in child proc[%ld]",getpid());
	while(!QuitFlag)
	{
		//cout<<"child in"<<endl;
		void *task=NULL;
		if(!(task=WaitTask()))
			continue;
		(CldInfo+idx)->NotifyBusy();
		__logfile->Userlog(DEBUG,"[%ld] get task over,will do",getpid());
		doTask(task);
		__logfile->Userlog(DEBUG,"[%ld] do task over,now free",getpid());
		(CldInfo+idx)->NotifyFree();
	}
	//cout<<"child out"<<endl;
	close((CldInfo+idx)->pfd);
	exit(0);
}

void ProcPool::PrintPool()
{
	//打印到屏幕
	cout<<"最大进程数目			["<<MaxNum<<"]"<<endl;
	cout<<"初始子进程数目		["<<InitNum<<"]"<<endl;
	cout<<"最低空闲进程数目	["<<FreeLowNum<<"]"<<endl;
	cout<<"最高空闲进程数目	["<<FreeHighNum<<"]"<<endl;
	cout<<"每次增加的进程数	["<<IncNum<<"]"<<endl;
	cout<<"每次减少的进程数	["<<DecNum<<"]"<<endl;
	cout<<"总的进程数				["<<chld_num<<"]"<<endl;
	cout<<"总的空闲进程数		["<<chld_avail<<"]"<<endl;
	cout<<"select的超时时间	["<<selectTimeOut<<"]毫秒"<<endl;
	for(int i=0;i<MaxNum;i++)
	{
		if((CldInfo+i)->pid>0)
		{
			if((CldInfo+i)->state==CS_WAITING)
				cout<<"进程["<<(CldInfo+i)->pid<<"],状态[空闲]"<<endl;
			else
				cout<<"进程["<<(CldInfo+i)->pid<<"],状态[忙碌]"<<endl;
		}
	}
	//打印到日志
	__logfile->Userlog(FATAL,"MaxNum        [%d]",MaxNum);
	__logfile->Userlog(FATAL,"InitNum       [%d]",InitNum);
	__logfile->Userlog(FATAL,"FreeLowNum    [%d]",FreeLowNum);
	__logfile->Userlog(FATAL,"FreeHighNum   [%d]",FreeHighNum);
	__logfile->Userlog(FATAL,"IncNum        [%d]",IncNum);
	__logfile->Userlog(FATAL,"DecNum        [%d]",DecNum);
	__logfile->Userlog(FATAL,"chld_num      [%d]",chld_num);
	__logfile->Userlog(FATAL,"chld_avail    [%d]",chld_avail);
	__logfile->Userlog(FATAL,"selectTimeOut [%d]",selectTimeOut);
	for(int i=0;i<MaxNum;i++)
	{
		if((CldInfo+i)->pid>0)
		{
			if((CldInfo+i)->state==CS_WAITING)
				__logfile->Userlog(FATAL,"进程[%ld],状态[空闲]",(CldInfo+i)->pid);
			else
				__logfile->Userlog(FATAL,"进程[%ld],状态[忙碌]",(CldInfo+i)->pid);
		}
	}
}


ProcPool::semaphore::semaphore( key_t& keyval,int& numsems )
{
	_sid=0;
	_semnum=numsems;
	_keyval=keyval;
	
}

ProcPool::semaphore::semaphore( )
{
	_sid=0;
	_semnum=1;
	_keyval=IPC_PRIVATE;
	__logfile->Userlog(DEBUG,"in semaphore default constructor,will create()");
	create( );
	__logfile->Userlog(DEBUG,"in semaphore default constructor,create() over,will init");
	init(0,1);
	__logfile->Userlog(DEBUG,"in semaphore default constructor,init over");
}


bool ProcPool::semaphore::create( )
{
	if( _semnum <= 0)
		return false;
	if( (_sid=semget(_keyval,_semnum,IPC_CREAT|0660))==-1 )
	{
		__logfile->Userlog(ERROR,"create semaphore error[%s]",strerror(errno));
		return false;
	}
	__logfile->Userlog(INFO,"create semaphore[%ld]",_sid);
	return true;
}

union semun
{
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};

bool ProcPool::semaphore::init(int seq ,int value)
{
	if( seq < 0||(seq+1) >_semnum )
		return false;
	union semun semopts;
	semopts.val=value;
	if( semctl(_sid,seq,SETVAL,semopts)<0 )
	{
		__logfile->Userlog(ERROR,"set semaphore value error[%s]",strerror(errno));
		return false;
	}
	__logfile->Userlog(DEBUG,"set semaphore value");
	return true;
	
}
bool ProcPool::semaphore::lock( int seq )
{
	__logfile->Userlog(DEBUG,"pid [%ld] in semaphore lock",getpid());
	struct sembuf sem_lock;/*={member,-1,SEM_UNDO}	*/
	memset(&sem_lock,0,sizeof(struct sembuf));
	//if (member<0||member>MAXSEMNUM)
	if ( seq<0 )
		return false;	
	sem_lock.sem_num=seq;
	sem_lock.sem_op=-1;
	sem_lock.sem_flg=SEM_UNDO;
	
	if( semop( _sid,&sem_lock,1 )<0 )
	{
		__logfile->Userlog(ERROR,"pid [%ld] semaphore lock error[%s]",getpid(),strerror(errno));
		return false;
	}
	__logfile->Userlog(DEBUG,"pid [%ld] out semaphore lock",getpid());
	return true;
}

bool ProcPool::semaphore::unlock( int seq )
{
	__logfile->Userlog(DEBUG,"pid [%ld] in semaphore unlock",getpid());
	struct sembuf sem_lock;/*={member,-1,SEM_UNDO}	*/
	memset(&sem_lock,0,sizeof(struct sembuf));
	//if (member<0||member>MAXSEMNUM)
	if ( seq<0 )
		return false;	
	sem_lock.sem_num=seq;
	sem_lock.sem_op=1;
	sem_lock.sem_flg=SEM_UNDO;
	if( semop( _sid,&sem_lock,1 )<0 )
	{
		__logfile->Userlog(ERROR,"pid [%ld] semaphore unlock error[%s]",getpid(),strerror(errno));
		return false;
	}
	__logfile->Userlog(DEBUG,"pid [%ld] out semaphore lock",getpid());
	return true;
}

void ProcPool::semaphore::remove( )
{
	__logfile->Userlog(DEBUG,"pid [%ld] in semaphore remove",getpid());
	semctl( _sid,0,IPC_RMID,0 );
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -