⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_msgqueue.c

📁 spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架
💻 C
字号:
/* * event_msgqueue.c *  * Libevent threaded message passing primitives *  * Andrew Danforth <acd@weirdness.net>, October 2006 *  * Copyright (c) Andrew Danforth, 2006 *  */#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/types.h>#include <sys/socket.h>#include "config.h"#include "event_msgqueue.h"struct circqueue {   unsigned int head;   unsigned int tail;   unsigned int count;   unsigned int max_entries;   unsigned int array_elements;   void **entries;};#define DEFAULT_UNBOUNDED_QUEUE_SIZE 1024struct event_msgqueue {   int push_fd;   int pop_fd;   int unlock_between_callbacks;   struct event queue_ev;   pthread_mutex_t lock;   void (*callback)(void *, void *);   void *cbarg;   struct circqueue *queue;};static unsigned int nextpow2(unsigned int num) {    --num;    num |= num >> 1;    num |= num >> 2;    num |= num >> 4;    num |= num >> 8;    num |= num >> 16;    return ++num;}#define circqueue_get_length(q) ((q)->count)#define circqueue_is_empty(q) (!circqueue_get_length(q))#define circqueue_is_full(q) ((q)->count == (q)->array_elements)static struct circqueue *circqueue_new(unsigned int size) {   struct circqueue *cq;   if (!(cq = calloc(1, sizeof(struct circqueue))))      return(NULL);   cq->max_entries = size;   if (!size || !(cq->array_elements = nextpow2(size)))      cq->array_elements = DEFAULT_UNBOUNDED_QUEUE_SIZE;   cq->entries = malloc(sizeof(void *) * cq->array_elements);   if (!cq->entries) {      free(cq);      return(NULL);   }   return(cq);}static void circqueue_destroy(struct circqueue *cq) {   free(cq->entries);   free(cq);}static int circqueue_grow(struct circqueue *cq) {   void **newents;   unsigned int newsize = cq->array_elements << 1;   unsigned int headchunklen = 0, tailchunklen = 0;      if (!(newents = malloc(sizeof(void *) * newsize)))      return(-1);   if (cq->head < cq->tail)      headchunklen = cq->tail - cq->head;   else {      headchunklen = cq->array_elements - cq->head;      tailchunklen = cq->tail;   }   memcpy(newents, &cq->entries[cq->head], sizeof(void *) * headchunklen);   if (tailchunklen)      memcpy(&newents[headchunklen], cq->entries, sizeof(void *) * tailchunklen);   cq->head = 0;   cq->tail = headchunklen + tailchunklen;   cq->array_elements = newsize;   free(cq->entries);   cq->entries = newents;   return(0);}static int circqueue_push_tail(struct circqueue *cq, void *elem) {   if (cq->max_entries) {      if (cq->count == cq->max_entries)         return(-1);   } else if (circqueue_is_full(cq) && circqueue_grow(cq) != 0)       return(-1);   cq->count++;   cq->entries[cq->tail++] = elem;   cq->tail &= cq->array_elements - 1;   return(0);}static void *circqueue_pop_head(struct circqueue *cq) {   void *data;   if (!cq->count)      return(NULL);   cq->count--;   data = cq->entries[cq->head++];   cq->head &= cq->array_elements - 1;   return(data);}static void msgqueue_pop(int fd, short flags, void *arg) {   struct event_msgqueue *msgq = arg;   char buf[64];   read(fd, buf, sizeof(buf));   pthread_mutex_lock(&msgq->lock);   while(!circqueue_is_empty(msgq->queue)) {      void *qdata;      qdata = circqueue_pop_head(msgq->queue);      if (msgq->unlock_between_callbacks)         pthread_mutex_unlock(&msgq->lock);      msgq->callback(qdata, msgq->cbarg);      if (msgq->unlock_between_callbacks)         pthread_mutex_lock(&msgq->lock);   }   pthread_mutex_unlock(&msgq->lock);}struct event_msgqueue *msgqueue_new(struct event_base *base, unsigned int max_size, void (*callback)(void *, void *), void *cbarg) {   struct event_msgqueue *msgq;   struct circqueue *cq;   int fds[2];   if (!(cq = circqueue_new(max_size)))      return(NULL);   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {      circqueue_destroy(cq);      return(NULL);   }   if (!(msgq = malloc(sizeof(struct event_msgqueue)))) {      circqueue_destroy(cq);      close(fds[0]);      close(fds[1]);      return(NULL);   }   msgq->push_fd = fds[0];   msgq->pop_fd = fds[1];   msgq->queue = cq;   msgq->callback = callback;   msgq->cbarg = cbarg;   pthread_mutex_init(&msgq->lock, NULL);   event_set(&msgq->queue_ev, msgq->pop_fd, EV_READ | EV_PERSIST, msgqueue_pop, msgq);   event_base_set(base, &msgq->queue_ev);   event_add(&msgq->queue_ev, NULL);   msgq->unlock_between_callbacks = 1;      return(msgq);}void msgqueue_destroy(struct event_msgqueue *msgq){   for( ; msgqueue_length(msgq) > 0; ) {      sleep( 1 );   }   event_del(&msgq->queue_ev);   circqueue_destroy(msgq->queue);   close(msgq->push_fd);   close(msgq->pop_fd);   free(msgq);}int msgqueue_push(struct event_msgqueue *msgq, void *msg) {   const char buf[1] = { 0 };   int r = 0;   pthread_mutex_lock(&msgq->lock);   if ((r = circqueue_push_tail(msgq->queue, msg)) == 0) {      if (circqueue_get_length(msgq->queue) == 1)         write(msgq->push_fd, buf, 1);   }   pthread_mutex_unlock(&msgq->lock);   return(r);}unsigned int msgqueue_length(struct event_msgqueue *msgq) {   unsigned int len;   pthread_mutex_lock(&msgq->lock);   len = circqueue_get_length(msgq->queue);   pthread_mutex_unlock(&msgq->lock);   return(len);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -