Skip to content

Commit

Permalink
giosrc: Add support for growing source files
Browse files Browse the repository at this point in the history
Add a way for applications to specify that the underlying file is
growing which implies that the source won't EOS when reaching the end
of the file but instead start monitoring it and start reading it again
whenever a change is detected.

Also add a validate test to check the behavior

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/937>
  • Loading branch information
thiblahute committed Dec 8, 2020
1 parent a4ba868 commit f1f966d
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 5 deletions.
26 changes: 25 additions & 1 deletion docs/plugins/gst_plugins_cache.json
Expand Up @@ -2459,6 +2459,18 @@
"type": "GFile",
"writable": true
},
"is-growing": {
"blurb": "Whether the file is growing, ignoring its end",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"location": {
"blurb": "URI location to read from",
"conditionally-available": false,
Expand All @@ -2472,7 +2484,19 @@
"writable": true
}
},
"rank": "secondary"
"rank": "secondary",
"signals": {
"done-waiting-data": {
"args": [],
"return-type": "void",
"when": "last"
},
"waiting-data": {
"args": [],
"return-type": "void",
"when": "last"
}
}
},
"giostreamsink": {
"author": "Sebastian Dröge <sebastian.droege@collabora.co.uk>",
Expand Down
Empty file modified gst/gio/gstgio.h
Whitespace-only changes.
17 changes: 16 additions & 1 deletion gst/gio/gstgiobasesrc.c
Expand Up @@ -328,6 +328,8 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
GError *err = NULL;
GstBuffer *newbuffer;
GstMemory *mem;
gboolean waited_for_data = FALSE;
GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);

newbuffer = gst_buffer_new ();

Expand Down Expand Up @@ -384,11 +386,24 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
while (size - read > 0 && (res =
g_input_stream_read (G_INPUT_STREAM (src->stream),
map.data + streamread, cachesize - streamread, src->cancel,
&err)) > 0) {
&err)) >= 0) {

read += res;
streamread += res;
src->position += res;

if (res != 0)
continue;

if (!klass->wait_for_data || !klass->wait_for_data (src))
break;

waited_for_data = TRUE;
}

if (waited_for_data && klass->waited_for_data)
klass->waited_for_data (src);

gst_memory_unmap (mem, &map);
gst_buffer_append_memory (src->cache, mem);

Expand Down
6 changes: 6 additions & 0 deletions gst/gio/gstgiobasesrc.h
Expand Up @@ -62,6 +62,12 @@ struct _GstGioBaseSrcClass
GstBaseSrcClass parent_class;

GInputStream * (*get_stream) (GstGioBaseSrc *bsrc);

/* Returns TRUE if the files grew and we should try
reading again, FALSE otherwise */
gboolean (*wait_for_data) (GstGioBaseSrc *bsrc);
void (*waited_for_data) (GstGioBaseSrc *bsrc);

gboolean close_on_stop;
};

Expand Down
192 changes: 189 additions & 3 deletions gst/gio/gstgiosrc.c
Expand Up @@ -83,9 +83,13 @@ enum
{
PROP_0,
PROP_LOCATION,
PROP_FILE
PROP_FILE,
PROP_GROWING_FILE,
};

static gint waiting_data_signal = 0;
static gint done_waiting_data_signal = 0;

#define gst_gio_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstGioSrc, gst_gio_src,
GST_TYPE_GIO_BASE_SRC, gst_gio_uri_handler_do_init (g_define_type_id));
Expand All @@ -101,6 +105,113 @@ static GInputStream *gst_gio_src_get_stream (GstGioBaseSrc * bsrc);

static gboolean gst_gio_src_query (GstBaseSrc * base_src, GstQuery * query);

static void
gst_gio_src_file_changed_cb (GstGioSrc * src)
{
GST_DEBUG_OBJECT (src, "Underlying file changed.");
GST_OBJECT_LOCK (src);
src->changed = TRUE;
if (src->monitoring_mainloop)
g_main_loop_quit (src->monitoring_mainloop);
GST_OBJECT_UNLOCK (src);
}

static void
gst_gio_src_waited_for_data (GstGioBaseSrc * bsrc)
{
GstGioSrc *src = GST_GIO_SRC (bsrc);

src->waiting_for_data = FALSE;
g_signal_emit (bsrc, done_waiting_data_signal, 0, NULL);
}

static gboolean
gst_gio_src_wait_for_data (GstGioBaseSrc * bsrc)
{
GMainContext *ctx;
GstGioSrc *src = GST_GIO_SRC (bsrc);

g_return_val_if_fail (!src->monitor, FALSE);

GST_OBJECT_LOCK (src);
if (!src->is_growing) {
GST_OBJECT_UNLOCK (src);

return FALSE;
}

src->monitor = g_file_monitor (src->file, G_FILE_MONITOR_NONE,
bsrc->cancel, NULL);

if (!src->monitor) {
GST_OBJECT_UNLOCK (src);

GST_WARNING_OBJECT (bsrc, "Could not create a monitor");
return FALSE;
}

g_signal_connect_swapped (src->monitor, "changed",
G_CALLBACK (gst_gio_src_file_changed_cb), src);
GST_OBJECT_UNLOCK (src);

if (!src->waiting_for_data) {
g_signal_emit (src, waiting_data_signal, 0, NULL);
src->waiting_for_data = TRUE;
}

ctx = g_main_context_new ();
g_main_context_push_thread_default (ctx);
GST_OBJECT_LOCK (src);
src->changed = FALSE;
src->monitoring_mainloop = g_main_loop_new (ctx, FALSE);
GST_OBJECT_UNLOCK (src);

g_main_loop_run (src->monitoring_mainloop);

g_signal_handlers_disconnect_by_func (src->monitor,
gst_gio_src_file_changed_cb, src);

GST_OBJECT_LOCK (src);
gst_clear_object (&src->monitor);
g_main_loop_unref (src->monitoring_mainloop);
src->monitoring_mainloop = NULL;
GST_OBJECT_UNLOCK (src);

g_main_context_pop_thread_default (ctx);
g_main_context_unref (ctx);

return src->changed;
}

static gboolean
gst_gio_src_unlock (GstBaseSrc * base_src)
{
GstGioSrc *src = GST_GIO_SRC (base_src);

GST_LOG_OBJECT (src, "triggering cancellation");

GST_OBJECT_LOCK (src);
while (src->monitoring_mainloop) {
/* Ensure that we have already started the mainloop */
if (!g_main_loop_is_running (src->monitoring_mainloop)) {
GST_OBJECT_UNLOCK (src);

/* Letting a chance for the waiting for data function to cleanup the
* mainloop. */
g_thread_yield ();

GST_OBJECT_LOCK (src);
continue;
}
g_main_loop_quit (src->monitoring_mainloop);
break;
}
GST_OBJECT_UNLOCK (src);

return GST_CALL_PARENT_WITH_DEFAULT (GST_BASE_SRC_CLASS, unlock, (base_src),
TRUE);
}

static void
gst_gio_src_class_init (GstGioSrcClass * klass)
{
Expand All @@ -122,22 +233,64 @@ gst_gio_src_class_init (GstGioSrcClass * klass)
/**
* GstGioSrc:file:
*
* %GFile to read from.
* #GFile to read from.
*/
g_object_class_install_property (gobject_class, PROP_FILE,
g_param_spec_object ("file", "File", "GFile to read from",
G_TYPE_FILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

/**
* GstGioSrc:is-growing:
*
* Whether the file is currently growing. When activated EOS is never pushed
* and the user needs to handle it himself. This modes allows to keep reading
* the file while it is being written on file.
*
* You can reset the property to %FALSE at any time and the file will start
* not being considered growing and EOS will be pushed when required.
*
* Since: 1.20
*/
g_object_class_install_property (gobject_class, PROP_GROWING_FILE,
g_param_spec_boolean ("is-growing", "File is growing",
"Whether the file is growing, ignoring its end",
FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

gst_element_class_set_static_metadata (gstelement_class, "GIO source",
"Source/File",
"Read from any GIO-supported location",
"Ren\xc3\xa9 Stadler <mail@renestadler.de>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");

gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_src_query);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_src_unlock);

gstgiobasesrc_class->get_stream = GST_DEBUG_FUNCPTR (gst_gio_src_get_stream);
gstgiobasesrc_class->close_on_stop = TRUE;
gstgiobasesrc_class->wait_for_data = gst_gio_src_wait_for_data;
gstgiobasesrc_class->waited_for_data = gst_gio_src_waited_for_data;

/**
* GstGioSrc::waiting-data:
*
* Signal notifying that we are stalled waiting for data
*
* Since: 1.20
*/
waiting_data_signal = g_signal_new ("waiting-data",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL,
NULL, G_TYPE_NONE, 0);

/**
* GstGioSrc::done-waiting-data:
*
* Signal notifying that we are done waiting for data
*
* Since: 1.20
*/
done_waiting_data_signal = g_signal_new ("done-waiting-data",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL,
NULL, G_TYPE_NONE, 0);
}

static void
Expand Down Expand Up @@ -193,6 +346,32 @@ gst_gio_src_set_property (GObject * object, guint prop_id,
GST_OBJECT_UNLOCK (GST_OBJECT (src));
break;
}
case PROP_GROWING_FILE:
{
gboolean was_growing;

GST_OBJECT_LOCK (src);
was_growing = src->is_growing;
src->is_growing = g_value_get_boolean (value);
gst_base_src_set_dynamic_size (GST_BASE_SRC (src), src->is_growing);
gst_base_src_set_automatic_eos (GST_BASE_SRC (src), !src->is_growing);

while (was_growing && !src->is_growing && src->monitoring_mainloop) {
/* Ensure that we have already started the mainloop */
if (!g_main_loop_is_running (src->monitoring_mainloop)) {
GST_OBJECT_UNLOCK (src);
/* Letting a chance for the waiting for data function to cleanup the
* mainloop. */
GST_OBJECT_LOCK (src);
continue;
}
g_main_loop_quit (src->monitoring_mainloop);
break;
}
GST_OBJECT_UNLOCK (src);

break;
}
case PROP_FILE:
if (GST_STATE (src) == GST_STATE_PLAYING ||
GST_STATE (src) == GST_STATE_PAUSED) {
Expand Down Expand Up @@ -241,6 +420,11 @@ gst_gio_src_get_property (GObject * object, guint prop_id,
g_value_set_object (value, src->file);
GST_OBJECT_UNLOCK (GST_OBJECT (src));
break;
case PROP_GROWING_FILE:
{
g_value_set_boolean (value, src->is_growing);
break;
}
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand Down Expand Up @@ -281,8 +465,10 @@ gst_gio_src_query (GstBaseSrc * base_src, GstQuery * query)

gst_query_set_scheduling (query, flags, 1, -1, 0);
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
if (flags & GST_SCHEDULING_FLAG_SEEKABLE)
GST_OBJECT_LOCK (src);
if (flags & GST_SCHEDULING_FLAG_SEEKABLE && !src->is_growing)
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
GST_OBJECT_UNLOCK (src);

res = TRUE;
break;
Expand Down
6 changes: 6 additions & 0 deletions gst/gio/gstgiosrc.h
Expand Up @@ -43,6 +43,12 @@ struct _GstGioSrc

/*< private >*/
GFile *file;

gboolean is_growing;
GFileMonitor *monitor;
GMainLoop *monitoring_mainloop;
gboolean changed;
gboolean waiting_for_data;
};

G_END_DECLS
Expand Down
42 changes: 42 additions & 0 deletions tests/validate/giosrc/read-growing-file.validatetest
@@ -0,0 +1,42 @@
# Pipeline with 2 branches, the first one write random data into a file in $(growing_file_location).
# That branch is synchronized on the test clock that we drive in the scenario.
# The second branch reads it with giosrc and does some tests like waiting for the
# `done-waiting-signal` signals on the source etc...
#
# The whole dataflow is checked and we ensure that the exact same buffer content
# is read from the giosrc.
set-globals, growing_file_location="$(logsdir)/$(test_name)-growing.rand"
meta,
seek=false,
handles-states=false,
args = {
"fakesrc num-buffers=30 datarate=30 filltype=pattern-span sizetype=fixed filltype=random format=time ! filesink sync=true location=$(growing_file_location) name=filesink buffer-mode=unbuffered \
giosrc name=giosrc is-growing=true location=file://$(growing_file_location) ! fakesink name=growing-file-sink async=false" \
},
configs = {
"$(validateflow), pad=filesink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id",
"$(validateflow), pad=growing-file-sink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id",
}


crank-clock, repeat=5
wait, signal-name=waiting-data, target-element-name=giosrc

checkpoint

crank-clock, repeat=5
wait, signal-name=waiting-data, target-element-name=giosrc

wait, signal-name=done-waiting-data, target-element-name=giosrc, non-blocking=true
crank-clock, repeat=21

checkpoint

wait, signal-name=waiting-data, target-element-name=giosrc

checkpoint

# Make sure EOS is outputted now.
set-properties, giosrc::is_growing=false

stop, on-message=eos

0 comments on commit f1f966d

Please sign in to comment.