📄 gw-prioqueue.c
字号:
/* ==================================================================== * The Kannel Software License, Version 1.0 * * Copyright (c) 2001-2004 Kannel Group * Copyright (c) 1998-2001 WapIT Ltd. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Kannel Group (http://www.kannel.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Kannel" and "Kannel Group" must not be used to * endorse or promote products derived from this software without * prior written permission. For written permission, please * contact org@kannel.org. * * 5. Products derived from this software may not be called "Kannel", * nor may "Kannel" appear in their name, without prior written * permission of the Kannel Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Kannel Group. For more information on * the Kannel Group, please see <http://www.kannel.org/>. * * Portions of this software are based upon software originally written at * WapIT Ltd., Helsinki, Finland for the Kannel project. */ /* * gw-prioqueue.c - generic priority queue. * * Algorithm ala Robert Sedgewick. * * Alexander Malysh <olek2002 at hotmail.com>, 2004 */#include "gw-config.h"#include <pthread.h>#include "thread.h"#include "gwmem.h"#include "gwassert.h"#include "gwthread.h"#include "gw-prioqueue.h"struct gw_prioqueue { Mutex *mutex; void **tab; size_t size; long len; long producers; pthread_cond_t nonempty; int (*cmp)(const void*, const void *);};static void inline queue_lock(gw_prioqueue_t *queue){ mutex_lock(queue->mutex);}static void inline queue_unlock(gw_prioqueue_t *queue){ mutex_unlock(queue->mutex);}static void make_bigger(gw_prioqueue_t *queue, long items){ size_t size = queue->size; size_t new_size = sizeof(void*) * (queue->len + items); if (size >= new_size) return; queue->tab = gw_realloc(queue->tab, new_size); queue->size = new_size;}/** * Heapize up * @queue - our prioqueue + @index - start index */static void upheap(gw_prioqueue_t *queue, register long index){ void *v = queue->tab[index]; while (queue->tab[index / 2] != NULL && queue->cmp(queue->tab[index / 2], v) <= 0) { queue->tab[index] = queue->tab[index / 2]; index /= 2; } queue->tab[index] = v;}/** * Heapize down * @queue - our prioqueue * @index - start index */static void downheap(gw_prioqueue_t *queue, register long index){ void *v = queue->tab[index]; register long j; while (index <= queue->len / 2) { j = 2 * index; /* take the biggest child item */ if (j < queue->len && queue->cmp(queue->tab[j], queue->tab[j + 1]) < 0) j++; /* break if our item bigger */ if (queue->cmp(v, queue->tab[j]) >= 0) break; queue->tab[index] = queue->tab[j]; index = j; } queue->tab[index] = v;}gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *)){ gw_prioqueue_t *ret; gw_assert(cmp != NULL); ret = gw_malloc(sizeof(*ret)); ret->producers = 0; pthread_cond_init(&ret->nonempty, NULL); ret->mutex = mutex_create(); ret->tab = NULL; ret->size = 0; ret->len = 0; ret->cmp = cmp; /* put NULL item at pos 0 that is our stop marker */ make_bigger(ret, 1); ret->tab[0] = NULL; ret->len++; return ret;}void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*)){ if (queue == NULL) return; if (item_destroy != NULL) { void *item; while((item = gw_prioqueue_remove(queue)) != NULL) item_destroy(item); } mutex_destroy(queue->mutex); pthread_cond_destroy(&queue->nonempty); gw_free(queue->tab); gw_free(queue);}long gw_prioqueue_len(gw_prioqueue_t *queue){ long len; if (queue == NULL) return 0; queue_lock(queue); len = queue->len - 1; queue_unlock(queue); return len;}void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item){ gw_assert(queue != NULL); gw_assert(item != NULL); queue_lock(queue); make_bigger(queue, 1); queue->tab[queue->len] = item; upheap(queue, queue->len); queue->len++; pthread_cond_signal(&queue->nonempty); queue_unlock(queue);}void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long)){ register long i; gw_assert(queue != NULL && fn != NULL); queue_lock(queue); for (i = 1; i < queue->len; i++) fn(queue->tab[i], i - 1); queue_unlock(queue);}void *gw_prioqueue_remove(gw_prioqueue_t *queue){ void *ret; gw_assert(queue != NULL); queue_lock(queue); if (queue->len <= 1) { queue_unlock(queue); return NULL; } ret = queue->tab[1]; queue->tab[1] = queue->tab[--queue->len]; downheap(queue, 1); queue_unlock(queue); return ret;}void *gw_prioqueue_get(gw_prioqueue_t *queue){ void *ret; gw_assert(queue != NULL); queue_lock(queue); if (queue->len > 1) ret = queue->tab[1]; else ret = NULL; queue_unlock(queue); return ret;}void *gw_prioqueue_consume(gw_prioqueue_t *queue){ void *ret; gw_assert(queue != NULL); queue_lock(queue); while (queue->len == 1 && queue->producers > 0) { queue->mutex->owner = -1; pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex); queue->mutex->owner = gwthread_self(); } if (queue->len > 1) { ret = queue->tab[1]; queue->tab[1] = queue->tab[--queue->len]; downheap(queue, 1); } else { ret = NULL; } queue_unlock(queue); return ret;}void gw_prioqueue_add_producer(gw_prioqueue_t *queue){ gw_assert(queue != NULL); queue_lock(queue); queue->producers++; queue_unlock(queue);}void gw_prioqueue_remove_producer(gw_prioqueue_t *queue){ gw_assert(queue != NULL); queue_lock(queue); gw_assert(queue->producers > 0); queue->producers--; pthread_cond_broadcast(&queue->nonempty); queue_unlock(queue);}long gw_prioqueue_producer_count(gw_prioqueue_t *queue){ long ret; gw_assert(queue != NULL); queue_lock(queue); ret = queue->producers; queue_unlock(queue); return ret;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -