📄 osapi_message.c
字号:
/*****************************************************************************
******************************************************************************
** **
** Copyright (c) 2006 Videon Central, Inc. **
** All rights reserved. **
** **
** The computer program contained herein contains proprietary information **
** which is the property of Videon Central, Inc. The program may be used **
** and/or copied only with the written permission of Videon Central, Inc. **
** or in accordance with the terms and conditions stipulated in the **
** agreement/contract under which the programs have been supplied. **
** **
******************************************************************************
*****************************************************************************/
/**
* @file osapi_message.c
*
* $Revision: 1.7 $
*
* Operating System API (OSAPI) source file. Provides an abstraction from the
* operating system.
*
* This is the implementation for win32
*
*/
#include "osapi.h"
#include "DbgPrint.h"
#include "string.h"
#include <limits.h>
/* MACROS AND DEFINES */

/* Debug threshold used by this file; DBG_ERROR is not defined in this file. */
#define DBG_OSAPI DBG_ERROR

/*
 * Evaluates to non-zero when the file's debug threshold is at or above level x.
 * The argument is parenthesized so that an expression argument such as
 * DBG_ON(a & b) is compared as a whole instead of being split by operator
 * precedence.
 */
#define DBG_ON(x) (DBG_OSAPI >= (x))

/*
 * NOTE(review): presumably the values accepted by the iOption parameter of
 * OS_MsgQReceiveOption -- confirm against its definition.
 */
#define OS_MSG_Q_HEAD 0
#define OS_MSG_Q_TAIL 1
/*
* Message Box record
*
* Bookkeeping for one fixed-message-size queue. OS_MsgQCreate allocates
* pbBuffers as (ulDepth + 1) slots of ulSize bytes and starts with
* ulHead == ulTail == 0, which denotes an empty queue. OS_MsgQSend treats the
* queue as full when ulHead == (ulTail + 1) % (ulDepth + 1).
*/
typedef struct tagMessageBoxRecord /* prefix: mbox */
{
ULONG ulDepth; /* message queue depth (the iDepth given to OS_MsgQCreate) */
ULONG ulSize; /* size of every message in bytes (the iMsgSize given to OS_MsgQCreate) */
ULONG ulHead; /* slot index; NOTE(review): presumably the next slot to read -- the reader is not in view, confirm */
ULONG ulTail; /* index of the slot OS_MsgQSend copies the next message into */
BYTE* pbBuffers; /* buffer for messages */
OS_SEM_ID waitOnBufferFullSemaphore; /* counting semaphore created with count ulDepth; taken by OS_MsgQSend */
OS_SEM_ID waitOnBufferEmptySemaphore; /* counting semaphore created empty; given by OS_MsgQSend after a message is queued */
OS_SEM_ID mutex; /* binary semaphore created full; taken around accesses to the queue state */
} OS_MBOX;
/*
* Forward declaration of a file-local receive routine; its definition is not
* part of the code shown here.
* NOTE(review): iOption presumably takes OS_MSG_Q_HEAD or OS_MSG_Q_TAIL -- confirm
* against the definition.
*/
static OS_STATUS OS_MsgQReceiveOption(OS_MSG_Q_ID mqId, char *strMsg, ULONG ulSize, int iTimeout, int iOption);
/**
 * OS_MsgQCreate creates a message queue.
 *
 * The message buffer holds (iDepth + 1) slots of iMsgSize bytes each; the
 * queue is empty when the head and tail indices are equal.
 *
 * @param int iDepth - queue depth, must be greater than 0
 * @param int iMsgSize - message size in bytes, must be greater than 0; all
 *                       messages placed into this queue must be of this size.
 * @param int iOSOptions - operating system options (not used in this implementation)
 *
 * @retval
 *    OS_MSG_Q_ID mqId - pointer to the message box record, if successful
 *    0 - if a parameter is invalid, if iMsgSize * (iDepth + 1) does not fit
 *        in an int, or if a memory allocation or semaphore creation fails.
 *        Everything created before the failure is released again.
 */
OS_MSG_Q_ID OS_MsgQCreate(int iDepth, int iMsgSize, int iOSOptions)
{
    OS_MBOX *pmsg = NULL;

    (void)iOSOptions;

    /* Check the validity of the given message size */
    if (iMsgSize <= 0)
    {
        DbgPrint(("%s: Invalid message size (%d)!\n", __FUNCTION__, iMsgSize));
        goto error_handler;
    }

    /* Check the validity of the given depth */
    if (iDepth <= 0)
    {
        DbgPrint(("%s: Invalid queue depth (%d)!\n", __FUNCTION__, iDepth));
        goto error_handler;
    }

    /*
     * The buffer size is computed in int arithmetic. Reject combinations for
     * which iMsgSize * (iDepth + 1) would overflow, since that would yield an
     * undersized buffer that later copies would overrun.
     */
    if (iDepth > (INT_MAX / iMsgSize) - 1)
    {
        DbgPrint(("%s: Queue too large (depth %d, message size %d)!\n", __FUNCTION__, iDepth, iMsgSize));
        goto error_handler;
    }

    /* Allocate Record */
    pmsg = (OS_MBOX *)OS_MemAlloc(sizeof(OS_MBOX));
    if (NULL == pmsg)
    {
        goto error_handler;
    }

    /* Mark every owned resource as "not created" so that the error handler
     * releases exactly what exists at the point of failure */
    pmsg->pbBuffers                  = NULL;
    pmsg->waitOnBufferEmptySemaphore = 0;
    pmsg->waitOnBufferFullSemaphore  = 0;
    pmsg->mutex                      = 0;

    /* Allocate Buffers */
    pmsg->pbBuffers = (BYTE *)OS_MemAlloc(iMsgSize * (iDepth + 1));
    if (NULL == pmsg->pbBuffers)
    {
        goto error_handler;
    }

    /* Setup Head and Tail */
    /* If Head == Tail Buffers are empty */
    pmsg->ulHead  = 0;
    pmsg->ulTail  = 0;
    pmsg->ulDepth = (ULONG)iDepth;
    pmsg->ulSize  = (ULONG)iMsgSize;

    /* Counting semaphore that starts empty (no messages queued yet) */
    pmsg->waitOnBufferEmptySemaphore = OS_SemCCreate( OS_SEM_Q_FIFO, OS_SEM_EMPTY );
    if ( pmsg->waitOnBufferEmptySemaphore == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init condWait"));
        goto error_handler;
    }

    /* Counting semaphore that starts at the queue depth (all slots free) */
    pmsg->waitOnBufferFullSemaphore = OS_SemCCreate( OS_SEM_Q_FIFO, pmsg->ulDepth );
    if ( pmsg->waitOnBufferFullSemaphore == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init condWait"));
        goto error_handler;
    }

    /* Binary semaphore used as the mutex for the queue state */
    pmsg->mutex = OS_SemBCreate( OS_SEM_Q_FIFO, OS_SEM_FULL );
    if ( pmsg->mutex == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init mutex"));
        goto error_handler;
    }

    return ((OS_MSG_Q_ID)pmsg);

error_handler:
    if (NULL != pmsg)
    {
        /* Delete the semaphores that were created before the failure; the
         * mutex is created last, so it never exists on this path */
        if (pmsg->waitOnBufferFullSemaphore != 0)
        {
            (void)OS_SemDelete(pmsg->waitOnBufferFullSemaphore);
        }
        if (pmsg->waitOnBufferEmptySemaphore != 0)
        {
            (void)OS_SemDelete(pmsg->waitOnBufferEmptySemaphore);
        }
        if (NULL != pmsg->pbBuffers)
        {
            OS_MemFree(pmsg->pbBuffers);
        }
        OS_MemFree(pmsg);
    }
    return (0);
}
/**
 * OS_MsgQDelete destroys a message queue: it deletes the queue's semaphores
 * and frees the message buffer and the queue record.
 *
 * @param
 *    OS_MSG_Q_ID mqId - message queue identifier
 *
 * @retval
 *    OS_FAILURE - if mqId is 0
 *    OS_OK      - otherwise; a failing semaphore operation is reported
 *                 through DbgPrint only and does not change the result
 *
 * @remark
 *    The queue record is freed, so mqId must not be used after this call.
 */
OS_STATUS OS_MsgQDelete(OS_MSG_Q_ID mqId)
{
    OS_MBOX *pBox;

    if (0 == mqId)
    {
        DbgPrint(("%s: BAD POINTER\n", __FUNCTION__));
        return (OS_FAILURE);
    }

    pBox = (OS_MBOX *)mqId;

    /* Hold the mutex while the wait semaphores are destroyed; this also
     * ensures that nobody else is holding it at that point */
    if (OS_OK != OS_SemTake(pBox->mutex, OS_WAIT_FOREVER))
    {
        DbgPrint(("OS_MsgQDelete: OS_SemTake failed\n"));
    }

    /* Destroy the "buffer empty" semaphore and clear its handle */
    if (OS_OK != OS_SemDelete(pBox->waitOnBufferEmptySemaphore))
    {
        DbgPrint(("OS_MsgQDelete: Could not destroy condWait\n"));
    }
    pBox->waitOnBufferEmptySemaphore = 0;

    /* Destroy the "buffer full" semaphore and clear its handle */
    if (OS_OK != OS_SemDelete(pBox->waitOnBufferFullSemaphore))
    {
        DbgPrint(("OS_MsgQDelete: Could not destroy condWait\n"));
    }
    pBox->waitOnBufferFullSemaphore = 0;

    /* Release the mutex, then destroy it as well */
    if (OS_OK != OS_SemGive(pBox->mutex))
    {
        DbgPrint(("OS_MsgQDelete: OS_SemGive failed\n"));
    }
    if (OS_OK != OS_SemDelete(pBox->mutex))
    {
        DbgPrint(("OS_MsgQDelete: OS_SemDelete failed\n"));
    }

    /* Free the message buffer first, then the record that points to it */
    OS_MemFree(pBox->pbBuffers);
    OS_MemFree(pBox);

    return (OS_OK);
}
/**
* OS Message Queue Send function.
*
* @param OS_MSG_Q_ID mqId - message queue identifier
* @param char *strMsg - message pointer
* @param ULONG ulSize - message size, must equal the message size the queue was created with
* @param int iTimeout - send timeout value, in ticks. (-1 = wait forever, 0 = no wait, > 0 = wait)
* @param int iPrio - message priority (not used)
*
* @retval
* OS_OK if successful
* OS_FAILURE if not successful
* OS_TIMEOUT if a timeout occurred
*
* @verified
* Yes.
*/
OS_STATUS OS_MsgQSend(OS_MSG_Q_ID mqId, char *strMsg, ULONG ulSize, int iTimeout, int iPrio)
{
OS_STATUS iRet = OS_OK;
OS_MBOX *pmsg = (OS_MBOX *)mqId;
BOOLEAN waitedOnFullSemaphore = FALSE;
(void)iPrio;
/* Check the validity of message queue */
if (mqId == 0)
{
DbgPrint(("%s: BAD POINTER!\n", __FUNCTION__));
goto error_handler;
}
/* Check the validity of message */
if (strMsg == 0)
{
DbgPrint(("%s: BAD POINTER!\n", __FUNCTION__));
goto error_handler;
}
if (ulSize != pmsg->ulSize)
{
DbgPrint(("%s:%u - Error here\n", __FILE__, __LINE__));
goto error_handler;
}
iRet = OS_SemTake(pmsg->mutex, OS_WAIT_FOREVER);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Failed to obtain lock\n",__FUNCTION__));
goto error_handler;
}
/* If no space, wait */
while ( pmsg->ulHead == ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) ) )
{
iRet = OS_SemGive(pmsg->mutex);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Failed to release lock\n",__FUNCTION__));
goto error_handler;
}
iRet = OS_SemTake( pmsg->waitOnBufferFullSemaphore, iTimeout );
if ( iRet == OS_TIMEOUT )
{
return ( iRet );
}
if ( iRet != OS_OK )
{
DbgPrint(("OS_MsgQSend: pthread_cond_wait ERROR\n"));
goto error_handler;
}
waitedOnFullSemaphore = TRUE;
iRet = OS_SemTake(pmsg->mutex, OS_WAIT_FOREVER);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Failed to obtain lock\n",__FUNCTION__));
goto error_handler;
}
if (( pmsg->ulHead == ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) ) ) && ( iTimeout != OS_WAIT_FOREVER ))
{
// If we waited for the message queue to have room, but somebody filled it up again
// before we could add our message, and our timeout was not forever, give up with timeout
// (otherwise would have to subtract time waited previously and only wait remaining timeout time)
iRet = OS_TIMEOUT;
break;
}
}
/* If we did not time out put it in the queue */
if (OS_OK == iRet)
{
/* Put it in the queue */
memcpy(pmsg->pbBuffers + (pmsg->ulTail * pmsg->ulSize ), strMsg, pmsg->ulSize);
pmsg->ulTail = ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) );
// buffer empty semaphore count++
iRet = OS_SemGive(pmsg->waitOnBufferEmptySemaphore);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Broadcast Failed\n",__FUNCTION__));
}
// buffer full semaphore count--
if ( waitedOnFullSemaphore == FALSE )
{
iRet = OS_SemTake(pmsg->waitOnBufferFullSemaphore,OS_WAIT_FOREVER);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Broadcast Failed\n",__FUNCTION__));
}
}
}
/* Release mutex */
iRet = OS_SemGive(pmsg->mutex);
if ( iRet != OS_OK )
{
DbgPrint(("%s: Failed to release lock\n",__FUNCTION__));
goto error_handler;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -