📄 hi_rtp.c
字号:
#ifdef __cplusplus#if __cplusplusextern "C"{#endif#endif /* __cplusplus */#include "hi_rtp.h"/*创建socket*/HI_S32 HI_Socket( HI_S32 af, HI_S32 type,HI_S32 protocol ){ return socket(af,type,protocol);}/*在非连接上接收数据*/HI_SSIZE_T HI_Recvfrom( HI_S32 s, HI_CHAR *buf, HI_S32 len, HI_S32 flags, struct sockaddr *pfrom, HI_S32 *pfromlen ){ return recvfrom( s, buf, len, flags, (struct sockaddr *)pfrom, pfromlen);}HI_S32 HI_SetSockOpt(HI_S32 s, HI_S32 level, HI_S32 optname, const HI_VOID *optval, HI_SockLen_T optlen){ return setsockopt( s, level, optname, optval, optlen);}/*关闭socket*/HI_S32 HI_CloseSocket(HI_S32 ulSocket){ return close(ulSocket);}/*将地址绑定到一个socket*/ HI_S32 HI_Bind( HI_S32 s, const struct sockaddr *paddr, HI_S32 addrlen ){ return bind(s, (struct sockaddr *)paddr, addrlen);}/* 创建一个*/HI_S32 HI_RTP_Recv_Create(IO RTP_RECV_S ** ppRtpStream, IN HI_S32 _listen_port, IN RTP_ON_RECV_CB _on_recv_fh, int *ext_args ){ RTP_RECV_S* pRecv; struct sockaddr_in local_addr; int socket_opt_value = 1; pRecv = (RTP_RECV_S*)malloc(sizeof(RTP_RECV_S)); if (NULL == pRecv) { printf("create new rtp receiver error. not enough memory.\n"); return -1; } memset(pRecv, 0 , sizeof(RTP_RECV_S)); pRecv->rtp_on_recv = _on_recv_fh; pRecv->ext_args = ext_args; pRecv->runState = RTP_RUNSTATE_STOP; pRecv->listen_port = _listen_port; if (( pRecv->sock = HI_Socket(AF_INET, SOCK_DGRAM, 0)) == -1) { printf("<RTP>create socket error.\n"); return -1; } socket_opt_value = 1; if (HI_SetSockOpt(pRecv->sock ,SOL_SOCKET,SO_REUSEADDR,&socket_opt_value,sizeof(int)) == -1) { printf("HI_SetSockOpt failed!\n"); return -1; } socket_opt_value = 0xFFFFFFFF; if (HI_SetSockOpt(pRecv->sock,SOL_SOCKET,SO_RCVBUF,(void*)(&socket_opt_value),sizeof(int)) == -1) { } local_addr.sin_family = AF_INET; // host byte order local_addr.sin_port = htons(pRecv->listen_port); // short, network byte order local_addr.sin_addr.s_addr = htonl(INADDR_ANY);// automatically fill with my IP memset(&(local_addr.sin_zero), '\0', 8); // zero the rest of the struct if (HI_Bind(pRecv->sock, (struct sockaddr *)&local_addr, sizeof(struct sockaddr)) == -1) { printf("<RTP>socket bind error.\n"); return -1; } printf("<RTP> Receiver socket create ok.\n"); *ppRtpStream = pRecv; return HI_SUCCESS;}static HI_VOID* HI_RTP_RecvHandle(HI_VOID* args){ RTP_RECV_S *pRtpStream = NULL; HI_U8 RtpRecvBuff[RPT_MAX_PACKET_BUFF]; struct sockaddr from; struct timeval tv; HI_S32 from_len = 0; HI_S32 nRet = 0; HI_S32 nByteRecv = 0; fd_set readfds; if (NULL == args) { printf("<RTP>recv thread create error. pass null para.\n"); return NULL; } pRtpStream = (RTP_RECV_S*)args; from_len = sizeof(struct sockaddr); if (pRtpStream->sock == 0) { printf("<RTP>recv thread create error. no active socket.\n"); return NULL; } printf("<RTP>Receiver thread start ok pid %d!\n",getpid()); tv.tv_sec = 0; tv.tv_usec = 5000; pRtpStream->runState = RTP_RUNSTATE_RUNNING ; while(pRtpStream->runState == RTP_RUNSTATE_RUNNING) { tv.tv_sec = 0; tv.tv_usec = 50000; memset(RtpRecvBuff, 0, RPT_MAX_PACKET_BUFF); FD_ZERO(&readfds); FD_SET(pRtpStream->sock, &readfds); nRet = select(pRtpStream->sock + 1, &readfds, NULL, NULL, &tv); if(nRet == -1) { //printf("<RTP-Error> Receiver select error. %s", strerror(errno)); } else if (nRet == 0) { //printf("<RTP-Error>TimeOut times: %llu", ++pRtpStream->stats.timeout_cnt); } else { if(FD_ISSET(pRtpStream->sock, &readfds)) { nByteRecv = HI_Recvfrom(pRtpStream->sock, RtpRecvBuff, RPT_MAX_PACKET_BUFF, 0, &from, &from_len); //printf("<RTP>Recv %d bytes.\n", nByteRecv); if (nByteRecv > 0 ) { /* Stat recv bytes */ pRtpStream->stats.recv_byte += nByteRecv; pRtpStream->stats.recv_packet++; if (pRtpStream->rtp_on_recv) { pRtpStream->rtp_on_recv(pRtpStream, RtpRecvBuff, nByteRecv, pRtpStream->ext_args); } } else { printf("<RTP-ERROR>winsocket error code: %d", errno); } } else { printf("<RTP>unknown error. "); } } } return NULL;}/* 开启接收服务 */ HI_S32 HI_RTP_Recv_Start(IN RTP_RECV_S * pRtpStream){ //void * thdRet; if (NULL == pRtpStream) { printf("<RTP>null pointer input.\n"); return -1; } pthread_create(&(pRtpStream->thd), NULL, HI_RTP_RecvHandle, (HI_VOID*)pRtpStream); return HI_SUCCESS;}HI_S32 HI_RTP_Recv_Stop(IO RTP_RECV_S * pRtpStream){ if (pRtpStream) { pRtpStream->runState = RTP_RUNSTATE_STOP ; } return (HI_S32)pthread_join(pRtpStream->thd, NULL);}HI_S32 HI_RTP_Recv_Destroy(IO RTP_RECV_S *pRtpStream){ if (pRtpStream == NULL) { return HI_SUCCESS; } if (pRtpStream->runState == RTP_RUNSTATE_RUNNING) { HI_RTP_Recv_Stop(pRtpStream); } HI_CloseSocket(pRtpStream->sock); return HI_SUCCESS; }HI_PORT HI_SOCKET_Udp_GetPort(HI_SOCKET fd){ int namelen = sizeof(struct sockaddr_in); struct sockaddr_in s; if (getsockname(fd, (struct sockaddr *)&s, &namelen)) { return 0; } return ntohs( s.sin_port );}HI_SOCKET HI_SOCKET_Udp_Open(HI_PORT port){ HI_SOCKET fd; struct sockaddr_in addr; if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { printf("UDP Socket Open error: create socket error.\n"); return -1; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(port); memset(&(addr.sin_zero), '\0', 8); // zero the rest of the struct if (bind (fd, (struct sockaddr *)&addr, sizeof (addr))) { HI_CloseSocket(fd); printf("UDP Socket Open error: bind socket error.\n"); return -1; } #if 0 /* set to non-blocking */ if (ioctlsocket(f, FIONBIO, &on) < 0) { WRITE_LOG_DEBUG("UDP Socket Open error: set to nonblock mode error.\n"); return -1; } #endif return fd;}#define RTP_DEFAULT_SSRC 41030HI_S32 HI_RTP_Sender_Create(RTP_SENDER_S ** ppRtpStream, RTP_PT_E pt){ RTP_SENDER_S* pSender; HI_SOCKET sock; HI_PORT port = 0; pSender = (RTP_SENDER_S*)malloc(sizeof(RTP_SENDER_S)); if (NULL == pSender) { printf("create new rtp sender error. not enough memory.\n"); return -1; } memset(pSender, 0 , sizeof(RTP_SENDER_S)); //INIT_LIST_HEAD(&pSender->targethosts); memset(pSender->targethosts, 0, sizeof(pSender->targethosts)); pSender->bActive = HI_FALSE; pSender->last_ts = 3600; *ppRtpStream = pSender; pSender->pt = pt; pSender->ssrc = RTP_DEFAULT_SSRC; sock = HI_SOCKET_Udp_Open(0); if (sock < 0 ) { printf("<RTP>create socket error.\n"); return -1; } port = HI_SOCKET_Udp_GetPort(sock); if (port & 0x1) /*奇数*/ { pSender->rtcpport = port; close(sock); port -= 1; sock = HI_SOCKET_Udp_Open(0); if ( sock < 0 ) { printf("<RTP>create socket error.\n"); return -1; } port = HI_SOCKET_Udp_GetPort(sock); } pSender->rtpport = port; pSender->rtcpport = port + 1; pSender->sock = sock; int yes = 1; if (setsockopt(pSender->sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) { printf("HI_ERR_RTP_SOCKET\n"); return -1; } printf("Create RTP Sender at port: %d", pSender->rtpport ); return HI_SUCCESS; }RTP_TARGETHOST_S* HI_RTP_Sender_Find(IN RTP_SENDER_S* pRtpStream, IN HI_CHAR* ip, IN HI_PORT port){ int i = 0; for (i = 0 ; i < RTP_MAX_SENDER ; i++) { if ((strcmp(ip, pRtpStream->targethosts[i].remote_ip) == 0) && (port == pRtpStream->targethosts[i].remote_port)) { return (&pRtpStream->targethosts[i]); } } return NULL; }RTP_TARGETHOST_S* HI_RTP_Sender_FindAvail(IN RTP_SENDER_S* pRtpStream){ int i = 0; for (i = 0 ; i < RTP_MAX_SENDER ; i++) { if (pRtpStream->targethosts[i].bActive == HI_FALSE) { //WRITE_LOG_DEBUG("rtp host avail %d \n", i); return (&pRtpStream->targethosts[i]); } } return NULL; }HI_S32 HI_RTP_Sender_Add(IN RTP_SENDER_S* pRtpStream, IN HI_CHAR* ip, IN HI_PORT port){ RTP_TARGETHOST_S* pTarget = NULL; printf("rtp sender add ip, %s:%d", ip, port); pTarget = HI_RTP_Sender_Find(pRtpStream, ip, port); if (pTarget == NULL) { pTarget = HI_RTP_Sender_FindAvail(pRtpStream); if (pTarget == NULL) { return -1; } memset(pTarget, 0 , sizeof(RTP_TARGETHOST_S)); strcpy(pTarget->remote_ip, ip); pTarget->remote_port = port; pTarget->remote_addr.sin_family = AF_INET; // host byte order pTarget->remote_addr.sin_port = htons(pTarget->remote_port); // short, network byte order pTarget->remote_addr.sin_addr.s_addr = inet_addr(pTarget->remote_ip); memset(&(pTarget->remote_addr.sin_zero), '\0', 8); // zero the rest of the struct } pTarget->hostState = RTP_TARGETHOST_STATE_REQ_IFrame; pTarget->bActive = HI_TRUE; return HI_SUCCESS;}/*带有同步头*/HI_S32 HI_RTP_Packet(IN RTP_SENDER_S * pRtpStream , HI_S32 ts_inc, HI_U32 marker, HI_CHAR *pPayload, HI_S32 len){ RTP_HDR_S* pRtpHdr = NULL; memset(pRtpStream->buff, 0, RPT_MAX_PACKET_BUFF); pRtpStream->buffLen = 0; pRtpHdr = (RTP_HDR_S*)pRtpStream->buff; RTP_HDR_SET_VERSION(pRtpHdr, RTP_VERSION); RTP_HDR_SET_P(pRtpHdr, 0); RTP_HDR_SET_X(pRtpHdr, 0); RTP_HDR_SET_CC(pRtpHdr, 0); RTP_HDR_SET_M(pRtpHdr, marker); RTP_HDR_SET_PT(pRtpHdr, pRtpStream->pt); RTP_HDR_SET_SEQNO(pRtpHdr, htons(pRtpStream->last_sn)); RTP_HDR_SET_TS(pRtpHdr, htonl(pRtpStream->last_ts)); RTP_HDR_SET_SSRC(pRtpHdr, htonl(pRtpStream->ssrc)); pRtpStream->last_sn++; pRtpStream->last_ts += ts_inc; memcpy(pRtpStream->buff + RTP_HDR_LEN, pPayload, len); pRtpStream->buffLen = RTP_HDR_LEN + len; return HI_SUCCESS;}HI_S32 HI_RTP_Send(IN RTP_SENDER_S * pRtpStream){ RTP_TARGETHOST_S* pHost = NULL; HI_S32 iRet; int i = 0; for(i = 0;i < RTP_MAX_SENDER;i++) { pHost = &pRtpStream->targethosts[i]; //WRITE_LOG_DEBUG("***Active: %d, BuffLen: %d, hostState:%d**\n", // pHost->bActive ,pRtpStream->buffLen ,pHost->hostState ); if (pHost->bActive && pRtpStream->buffLen > 0 ) { iRet = sendto(pRtpStream->sock, pRtpStream->buff, pRtpStream->buffLen, 0, (struct sockaddr*)&pHost->remote_addr, sizeof(struct sockaddr) ); if (iRet != pRtpStream->buffLen) { perror("send rtp error."); pRtpStream->stats.sent_error++; printf("<RTP>send packet error. %s", strerror(errno)); //HI_ERRNO(HI_ERR_RTP_SEND); } else { pRtpStream->stats.sent_byte += pRtpStream->buffLen; pRtpStream->stats.sent_packet++; } if (pRtpStream->buffLen <= 16) exit(-1); } } return HI_SUCCESS;}/************************ End For RTP ***************************************/#ifdef __cplusplus#if __cplusplus}#endif#endif /* __cplusplus */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -