⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 osapi_message.c

📁 这是DVD中伺服部分的核心代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/*****************************************************************************
******************************************************************************
**                                                                          **
**  Copyright (c) 2006 Videon Central, Inc.                                 **
**  All rights reserved.                                                    **
**                                                                          **
**  The computer program contained herein contains proprietary information  **
**  which is the property of Videon Central, Inc.  The program may be used  **
**  and/or copied only with the written permission of Videon Central, Inc.  **
**  or in accordance with the terms and conditions stipulated in the        **
**  agreement/contract under which the programs have been supplied.         **
**                                                                          **
******************************************************************************
*****************************************************************************/
/**
 * @file osapi_message.cpp
 *
 * $Revision: 1.7 $ 
 *
 * Operating System API (OSAPI) source file. Provides an abstraction from the
 * operating system.
 *
 * This is the implementation for win32
 *
 */


#include "osapi.h"
#include "DbgPrint.h"
#include "string.h"

/* MACROS AND DEFINES */
#define DBG_OSAPI DBG_ERROR
#define DBG_ON(x) (DBG_OSAPI >= x)

#define OS_MSG_Q_HEAD 0
#define OS_MSG_Q_TAIL 1



/*
 *  Message Box record
 */
typedef struct tagMessageBoxRecord          /* prefix: mbox */
{
    ULONG           ulDepth;                /* message queue depth */
    ULONG           ulSize;                 /* message size */
    ULONG           ulHead;
    ULONG           ulTail;
    BYTE*           pbBuffers;              /* buffer for messages */
    OS_SEM_ID       waitOnBufferFullSemaphore;    /* Signal waiting threads */
    OS_SEM_ID       waitOnBufferEmptySemaphore;   /* Signal waiting threads */
    OS_SEM_ID       mutex;                  /* Critical sections */
} OS_MBOX;



static OS_STATUS OS_MsgQReceiveOption(OS_MSG_Q_ID mqId, char *strMsg, ULONG ulSize, int iTimeout, int iOption);



/**
 * OS_MsgQCreate creates a message queue.
 *
 * @param int iDepth     - queue depth
 * @param int iMsgSize   - message size, all messages placed into this queue must be of this size.
 * @param int iOSOptions - operating system options (not used in this implementation)
 *
 * @retval
 *    OS_MSG_Q_ID mqId - pointer to the message box record, if successful
 *    0 - if not successful
 *
 * @verified
 *    Yes.
 */
OS_MSG_Q_ID OS_MsgQCreate(int iDepth, int iMsgSize, int iOSOptions)
{
    OS_MBOX *pmsg = NULL;

    (void)iOSOptions;

    /* Check the validity of the given message size */
    if (iMsgSize <= 0)
    {
        DbgPrint(("%s: Invalid message size (%d)!\n", __FUNCTION__, iMsgSize));
        goto error_handler;
    }

    /* Check the validity of the given depth */
    if (iDepth <= 0)
    {
        DbgPrint(("%s: Invalid queue depth (%d)!\n", __FUNCTION__, iDepth));
        goto error_handler;
    }

    /* Allocate Record */
    pmsg = (OS_MBOX *)OS_MemAlloc(sizeof(OS_MBOX));
    if (NULL == pmsg)
    {
        goto error_handler;
    }

    /* Allocate Buffers */
    pmsg->pbBuffers = (BYTE *)OS_MemAlloc(iMsgSize * (iDepth + 1));
    if (NULL == pmsg->pbBuffers)
    {
        goto error_handler;
    }

    /* Setup Head and Tail */
    /* If Head == Tail Buffers are empty */
    pmsg->ulHead  = 0;
    pmsg->ulTail  = 0;
    pmsg->ulDepth = (ULONG)iDepth;
    pmsg->ulSize  = (ULONG)iMsgSize;

    /* Create the binary semaphore */
    pmsg->waitOnBufferEmptySemaphore = OS_SemCCreate( OS_SEM_Q_FIFO, OS_SEM_EMPTY );
    if ( pmsg->waitOnBufferEmptySemaphore == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init condWait"));
        goto error_handler;
    }

    pmsg->waitOnBufferFullSemaphore = OS_SemCCreate( OS_SEM_Q_FIFO, pmsg->ulDepth );
    if ( pmsg->waitOnBufferFullSemaphore == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init condWait"));
        goto error_handler;
    }

    pmsg->mutex = OS_SemBCreate( OS_SEM_Q_FIFO, OS_SEM_FULL );
    if ( pmsg->mutex == 0)
    {
        DbgPrint(("OS_MsgQCreate: Could not init mutex"));
        goto error_handler;
    }

    return ((OS_MSG_Q_ID)pmsg);

error_handler:
    if (NULL != pmsg)
    {
        if (NULL != pmsg->pbBuffers)
        {
            OS_MemFree(pmsg->pbBuffers);
        }

        OS_MemFree(pmsg);
    }

    return (0);
}

/**
 * OS Message Queue Delete function.
 *
 * @param
 *    OS_MSG_Q_ID mqId - message queue identifier
 *
 * @retval
 *    Returns OS_OK.
 *    This function is always successful.
 *
 * @remark
 *    None.
 *
 * @verified
 *    Yes.
 */
OS_STATUS OS_MsgQDelete(OS_MSG_Q_ID mqId)
{
    OS_MBOX   *pmsg  = (OS_MBOX *)mqId;
    OS_STATUS   status;

    if (mqId == 0)
    {
        DbgPrint(("%s: BAD POINTER\n", __FUNCTION__));
        return (OS_FAILURE);
    }

    /* take the mutex to protect during cond destroy and to make sure noone else is holding the mutex */
    
    status = OS_SemTake(pmsg->mutex, OS_WAIT_FOREVER);
    if ( status != OS_OK ) 
    {
        DbgPrint(("OS_MsgQDelete: OS_SemTake failed\n"));
    }

    /* Destroy the wait conditions */
    status = OS_SemDelete(pmsg->waitOnBufferEmptySemaphore);
    if ( status != OS_OK ) 
    {
        DbgPrint(("OS_MsgQDelete: Could not destroy condWait\n"));
    }
    pmsg->waitOnBufferEmptySemaphore = 0;

    status = OS_SemDelete(pmsg->waitOnBufferFullSemaphore);
    if ( status != OS_OK ) 
    {
        DbgPrint(("OS_MsgQDelete: Could not destroy condWait\n"));
    }
    pmsg->waitOnBufferFullSemaphore = 0;

    status = OS_SemGive(pmsg->mutex);
    if ( status != OS_OK ) 
    {
        DbgPrint(("OS_MsgQDelete: OS_SemGive failed\n"));
    }

    status = OS_SemDelete(pmsg->mutex);
    if ( status != OS_OK ) 
    {
        DbgPrint(("OS_MsgQDelete: OS_SemDelete failed\n"));
    }


    /* DeaAllocate Buffers */
    OS_MemFree(pmsg->pbBuffers);

    /* Deallocate Record */
    OS_MemFree(pmsg);

    /* Return success */
    return (OS_OK);
}

/**
 * OS Message Queue Send function.
 *
 * @param OS_MSG_Q_ID mqId - message queue identifier
 * @param char *strMsg     - message pointer
 * @param OS_UINT uiSize   - message size
 * @param int iTimeout     - send timeout value, in ticks. (-1 = wait forever, 0 = no wait, > 1 = wait)
 * @param int iPrio        - message priority (not used)
 *
 * @retval
 *    OS_OK if successful
 *    OS_FAILURE if not successful
 *    OS_TIMEOUT if a timeout occurred
 *
 * @verified
 *    Yes.
 */
OS_STATUS OS_MsgQSend(OS_MSG_Q_ID mqId, char *strMsg, ULONG ulSize, int iTimeout, int iPrio)
{
    OS_STATUS iRet  = OS_OK;
    OS_MBOX   *pmsg = (OS_MBOX *)mqId;
    BOOLEAN        waitedOnFullSemaphore = FALSE;

    (void)iPrio;

    /* Check the validity of message queue */
    if (mqId == 0)
    {
        DbgPrint(("%s: BAD POINTER!\n", __FUNCTION__));
        goto error_handler;
    }

    /* Check the validity of message */
    if (strMsg == 0)
    {
        DbgPrint(("%s: BAD POINTER!\n", __FUNCTION__));
        goto error_handler;
    }

    if (ulSize != pmsg->ulSize)
    {
        DbgPrint(("%s:%u - Error here\n", __FILE__, __LINE__));
        goto error_handler;
    }


    iRet = OS_SemTake(pmsg->mutex, OS_WAIT_FOREVER);
    if ( iRet != OS_OK ) 
    {
        DbgPrint(("%s: Failed to obtain lock\n",__FUNCTION__));
        goto error_handler;
    }


    /* If no space, wait */
    while ( pmsg->ulHead == ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) ) )
    {
        iRet = OS_SemGive(pmsg->mutex);
        if ( iRet != OS_OK ) 
        {
            DbgPrint(("%s: Failed to release lock\n",__FUNCTION__));
            goto error_handler;
        }

        iRet = OS_SemTake( pmsg->waitOnBufferFullSemaphore, iTimeout );
        if ( iRet == OS_TIMEOUT )
        {
            return ( iRet );
        }

        if ( iRet != OS_OK ) 
        {
            DbgPrint(("OS_MsgQSend: pthread_cond_wait ERROR\n"));
            goto error_handler;
        }

        waitedOnFullSemaphore = TRUE;
        iRet = OS_SemTake(pmsg->mutex, OS_WAIT_FOREVER);
        if ( iRet != OS_OK ) 
        {
            DbgPrint(("%s: Failed to obtain lock\n",__FUNCTION__));
            goto error_handler;
        }


        if (( pmsg->ulHead == ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) ) ) && ( iTimeout != OS_WAIT_FOREVER ))
        {
            // If we waited for the message queue to have room, but somebody filled it up again
            // before we could add our message, and our timeout was not forever, give up with timeout
            // (otherwise would have to subtract time waited previously and only wait remaining timeout time)
            iRet = OS_TIMEOUT;
            break;
        }
    }

    /* If we did not time out put it in the queue */
    if (OS_OK == iRet)
    {
        /* Put it in the queue */
        memcpy(pmsg->pbBuffers + (pmsg->ulTail * pmsg->ulSize ), strMsg, pmsg->ulSize);
        pmsg->ulTail = ( (pmsg->ulTail + 1) % (pmsg->ulDepth + 1) );

        // buffer empty semaphore count++
        iRet = OS_SemGive(pmsg->waitOnBufferEmptySemaphore);
        if ( iRet != OS_OK ) 
        {
            DbgPrint(("%s: Broadcast Failed\n",__FUNCTION__));
        }

        // buffer full semaphore count--
        if ( waitedOnFullSemaphore == FALSE )
        {
            iRet = OS_SemTake(pmsg->waitOnBufferFullSemaphore,OS_WAIT_FOREVER);
            if ( iRet != OS_OK ) 
            {
                DbgPrint(("%s: Broadcast Failed\n",__FUNCTION__));
            }
        }
    }

    /* Release mutex */
    iRet = OS_SemGive(pmsg->mutex);
    if ( iRet != OS_OK ) 
    {
        DbgPrint(("%s: Failed to release lock\n",__FUNCTION__));
        goto error_handler;
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -