// procpool.cpp
// (code-viewer page residue: font-size control label)
#include "procpool.h"
// Global quit flag. Applications must declare it with `extern` and use it to
// control the program; ChildProc() keeps looping while it is zero.
volatile sig_atomic_t QuitFlag=0;
// Log sink shared by ProcPool and its nested semaphore class; every ProcPool
// constructor points it at the SimpleLog it was given.
SimpleLog *ProcPool::__logfile=NULL;
/**
 * Builds a pool from explicit sizing parameters. selectTimeOut is set to 0,
 * which makes ScanChild() call select() without a timeout.
 *
 * @param logfile log sink; its address is stored in the static __logfile
 * @param max     capacity of the child table (MaxNum)
 * @param init    number of children Init() asks for (InitNum)
 * @param low     idle-children low-water mark (FreeLowNum)
 * @param high    idle-children high-water mark (FreeHighNum)
 * @param inc     number of children ScanChild() requests when idle children
 *                drop below the low-water mark (IncNum)
 * @param dec     stored as DecNum (only printed by the code visible in this file)
 *
 * Invalid parameters are only reported through warn(); construction goes on
 * with the values as given.
 */
ProcPool::ProcPool(SimpleLog &logfile,int& max,int& init,int &low,int& high,int& inc,int& dec)
{
    __logfile = &logfile;

    // The high-water mark has to lie above the low-water mark. The previous
    // test (low<high) was inverted: it warned on every valid configuration and
    // stayed silent on the invalid ones.
    if (max <= 0 || max < init || low >= high || inc <= 0 || dec <= 0)
        warn();

    MaxNum = max;
    InitNum = init;
    FreeLowNum = low;
    FreeHighNum = high;
    IncNum = inc;
    DecNum = dec;
    chld_num = 0;
    chld_avail = 0;
    selectTimeOut = 0;

    // One table slot per potential child; no table at all for a non-positive capacity.
    if (MaxNum > 0)
    {
        CldInfo = new ChildInfo[MaxNum];
        assert(CldInfo != NULL);
    }
    else
    {
        CldInfo = NULL;
    }

    _acceptlock = new semaphore( );
    assert(_acceptlock != NULL);

    __logfile->Userlog(DEBUG,"ProcPool Constructor with 7 param,Max[%d],Init[%d],FreeLow[%d],FreeHigh[%d],Inc[%d], "
                             "Dec[%d]",MaxNum,InitNum,FreeLowNum,FreeHighNum,IncNum,DecNum);
}
/**
 * Builds a pool from explicit sizing parameters plus a select() timeout.
 *
 * @param logfile log sink; its address is stored in the static __logfile
 * @param max     capacity of the child table (MaxNum)
 * @param init    number of children Init() asks for (InitNum)
 * @param low     idle-children low-water mark (FreeLowNum)
 * @param high    idle-children high-water mark (FreeHighNum)
 * @param inc     number of children ScanChild() requests when idle children
 *                drop below the low-water mark (IncNum)
 * @param dec     stored as DecNum (only printed by the code visible in this file)
 * @param timeout stored as selectTimeOut; PrintPool() labels it as milliseconds,
 *                and ScanChild() only applies it when it is greater than zero
 *
 * Invalid parameters are only reported through warn(); construction goes on
 * with the values as given.
 */
ProcPool::ProcPool(SimpleLog &logfile,int& max,int& init,int &low,int& high,int& inc,int& dec,int &timeout)
{
    __logfile = &logfile;

    // The high-water mark has to lie above the low-water mark. The previous
    // test (low<high) was inverted: it warned on every valid configuration and
    // stayed silent on the invalid ones.
    if (max <= 0 || max < init || low >= high || inc <= 0 || dec <= 0)
        warn();

    MaxNum = max;
    InitNum = init;
    FreeLowNum = low;
    FreeHighNum = high;
    IncNum = inc;
    DecNum = dec;
    chld_num = 0;
    chld_avail = 0;
    selectTimeOut = timeout;

    // One table slot per potential child; no table at all for a non-positive capacity.
    if (MaxNum > 0)
    {
        CldInfo = new ChildInfo[MaxNum];
        assert(CldInfo != NULL);
    }
    else
    {
        CldInfo = NULL;
    }

    _acceptlock = new semaphore( );
    assert(_acceptlock != NULL);

    __logfile->Userlog(DEBUG,"ProcPool Constructor with 8 param,Max[%d],Init[%d],FreeLow[%d],FreeHigh[%d],Inc[%d], "
                             "Dec[%d],TimeOut[%d]",MaxNum,InitNum,FreeLowNum,FreeHighNum,IncNum,DecNum,selectTimeOut);
}
/**
 * Builds a pool whose sizing comes entirely from the DEFAULT_* constants.
 * selectTimeOut is set to 0, which makes ScanChild() call select() without a
 * timeout.
 *
 * @param logfile log sink; its address is stored in the static __logfile
 */
ProcPool::ProcPool(SimpleLog &logfile)
{
    // Must be set first: the semaphore constructor below logs through it.
    __logfile = &logfile;

    // Runtime bookkeeping starts out empty.
    chld_avail = 0;
    chld_num = 0;
    selectTimeOut = 0;

    // Sizing parameters.
    DecNum = DEFAULT_DECNUM;
    IncNum = DEFAULT_INCNUM;
    FreeHighNum = DEFAULT_FREEHIGHNUM;
    FreeLowNum = DEFAULT_FREELOWNUM;
    InitNum = DEFAULT_INITNUM;
    MaxNum = DEFAULT_MAXNUM;

    // Child table: allocated only for a positive capacity.
    CldInfo = NULL;
    if (!(MaxNum <= 0))
    {
        CldInfo = new ChildInfo[MaxNum];
        assert(CldInfo != NULL);
    }

    _acceptlock = new semaphore( );
    assert(_acceptlock != NULL);

    __logfile->Userlog(DEBUG,"ProcPool Constructor with default param");
}
/**
 * Terminates every child still recorded in the table, then releases the table.
 */
ProcPool::~ProcPool()
{
    KillAll();
    sleep(2);

    // CldInfo comes from new ChildInfo[MaxNum]; it must be released with
    // delete[] (plain delete on an array is undefined behaviour). delete[] on
    // NULL is a no-op, which covers the MaxNum<=0 case.
    delete[] CldInfo;
    CldInfo = NULL;

    // NOTE(review): _acceptlock is allocated in every constructor but is not
    // released anywhere in the code visible here - confirm who owns it and
    // whether the underlying semaphore should be removed at this point.

    // pid_t is not guaranteed to be long; cast so the argument matches %ld.
    __logfile->Userlog(DEBUG,"ProcPool destructor,PID[%ld]",static_cast<long>(getpid()));
}
/**
 * Waits for status messages on the children's pipes, updates the child table
 * and the idle/total counters, then grows or shrinks the pool.
 *
 * @return  0 on success or when select() timed out,
 *         -1 when select() failed,
 *         -2 when reading a child's pipe failed.
 */
int ProcPool::ScanChild()
{
    fd_set rds;
    int maxfd = 0;
    struct timeval timeout;
    struct timeval *TV = NULL;      // NULL: select() is called without a timeout

    FD_ZERO(&rds);
    timeout.tv_sec = 0;
    timeout.tv_usec = 0;
    if (selectTimeOut > 0)
    {
        // selectTimeOut is in milliseconds. Split it into seconds and
        // microseconds: putting everything into tv_usec yields a value of
        // 1,000,000 or more for timeouts >= 1000 ms, which select() rejects.
        timeout.tv_sec = selectTimeOut / 1000;
        timeout.tv_usec = (selectTimeOut % 1000) * 1000;
        TV = &timeout;
    }

    // Walk the child table and rebuild the read set; j counts live children so
    // the walk stops once all chld_num of them have been seen.
    for (int i = 0, j = 0; i < MaxNum && j < chld_num; i++)
    {
        if ((CldInfo+i)->pid > 0)
        {
            j++;
            FD_SET((CldInfo+i)->pfd, &rds);
            maxfd = maxfd > (CldInfo+i)->pfd ? maxfd : (CldInfo+i)->pfd;
        }
    }

    int result = select(maxfd+1, &rds, NULL, NULL, TV);
    if (result == 0)
    {
        __logfile->Userlog(INFO,"select timeout");
        return 0;   // timed out
    }
    if (result < 0)
    {
        __logfile->Userlog(ERROR,"select error[%s]",strerror(errno));
        return -1;
    }

    for (int i = 0, j = 0; i < MaxNum && j < chld_num; i++)
    {
        if ((CldInfo+i)->pid > 0 && FD_ISSET((CldInfo+i)->pfd, &rds))
        {
            CHILD_STATUS status = CS_UNKNOWN;
            int n = read((CldInfo+i)->pfd, &status, sizeof(status));
            if (n == 0)   // end of file on the pipe: the child has exited
            {
                // Only a child that was idle counted towards chld_avail.
                if ((CldInfo+i)->state == CS_WAITING)
                {
                    chld_avail--;
                }
                __logfile->Userlog(INFO,"child proc [%ld] exit",static_cast<long>((CldInfo+i)->pid));
                (CldInfo+i)->pid = -1;
                close((CldInfo+i)->pfd);
                (CldInfo+i)->pfd = -1;   // do not keep a stale descriptor, same as KillChld()
                (CldInfo+i)->state = CS_UNKNOWN;
                chld_num--;
            }
            else if (n > 0)
            {
                if (status == CS_PROCESSING)
                {
                    (CldInfo+i)->state = CS_PROCESSING;
                    chld_avail--;
                    __logfile->Userlog(DEBUG,"child proc [%ld] busy",static_cast<long>((CldInfo+i)->pid));
                }
                else
                {
                    (CldInfo+i)->state = CS_WAITING;
                    chld_avail++;
                    __logfile->Userlog(DEBUG,"child proc [%ld] free",static_cast<long>((CldInfo+i)->pid));
                }
            }
            else   // read error: give up and let the caller decide
            {
                __logfile->Userlog(FATAL,"pipe read error[%s]",strerror(errno));
                return -2;
            }
        }
        // A slot emptied just above no longer counts as a live child.
        if ((CldInfo+i)->pid > 0)
            j++;
    }

    // Too many idle children: kill some. Too few: fork more.
    if (chld_avail > FreeHighNum)
    {
        __logfile->Userlog(INFO,"kill free child");
        KillIdle();
    }
    else if (chld_avail < FreeLowNum)
    {
        __logfile->Userlog(INFO,"increase child");
        IncreaseChld(IncNum);
    }
    return 0;
}
bool ProcPool::IncreaseChld(int &num)
{
//cout<<"will create "<<num<<" child"<<endl;
int pfd[2];
for(int i=0,idx=0;i<num;i++)
{
for( ; (CldInfo+i)->pid > 0;idx++ )
if( idx == MaxNum-1 )
return true;
int result=pipe(pfd);
//if(result <0 )
// cout<<"创建管道失败"<<endl;
(CldInfo+i)->pid=fork();
if( (CldInfo+i)->pid > 0 )/*父进程*/
{
(CldInfo+i)->state=CS_WAITING;
(CldInfo+i)->pfd=pfd[0];
chld_num++;
chld_avail++;
//cout<<"chld_avail= "<<chld_avail<<endl;
close(pfd[1]);
__logfile->Userlog(INFO,"increase child [%ld]",(CldInfo+i)->pid);
}
else if( (CldInfo+i)->pid == 0 )/*子进程*/
{
//修改子进程中相应类的信息
//子进程继承了该类的所有信息,但是一些信息需要修改
close(pfd[0]);
(CldInfo+i)->pid=getpid();
(CldInfo+i)->pfd=pfd[1];
ChildProc(i);
}
else
{
__logfile->Userlog(FATAL,"fork error[%s]",strerror(errno));
return false;
}
}
return true;
}
void ProcPool::KillChld(pid_t &pid,int idx)
{
kill(pid,SIGINT);
pid_t cpid=waitpid( pid,NULL,0 );
cout<<cpid<<"exit"<<endl;
__logfile->Userlog(INFO,"kill pid [%ld]",pid);
if( idx >= 0 )
{
if( (CldInfo+idx)->pid == pid )
{
chld_avail--;
chld_num--;
close( (CldInfo+idx)->pfd );
(CldInfo+idx)->pid = -1;
(CldInfo+idx)->pfd = -1;
(CldInfo+idx)->state = CS_UNKNOWN;
}
}
else
{
for(int i=0;i < MaxNum;i++)
{
if( (CldInfo+i)->pid == pid )
{
chld_avail--;
chld_num--;
close( (CldInfo+i)->pfd );
(CldInfo+i)->pid = -1;
(CldInfo+i)->pfd = -1;
(CldInfo+i)->state = CS_UNKNOWN;
break;
}
}
}
return;
}
/**
 * Kills idle children, in table order, until the number of idle children has
 * dropped to FreeLowNum or the table has been walked completely.
 */
void ProcPool::KillIdle()
{
    // The loop stops as soon as there is no surplus of idle children left.
    for (int i = 0; i < MaxNum && chld_avail > FreeLowNum; i++)
    {
        if ((CldInfo+i)->state == CS_WAITING && (CldInfo+i)->pid > 1)
        {
            // pid_t is not guaranteed to be long; cast so the argument matches %ld.
            __logfile->Userlog(INFO,"pid [%ld] idle,will kill it",static_cast<long>((CldInfo+i)->pid));
            KillChld((CldInfo+i)->pid,i);
        }
    }
}
void ProcPool::KillAll()
{
__logfile->Userlog(DEBUG,"in kill all");
for(int i=0;i<MaxNum;i++)
{
if( (CldInfo+i)->pid >1 )
{
KillChld((CldInfo+i)->pid,i);
}
}
return;
}
int ProcPool::ClearUp()
{
__logfile->Userlog(DEBUG,"in clear up");
KillAll();
return 0;
}
bool ProcPool::Init()
{
__logfile->Userlog(DEBUG,"in ProcPool init");
return IncreaseChld(InitNum);
}
void ProcPool::warn()
{
__logfile->Userlog(ERROR,"初始化参数不满足下列条件之一[最大进程数>0,最大进程数>=初始进程数,最大空闲数>最小空闲数,最大空闲数>0,最小空闲数>0]");
}
/**
 * Main loop of a child process: repeatedly waits for a task, reports "busy" to
 * the parent, runs the task and reports "free", until QuitFlag becomes non-zero.
 * Then closes the child's pipe end and terminates the process with exit(0);
 * this function never returns.
 *
 * @param idx index of this child's own slot in the child table
 */
void ProcPool::ChildProc(int& idx)
{
    // pid_t is not guaranteed to be long; convert once so every log call below
    // passes an argument that really matches %ld.
    const long self = static_cast<long>(getpid());
    __logfile->Userlog(DEBUG,"in child proc[%ld]",self);

    while (!QuitFlag)
    {
        void *task = WaitTask();
        if (task == NULL)
            continue;   // nothing to do; re-check QuitFlag and wait again

        (CldInfo+idx)->NotifyBusy();
        __logfile->Userlog(DEBUG,"[%ld] get task over,will do",self);
        doTask(task);
        __logfile->Userlog(DEBUG,"[%ld] do task over,now free",self);
        (CldInfo+idx)->NotifyFree();
    }

    close((CldInfo+idx)->pfd);
    exit(0);
}
/**
 * Dumps the pool configuration, the counters and the state of every live child,
 * first to standard output and then to the log (at FATAL level).
 */
void ProcPool::PrintPool()
{
    // --- standard output ---
    cout<<"最大进程数目 ["<<MaxNum<<"]"<<endl;
    cout<<"初始子进程数目 ["<<InitNum<<"]"<<endl;
    cout<<"最低空闲进程数目 ["<<FreeLowNum<<"]"<<endl;
    cout<<"最高空闲进程数目 ["<<FreeHighNum<<"]"<<endl;
    cout<<"每次增加的进程数 ["<<IncNum<<"]"<<endl;
    cout<<"每次减少的进程数 ["<<DecNum<<"]"<<endl;
    cout<<"总的进程数 ["<<chld_num<<"]"<<endl;
    cout<<"总的空闲进程数 ["<<chld_avail<<"]"<<endl;
    cout<<"select的超时时间 ["<<selectTimeOut<<"]毫秒"<<endl;
    for (int i = 0; i < MaxNum; i++)
    {
        if ((CldInfo+i)->pid > 0)
        {
            // Anything that is not CS_WAITING is reported as busy.
            if ((CldInfo+i)->state == CS_WAITING)
                cout<<"进程["<<(CldInfo+i)->pid<<"],状态[空闲]"<<endl;
            else
                cout<<"进程["<<(CldInfo+i)->pid<<"],状态[忙碌]"<<endl;
        }
    }

    // --- log file ---
    __logfile->Userlog(FATAL,"MaxNum [%d]",MaxNum);
    __logfile->Userlog(FATAL,"InitNum [%d]",InitNum);
    __logfile->Userlog(FATAL,"FreeLowNum [%d]",FreeLowNum);
    __logfile->Userlog(FATAL,"FreeHighNum [%d]",FreeHighNum);
    __logfile->Userlog(FATAL,"IncNum [%d]",IncNum);
    __logfile->Userlog(FATAL,"DecNum [%d]",DecNum);
    __logfile->Userlog(FATAL,"chld_num [%d]",chld_num);
    __logfile->Userlog(FATAL,"chld_avail [%d]",chld_avail);
    __logfile->Userlog(FATAL,"selectTimeOut [%d]",selectTimeOut);
    for (int i = 0; i < MaxNum; i++)
    {
        if ((CldInfo+i)->pid > 0)
        {
            // pid_t is not guaranteed to be long; cast so the argument matches %ld.
            const long pid = static_cast<long>((CldInfo+i)->pid);
            if ((CldInfo+i)->state == CS_WAITING)
                __logfile->Userlog(FATAL,"进程[%ld],状态[空闲]",pid);
            else
                __logfile->Userlog(FATAL,"进程[%ld],状态[忙碌]",pid);
        }
    }
}
/**
 * Records the key and the number of semaphores for a set. Nothing is created
 * here; the id is only reset to 0.
 *
 * @param keyval  IPC key stored in _keyval
 * @param numsems number of semaphores stored in _semnum
 */
ProcPool::semaphore::semaphore( key_t& keyval,int& numsems )
{
    this->_keyval = keyval;
    this->_semnum = numsems;
    this->_sid = 0;
}
/**
 * Creates a private (IPC_PRIVATE) set holding one semaphore and initialises
 * that semaphore to 1.
 *
 * A constructor cannot return a status, so failures are only visible in the
 * log: create() and init() each log their own error.
 */
ProcPool::semaphore::semaphore( )
{
    _sid = 0;
    _semnum = 1;
    _keyval = IPC_PRIVATE;

    __logfile->Userlog(DEBUG,"in semaphore default constructor,will create()");
    // Without a valid id there is nothing to initialise; calling init() anyway
    // would only run semctl() on an invalid id and log a second error.
    if (!create( ))
        return;

    __logfile->Userlog(DEBUG,"in semaphore default constructor,create() over,will init");
    if (!init(0,1))
        return;

    __logfile->Userlog(DEBUG,"in semaphore default constructor,init over");
}
/**
 * Obtains the semaphore set for _keyval with _semnum semaphores
 * (semget with IPC_CREAT and mode 0660) and stores its id in _sid.
 *
 * @return true on success; false when _semnum is not positive or semget() fails
 */
bool ProcPool::semaphore::create( )
{
    if (_semnum <= 0)
        return false;

    _sid = semget(_keyval,_semnum,IPC_CREAT|0660);
    if (_sid == -1)
    {
        __logfile->Userlog(ERROR,"create semaphore error[%s]",strerror(errno));
        return false;
    }

    // Cast so the argument really has the type %ld expects.
    __logfile->Userlog(INFO,"create semaphore[%ld]",static_cast<long>(_sid));
    return true;
}
// Argument type for the fourth parameter of semctl(); init() below fills in
// `val` for SETVAL.
union semun
{
    int val;                    // integer value
    struct semid_ds *buf;       // pointer to a semid_ds structure
    unsigned short *array;      // pointer to an array of unsigned short values
};
bool ProcPool::semaphore::init(int seq ,int value)
{
if( seq < 0||(seq+1) >_semnum )
return false;
union semun semopts;
semopts.val=value;
if( semctl(_sid,seq,SETVAL,semopts)<0 )
{
__logfile->Userlog(ERROR,"set semaphore value error[%s]",strerror(errno));
return false;
}
__logfile->Userlog(DEBUG,"set semaphore value");
return true;
}
bool ProcPool::semaphore::lock( int seq )
{
__logfile->Userlog(DEBUG,"pid [%ld] in semaphore lock",getpid());
struct sembuf sem_lock;/*={member,-1,SEM_UNDO} */
memset(&sem_lock,0,sizeof(struct sembuf));
//if (member<0||member>MAXSEMNUM)
if ( seq<0 )
return false;
sem_lock.sem_num=seq;
sem_lock.sem_op=-1;
sem_lock.sem_flg=SEM_UNDO;
if( semop( _sid,&sem_lock,1 )<0 )
{
__logfile->Userlog(ERROR,"pid [%ld] semaphore lock error[%s]",getpid(),strerror(errno));
return false;
}
__logfile->Userlog(DEBUG,"pid [%ld] out semaphore lock",getpid());
return true;
}
bool ProcPool::semaphore::unlock( int seq )
{
__logfile->Userlog(DEBUG,"pid [%ld] in semaphore unlock",getpid());
struct sembuf sem_lock;/*={member,-1,SEM_UNDO} */
memset(&sem_lock,0,sizeof(struct sembuf));
//if (member<0||member>MAXSEMNUM)
if ( seq<0 )
return false;
sem_lock.sem_num=seq;
sem_lock.sem_op=1;
sem_lock.sem_flg=SEM_UNDO;
if( semop( _sid,&sem_lock,1 )<0 )
{
__logfile->Userlog(ERROR,"pid [%ld] semaphore unlock error[%s]",getpid(),strerror(errno));
return false;
}
__logfile->Userlog(DEBUG,"pid [%ld] out semaphore lock",getpid());
return true;
}
void ProcPool::semaphore::remove( )
{
__logfile->Userlog(DEBUG,"pid [%ld] in semaphore remove",getpid());
semctl( _sid,0,IPC_RMID,0 );
}
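// Code-viewer page residue (not part of procpool.cpp) - keyboard shortcut help:
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full screen:        F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -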