⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 slmesg.c

📁 UNIX/LINUX下自编的消息队列程序
💻 C
📖 第 1 页 / 共 2 页
字号:
/****************************************************************
 * 程序名 : slmesg.c											*
 * 功  能 : 新利软件消息队列									*
 *      1. 取消消息队列配置文件,改用固定参数或从初始化函数参数	*
 *		2. 修改,原创建队列时未用锁,现改为创建队列时也用锁		*
 *		3. 原等待是在读写消息函数中完成,现改为单独用一个函数完成*
 ****************************************************************/
# include	<stdio.h>
# include	<fcntl.h>
# include	<string.h>
# include	<sys/ipc.h>
# include	<sys/shm.h>
# include	<sys/msg.h>
# include	<signal.h>
# include	<unistd.h>
# include	<errno.h>

# include	"slmesg.h"

# define	SLMQVER		"SLMQ7.0"
# define	LOCKFILE	"/tmp/.mqhandle"

# define	M_RCVPROC	0x01
# define	M_NOTICE	0x20
# define	M_PROCCHK	0x40

# define	MaxMsgNum		16		//每个进程中使用的最大消息队列数
# define	MaxWaitProcNum	512		//最大等待进程数量
# define	M_NOCOMPVAL		-1		//通配值定义

# define	MegSize			1048576		//定义单位大小
# define	DefaultTimeOut	120			//缺省超时时间
# define	DefaultMsgSize	2
# define	DefaultBlockSize	1024	//缺省块大小

# define	DefSysMsgNo		98765		//系统消息队列号

# define	TIMEOUTMASK		0x00FFFFFF

typedef struct {			//进程中消息队列结构
	int		msgid;
	int		MsgNo[MaxMsgNum];	//消息队列号
	char	*MsgBuf[MaxMsgNum]; //内存地址
	} ProcMesgStru;

typedef struct {		//等待进程队列结构
	int		m_sour;		//读进程源或写进程时的消息体长度
	int		m_dest;		//读进程的目标
	int		m_msgid;	//读进程的消息序列号
	int		m_msgtype;	//消息类型
	int		ProcType;	//等待类型 1-读 0-写消息
	int		next;		//下一个等待进程地址
	int		pid;		//进程号
	char	fill[4];
	} WaitProcQue;

typedef struct {
	int		m_sour;		//消息源
	int		m_dest;		//消息目标地址
	int		m_msgid;	//消息序列号
	int		m_msgtype;	//消息类型
	int		m_msglen;	//消息体长度,不包含本处的各参数长度
	int		m_errno;	//错误返回码
	int		m_creatime;	//消息产生时间
	//以上对应着消息头结构,若要更改,请同时更改
	int		Start;		//消息体首块块号
	int		m_wrtm;		//本条消息写入队列时间
	} MsgRec;

typedef struct {
	char	MsgId;			//标志
	char	VerNo[7];		//
	int		BlockSize;		//块大小,也是最大未读消息条数
	long	Tblocks;		//内存总块数
	long	UnuseBlocks;	//未用的总块数
	int		Timeout;		//超时时间
	int		WaitProcStart;	//等待进程队列起始地址
	int		FirstProc;		//当前第一个等待进程
	int		MsgNum;			//现有消息总数
	int		MacTab;			//内存块控制区字节数
	int		RecStart;		//消息记录区起始地址
	int		MacStart;		//内存控制块起始地址
	int		DataStart;		//消息数据区起始地址
	int		TotSize;		//本队列总空间,用字节数表示
//	unsigned int	rcvnum;
//	unsigned int 	sndnum;
	char	fill[8];		//Reserve
	} MsgHead;

static	ProcMesgStru	*pmsptr=NULL,pms;
static	int		tmout;

static	int	lockfd=-1;

extern	char	*getenv();

void SLMesgLock(char *msgptr, int No)
{
	MsgHead		*mh;
    char lockfile[101] = "";

    sprintf(lockfile, "%s%06d\0", LOCKFILE, No);
	if(lockfd==-1)
		if((lockfd=open(lockfile,O_WRONLY|O_CREAT,0666))<0) {
			fprintf(stderr,"open file error!\n");
			exit(0);
		}

	lockf(lockfd,F_LOCK,0);
	
	if(msgptr == NULL) return;
	mh = (MsgHead *)msgptr;
	if(mh->MsgId!= '\0') {
		printf("Execute Code has been reenter! %d\n", mh->MsgId);
		while(mh->MsgId!='\0') usleep(10);
	}
	mh->MsgId += 0x01;
}

void SLMesgUnLock(char *msgptr)
{
	MsgHead		*mh;

	if(msgptr != NULL) {
		mh = (MsgHead *)msgptr;
		if(mh->MsgId != 0x01) {
			printf("Lock Error!\n");
		}
		mh->MsgId = 0x00;
	}
	if(lockfd < 0) return;
	lockf(lockfd,F_ULOCK,0);
	close(lockfd);
	lockfd=-1;
}

int MkMesgQue(int mkey,int mlen,int blocksize,int timeout)
{
	int		shmid,i,j,n;
	char    *msgbuf;
	MsgHead *mh;

	if(mkey<1) return ;
	if(mlen<1 || mlen>32) mlen=DefaultMsgSize;

	n=0;
	i=mlen*MegSize;
	SLMesgLock(NULL, mkey);
	if((shmid = shmget((key_t)mkey, 1 , IPC_EXCL))>=0) {
		SLMesgUnLock(NULL);
		return 0;
	}
	if((shmid=shmget((key_t)mkey,i,IPC_CREAT|0666))<0) {
		SLMesgUnLock(NULL);
		return -10;
	}
	if((msgbuf=(char *)shmat(shmid,NULL,0))==(char *)-1) {
		SLMesgUnLock(NULL);
		return -11;
	}
	memset(msgbuf,'\0',i);
	mh=(MsgHead *)msgbuf;
	if(blocksize<128 || blocksize>8192) blocksize=DefaultBlockSize;
	if(blocksize & 0x03)
		blocksize+=4-blocksize & 0x03;

	strncpy(mh->VerNo,SLMQVER,7);

	mh->BlockSize=blocksize;
	mh->TotSize=i;
	mh->Timeout=timeout<0? DefaultTimeOut:timeout;
	j=sizeof(MsgHead) + MaxWaitProcNum * sizeof(WaitProcQue);
	n=(i-j)/(sizeof(MsgRec)+blocksize +sizeof(int));
	//n=总块数
	if(n<1) {
		SLMesgUnLock(NULL);
		return -12;
	}

	mh->MacTab=i-j-n*(sizeof(MsgRec)+blocksize);
	mh->Tblocks=n;
	mh->UnuseBlocks=n;
	mh->WaitProcStart=sizeof(MsgHead);
	mh->RecStart=mh->WaitProcStart+ MaxWaitProcNum*sizeof(WaitProcQue);
	mh->MacStart=mh->RecStart+n*sizeof(MsgRec);
	mh->DataStart=mh->MacStart + mh->MacTab;
	SLMesgUnLock(NULL);
	return 0;
}


//查找空块,在写入消息前查找空块用于写入消息,返回-1时表示没有空
int TabTest(char *msgptr)
{
	char    *ptr;
	MsgHead *head;
	int *t;
	int i=0;

	head=(MsgHead *)msgptr;

	ptr=(char *)(msgptr+head->MacStart);
	t=(int *)ptr;
	while(i<head->Tblocks) {
		if(*(t+i)==0) return i+1;
		i++;
	}
	return -1;
}

//设置块记录
void TabSet(char *msgptr,int No,int val)
{
	char    *ptr;
	MsgHead *head;
	int *t;

	head=(MsgHead *)msgptr;
	ptr=(char *)(msgptr+head->MacStart);
	t=(int *)ptr;
	t+=No-1;
	*t=val;
}

//读控制块值
int TabRead(char *msgptr,int No)
{
	char    *ptr;
	MsgHead *head;
	int *t,ret;

	head=(MsgHead *)msgptr;
	ptr=(char *)(msgptr+head->MacStart);
	t=(int *)ptr;
	t+=No-1;
	ret=*t;
	return ret;
}

//测试一个进程是否存在,返回 0-存在, 1-进程pid已不存在
int ValidProc(int pid)
{
	if(pid<=0) return 1;
	if(kill(pid, 0)!=-1) return 0;
	if(errno!=ESRCH) return 0;
	return 1;
}

void TimeProc(int sig)
{
	tmout=1;
	return ;
}

int MsgCompMsg(MsgRec *sr,MsgRec *dt)
{
	if((sr->m_dest!=dt->m_dest && sr->m_dest!=M_NOCOMPVAL)
	    || (sr->m_sour!=dt->m_sour && sr->m_sour!=M_NOCOMPVAL)
	    || (sr->m_msgid!=dt->m_msgid && sr->m_msgid!=M_NOCOMPVAL)
	    || (sr->m_msgtype!=dt->m_msgtype && sr->m_msgtype!=M_NOCOMPVAL))
		return 1;
	return 0;
}

//id=0 通知读进程, id=1 通知写进程
//当有消息写入或读出时通知相关等待进程
void MsgNotice(char *msgptr,int id,MsgRec *mr)
{
	WaitProcQue *start,*wpq;
	MsgHead     *head;
	int     i=0;
	struct	msgbuf	mbuf;

	head=(MsgHead *)msgptr;

	start=(WaitProcQue *)(msgptr + head->WaitProcStart);
	i=head->FirstProc;

	while(i>0) {
		wpq=start + i - 1;
		if(!id && (wpq->ProcType & M_RCVPROC)
		    && !MsgCompMsg((MsgRec *)wpq,mr)
		    && !ValidProc(wpq->pid))
			break;
		if(id && !(wpq->ProcType & M_RCVPROC)
		    && !ValidProc(wpq->pid) && !(wpq->ProcType & M_NOTICE)) {
			if(head->UnuseBlocks<wpq->m_sour) {
//				printf("UnuseBlocks = %d m_sour = %d\n", head->UnuseBlocks, wpq->m_sour);
				return ;
			}
			else break;
		}
		i=wpq->next;
	}

	if(i<=0) {
//		printf(" has not process to notice!\n");
		return ;
	}

	//如果已经有该进程的唤醒消息 先删除
	while (msgrcv(pms.msgid, &mbuf, 0, wpq->pid, IPC_NOWAIT)>=0) ;

	mbuf.mtype = wpq->pid;
	mbuf.mtext[0]='Y';

//	printf("%d to Notice %d Success!\n", getpid(), wpq->pid);
	msgsnd(pms.msgid, &mbuf, 0, IPC_NOWAIT);

	wpq->ProcType |= M_NOTICE;
}

//初始化消息队列
int SLMesgGet(int MesgNo)
{
	int     i, j,shmid;
	char        *msgptr;

	if(pmsptr==NULL) {
		//第一次初始化
		    pms.msgid=-1;
		for(i=0;i<MaxMsgNum;i++) {
			pms.MsgNo[i]=-1;
			pms.MsgBuf[i]=NULL;
		}
		pmsptr=&pms;
	}
	for(i=0;i<MaxMsgNum;i++) {
		if(pms.MsgNo[i]==MesgNo) {
			if(pms.MsgBuf[i]!=NULL)
				return 0;
			else
				pms.MsgNo[i]=-1;
		}
	}

	for(i=0;i<MaxMsgNum;i++)
		if(pms.MsgNo[i]<0) break;
	if(i>=MaxMsgNum) return -1;

	if((j=MkMesgQue(MesgNo,DefaultMsgSize, DefaultBlockSize, DefaultTimeOut))<0)
		return j;

	if((shmid = shmget((key_t)MesgNo,1, IPC_EXCL))<0) return -3;

	if((msgptr=(char *)shmat(shmid,NULL,0))==(char *)-1)
		return -4;

	if(strncmp(&msgptr[1], SLMQVER, 7)) return -5;

	if(pms.msgid < 0)
		if((pms.msgid = msgget((key_t)DefSysMsgNo,IPC_CREAT | 0666))<0)
			return -6;

	pms.MsgNo[i]=MesgNo;
	pms.MsgBuf[i]=msgptr;
	return 0;
}

int SLMesgDel(int No)
{
	int     i;

	if(No<1) return -1;
	if(pmsptr==NULL) return -2;
	for(i=0;i<MaxMsgNum;i++)
		if(pms.MsgNo[i]==No) break;
	if(i>=MaxMsgNum) return -2;
	if(pms.MsgBuf[i]==NULL) {
		pms.MsgNo[i]=-1;
		return -2;
	}
	shmdt(pms.MsgBuf[i]);
	pms.MsgBuf[i]=NULL;
	pms.MsgNo[i]=-1;
	return 0;
}

//进程开始等待,直到被唤醒
//写等待进程表,然后在系统消息队列上等待,收到系统消息后清除清待进程表
//返回值:-1  等待进程过多,不能写入
int ProcWait(int No, char *msgptr,MsgRec *mr,int rs, int timeout)
{
	MsgHead     *mh;
	WaitProcQue *wpq,*start, *wpqclr;
	int     *oldp, i=0, j;
	struct	msgbuf	mbuf;

	mh=(MsgHead *)msgptr;

	start=(WaitProcQue *)(msgptr + mh->WaitProcStart);

	for(i=0;i<MaxWaitProcNum;i++)
	{
		wpq=start + i;
		if(wpq->pid==0) break;
	}
	if(i>=MaxWaitProcNum)
	{
		return -1;
	}
	oldp=&mh->FirstProc;
	wpq=start;
	while(1)
	{
		if(*oldp < 1) break;
		wpq=start + *oldp - 1;
		oldp=&wpq->next;
	}
	*oldp = i + 1;
	wpq=start + i;
	memset((char *)wpq,'\0',sizeof(WaitProcQue));
	wpq->pid=getpid();
	if(rs & M_RCVPROC) {
		wpq->m_sour=mr->m_sour;
		wpq->m_dest=mr->m_dest;
		wpq->m_msgid=mr->m_msgid;
		wpq->m_msgtype=mr->m_msgtype;
		wpq->ProcType=M_RCVPROC;
	}
	else {
		wpq->m_sour=(mr->m_msglen+mh->BlockSize - 1)/mh->BlockSize;
		wpq->ProcType=0x00;
	}
	tmout = 0;
	if(timeout) {
		signal(SIGALRM,TimeProc);
		alarm(timeout);
	}
	SLMesgUnLock(msgptr);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -