📄 gstcollectpads.c
字号:
* Flush @size bytes from the pad @data. * * This function should be called with @pads LOCK held, such as * in the callback. * * Returns: The number of bytes flushed This can be less than @size and * is 0 if the pad was end-of-stream. * * MT safe. */guintgst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data, guint size){ guint flushsize; GstBuffer *buffer; g_return_val_if_fail (pads != NULL, 0); g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0); g_return_val_if_fail (data != NULL, 0); /* no buffer, must be EOS */ if ((buffer = data->buffer) == NULL) return 0; /* this is what we can flush at max */ flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos); data->pos += size; if (data->pos >= GST_BUFFER_SIZE (buffer)) /* _clear will also reset data->pos to 0 */ gst_collect_pads_clear (pads, data); return flushsize;}/* see if pads were added or removed and update our stats. Any pad * added after releasing the PAD_LOCK will get collected in the next * round. * * We can do a quick check by checking the cookies, that get changed * whenever the pad list is updated. * * Must be called with LOCK. */static voidgst_collect_pads_check_pads (GstCollectPads * pads){ /* the master list and cookie are protected with the PAD_LOCK */ GST_COLLECT_PADS_PAD_LOCK (pads); if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) { GSList *collected; /* clear list and stats */ g_slist_foreach (pads->data, (GFunc) unref_data, NULL); g_slist_free (pads->data); pads->data = NULL; pads->numpads = 0; pads->queuedpads = 0; pads->eospads = 0; /* loop over the master pad list */ collected = pads->abidata.ABI.pad_list; for (; collected; collected = g_slist_next (collected)) { GstCollectData *data; /* update the stats */ pads->numpads++; data = collected->data; if (data->buffer) pads->queuedpads++; if (data->abidata.ABI.eos) pads->eospads++; /* add to the list of pads to collect */ ref_data (data); pads->data = g_slist_prepend (pads->data, data); } /* and update the cookie */ pads->cookie = pads->abidata.ABI.pad_cookie; } GST_COLLECT_PADS_PAD_UNLOCK (pads);}/* checks if all the pads are collected and call the collectfunction * * Should be called with LOCK. * * Returns: The #GstFlowReturn of collection. */static GstFlowReturngst_collect_pads_check_collected (GstCollectPads * pads){ GstFlowReturn flow_ret = GST_FLOW_OK; g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR); g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED); /* check for new pads, update stats etc.. */ gst_collect_pads_check_pads (pads); if (G_UNLIKELY (pads->eospads == pads->numpads)) { /* If all our pads are EOS just collect once to let the element * do its final EOS handling. */ GST_DEBUG ("All active pads (%d) are EOS, calling %s", pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func)); flow_ret = pads->func (pads, pads->user_data); } else { gboolean collected = FALSE; /* We call the collected function as long as our condition matches. * FIXME: should we error out if the collect function did not pop anything ? * we can get a busy loop here if the element does not pop from the collect * function */ while (((pads->queuedpads + pads->eospads) >= pads->numpads)) { GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s", pads->queuedpads, pads->eospads, pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func)); flow_ret = pads->func (pads, pads->user_data); collected = TRUE; /* break on error */ if (flow_ret != GST_FLOW_OK) break; /* Don't keep looping after telling the element EOS or flushing */ if (pads->queuedpads == 0) break; } if (!collected) GST_DEBUG ("Not all active pads (%d) have data, continuing", pads->numpads); } return flow_ret;}static gbooleangst_collect_pads_event (GstPad * pad, GstEvent * event){ gboolean res; GstCollectData *data; GstCollectPads *pads; /* some magic to get the managing collect_pads */ GST_OBJECT_LOCK (pad); data = (GstCollectData *) gst_pad_get_element_private (pad); if (G_UNLIKELY (data == NULL)) goto pad_removed; ref_data (data); GST_OBJECT_UNLOCK (pad); res = TRUE; pads = data->collect; GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event), GST_DEBUG_PAD_NAME (data->pad)); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { /* forward event to unblock check_collected */ gst_pad_event_default (pad, event); /* now unblock the chain function. * no cond per pad, so they all unblock, * non-flushing block again */ GST_OBJECT_LOCK (pads); data->abidata.ABI.flushing = TRUE; gst_collect_pads_clear (pads, data); GST_OBJECT_UNLOCK (pads); /* event already cleaned up by forwarding */ goto done; } case GST_EVENT_FLUSH_STOP: { /* flush the 1 buffer queue */ GST_OBJECT_LOCK (pads); data->abidata.ABI.flushing = FALSE; gst_collect_pads_clear (pads, data); /* we need new segment info after the flush */ gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED); data->abidata.ABI.new_segment = FALSE; /* if the pad was EOS, remove the EOS flag and * decrement the number of eospads */ if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) { pads->eospads--; data->abidata.ABI.eos = FALSE; } GST_OBJECT_UNLOCK (pads); /* forward event */ goto forward; } case GST_EVENT_EOS: { GST_OBJECT_LOCK (pads); /* if the pad was not EOS, make it EOS and so we * have one more eospad */ if (G_LIKELY (data->abidata.ABI.eos == FALSE)) { data->abidata.ABI.eos = TRUE; pads->eospads++; } /* check if we need collecting anything, we ignore the * result. */ gst_collect_pads_check_collected (pads); GST_OBJECT_UNLOCK (pads); /* We eat this event, element should do something * in the collected callback. */ gst_event_unref (event); goto done; } case GST_EVENT_NEWSEGMENT: { gint64 start, stop, time; gdouble rate, arate; GstFormat format; gboolean update; gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, &start, &stop, &time); GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); gst_segment_set_newsegment_full (&data->segment, update, rate, arate, format, start, stop, time); data->abidata.ABI.new_segment = TRUE; /* we must not forward this event since multiple segments will be * accumulated and this is certainly not what we want. */ gst_event_unref (event); /* FIXME: collect-pads based elements need to create their own newsegment * event (and only one really) * (a) make the segment part of the GstCollectData structure of each pad, * so you can just check that once you have a buffer queued on that pad, * (b) you can override a pad's event function with your own, * catch the newsegment event and then pass it on to the original * gstcollectpads event function * (that's what avimux does for something IIRC) * see #340060 */ goto done; } default: /* forward other events */ goto forward; }forward: res = gst_pad_event_default (pad, event);done: unref_data (data); return res; /* ERRORS */pad_removed: { GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); GST_OBJECT_UNLOCK (pad); return FALSE; }}/* For each buffer we receive we check if our collected condition is reached * and if so we call the collected function. When this is done we check if * data has been unqueued. If data is still queued we wait holding the stream * lock to make sure no EOS event can happen while we are ready to be * collected */static GstFlowReturngst_collect_pads_chain (GstPad * pad, GstBuffer * buffer){ GstCollectData *data; GstCollectPads *pads; guint64 size; GstFlowReturn ret; GstBuffer **buffer_p; GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad)); /* some magic to get the managing collect_pads */ GST_OBJECT_LOCK (pad); data = (GstCollectData *) gst_pad_get_element_private (pad); if (G_UNLIKELY (data == NULL)) goto no_data; ref_data (data); GST_OBJECT_UNLOCK (pad); pads = data->collect; size = GST_BUFFER_SIZE (buffer); GST_OBJECT_LOCK (pads); /* if not started, bail out */ if (G_UNLIKELY (!pads->started)) goto not_started; /* check if this pad is flushing */ if (G_UNLIKELY (data->abidata.ABI.flushing)) goto flushing; /* pad was EOS, we can refuse this data */ if (G_UNLIKELY (data->abidata.ABI.eos)) goto unexpected; GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer, GST_DEBUG_PAD_NAME (pad)); /* One more pad has data queued */ pads->queuedpads++; buffer_p = &data->buffer; gst_buffer_replace (buffer_p, buffer); /* update segment last position if in TIME */ if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) { GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer); if (GST_CLOCK_TIME_IS_VALID (timestamp)) gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp); } /* While we have data queued on this pad try to collect stuff */ do { /* Check if our collected condition is matched and call the collected function * if it is */ ret = gst_collect_pads_check_collected (pads); /* when an error occurs, we want to report this back to the caller ASAP * without having to block if the buffer was not popped */ if (G_UNLIKELY (ret != GST_FLOW_OK)) goto error; /* data was consumed, we can exit and accept new data */ if (data->buffer == NULL) break; /* Check if we got removed in the mean time, FIXME, this is racy. * Between this check and the _WAIT, the pad could be removed which will * makes us hang in the _WAIT. */ GST_OBJECT_LOCK (pad); if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL)) goto pad_removed; GST_OBJECT_UNLOCK (pad); GST_DEBUG ("Pad %s:%s has a buffer queued, waiting", GST_DEBUG_PAD_NAME (pad)); /* wait to be collected, this must happen from another thread triggered * by the _chain function of another pad. We release the lock so we * can get stopped or flushed as well. We can however not get EOS * because we still hold the STREAM_LOCK. */ GST_COLLECT_PADS_WAIT (pads); GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad)); /* after a signal, we could be stopped */ if (G_UNLIKELY (!pads->started)) goto not_started; /* check if this pad is flushing */ if (G_UNLIKELY (data->abidata.ABI.flushing)) goto flushing; } while (data->buffer != NULL);unlock_done: GST_OBJECT_UNLOCK (pads); unref_data (data); gst_buffer_unref (buffer); return ret;pad_removed: { GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); GST_OBJECT_UNLOCK (pad); ret = GST_FLOW_NOT_LINKED; goto unlock_done; } /* ERRORS */no_data: { GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); GST_OBJECT_UNLOCK (pad); gst_buffer_unref (buffer); return GST_FLOW_NOT_LINKED; }not_started: { GST_DEBUG ("not started"); gst_collect_pads_clear (pads, data); ret = GST_FLOW_WRONG_STATE; goto unlock_done; }flushing: { GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad)); gst_collect_pads_clear (pads, data); ret = GST_FLOW_WRONG_STATE; goto unlock_done; }unexpected: { /* we should not post an error for this, just inform upstream that * we don't expect anything anymore */ GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad)); ret = GST_FLOW_UNEXPECTED; goto unlock_done; }error: { /* we print the error, the element should post a reasonable error * message for fatal errors */ GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret)); gst_collect_pads_clear (pads, data); goto unlock_done; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -