📄 emm.c
字号:
break;
case EMMG2MUX_Bandwidth:
bandwidth=*(unsigned short *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD));
break;
}
cmdlen-=pItemHead->parameter_length + sizeof(CA_MESSAGE_ITEM_HEAD);
pItemHead=(CA_MESSAGE_ITEM_HEAD *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD) + pItemHead->parameter_length);
}
for(ucCACount = 0;ucCACount<3;ucCACount++)
{
if(g_emmg_stream_info[ucCACount].client_ID == clientid)
{
/* g_emmg_stram_bw_request[ucCACount].client_ID = clientid;
g_emmg_stram_bw_request[ucCACount].data_channel_ID = channelid;
g_emmg_stram_bw_request[ucCACount].data_stream_ID = streamid;
g_emmg_stram_bw_request[ucCACount].bandwidth = bandwidth;*/
break;
}
}
if(ucCACount >=3)
break;
/* caSendLog(LOGTYPE_NORMAL,"emmg: Receive stream bandwidth alloaction message,streamid=%d.",g_emmg_stram_bw_request[ucCACount].data_stream_ID);
Display("emmg: Receive stream bandwidth alloaction message,client_ID=%d.",g_emmg_stram_bw_request[ucCACount].client_ID);*/
/*回复带宽分配应答*/
emmgsendstreambwallocation(socket,ucCACount,bandwidth);
break;
/*获得EMM数据*/
case EMMG2MUX_DATA_PROVISION:
/* g_emmg_data_provision.data_num=0;*/
while(cmdlen>0)
{
switch(pItemHead->parameter_type)
{
case EMMG2MUX_client_ID:
clientid = *(unsigned long *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD));
/* g_emmg_data_provision.client_ID=clientid;*/
break;
case EMMG2MUX_data_channel_ID:
/* g_emmg_data_provision.data_channel_ID=*(unsigned short *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD));*/
break;
case EMMG2MUX_data_stream_ID:
/* g_emmg_data_provision.data_stream_ID=*(unsigned short *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD));*/
break;
case EMMG2MUX_datagram:
g_emm_datagram_length=pItemHead->parameter_length;
if(g_emm_datagram_length<=MAX_EMMG_BUFFER_LENGTH)
{
/*MemCopy(EmmgDatagramBuf,(char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD),g_emm_datagram_length);*/
/*发送EMM数据*/
for(i=0;i<3;i++)
{
if(g_emmg_channel_info[i].client_ID == clientid)
break;
}
/* Display1("EmmPid:%x,index:%d",g_ca_config.Para_Config[i].EMM_Pid,i);*/
if(i>=3)
break;
send_emm_datagram((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD),g_emm_datagram_length,g_ca_config.Para_Config[i].EMM_Pid,i);
}
break;
}
cmdlen-=pItemHead->parameter_length + sizeof(CA_MESSAGE_ITEM_HEAD);
pItemHead=(CA_MESSAGE_ITEM_HEAD *)((char *)pItemHead + sizeof(CA_MESSAGE_ITEM_HEAD) + pItemHead->parameter_length);
}
/* caSendLog(LOGTYPE_NORMAL,"emmg: Receive emm provision message,streamdid=%d.",g_emmg_data_provision.data_stream_ID); */
break;
default:
break;
}
pbuf+=sizeof(CA_MESSAGE_HEAD) + pHead->message_length;
msglen-=sizeof(CA_MESSAGE_HEAD) + pHead->message_length;
pHead=(CA_MESSAGE_HEAD *)pbuf;
cmd=ca_get_message_type(pbuf);
orgcmdlen=cmdlen=ca_get_message_length(pbuf);
pItemHead=(CA_MESSAGE_ITEM_HEAD *)((char *)pHead + sizeof(CA_MESSAGE_HEAD));
}
/* rc = mu_unlock(ulMultEMM);
if(rc)
Display("unlock error");
*/
return 0;
}
/*此任务建立监听,接收EMMG的连接及处理*/
void tk_emmgMgr(void)
{
int g_new_socket_emmg=0;
struct sockaddr_in g_EcmgAddr;
int g_EcmgAddrLen;
int i;
struct sockaddr_in MuxAddr;
int rc;
/*unsigned long Msg[4];*/
int MsgLen=0;
unsigned long RcvBufLen,SendBufLen;
unsigned long IsKeepAlvie;
unsigned long KeepAliveCnt;
unsigned long KeepAliveIdleTime;
unsigned long KeepAliveTime;
unsigned long MSL;
int iCurConnect;
int RecvSockId;
struct timeval wait;
int nb;
static int isFirst = 0;
static int bRestart= 0;
int index;
char emmgrecvbuf[MAX_EMMG_BUFFER_LENGTH]; /*by xu*/
fd_set read_mask;
unsigned long int waitTime=0;
while(1)
{
if(g_new_socket_emmg)
{
close(g_new_socket_emmg);
}
if(g_socket_emmg)
{/*如为真,则关闭套接口*/
close(g_socket_emmg);
g_socket_emmg = 0;
}
g_Emmg_connected[0] = 0;
/*创建套接口*/
g_socket_emmg = socket(AF_INET, SOCK_STREAM, TCP);
if(g_socket_emmg == -1)
{
Display("emmg: Socket create error, error code=0x%x.\r\n",errno);
close(0);
tm_wkafter(100);/*by xu*/
continue;
}
Display("emmg: Socket create ok!\r\n");
MuxAddr.sin_family = AF_INET;
MuxAddr.sin_port = htons(g_ca_config.Para_Config[0].EMMG_Port);
MuxAddr.sin_addr.s_addr = htonl(INADDR_ANY);
for(i=0;i<8;i++)
MuxAddr.sin_zero[i] = 0;
/*绑定地址及端口号*/
rc = bind(g_socket_emmg, (struct sockaddr *)&MuxAddr, sizeof(MuxAddr));
if(rc == -1)
{
close(0);
Display("emmg: Bind error, socket=%d,error code=0x%x.\r\n",g_socket_emmg,errno);
tm_wkafter(100);/*by xu*/
continue;
}
Display("emmg:Bind ok!\r\n");
break;
}
while(1)
{
/*启动监听*/
rc = listen(g_socket_emmg,1);
if(rc == -1)
{
close(g_socket_emmg);
Display("emmg: Listen error, socket=%d,error code=0x%x.\r\n",g_socket_emmg,errno);
tm_wkafter(100);/*by xu*/
continue;
}
else
{
Display("emmg: listen ok!\r\n");
}
g_EcmgAddrLen=sizeof(struct sockaddr_in);
/*接收连接事件*/
/*Display1("Start to Listen the connect!");*/
waitTime=0;
while((g_new_socket_emmg = accept(g_socket_emmg, (struct sockaddr *)&g_EcmgAddr, &g_EcmgAddrLen)) == -1)
{
Display("emmg: Accept error,error code=0x%x.\r\n",errno);
tm_wkafter(10);/*by xu*/
waitTime++;
if(waitTime>20)
{
break;
}
continue;
}
if(waitTime>20)
{
continue;
}
waitTime=0;
Display("emmg: Accept ok,start to receive message!\r\n");
g_Emmg_connected[0] = 1;
#if 0
IsKeepAlvie=1;
KeepAliveCnt =2;
KeepAliveIdleTime=5;
KeepAliveTime =5;
MSL =5;
rc=setsockopt(g_new_socket_emmg,SOL_SOCKET,SO_KEEPALIVE,(char *)&IsKeepAlvie,sizeof(IsKeepAlvie));
rc=setsockopt(g_new_socket_emmg,IPPROTO_TCP,TCP_KEEPALIVE_CNT,(char *)&KeepAliveCnt ,4);
rc=setsockopt(g_new_socket_emmg,IPPROTO_TCP,TCP_KEEPALIVE_IDLE,(char *)&KeepAliveIdleTime,sizeof(KeepAliveIdleTime));
rc=setsockopt(g_new_socket_emmg,IPPROTO_TCP,TCP_KEEPALIVE_INTVL,(char *)&KeepAliveTime,sizeof(KeepAliveTime));
rc=setsockopt(g_new_socket_emmg,IPPROTO_TCP,TCP_MSL,(char *)&MSL,sizeof(MSL));
#endif
SendBufLen=4*1024;
RcvBufLen =32*1024;
rc=setsockopt(g_new_socket_emmg,SOL_SOCKET,SO_RCVBUF,(char *)&RcvBufLen,4);
rc=setsockopt(g_new_socket_emmg,SOL_SOCKET,SO_SNDBUF,(char *)&SendBufLen,4);
while(1)
{
if(g_Emmg_connected[0] ==0)
{
close(g_new_socket_emmg);
g_new_socket_emmg=0;
bRestart=1;
break;
}
FD_ZERO (&read_mask);
FD_SET (g_new_socket_emmg, &read_mask);
wait.tv_sec = 0;
wait.tv_usec = 1000*1000;
nb = select (FD_SETSIZE, &read_mask, (fd_set *) 0,(fd_set *) 0, &wait);
if(nb<0)
{
g_Emmg_connected[0] = 0;
}
else if(nb==0)
{
waitTime++;
if(waitTime>15)
{
g_Emmg_connected[0] = 0;
}
}
else
{
if (FD_ISSET(g_new_socket_emmg, &read_mask))
{
rc=recv(g_new_socket_emmg,emmgrecvbuf,MAX_EMMG_BUFFER_LENGTH,0);
if(rc>0)
{
emmg_cmd_proc(emmgrecvbuf,g_new_socket_emmg,rc,0);
waitTime=0;
}
else
{
g_Emmg_connected[0] = 0;
}
}
}
}
}
}
void tk_emmgMgr1(void)
{
int g_new_socket_emmg=0;
struct sockaddr_in g_EcmgAddr;
int g_EcmgAddrLen;
int i;
struct sockaddr_in MuxAddr;
int rc;
unsigned long Msg[4];
int MsgLen=0;
unsigned long RcvBufLen,SendBufLen;
unsigned long IsKeepAlvie;
unsigned long KeepAliveCnt;
unsigned long KeepAliveIdleTime;
unsigned long KeepAliveTime;
unsigned long MSL;
int iCurConnect;
int RecvSockId;
struct timeval wait;
int nb;
static int isFirst = 0;
static int bRestart=0;
int index;
char emmgrecvbuf[MAX_EMMG_BUFFER_LENGTH];
fd_set read_mask;
unsigned long int waitTime=0;
while(1)
{
if(g_new_socket_emmg)
{
close(g_new_socket_emmg);
}
if(g_socket_emmg1)
{/*如为真,则关闭套接口*/
close(g_socket_emmg1);
g_socket_emmg1 = 0;
}
g_Emmg_connected[1] = 0;
/*创建套接口*/
g_socket_emmg1 = socket(AF_INET, SOCK_STREAM, TCP);
if(g_socket_emmg1 == -1)
{
Display("emmg1: Socket create error, error code=0x%x.\r\n",errno);
close(0);
tm_wkafter(10);/*by xu*/
continue;
}
Display("emmg1: Socket create ok!\r\n");
MuxAddr.sin_family = AF_INET;
MuxAddr.sin_port = htons(g_ca_config.Para_Config[1].EMMG_Port);
MuxAddr.sin_addr.s_addr = htonl(INADDR_ANY);
for(i=0;i<8;i++)
MuxAddr.sin_zero[i] = 0;
/*绑定地址及端口号*/
rc = bind(g_socket_emmg1, (struct sockaddr *)&MuxAddr, sizeof(MuxAddr));
if(rc == -1)
{
close(0);
Display("emmg1: Bind error, socket=%d,error code=0x%x.\r\n",g_socket_emmg1,errno);
tm_wkafter(10);
continue;
}
Display("emmg1:Bind ok!\r\n");
/*设置缓冲的大小*/
rc=setsockopt(g_new_socket_emmg,SOL_SOCKET,SO_RCVBUF,(char *)&RcvBufLen,4);
rc=setsockopt(g_new_socket_emmg,SOL_SOCKET,SO_SNDBUF,(char *)&SendBufLen,4);
break;
}
while(1)
{
/*启动监听*/
rc = listen(g_socket_emmg1,1);
if(rc == -1)
{
close(0);
Display("emmg1: Listen error, socket=%d,error code=0x%x.\r\n",g_socket_emmg1,errno);
tm_wkafter(10);/*by xu*/
continue;
}
else
{
Display("emmg1: listen ok!\r\n");
}
g_EcmgAddrLen=sizeof(struct sockaddr_in);
/*接收连接事件*/
/*Display1("Start to Listen the connect!");*/
waitTime=0;
while((g_new_socket_emmg = accept(g_socket_emmg1, (struct sockaddr *)&g_EcmgAddr, &g_EcmgAddrLen)) == -1)
{
Display("emmg: Accept error,error code=0x%x.\r\n",errno);
tm_wkafter(10);/*by xu*/
waitTime++;
if(waitTime>20)
{
break;
}
continue;
}
if(waitTime>20)
{
continue;
}
waitTime=0;
g_Emmg_connected[1] = 1;
Display("emmg1: Accept ok,start to receive message!\r\n");
#if 0
IsKeepAlvie=1;
KeepAliveCnt =2;
KeepAliveIdleTime=5;
KeepAliveTime =5;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -