/* rtsp_thread.c */
change_state(state, RTP_DATA_UNKNOWN);
return 0;
}
/*
 * data_unknown() - resynchronise on the incoming TCP byte stream.
 *
 * Pulls whatever the socket has into the response buffer, then walks
 * forward from m_offset_on looking for the first byte that is either
 * '$' (RTP framing marker) or 'r'/'R' (start of an RTSP response).
 *
 * info  - client whose response buffer and offsets are read and updated
 * state - state machine; moved to RTP_HEADER_CHECK or RTSP_HEADER_CHECK
 *         when a marker is found
 *
 * Returns 0 when a marker was found and the state was changed.  A '$' is
 * consumed (m_offset_on is advanced past it); an 'r'/'R' is left in place.
 * Returns -1 when the receive fails or every buffered byte was skipped
 * without finding a marker.
 */
static int data_unknown (rtsp_client_t *info,
                         rtp_state_t *state)
{
  int blen;
  int ix;

  blen = rtsp_bytes_in_buffer(info);
  if (info->m_offset_on != 0) {
    move_end_of_buffer(info, blen);
  }
  /*
   * NOTE(review): the receive below writes at m_offset_on, not after the
   * bytes that are already buffered.  Whether that can overwrite unread
   * data depends on what move_end_of_buffer() leaves in m_offset_on and
   * m_buffer_len - confirm against that helper.
   */
  blen = rtsp_receive_socket(info,
                             info->m_resp_buffer + info->m_offset_on,
                             RECV_BUFF_DEFAULT_LEN - info->m_offset_on,
                             0,
                             0);
  if (blen < 0) return -1;
  info->m_buffer_len += blen;

  blen = rtsp_bytes_in_buffer(info);
  for (ix = 0; ix < blen; ix++) {
    /*
     * Go through unsigned char before calling tolower(): passing a
     * negative char value (any byte >= 0x80 where char is signed) to a
     * <ctype.h> function is undefined behaviour.
     */
    unsigned char ch = (unsigned char)info->m_resp_buffer[info->m_offset_on];

    if (ch == '$') {
      info->m_offset_on++;
      change_state(state, RTP_HEADER_CHECK);
      return 0;
    } else if (tolower(ch) == 'r') {
      change_state(state, RTSP_HEADER_CHECK);
      return 0;
    } else {
      /* not a marker - skip the byte and keep scanning */
      info->m_offset_on++;
    }
  }
  return -1;
}
/*
* rtsp_thread() - rtsp thread handler - receives and
* processes all data
*/
int rtsp_thread (void *data)
{
rtsp_client_t *info = (rtsp_client_t *)data;
int continue_thread;
int ret;
unsigned int ix;
int state_cont;
int bytes;
int got_rtp_pak;
rtp_state_t state;
continue_thread = 0;
memset(&state, sizeof(state), 0);
state.rtp_ptr = NULL;
state.state = RTP_DATA_UNKNOWN;
rtsp_thread_init_thread_info(info);
while (continue_thread == 0) {
// rtsp_debug(LOG_DEBUG, "thread waiting");
ret = rtsp_thread_wait_for_event(info);
if (ret <= 0) {
if (ret < 0) {
//rtsp_debug(LOG_ERR, "RTSP loop error %d errno %d", ret, errno);
} else {
if (info->server_socket != -1) {
for (ix = 0; ix < MAX_RTP_THREAD_SESSIONS; ix++) {
if (info->m_callback[ix].rtp_callback_set &&
info->m_callback[ix].rtp_periodic != NULL) {
(info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata);
}
}
}
}
continue;
}
/*
* See if the communications socket for IPC has any data
*/
//rtsp_debug(LOG_DEBUG, "Thread checking control");
ret = rtsp_thread_has_control_message(info);
if (ret) {
rtsp_msg_type_t msg_type;
int read;
/*
* Yes - read the message type.
*/
read = rtsp_thread_get_control_message(info, &msg_type);
if (read == sizeof(msg_type)) {
// received message
//rtsp_debug(LOG_DEBUG, "Comm socket msg %d", msg_type);
switch (msg_type) {
case RTSP_MSG_QUIT:
continue_thread = 1;
break;
case RTSP_MSG_START:
ret = rtsp_thread_start_cmd(info);
rtsp_thread_ipc_respond(info, ret);
break;
case RTSP_MSG_SEND_AND_GET:
ret = rtsp_thread_send_and_get(info);
if (ret < 0) {
rtsp_thread_ipc_respond(info, ret);
} else {
// indicate we're supposed to receive...
state.receiving_rtsp_response = 1;
}
break;
case RTSP_MSG_PERFORM_CALLBACK:
ret = rtsp_msg_thread_perform_callback(info);
rtsp_thread_ipc_respond(info, ret);
break;
case RTSP_MSG_SET_RTP_CALLBACK:
ret = rtsp_msg_thread_set_rtp_callback(info);
rtsp_thread_ipc_respond(info, ret);
break;
default:
rtsp_debug(LOG_ERR, "Unknown message %d received", msg_type);
}
}
}
/*
* See if the data socket has any data
*/
//rtsp_debug(LOG_DEBUG, "Thread checking socket");
if (info->server_socket != -1) {
ret = rtsp_thread_has_receive_data(info);
if (ret) {
state_cont = 0;
while (state_cont == 0) {
got_rtp_pak = 0;
switch (state.state) {
case RTP_DATA_UNKNOWN:
state_cont = data_unknown(info, &state);
break;
case RTP_HEADER_CHECK:
got_rtp_pak = 1;
state_cont = check_rtp_header(info, &state);
break;
case RTSP_HEADER_CHECK:
state_cont = check_rtsp_resp(info, &state);
break;
case RTP_DATA_START:
/*
* At the beginning... Either we're getting a $, or getting
* a RTP packet.
*/
bytes = rtsp_bytes_in_buffer(info);
if (bytes < 4) {
if (bytes != 0 && info->m_offset_on != 0) {
move_end_of_buffer(info, bytes);
}
ret = rtsp_receive_socket(info,
info->m_resp_buffer + bytes,
4 - bytes,
0,
0);
if (ret < 0) {
state_cont = 1;
break;
}
bytes += ret;
info->m_offset_on = 0;
info->m_buffer_len = bytes;
if (bytes < 4) {
state_cont = 1;
break;
}
}
// we either have a $ - indicating RTP, or a R (for RTSP response)
if (info->m_resp_buffer[info->m_offset_on] == '$') {
/*
* read the 3 byte header - 1 byte for interleaved channel,
* 2 byte length.
*/
info->m_offset_on++;
ret = rtsp_recv(info, state.header, 3);
if (ret != 3) continue;
state.rtp_len = (state.header[1] << 8) | state.header[2];
state_cont = get_rtp_packet(info, &state);
got_rtp_pak = 1;
} else if (tolower(info->m_resp_buffer[info->m_offset_on]) == 'r') {
state_cont = rtsp_get_resp(info, &state);
} else {
info->m_offset_on++;
rtsp_debug(LOG_INFO, "Unknown data %d in rtp stream",
info->m_resp_buffer[info->m_offset_on]);
change_state(&state, RTP_DATA_UNKNOWN);
}
break;
case RTP_DATA_CONTINUE:
state_cont = get_rtp_packet(info, &state);
got_rtp_pak = 1;
break;
}
if (got_rtp_pak == 1 && state.try_periodic != 0) {
state.try_periodic = 0;
ix = state.header[0] / 2;
if (info->m_callback[ix].rtp_callback_set &&
info->m_callback[ix].rtp_periodic != NULL) {
(info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata);
}
}
}
} // end server_socket has data
} // end have server socket
} // end while continue_thread
SDL_Delay(10);
/*
* Okay - we've gotten a quit - we're done
*/
if (state.rtp_ptr != NULL) {
xfree(state.rtp_ptr);
}
// exiting thread - get rid of the sockets.
rtsp_thread_close(info);
#ifdef _WINDOWS
WSACleanup();
#endif
return 0;
}
/*
 * rtsp_thread_perform_callback() - send an RTSP_MSG_PERFORM_CALLBACK
 * request, carrying func and ud, through rtsp_thread_ipc_send_wait() and
 * hand back the reply.
 *
 * info - client the request is sent for
 * func - callback placed in the request body
 * ud   - user data placed in the request body alongside func
 *
 * Returns the int reply filled in by rtsp_thread_ipc_send_wait(), or -1
 * when that call does not return exactly sizeof(int).
 */
int rtsp_thread_perform_callback (rtsp_client_t *info,
                                  rtsp_thread_callback_f func,
                                  void *ud)
{
  rtsp_wrap_msg_callback_t request;
  int reply;
  int nbytes;

  request.body.ud = ud;
  request.body.func = func;
  request.msg = RTSP_MSG_PERFORM_CALLBACK;

  nbytes = rtsp_thread_ipc_send_wait(info,
                                     (unsigned char *)&request,
                                     sizeof(request),
                                     &reply);

  /* anything other than a full-sized reply counts as failure */
  return (nbytes == sizeof(reply)) ? reply : -1;
}
/*
 * rtsp_thread_set_rtp_callback() - send an RTSP_MSG_SET_RTP_CALLBACK
 * request through rtsp_thread_ipc_send_wait() and hand back the reply.
 *
 * info           - client the request is sent for
 * rtp_callback   - stored in the request as callback_func
 * rtp_periodic   - stored in the request as periodic_func
 * rtp_interleave - stored in the request as interleave
 * ud             - user data stored in the request
 *
 * Returns the int reply filled in by rtsp_thread_ipc_send_wait(), or -1
 * when that call does not return exactly sizeof(int).
 */
int rtsp_thread_set_rtp_callback (rtsp_client_t *info,
                                  rtp_callback_f rtp_callback,
                                  rtsp_thread_callback_f rtp_periodic,
                                  int rtp_interleave,
                                  void *ud)
{
  rtsp_wrap_msg_rtp_callback_t request;
  int reply;
  int nbytes;

  request.body.interleave = rtp_interleave;
  request.body.ud = ud;
  request.body.periodic_func = rtp_periodic;
  request.body.callback_func = rtp_callback;
  request.msg = RTSP_MSG_SET_RTP_CALLBACK;

  nbytes = rtsp_thread_ipc_send_wait(info,
                                     (unsigned char *)&request,
                                     sizeof(request),
                                     &reply);

  /* anything other than a full-sized reply counts as failure */
  return (nbytes == sizeof(reply)) ? reply : -1;
}
/*
 * rtsp_create_client_for_rtp_tcp
 * create threaded rtsp session
 *
 * url - passed to rtsp_create_client_common()
 * err - out parameter; rtsp_create_client_common() receives it directly,
 *       and it is set to the start response when the start request fails
 *
 * Creates the client, its message mutex and its thread, then sends
 * RTSP_MSG_START and waits for the answer.
 *
 * Returns the new client, or NULL on failure.  On every failure after the
 * client was created it is released with free_rtsp_client().
 */
rtsp_client_t *rtsp_create_client_for_rtp_tcp (const char *url,
                                               int *err)
{
  rtsp_client_t *info;
  int ret;
  rtsp_msg_type_t msg;
  /*
   * Start from a defined failure value: if the IPC call fails without
   * writing a response, *err below must not be loaded from an
   * uninitialised variable.
   */
  rtsp_msg_resp_t resp = -1;

  info = rtsp_create_client_common(url, err);
  if (info == NULL) return (NULL);

  /* NOTE(review): the result of SDL_CreateMutex() is not checked here. */
  info->msg_mutex = SDL_CreateMutex();
  if (rtsp_create_thread(info) != 0) {
    /* NOTE(review): *err is not set on this path - confirm callers cope. */
    free_rtsp_client(info);
    return (NULL);
  }

  msg = RTSP_MSG_START;
  ret = rtsp_thread_ipc_send_wait(info,
                                  (unsigned char *)&msg,
                                  sizeof(msg),
                                  &resp);
  if (ret < 0 || resp < 0) {
    free_rtsp_client(info);
    *err = resp;
    return NULL;
  }
  return (info);
}
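/*
 * Code-viewer keyboard shortcuts (web page residue, not part of the source):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */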