Skip to content

Commit

Permalink
appsrc: Add test for testing the max-* and leaky-type properties
Browse files Browse the repository at this point in the history
  • Loading branch information
sdroege authored and GStreamer Marge Bot committed May 5, 2021
1 parent 02530e9 commit 26b8a96
Showing 1 changed file with 309 additions and 1 deletion.
310 changes: 309 additions & 1 deletion tests/check/elements/appsrc.c
Expand Up @@ -22,7 +22,7 @@
#include "config.h"
#endif

#include <gst/check/gstcheck.h>
#include <gst/check/check.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>

Expand Down Expand Up @@ -1061,6 +1061,313 @@ GST_START_TEST (test_appsrc_custom_segment_twice)

GST_END_TEST;

static GstPadProbeReturn
block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
return GST_PAD_PROBE_OK;
}

GST_START_TEST (test_appsrc_limits)
{
GstHarness *h;
GstPad *srcpad;
GstBuffer *buffer;
gulong probe_id;
guint64 current_level;

/* Test if the bytes limit works correctly with both leaky types */
h = gst_harness_new ("appsrc");
g_object_set (h->element,
"format", GST_FORMAT_TIME,
"max-bytes", G_GUINT64_CONSTANT (200),
"max-time", G_GUINT64_CONSTANT (0),
"max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
NULL);
gst_harness_play (h);
srcpad = gst_element_get_static_pad (h->element, "src");

/* Pad probe to ensure that the source pad task is blocked and we can
* deterministically test the behaviour of the appsrc queue */
probe_id =
gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
g_thread_yield ();

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The new buffer was dropped now, otherwise we would have 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);

g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
/* 3s because the last dequeued buffer had an end timestamp of 0s, the
* buffer with timestamp 1s was dropped and the newly queued buffer has a
* start timestamp of 4s */
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);

/* Remove probe and check if we get all buffers we're supposed to get */
gst_pad_remove_probe (srcpad, probe_id);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
/* DISCONT because the buffer with 1s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
/* DISCONT because the first buffer with 4s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

gst_object_unref (srcpad);
gst_harness_teardown (h);

/* Test if the buffers limit works correctly with both leaky types */
h = gst_harness_new ("appsrc");
g_object_set (h->element,
"format", GST_FORMAT_TIME,
"max-bytes", G_GUINT64_CONSTANT (0),
"max-time", G_GUINT64_CONSTANT (0),
"max-buffers", G_GUINT64_CONSTANT (2), "leaky-type", 1 /* upstream */ ,
NULL);
gst_harness_play (h);
srcpad = gst_element_get_static_pad (h->element, "src");

/* Pad probe to ensure that the source pad task is blocked and we can
* deterministically test the behaviour of the appsrc queue */
probe_id =
gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
g_thread_yield ();

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The new buffer was dropped now, otherwise we would have 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);

g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
/* 3s because the last dequeued buffer had an end timestamp of 0s, the
* buffer with timestamp 1s was dropped and the newly queued buffer has a
* start timestamp of 4s */
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);

/* Remove probe and check if we get all buffers we're supposed to get */
gst_pad_remove_probe (srcpad, probe_id);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
/* DISCONT because the buffer with 1s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
/* DISCONT because the first buffer with 4s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

gst_object_unref (srcpad);
gst_harness_teardown (h);

/* Test if the time limit works correctly with both leaky types */
h = gst_harness_new ("appsrc");
g_object_set (h->element,
"format", GST_FORMAT_TIME,
"max-bytes", G_GUINT64_CONSTANT (0),
"max-time", 2 * GST_SECOND,
"max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
NULL);
gst_harness_play (h);
srcpad = gst_element_get_static_pad (h->element, "src");

/* Pad probe to ensure that the source pad task is blocked and we can
* deterministically test the behaviour of the appsrc queue */
probe_id =
gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
g_thread_yield ();

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);

g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);

buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);

/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 200);
g_object_get (h->element, "current-level-buffers", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 2);
/* 3s because the last dequeued buffer had an end timestamp of 0s, the
* buffer with timestamp 1s was dropped and the newly queued buffer has a
* start timestamp of 4s */
g_object_get (h->element, "current-level-time", &current_level, NULL);
fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);

/* Remove probe and check if we get all buffers we're supposed to get */
gst_pad_remove_probe (srcpad, probe_id);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
/* DISCONT because the buffer with 1s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

buffer = gst_harness_pull (h);
fail_unless (buffer);
fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
/* DISCONT because the first buffer with 4s was dropped */
fail_unless (GST_BUFFER_IS_DISCONT (buffer));
gst_buffer_unref (buffer);

gst_object_unref (srcpad);
gst_harness_teardown (h);
}

GST_END_TEST;

static Suite *
appsrc_suite (void)
{
Expand All @@ -1074,6 +1381,7 @@ appsrc_suite (void)
tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
tcase_add_test (tc_chain, test_appsrc_limits);

if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);
Expand Down

0 comments on commit 26b8a96

Please sign in to comment.