📄 shmipc.cpp
字号:
/*********************************************************************************Copyright: cchbds Tech .Co*Filename: shmipc.c*Author Version Date:*Description: store and forward the data in share memery that other thread put it in *Function List 1 init_shm 2 get_send_shm 3 shm_send 4 shmproducer*History:*********************************************************************************/#ifdef HAVE_CONFIG_H#include <config.h>#endif#include "shmipc.h"void send_share_Buffer(char * buffer){//return ; int fd; int index=0; long offset; char * databuf; int lastnoverflow, temp; lastnoverflow = 0; //sem_wait(&ptr->nstored); for ( ; ; ) {
EnterCriticalSection (&cs);
LeaveCriticalSection(&cs);
//sem_wait(&ptr->mutex); //printf("PID[%d]: send_share_M sem_wait ptr->mutex OK\n",getpid()); if(!qempty(ptr->front,ptr->rear) && (ptr->nput)>0 ) { // printf("queue has packet ,nput %d, front %d,rear %d\n",ptr->nput,ptr->front,ptr->rear); int i=0; int ipid=0; int comid=0; int signal=0; long len=0; unsigned long mark=0; long sending_front=0; unsigned short k=0; unsigned short commark; sending_front=ptr->index_front;
while(i < ptr->nput) { offset = ptr->index[sending_front];//ptr_front len=ptr->index[sending_front+1]; mark=ptr->index[sending_front+2]; commark=mark & 0x0000ffff;//printf("send packet packet NUM %d index_front %d ,index_rear %d, front %d, rear %d \n",i,sending_front,ptr->index_rear, ptr->index[sending_front],ptr->rear); k=0; while(k<16) { if((comid = get_one_ocomid( commark, k)) != -1 && ( scomfd[comid]>0)) { int retu=-1; sem_post(&ptr->mutex); retu=send_packet(comid,&(ptr->msgdata[offset]),len); sem_wait(&ptr->mutex); if (retu==0) {printf("send packet oK \n"); //发送成功,改标志 ptr->index[sending_front+2]=ptr->index[sending_front+2] & (~power(2,comid));printf("new mark %d \n", ptr->index[sending_front+2] ); if(!(ptr->index[sending_front+2] )&& (offset ==ptr->front)) { //全部发送完毕 //删除 包,移队头指针 move_index_front(ptr); signal=1; } sem_post(&ptr->mutex); } } k++; } if (signal) { printf(" packet has been send to all com ok \n"); sending_front=ptr->index_front; signal=0; } else { //printf(" packet has not been send to all com not ok \n"); if((sending_front += 3) > NMESG ) { sending_front=0; } while (ptr->index[sending_front]==-1 && sending_front !=ptr->index_rear) { printf(" still have packet not send \n"); if((sending_front += 3) > NMESG ) { sending_front=0; } } } ++i; // printf(" begin send next packet ! %d ,total packet num %d \n",i,ptr->nput); } if(ptr->nput==0) { //empty ptr->rear=0; ptr->front=0; ptr->index_rear=0; ptr->index_front=0; }printf("consumer ptr->nput %d index_front %d ,index_rear %d, front %d, rear %d \n",ptr->nput,ptr->index_front,ptr->index_rear,ptr->front,ptr->rear); } sem_post(&ptr->mutex); //sem_post(&ptr->nstored);// printf("wait new packet \n"); sleep(2); } return ;}/*********************************************************************************Function: shmproducer*Description: write unsend data to sharebuffer*Calls: qoverflow qinsert*Called By: every recv pthread*Input: indata :msg len*Output: *Return: the result suc or failed*Others:*********************************************************************************/int shmproducer(char *msg,unsigned long len,unsigned long mark){ int fd; //pid_t pid; long offset; shmstruct *ptr; printf("*******************************shmproducer begin \n"); if((fd = shm_open(SHMOBJECT, O_RDWR,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH))<0)//00644); return 0; ptr = mmap(NULL, sizeof( shmstruct), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); close(fd); //pid = getpid(); printf("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&shmproducer wait ptr->mutex \n"); sem_wait(&ptr->mutex); if(qoverflow(ptr->front,ptr->rear,BUFFERSIZE,len)) { //overflow,丢最先收到的包,改变front的值 ptr->front=ptr->index[ptr->index_front]; //数据缓存区溢出 while(qoverflow(ptr->front,ptr->rear,BUFFERSIZE,len)) { //移动索引区头指针 move_index_front(ptr); } } //数据区正常无溢出 if(qoverflow(ptr->index_front,ptr->index_rear,NMESG,3)) { //索引区溢出 move_index_front(ptr); } //两者均无溢出,开始放入数据到数据区尾 if(ptr->index_rear + 3 > NMESG) { ptr->index_rear=0; } ptr->index[ptr->index_rear]=ptr->rear;printf("save packet to queue offset %d ptr->rear %d \n",ptr->rear ,ptr->index[ptr->index_rear] ); (ptr->index_rear)++; ptr->index[ptr->index_rear]=len;printf("save packet to queue len %d len %d \n",len ,ptr->index[ptr->index_rear] ); (ptr->index_rear)++; ptr->index[ptr->index_rear]=mark; (ptr->index_rear)++; //放入数据printf("save packet to queue offset %d len %d mark %d \n",ptr->rear ,offset ,len , mark); qinsert(ptr->msgdata,msg,len, ptr->front,ptr->rear,BUFFERSIZE); if((ptr->rear+len) > BUFFERSIZE) { ptr->rear=ptr->rear+len- BUFFERSIZE; } else { ptr->rear=ptr->rear+len; } ++(ptr->nput); sem_post(&ptr->mutex); sem_post(&ptr->nstored);printf("shmproducer OK \n");printf("shmproducer ptr->nput %d index_front %d ,index_rear %d, front %d, rear %d \n",ptr->nput,ptr->index_front,ptr->index_rear,ptr->front,ptr->rear); return 1;}int move_index_front(shmstruct *ptr){ if((ptr->index_front+=3) > NMESG ) { //索引区溢出 ptr->index[ptr->index_front-3]=-1; ptr->index_front=0; //ptr->nput--; } while(ptr->index[ptr->index_front]==-1) { //中间有包已经发送成功,但未从缓存中移走 if((ptr->index_front+=3)>NMESG) { //索引区溢出 ptr->index[ptr->index_front-3]=-1; ptr->index_front=0; } ptr->index[ptr->index_front]=-1; ptr->index_front+=3; } ptr->front=ptr->index[ptr->index_front];// (ptr->nput)--; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -