⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queueing.c

📁 开源备份软件源码 AMANDA, the Advanced Maryland Automatic Network Disk Archiver, is a backup system that a
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License version 2.1 as
 * published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
 *
 * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
 */

#include "queueing.h"
#include "device.h"
#include "semaphore.h"
#include "amanda.h"

/* Queueing framework here. */

/* State shared between the producer thread and the consumer thread. */
typedef struct {
    /* Passed to the producer as the requested size; the consumer keeps
     * merging buffers until it holds at least this many bytes. */
    guint block_size;
    ProducerFunctor producer;
    gpointer producer_user_data;
    ConsumerFunctor consumer;
    gpointer consumer_user_data;
    /* data_queue: filled buffers, producer -> consumer.
     * free_queue: drained buffers, consumer -> producer (for reuse). */
    GAsyncQueue *data_queue, *free_queue;
    /* Decremented by the producer, incremented by the consumer with the
     * alloc_size of each buffer it takes off data_queue. */
    semaphore_t *free_memory;
    StreamingRequirement streaming_mode;
} queue_data_t;

/* Allocates a new, empty buffer descriptor (no data area attached).
 *
 * A descriptor whose data pointer is NULL doubles as the "finished" marker
 * that the producer, the consumer and cleanup_buffer_queue() push onto the
 * queues.
 *
 * Aborts via g_error() if the allocation fails; never returns NULL. */
static queue_buffer_t *invent_buffer(void) {
    queue_buffer_t *rval;

    rval = malloc(sizeof(*rval));
    if (rval == NULL) {
        g_error("queueing: out of memory allocating a queue buffer");
    }

    rval->data = NULL;
    rval->alloc_size = 0;
    rval->data_size = 0;
    rval->offset = 0;

    return rval;
}

/* Releases a buffer descriptor together with its data area.
 * Passing NULL is allowed and does nothing. */
void free_buffer(queue_buffer_t *buf) {
    if (buf == NULL) {
        return;
    }

    amfree(buf->data);
    amfree(buf);
}

/* Concatenates the payload of buf1 followed by the payload of buf2 into a
 * single buffer and returns it.  Whichever input is not returned is freed,
 * so the caller must only use the returned pointer afterwards.
 *
 * If either argument is NULL the other one is returned untouched.
 *
 * Aborts via g_error() if growing buf1 fails. */
static queue_buffer_t *merge_buffers(queue_buffer_t *buf1,
                                     queue_buffer_t *buf2) {
    if (buf1 == NULL)
        return buf2;
    else if (buf2 == NULL)
        return buf1;

    if (buf2->offset >= buf1->data_size) {
        /* We can fit buf1 at the beginning of buf2. */
        memcpy(buf2->data + buf2->offset - buf1->data_size,
               buf1->data + buf1->offset,
               buf1->data_size);
        buf2->offset -= buf1->data_size;
        buf2->data_size += buf1->data_size;
        free_buffer(buf1);
        return buf2;
    } else if (buf1->alloc_size - buf1->offset - buf1->data_size
               >= buf2->data_size) {
        /* We can fit buf2 at the end of buf1. */
        memcpy(buf1->data + buf1->offset + buf1->data_size,
               buf2->data + buf2->offset, buf2->data_size);
        buf1->data_size += buf2->data_size;
        free_buffer(buf2);
        return buf1;
    } else {
        void *grown;

        /* We can grow buf1 and put everything there. */
        if (buf1->offset != 0) {
            /* But first we have to fix up buf1. */
            memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
            buf1->offset = 0;
        }

        /* Go through a temporary so a failed realloc() is detected before
         * the memcpy() below writes through the result. */
        grown = realloc(buf1->data, buf1->data_size + buf2->data_size);
        if (grown == NULL) {
            g_error("queueing: out of memory merging queue buffers");
        }
        buf1->data = grown;
        buf1->alloc_size = buf1->data_size + buf2->data_size;

        memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
               buf2->data_size);
        buf1->data_size = buf1->alloc_size;
        free_buffer(buf2);
        return buf1;
    }
}

/* Invalidate the first "bytes" bytes of the buffer, by adjusting the
   offset and data size. */
static void consume_buffer(queue_buffer_t* buf, int bytes) {
    buf->offset += bytes;
    buf->data_size -= bytes;
}

/* Looks at the buffer to see how much free space it has. If it has more than
 * twice the data size of unused space at the end, or more than four times
 * the data size of unused space at the beginning, then that space is
 * reclaimed.
 *
 * A NULL buffer is ignored.  If shrinking the allocation fails, the buffer
 * is left at its current (larger) size. */
static void heatshrink_buffer(queue_buffer_t *buf) {
    if (buf == NULL)
        return;

    /* Only slide the payload to the front when the leading slack really is
     * more than four times the payload, as described above. */
    if (G_UNLIKELY(buf->data_size * 4 < buf->offset)) {
        /* Consolidate with memmove. We will reclaim the space in the next
         * step. */
        memmove(buf->data, buf->data + buf->offset, buf->data_size);
        buf->offset = 0;
    }

    if (buf->alloc_size > buf->data_size*2 + buf->offset) {
        if (buf->data_size + buf->offset == 0) {
            /* Nothing worth keeping: release the data area outright rather
             * than calling realloc() with a size of zero. */
            amfree(buf->data);
            buf->alloc_size = 0;
        } else {
            void *shrunk = realloc(buf->data, buf->data_size + buf->offset);
            if (shrunk != NULL) {
                buf->data = shrunk;
                buf->alloc_size = buf->data_size + buf->offset;
            }
        }
    }
}

