Skip to content

Commit

Permalink
appsrc: Implement a leaky property similar to the queue element
Browse files Browse the repository at this point in the history
This allows dropping the newest or oldest buffer when the internal queue
is full instead of blocking or continuing to grow.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1133>
  • Loading branch information
sdroege authored and GStreamer Marge Bot committed May 5, 2021
1 parent d987ec2 commit 02530e9
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 2 deletions.
12 changes: 12 additions & 0 deletions docs/plugins/gst_plugins_cache.json
Expand Up @@ -556,6 +556,18 @@
"type": "gboolean",
"writable": true
},
"leaky-type": {
"blurb": "Whether to drop buffers once the internal queue is full",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "none (0)",
"mutable": "ready",
"readable": true,
"type": "GstAppLeakyType",
"writable": true
},
"max-buffers": {
"blurb": "The maximum number of buffers to queue internally (0 = unlimited)",
"conditionally-available": false,
Expand Down
180 changes: 178 additions & 2 deletions gst-libs/gst/app/gstappsrc.c
Expand Up @@ -155,6 +155,13 @@ struct _GstAppSrcPrivate
* the next buffer of buffer list */
gboolean pending_custom_segment;

/* the next buffer that will be queued needs a discont flag
* because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
gboolean need_discont_upstream;
/* the next buffer that will be dequeued needs a discont flag
* because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
gboolean need_discont_downstream;

gint64 size;
GstClockTime duration;
GstAppStreamType stream_type;
Expand All @@ -180,6 +187,8 @@ struct _GstAppSrcPrivate
guint min_percent;
gboolean handle_segment_change;

GstAppLeakyType leaky_type;

Callbacks *callbacks;
};

Expand Down Expand Up @@ -219,6 +228,7 @@ enum
#define DEFAULT_PROP_CURRENT_LEVEL_TIME 0
#define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
#define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE

enum
{
Expand All @@ -241,6 +251,7 @@ enum
PROP_CURRENT_LEVEL_TIME,
PROP_DURATION,
PROP_HANDLE_SEGMENT_CHANGE,
PROP_LEAKY_TYPE,
PROP_LAST
};

Expand Down Expand Up @@ -541,6 +552,24 @@ gst_app_src_class_init (GstAppSrcClass * klass)
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));

/**
* GstAppSrc:leaky-type:
*
* When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
* will drop any buffers that are pushed into it once its internal queue is
* full. The selected type defines whether to drop the oldest or new
* buffers.
*
* Since: 1.20
*/
g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
g_param_spec_enum ("leaky-type", "Leaky Type",
"Whether to drop buffers once the internal queue is full",
GST_TYPE_APP_LEAKY_TYPE,
DEFAULT_PROP_LEAKY_TYPE,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));

/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emitted the signal
Expand Down Expand Up @@ -719,6 +748,7 @@ gst_app_src_init (GstAppSrc * appsrc)
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;

gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
}
Expand Down Expand Up @@ -750,6 +780,8 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
priv->queued_time = 0;
priv->last_in_running_time = GST_CLOCK_TIME_NONE;
priv->last_out_running_time = GST_CLOCK_TIME_NONE;
priv->need_discont_upstream = FALSE;
priv->need_discont_downstream = FALSE;
}

static void
Expand Down Expand Up @@ -878,6 +910,9 @@ gst_app_src_set_property (GObject * object, guint prop_id,
case PROP_HANDLE_SEGMENT_CHANGE:
priv->handle_segment_change = g_value_get_boolean (value);
break;
case PROP_LEAKY_TYPE:
priv->leaky_type = g_value_get_enum (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand Down Expand Up @@ -957,6 +992,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_HANDLE_SEGMENT_CHANGE:
g_value_set_boolean (value, priv->handle_segment_change);
break;
case PROP_LEAKY_TYPE:
g_value_set_enum (value, priv->leaky_type);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand Down Expand Up @@ -1588,12 +1626,33 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
}

if (GST_IS_BUFFER (obj)) {
*buf = GST_BUFFER (obj);
GstBuffer *buffer = GST_BUFFER (obj);

/* Mark the buffer as DISCONT if we previously dropped a buffer
* instead of outputting it */
if (priv->need_discont_downstream) {
buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
priv->need_discont_downstream = FALSE;
}

*buf = buffer;
} else if (GST_IS_BUFFER_LIST (obj)) {
GstBufferList *buffer_list;

buffer_list = GST_BUFFER_LIST (obj);

/* Mark the first buffer of the buffer list as DISCONT if we
* previously dropped a buffer instead of outputting it */
if (priv->need_discont_downstream) {
GstBuffer *buffer;

buffer_list = gst_buffer_list_make_writable (buffer_list);
buffer = gst_buffer_list_get_writable (buffer_list, 0);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
priv->need_discont_downstream = FALSE;
}

gst_base_src_submit_buffer_list (bsrc, buffer_list);
*buf = NULL;
} else if (GST_IS_EVENT (obj)) {
Expand Down Expand Up @@ -2223,6 +2282,45 @@ gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
}
}

/**
* gst_app_src_set_leaky_type:
* @appsrc: a #GstAppSrc
* @leaky: the #GstAppLeakyType
*
* When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
* will drop any buffers that are pushed into it once its internal queue is
* full. The selected type defines whether to drop the oldest or new
* buffers.
*
* Since: 1.20
*/
void
gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
{
g_return_if_fail (GST_IS_APP_SRC (appsrc));

appsrc->priv->leaky_type = leaky;
}

/**
* gst_app_src_get_leaky_type:
* @appsrc: a #GstAppSrc
*
* Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
* for more details.
*
* Returns: The currently set #GstAppLeakyType.
*
* Since: 1.20
*/
GstAppLeakyType
gst_app_src_get_leaky_type (GstAppSrc * appsrc)
{
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);

return appsrc->priv->leaky_type;
}

/**
* gst_app_src_set_latency:
* @appsrc: a #GstAppSrc
Expand Down Expand Up @@ -2402,6 +2500,43 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
GST_TIME_ARGS (priv->max_time));

if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
priv->need_discont_upstream = TRUE;
goto dropped;
} else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
guint i, length = gst_queue_array_get_length (priv->queue);
GstMiniObject *item = NULL;

/* Find the oldest buffer or buffer list and drop it, then update the
* limits. Dropping one is sufficient to go below the limits again.
*/
for (i = 0; i < length; i++) {
item = gst_queue_array_peek_nth (priv->queue, i);
if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
gst_queue_array_drop_element (priv->queue, i);
break;
}
/* To not accidentally have an event after the loop */
item = NULL;
}

if (!item) {
GST_FIXME_OBJECT (appsrc,
"No buffer or buffer list queued but queue is full");
/* This shouldn't really happen but in this case we can't really do
* anything apart from accepting the buffer / bufferlist */
break;
}

GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);

gst_app_src_update_queued_pop (appsrc, item, FALSE);
gst_mini_object_unref (item);

priv->need_discont_downstream = TRUE;
continue;
}

if (first) {
Callbacks *callbacks = NULL;
gboolean emit;
Expand Down Expand Up @@ -2438,8 +2573,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
* stops pushing buffers. */
break;
}
} else
} else {
break;
}
}

if (priv->pending_custom_segment) {
Expand All @@ -2451,11 +2587,39 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
}

if (buflist != NULL) {
/* Mark the first buffer of the buffer list as DISCONT if we previously
* dropped a buffer instead of queueing it */
if (priv->need_discont_upstream) {
if (!steal_ref) {
buflist = gst_buffer_list_copy (buflist);
steal_ref = TRUE;
} else {
buflist = gst_buffer_list_make_writable (buflist);
}
buffer = gst_buffer_list_get_writable (buflist, 0);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
priv->need_discont_upstream = FALSE;
}

GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);

if (!steal_ref)
gst_buffer_list_ref (buflist);
gst_queue_array_push_tail (priv->queue, buflist);
} else {
/* Mark the buffer as DISCONT if we previously dropped a buffer instead of
* queueing it */
if (priv->need_discont_upstream) {
if (!steal_ref) {
buffer = gst_buffer_copy (buffer);
steal_ref = TRUE;
} else {
buffer = gst_buffer_make_writable (buffer);
}
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
priv->need_discont_upstream = FALSE;
}

GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
Expand Down Expand Up @@ -2497,6 +2661,18 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
}
dropped:
{
GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
if (steal_ref) {
if (buflist)
gst_buffer_list_unref (buflist);
else
gst_buffer_unref (buffer);
}
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
}
}

static GstFlowReturn
Expand Down
23 changes: 23 additions & 0 deletions gst-libs/gst/app/gstappsrc.h
Expand Up @@ -88,6 +88,23 @@ typedef enum
GST_APP_STREAM_TYPE_RANDOM_ACCESS
} GstAppStreamType;

/**
* GstAppLeakyType:
* @GST_APP_LEAKY_TYPE_NONE: Not Leaky
* @GST_APP_LEAKY_TYPE_UPSTREAM: Leaky on upstream (new buffers)
* @GST_APP_LEAKY_TYPE_DOWNSTREAM: Leaky on downstream (old buffers)
*
* Buffer dropping scheme to avoid the element's internal queue to block when
* full.
*
* Since: 1.20
*/
typedef enum {
GST_APP_LEAKY_TYPE_NONE,
GST_APP_LEAKY_TYPE_UPSTREAM,
GST_APP_LEAKY_TYPE_DOWNSTREAM
} GstAppLeakyType;

struct _GstAppSrc
{
GstBaseSrc basesrc;
Expand Down Expand Up @@ -172,6 +189,12 @@ GstClockTime gst_app_src_get_max_time (GstAppSrc *appsrc);
GST_APP_API
GstClockTime gst_app_src_get_current_level_time (GstAppSrc *appsrc);

GST_APP_API
void gst_app_src_set_leaky_type (GstAppSrc *appsrc, GstAppLeakyType leaky);

GST_APP_API
GstAppLeakyType gst_app_src_get_leaky_type (GstAppSrc *appsrc);

GST_APP_API
void gst_app_src_set_latency (GstAppSrc *appsrc, guint64 min, guint64 max);

Expand Down

0 comments on commit 02530e9

Please sign in to comment.