From d987ec21f24098cb624ccafad43ddb33e04bb6ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 30 Apr 2021 19:22:46 +0300 Subject: [PATCH] appsrc: Add new max-buffers / max-time / current-level-buffers / current-level-time properties These work the same way as the corresponding properties on queue and allow to control the internal buffer size of the appsrc in a more flexible way. Unlike in queue the max-buffers and max-time properties are 0 (i.e. disabled) by default for backwards compatibility reasons. Part-of: --- docs/plugins/gst_plugins_cache.json | 56 +++ gst-libs/gst/app/gstappsrc.c | 529 ++++++++++++++++++++++++++-- gst-libs/gst/app/gstappsrc.h | 18 + 3 files changed, 575 insertions(+), 28 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 285dc3b712..0353310581 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -452,6 +452,20 @@ "type": "GstCaps", "writable": true }, + "current-level-buffers": { + "blurb": "The number of currently queued buffers", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, "current-level-bytes": { "blurb": "The number of currently queued bytes", "conditionally-available": false, @@ -466,6 +480,20 @@ "type": "guint64", "writable": false }, + "current-level-time": { + "blurb": "The amount of currently queued time", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, "duration": { "blurb": "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)", "conditionally-available": false, @@ -528,6 +556,20 @@ "type": "gboolean", "writable": true }, + "max-buffers": { + "blurb": "The maximum number of buffers to queue internally (0 = unlimited)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, "max-bytes": { "blurb": "The maximum number of bytes to queue internally (0 = unlimited)", "conditionally-available": false, @@ -556,6 +598,20 @@ "type": "gint64", "writable": true }, + "max-time": { + "blurb": "The maximum amount of time to queue internally (0 = unlimited)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, "min-latency": { "blurb": "The minimum latency (-1 = default)", "conditionally-available": false, diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index c2267a045a..73d92211ba 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -46,11 +46,12 @@ * streaming thread. It is important to note that data transport will not happen * from the thread that performed the push-buffer call. * - * The "max-bytes" property controls how much data can be queued in appsrc - * before appsrc considers the queue full. A filled internal queue will always - * signal the "enough-data" signal, which signals the application that it should - * stop pushing data into appsrc. The "block" property will cause appsrc to - * block the push-buffer method until free data becomes available again. + * The "max-bytes", "max-buffers" and "max-time" properties control how much + * data can be queued in appsrc before appsrc considers the queue full. A + * filled internal queue will always signal the "enough-data" signal, which + * signals the application that it should stop pushing data into appsrc. The + * "block" property will cause appsrc to block the push-buffer method until + * free data becomes available again. * * When the internal queue is running out of data, the "need-data" signal is * emitted, which signals the application that it should start pushing more data @@ -146,14 +147,18 @@ struct _GstAppSrcPrivate GstCaps *last_caps; GstCaps *current_caps; + /* last segment received on the input */ GstSegment last_segment; + /* currently configured segment for the output */ GstSegment current_segment; + /* queue up a segment event based on last_segment before + * the next buffer of buffer list */ gboolean pending_custom_segment; gint64 size; GstClockTime duration; GstAppStreamType stream_type; - guint64 max_bytes; + guint64 max_bytes, max_buffers, max_time; GstFormat format; gboolean block; gchar *uri; @@ -161,7 +166,11 @@ struct _GstAppSrcPrivate gboolean flushing; gboolean started; gboolean is_eos; - guint64 queued_bytes; + guint64 queued_bytes, queued_buffers; + /* Used to calculate the current time level */ + GstClockTime last_in_running_time, last_out_running_time; + /* Updated based on the above whenever they change */ + GstClockTime queued_time; guint64 offset; GstAppStreamType current_type; @@ -196,6 +205,8 @@ enum #define DEFAULT_PROP_SIZE -1 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM #define DEFAULT_PROP_MAX_BYTES 200000 +#define DEFAULT_PROP_MAX_BUFFERS 0 +#define DEFAULT_PROP_MAX_TIME (0 * GST_SECOND) #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES #define DEFAULT_PROP_BLOCK FALSE #define DEFAULT_PROP_IS_LIVE FALSE @@ -204,6 +215,8 @@ enum #define DEFAULT_PROP_EMIT_SIGNALS TRUE #define DEFAULT_PROP_MIN_PERCENT 0 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0 +#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0 +#define DEFAULT_PROP_CURRENT_LEVEL_TIME 0 #define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE @@ -214,6 +227,8 @@ enum PROP_SIZE, PROP_STREAM_TYPE, PROP_MAX_BYTES, + PROP_MAX_BUFFERS, + PROP_MAX_TIME, PROP_FORMAT, PROP_BLOCK, PROP_IS_LIVE, @@ -222,6 +237,8 @@ enum PROP_EMIT_SIGNALS, PROP_MIN_PERCENT, PROP_CURRENT_LEVEL_BYTES, + PROP_CURRENT_LEVEL_BUFFERS, + PROP_CURRENT_LEVEL_TIME, PROP_DURATION, PROP_HANDLE_SEGMENT_CHANGE, PROP_LAST @@ -347,6 +364,37 @@ gst_app_src_class_init (GstAppSrcClass * klass) "The maximum number of bytes to queue internally (0 = unlimited)", 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAppSrc:max-buffers: + * + * The maximum amount of buffers that can be queued internally. + * After the maximum amount of buffers are queued, appsrc will emit the + * "enough-data" signal. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS, + g_param_spec_uint64 ("max-buffers", "Max buffers", + "The maximum number of buffers to queue internally (0 = unlimited)", + 0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAppSrc:max-time: + * + * The maximum amount of time that can be queued internally. + * After the maximum amount of time are queued, appsrc will emit the + * "enough-data" signal. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_MAX_TIME, + g_param_spec_uint64 ("max-time", "Max time", + "The maximum amount of time to queue internally (0 = unlimited)", + 0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstAppSrc:block: * @@ -430,6 +478,32 @@ gst_app_src_class_init (GstAppSrcClass * klass) 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstAppSrc:current-level-buffers: + * + * The number of currently queued buffers inside appsrc. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS, + g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers", + "The number of currently queued buffers", + 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAppSrc:current-level-time: + * + * The amount of currently queued time inside appsrc. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current Level Time", + "The amount of currently queued time", + 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** * GstAppSrc:duration: * @@ -636,6 +710,8 @@ gst_app_src_init (GstAppSrc * appsrc) priv->duration = DEFAULT_PROP_DURATION; priv->stream_type = DEFAULT_PROP_STREAM_TYPE; priv->max_bytes = DEFAULT_PROP_MAX_BYTES; + priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS; + priv->max_time = DEFAULT_PROP_MAX_TIME; priv->format = DEFAULT_PROP_FORMAT; priv->block = DEFAULT_PROP_BLOCK; priv->min_latency = DEFAULT_PROP_MIN_LATENCY; @@ -670,6 +746,10 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps) } priv->queued_bytes = 0; + priv->queued_buffers = 0; + priv->queued_time = 0; + priv->last_in_running_time = GST_CLOCK_TIME_NONE; + priv->last_out_running_time = GST_CLOCK_TIME_NONE; } static void @@ -762,6 +842,12 @@ gst_app_src_set_property (GObject * object, guint prop_id, case PROP_MAX_BYTES: gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value)); break; + case PROP_MAX_BUFFERS: + gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value)); + break; + case PROP_MAX_TIME: + gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value)); + break; case PROP_FORMAT: priv->format = g_value_get_enum (value); break; @@ -818,6 +904,12 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_MAX_BYTES: g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc)); break; + case PROP_MAX_BUFFERS: + g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc)); + break; + case PROP_MAX_TIME: + g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc)); + break; case PROP_FORMAT: g_value_set_enum (value, priv->format); break; @@ -852,6 +944,13 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_CURRENT_LEVEL_BYTES: g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc)); break; + case PROP_CURRENT_LEVEL_BUFFERS: + g_value_set_uint64 (value, + gst_app_src_get_current_level_buffers (appsrc)); + break; + case PROP_CURRENT_LEVEL_TIME: + g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc)); + break; case PROP_DURATION: g_value_set_uint64 (value, gst_app_src_get_duration (appsrc)); break; @@ -1204,6 +1303,200 @@ gst_app_src_negotiate (GstBaseSrc * basesrc) return result; } +/* Update the currently queued bytes/buffers/time information for the item + * that was just removed from the queue. + * + * If update_offset is set, additionally the offset of the source will be + * moved forward accordingly as if that many bytes were output. + */ +static void +gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item, + gboolean update_offset) +{ + GstAppSrcPrivate *priv = appsrc->priv; + guint buf_size = 0; + guint n_buffers = 0; + GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE; + + if (GST_IS_BUFFER (item)) { + GstBuffer *buf = GST_BUFFER_CAST (item); + buf_size = gst_buffer_get_size (buf); + n_buffers = 1; + + end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf); + if (end_buffer_ts != GST_CLOCK_TIME_NONE + && GST_BUFFER_DURATION_IS_VALID (buf)) + end_buffer_ts += GST_BUFFER_DURATION (buf); + + GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size); + } else if (GST_IS_BUFFER_LIST (item)) { + GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item); + guint i; + + n_buffers = gst_buffer_list_length (buffer_list); + + for (i = 0; i < n_buffers; i++) { + GstBuffer *tmp = gst_buffer_list_get (buffer_list, i); + GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp); + + buf_size += gst_buffer_get_size (tmp); + /* Update to the last buffer's timestamp that is known */ + if (ts != GST_CLOCK_TIME_NONE) { + end_buffer_ts = ts; + if (GST_BUFFER_DURATION_IS_VALID (tmp)) + end_buffer_ts += GST_BUFFER_DURATION (tmp); + } + } + } + + priv->queued_bytes -= buf_size; + priv->queued_buffers -= n_buffers; + + /* Update time level if working on a TIME segment */ + if (priv->current_segment.format == GST_FORMAT_TIME + && end_buffer_ts != GST_CLOCK_TIME_NONE) { + /* Clip to the current segment boundaries */ + if (priv->current_segment.stop != -1 + && end_buffer_ts > priv->current_segment.stop) + end_buffer_ts = priv->current_segment.stop; + else if (priv->current_segment.start > end_buffer_ts) + end_buffer_ts = priv->current_segment.start; + + priv->last_out_running_time = + gst_segment_to_running_time (&priv->current_segment, + GST_FORMAT_TIME, end_buffer_ts); + + GST_TRACE_OBJECT (appsrc, + "Last in running time %" GST_TIME_FORMAT ", last out running time %" + GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time), + GST_TIME_ARGS (priv->last_out_running_time)); + + /* If timestamps on both sides are known, calculate the current + * fill level in time and consider the queue empty if the output + * running time is lower than the input one (i.e. some kind of reset + * has happened). + */ + if (priv->last_out_running_time != GST_CLOCK_TIME_NONE + && priv->last_in_running_time != GST_CLOCK_TIME_NONE) { + if (priv->last_out_running_time > priv->last_in_running_time) { + priv->queued_time = 0; + } else { + priv->queued_time = + priv->last_in_running_time - priv->last_out_running_time; + } + } + } + + GST_DEBUG_OBJECT (appsrc, + "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT + " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, + priv->queued_buffers, GST_TIME_ARGS (priv->queued_time)); + + /* only update the offset when in random_access mode and when requested by + * the caller, i.e. not when just dropping the item */ + if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) + priv->offset += buf_size; +} + +/* Update the currently queued bytes/buffers/time information for the item + * that was just added to the queue. + */ +static void +gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item) +{ + GstAppSrcPrivate *priv = appsrc->priv; + GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE; + GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE; + guint buf_size = 0; + guint n_buffers = 0; + + if (GST_IS_BUFFER (item)) { + GstBuffer *buf = GST_BUFFER_CAST (item); + + buf_size = gst_buffer_get_size (buf); + n_buffers = 1; + + start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf); + if (end_buffer_ts != GST_CLOCK_TIME_NONE + && GST_BUFFER_DURATION_IS_VALID (buf)) + end_buffer_ts += GST_BUFFER_DURATION (buf); + } else if (GST_IS_BUFFER_LIST (item)) { + GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item); + guint i; + + n_buffers = gst_buffer_list_length (buffer_list); + + for (i = 0; i < n_buffers; i++) { + GstBuffer *tmp = gst_buffer_list_get (buffer_list, i); + GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp); + + buf_size += gst_buffer_get_size (tmp); + + if (ts != GST_CLOCK_TIME_NONE) { + if (start_buffer_ts == GST_CLOCK_TIME_NONE) + start_buffer_ts = ts; + end_buffer_ts = ts; + if (GST_BUFFER_DURATION_IS_VALID (tmp)) + end_buffer_ts += GST_BUFFER_DURATION (tmp); + } + } + } + + priv->queued_bytes += buf_size; + priv->queued_buffers += n_buffers; + + /* Update time level if working on a TIME segment */ + if (priv->last_segment.format == GST_FORMAT_TIME + && end_buffer_ts != GST_CLOCK_TIME_NONE) { + /* Clip to the last segment boundaries */ + if (priv->last_segment.stop != -1 + && end_buffer_ts > priv->last_segment.stop) + end_buffer_ts = priv->last_segment.stop; + else if (priv->last_segment.start > end_buffer_ts) + end_buffer_ts = priv->last_segment.start; + + priv->last_in_running_time = + gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME, + end_buffer_ts); + + /* If this is the only buffer then we can directly update the queued time + * here. This is especially useful if this was the first buffer because + * otherwise we would have to wait until it is actually unqueued to know + * the queued duration */ + if (gst_queue_array_get_length (priv->queue) == 1) { + if (priv->last_segment.stop != -1 + && start_buffer_ts > priv->last_segment.stop) + start_buffer_ts = priv->last_segment.stop; + else if (priv->last_segment.start > start_buffer_ts) + start_buffer_ts = priv->last_segment.start; + + priv->last_out_running_time = + gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME, + start_buffer_ts); + } + + GST_TRACE_OBJECT (appsrc, + "Last in running time %" GST_TIME_FORMAT ", last out running time %" + GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time), + GST_TIME_ARGS (priv->last_out_running_time)); + + if (priv->last_out_running_time != GST_CLOCK_TIME_NONE + && priv->last_in_running_time != GST_CLOCK_TIME_NONE) { + if (priv->last_out_running_time > priv->last_in_running_time) { + priv->queued_time = 0; + } else { + priv->queued_time = + priv->last_in_running_time - priv->last_out_running_time; + } + } + } + + GST_DEBUG_OBJECT (appsrc, + "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT + " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers, + GST_TIME_ARGS (priv->queued_time)); +} + static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, GstBuffer ** buf) @@ -1263,7 +1556,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, while (TRUE) { /* return data as long as we have some */ if (!gst_queue_array_is_empty (priv->queue)) { - guint buf_size; GstMiniObject *obj = gst_queue_array_pop_head (priv->queue); if (GST_IS_CAPS (obj)) { @@ -1297,18 +1589,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, if (GST_IS_BUFFER (obj)) { *buf = GST_BUFFER (obj); - buf_size = gst_buffer_get_size (*buf); - GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size); } else if (GST_IS_BUFFER_LIST (obj)) { GstBufferList *buffer_list; buffer_list = GST_BUFFER_LIST (obj); - buf_size = gst_buffer_list_calculate_size (buffer_list); - - GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers", - buffer_list, buf_size, gst_buffer_list_length (buffer_list)); - gst_base_src_submit_buffer_list (bsrc, buffer_list); *buf = NULL; } else if (GST_IS_EVENT (obj)) { @@ -1336,22 +1621,25 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, g_assert_not_reached (); } - priv->queued_bytes -= buf_size; - - /* only update the offset when in random_access mode */ - if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) - priv->offset += buf_size; + gst_app_src_update_queued_pop (appsrc, obj, TRUE); /* signal that we removed an item */ if ((priv->wait_status & APP_WAITING)) g_cond_broadcast (&priv->cond); /* see if we go lower than the min-percent */ - if (priv->min_percent && priv->max_bytes) { - if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent) + if (priv->min_percent) { + if ((priv->max_bytes + && priv->queued_bytes * 100 / priv->max_bytes <= + priv->min_percent) || (priv->max_buffers + && priv->queued_buffers * 100 / priv->max_buffers <= + priv->min_percent) || (priv->max_time + && priv->queued_time * 100 / priv->max_time <= + priv->min_percent)) { /* ignore flushing state, we got a buffer and we will return it now. * Errors will be handled in the next round */ gst_app_src_emit_need_data (appsrc, size); + } } ret = GST_FLOW_OK; break; @@ -1717,7 +2005,7 @@ gst_app_src_get_max_bytes (GstAppSrc * appsrc) guint64 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc) { - gint64 queued; + guint64 queued; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); @@ -1733,6 +2021,183 @@ gst_app_src_get_current_level_bytes (GstAppSrc * appsrc) return queued; } +/** + * gst_app_src_set_max_buffers: + * @appsrc: a #GstAppSrc + * @max: the maximum number of buffers to queue + * + * Set the maximum amount of buffers that can be queued in @appsrc. + * After the maximum amount of buffers are queued, @appsrc will emit the + * "enough-data" signal. + * + * Since: 1.20 + */ +void +gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max) +{ + GstAppSrcPrivate *priv; + + g_return_if_fail (GST_IS_APP_SRC (appsrc)); + + priv = appsrc->priv; + + g_mutex_lock (&priv->mutex); + if (max != priv->max_buffers) { + GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max); + priv->max_buffers = max; + /* signal the change */ + g_cond_broadcast (&priv->cond); + } + g_mutex_unlock (&priv->mutex); +} + +/** + * gst_app_src_get_max_buffers: + * @appsrc: a #GstAppSrc + * + * Get the maximum amount of buffers that can be queued in @appsrc. + * + * Returns: The maximum amount of buffers that can be queued. + * + * Since: 1.20 + */ +guint64 +gst_app_src_get_max_buffers (GstAppSrc * appsrc) +{ + guint64 result; + GstAppSrcPrivate *priv; + + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); + + priv = appsrc->priv; + + g_mutex_lock (&priv->mutex); + result = priv->max_buffers; + GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT, + result); + g_mutex_unlock (&priv->mutex); + + return result; +} + +/** + * gst_app_src_get_current_level_buffers: + * @appsrc: a #GstAppSrc + * + * Get the number of currently queued buffers inside @appsrc. + * + * Returns: The number of currently queued buffers. + * + * Since: 1.20 + */ +guint64 +gst_app_src_get_current_level_buffers (GstAppSrc * appsrc) +{ + guint64 queued; + GstAppSrcPrivate *priv; + + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); + + priv = appsrc->priv; + + GST_OBJECT_LOCK (appsrc); + queued = priv->queued_buffers; + GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT, + queued); + GST_OBJECT_UNLOCK (appsrc); + + return queued; +} + +/** + * gst_app_src_set_max_time: + * @appsrc: a #GstAppSrc + * @max: the maximum amonut of time to queue + * + * Set the maximum amount of time that can be queued in @appsrc. + * After the maximum amount of time are queued, @appsrc will emit the + * "enough-data" signal. + * + * Since: 1.20 + */ +void +gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max) +{ + GstAppSrcPrivate *priv; + + g_return_if_fail (GST_IS_APP_SRC (appsrc)); + + priv = appsrc->priv; + + g_mutex_lock (&priv->mutex); + if (max != priv->max_time) { + GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT, + GST_TIME_ARGS (max)); + priv->max_time = max; + /* signal the change */ + g_cond_broadcast (&priv->cond); + } + g_mutex_unlock (&priv->mutex); +} + +/** + * gst_app_src_get_max_time: + * @appsrc: a #GstAppSrc + * + * Get the maximum amount of time that can be queued in @appsrc. + * + * Returns: The maximum amount of time that can be queued. + * + * Since: 1.20 + */ +GstClockTime +gst_app_src_get_max_time (GstAppSrc * appsrc) +{ + GstClockTime result; + GstAppSrcPrivate *priv; + + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); + + priv = appsrc->priv; + + g_mutex_lock (&priv->mutex); + result = priv->max_time; + GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT, + GST_TIME_ARGS (result)); + g_mutex_unlock (&priv->mutex); + + return result; +} + +/** + * gst_app_src_get_current_level_time: + * @appsrc: a #GstAppSrc + * + * Get the amount of currently queued time inside @appsrc. + * + * Returns: The amount of currently queued time. + * + * Since: 1.20 + */ +GstClockTime +gst_app_src_get_current_level_time (GstAppSrc * appsrc) +{ + gint64 queued; + GstAppSrcPrivate *priv; + + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE); + + priv = appsrc->priv; + + GST_OBJECT_LOCK (appsrc); + queued = priv->queued_time; + GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT, + GST_TIME_ARGS (queued)); + GST_OBJECT_UNLOCK (appsrc); + + return queued; +} + static void gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min, gboolean do_max, guint64 max) @@ -1925,10 +2390,17 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, if (priv->is_eos) goto eos; - if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) { + if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) || + (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) || + (priv->max_time && priv->queued_time >= priv->max_time)) { GST_DEBUG_OBJECT (appsrc, - "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")", - priv->queued_bytes, priv->max_bytes); + "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %" + G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT + " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %" + GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)", + priv->queued_bytes, priv->max_bytes, priv->queued_buffers, + priv->max_buffers, GST_TIME_ARGS (priv->queued_time), + GST_TIME_ARGS (priv->max_time)); if (first) { Callbacks *callbacks = NULL; @@ -1983,15 +2455,16 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, if (!steal_ref) gst_buffer_list_ref (buflist); gst_queue_array_push_tail (priv->queue, buflist); - priv->queued_bytes += gst_buffer_list_calculate_size (buflist); } else { GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer); if (!steal_ref) gst_buffer_ref (buffer); gst_queue_array_push_tail (priv->queue, buffer); - priv->queued_bytes += gst_buffer_get_size (buffer); } + gst_app_src_update_queued_push (appsrc, + buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer)); + if ((priv->wait_status & STREAM_WAITING)) g_cond_broadcast (&priv->cond); diff --git a/gst-libs/gst/app/gstappsrc.h b/gst-libs/gst/app/gstappsrc.h index 82f8c2a4e0..b4d0041b09 100644 --- a/gst-libs/gst/app/gstappsrc.h +++ b/gst-libs/gst/app/gstappsrc.h @@ -154,6 +154,24 @@ guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc); GST_APP_API guint64 gst_app_src_get_current_level_bytes (GstAppSrc *appsrc); +GST_APP_API +void gst_app_src_set_max_buffers (GstAppSrc *appsrc, guint64 max); + +GST_APP_API +guint64 gst_app_src_get_max_buffers (GstAppSrc *appsrc); + +GST_APP_API +guint64 gst_app_src_get_current_level_buffers (GstAppSrc *appsrc); + +GST_APP_API +void gst_app_src_set_max_time (GstAppSrc *appsrc, GstClockTime max); + +GST_APP_API +GstClockTime gst_app_src_get_max_time (GstAppSrc *appsrc); + +GST_APP_API +GstClockTime gst_app_src_get_current_level_time (GstAppSrc *appsrc); + GST_APP_API void gst_app_src_set_latency (GstAppSrc *appsrc, guint64 min, guint64 max);