⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queueing.c

📁 开源备份软件源码 AMANDA, the Advanced Maryland Automatic Network Disk Archiver, is a backup system that a
💻 C
📖 第 1 页 / 共 2 页
字号:
    gboolean finished = FALSE;    queue_result_flags rval = 0;    /* The basic theory of operation here is to read until we have       enough data to write, then write until we don't.. */    while (!finished) {        int result;                while ((buf == NULL || buf->data_size < block_size) && !finished) {            if (next_buf == NULL)                next_buf = invent_buffer();            result = producer(producer_user_data, next_buf, block_size);            if (result != PRODUCER_MORE) {                finished = TRUE;                if (result != PRODUCER_FINISHED) {                    rval |= QUEUE_PRODUCER_ERROR;                }            }            buf = merge_buffers(buf, next_buf);            next_buf = NULL;        }        while (buf != NULL && buf->data_size > 0 &&               (buf->data_size >= block_size || finished)) {            result = consumer(consumer_user_data, buf);                        if (result > 0) {                consume_buffer(buf, result);                if (buf->data_size == 0) {                    next_buf = buf;                    buf = NULL;                }            } else {                finished = TRUE;                rval |= QUEUE_CONSUMER_ERROR;                break;            }        }    }    free_buffer(buf);    free_buffer(next_buf);    return rval;}gboolean do_consumer_producer_queue(ProducerFunctor producer,                                    gpointer producer_user_data,                                    ConsumerFunctor consumer,                                    gpointer consumer_user_data) {    return QUEUE_SUCCESS ==        do_consumer_producer_queue_full(producer, producer_user_data,                                        consumer, consumer_user_data,                                        0, DEFAULT_MAX_BUFFER_MEMORY,                                        STREAMING_REQUIREMENT_NONE);}queue_result_flagsdo_consumer_producer_queue_full(ProducerFunctor producer,                                gpointer producer_user_data,                                ConsumerFunctor consumer,                                gpointer consumer_user_data,                                int block_size,                                size_t max_memory,                                StreamingRequirement streaming_mode) {    GThread     * producer_thread;    GThread     * consumer_thread;    queue_data_t  queue_data;    gpointer      producer_result;    gpointer      consumer_result;    queue_result_flags rval;    if (block_size <= 0) {        block_size = DISK_BLOCK_BYTES;    }    g_return_val_if_fail(producer != NULL, FALSE);    g_return_val_if_fail(consumer != NULL, FALSE);    if (!g_thread_supported()) {        return do_unthreaded_consumer_producer_queue(block_size, producer,                                                     producer_user_data,                                                     consumer,                                                     consumer_user_data);    }    queue_data.block_size = block_size;    queue_data.producer = producer;    queue_data.producer_user_data = producer_user_data;    queue_data.consumer = consumer;    queue_data.consumer_user_data = consumer_user_data;    queue_data.streaming_mode = streaming_mode;    queue_data.data_queue = g_async_queue_new();    queue_data.free_queue = g_async_queue_new();    max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));    queue_data.free_memory = semaphore_new_with_value(max_memory);    producer_thread = g_thread_create(do_producer_thread, &queue_data,                                      TRUE,                                      NULL /* FIXME: Should handle                                              errors. */);    consumer_thread = g_thread_create(do_consumer_thread, &queue_data,                                      TRUE,                                      NULL /* FIXME: Should handle                                              errors. */);        /* The order of cleanup here is very important, to avoid deadlock. */    /* 1) Reap the consumer. */    consumer_result = g_thread_join(consumer_thread);    /* 2) Stop the producer. */    semaphore_force_set(queue_data.free_memory, -1);    /* 3) Cleanup the free queue; add a signal flag. */    cleanup_buffer_queue(queue_data.free_queue, FALSE);    /* 4) Restart the producer (so it can exit). */    semaphore_force_set(queue_data.free_memory, INT_MAX);    /* 5) Reap the producer. */    producer_result = g_thread_join(producer_thread);    cleanup_buffer_queue(queue_data.free_queue, TRUE);    cleanup_buffer_queue(queue_data.data_queue, TRUE);    semaphore_free(queue_data.free_memory);        rval = 0;    if (!GPOINTER_TO_INT(producer_result)) {        rval |= QUEUE_PRODUCER_ERROR;    }    if (!GPOINTER_TO_INT(consumer_result)) {        rval |= QUEUE_CONSUMER_ERROR;    }    return rval;}/* Commonly-useful producers and consumers below. */producer_result_t device_read_producer(gpointer devicep,                                       queue_buffer_t *buffer,                                       int hint_size G_GNUC_UNUSED) {    Device* device;    device = (Device*) devicep;    g_assert(IS_DEVICE(device));    buffer->offset = 0;    for (;;) {        int result, read_size;        read_size = buffer->alloc_size;        result = device_read_block(device, buffer->data, &read_size);        if (result > 0) {            buffer->data_size = read_size;            return PRODUCER_MORE;        } else if (result == 0) {            buffer->data = realloc(buffer->data, read_size);            buffer->alloc_size = read_size;        } else if (device->is_eof) {            return PRODUCER_FINISHED;        } else {            buffer->data_size = 0;            return PRODUCER_ERROR;        }    }}int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) {    Device* device;    unsigned int write_size;    device = (Device*) devicep;    g_assert(IS_DEVICE(device));    write_size = MIN(buffer->data_size,                     device_write_max_size(device));    if (device_write_block(device, write_size,                           buffer->data + buffer->offset,                           buffer->data_size <                               device_write_min_size(device))) {        /* Success! */        return write_size;    } else {        /* Nope, really an error. */        return -1;    }}producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer,                                   int hint_size) {    int fd;    fd = GPOINTER_TO_INT(fdp);    g_assert(fd >= 0);    g_assert(buffer->data_size == 0);    buffer->offset = 0;    if (buffer->data == NULL) {        /* Set up the buffer. */        buffer->data = malloc(hint_size);        buffer->alloc_size = hint_size;    }    for (;;) {        int result;        result = read(fd, buffer->data, buffer->alloc_size);        if (result > 0) {            buffer->data_size = result;            return PRODUCER_MORE;        } else if (result == 0) {            /* End of file. */            return PRODUCER_FINISHED;        } else if (0#ifdef EAGAIN                || errno == EAGAIN#endif#ifdef EWOULDBLOCK                || errno == EWOULDBLOCK#endif#ifdef EINTR                || errno == EINTR#endif                ) {                /* Try again. */                continue;        } else {            /* Error occured. */            g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno));            return PRODUCER_ERROR;        }    }}int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) {    int fd;    fd = GPOINTER_TO_INT(fdp);    g_assert(fd >= 0);    g_return_val_if_fail(buffer->data_size > 0, 0);    for (;;) {        int write_size;        write_size = write(fd, buffer->data + buffer->offset,                           buffer->data_size);                if (write_size > 0) {            return write_size;        } else if (0#ifdef EAGAIN                || errno == EAGAIN#endif#ifdef EWOULDBLOCK                || errno == EWOULDBLOCK#endif#ifdef EINTR                || errno == EINTR#endif                ) {                /* Try again. */                continue;        } else {            /* Error occured. */            g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno));            return -1;        }            }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -