Skip to content

Commit

Permalink
rtpbasedepayload: handle caps change partway through buffer list
Browse files Browse the repository at this point in the history
While preparing a blist for pushing, some RTP header extension may
request caps change for a specific buffer in the list. When this
happens, depayloader should immediately push those buffers from the list
that precede the currently processed buffer (for which the caps change
was requested) and only then apply the new caps to the src pad.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1011>
  • Loading branch information
xhaakon committed Mar 12, 2021
1 parent c222f32 commit 1a87a65
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 69 deletions.
192 changes: 123 additions & 69 deletions gst-libs/gst/rtp/gstrtpbasedepayload.c
Expand Up @@ -60,8 +60,6 @@ struct _GstRTPBaseDepayloadPrivate
gboolean negotiated;

GstCaps *last_caps;
gboolean needs_src_caps_update;

GstEvent *segment_event;
guint32 segment_seqnum; /* Note: this is a GstEvent seqnum */

Expand Down Expand Up @@ -1141,23 +1139,24 @@ gst_rtp_base_depayload_clear_extensions (GstRTPBaseDepayload * rtpbasepayload)
GST_OBJECT_UNLOCK (rtpbasepayload);
}

static void
static gboolean
read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
GstBuffer * input, GstBuffer * output)
{
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
guint16 bit_pattern;
guint8 *pdata;
guint wordlen;
gboolean needs_src_caps_update = FALSE;

if (!input) {
GST_DEBUG_OBJECT (depayload, "no input buffer");
return;
return needs_src_caps_update;
}

if (!gst_rtp_buffer_map (input, GST_MAP_READ, &rtp)) {
GST_WARNING_OBJECT (depayload, "Failed to map buffer");
return;
return needs_src_caps_update;
}

if (gst_rtp_buffer_get_extension_data (&rtp, &bit_pattern, (gpointer) & pdata,
Expand Down Expand Up @@ -1244,7 +1243,7 @@ read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
}

if (gst_rtp_header_extension_wants_update_non_rtp_src_caps (ext)) {
depayload->priv->needs_src_caps_update = TRUE;
needs_src_caps_update = TRUE;
}

gst_object_unref (ext);
Expand All @@ -1257,32 +1256,33 @@ read_rtp_header_extensions (GstRTPBaseDepayload * depayload,

out:
gst_rtp_buffer_unmap (&rtp);

return needs_src_caps_update;
}

static gboolean
set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)
gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
GstBuffer * buffer)
{
GstRTPBaseDepayloadPrivate *priv = depayload->priv;
GstClockTime pts, dts, duration;

*buffer = gst_buffer_make_writable (*buffer);

pts = GST_BUFFER_PTS (*buffer);
dts = GST_BUFFER_DTS (*buffer);
duration = GST_BUFFER_DURATION (*buffer);
pts = GST_BUFFER_PTS (buffer);
dts = GST_BUFFER_DTS (buffer);
duration = GST_BUFFER_DURATION (buffer);

/* apply last incoming timestamp and duration to outgoing buffer if
* not otherwise set. */
if (!GST_CLOCK_TIME_IS_VALID (pts))
GST_BUFFER_PTS (*buffer) = priv->pts;
GST_BUFFER_PTS (buffer) = priv->pts;
if (!GST_CLOCK_TIME_IS_VALID (dts))
GST_BUFFER_DTS (*buffer) = priv->dts;
GST_BUFFER_DTS (buffer) = priv->dts;
if (!GST_CLOCK_TIME_IS_VALID (duration))
GST_BUFFER_DURATION (*buffer) = priv->duration;
GST_BUFFER_DURATION (buffer) = priv->duration;

if (G_UNLIKELY (depayload->priv->discont)) {
GST_LOG_OBJECT (depayload, "Marking DISCONT on output buffer");
GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
depayload->priv->discont = FALSE;
}

Expand All @@ -1293,74 +1293,138 @@ set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)

if (priv->input_buffer) {
if (priv->source_info)
add_rtp_source_meta (*buffer, priv->input_buffer);
add_rtp_source_meta (buffer, priv->input_buffer);

read_rtp_header_extensions (depayload, priv->input_buffer, *buffer);
return read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
}

return TRUE;
return FALSE;
}

static GstFlowReturn
gst_rtp_base_depayload_prepare_push (GstRTPBaseDepayload * filter,
gst_rtp_base_depayload_finish_push (GstRTPBaseDepayload * filter,
gboolean is_list, gpointer obj)
{
/* if this is the first buffer send a NEWSEGMENT */
if (G_UNLIKELY (filter->priv->segment_event)) {
gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
filter->priv->segment_event = NULL;
GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
}

if (is_list) {
GstBufferList **blist = obj;
gst_buffer_list_foreach (*blist, (GstBufferListFunc) set_headers, filter);
GstBufferList *blist = obj;
return gst_pad_push_list (filter->srcpad, blist);
} else {
GstBuffer **buf = obj;
set_headers (buf, 0, filter);
GstBuffer *buf = obj;
return gst_pad_push (filter->srcpad, buf);
}
}

/* header extensions may want to update src caps */
if (G_UNLIKELY (filter->priv->needs_src_caps_update)) {
GstCaps *src_caps = gst_pad_get_current_caps (filter->srcpad);
static gboolean
gst_rtp_base_depayload_set_src_caps_from_hdrext (GstRTPBaseDepayload * filter)
{
gboolean update_ok = TRUE;
GstCaps *src_caps = gst_pad_get_current_caps (filter->srcpad);

if (src_caps) {
GstCaps *new_caps;
gboolean update_ok = TRUE;
gint i;
if (src_caps) {
GstCaps *new_caps;
gint i;

new_caps = gst_caps_copy (src_caps);
for (i = 0; i < filter->priv->header_exts->len; i++) {
GstRTPHeaderExtension *ext;
new_caps = gst_caps_copy (src_caps);
for (i = 0; i < filter->priv->header_exts->len; i++) {
GstRTPHeaderExtension *ext;

ext = g_ptr_array_index (filter->priv->header_exts, i);
update_ok =
gst_rtp_header_extension_update_non_rtp_src_caps (ext, new_caps);
ext = g_ptr_array_index (filter->priv->header_exts, i);
update_ok =
gst_rtp_header_extension_update_non_rtp_src_caps (ext, new_caps);

if (!update_ok) {
GST_ELEMENT_ERROR (filter, STREAM, DECODE,
("RTP header extension (%s) could not update src caps",
GST_OBJECT_NAME (ext)), (NULL));
break;
}
if (!update_ok) {
GST_ELEMENT_ERROR (filter, STREAM, DECODE,
("RTP header extension (%s) could not update src caps",
GST_OBJECT_NAME (ext)), (NULL));
break;
}
}

if (G_UNLIKELY (update_ok && !gst_caps_is_equal (src_caps, new_caps))) {
gst_pad_set_caps (filter->srcpad, new_caps);
}

if (G_UNLIKELY (update_ok && !gst_caps_is_equal (src_caps, new_caps))) {
gst_pad_set_caps (filter->srcpad, new_caps);
gst_caps_unref (src_caps);
gst_caps_unref (new_caps);
}

return update_ok;
}

static GstFlowReturn
gst_rtp_base_depayload_do_push (GstRTPBaseDepayload * filter, gboolean is_list,
gpointer obj)
{
GstFlowReturn res;

if (is_list) {
GstBufferList *blist = obj;
guint i;
guint first_not_pushed_idx = 0;

for (i = 0; i < gst_buffer_list_length (blist); ++i) {
GstBuffer *buf = gst_buffer_list_get_writable (blist, i);

if (G_UNLIKELY (gst_rtp_base_depayload_set_headers (filter, buf))) {
/* src caps have changed; push the buffers preceding the current one,
* then apply the new caps on the src pad */
guint j;

for (j = first_not_pushed_idx; j < i; ++j) {
res = gst_rtp_base_depayload_finish_push (filter, FALSE,
gst_buffer_ref (gst_buffer_list_get (blist, j)));
if (G_UNLIKELY (res != GST_FLOW_OK)) {
goto error_list;
}
}
first_not_pushed_idx = i;

if (!gst_rtp_base_depayload_set_src_caps_from_hdrext (filter)) {
res = GST_FLOW_ERROR;
goto error_list;
}
}
}

gst_caps_unref (src_caps);
gst_caps_unref (new_caps);
if (G_LIKELY (first_not_pushed_idx == 0)) {
res = gst_rtp_base_depayload_finish_push (filter, TRUE, blist);
blist = NULL;
} else {
for (i = first_not_pushed_idx; i < gst_buffer_list_length (blist); ++i) {
res = gst_rtp_base_depayload_finish_push (filter, FALSE,
gst_buffer_ref (gst_buffer_list_get (blist, i)));
if (G_UNLIKELY (res != GST_FLOW_OK)) {
break;
}
}
}

if (!update_ok) {
return GST_FLOW_ERROR;
error_list:
gst_clear_buffer_list (&blist);
} else {
GstBuffer *buf = obj;
if (G_UNLIKELY (gst_rtp_base_depayload_set_headers (filter, buf))) {
if (!gst_rtp_base_depayload_set_src_caps_from_hdrext (filter)) {
res = GST_FLOW_ERROR;
goto error_buffer;
}
}

filter->priv->needs_src_caps_update = FALSE;
}
res = gst_rtp_base_depayload_finish_push (filter, FALSE, buf);
buf = NULL;

/* if this is the first buffer send a NEWSEGMENT */
if (G_UNLIKELY (filter->priv->segment_event)) {
gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
filter->priv->segment_event = NULL;
GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
error_buffer:
gst_clear_buffer (&buf);
}

return GST_FLOW_OK;
return res;
}

/**
Expand All @@ -1381,12 +1445,7 @@ gst_rtp_base_depayload_push (GstRTPBaseDepayload * filter, GstBuffer * out_buf)
{
GstFlowReturn res;

res = gst_rtp_base_depayload_prepare_push (filter, FALSE, &out_buf);

if (G_LIKELY (res == GST_FLOW_OK))
res = gst_pad_push (filter->srcpad, out_buf);
else
gst_buffer_unref (out_buf);
res = gst_rtp_base_depayload_do_push (filter, FALSE, out_buf);

if (res != GST_FLOW_OK)
filter->priv->process_flow_ret = res;
Expand All @@ -1410,12 +1469,7 @@ gst_rtp_base_depayload_push_list (GstRTPBaseDepayload * filter,
{
GstFlowReturn res;

res = gst_rtp_base_depayload_prepare_push (filter, TRUE, &out_list);

if (G_LIKELY (res == GST_FLOW_OK))
res = gst_pad_push_list (filter->srcpad, out_list);
else
gst_buffer_list_unref (out_list);
res = gst_rtp_base_depayload_do_push (filter, TRUE, out_list);

if (res != GST_FLOW_OK)
filter->priv->process_flow_ret = res;
Expand Down

0 comments on commit 1a87a65

Please sign in to comment.