📄 sinks.c
字号:
GstElement *sink, *src, *pipeline, *bin; GstStateChangeReturn ret; GstState current, pending; GstPad *srcpad, *sinkpad; pipeline = gst_pipeline_new ("pipeline"); bin = gst_bin_new ("bin"); src = gst_element_factory_make ("fakesrc", "src"); g_object_set (G_OBJECT (src), "is-live", TRUE, NULL); sink = gst_element_factory_make ("fakesink", "sink"); gst_bin_add (GST_BIN (bin), src); gst_bin_add (GST_BIN (bin), sink); gst_bin_add (GST_BIN (pipeline), bin); srcpad = gst_element_get_pad (src, "src"); sinkpad = gst_element_get_pad (sink, "sink"); gst_pad_link (srcpad, sinkpad); gst_object_unref (srcpad); gst_object_unref (sinkpad); /* PAUSED returns NO_PREROLL because of the live source */ ret = gst_element_set_state (pipeline, GST_STATE_PAUSED); fail_unless (ret == GST_STATE_CHANGE_NO_PREROLL, "no NO_PREROLL state return"); ret = gst_element_get_state (pipeline, ¤t, &pending, GST_CLOCK_TIME_NONE); fail_unless (ret == GST_STATE_CHANGE_NO_PREROLL, "not NO_PREROLL"); fail_unless (current == GST_STATE_PAUSED, "not paused"); fail_unless (pending == GST_STATE_VOID_PENDING, "not void pending"); /* when going to PLAYING, the sink should go to PLAYING ASYNC */ ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); fail_unless (ret == GST_STATE_CHANGE_ASYNC, "not ASYNC"); /* now wait for PLAYING to complete */ ret = gst_element_get_state (pipeline, ¤t, &pending, GST_CLOCK_TIME_NONE); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "not playing"); fail_unless (current == GST_STATE_PLAYING, "not playing"); fail_unless (pending == GST_STATE_VOID_PENDING, "not void pending"); ret = gst_element_set_state (pipeline, GST_STATE_NULL); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "cannot null pipeline"); gst_object_unref (pipeline);}GST_END_TEST static voidsend_eos (GstPad * sinkpad){ gst_pad_send_event (sinkpad, gst_event_new_eos ());}/* push a buffer with a very long duration in a fakesink, then push an EOS * event. fakesink should emit EOS after the duration of the buffer expired. * Going to PAUSED, however should not return ASYNC while processing the * buffer. */GST_START_TEST (test_fake_eos){ GstElement *sink, *pipeline; GstBuffer *buffer; GstStateChangeReturn ret; GstPad *sinkpad; GstFlowReturn res; GThread *thread; pipeline = gst_pipeline_new ("pipeline"); sink = gst_element_factory_make ("fakesink", "sink"); g_object_set (G_OBJECT (sink), "sync", TRUE, NULL); sinkpad = gst_element_get_pad (sink, "sink"); gst_bin_add (GST_BIN_CAST (pipeline), sink); ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); fail_unless (ret == GST_STATE_CHANGE_ASYNC, "no ASYNC state return"); /* push buffer of 100 seconds, since it has a timestamp of 0, it should be * rendered immediatly and the chain function should return immediatly */ buffer = gst_buffer_new_and_alloc (10); GST_BUFFER_TIMESTAMP (buffer) = 0; GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND; res = gst_pad_chain (sinkpad, buffer); fail_unless (res == GST_FLOW_OK, "no OK flow return"); /* wait for preroll, this should happen really soon. */ ret = gst_element_get_state (pipeline, NULL, NULL, -1); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "no SUCCESS state return"); /* push EOS, this will block for up to 100 seconds, until the previous * buffer has finished. We therefore push it in another thread so we can do * something else while it blocks. */ thread = g_thread_create ((GThreadFunc) send_eos, sinkpad, TRUE, NULL); fail_if (thread == NULL, "no thread"); /* wait a while so that the thread manages to start and push the EOS */ g_usleep (G_USEC_PER_SEC); /* this should cancel rendering of the EOS event and should return SUCCESS * because the sink is now prerolled on the EOS. */ ret = gst_element_set_state (pipeline, GST_STATE_PAUSED); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "no SUCCESS state return"); /* we can unref the sinkpad now, we're done with it */ gst_object_unref (sinkpad); /* wait for a second, use the debug log to see that basesink does not discard * the EOS */ g_usleep (G_USEC_PER_SEC); /* go back to PLAYING, which means waiting some more in EOS, check debug log * to see this happen. */ ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "no SUCCESS state return"); g_usleep (G_USEC_PER_SEC); /* teardown and cleanup */ ret = gst_element_set_state (pipeline, GST_STATE_NULL); fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "no SUCCESS state return"); gst_object_unref (pipeline);}GST_END_TEST;/* this variable is updated in the same thread, first it is set by the * handoff-preroll signal, then it is checked when the ASYNC_DONE is posted on * the bus */static gboolean have_preroll = FALSE;static voidasync_done_handoff (GstElement * element, GstBuffer * buf, GstPad * pad, GstElement * sink){ GST_DEBUG ("we have the preroll buffer"); have_preroll = TRUE;}/* when we get the ASYNC_DONE, query the position */static GstBusSyncReplyasync_done_func (GstBus * bus, GstMessage * msg, GstElement * sink){ if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ASYNC_DONE) { GstFormat format; gint64 position; GST_DEBUG ("we have ASYNC_DONE now"); fail_unless (have_preroll == TRUE, "no preroll buffer received"); /* get the position now */ format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("we have position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 10 * GST_SECOND, "position is wrong"); } /* we can drop the message, nothing is listening for it. */ return GST_BUS_DROP;}static voidsend_buffer (GstPad * sinkpad){ GstBuffer *buffer; GstStateChangeReturn ret; /* push a second buffer */ GST_DEBUG ("pushing last buffer"); buffer = gst_buffer_new_and_alloc (10); GST_BUFFER_TIMESTAMP (buffer) = 200 * GST_SECOND; GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND; /* this function will initially block */ ret = gst_pad_chain (sinkpad, buffer); fail_unless (ret == GST_FLOW_OK, "no OK flow return");}/* when we get the ASYNC_DONE message from a sink, we want the sink to be able * to report the duration and position. The sink should also have called the * render method. */GST_START_TEST (test_async_done){ GstElement *sink; GstBuffer *buffer; GstEvent *event; GstStateChangeReturn ret; GstPad *sinkpad; GstFlowReturn res; GstBus *bus; GThread *thread; GstFormat format; gint64 position; gboolean qret; sink = gst_element_factory_make ("fakesink", "sink"); g_object_set (G_OBJECT (sink), "sync", TRUE, NULL); g_object_set (G_OBJECT (sink), "preroll-queue-len", 2, NULL); g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL); g_signal_connect (sink, "preroll-handoff", (GCallback) async_done_handoff, sink); sinkpad = gst_element_get_pad (sink, "sink"); ret = gst_element_set_state (sink, GST_STATE_PAUSED); fail_unless (ret == GST_STATE_CHANGE_ASYNC, "no ASYNC state return"); /* set bus on element synchronously listen for ASYNC_DONE */ bus = gst_bus_new (); gst_element_set_bus (sink, bus); gst_bus_set_sync_handler (bus, (GstBusSyncHandler) async_done_func, sink); /* make newsegment, this sets the position to 10sec when the buffer prerolls */ GST_DEBUG ("sending segment"); event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, 0, -1, 10 * GST_SECOND); res = gst_pad_send_event (sinkpad, event); /* We have not yet received any buffers so we are still in the READY state, * the position is therefore still not queryable. */ format = GST_FORMAT_TIME; position = -1; qret = gst_element_query_position (sink, &format, &position); fail_unless (qret == FALSE, "position wrong"); fail_unless (position == -1, "position is wrong"); /* Since we are paused and the preroll queue has a length of 2, this function * will return immediatly, the preroll handoff will be called and the stream * position should now be 10 seconds. */ GST_DEBUG ("pushing first buffer"); buffer = gst_buffer_new_and_alloc (10); GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND; GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND; res = gst_pad_chain (sinkpad, buffer); fail_unless (res == GST_FLOW_OK, "no OK flow return"); /* scond buffer, will not block either but position should still be 10 * seconds */ GST_DEBUG ("pushing second buffer"); buffer = gst_buffer_new_and_alloc (10); GST_BUFFER_TIMESTAMP (buffer) = 100 * GST_SECOND; GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND; res = gst_pad_chain (sinkpad, buffer); fail_unless (res == GST_FLOW_OK, "no OK flow return"); /* check if position is still 10 seconds */ format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("first buffer position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 10 * GST_SECOND, "position is wrong"); /* last buffer, blocks because preroll queue is filled. Start the push in a * new thread so that we can check the position */ GST_DEBUG ("starting thread"); thread = g_thread_create ((GThreadFunc) send_buffer, sinkpad, TRUE, NULL); fail_if (thread == NULL, "no thread"); GST_DEBUG ("waiting 1 second"); g_usleep (G_USEC_PER_SEC); GST_DEBUG ("waiting done"); /* check if position is still 10 seconds. This is racy because the above * thread might not yet have started the push, because of the above sleep, * this is very unlikely, though. */ format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("second buffer position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 10 * GST_SECOND, "position is wrong"); /* now we go to playing. This should unlock and stop the above thread. */ GST_DEBUG ("going to PLAYING"); ret = gst_element_set_state (sink, GST_STATE_PLAYING); /* join the thread. At this point we know the sink processed the last buffer * and the position should now be 210 seconds; the time of the last buffer we * pushed */ GST_DEBUG ("joining thread"); g_thread_join (thread); format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("last buffer position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 210 * GST_SECOND, "position is wrong"); gst_object_unref (sinkpad); gst_element_set_state (sink, GST_STATE_NULL); gst_object_unref (sink); gst_object_unref (bus);}GST_END_TEST;/* when we get the ASYNC_DONE, query the position */static GstBusSyncReplyasync_done_eos_func (GstBus * bus, GstMessage * msg, GstElement * sink){ if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ASYNC_DONE) { GstFormat format; gint64 position; GST_DEBUG ("we have ASYNC_DONE now"); /* get the position now */ format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("we have position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 10 * GST_SECOND, "position is wrong"); } /* we can drop the message, nothing is listening for it. */ return GST_BUS_DROP;}/* when we get the ASYNC_DONE message from a sink, we want the sink to be able * to report the duration and position. The sink should also have called the * render method. */GST_START_TEST (test_async_done_eos){ GstElement *sink; GstEvent *event; GstStateChangeReturn ret; GstPad *sinkpad; gboolean res; GstBus *bus; GstFormat format; gint64 position; gboolean qret; sink = gst_element_factory_make ("fakesink", "sink"); g_object_set (G_OBJECT (sink), "sync", TRUE, NULL); g_object_set (G_OBJECT (sink), "preroll-queue-len", 1, NULL); sinkpad = gst_element_get_pad (sink, "sink"); ret = gst_element_set_state (sink, GST_STATE_PAUSED); fail_unless (ret == GST_STATE_CHANGE_ASYNC, "no ASYNC state return"); /* set bus on element synchronously listen for ASYNC_DONE */ bus = gst_bus_new (); gst_element_set_bus (sink, bus); gst_bus_set_sync_handler (bus, (GstBusSyncHandler) async_done_eos_func, sink); /* make newsegment, this sets the position to 10sec when the buffer prerolls */ GST_DEBUG ("sending segment"); event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, 0, -1, 10 * GST_SECOND); res = gst_pad_send_event (sinkpad, event); /* We have not yet received any buffers so we are still in the READY state, * the position is therefore still not queryable. */ format = GST_FORMAT_TIME; position = -1; qret = gst_element_query_position (sink, &format, &position); fail_unless (qret == FALSE, "position wrong"); fail_unless (position == -1, "position is wrong"); /* Since we are paused and the preroll queue has a length of 1, this function * will return immediatly. The EOS will complete the preroll and the * position should now be 10 seconds. */ GST_DEBUG ("pushing EOS"); event = gst_event_new_eos (); res = gst_pad_send_event (sinkpad, event); fail_unless (res == TRUE, "no TRUE return"); /* check if position is still 10 seconds */ format = GST_FORMAT_TIME; gst_element_query_position (sink, &format, &position); GST_DEBUG ("EOS position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); fail_unless (position == 10 * GST_SECOND, "position is wrong"); gst_object_unref (sinkpad); gst_element_set_state (sink, GST_STATE_NULL); gst_object_unref (sink); gst_object_unref (bus);}GST_END_TEST;/* test: try changing state of sinks */Suite *gst_sinks_suite (void){ Suite *s = suite_create ("Sinks"); TCase *tc_chain = tcase_create ("general"); /* time out after 10s, not the default 3, we need this for the last test. */ tcase_set_timeout (tc_chain, 10); suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_sink); tcase_add_test (tc_chain, test_sink_completion); tcase_add_test (tc_chain, test_src_sink); tcase_add_test (tc_chain, test_livesrc_remove); tcase_add_test (tc_chain, test_livesrc_sink); tcase_add_test (tc_chain, test_livesrc2_sink); tcase_add_test (tc_chain, test_livesrc3_sink); tcase_add_test (tc_chain, test_locked_sink); tcase_add_test (tc_chain, test_unlinked_live); tcase_add_test (tc_chain, test_delayed_async); tcase_add_test (tc_chain, test_added_async); tcase_add_test (tc_chain, test_added_async2); tcase_add_test (tc_chain, test_add_live); tcase_add_test (tc_chain, test_add_live2); tcase_add_test (tc_chain, test_bin_live); tcase_add_test (tc_chain, test_fake_eos); tcase_add_test (tc_chain, test_async_done); tcase_add_test (tc_chain, test_async_done_eos); return s;}GST_CHECK_MAIN (gst_sinks);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -