queue.c
来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 776 行 · 第 1/2 页
C
776 行
/*************************************************************************** Copyright : 2001-2002, ASPIRE TECHNOLOGIES (SHENZHEN) LTD. Program ID : queue.c Description : 队列函数实现C文件 Version : NEWCMPPAPI Functions : 内部函数 Modification Log: DATE AUTHOR DESCRIPTION -------------------------------------------------------------------------- 11/25/2002 wenyz Create 11/28/2002 wenyz 修改nReadQueue返回值***************************************************************************/#include "queue.h"#define NOUSESHAREMEMORY /* 使用内存数据,而不是SHARED MEMORY */#ifndef NOUSESHAREMEMORY #include <sys/shm.h>#endif typedef struct tagMsgCell{ void* pPckAddr; /*包首地址*/ int nPckSize; /*包内容大小*/ }recMsgCell;#define MacQueueCellNum 65536 /*queue中最多的包数量*/typedef struct tagQCell{ int nInitFlag; /*是否初始化标志*/ int nMsgMaxNum; /*队列的包最多个数*/ int nHead; /*队列的头*/ int nTail; /*队列的尾*/ int nShmSize; /*申请的总共内容包内存大小*/ void* pSAddr; /*内容包的开始地址*/ void* pEAddr; /*内容包的结束地址*/ void* pCurSAddr; /*当前内容包的开始地址*/ void* pCurEAddr; /*当前内容包的结束地址*/ //recMsgCell r1Cell[MacQueueCellNum]; /* 包的首信息 */ recMsgCell** prCell; /* 包的首信息 */}recQCell;/**内部使用函数*//*static void vDumpQCell( FILE* fp, recQCell* pQLink, int nHead );*/static int nInitQLink( void* pAddr, int nSize, void* pSAddr, void* pEAddr, int nCount );static int nCheckQFull( const recQCell* pQLink, int nSize );static int nWriteFinish( recQCell* pQLink, int nSize, long lKey );static int nWriteCellPck( const recQCell* pQLink, void* pSrcAddr, int nSize );static int nCheckQCell( const recQCell* pQLink );static int nReadFinish( recQCell* pQLink, int nSize );static int nReadCellPck( const recQCell* pQLink, void* pDstAddr, int nSize );#ifndef NOUSESHAREMEMORY/**描述: 共享内存分配函数 *输入参数:nShmKey , 关键字 lShmSize 大小, *输出参数: pnShmId 输出共享内存ID,为NULL不输出*返回值: > 0 成功,* Null 失败 */ #define MacShmFlag 0666void* pShmMalloc( int nShmKey, long lShmSize, int* pnShmId ){ int nShmId; /*共享内存ID*/ void* pvShmAddr = NULL; /*共享内存首地址*/ struct shmid_ds StuShm_Ds; /*内存信息的数据结构*/ int nRet; nShmId = shmget( nShmKey, lShmSize, IPC_CREAT | MacShmFlag ); if ( nShmId < 0 ) { ErrRet( "Error When Shmget, nKey[x] = %x, Size[x] = %x\n", nShmKey, (int)lShmSize ); nShmId = shmget( nShmKey, 0, IPC_CREAT | MacShmFlag ); if ( nShmId < 0 ) { ErrRet( "Error When Shmget, nKey[x] = %x, Size[x] = %x\n", nShmKey, 0 ); return NULL; } } /*为防止版本不对,在这里对大小进行严格的匹配*/ nRet = shmctl( nShmId, IPC_STAT, &StuShm_Ds ); if( nRet < 0 ) { ErrRet( "Error When shmctl, nShmId[x]=%x", nShmId ); return NULL; } if( StuShm_Ds.shm_segsz != lShmSize ) /*大小不对*/ { ErrMsg( "Error InitShm\nBecause Current Version Not Matching " "The Exist Version\nRequire Shmem Size[x]=%x, " "Existing Shmem Size[x]=%x\n" "Pls check the version or remove the share memory, " "KEY[x]=%d", (int)lShmSize, StuShm_Ds.shm_segsz, (int)nShmKey ); return NULL; } pvShmAddr = ( void *)shmat( nShmId, NULL, 0 ); if ( !pvShmAddr || (int)pvShmAddr == -1 ) /* operation failed. */ { ErrRet( "Error When shmat, nShmId = %d\n", nShmId ); return NULL; } if( pnShmId != NULL ) /*输出ID */ { *pnShmId = nShmId; } return pvShmAddr; } #endif/* * Display the numbers of package in Queue * * Added by YXJ(2003-12-18) * */ void vPrintCellNums( void *pAddr, int type ){ recQCell *pQLink = (recQCell *) pAddr; if( pQLink->nHead >= pQLink->nTail ) { // vSetErrorDetail("the number of QUEUE(type: %d) is %d\n", type, pQLink->nHead - pQLink->nTail ); } else { // vSetErrorDetail("the number of QUEUE(type: %d) is %d\n", type, pQLink->nMsgMaxNum - (pQLink->nTail - pQLink->nHead) ); } // Trace((char *) strGetErrorDetailString()); return;}/**DEBUG function*/void vDumpQueue_Api( FILE* fp, void* pAddr ){ FILE* fD; recQCell* pQAddr; if( pAddr == NULL ) return ; pQAddr = (recQCell*) pAddr; fD = ( fp == NULL )? stdout:fp ; fprintf( fD, "\n\n----beg of Queue Status----\n" ); fprintf( fD, "nInitFlag=%d\n", pQAddr->nInitFlag ); /*是否初始化标志*/ fprintf( fD, "nMsgMaxNum=%d\n", pQAddr->nMsgMaxNum ); /*队列的包最多个数*/ fprintf( fD, "nHead=%d\n", pQAddr->nHead ); /*队列的头*/ fprintf( fD, "nTail=%d\n", pQAddr->nTail ); /*队列的尾*/ /*申请的总共内容包内存大小*/ fprintf( fD, "nShmSize=%d\n", pQAddr->nShmSize ); fprintf( fD, "pSAddr[x]=%x\n", (int)pQAddr->pSAddr); /*内容包的开始地址*/ fprintf( fD, "pEAddr[x]=%x\n", (int)pQAddr->pEAddr); /*内容包的结束地址*/ /*当前内容包的开始地址*/ fprintf( fD, "pCurSAddr[x]=%x\n", (int)pQAddr->pCurSAddr ); /*当前内容包的结束地址*/ fprintf( fD, "pCurEAddr[x]=%x\n", (int)pQAddr->pCurEAddr); /*队列中最后一条消息接收时间*/ //fprintf( fD, "tMsgRTime=%s\n", ctime( (time_t*)&pQAddr->tMsgRTime) ); /*队列中最后一条消息发送时间*/ //fprintf( fD, "tMsgSTime=%s\n", ctime( (time_t*)&pQAddr->tMsgSTime) ); /*队列中最后一条消息接收进程ID*/ //fprintf( fD, "nMsgRPid=%d\n", pQAddr->nMsgRPid ); /*队列中最后一条消息发送进程ID*/ //fprintf( fD, "nMsgSPid=%d\n", pQAddr->nMsgSPid ); fprintf( fD, "----end of Queue Status----\n" ); fflush( fD ); return ; }/** to Get Queue Load */int nGetQLoad_Api( const void* pQLink, int* pnLoad, int* pnCellNum ){ recQCell* pQAddr; int nMsgNum; /* current msg number */ int nCurMemSize; /* current used memory size */ int nTotMemSize; /* total memory size */ int nPercent; /* used percent */ if( pQLink == NULL || pnLoad == NULL ) return QUEUE_FAIL; pQAddr = (recQCell*) pQLink; nMsgNum = pQAddr->nHead - pQAddr->nTail ; if( nMsgNum < 0 ) { nMsgNum += pQAddr->nMsgMaxNum ; } if( pnCellNum != NULL ) /*将包的个数输出*/ { *pnCellNum = nMsgNum; } nTotMemSize = (int)pQAddr->pEAddr - (int)pQAddr->pSAddr ; nCurMemSize = (int)pQAddr->pCurEAddr - (int)pQAddr->pCurSAddr; if ( nCurMemSize < 0 ) { nCurMemSize += nTotMemSize ; } nPercent = (int)( ((double)nCurMemSize/(double)nTotMemSize)*100 ); if( pnLoad != NULL ) /* 将百分比输出 */ { *pnLoad = nPercent ; } return QUEUE_OK ; }/** List Current Status of QLink*/void vDumpQLink_Api( FILE* fp, const void* pQLink, int* pnLoad ){ FILE* fD; recQCell* pQAddr; int nMsgNum; /* current msg number */ int nCurMemSize; /* current used memory size */ int nTotMemSize; /* total memory size */ int nPercent; /* used percent */ if( pQLink == NULL ) return ; pQAddr = (recQCell*) pQLink; nMsgNum = pQAddr->nHead - pQAddr->nTail ; if( nMsgNum < 0 ) { nMsgNum += pQAddr->nMsgMaxNum ; } nTotMemSize = (int)pQAddr->pEAddr - (int)pQAddr->pSAddr ; nCurMemSize = (int)pQAddr->pCurEAddr - (int)pQAddr->pCurSAddr; if ( nCurMemSize < 0 ) { nCurMemSize += nTotMemSize ; } nPercent = (int)( ((double)nCurMemSize/(double)nTotMemSize)*100 ); if( pnLoad != NULL ) /* 将百分比输出 */ { *pnLoad = nPercent ; } fD = ( fp == NULL )? stdout:fp ; fprintf( fD, "\n\n----beg of Queue Status----\n" ); /*队列中当前消息个数*/ fprintf( fD, "MsgNum=%d, MaxMsgNum=%d\n", nMsgNum, pQAddr->nMsgMaxNum ); /*队列的使用情况*/ fprintf( fD, "Used/Total:[%d]/[%d] Used:[%d%%] \n", nCurMemSize, nTotMemSize, nPercent ); fprintf( fD, "----end of Queue Status----\n" ); fflush( fD ); return ; }/*static void vDumpQCell( FILE* fp, recQCell* pQLink, int nHead ){ FILE* fD; if( pQLink == NULL ) return ; fD = ( fp == NULL )? stdout:fp ; fprintf( fD, "\n\n---beg of QCell ---\n" ); //包首地址 fprintf( fD, "pPckAddr=%x\n", (int)pQLink->r1Cell[nHead].pPckAddr ); //包内容大小 fprintf( fD, "nPckSize=%d\n", pQLink->r1Cell[nHead].nPckSize ); fprintf( fD, "---end of QCell ---\n" ); fflush( fD );}*/void* pvInitAQueue_Api( int nKey, int nSize, int nCount, int* pnShmId ){ int nRet; /*函数调用结果值*/ long lShmSize; /*需要申请内存的大小*/ char* pvShmAddr; /*内存首地址*/ #ifndef NOUSESHAREMEMORY int nShmId; /*共享内存ID*/ #endif if ( nSize <= 0 || nCount <= 0 || pnShmId == NULL) return NULL; /*包体内存区加上QUEUE的关键值的内存区*/ //lShmSize = nSize*nCount + sizeof( recQCell ); /*QUEUE的控制结构内存区+指针数组+数组的大小+包体内存区*/ lShmSize = sizeof( recQCell ) + ( sizeof( recMsgCell )+ sizeof( void*) )* nCount + nSize*nCount ; #ifdef NOUSESHAREMEMORY pvShmAddr = (char*) malloc( lShmSize ); if( pvShmAddr == NULL ) /* fail */ { ErrMsg( "Error When malloc, lShmSize = %d", (int)lShmSize ); return NULL ; } #else //2002/08/14 wenyz Add pShmMalloc() Function pvShmAddr = (char*)pShmMalloc( nKey, lShmSize, &nShmId ); if( pvShmAddr == NULL ) /* fail */ { ErrMsg( "Error When pShmMalloc, KEY = %d, lShmSize = %d", nKey, (int)lShmSize ); return NULL ; } #endif memset( pvShmAddr, 0, lShmSize ); nRet = nInitQLink( pvShmAddr, lShmSize, pvShmAddr+lShmSize-nSize*nCount, pvShmAddr+lShmSize, nCount ); if ( nRet != 0 ) { ErrMsg( "Error When nInitQLink, pvShmAddr[x] = %x\n", (int)pvShmAddr ); return NULL; } #ifdef NOUSESHAREMEMORY *pnShmId = (int)pvShmAddr; #else *pnShmId = nShmId; #endif return pvShmAddr; }/**描述:复位该内存*输入参数: pQAddr( 队列的内存地址 )*返回值: 0 成功* -1 输入参数错误* */int nResetQueue_Api( void* pQAddr ){ int nRet; /*函数调用结果值*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?