/* Producer thread body; datap is the shared queue_data_t.
 *
 * Repeatedly obtains a buffer (recycled from free_queue when one is
 * available, otherwise freshly invented), lets the producer callback fill
 * it, and pushes non-empty results onto data_queue.
 *
 * Returns TRUE (as a pointer) if a "finished" marker was found on
 * free_queue or the producer reported PRODUCER_FINISHED, and FALSE for any
 * other non-MORE producer result. */
static gpointer do_producer_thread(gpointer datap) {
    queue_data_t* data = datap;

    for (;;) {
        queue_buffer_t *buf;
        gboolean result;

        semaphore_decrement(data->free_memory, 0);
        buf = g_async_queue_try_pop(data->free_queue);
        if (buf != NULL && buf->data == NULL) {
            /* Consumer is finished, then so are we. */
            amfree(buf);
            return GINT_TO_POINTER(TRUE);
        }

        if (buf == NULL) {
            buf = invent_buffer();
        }
        buf->offset = 0;
        buf->data_size = 0;

        result = data->producer(data->producer_user_data, buf,
                                data->block_size);

        /* Producers can allocate way too much memory. */
        heatshrink_buffer(buf);

        if (buf->data_size > 0) {
            semaphore_force_adjust(data->free_memory, -buf->alloc_size);

            g_async_queue_push(data->data_queue, buf);
            buf = NULL;
        } else {
            /* An empty buffer is only acceptable when the producer is
             * done (or failed). */
            g_assert(result != PRODUCER_MORE);
            free_buffer(buf);
            buf = NULL;
        }

        if (result == PRODUCER_MORE) {
            continue;
        } else {
            /* We are finished (and the first to do so). */
            g_async_queue_push(data->data_queue, invent_buffer());
            semaphore_force_set(data->free_memory, INT_MIN);

            return GINT_TO_POINTER(result == PRODUCER_FINISHED);
        }
    }
}

/* Consumer thread body; datap is the shared queue_data_t.
 *
 * Pops buffers from data_queue, merging them until at least block_size
 * bytes are available (or the producer's "finished" marker arrives), then
 * hands the merged buffer to the consumer callback.  Fully drained buffers
 * are recycled through free_queue.
 *
 * Returns TRUE (as a pointer) once the producer has finished and the last
 * buffer has been offered to the consumer, or FALSE as soon as the consumer
 * callback returns a value that is not greater than zero. */
static gpointer do_consumer_thread(gpointer datap) {
    queue_data_t* data = datap;
    gboolean finished = FALSE;
    queue_buffer_t *buf = NULL;

    if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
        semaphore_wait_empty(data->free_memory);
    }

    for (;;) {
        gboolean result;

        if (finished) {
            /* The final buffer has been offered to the consumer once.
             * NOTE(review): anything the consumer left unconsumed is
             * discarded here; release it so it is not leaked. */
            free_buffer(buf);
            return GINT_TO_POINTER(TRUE);
        }

        while (buf == NULL || buf->data_size < data->block_size) {
            queue_buffer_t *next_buf;
            if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
                do {
                    next_buf = g_async_queue_try_pop(data->data_queue);
                    if (next_buf == NULL) {
                        semaphore_wait_empty(data->free_memory);
                    }
                } while (next_buf == NULL);
            } else {
                next_buf = g_async_queue_pop(data->data_queue);
                g_assert(next_buf != NULL);
            }

            if (next_buf->data == NULL) {
                /* Producer is finished, then so are we.*/
                free_buffer(next_buf);
                if (buf != NULL) {
                    /* But we can't quit yet, we have a buffer to flush.*/
                    finished = TRUE;
                    break;
                } else {
                    /* We are so outta here. */
                    return GINT_TO_POINTER(TRUE);
                }
            }

            semaphore_increment(data->free_memory, next_buf->alloc_size);

            buf = merge_buffers(buf, next_buf);
        }

        /* A positive result is used as the number of bytes consumed. */
        result = data->consumer(data->consumer_user_data, buf);

        if (result > 0) {
            consume_buffer(buf, result);
            if (buf->data_size == 0) {
                g_async_queue_push(data->free_queue, buf);
                buf = NULL;
            }
            continue;
        } else {
            free_buffer(buf);
            return GINT_TO_POINTER(FALSE);
        }
    }
}

/* Empties a buffer queue and frees all the buffers associated with it.
 *
 * If full_cleanup is TRUE, then we delete the queue itself.
 * If full_cleanup is FALSE, then we leave the queue around, with a
 *         signal element in it. */
static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
    g_async_queue_lock(Q);
    for (;;) {
        queue_buffer_t *buftmp;
        buftmp = g_async_queue_try_pop_unlocked(Q);
        if (buftmp == NULL)
            break;

        free_buffer(buftmp);
    }
    if (!full_cleanup)
        g_async_queue_push_unlocked(Q, invent_buffer());

    g_async_queue_unlock(Q);

    if (full_cleanup)
        g_async_queue_unref(Q);
}

/* This function sacrifices performance, but will still work just
   fine, on systems where threads are not supported. */
static queue_result_flags
do_unthreaded_consumer_producer_queue(guint block_size,
                                      ProducerFunctor producer,
                                      gpointer producer_user_data,
                                      ConsumerFunctor consumer,
                                      gpointer consumer_user_data) {
    queue_buffer_t *buf = NULL, *next_buf = NULL;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -