diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 8184f7fd84..285dc3b712 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2459,6 +2459,18 @@ "type": "GFile", "writable": true }, + "is-growing": { + "blurb": "Whether the file is growing, ignoring its end", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, "location": { "blurb": "URI location to read from", "conditionally-available": false, @@ -2472,7 +2484,19 @@ "writable": true } }, - "rank": "secondary" + "rank": "secondary", + "signals": { + "done-waiting-data": { + "args": [], + "return-type": "void", + "when": "last" + }, + "waiting-data": { + "args": [], + "return-type": "void", + "when": "last" + } + } }, "giostreamsink": { "author": "Sebastian Dröge ", diff --git a/gst/gio/gstgio.h b/gst/gio/gstgio.h index eb4ce223e6..ab27600f5b 100644 --- a/gst/gio/gstgio.h +++ b/gst/gio/gstgio.h @@ -2,7 +2,7 @@ * * Copyright (C) 2007 Rene Stadler * Copyright (C) 2007 Sebastian Dröge - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either diff --git a/gst/gio/gstgiobasesrc.c b/gst/gio/gstgiobasesrc.c index b56a99b442..10d1cf1de4 100644 --- a/gst/gio/gstgiobasesrc.c +++ b/gst/gio/gstgiobasesrc.c @@ -2,7 +2,7 @@ * * Copyright (C) 2007 Rene Stadler * Copyright (C) 2007-2009 Sebastian Dröge - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either @@ -328,6 +328,8 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size, GError *err = NULL; GstBuffer *newbuffer; GstMemory *mem; + gboolean waited_for_data = FALSE; + GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src); newbuffer = gst_buffer_new (); @@ -384,11 +386,24 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size, while (size - read > 0 && (res = g_input_stream_read (G_INPUT_STREAM (src->stream), map.data + streamread, cachesize - streamread, src->cancel, - &err)) > 0) { + &err)) >= 0) { + read += res; streamread += res; src->position += res; + + if (res != 0) + continue; + + if (!klass->wait_for_data || !klass->wait_for_data (src)) + break; + + waited_for_data = TRUE; } + + if (waited_for_data && klass->waited_for_data) + klass->waited_for_data (src); + gst_memory_unmap (mem, &map); gst_buffer_append_memory (src->cache, mem); diff --git a/gst/gio/gstgiobasesrc.h b/gst/gio/gstgiobasesrc.h index 153bb98c77..390d832477 100644 --- a/gst/gio/gstgiobasesrc.h +++ b/gst/gio/gstgiobasesrc.h @@ -2,7 +2,7 @@ * * Copyright (C) 2007 Rene Stadler * Copyright (C) 2007-2009 Sebastian Dröge - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either @@ -47,7 +47,7 @@ typedef struct _GstGioBaseSrcClass GstGioBaseSrcClass; struct _GstGioBaseSrc { GstBaseSrc src; - + /* < protected > */ GCancellable *cancel; guint64 position; @@ -57,11 +57,17 @@ struct _GstGioBaseSrc GstBuffer *cache; }; -struct _GstGioBaseSrcClass +struct _GstGioBaseSrcClass { GstBaseSrcClass parent_class; GInputStream * (*get_stream) (GstGioBaseSrc *bsrc); + + /* Returns TRUE if the files grew and we should try + reading again, FALSE otherwise */ + gboolean (*wait_for_data) (GstGioBaseSrc *bsrc); + void (*waited_for_data) (GstGioBaseSrc *bsrc); + gboolean close_on_stop; }; diff --git a/gst/gio/gstgiosrc.c b/gst/gio/gstgiosrc.c index 167f941d93..77ee0cd89a 100644 --- a/gst/gio/gstgiosrc.c +++ b/gst/gio/gstgiosrc.c @@ -2,7 +2,7 @@ * * Copyright (C) 2007 Rene Stadler * Copyright (C) 2007-2009 Sebastian Dröge - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either @@ -83,9 +83,13 @@ enum { PROP_0, PROP_LOCATION, - PROP_FILE + PROP_FILE, + PROP_GROWING_FILE, }; +static gint waiting_data_signal = 0; +static gint done_waiting_data_signal = 0; + #define gst_gio_src_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (GstGioSrc, gst_gio_src, GST_TYPE_GIO_BASE_SRC, gst_gio_uri_handler_do_init (g_define_type_id)); @@ -101,6 +105,113 @@ static GInputStream *gst_gio_src_get_stream (GstGioBaseSrc * bsrc); static gboolean gst_gio_src_query (GstBaseSrc * base_src, GstQuery * query); +static void +gst_gio_src_file_changed_cb (GstGioSrc * src) +{ + GST_DEBUG_OBJECT (src, "Underlying file changed."); + GST_OBJECT_LOCK (src); + src->changed = TRUE; + if (src->monitoring_mainloop) + g_main_loop_quit (src->monitoring_mainloop); + GST_OBJECT_UNLOCK (src); +} + +static void +gst_gio_src_waited_for_data (GstGioBaseSrc * bsrc) +{ + GstGioSrc *src = GST_GIO_SRC (bsrc); + + src->waiting_for_data = FALSE; + g_signal_emit (bsrc, done_waiting_data_signal, 0, NULL); +} + +static gboolean +gst_gio_src_wait_for_data (GstGioBaseSrc * bsrc) +{ + GMainContext *ctx; + GstGioSrc *src = GST_GIO_SRC (bsrc); + + g_return_val_if_fail (!src->monitor, FALSE); + + GST_OBJECT_LOCK (src); + if (!src->is_growing) { + GST_OBJECT_UNLOCK (src); + + return FALSE; + } + + src->monitor = g_file_monitor (src->file, G_FILE_MONITOR_NONE, + bsrc->cancel, NULL); + + if (!src->monitor) { + GST_OBJECT_UNLOCK (src); + + GST_WARNING_OBJECT (bsrc, "Could not create a monitor"); + return FALSE; + } + + g_signal_connect_swapped (src->monitor, "changed", + G_CALLBACK (gst_gio_src_file_changed_cb), src); + GST_OBJECT_UNLOCK (src); + + if (!src->waiting_for_data) { + g_signal_emit (src, waiting_data_signal, 0, NULL); + src->waiting_for_data = TRUE; + } + + ctx = g_main_context_new (); + g_main_context_push_thread_default (ctx); + GST_OBJECT_LOCK (src); + src->changed = FALSE; + src->monitoring_mainloop = g_main_loop_new (ctx, FALSE); + GST_OBJECT_UNLOCK (src); + + g_main_loop_run (src->monitoring_mainloop); + + g_signal_handlers_disconnect_by_func (src->monitor, + gst_gio_src_file_changed_cb, src); + + GST_OBJECT_LOCK (src); + gst_clear_object (&src->monitor); + g_main_loop_unref (src->monitoring_mainloop); + src->monitoring_mainloop = NULL; + GST_OBJECT_UNLOCK (src); + + g_main_context_pop_thread_default (ctx); + g_main_context_unref (ctx); + + return src->changed; +} + +static gboolean +gst_gio_src_unlock (GstBaseSrc * base_src) +{ + GstGioSrc *src = GST_GIO_SRC (base_src); + + GST_LOG_OBJECT (src, "triggering cancellation"); + + GST_OBJECT_LOCK (src); + while (src->monitoring_mainloop) { + /* Ensure that we have already started the mainloop */ + if (!g_main_loop_is_running (src->monitoring_mainloop)) { + GST_OBJECT_UNLOCK (src); + + /* Letting a chance for the waiting for data function to cleanup the + * mainloop. */ + g_thread_yield (); + + GST_OBJECT_LOCK (src); + continue; + } + g_main_loop_quit (src->monitoring_mainloop); + break; + } + GST_OBJECT_UNLOCK (src); + + return GST_CALL_PARENT_WITH_DEFAULT (GST_BASE_SRC_CLASS, unlock, (base_src), + TRUE); +} + static void gst_gio_src_class_init (GstGioSrcClass * klass) { @@ -122,12 +233,29 @@ gst_gio_src_class_init (GstGioSrcClass * klass) /** * GstGioSrc:file: * - * %GFile to read from. + * #GFile to read from. */ g_object_class_install_property (gobject_class, PROP_FILE, g_param_spec_object ("file", "File", "GFile to read from", G_TYPE_FILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstGioSrc:is-growing: + * + * Whether the file is currently growing. When activated EOS is never pushed + * and the user needs to handle it himself. This modes allows to keep reading + * the file while it is being written on file. + * + * You can reset the property to %FALSE at any time and the file will start + * not being considered growing and EOS will be pushed when required. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_GROWING_FILE, + g_param_spec_boolean ("is-growing", "File is growing", + "Whether the file is growing, ignoring its end", + FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gst_element_class_set_static_metadata (gstelement_class, "GIO source", "Source/File", "Read from any GIO-supported location", @@ -135,9 +263,34 @@ gst_gio_src_class_init (GstGioSrcClass * klass) "Sebastian Dröge "); gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_src_query); + gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_src_unlock); gstgiobasesrc_class->get_stream = GST_DEBUG_FUNCPTR (gst_gio_src_get_stream); gstgiobasesrc_class->close_on_stop = TRUE; + gstgiobasesrc_class->wait_for_data = gst_gio_src_wait_for_data; + gstgiobasesrc_class->waited_for_data = gst_gio_src_waited_for_data; + + /** + * GstGioSrc::waiting-data: + * + * Signal notifying that we are stalled waiting for data + * + * Since: 1.20 + */ + waiting_data_signal = g_signal_new ("waiting-data", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, + NULL, G_TYPE_NONE, 0); + + /** + * GstGioSrc::done-waiting-data: + * + * Signal notifying that we are done waiting for data + * + * Since: 1.20 + */ + done_waiting_data_signal = g_signal_new ("done-waiting-data", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, + NULL, G_TYPE_NONE, 0); } static void @@ -193,6 +346,32 @@ gst_gio_src_set_property (GObject * object, guint prop_id, GST_OBJECT_UNLOCK (GST_OBJECT (src)); break; } + case PROP_GROWING_FILE: + { + gboolean was_growing; + + GST_OBJECT_LOCK (src); + was_growing = src->is_growing; + src->is_growing = g_value_get_boolean (value); + gst_base_src_set_dynamic_size (GST_BASE_SRC (src), src->is_growing); + gst_base_src_set_automatic_eos (GST_BASE_SRC (src), !src->is_growing); + + while (was_growing && !src->is_growing && src->monitoring_mainloop) { + /* Ensure that we have already started the mainloop */ + if (!g_main_loop_is_running (src->monitoring_mainloop)) { + GST_OBJECT_UNLOCK (src); + /* Letting a chance for the waiting for data function to cleanup the + * mainloop. */ + GST_OBJECT_LOCK (src); + continue; + } + g_main_loop_quit (src->monitoring_mainloop); + break; + } + GST_OBJECT_UNLOCK (src); + + break; + } case PROP_FILE: if (GST_STATE (src) == GST_STATE_PLAYING || GST_STATE (src) == GST_STATE_PAUSED) { @@ -241,6 +420,11 @@ gst_gio_src_get_property (GObject * object, guint prop_id, g_value_set_object (value, src->file); GST_OBJECT_UNLOCK (GST_OBJECT (src)); break; + case PROP_GROWING_FILE: + { + g_value_set_boolean (value, src->is_growing); + break; + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -281,8 +465,10 @@ gst_gio_src_query (GstBaseSrc * base_src, GstQuery * query) gst_query_set_scheduling (query, flags, 1, -1, 0); gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH); - if (flags & GST_SCHEDULING_FLAG_SEEKABLE) + GST_OBJECT_LOCK (src); + if (flags & GST_SCHEDULING_FLAG_SEEKABLE && !src->is_growing) gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL); + GST_OBJECT_UNLOCK (src); res = TRUE; break; diff --git a/gst/gio/gstgiosrc.h b/gst/gio/gstgiosrc.h index c6af239360..c136c2e5c7 100644 --- a/gst/gio/gstgiosrc.h +++ b/gst/gio/gstgiosrc.h @@ -2,7 +2,7 @@ * * Copyright (C) 2007 Rene Stadler * Copyright (C) 2007-2009 Sebastian Dröge - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either @@ -40,9 +40,15 @@ G_DECLARE_FINAL_TYPE (GstGioSrc, gst_gio_src, GST, GIO_SRC, GstGioBaseSrc) struct _GstGioSrc { GstGioBaseSrc src; - + /*< private >*/ GFile *file; + + gboolean is_growing; + GFileMonitor *monitor; + GMainLoop *monitoring_mainloop; + gboolean changed; + gboolean waiting_for_data; }; G_END_DECLS diff --git a/tests/validate/giosrc/read-growing-file.validatetest b/tests/validate/giosrc/read-growing-file.validatetest new file mode 100644 index 0000000000..d101c0f51f --- /dev/null +++ b/tests/validate/giosrc/read-growing-file.validatetest @@ -0,0 +1,42 @@ +# Pipeline with 2 branches, the first one write random data into a file in $(growing_file_location). +# That branch is synchronized on the test clock that we drive in the scenario. +# The second branch reads it with giosrc and does some tests like waiting for the +# `done-waiting-signal` signals on the source etc... +# +# The whole dataflow is checked and we ensure that the exact same buffer content +# is read from the giosrc. +set-globals, growing_file_location="$(logsdir)/$(test_name)-growing.rand" +meta, + seek=false, + handles-states=false, + args = { + "fakesrc num-buffers=30 datarate=30 filltype=pattern-span sizetype=fixed filltype=random format=time ! filesink sync=true location=$(growing_file_location) name=filesink buffer-mode=unbuffered \ + giosrc name=giosrc is-growing=true location=file://$(growing_file_location) ! fakesink name=growing-file-sink async=false" \ + }, + configs = { + "$(validateflow), pad=filesink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id", + "$(validateflow), pad=growing-file-sink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id", + } + + +crank-clock, repeat=5 +wait, signal-name=waiting-data, target-element-name=giosrc + +checkpoint + +crank-clock, repeat=5 +wait, signal-name=waiting-data, target-element-name=giosrc + +wait, signal-name=done-waiting-data, target-element-name=giosrc, non-blocking=true +crank-clock, repeat=21 + +checkpoint + +wait, signal-name=waiting-data, target-element-name=giosrc + +checkpoint + +# Make sure EOS is outputted now. +set-properties, giosrc::is_growing=false + +stop, on-message=eos \ No newline at end of file diff --git a/tests/validate/giosrc/read-growing-file/flow-expectations/log-filesink-sink-expected b/tests/validate/giosrc/read-growing-file/flow-expectations/log-filesink-sink-expected new file mode 100644 index 0000000000..69a44e0d76 --- /dev/null +++ b/tests/validate/giosrc/read-growing-file/flow-expectations/log-filesink-sink-expected @@ -0,0 +1,42 @@ +event stream-start: GstEventStreamStart, flags=(GstStreamFlags)GST_STREAM_FLAG_NONE; +event segment: format=TIME, start=0:00:00.000000000, offset=0:00:00.000000000, stop=none, time=0:00:00.000000000, base=0:00:00.000000000, position=0:00:00.000000000 +buffer: content-id=0, dts=0:00:00.000000000, pts=0:00:00.000000000, dur=0:02:16.533333333, flags=discont tag-memory +buffer: content-id=1, dts=0:02:16.533333333, pts=0:02:16.533333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=2, dts=0:04:33.066666666, pts=0:04:33.066666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=3, dts=0:06:49.600000000, pts=0:06:49.600000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=4, dts=0:09:06.133333333, pts=0:09:06.133333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=5, dts=0:11:22.666666666, pts=0:11:22.666666666, dur=0:02:16.533333333, flags=tag-memory + +CHECKPOINT + +buffer: content-id=6, dts=0:13:39.200000000, pts=0:13:39.200000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=7, dts=0:15:55.733333333, pts=0:15:55.733333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=8, dts=0:18:12.266666666, pts=0:18:12.266666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=9, dts=0:20:28.800000000, pts=0:20:28.800000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=10, dts=0:22:45.333333333, pts=0:22:45.333333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=11, dts=0:25:01.866666666, pts=0:25:01.866666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=12, dts=0:27:18.400000000, pts=0:27:18.400000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=13, dts=0:29:34.933333333, pts=0:29:34.933333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=14, dts=0:31:51.466666666, pts=0:31:51.466666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=15, dts=0:34:08.000000000, pts=0:34:08.000000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=16, dts=0:36:24.533333333, pts=0:36:24.533333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=17, dts=0:38:41.066666666, pts=0:38:41.066666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=18, dts=0:40:57.600000000, pts=0:40:57.600000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=19, dts=0:43:14.133333333, pts=0:43:14.133333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=20, dts=0:45:30.666666666, pts=0:45:30.666666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=21, dts=0:47:47.200000000, pts=0:47:47.200000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=22, dts=0:50:03.733333333, pts=0:50:03.733333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=23, dts=0:52:20.266666666, pts=0:52:20.266666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=24, dts=0:54:36.800000000, pts=0:54:36.800000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=25, dts=0:56:53.333333333, pts=0:56:53.333333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=26, dts=0:59:09.866666666, pts=0:59:09.866666666, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=27, dts=1:01:26.400000000, pts=1:01:26.400000000, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=28, dts=1:03:42.933333333, pts=1:03:42.933333333, dur=0:02:16.533333333, flags=tag-memory +buffer: content-id=29, dts=1:05:59.466666666, pts=1:05:59.466666666, dur=0:02:16.533333333, flags=tag-memory +event eos: (no structure) + +CHECKPOINT + + +CHECKPOINT + diff --git a/tests/validate/giosrc/read-growing-file/flow-expectations/log-growing-file-sink-sink-expected b/tests/validate/giosrc/read-growing-file/flow-expectations/log-growing-file-sink-sink-expected new file mode 100644 index 0000000000..1d6de5d0b4 --- /dev/null +++ b/tests/validate/giosrc/read-growing-file/flow-expectations/log-growing-file-sink-sink-expected @@ -0,0 +1,42 @@ +event stream-start: GstEventStreamStart, flags=(GstStreamFlags)GST_STREAM_FLAG_NONE; +event segment: format=BYTES, start=0, offset=0, stop=18446744073709551615, time=0, base=0, position=0, duration=0 +buffer: content-id=0, dts=0:00:00.000000000, pts=0:00:00.000000000, flags=discont tag-memory +buffer: content-id=1, flags=tag-memory +buffer: content-id=2, flags=tag-memory +buffer: content-id=3, flags=tag-memory +buffer: content-id=4, flags=tag-memory + +CHECKPOINT + +buffer: content-id=5, flags=tag-memory +buffer: content-id=6, flags=tag-memory +buffer: content-id=7, flags=tag-memory +buffer: content-id=8, flags=tag-memory +buffer: content-id=9, flags=tag-memory + +CHECKPOINT + +buffer: content-id=10, flags=tag-memory +buffer: content-id=11, flags=tag-memory +buffer: content-id=12, flags=tag-memory +buffer: content-id=13, flags=tag-memory +buffer: content-id=14, flags=tag-memory +buffer: content-id=15, flags=tag-memory +buffer: content-id=16, flags=tag-memory +buffer: content-id=17, flags=tag-memory +buffer: content-id=18, flags=tag-memory +buffer: content-id=19, flags=tag-memory +buffer: content-id=20, flags=tag-memory +buffer: content-id=21, flags=tag-memory +buffer: content-id=22, flags=tag-memory +buffer: content-id=23, flags=tag-memory +buffer: content-id=24, flags=tag-memory +buffer: content-id=25, flags=tag-memory +buffer: content-id=26, flags=tag-memory +buffer: content-id=27, flags=tag-memory +buffer: content-id=28, flags=tag-memory +buffer: content-id=29, flags=tag-memory + +CHECKPOINT + +event eos: (no structure) diff --git a/tests/validate/meson.build b/tests/validate/meson.build index 776073c075..882a560663 100644 --- a/tests/validate/meson.build +++ b/tests/validate/meson.build @@ -18,6 +18,7 @@ tests = [ 'videorate/rate_2_0', 'videorate/rate_2_0_with_decoder', 'compositor/renogotiate_failing_unsupported_src_format', + 'giosrc/read-growing-file', ] env = environment()