queue.c

来自「用c/c++实现的一个CMPP API」· C语言 代码 · 共 776 行 · 第 1/2 页

C
776
字号
/***************************************************************************
  Copyright    : 2001-2002, ASPIRE TECHNOLOGIES (SHENZHEN) LTD.
  Program ID   : queue.c
  Description  : Queue function implementation (C source file)
  Version      : NEWCMPPAPI
  Functions    : internal functions
  Modification Log:
       DATE         AUTHOR          DESCRIPTION
 --------------------------------------------------------------------------
       11/25/2002   wenyz     Create
       11/28/2002   wenyz     Changed the return value of nReadQueue
***************************************************************************/
#include "queue.h"

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define NOUSESHAREMEMORY    /* use process memory instead of SHARED MEMORY */

#ifndef  NOUSESHAREMEMORY
    #include <sys/shm.h>
#endif

/* Descriptor of one packet stored in the queue. */
typedef struct  tagMsgCell
{
    void*   pPckAddr;   /* start address of the packet */
    int     nPckSize;   /* size of the packet content */
}recMsgCell;

#define MacQueueCellNum  65536  /* maximum number of packets in a queue */

/* Queue control structure, placed at the very start of the queue memory. */
typedef struct  tagQCell
{
    int nInitFlag;          /* initialised flag */
    int nMsgMaxNum;         /* maximum number of packets in the queue */
    int nHead;              /* head index of the queue */
    int nTail;              /* tail index of the queue */
    int nShmSize;           /* total size of the memory requested for packet content */
    void*   pSAddr;         /* start address of the packet content area */
    void*   pEAddr;         /* end address of the packet content area */
    void*   pCurSAddr;      /* start address of the currently used content */
    void*   pCurEAddr;      /* end address of the currently used content */
    //recMsgCell r1Cell[MacQueueCellNum]; /* packet descriptors */
    recMsgCell** prCell;                /* packet descriptors */
}recQCell;

/*
*Functions for internal use
*/
/*static  void    vDumpQCell( FILE* fp, recQCell* pQLink, int nHead );*/
static  int     nInitQLink( void* pAddr, int nSize,
                            void* pSAddr, void* pEAddr,
                            int nCount );
static  int     nCheckQFull( const recQCell* pQLink, int nSize );
static  int     nWriteFinish( recQCell* pQLink, int nSize, long lKey );
static  int     nWriteCellPck( const recQCell* pQLink,
                               void* pSrcAddr,
                               int nSize );
static  int     nCheckQCell( const recQCell* pQLink );
static  int     nReadFinish( recQCell* pQLink, int nSize );
static  int     nReadCellPck( const recQCell* pQLink,
                              void* pDstAddr,
                              int nSize );

#ifndef NOUSESHAREMEMORY
/*
*Description: shared memory allocation function
*Input:   nShmKey   key of the segment
*         lShmSize  requested size in bytes
*Output:  pnShmId   receives the shared memory ID; nothing is written when NULL
*Return:  address of the attached segment on success,
*         NULL on failure
*/
#define MacShmFlag 0666
void*       pShmMalloc( int nShmKey, long lShmSize, int* pnShmId  )
{
    int     nShmId;             /* shared memory ID */
    void*   pvShmAddr = NULL;   /* start address of the shared memory */
    struct  shmid_ds StuShm_Ds; /* segment information */
    int     nRet;

    if( lShmSize < 0 )          /* a negative size can never match a segment */
    {
        return NULL;
    }

    nShmId = shmget( nShmKey, lShmSize, IPC_CREAT | MacShmFlag );
    if ( nShmId < 0 )
    {
        ErrRet( "Error When Shmget, nKey[x] = %x, Size[x] = %x\n", nShmKey, (int)lShmSize );

        /* Retry with size 0 -- presumably to pick up a segment that already
           exists with a different size, so the size check below can report it. */
        nShmId = shmget( nShmKey, 0, IPC_CREAT | MacShmFlag );
        if ( nShmId < 0 )
        {
            ErrRet( "Error When Shmget, nKey[x] = %x, Size[x] = %x\n", nShmKey, 0 );
            return NULL;
        }
    }

    /* To guard against a version mismatch, require an exact size match here. */
    nRet = shmctl( nShmId, IPC_STAT, &StuShm_Ds );
    if( nRet < 0 )
    {
        ErrRet( "Error When shmctl, nShmId[x]=%x", nShmId );
        return NULL;
    }
    if( StuShm_Ds.shm_segsz != (size_t)lShmSize ) /* wrong size */
    {
        ErrMsg( "Error InitShm\nBecause Current Version Not Matching "
                "The Exist Version\nRequire Shmem Size[x]=%x, "
                "Existing Shmem Size[x]=%x\n"
                "Pls check the version or remove the share memory, "
                "KEY[x]=%d",
                (int)lShmSize, (int)StuShm_Ds.shm_segsz, (int)nShmKey );
        return NULL;
    }

    pvShmAddr = shmat( nShmId, NULL, 0 );
    /* shmat() reports failure as (void*)-1; compare as a pointer so the
       check is not truncated on 64-bit targets. */
    if ( pvShmAddr == NULL || pvShmAddr == (void*)-1 )  /* operation failed. */
    {
        ErrRet( "Error When shmat, nShmId = %d\n", nShmId );
        return NULL;
    }

    if( pnShmId != NULL ) /* output the ID */
    {
        *pnShmId = nShmId;
    }
    return pvShmAddr;
}
#endif

/*
 * Display the numbers of package in Queue
 *
 * Added by YXJ(2003-12-18)
 *
 * The reporting calls are currently commented out, so this function has no
 * visible effect. A NULL pAddr is ignored.
 */
void vPrintCellNums( void  *pAddr, int type )
{
    recQCell  *pQLink = (recQCell *) pAddr;

    (void)type;     /* only referenced by the disabled reporting calls */

    if( pQLink == NULL )    return ;

    if( pQLink->nHead >= pQLink->nTail )
    {
        //   vSetErrorDetail("the number of QUEUE(type: %d)  is %d\n", type, pQLink->nHead - pQLink->nTail );
    }
    else
    {
        //   vSetErrorDetail("the number of QUEUE(type: %d)  is %d\n", type, pQLink->nMsgMaxNum - (pQLink->nTail - pQLink->nHead) );
    }
    // Trace((char *) strGetErrorDetailString());
    return;
}

/*
*DEBUG function: print every field of the queue control structure.
*  fp     output stream; stdout is used when NULL
*  pAddr  queue address (a recQCell); nothing is printed when NULL
*/
void    vDumpQueue_Api( FILE* fp, void* pAddr )
{
    FILE* fD;
    const recQCell* pQAddr;

    if( pAddr == NULL )    return ;

    pQAddr = (const recQCell*) pAddr;

    fD = ( fp == NULL )? stdout:fp ;

    fprintf( fD, "\n\n----beg of Queue Status----\n" );

    fprintf( fD, "nInitFlag=%d\n",  pQAddr->nInitFlag );    /* initialised flag */
    fprintf( fD, "nMsgMaxNum=%d\n", pQAddr->nMsgMaxNum );   /* maximum number of packets */
    fprintf( fD, "nHead=%d\n", pQAddr->nHead );     /* head of the queue */
    fprintf( fD, "nTail=%d\n", pQAddr->nTail );     /* tail of the queue */

    /* total size of the memory requested for packet content */
    fprintf( fD, "nShmSize=%d\n", pQAddr->nShmSize );

    /* Addresses are printed with %p: casting them to int would truncate
       them wherever pointers are wider than int. */
    fprintf( fD, "pSAddr[x]=%p\n", pQAddr->pSAddr );    /* start of the content area */
    fprintf( fD, "pEAddr[x]=%p\n", pQAddr->pEAddr );    /* end of the content area */

    /* start address of the currently used content */
    fprintf( fD, "pCurSAddr[x]=%p\n", pQAddr->pCurSAddr );

    /* end address of the currently used content */
    fprintf( fD, "pCurEAddr[x]=%p\n", pQAddr->pCurEAddr );

    /* receive time of the last message in the queue */
    //fprintf( fD, "tMsgRTime=%s\n", ctime( (time_t*)&pQAddr->tMsgRTime) );

    /* send time of the last message in the queue */
    //fprintf( fD, "tMsgSTime=%s\n", ctime( (time_t*)&pQAddr->tMsgSTime) );

    /* ID of the process that received the last message */
    //fprintf( fD, "nMsgRPid=%d\n", pQAddr->nMsgRPid );

    /* ID of the process that sent the last message */
    //fprintf( fD, "nMsgSPid=%d\n", pQAddr->nMsgSPid );

    fprintf( fD, "----end of Queue Status----\n" );

    fflush( fD );
    return ;
}

/* Byte distance from pFrom to pTo; 0 when either address is NULL. */
static  int     nQAddrDiff( const void* pFrom, const void* pTo )
{
    if( pFrom == NULL || pTo == NULL )  return 0;

    return (int)( (const char*)pTo - (const char*)pFrom );
}

/*
* Compute the usage figures shared by nGetQLoad_Api and vDumpQLink_Api:
* number of queued messages, used bytes, total bytes and used percentage.
* The percentage is reported as 0 when the total size is not positive.
*/
static  void    vCalcQUsage( const recQCell* pQAddr,
                             int* pnMsgNum,
                             int* pnCurMemSize,
                             int* pnTotMemSize,
                             int* pnPercent )
{
    int nMsgNum;
    int nCurMemSize;
    int nTotMemSize;

    nMsgNum = pQAddr->nHead - pQAddr->nTail ;
    if( nMsgNum < 0 )       /* head index has wrapped around */
    {
        nMsgNum += pQAddr->nMsgMaxNum ;
    }

    nTotMemSize = nQAddrDiff( pQAddr->pSAddr, pQAddr->pEAddr );
    nCurMemSize = nQAddrDiff( pQAddr->pCurSAddr, pQAddr->pCurEAddr );
    if ( nCurMemSize < 0 )  /* used region has wrapped around */
    {
        nCurMemSize += nTotMemSize ;
    }

    *pnMsgNum     = nMsgNum;
    *pnCurMemSize = nCurMemSize;
    *pnTotMemSize = nTotMemSize;
    *pnPercent    = ( nTotMemSize > 0 )
                    ? (int)( ((double)nCurMemSize/(double)nTotMemSize)*100 )
                    : 0 ;
}

/*
* to Get Queue Load
*  pQLink     queue address (a recQCell)
*  pnLoad     receives the used percentage of the content area (required)
*  pnCellNum  receives the number of queued packets; may be NULL
* Returns QUEUE_OK, or QUEUE_FAIL when pQLink or pnLoad is NULL.
*/
int nGetQLoad_Api( const void* pQLink, int* pnLoad, int* pnCellNum )
{
    int nMsgNum;        /* current msg number */
    int nCurMemSize;    /* current used memory size */
    int nTotMemSize;    /* total memory size */
    int nPercent;       /* used percent */

    if( pQLink == NULL || pnLoad == NULL  )    return QUEUE_FAIL;

    vCalcQUsage( (const recQCell*) pQLink,
                 &nMsgNum, &nCurMemSize, &nTotMemSize, &nPercent );

    if( pnCellNum != NULL ) /* output the number of packets */
    {
        *pnCellNum = nMsgNum;
    }

    *pnLoad = nPercent ;    /* output the percentage */

    return QUEUE_OK ;
}

/*
* List Current Status of QLink
*  fp      output stream; stdout is used when NULL
*  pQLink  queue address (a recQCell); nothing is done when NULL
*  pnLoad  receives the used percentage; may be NULL
*/
void    vDumpQLink_Api( FILE* fp, const void* pQLink, int* pnLoad )
{
    FILE* fD;
    const recQCell* pQAddr;

    int nMsgNum;        /* current msg number */
    int nCurMemSize;    /* current used memory size */
    int nTotMemSize;    /* total memory size */
    int nPercent;       /* used percent */

    if( pQLink == NULL )    return ;

    pQAddr = (const recQCell*) pQLink;

    vCalcQUsage( pQAddr, &nMsgNum, &nCurMemSize, &nTotMemSize, &nPercent );

    if( pnLoad != NULL ) /* output the percentage */
    {
        *pnLoad = nPercent ;
    }

    fD = ( fp == NULL )? stdout:fp ;
    fprintf( fD, "\n\n----beg of Queue Status----\n" );

    /* current number of messages in the queue */
    fprintf( fD, "MsgNum=%d, MaxMsgNum=%d\n", nMsgNum, pQAddr->nMsgMaxNum );

    /* usage of the queue memory */
    fprintf( fD, "Used/Total:[%d]/[%d] Used:[%d%%] \n",
                 nCurMemSize, nTotMemSize, nPercent );
    fprintf( fD, "----end of Queue Status----\n" );

    fflush( fD );
    return ;
}

/*
static  void    vDumpQCell( FILE* fp, recQCell* pQLink, int nHead )
{
    FILE* fD;

    if( pQLink == NULL )    return ;

    fD = ( fp == NULL )? stdout:fp ;

    fprintf( fD, "\n\n---beg of QCell ---\n" );

    //start address of the packet
    fprintf( fD, "pPckAddr=%x\n", (int)pQLink->r1Cell[nHead].pPckAddr  );

    //size of the packet content
    fprintf( fD, "nPckSize=%d\n", pQLink->r1Cell[nHead].nPckSize  );

    fprintf( fD, "---end of QCell ---\n" );
    fflush( fD );
}
*/

/*
*Description: allocate and initialise the memory of one queue.
*  The memory is laid out as: queue control structure + pointer array +
*  descriptor array + packet content area (nSize*nCount bytes at the end).
*Input:   nKey     shared memory key (unused when NOUSESHAREMEMORY is defined)
*         nSize    size of one packet slot in bytes, must be > 0
*         nCount   number of packet slots, must be > 0
*Output:  pnShmId  required; receives the shared memory ID, or, when
*                  NOUSESHAREMEMORY is defined, the queue address cast to int
*Return:  start address of the queue memory, NULL on failure
*/
void*   pvInitAQueue_Api(  int nKey, int nSize, int nCount, int* pnShmId )
{
    int     nRet;       /* result of function calls */
    long    lShmSize;   /* size of the memory to request */
    long    lBodySize;  /* size of the packet content area */
    size_t  nPerCell;   /* bytes needed for one packet slot */
    char*   pvShmAddr;  /* start address of the memory */

    #ifndef   NOUSESHAREMEMORY
        int     nShmId;     /* shared memory ID */
    #else
        (void)nKey;         /* only used by the shared memory build */
    #endif

    if (  nSize <= 0 || nCount <= 0 || pnShmId == NULL)    return NULL;

    /* queue control structure + pointer array + descriptor array + content area */
    nPerCell = sizeof( recMsgCell ) + sizeof( void* ) + (size_t)nSize;

    /* The total is handed to nInitQLink as an int, so refuse sizes that do
       not fit instead of letting nSize*nCount overflow. */
    if( (size_t)nCount > (size_t)INT_MAX / nPerCell
        || nPerCell * (size_t)nCount > (size_t)INT_MAX - sizeof( recQCell ) )
    {
        ErrMsg( "Error When calc queue size, nSize = %d, nCount = %d",
                nSize, nCount );
        return NULL;
    }
    lBodySize = (long)nSize * (long)nCount;
    lShmSize  = (long)( sizeof( recQCell ) + nPerCell * (size_t)nCount );

    #ifdef   NOUSESHAREMEMORY
        pvShmAddr = (char*) malloc( (size_t)lShmSize );
        if( pvShmAddr == NULL ) /* fail */
        {
            ErrMsg( "Error When malloc, lShmSize = %d",
                      (int)lShmSize );
            return NULL ;
        }
    #else
        //2002/08/14    wenyz   Add     pShmMalloc() Function
        pvShmAddr = (char*)pShmMalloc( nKey, lShmSize, &nShmId );
        if( pvShmAddr == NULL ) /* fail */
        {
            ErrMsg( "Error When pShmMalloc, KEY = %d, lShmSize = %d",
                      nKey, (int)lShmSize );
            return NULL ;
        }
    #endif

    memset( pvShmAddr, 0, (size_t)lShmSize );

    /* the content area occupies the last nSize*nCount bytes */
    nRet = nInitQLink( pvShmAddr, (int)lShmSize,
                       pvShmAddr+lShmSize-lBodySize,
                       pvShmAddr+lShmSize,
                       nCount );
    if ( nRet != 0 )
    {
        ErrMsg( "Error When nInitQLink, pvShmAddr[x] = %x\n",
                 (int)pvShmAddr );

        /* do not leak the memory obtained above */
        #ifdef   NOUSESHAREMEMORY
            free( pvShmAddr );
        #else
            shmdt( pvShmAddr );
        #endif
        return NULL;
    }

    #ifdef   NOUSESHAREMEMORY
        /* NOTE(review): the address does not fit in an int where pointers are
           wider than int; callers must use the returned pointer, not this ID. */
        *pnShmId = (int)pvShmAddr;
    #else
        *pnShmId = nShmId;
    #endif

    return pvShmAddr;
}

/*
*Description: reset this memory
*Input:  pQAddr ( memory address of the queue )
*Return: 0 success
*   -1 invalid input parameter
*
*/
int nResetQueue_Api( void* pQAddr )
{
    int nRet;       /* result of function calls */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?