📄 mqueue.c
字号:
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include "list.h"
#include "mqueue.h"
static SLIST MsgqList;
typedef int HANDLE;
static pthread_mutex_t hMutex = PTHREAD_MUTEX_INITIALIZER;
typedef struct MSQ_tag
{
union
{
HANDLE hReadPipe;
HANDLE hWritePipe;
};
struct mq_attr Attri;
char Name[256];
} MSQ;
int mq_close(mqd_t mqdes)
{
MSQ *pMsq = (MSQ*)mqdes;
pthread_mutex_lock(&hMutex);
SListDelNodeByData(&MsgqList, pMsq);
pthread_mutex_unlock(&hMutex);
msgctl(pMsq->hReadPipe, IPC_RMID, NULL); // 删除消息队列
free(pMsq);
if (MsgqList.count <= 0)
{
pthread_mutex_destroy(&hMutex);
}
return 0;
}
mqd_t mq_open(const char *name, int oflag, ...)
{
int iret_val = 0;
MSQ *pMsq = NULL;
va_list vaCreateArgs;
struct mq_attr *pAttr;
DWORD dmsg_queue_size = 0;
key_t key;
int msgid;
struct msqid_ds msg_sinfo;
char msg_path[MAX_NAME_LEN];
SListReset(&MsgqList);
while ((pMsq = SListGetCurData(&MsgqList)) != NULL)
{
if (strcmp(pMsq->Name, name) == 0)
{
return (mqd_t)pMsq;
}
SListNextNode(&MsgqList);
}
pMsq = calloc(1, sizeof(MSQ));
strcpy(pMsq->Name, name);
va_start(vaCreateArgs, oflag);
/* get the mode argument, ignore it */
va_arg(vaCreateArgs, int);
/* get a pointer to an mq_attr structure */
pAttr = va_arg(vaCreateArgs, struct mq_attr*);
va_end(vaCreateArgs);
memcpy(&pMsq->Attri, pAttr, sizeof(struct mq_attr));
dmsg_queue_size = pMsq->Attri.mq_msgsize * pMsq->Attri.mq_maxmsg;
sprintf(msg_path, "/home/msg_queue_%d", dmsg_queue_size);
key = ftok(msg_path, 'a'); // 根据路径名获取一个标识符
msgid = msgget(key, IPC_CREAT|0666); // 创建一个消息队列
if(msgid == -1)
{
printf("\r\n\t msgget(...) fail \r\n");
return -1;
}
memset(&msg_sinfo, 0, sizeof(msg_sinfo));
iret_val = msgctl(msgid, IPC_STAT, &msg_sinfo);
if(iret_val == STATUS_FAIL)
{
msgctl(msgid, IPC_RMID, NULL); // 删除消息队列
printf("\r\n\t msgctl(...) : IPC_STAT fail \r\n");
return -1;
}
msg_sinfo.msg_qbytes = dmsg_queue_size;
iret_val = msgctl(msgid, IPC_SET, &msg_sinfo); // 设置消息头参数——要以 root 用户身份运行才能成功
if(iret_val == STATUS_FAIL)
{
msgctl(msgid, IPC_RMID, NULL); // 删除消息队列
printf("\r\n\t msgctl(...) : IPC_SET fail \r\n");
return -1;
}
pMsq->hReadPipe = msgid;
pthread_mutex_lock(&hMutex);
SListAppend(&MsgqList, pMsq);
pthread_mutex_unlock(&hMutex);
return (mqd_t)pMsq;
}
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
{
MSQ *pMsq = (MSQ *)mqdes;
TMsg_Buffer tmsg_buffer;
int iret_val = 0;
assert(mqdes);
/* Check message size validity */
if (msg_len != (unsigned int)pMsq->Attri.mq_msgsize || msg_len > 4096)
{
assert(0);
return -1;
}
memset(&tmsg_buffer, 0, sizeof(tmsg_buffer));
iret_val = msgrcv(pMsq->hReadPipe, &tmsg_buffer, (msg_len + 4), 0, 0); // 阻塞式收消息
if(iret_val > 0)
{
memcpy(msg_ptr, tmsg_buffer.pbMsg, iret_val);
return iret_val;
}
printf("\nFail to Receive msg!");
return -1;
}
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
{
MSQ *pMsq = (MSQ *)mqdes;
TMsg_Buffer tmsg_buffer;
int iret_val = 0;
assert(mqdes);
/* Check message size validity */
if (msg_len != (unsigned int)pMsq->Attri.mq_msgsize || msg_len > 4096)
{
assert(0);
return -1;
}
memset(&tmsg_buffer, 0, sizeof(tmsg_buffer));
memcpy(tmsg_buffer.pbMsg, msg_ptr, msg_len);
iret_val = msgsnd(pMsq->hWritePipe, &tmsg_buffer, (msg_len + 4), 0); // 阻塞式发消息
if(iret_val == (msg_len + 4))
{
return 0;
}
printf("\nFail to send msg!");
return -1;
}
int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned *msg_prio, const struct timespec *abs_timeout)
{
return -1;
}
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned msg_prio, const struct timespec *abs_timeout)
{
return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
}
int mq_unlink(const char *name)
{
//printf("Call mq_unlink!\n");
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -