📄 queueing.c
字号:
/*
 * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License version 2.1 as
 * published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
 */

#include "queueing.h"
#include "device.h"
#include "semaphore.h"
#include "amanda.h"

/* Queueing framework here. */

/* Shared state handed to the producer and consumer threads below. */
typedef struct {
    /* Passed to the producer on every call; the consumer thread also
     * accumulates at least this many bytes before calling the consumer
     * (except for the final flush). */
    guint block_size;
    ProducerFunctor producer;
    gpointer producer_user_data;
    ConsumerFunctor consumer;
    gpointer consumer_user_data;
    /* data_queue carries filled buffers from producer to consumer;
     * free_queue carries emptied buffers back for reuse.  On either queue
     * a buffer whose data pointer is NULL is an end-of-stream marker. */
    GAsyncQueue *data_queue, *free_queue;
    /* Adjusted by the alloc_size of each buffer as it is queued/dequeued. */
    semaphore_t *free_memory;
    StreamingRequirement streaming_mode;
} queue_data_t;

/* Allocates a new, empty buffer descriptor (no data, all sizes zero).
 * A buffer in this state doubles as the end-of-stream marker on the
 * queues.  Aborts if the descriptor cannot be allocated. */
static queue_buffer_t *invent_buffer(void) {
    queue_buffer_t *rval;

    rval = malloc(sizeof(*rval));
    if (rval == NULL) {
        /* There is no way to report failure to callers, which all
         * dereference the result immediately. */
        g_error("queueing: out of memory allocating a queue buffer");
    }

    rval->data = NULL;
    rval->alloc_size = 0;
    rval->data_size = 0;
    rval->offset = 0;

    return rval;
}

/* Releases a buffer and the data it owns.  Passing NULL is allowed. */
void free_buffer(queue_buffer_t *buf) {
    if (buf != NULL) {
        amfree(buf->data);
    }
    amfree(buf);
}

/* Concatenates the valid bytes of buf1 followed by the valid bytes of buf2
 * into a single buffer and returns it.  Whichever input is not returned is
 * freed.  If either argument is NULL the other one is returned unchanged. */
static queue_buffer_t *
merge_buffers(queue_buffer_t *buf1, queue_buffer_t *buf2) {
    if (buf1 == NULL)
        return buf2;
    else if (buf2 == NULL)
        return buf1;

    if (buf2->offset >= buf1->data_size) {
        /* We can fit buf1 at the beginning of buf2. */
        memcpy(buf2->data + buf2->offset - buf1->data_size,
               buf1->data + buf1->offset,
               buf1->data_size);
        buf2->offset -= buf1->data_size;
        buf2->data_size += buf1->data_size;
        free_buffer(buf1);
        return buf2;
    } else if (buf1->alloc_size - buf1->offset - buf1->data_size
               >= buf2->data_size) {
        /* We can fit buf2 at the end of buf1. */
        memcpy(buf1->data + buf1->offset + buf1->data_size,
               buf2->data + buf2->offset,
               buf2->data_size);
        buf1->data_size += buf2->data_size;
        free_buffer(buf2);
        return buf1;
    } else {
        /* We can grow buf1 and put everything there. */
        void *grown;

        if (buf1->offset != 0) {
            /* But first we have to fix up buf1. */
            memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
            buf1->offset = 0;
        }

        /* Do not overwrite buf1->data until we know realloc succeeded. */
        grown = realloc(buf1->data, buf1->data_size + buf2->data_size);
        if (grown == NULL) {
            g_error("queueing: out of memory merging queue buffers");
        }
        buf1->data = grown;
        buf1->alloc_size = buf1->data_size + buf2->data_size;

        memcpy(buf1->data + buf1->data_size,
               buf2->data + buf2->offset,
               buf2->data_size);
        buf1->data_size = buf1->alloc_size;
        free_buffer(buf2);
        return buf1;
    }
}

/* Invalidate the first "bytes" bytes of the buffer, by adjusting the
   offset and data size. */
static void consume_buffer(queue_buffer_t *buf, int bytes) {
    buf->offset += bytes;
    buf->data_size -= bytes;
}

/* Looks at the buffer to see how much free space it has.  If the unused
 * space at the beginning (the offset) is more than four times the data
 * size, the data is first moved to the front of the allocation.  Then, if
 * the allocation is larger than offset + 2 * data_size, it is shrunk to
 * exactly offset + data_size.  A NULL buffer is ignored. */
static void heatshrink_buffer(queue_buffer_t *buf) {
    if (buf == NULL)
        return;

    if (G_UNLIKELY(buf->offset > buf->data_size * 4)) {
        /* Consolidate with memmove.  We will reclaim the space in the next
         * step. */
        if (buf->data_size > 0) {
            memmove(buf->data, buf->data + buf->offset, buf->data_size);
        }
        buf->offset = 0;
    }

    if (buf->alloc_size > buf->data_size*2 + buf->offset) {
        if (buf->data_size + buf->offset == 0) {
            /* Nothing worth keeping; avoid realloc(ptr, 0), whose result
             * is not portable. */
            amfree(buf->data);
            buf->alloc_size = 0;
        } else {
            void *shrunk = realloc(buf->data, buf->data_size + buf->offset);
            if (shrunk != NULL) {
                buf->data = shrunk;
                buf->alloc_size = buf->data_size + buf->offset;
            }
            /* On failure the original, larger allocation is still valid,
             * so just keep using it. */
        }
    }
}

/* Thread body for the producing side.  datap is a queue_data_t *.
 *
 * Repeatedly obtains a buffer (recycled from free_queue, or freshly
 * invented), lets the producer fill it, and pushes non-empty results onto
 * data_queue.
 *
 * Returns GINT_TO_POINTER(TRUE) if an end-of-stream marker is found on
 * free_queue (the consumer finished first).  Otherwise, once the producer
 * returns something other than PRODUCER_MORE, pushes an end-of-stream
 * marker onto data_queue and returns
 * GINT_TO_POINTER(result == PRODUCER_FINISHED). */
static gpointer do_producer_thread(gpointer datap) {
    queue_data_t *data = datap;

    for (;;) {
        queue_buffer_t *buf;
        gboolean result;

        semaphore_decrement(data->free_memory, 0);
        buf = g_async_queue_try_pop(data->free_queue);
        if (buf != NULL && buf->data == NULL) {
            /* Consumer is finished, then so are we. */
            amfree(buf);
            return GINT_TO_POINTER(TRUE);
        }

        if (buf == NULL) {
            buf = invent_buffer();
        }
        buf->offset = 0;
        buf->data_size = 0;

        result = data->producer(data->producer_user_data, buf,
                                data->block_size);

        /* Producers can allocate way too much memory. */
        heatshrink_buffer(buf);

        if (buf->data_size > 0) {
            semaphore_force_adjust(data->free_memory, -buf->alloc_size);

            g_async_queue_push(data->data_queue, buf);
            buf = NULL;
        } else {
            /* A producer that yields no data must not ask to be called
             * again. */
            g_assert(result != PRODUCER_MORE);
            free_buffer(buf);
            buf = NULL;
        }

        if (result == PRODUCER_MORE) {
            continue;
        } else {
            /* We are finished (and the first to do so). */
            g_async_queue_push(data->data_queue, invent_buffer());
            semaphore_force_set(data->free_memory, INT_MIN);

            return GINT_TO_POINTER(result == PRODUCER_FINISHED);
        }
    }
}

/* Thread body for the consuming side.  datap is a queue_data_t *.
 *
 * Accumulates buffers from data_queue (merging them) until at least
 * block_size bytes are available, then hands the accumulated buffer to the
 * consumer.  The consumer's positive return value is the number of bytes
 * it used; those bytes are dropped from the front of the buffer, and a
 * fully drained buffer is recycled through free_queue.
 *
 * When the producer's end-of-stream marker arrives, any remaining bytes
 * are flushed to the consumer -- repeatedly, if the consumer takes only
 * part of them -- before returning.
 *
 * Returns GINT_TO_POINTER(TRUE) once all data has been consumed, or
 * GINT_TO_POINTER(FALSE) as soon as the consumer returns a value <= 0. */
static gpointer do_consumer_thread(gpointer datap) {
    queue_data_t *data = datap;
    gboolean finished = FALSE;
    queue_buffer_t *buf = NULL;

    if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
        semaphore_wait_empty(data->free_memory);
    }

    for (;;) {
        int result;

        /* Once the producer is finished there is nothing more to pop;
         * only the leftover in buf remains to be flushed. */
        while (!finished &&
               (buf == NULL || buf->data_size < data->block_size)) {
            queue_buffer_t *next_buf;

            if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
                do {
                    next_buf = g_async_queue_try_pop(data->data_queue);
                    if (next_buf == NULL) {
                        semaphore_wait_empty(data->free_memory);
                    }
                } while (next_buf == NULL);
            } else {
                next_buf = g_async_queue_pop(data->data_queue);
                g_assert(next_buf != NULL);
            }

            if (next_buf->data == NULL) {
                /* Producer is finished, then so are we.*/
                free_buffer(next_buf);
                if (buf != NULL) {
                    /* But we can't quit yet, we have a buffer to flush.*/
                    finished = TRUE;
                    break;
                } else {
                    /* We are so outta here. */
                    return GINT_TO_POINTER(TRUE);
                }
            }

            semaphore_increment(data->free_memory, next_buf->alloc_size);

            buf = merge_buffers(buf, next_buf);
        }

        result = data->consumer(data->consumer_user_data, buf);

        if (result <= 0) {
            free_buffer(buf);
            return GINT_TO_POINTER(FALSE);
        }

        consume_buffer(buf, result);
        if (buf->data_size == 0) {
            g_async_queue_push(data->free_queue, buf);
            buf = NULL;
            if (finished) {
                /* The final flush has been fully consumed. */
                return GINT_TO_POINTER(TRUE);
            }
        }
    }
}

/* Empties a buffer queue and frees all the buffers associated with it.
 *
 * If full_cleanup is TRUE, then we delete the queue itself.
 * If full_cleanup is FALSE, then we leave the queue around, with a
 *         signal element in it. */
static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
    g_async_queue_lock(Q);
    for (;;) {
        queue_buffer_t *buftmp;

        buftmp = g_async_queue_try_pop_unlocked(Q);
        if (buftmp == NULL)
            break;

        free_buffer(buftmp);
    }
    if (!full_cleanup) {
        /* The signal element is an empty buffer (data == NULL). */
        g_async_queue_push_unlocked(Q, invent_buffer());
    }

    g_async_queue_unlock(Q);

    if (full_cleanup)
        g_async_queue_unref(Q);
}

/* This function sacrifices performance, but will still work just fine,
   on systems where threads are not supported. */
static queue_result_flags
do_unthreaded_consumer_producer_queue(guint block_size,
                                      ProducerFunctor producer,
                                      gpointer producer_user_data,
                                      ConsumerFunctor consumer,
                                      gpointer consumer_user_data) {
    queue_buffer_t *buf = NULL, *next_buf = NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -