📄 ipc.c
字号:
/* TradeClient <http://tradeclient.sourceforge.net> * $Id: ipc.c,v 1.23 2001/03/20 22:19:33 ttabner Exp $ * * Copyright (C) 1999-2000 Bynari Inc. * Copyright (C) 2001 Project TradeClient * * LGPL * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Library General Public License as published by * the Free Software Foundation; either version 2 of the License, or (at * your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library * General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <string.h>#ifdef DEBUG#include <stdio.h>#endif#include "ipc.h"#include "ipc-proxy.h"#ifdef DMALLOC#include <dmalloc.h>#endif#ifdef MEMWATCH#include <memwatch.h>#endif/* * It is assumed that the data packet will look like: * * qPayload + MsgCode + Raw Data * * Whereby, Raw Data is strictly optional. MsgCode is assumed to * always be present. Only by doing this can all of the packets * be properly deplexed by the recipient. */static qStream *anchorQStream = (qStream *)NULL ;static qStream *tailQStream = (qStream *)NULL ;/* * This function creates a new SYS V IPC semaphore for us. * Notice that all sems created with this are private with * 0600 perms on them!!! */intinitSem( void ){ return semget( IPC_PRIVATE, 1, (IPC_CREAT | IPC_EXCL | 0600) ) ;}intdeInitSem( int semID ){ return semctl( semID, 0, IPC_RMID, 0 ) ;}/* * Lock a semaphore (mutex) */intlockSem( int sem ){ int semValue ; int retValue ; struct sembuf sem_lock = { 0, -1, 0 } ; /* Okay, fetch the current value to see what we need to do */ semValue = semctl( sem, 0, GETVAL, 0 ) ; if( semValue > 0 ) { retValue = semop( sem, &sem_lock, 1 ) ; } else { retValue = 0 ; if( semValue == 0 ) wait( NULL ) ; } return retValue ;}/* * Unlock a sem (mutex) */intunlockSem( int sem ){ int semValue ; struct sembuf sem_unlock = { 0, 1, 0 } ; semValue = semctl( sem, 0, GETVAL, 0 ) ; return semop( sem, &sem_unlock, 1 ) ; return 0 ;}/* * Create a private message queue. * Attempt to make the size of the queue twice * as large as it's default. I expect that this * will fail for most platforms. No matter what, * as long as we get a queue id, return it, otherwise, * report an error. */intinitMsgQ( void ){ int q ; struct msqid_ds qData, *qDatap ; q = msgget( IPC_PRIVATE, (IPC_CREAT | IPC_EXCL | 0600) ) ; /* If we got a msg queue, let's change it's profile */ if( q ) { /* Fetch the existing data values */ qDatap = &qData ; if( msgctl( q, IPC_STAT, qDatap ) == 0 ) { /* Now, let's change the old values and update the system */ qData.msg_qbytes *= 2 ; msgctl( q, IPC_SET, qDatap ) ; } } return q ;}/* * Read from a message queue * Reads can be blocked or non-blocked calls, as specified by boolean wait flag * Reads are also selected to a msgType specification, which follow IPC SYS V rules */intreadMsgQ( int mqd, qPayload *payload, unsigned char *buffer, long msgType, int wait ){ int mode ; int retValue ; IPCMsgBuf msg ;#ifdef DMALLOC dmalloc_verify( 0 ) ;#endif /* Set the IPC blocking mode */ if( wait ) mode = 0 ; else mode = IPC_NOWAIT ; retValue = msgrcv( mqd, &msg, MSGQ_MAXSIZE, msgType, mode ) ; if( retValue > 0 ) { /* If we have data, let's assume that it's been packeted correctly */ memcpy( payload, msg.mtext, sizeof(qPayload) ) ; memcpy( buffer, &msg.mtext[sizeof(qPayload)], retValue - sizeof(qPayload) ) ; retValue -= sizeof(qPayload) ; }#if DEBUG > 6 printf( "%d was returned on msgrcv() call.\n", retValue ) ;#endif return retValue ;}/* * Write to a message queue * All writes will be blocked. */int writeMsgQ( int mqd, qPayload *payload, unsigned char *buffer, int length, long msgType ){ int loop ; int sent ; int offset ; int retValue ; int subLength ; IPCMsgBuf msg ;#ifdef DMALLOC dmalloc_verify( 0 ) ;#endif /* Assign the request message type value */ msg.mtype = msgType ; /* Loop through, sending our data */ retValue = 1 ; loop = length / (MSGQ_MAXSIZE - sizeof(qPayload)) ;#if DEBUG > 5 printf( "writeMsgQ( length = %d, psize = %d, MSGQ_MAXSIZE = %d, fragments = %d).\n", length, (MSGQ_MAXSIZE - sizeof(qPayload)), MSGQ_MAXSIZE, loop+1 ) ;#endif for( sent = offset = 0 ; ((sent < loop) && (retValue >= 0)) ; sent++ ) { /* Copy that portion of the data buffer */ payload -> psize = MSGQ_MAXSIZE - sizeof(qPayload) ; offset += payload -> psize ; memcpy( msg.mtext, payload, sizeof(qPayload) ) ; memcpy( &msg.mtext[sizeof(qPayload)], &buffer[sent*payload -> psize], payload -> psize ) ; /* This is a blocking I/O operation */ retValue = msgsnd( mqd, &msg, MSGQ_MAXSIZE, 0 ) ; } if( retValue >= 0 ) { /* Now, send any remaining data */ if( (loop * (MSGQ_MAXSIZE - sizeof(qPayload))) < length ) { if( payload -> msgCode == MSG_MSG_BLOCK ) payload -> msgCode = MSG_MSG_LAST_BLOCK ; subLength = length - offset ; payload -> psize = subLength ; memcpy( msg.mtext, payload, sizeof(qPayload) ) ; memcpy( &msg.mtext[sizeof(qPayload)], &buffer[offset], subLength ) ; /* This is a blocking I/O operation */ retValue = msgsnd( mqd, &msg, subLength + sizeof(qPayload), 0 ) ;#if DEBUG > 5 printf( "writeMsgQ( psize = %ld ).\n", payload -> psize ) ;#endif } } return retValue ;}/* * Close a message queue. * WARNING! This will purge all remaining messages in * the queue. So, make sure that have all been consumed first! */intdeInitMsgQ( int q ){ long cntr ; qStream *tmp ; /* Reset the global list and the message queue here */#if DEBUG > 3 printf( "freeing unused buffered message queue data packet %p.\n", anchorQStream ) ;#endif cntr= 0L ; while( anchorQStream ) { cntr++ ; tmp = anchorQStream -> next ; freeQStream( anchorQStream ) ; anchorQStream = tmp ; } anchorQStream = (qStream *)NULL ;#if DEBUG > 3 printf( "%ld blocks were freed from anchorQStream.\n", cntr ) ;#endif return msgctl( q, IPC_RMID, NULL ) ;}/* * This simple function takes care of freeing the memory associated * with a msg queue structure. It's smart enough to figure out its * type and free any memory hanging off of it. */intfreeQMsg( qMsg *msg ){ int retValue ; retValue = 0 ; if( (msg) && (msg -> datType == cValue) ) { free( msg -> data.cValue ) ; retValue = 1 ; } free( msg ) ; return retValue ;}intfreeQStream( qStream *stream ){ int retValue ; retValue = freeQMsg( stream -> msg ) ; free( stream -> payload ) ; free( stream ) ; return retValue ;}/* * This is the high level message queue write function which * makes calls to the lower level functions. It will construct * the proper payload structure and send it onto the queue * as specified via the qMsg data structure. */intsendMsg( qMsg *msg ){ int retValue = 0 ; qPayload payload ; static unsigned long seq = 0 ; /* Construct the payload section */ payload.msgCode = msg -> msgCode ; payload.pid = getpid() ; payload.psize = 0 ; /* We don't know what this is yet, writeMsgQ takes care of this!!! */ payload.tsize = msg -> length ; /* This must be correct!!! */ payload.sequence = ++seq ; payload.timed = time( NULL ) ; payload.magic = msg -> length - seq ; /* Now, based on the msg payload type, write it to the message queue */ switch( msg -> datType ) { case cValue: retValue = writeMsgQ( msg -> msgQ, &payload, (unsigned char *)msg -> data.cValue, msg -> length, msg -> msgType ) ; break ; case iValue: retValue = writeMsgQ( msg -> msgQ, &payload, (unsigned char *)&msg -> data.iValue, msg -> length, msg -> msgType ) ; break ; default: fprintf( stderr, "sendMsg() has an unknown value of %d\n", msg -> datType ) ; break ; } /* * We really need to send the message block last message here so the recipient knows * it's okay to reassemble the message that we just sent. * Logic needs to go here....we must make sure that we actually sent a message * message in the first place. */ /* Let everyone know what the result was */ return retValue ;}/* * Okay, this function is a little odd. It will read from the message * queue. In synchronous mode, it will block until a completed message * can be reconstructed. In asynchronous mode, it will read chunk by * chunk appending to a list of chunks (a qStream). This way, once * a stream has been completely reconstructed, it will then be delivered * to the caller. In either case, a completed message will be returned * with the sole difference being if it blocks while waiting from the * completed message from all of the smaller packets. */char *recvMsg( int mqd, int to, int wait ){ int ret ; char *retValue ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -