📄 gstdataqueue.c
字号:
/* GStreamer
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
 *
 * gstdataqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * SECTION:gstdataqueue
 * @short_description: Threadsafe queueing object
 *
 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
 * also provides size-related functionality. This object should be used for
 * any #GstElement that wishes to provide some sort of queueing functionality.
 */

#include <gst/gst.h>
#include "gstdataqueue.h"

GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
#define GST_CAT_DEFAULT (data_queue_debug)
GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);

/* Queue signals and args */
enum
{
  SIGNAL_EMPTY,
  SIGNAL_FULL,
  LAST_SIGNAL
};

enum
{
  ARG_0,
  ARG_CUR_LEVEL_VISIBLE,
  ARG_CUR_LEVEL_BYTES,
  ARG_CUR_LEVEL_TIME
      /* FILL ME */
};

/* Take the queue lock, logging before and after the acquisition in the
 * dataflow debug category. The argument is parenthesized so any pointer
 * expression can be passed. */
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
    GST_CAT_LOG (data_queue_dataflow,                                   \
      "locking qlock from thread %p",                                   \
      g_thread_self ());                                                \
  g_mutex_lock ((q)->qlock);                                            \
  GST_CAT_LOG (data_queue_dataflow,                                     \
      "locked qlock from thread %p",                                    \
      g_thread_self ());                                                \
} G_STMT_END

/* Take the queue lock, then jump to @label (with the lock still held) if
 * the queue is flushing. */
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
    GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
    if ((q)->flushing)                                                  \
      goto label;                                                       \
  } G_STMT_END

/* Release the queue lock, logging the release. */
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
    GST_CAT_LOG (data_queue_dataflow,                                   \
      "unlocking qlock from thread %p",                                 \
      g_thread_self ());                                                \
  g_mutex_unlock ((q)->qlock);                                          \
} G_STMT_END

/* Log the current fill levels of @q, prefixed by @msg (a string literal).
 * The pointer printed is the macro argument itself rather than whatever
 * variable happens to be called "queue" at the call site. */
#define STATUS(q, msg)                                                  \
  GST_CAT_LOG (data_queue_dataflow,                                     \
               "queue:%p " msg ": %u visible items, %u "                \
               "bytes, %" G_GUINT64_FORMAT                              \
               " ns, %u elements",                                      \
               (q),                                                     \
               (q)->cur_level.visible,                                  \
               (q)->cur_level.bytes,                                    \
               (q)->cur_level.time,                                     \
               (q)->queue->length)

static void gst_data_queue_base_init (GstDataQueueClass * klass);
static void gst_data_queue_class_init (GstDataQueueClass * klass);
static void gst_data_queue_init (GstDataQueue * queue);
static void gst_data_queue_finalize (GObject * object);

static void gst_data_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_data_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

static GObjectClass *parent_class = NULL;
static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };

/* Registers the GstDataQueue type (and both debug categories) on the first
 * call and returns the cached GType afterwards.
 * NOTE(review): the first-call check is not protected by any lock here. */
GType
gst_data_queue_get_type (void)
{
  static GType queue_type = 0;

  if (!queue_type) {
    static const GTypeInfo queue_info = {
      sizeof (GstDataQueueClass),
      (GBaseInitFunc) gst_data_queue_base_init,
      NULL,
      (GClassInitFunc) gst_data_queue_class_init,
      NULL,
      NULL,
      sizeof (GstDataQueue),
      0,
      (GInstanceInitFunc) gst_data_queue_init,
      NULL
    };

    queue_type = g_type_register_static (G_TYPE_OBJECT,
        "GstDataQueue", &queue_info, 0);
    GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
        "data queue object");
    GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
        "dataflow inside the data queue object");
  }

  return queue_type;
}

static void
gst_data_queue_base_init (GstDataQueueClass * klass)
{
  /* Do we need anything here ?? */
  return;
}

/* Class initializer: hooks up the property accessors and finalize, and
 * registers the "empty"/"full" signals and the read-only level properties. */
