📄 rtsp_thread.c
字号:
change_state(state, RTP_DATA_UNKNOWN); return 0;}static int data_unknown (rtsp_client_t *info, rtp_state_t *state){ int blen; int ix; blen = rtsp_bytes_in_buffer(info); if (info->m_offset_on != 0) { move_end_of_buffer(info, blen); } blen = rtsp_receive_socket(info, info->m_resp_buffer + info->m_offset_on, RECV_BUFF_DEFAULT_LEN - info->m_offset_on, 0, 0); if (blen < 0) return -1; info->m_buffer_len += blen; blen = rtsp_bytes_in_buffer(info); for (ix = 0; ix < blen; ix++) { if (info->m_resp_buffer[info->m_offset_on] == '$') { info->m_offset_on++; change_state(state, RTP_HEADER_CHECK); return 0; } else if (tolower(info->m_resp_buffer[info->m_offset_on]) == 'r') { change_state(state, RTSP_HEADER_CHECK); return 0; } else { info->m_offset_on++; } } return -1;}/* * rtsp_thread() - rtsp thread handler - receives and * processes all data */int rtsp_thread (void *data){ rtsp_client_t *info = (rtsp_client_t *)data; int continue_thread; int ret; unsigned int ix; int state_cont; int bytes; int got_rtp_pak; rtp_state_t state; continue_thread = 0; memset(&state, sizeof(state), 0); state.rtp_ptr = NULL; state.state = RTP_DATA_UNKNOWN; rtsp_thread_init_thread_info(info); while (continue_thread == 0) { // rtsp_debug(LOG_DEBUG, "thread waiting"); ret = rtsp_thread_wait_for_event(info); if (ret <= 0) { if (ret < 0) { //rtsp_debug(LOG_ERR, "RTSP loop error %d errno %d", ret, errno); } else { if (info->server_socket != -1) { for (ix = 0; ix < MAX_RTP_THREAD_SESSIONS; ix++) { if (info->m_callback[ix].rtp_callback_set && info->m_callback[ix].rtp_periodic != NULL) { (info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata); } } } } continue; } /* * See if the communications socket for IPC has any data */ //rtsp_debug(LOG_DEBUG, "Thread checking control"); ret = rtsp_thread_has_control_message(info); if (ret) { rtsp_msg_type_t msg_type; int read; /* * Yes - read the message type. */ read = rtsp_thread_get_control_message(info, &msg_type); if (read == sizeof(msg_type)) { // received message //rtsp_debug(LOG_DEBUG, "Comm socket msg %d", msg_type); switch (msg_type) { case RTSP_MSG_QUIT: continue_thread = 1; break; case RTSP_MSG_START: ret = rtsp_thread_start_cmd(info); rtsp_thread_ipc_respond(info, ret); break; case RTSP_MSG_SEND_AND_GET: ret = rtsp_thread_send_and_get(info); if (ret < 0) { rtsp_thread_ipc_respond(info, ret); } else { // indicate we're supposed to receive... state.receiving_rtsp_response = 1; } break; case RTSP_MSG_PERFORM_CALLBACK: ret = rtsp_msg_thread_perform_callback(info); rtsp_thread_ipc_respond(info, ret); break; case RTSP_MSG_SET_RTP_CALLBACK: ret = rtsp_msg_thread_set_rtp_callback(info); rtsp_thread_ipc_respond(info, ret); break; default: rtsp_debug(LOG_ERR, "Unknown message %d received", msg_type); } } } /* * See if the data socket has any data */ //rtsp_debug(LOG_DEBUG, "Thread checking socket"); if (info->server_socket != -1) { ret = rtsp_thread_has_receive_data(info); if (ret) { state_cont = 0; while (state_cont == 0) { got_rtp_pak = 0; switch (state.state) { case RTP_DATA_UNKNOWN: state_cont = data_unknown(info, &state); break; case RTP_HEADER_CHECK: got_rtp_pak = 1; state_cont = check_rtp_header(info, &state); break; case RTSP_HEADER_CHECK: state_cont = check_rtsp_resp(info, &state); break; case RTP_DATA_START: /* * At the beginning... Either we're getting a $, or getting * a RTP packet. */ bytes = rtsp_bytes_in_buffer(info); if (bytes < 4) { if (bytes != 0 && info->m_offset_on != 0) { move_end_of_buffer(info, bytes); } ret = rtsp_receive_socket(info, info->m_resp_buffer + bytes, 4 - bytes, 0, 0); if (ret < 0) { state_cont = 1; break; } bytes += ret; info->m_offset_on = 0; info->m_buffer_len = bytes; if (bytes < 4) { state_cont = 1; break; } } // we either have a $ - indicating RTP, or a R (for RTSP response) if (info->m_resp_buffer[info->m_offset_on] == '$') { /* * read the 3 byte header - 1 byte for interleaved channel, * 2 byte length. */ info->m_offset_on++; ret = rtsp_recv(info, state.header, 3); if (ret != 3) continue; state.rtp_len = (state.header[1] << 8) | state.header[2]; state_cont = get_rtp_packet(info, &state); got_rtp_pak = 1; } else if (tolower(info->m_resp_buffer[info->m_offset_on]) == 'r') { state_cont = rtsp_get_resp(info, &state); } else { info->m_offset_on++; rtsp_debug(LOG_INFO, "Unknown data %d in rtp stream", info->m_resp_buffer[info->m_offset_on]); change_state(&state, RTP_DATA_UNKNOWN); } break; case RTP_DATA_CONTINUE: state_cont = get_rtp_packet(info, &state); got_rtp_pak = 1; break; } if (got_rtp_pak == 1 && state.try_periodic != 0) { state.try_periodic = 0; ix = state.header[0] / 2; if (info->m_callback[ix].rtp_callback_set && info->m_callback[ix].rtp_periodic != NULL) { (info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata); } } } } // end server_socket has data } // end have server socket } // end while continue_thread SDL_Delay(10); /* * Okay - we've gotten a quit - we're done */ if (state.rtp_ptr != NULL) { xfree(state.rtp_ptr); } // exiting thread - get rid of the sockets. rtsp_thread_close(info);#ifdef _WIN32 WSACleanup();#endif return 0;}int rtsp_thread_perform_callback (rtsp_client_t *info, rtsp_thread_callback_f func, void *ud){ int ret, callback_ret; rtsp_wrap_msg_callback_t callback_body; callback_body.msg = RTSP_MSG_PERFORM_CALLBACK; callback_body.body.func = func; callback_body.body.ud = ud; ret = rtsp_thread_ipc_send_wait(info, (unsigned char *)&callback_body, sizeof(callback_body), &callback_ret); if (ret != sizeof(callback_ret)) { return -1; } return (callback_ret);}int rtsp_thread_set_rtp_callback (rtsp_client_t *info, rtp_callback_f rtp_callback, rtsp_thread_callback_f rtp_periodic, int rtp_interleave, void *ud){ int ret, callback_ret; rtsp_wrap_msg_rtp_callback_t callback_body; callback_body.msg = RTSP_MSG_SET_RTP_CALLBACK; callback_body.body.callback_func = rtp_callback; callback_body.body.periodic_func = rtp_periodic; callback_body.body.ud = ud; callback_body.body.interleave = rtp_interleave; ret = rtsp_thread_ipc_send_wait(info, (unsigned char *)&callback_body, sizeof(callback_body), &callback_ret); if (ret != sizeof(callback_ret)) { return -1; } return (callback_ret);} /* * rtsp_create_client_for_rtp_tcp * create threaded rtsp session */rtsp_client_t *rtsp_create_client_for_rtp_tcp (const char *url, int *err){ rtsp_client_t *info; int ret; rtsp_msg_type_t msg; rtsp_msg_resp_t resp;#if 0 if (func == NULL) { rtsp_debug(LOG_CRIT, "Callback is NULL"); *err = EINVAL; return NULL; }#endif info = rtsp_create_client_common(url, err); if (info == NULL) return (NULL); info->msg_mutex = SDL_CreateMutex(); if (rtsp_create_thread(info) != 0) { free_rtsp_client(info); return (NULL); } msg = RTSP_MSG_START; ret = rtsp_thread_ipc_send_wait(info, (unsigned char *)&msg, sizeof(msg), &resp); if (ret < 0 || resp < 0) { free_rtsp_client(info); *err = resp; return NULL; } return (info);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -