📄 queueing.c
字号:
gboolean finished = FALSE; queue_result_flags rval = 0; /* The basic theory of operation here is to read until we have enough data to write, then write until we don't.. */ while (!finished) { int result; while ((buf == NULL || buf->data_size < block_size) && !finished) { if (next_buf == NULL) next_buf = invent_buffer(); result = producer(producer_user_data, next_buf, block_size); if (result != PRODUCER_MORE) { finished = TRUE; if (result != PRODUCER_FINISHED) { rval |= QUEUE_PRODUCER_ERROR; } } buf = merge_buffers(buf, next_buf); next_buf = NULL; } while (buf != NULL && buf->data_size > 0 && (buf->data_size >= block_size || finished)) { result = consumer(consumer_user_data, buf); if (result > 0) { consume_buffer(buf, result); if (buf->data_size == 0) { next_buf = buf; buf = NULL; } } else { finished = TRUE; rval |= QUEUE_CONSUMER_ERROR; break; } } } free_buffer(buf); free_buffer(next_buf); return rval;}gboolean do_consumer_producer_queue(ProducerFunctor producer, gpointer producer_user_data, ConsumerFunctor consumer, gpointer consumer_user_data) { return QUEUE_SUCCESS == do_consumer_producer_queue_full(producer, producer_user_data, consumer, consumer_user_data, 0, DEFAULT_MAX_BUFFER_MEMORY, STREAMING_REQUIREMENT_NONE);}queue_result_flagsdo_consumer_producer_queue_full(ProducerFunctor producer, gpointer producer_user_data, ConsumerFunctor consumer, gpointer consumer_user_data, int block_size, size_t max_memory, StreamingRequirement streaming_mode) { GThread * producer_thread; GThread * consumer_thread; queue_data_t queue_data; gpointer producer_result; gpointer consumer_result; queue_result_flags rval; if (block_size <= 0) { block_size = DISK_BLOCK_BYTES; } g_return_val_if_fail(producer != NULL, FALSE); g_return_val_if_fail(consumer != NULL, FALSE); if (!g_thread_supported()) { return do_unthreaded_consumer_producer_queue(block_size, producer, producer_user_data, consumer, consumer_user_data); } queue_data.block_size = block_size; queue_data.producer = producer; queue_data.producer_user_data = producer_user_data; queue_data.consumer = consumer; queue_data.consumer_user_data = consumer_user_data; queue_data.streaming_mode = streaming_mode; queue_data.data_queue = g_async_queue_new(); queue_data.free_queue = g_async_queue_new(); max_memory = MAX(1,MIN(max_memory, INT_MAX / 2)); queue_data.free_memory = semaphore_new_with_value(max_memory); producer_thread = g_thread_create(do_producer_thread, &queue_data, TRUE, NULL /* FIXME: Should handle errors. */); consumer_thread = g_thread_create(do_consumer_thread, &queue_data, TRUE, NULL /* FIXME: Should handle errors. */); /* The order of cleanup here is very important, to avoid deadlock. */ /* 1) Reap the consumer. */ consumer_result = g_thread_join(consumer_thread); /* 2) Stop the producer. */ semaphore_force_set(queue_data.free_memory, -1); /* 3) Cleanup the free queue; add a signal flag. */ cleanup_buffer_queue(queue_data.free_queue, FALSE); /* 4) Restart the producer (so it can exit). */ semaphore_force_set(queue_data.free_memory, INT_MAX); /* 5) Reap the producer. */ producer_result = g_thread_join(producer_thread); cleanup_buffer_queue(queue_data.free_queue, TRUE); cleanup_buffer_queue(queue_data.data_queue, TRUE); semaphore_free(queue_data.free_memory); rval = 0; if (!GPOINTER_TO_INT(producer_result)) { rval |= QUEUE_PRODUCER_ERROR; } if (!GPOINTER_TO_INT(consumer_result)) { rval |= QUEUE_CONSUMER_ERROR; } return rval;}/* Commonly-useful producers and consumers below. */producer_result_t device_read_producer(gpointer devicep, queue_buffer_t *buffer, int hint_size G_GNUC_UNUSED) { Device* device; device = (Device*) devicep; g_assert(IS_DEVICE(device)); buffer->offset = 0; for (;;) { int result, read_size; read_size = buffer->alloc_size; result = device_read_block(device, buffer->data, &read_size); if (result > 0) { buffer->data_size = read_size; return PRODUCER_MORE; } else if (result == 0) { buffer->data = realloc(buffer->data, read_size); buffer->alloc_size = read_size; } else if (device->is_eof) { return PRODUCER_FINISHED; } else { buffer->data_size = 0; return PRODUCER_ERROR; } }}int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) { Device* device; unsigned int write_size; device = (Device*) devicep; g_assert(IS_DEVICE(device)); write_size = MIN(buffer->data_size, device_write_max_size(device)); if (device_write_block(device, write_size, buffer->data + buffer->offset, buffer->data_size < device_write_min_size(device))) { /* Success! */ return write_size; } else { /* Nope, really an error. */ return -1; }}producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer, int hint_size) { int fd; fd = GPOINTER_TO_INT(fdp); g_assert(fd >= 0); g_assert(buffer->data_size == 0); buffer->offset = 0; if (buffer->data == NULL) { /* Set up the buffer. */ buffer->data = malloc(hint_size); buffer->alloc_size = hint_size; } for (;;) { int result; result = read(fd, buffer->data, buffer->alloc_size); if (result > 0) { buffer->data_size = result; return PRODUCER_MORE; } else if (result == 0) { /* End of file. */ return PRODUCER_FINISHED; } else if (0#ifdef EAGAIN || errno == EAGAIN#endif#ifdef EWOULDBLOCK || errno == EWOULDBLOCK#endif#ifdef EINTR || errno == EINTR#endif ) { /* Try again. */ continue; } else { /* Error occured. */ g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno)); return PRODUCER_ERROR; } }}int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) { int fd; fd = GPOINTER_TO_INT(fdp); g_assert(fd >= 0); g_return_val_if_fail(buffer->data_size > 0, 0); for (;;) { int write_size; write_size = write(fd, buffer->data + buffer->offset, buffer->data_size); if (write_size > 0) { return write_size; } else if (0#ifdef EAGAIN || errno == EAGAIN#endif#ifdef EWOULDBLOCK || errno == EWOULDBLOCK#endif#ifdef EINTR || errno == EINTR#endif ) { /* Try again. */ continue; } else { /* Error occured. */ g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno)); return -1; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -