queuelock.c

来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 345 行

C
345
字号
/***************************************************************************  Copyright    : 2001-2002, ASPIRE TECHNOLOGIES (SHENZHEN) LTD.  Program ID   : queuelock.c  Description  : 队列函数实现文件 实现互斥  Version      : NEWCMPPAPI  Functions    : 内部函数  Modification Log:       DATE         AUTHOR          DESCRIPTION --------------------------------------------------------------------------       11/28/2002   wenyz     Create       ***************************************************************************/ #include "queuelock.h"      // queuelock 的原型函数#include "event.h"          // event 的原型函数#include "cmppmutex.h"      // mutex 的原型函数#include "log.h"//#define ErrMsg Trace/**描述: 初始化队列  ( 可以应用于不固定长度的包 ) *输入参数: nKey 队列的KEY值,nSize 每个包的长度,            nCount 最大包个数 ( 可以应用于不固定长度的包 )            bBlock 是否BLOCK* 输出参数:pQFd QUEUE的句柄*返回值:   QUEUE_OK 成功, *           QUEUE_FAIL 失败*/int nLockInitAQueue(  int nKey, int nSize, int nCount,                        EBool bBlock, recBufFd* pQFd ){    int nID; // temp use , for old version    void* ptr;    int nRet;   // return value        if (  nSize <= 0 || nCount <= 0 || pQFd == NULL)    return QUEUE_FAIL;            /*    * init queue    */    ptr = pvInitAQueue_Api(   nKey,  nSize,  nCount, &nID );    if( ptr == NULL )    return QUEUE_FAIL;        pQFd->pvBufPtr = ptr;        /*    * init mutex     */    nRet = nMutexInit(   &(pQFd->BufLock) );    if( nRet != 0 )    return QUEUE_FAIL;        /*    * init event    */    nRet = nInitEvent( &(pQFd->BufEvent) );    if( nRet != 0 )    return QUEUE_FAIL;        /*    * set block flag;    */    pQFd->bBlock = bBlock;        return QUEUE_OK;    }/**描述:写包进入QUEUE  ( 可以应用于不固定长度的包 ) *输入参数: QFd QUEUE的句柄, pAddr(包的首地址), nSize(包大小)*输出参数: 无*返回值: QUEUE_OK 成功 *       QUEUE_FAIL 失败     *       QUEUE_FULL 空间已满*/int nLockWriteQueue(  recBufFd* pQFd, void* pAddr, int nSize ){    int nRet;    int nWriteRet;        nRet = nMutexLock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        nWriteRet = nWriteQueue_Api( pQFd->pvBufPtr, pAddr, nSize );            nRet = nMutexUnlock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        /*    *blocked state, to wake up recv thread     *send a event    */    if( pQFd->bBlock == TRUE &&         nWriteRet == QUEUE_OK )        {        nRet = nSetEvent( &pQFd->BufEvent );        if ( nRet != 0 )        {            ErrMsg( "nRet = %d, when nSetEvent\n", nRet );            return QUEUE_FAIL;        }                    }    return nWriteRet;}/**描述:从QUEUE取包*输入参数: QFd QUEUE的句柄,pAddr 包的首地址,nSize 包大小               int nTimeOut    超时时间(秒)              (队列为BLOCK时生效)                  <=0 阻塞的等待                >0 超时的秒数*返回值: QUEUE_OK 成功 *       QUEUE_FAIL 失败     *       QUEUE_EMPTY没有包*/int nLockReadQueue( recBufFd* pQFd, void* pAddr, int nSize, int nTimeOut ){    int nRet;    int nReadRet;	if( pQFd == NULL ||  pAddr == NULL  ||		nSize <= 0 || nTimeOut <= 0 )	{		return QUEUE_FAIL;	}AGAIN:        nRet = nMutexLock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        nReadRet = nReadQueue_Api( pQFd->pvBufPtr, pAddr, nSize );        nRet = nMutexUnlock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        if( nReadRet == QUEUE_EMPTY )    {        if( pQFd->bBlock != TRUE )        {            return nReadRet;            }        else        {            nRet = nTimedWaitEvent( &pQFd->BufEvent, nTimeOut );            if( nRet == 1 ) // time out             {                return nReadRet;            }            if( nRet == -1 ) //error            {                ErrMsg( "nRet = %d, when nWaitEvent\n", nRet );            }            goto AGAIN;        }    }        return nReadRet;}/**描述:复位该QUEUE*输入参数: QFd QUEUE的句柄*返回值: QUEUE_OK 成功 *       QUEUE_FAIL 失败     *   */int nLockResetQueue( recBufFd* pQFd ){    int nRet;     int nReset;        nRet = nMutexLock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        nReset = nResetQueue_Api( pQFd->pvBufPtr );            nRet = nMutexUnlock( &(pQFd->BufLock) );    if( nRet != 0 )    {        return QUEUE_FAIL;    }        return nReset;}/**描述:删除该QUEUE *输入参数: QFd QUEUE的句柄*返回值: QUEUE_OK 成功 *       QUEUE_FAIL 失败     */int nLockRemoveQueue( recBufFd* pQFd ){        //nSetEvent( &pQFd->BufEvent );    nMutexDestroy( &(pQFd->BufLock) );    nDestroyEvent( &pQFd->BufEvent );    nRemoveShm_Api( (int)(pQFd->pvBufPtr) );        return QUEUE_OK;}/**描述:从QUEUE取包*输入参数: QFd QUEUE的句柄*输出参数: pnLoad( 负载系数 0-100 )pnCellNum 当前的包的个数*返回值: QUEUE_OK 成功 *       QUEUE_FAIL 失败     */int nLockGetQLoad( recBufFd* pQFd, int* pnLoad, int* pnCellNum ){    return nGetQLoad_Api( pQFd->pvBufPtr, pnLoad, pnCellNum );}/**描述:调试使用函数*/void    vLockDumpQueue( FILE* fp, recBufFd* pQFd ){    vDumpQueue_Api( fp, pQFd->pvBufPtr );    return ;}/** int* pnLoad 用于输出负载系数,以百分比为单位*/void    vLockDumpQLink( FILE* fp, recBufFd* pQFd, int* pnLoad ){    vDumpQLink_Api( fp,  pQFd->pvBufPtr, pnLoad );    return ;}/*---------------------self test code ------------*///#define   SELFTEST#ifdef  SELFTEST#define DPrintf printfpthread_t tid1, tid2;void	*thread1( void* ), *thread2( void* );static  recBufFd g_BufFd;int main( int argc, char** argv ){	int	nRet;	EBool bBlock = TRUE ;	nRet = nLockInitAQueue(  0, 10, 10,  bBlock, &g_BufFd );	DPrintf( "nRet = %d, after nMemHTInit ", nRet  );		nRet = pthread_create( &tid1, NULL, thread1, NULL );	DPrintf( "nRet = %d, after creat ", nRet  );	pthread_join( tid1, NULL );		pthread_create( &tid2, NULL, thread2, NULL );	pthread_join( tid2, NULL );	DPrintf( "nRet = %d, after creat ", nRet  );		return 0;	}void *thread1( void* arg ){	int	nRet;	int	i;	char sKey[10];	int nCellNum;	int nLoad;		memset( sKey, 0, 10 );	strcpy( sKey, "012345" );				for( i=0; i<1; i++ )	{		nRet = nLockWriteQueue( g_BufFd, sKey, 10 );		DPrintf( "nRet = %d\n", nRet );	}		nRet = 	nLockGetQLoad( g_BufFd, &nLoad, &nCellNum );	DPrintf( "nRet = %d,nCellNum = %d, nLoad = %d", 	          nRet, nCellNum, nLoad );				return ( NULL );}void *thread2( void* arg ){	int	nRet;	char sKey[10];	char sPck[20];		memset( sKey, 0, 10 );	//strcpy( sKey, "012345" );		memset( sPck, 0, 20 );	strcpy( sPck, "99999999999" );		sleep( 1 );	nRet = nLockReadQueue( g_BufFd, sKey, 10 );	DPrintf( "nRet = %d\n", nRet );	DPrintf( "sKey=%s", sKey );				return ( NULL );}#endif/*---------------------end of self test-----------*/

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?