📄 mqueue.c
字号:
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include "list.h"
#include "typedef.h"
#include "mqueue.h"
static SLIST MsgqList;
typedef int HANDLE;
static pthread_mutex_t hMutex = PTHREAD_MUTEX_INITIALIZER;
typedef struct MSQ_tag
{
union
{
struct
{
HANDLE hReadPipe;
HANDLE hWritePipe;
};
int fd_pipe[2];
};
struct mq_attr Attri;
char Name[256];
} MSQ;
int mq_close(mqd_t mqdes)
{
MSQ *pMsq = (MSQ*)mqdes;
assert(mqdes);
pthread_mutex_lock(&hMutex);
SListDelNodeByData(&MsgqList, pMsq);
pthread_mutex_unlock(&hMutex);
close(pMsq->hReadPipe);
close(pMsq->hWritePipe);
free(pMsq);
if (MsgqList.count <= 0)
{
pthread_mutex_destroy(&hMutex);
}
return 0;
}
mqd_t mq_open(const char *name, int oflag, ...)
{
MSQ *pMsq = NULL;
int itemp = 0;
va_list vaCreateArgs;
struct mq_attr *pAttr;
DWORD dmsg_queue_size = 0;
SListReset(&MsgqList);
while ((pMsq = SListGetCurData(&MsgqList)) != NULL)
{
if (strcmp(pMsq->Name, name) == 0)
{
return (mqd_t)pMsq;
}
SListNextNode(&MsgqList);
}
pMsq = calloc(1, sizeof(MSQ));
if (pMsq == NULL)
{
return (mqd_t)NULL;
}
// memset(pMsq, 0, sizeof(MSQ));
strcpy(pMsq->Name, name);
va_start(vaCreateArgs, oflag);
/* get the mode argument, ignore it */
itemp = va_arg(vaCreateArgs, int);
itemp++;
/* get a pointer to an mq_attr structure */
pAttr = va_arg(vaCreateArgs, struct mq_attr*);
va_end(vaCreateArgs);
memcpy(&pMsq->Attri, pAttr, sizeof(struct mq_attr));
dmsg_queue_size = pMsq->Attri.mq_msgsize * pMsq->Attri.mq_maxmsg;
if (pipe(pMsq->fd_pipe) < 0)
{
printf("\r\n pipe error \r\n");
return -1;
}
pthread_mutex_lock(&hMutex);
SListAppend(&MsgqList, pMsq);
pthread_mutex_unlock(&hMutex);
return (mqd_t)pMsq;
}
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
{
MSQ *pMsq = (MSQ *)mqdes;
assert(mqdes);
/* Check message size validity */
if (msg_len != (unsigned int)pMsq->Attri.mq_msgsize || msg_len > 4096)
{
assert(0);
return -1;
}
if (read(pMsq->hReadPipe, msg_ptr, msg_len) == msg_len)
{
return msg_len;
}
printf("\r\n Fail to Receive msg! \r\n");
return -1;
}
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
{
MSQ *pMsq = (MSQ *)mqdes;
assert(mqdes);
/* Check message size validity */
if (msg_len != (unsigned int)pMsq->Attri.mq_msgsize || msg_len > 4096)
{
assert(0);
return -1;
}
if (write(pMsq->hWritePipe, msg_ptr, msg_len) == msg_len)
{
return 0;
}
printf("\r\n Fail to send msg! \r\n");
return -1;
}
int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned *msg_prio, const struct timespec *abs_timeout)
{
return -1;
}
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned msg_prio, const struct timespec *abs_timeout)
{
return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
}
int mq_unlink(const char *name)
{
//printf("Call mq_unlink!\n");
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -