Skip to content

Commit

Permalink
bus: Ensure that only one GSource can be attached to the bus
Browse files Browse the repository at this point in the history
Until now we were enforcing that only 1 signal GSource was attached
the bus but we could attach as many GSource with `gst_bus_create_watch`
as we wanted... but in the end only 1 GSource will ever be dispatched for
a given `GstMessage` leading to totally broken behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/718>
  • Loading branch information
thiblahute authored and GStreamer Merge Bot committed Dec 11, 2020
1 parent 5db76b4 commit 0daa48f
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 34 deletions.
95 changes: 61 additions & 34 deletions gst/gstbus.c
Expand Up @@ -147,7 +147,7 @@ struct _GstBusPrivate
guint num_signal_watchers;

guint num_sync_message_emitters;
GSource *signal_watch;
GSource *gsource;

gboolean enable_async;
GstPoll *poll;
Expand Down Expand Up @@ -869,8 +869,8 @@ gst_bus_source_dispose (GSource * source)
GST_DEBUG_OBJECT (bus, "disposing source %p", source);

GST_OBJECT_LOCK (bus);
if (bus->priv->signal_watch == source)
bus->priv->signal_watch = NULL;
if (bus->priv->gsource == source)
bus->priv->gsource = NULL;
GST_OBJECT_UNLOCK (bus);
}
#endif
Expand All @@ -885,13 +885,12 @@ gst_bus_source_finalize (GSource * source)
GST_DEBUG_OBJECT (bus, "finalize source %p", source);

GST_OBJECT_LOCK (bus);
if (bus->priv->signal_watch == source)
bus->priv->signal_watch = NULL;
if (bus->priv->gsource == source)
bus->priv->gsource = NULL;
GST_OBJECT_UNLOCK (bus);
#endif

gst_object_unref (bsource->bus);
bsource->bus = NULL;
gst_clear_object (&bsource->bus);
}

static GSourceFuncs gst_bus_source_funcs = {
Expand All @@ -901,6 +900,33 @@ static GSourceFuncs gst_bus_source_funcs = {
gst_bus_source_finalize
};


static GSource *
gst_bus_create_watch_unlocked (GstBus * bus)
{
GstBusSource *source;

if (bus->priv->gsource) {
GST_ERROR_OBJECT (bus,
"Tried to add new GSource while one was already there");
return NULL;
}

bus->priv->gsource = g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
source = (GstBusSource *) bus->priv->gsource;

g_source_set_name ((GSource *) source, "GStreamer message bus watch");
#if GLIB_CHECK_VERSION(2,63,3)
g_source_set_dispose_function ((GSource *) source, gst_bus_source_dispose);
#endif

source->bus = gst_object_ref (bus);
g_source_add_poll ((GSource *) source, &bus->priv->pollfd);

return (GSource *) source;
}

/**
* gst_bus_create_watch:
* @bus: a #GstBus to create the watch for
Expand All @@ -909,28 +935,24 @@ static GSourceFuncs gst_bus_source_funcs = {
* a message is on the bus. After the GSource is dispatched, the
* message is popped off the bus and unreffed.
*
* As with other watches, there can only be one watch on the bus, including
* any signal watch added with #gst_bus_add_signal_watch.
*
* Returns: (transfer full) (nullable): a #GSource that can be added to a mainloop.
*/
GSource *
gst_bus_create_watch (GstBus * bus)
{
GstBusSource *source;
GSource *source;

g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (bus->priv->poll != NULL, NULL);

source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));

g_source_set_name ((GSource *) source, "GStreamer message bus watch");
#if GLIB_CHECK_VERSION(2,63,3)
g_source_set_dispose_function ((GSource *) source, gst_bus_source_dispose);
#endif

source->bus = gst_object_ref (bus);
g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
GST_OBJECT_LOCK (bus);
source = gst_bus_create_watch_unlocked (bus);
GST_OBJECT_UNLOCK (bus);

return (GSource *) source;
return source;
}

/* must be called with the bus OBJECT LOCK */
Expand All @@ -942,13 +964,13 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
guint id;
GSource *source;

if (bus->priv->signal_watch) {
if (bus->priv->gsource) {
GST_ERROR_OBJECT (bus,
"Tried to add new watch while one was already there");
return 0;
}

source = gst_bus_create_watch (bus);
source = gst_bus_create_watch_unlocked (bus);
if (!source) {
g_critical ("Creating bus watch failed");
return 0;
Expand All @@ -964,7 +986,7 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
g_source_unref (source);

if (id) {
bus->priv->signal_watch = source;
bus->priv->gsource = source;
}

GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id);
Expand Down Expand Up @@ -1076,7 +1098,7 @@ gst_bus_remove_watch (GstBus * bus)

GST_OBJECT_LOCK (bus);

if (bus->priv->signal_watch == NULL) {
if (bus->priv->gsource == NULL) {
GST_ERROR_OBJECT (bus, "no bus watch was present");
goto error;
}
Expand All @@ -1087,9 +1109,8 @@ gst_bus_remove_watch (GstBus * bus)
goto error;
}

source =
bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
bus->priv->signal_watch = NULL;
source = g_source_ref (bus->priv->gsource);
bus->priv->gsource = NULL;

GST_OBJECT_UNLOCK (bus);

Expand Down Expand Up @@ -1411,13 +1432,13 @@ gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
if (bus->priv->num_signal_watchers > 0)
goto done;

/* this should not fail because the counter above takes care of it */
g_assert (!bus->priv->signal_watch);
if (bus->priv->gsource)
goto has_gsource;

gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
NULL, NULL);

if (G_UNLIKELY (!bus->priv->signal_watch))
if (G_UNLIKELY (!bus->priv->gsource))
goto add_failed;

done:
Expand All @@ -1434,6 +1455,12 @@ gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
GST_OBJECT_UNLOCK (bus);
return;
}
has_gsource:
{
g_critical ("Bus %s already has a GSource watch", GST_OBJECT_NAME (bus));
GST_OBJECT_UNLOCK (bus);
return;
}
}

/**
Expand Down Expand Up @@ -1487,12 +1514,12 @@ gst_bus_remove_signal_watch (GstBus * bus)
if (bus->priv->num_signal_watchers > 0)
goto done;

GST_DEBUG_OBJECT (bus, "removing signal watch %u",
g_source_get_id (bus->priv->signal_watch));
GST_DEBUG_OBJECT (bus, "removing gsource %u",
g_source_get_id (bus->priv->gsource));

source =
bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
bus->priv->signal_watch = NULL;
g_assert (bus->priv->gsource);
source = g_source_ref (bus->priv->gsource);
bus->priv->gsource = NULL;

done:
GST_OBJECT_UNLOCK (bus);
Expand Down
27 changes: 27 additions & 0 deletions tests/check/gst/gstbus.c
Expand Up @@ -900,6 +900,32 @@ GST_START_TEST (test_async_message)

GST_END_TEST;

GST_START_TEST (test_single_gsource)
{
GstBus *bus = gst_bus_new ();
GSource *source = gst_bus_create_watch (bus);
g_source_attach (source, NULL);
g_source_unref (source);

source = gst_bus_create_watch (bus);
fail_if (source, "Only one GSource can be added to a bus");

ASSERT_CRITICAL (gst_bus_add_signal_watch (bus));
ASSERT_CRITICAL (gst_bus_remove_signal_watch (bus));

fail_unless (gst_bus_remove_watch (bus), "Could not remove watch");
gst_bus_add_signal_watch (bus);

fail_if (gst_bus_remove_watch (bus), "Signal watch should be removed"
" with gst_bus_remove_signal_watch");

gst_bus_remove_signal_watch (bus);

gst_object_unref (bus);
}

GST_END_TEST;

static Suite *
gst_bus_suite (void)
{
Expand All @@ -922,6 +948,7 @@ gst_bus_suite (void)
tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
tcase_add_test (tc_chain, test_custom_main_context);
tcase_add_test (tc_chain, test_async_message);
tcase_add_test (tc_chain, test_single_gsource);
return s;
}

Expand Down

0 comments on commit 0daa48f

Please sign in to comment.