From 26b8a96b84650a481aaf37a2b750f40a54c5f35b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 3 May 2021 17:10:20 +0300 Subject: [PATCH] appsrc: Add test for testing the max-* and leaky-type properties Part-of: --- tests/check/elements/appsrc.c | 310 +++++++++++++++++++++++++++++++++- 1 file changed, 309 insertions(+), 1 deletion(-) diff --git a/tests/check/elements/appsrc.c b/tests/check/elements/appsrc.c index b385a75eb3..ae25429d0c 100644 --- a/tests/check/elements/appsrc.c +++ b/tests/check/elements/appsrc.c @@ -22,7 +22,7 @@ #include "config.h" #endif -#include +#include #include #include @@ -1061,6 +1061,313 @@ GST_START_TEST (test_appsrc_custom_segment_twice) GST_END_TEST; +static GstPadProbeReturn +block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + return GST_PAD_PROBE_OK; +} + +GST_START_TEST (test_appsrc_limits) +{ + GstHarness *h; + GstPad *srcpad; + GstBuffer *buffer; + gulong probe_id; + guint64 current_level; + + /* Test if the bytes limit works correctly with both leaky types */ + h = gst_harness_new ("appsrc"); + g_object_set (h->element, + "format", GST_FORMAT_TIME, + "max-bytes", G_GUINT64_CONSTANT (200), + "max-time", G_GUINT64_CONSTANT (0), + "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ , + NULL); + gst_harness_play (h); + srcpad = gst_element_get_static_pad (h->element, "src"); + + /* Pad probe to ensure that the source pad task is blocked and we can + * deterministically test the behaviour of the appsrc queue */ + probe_id = + gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST | + GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* wait until the appsrc is blocked downstream */ + while (!gst_pad_is_blocking (srcpad)) + g_thread_yield (); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The first buffer is not queued anymore but inside the pad probe */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 1 * GST_SECOND); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The new buffer was dropped now, otherwise we would have 2 seconds queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 1 * GST_SECOND); + + g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + /* 3s because the last dequeued buffer had an end timestamp of 0s, the + * buffer with timestamp 1s was dropped and the newly queued buffer has a + * start timestamp of 4s */ + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 3 * GST_SECOND); + + /* Remove probe and check if we get all buffers we're supposed to get */ + gst_pad_remove_probe (srcpad, probe_id); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND); + /* DISCONT because the buffer with 1s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND); + /* DISCONT because the first buffer with 4s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + gst_object_unref (srcpad); + gst_harness_teardown (h); + + /* Test if the buffers limit works correctly with both leaky types */ + h = gst_harness_new ("appsrc"); + g_object_set (h->element, + "format", GST_FORMAT_TIME, + "max-bytes", G_GUINT64_CONSTANT (0), + "max-time", G_GUINT64_CONSTANT (0), + "max-buffers", G_GUINT64_CONSTANT (2), "leaky-type", 1 /* upstream */ , + NULL); + gst_harness_play (h); + srcpad = gst_element_get_static_pad (h->element, "src"); + + /* Pad probe to ensure that the source pad task is blocked and we can + * deterministically test the behaviour of the appsrc queue */ + probe_id = + gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST | + GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* wait until the appsrc is blocked downstream */ + while (!gst_pad_is_blocking (srcpad)) + g_thread_yield (); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The first buffer is not queued anymore but inside the pad probe */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 1 * GST_SECOND); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The new buffer was dropped now, otherwise we would have 2 seconds queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 1 * GST_SECOND); + + g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + /* 3s because the last dequeued buffer had an end timestamp of 0s, the + * buffer with timestamp 1s was dropped and the newly queued buffer has a + * start timestamp of 4s */ + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 3 * GST_SECOND); + + /* Remove probe and check if we get all buffers we're supposed to get */ + gst_pad_remove_probe (srcpad, probe_id); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND); + /* DISCONT because the buffer with 1s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND); + /* DISCONT because the first buffer with 4s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + gst_object_unref (srcpad); + gst_harness_teardown (h); + + /* Test if the time limit works correctly with both leaky types */ + h = gst_harness_new ("appsrc"); + g_object_set (h->element, + "format", GST_FORMAT_TIME, + "max-bytes", G_GUINT64_CONSTANT (0), + "max-time", 2 * GST_SECOND, + "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ , + NULL); + gst_harness_play (h); + srcpad = gst_element_get_static_pad (h->element, "src"); + + /* Pad probe to ensure that the source pad task is blocked and we can + * deterministically test the behaviour of the appsrc queue */ + probe_id = + gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST | + GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* wait until the appsrc is blocked downstream */ + while (!gst_pad_is_blocking (srcpad)) + g_thread_yield (); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The first buffer is not queued anymore but inside the pad probe */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2 * GST_SECOND); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2 * GST_SECOND); + + g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL); + + buffer = gst_buffer_new_and_alloc (100); + GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = GST_SECOND; + gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + + /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ + g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 200); + g_object_get (h->element, "current-level-buffers", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 2); + /* 3s because the last dequeued buffer had an end timestamp of 0s, the + * buffer with timestamp 1s was dropped and the newly queued buffer has a + * start timestamp of 4s */ + g_object_get (h->element, "current-level-time", ¤t_level, NULL); + fail_unless_equals_uint64 (current_level, 3 * GST_SECOND); + + /* Remove probe and check if we get all buffers we're supposed to get */ + gst_pad_remove_probe (srcpad, probe_id); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND); + /* DISCONT because the buffer with 1s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + buffer = gst_harness_pull (h); + fail_unless (buffer); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND); + /* DISCONT because the first buffer with 4s was dropped */ + fail_unless (GST_BUFFER_IS_DISCONT (buffer)); + gst_buffer_unref (buffer); + + gst_object_unref (srcpad); + gst_harness_teardown (h); +} + +GST_END_TEST; + static Suite * appsrc_suite (void) { @@ -1074,6 +1381,7 @@ appsrc_suite (void) tcase_add_test (tc_chain, test_appsrc_push_buffer_list); tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment); tcase_add_test (tc_chain, test_appsrc_custom_segment_twice); + tcase_add_test (tc_chain, test_appsrc_limits); if (RUNNING_ON_VALGRIND) tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);