queuelock.c
来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 345 行
C
345 行
/*************************************************************************** Copyright : 2001-2002, ASPIRE TECHNOLOGIES (SHENZHEN) LTD. Program ID : queuelock.c Description : 队列函数实现文件 实现互斥 Version : NEWCMPPAPI Functions : 内部函数 Modification Log: DATE AUTHOR DESCRIPTION -------------------------------------------------------------------------- 11/28/2002 wenyz Create ***************************************************************************/ #include "queuelock.h" // queuelock 的原型函数#include "event.h" // event 的原型函数#include "cmppmutex.h" // mutex 的原型函数#include "log.h"//#define ErrMsg Trace/**描述: 初始化队列 ( 可以应用于不固定长度的包 ) *输入参数: nKey 队列的KEY值,nSize 每个包的长度, nCount 最大包个数 ( 可以应用于不固定长度的包 ) bBlock 是否BLOCK* 输出参数:pQFd QUEUE的句柄*返回值: QUEUE_OK 成功, * QUEUE_FAIL 失败*/int nLockInitAQueue( int nKey, int nSize, int nCount, EBool bBlock, recBufFd* pQFd ){ int nID; // temp use , for old version void* ptr; int nRet; // return value if ( nSize <= 0 || nCount <= 0 || pQFd == NULL) return QUEUE_FAIL; /* * init queue */ ptr = pvInitAQueue_Api( nKey, nSize, nCount, &nID ); if( ptr == NULL ) return QUEUE_FAIL; pQFd->pvBufPtr = ptr; /* * init mutex */ nRet = nMutexInit( &(pQFd->BufLock) ); if( nRet != 0 ) return QUEUE_FAIL; /* * init event */ nRet = nInitEvent( &(pQFd->BufEvent) ); if( nRet != 0 ) return QUEUE_FAIL; /* * set block flag; */ pQFd->bBlock = bBlock; return QUEUE_OK; }/**描述:写包进入QUEUE ( 可以应用于不固定长度的包 ) *输入参数: QFd QUEUE的句柄, pAddr(包的首地址), nSize(包大小)*输出参数: 无*返回值: QUEUE_OK 成功 * QUEUE_FAIL 失败 * QUEUE_FULL 空间已满*/int nLockWriteQueue( recBufFd* pQFd, void* pAddr, int nSize ){ int nRet; int nWriteRet; nRet = nMutexLock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } nWriteRet = nWriteQueue_Api( pQFd->pvBufPtr, pAddr, nSize ); nRet = nMutexUnlock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } /* *blocked state, to wake up recv thread *send a event */ if( pQFd->bBlock == TRUE && nWriteRet == QUEUE_OK ) { nRet = nSetEvent( &pQFd->BufEvent ); if ( nRet != 0 ) { ErrMsg( "nRet = %d, when nSetEvent\n", nRet ); return QUEUE_FAIL; } } return nWriteRet;}/**描述:从QUEUE取包*输入参数: QFd QUEUE的句柄,pAddr 包的首地址,nSize 包大小 int nTimeOut 超时时间(秒) (队列为BLOCK时生效) <=0 阻塞的等待 >0 超时的秒数*返回值: QUEUE_OK 成功 * QUEUE_FAIL 失败 * QUEUE_EMPTY没有包*/int nLockReadQueue( recBufFd* pQFd, void* pAddr, int nSize, int nTimeOut ){ int nRet; int nReadRet; if( pQFd == NULL || pAddr == NULL || nSize <= 0 || nTimeOut <= 0 ) { return QUEUE_FAIL; }AGAIN: nRet = nMutexLock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } nReadRet = nReadQueue_Api( pQFd->pvBufPtr, pAddr, nSize ); nRet = nMutexUnlock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } if( nReadRet == QUEUE_EMPTY ) { if( pQFd->bBlock != TRUE ) { return nReadRet; } else { nRet = nTimedWaitEvent( &pQFd->BufEvent, nTimeOut ); if( nRet == 1 ) // time out { return nReadRet; } if( nRet == -1 ) //error { ErrMsg( "nRet = %d, when nWaitEvent\n", nRet ); } goto AGAIN; } } return nReadRet;}/**描述:复位该QUEUE*输入参数: QFd QUEUE的句柄*返回值: QUEUE_OK 成功 * QUEUE_FAIL 失败 * */int nLockResetQueue( recBufFd* pQFd ){ int nRet; int nReset; nRet = nMutexLock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } nReset = nResetQueue_Api( pQFd->pvBufPtr ); nRet = nMutexUnlock( &(pQFd->BufLock) ); if( nRet != 0 ) { return QUEUE_FAIL; } return nReset;}/**描述:删除该QUEUE *输入参数: QFd QUEUE的句柄*返回值: QUEUE_OK 成功 * QUEUE_FAIL 失败 */int nLockRemoveQueue( recBufFd* pQFd ){ //nSetEvent( &pQFd->BufEvent ); nMutexDestroy( &(pQFd->BufLock) ); nDestroyEvent( &pQFd->BufEvent ); nRemoveShm_Api( (int)(pQFd->pvBufPtr) ); return QUEUE_OK;}/**描述:从QUEUE取包*输入参数: QFd QUEUE的句柄*输出参数: pnLoad( 负载系数 0-100 )pnCellNum 当前的包的个数*返回值: QUEUE_OK 成功 * QUEUE_FAIL 失败 */int nLockGetQLoad( recBufFd* pQFd, int* pnLoad, int* pnCellNum ){ return nGetQLoad_Api( pQFd->pvBufPtr, pnLoad, pnCellNum );}/**描述:调试使用函数*/void vLockDumpQueue( FILE* fp, recBufFd* pQFd ){ vDumpQueue_Api( fp, pQFd->pvBufPtr ); return ;}/** int* pnLoad 用于输出负载系数,以百分比为单位*/void vLockDumpQLink( FILE* fp, recBufFd* pQFd, int* pnLoad ){ vDumpQLink_Api( fp, pQFd->pvBufPtr, pnLoad ); return ;}/*---------------------self test code ------------*///#define SELFTEST#ifdef SELFTEST#define DPrintf printfpthread_t tid1, tid2;void *thread1( void* ), *thread2( void* );static recBufFd g_BufFd;int main( int argc, char** argv ){ int nRet; EBool bBlock = TRUE ; nRet = nLockInitAQueue( 0, 10, 10, bBlock, &g_BufFd ); DPrintf( "nRet = %d, after nMemHTInit ", nRet ); nRet = pthread_create( &tid1, NULL, thread1, NULL ); DPrintf( "nRet = %d, after creat ", nRet ); pthread_join( tid1, NULL ); pthread_create( &tid2, NULL, thread2, NULL ); pthread_join( tid2, NULL ); DPrintf( "nRet = %d, after creat ", nRet ); return 0; }void *thread1( void* arg ){ int nRet; int i; char sKey[10]; int nCellNum; int nLoad; memset( sKey, 0, 10 ); strcpy( sKey, "012345" ); for( i=0; i<1; i++ ) { nRet = nLockWriteQueue( g_BufFd, sKey, 10 ); DPrintf( "nRet = %d\n", nRet ); } nRet = nLockGetQLoad( g_BufFd, &nLoad, &nCellNum ); DPrintf( "nRet = %d,nCellNum = %d, nLoad = %d", nRet, nCellNum, nLoad ); return ( NULL );}void *thread2( void* arg ){ int nRet; char sKey[10]; char sPck[20]; memset( sKey, 0, 10 ); //strcpy( sKey, "012345" ); memset( sPck, 0, 20 ); strcpy( sPck, "99999999999" ); sleep( 1 ); nRet = nLockReadQueue( g_BufFd, sKey, 10 ); DPrintf( "nRet = %d\n", nRet ); DPrintf( "sKey=%s", sKey ); return ( NULL );}#endif/*---------------------end of self test-----------*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?