static void
gst_data_queue_class_init (GstDataQueueClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

  parent_class = g_type_class_peek_parent (klass);

  gobject_class->set_property =
      GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
  gobject_class->get_property =
      GST_DEBUG_FUNCPTR (gst_data_queue_get_property);

  /* signals */
  /**
   * GstDataQueue::empty:
   * @queue: the queue instance
   *
   * Reports that the queue became empty (empty).
   * A queue is empty if the total amount of visible items inside it (num-visible, time,
   * size) is lower than the boundary values which can be set through the GObject
   * properties.
   */
  gst_data_queue_signals[SIGNAL_EMPTY] =
      g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /**
   * GstDataQueue::full:
   * @queue: the queue instance
   *
   * Reports that the queue became full (full).
   * A queue is full if the total amount of data inside it (num-visible, time,
   * size) is higher than the boundary values which can be set through the GObject
   * properties.
   */
  gst_data_queue_signals[SIGNAL_FULL] =
      g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /* properties */
  /* The nick states bytes, matching the description and the value unit. */
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
      g_param_spec_uint ("current-level-bytes", "Current level (bytes)",
          "Current amount of data in the queue (bytes)",
          0, G_MAXUINT, 0, G_PARAM_READABLE));
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
      g_param_spec_uint ("current-level-visible",
          "Current level (visible items)",
          "Current number of visible items in the queue",
          0, G_MAXUINT, 0, G_PARAM_READABLE));
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
          "Current amount of data in the queue (in ns)",
          0, G_MAXUINT64, 0, G_PARAM_READABLE));

  /* set several parent class virtual functions */
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
}

/* Instance initializer: zeroes the level counters, clears the checkfull
 * callback and allocates the lock, both condition variables and the
 * underlying GQueue. */
static void
gst_data_queue_init (GstDataQueue * queue)
{
  queue->cur_level.visible = 0; /* no content */
  queue->cur_level.bytes = 0;   /* no content */
  queue->cur_level.time = 0;    /* no content */

  queue->checkfull = NULL;

  queue->qlock = g_mutex_new ();
  queue->item_add = g_cond_new ();
  queue->item_del = g_cond_new ();
  queue->queue = g_queue_new ();

  GST_DEBUG ("initialized queue's not_empty & not_full conditions");
}

/**
 * gst_data_queue_new:
 * @checkfull: the callback used to tell if the element considers the queue full
 * or not.
 * @checkdata: a #gpointer that will be given in the @checkfull callback.
 *
 * Returns: a new #GstDataQueue, or NULL if @checkfull is NULL.
 */
GstDataQueue *
gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
    gpointer checkdata)
{
  GstDataQueue *ret;

  g_return_val_if_fail (checkfull != NULL, NULL);

  ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
  ret->checkfull = checkfull;
  ret->checkdata = checkdata;

  return ret;
}

/* Pops every item off the queue, handing each one to its own destroy
 * callback, then resets all level counters to zero. Does no locking of its
 * own: gst_data_queue_flush() holds the queue lock around it, while
 * gst_data_queue_finalize() calls it without the lock. */
static void
gst_data_queue_cleanup (GstDataQueue * queue)
{
  while (!g_queue_is_empty (queue->queue)) {
    GstDataQueueItem *item = g_queue_pop_head (queue->queue);

    /* Just call the destroy notify on the item */
    item->destroy (item);
  }
  queue->cur_level.visible = 0;
  queue->cur_level.bytes = 0;
  queue->cur_level.time = 0;
}

/* called only once, as opposed to dispose */
static void
gst_data_queue_finalize (GObject * object)
{
  GstDataQueue *queue = GST_DATA_QUEUE (object);

  GST_DEBUG ("finalizing queue");

  /* Destroy any remaining items before releasing the container itself. */
  gst_data_queue_cleanup (queue);
  g_queue_free (queue->queue);

  GST_DEBUG ("free mutex");
  g_mutex_free (queue->qlock);
  GST_DEBUG ("done free mutex");

  g_cond_free (queue->item_add);
  g_cond_free (queue->item_del);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* Empties the queue and signals the item_del condition. The "locked" prefix
 * follows the file's naming for helpers that do not take the lock
 * themselves; gst_data_queue_flush() calls this with the lock held. */
static void
gst_data_queue_locked_flush (GstDataQueue * queue)
{
  STATUS (queue, "before flushing");
  gst_data_queue_cleanup (queue);
  STATUS (queue, "after flushing");
  /* we deleted something... */
  g_cond_signal (queue->item_del);
}

/* Returns TRUE when the underlying GQueue holds no elements at all. */
static gboolean
gst_data_queue_locked_is_empty (GstDataQueue * queue)
{
  return (queue->queue->length == 0);
}

/* Delegates the fullness decision to the checkfull callback supplied at
 * construction, passing it the current levels and the user data. */
static gboolean
gst_data_queue_locked_is_full (GstDataQueue * queue)
{
  return queue->checkfull (queue, queue->cur_level.visible,
      queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
}

/**
 * gst_data_queue_flush:
 * @queue: a #GstDataQueue.
 *
 * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
 * #gst_data_queue_pop will be released.
 * The queue lock is taken for the duration of the flush, and the item_del
 * condition is signalled once the contents have been destroyed.
 * MT safe.
 */
void
gst_data_queue_flush (GstDataQueue * queue)
{
  GST_DEBUG ("queue:%p", queue);
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
  gst_data_queue_locked_flush (queue);
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}

/**
 * gst_data_queue_is_empty:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -