📄 threads.c
字号:
/* * threads.c request threading support * * Version: $Id: threads.c,v 1.142 2008/04/11 08:58:52 aland Exp $ * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA * * Copyright 2000,2006 The FreeRADIUS server project * Copyright 2000 Alan DeKok <aland@ox.org> */#include <freeradius-devel/ident.h>RCSID("$Id: threads.c,v 1.142 2008/04/11 08:58:52 aland Exp $")#include <freeradius-devel/radiusd.h>#include <freeradius-devel/rad_assert.h>/* * Other OS's have sem_init, OS X doesn't. */#ifdef HAVE_SEMAPHORE_H#include <semaphore.h>#endif#ifdef DARWIN#include <mach/task.h>#include <mach/semaphore.h>#undef sem_t#define sem_t semaphore_t#undef sem_init#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)#undef sem_wait#define sem_wait(s) semaphore_wait(*s)#undef sem_post#define sem_post(s) semaphore_signal(*s)#endif#ifdef HAVE_SYS_WAIT_H#include <sys/wait.h>#endif#ifdef HAVE_PTHREAD_H#ifdef HAVE_OPENSSL_CRYPTO_H#include <openssl/crypto.h>#endif#ifdef HAVE_OPENSSL_ERR_H#include <openssl/err.h>#endif#ifdef HAVE_OPENSSL_EVP_H#include <openssl/evp.h>#endif#define SEMAPHORE_LOCKED (0)#define SEMAPHORE_UNLOCKED (1)#define THREAD_RUNNING (1)#define THREAD_CANCELLED (2)#define THREAD_EXITED (3)#define NUM_FIFOS RAD_LISTEN_MAX/* * A data structure which contains the information about * the current thread. * * pthread_id pthread id * thread_num server thread number, 1...number of threads * semaphore used to block the thread until a request comes in * status is the thread running or exited? * request_count the number of requests that this thread has handled * timestamp when the thread started executing. */typedef struct THREAD_HANDLE { struct THREAD_HANDLE *prev; struct THREAD_HANDLE *next; pthread_t pthread_id; int thread_num; int status; unsigned int request_count; time_t timestamp; REQUEST *request;} THREAD_HANDLE;/* * For the request queue. */typedef struct request_queue_t { REQUEST *request; RAD_REQUEST_FUNP fun;} request_queue_t;typedef struct thread_fork_t { pid_t pid; int status; int exited;} thread_fork_t;/* * A data structure to manage the thread pool. There's no real * need for a data structure, but it makes things conceptually * easier. */typedef struct THREAD_POOL { THREAD_HANDLE *head; THREAD_HANDLE *tail; int total_threads; int active_threads; /* protected by queue_mutex */ int max_thread_num; int start_threads; int max_threads; int min_spare_threads; int max_spare_threads; unsigned int max_requests_per_thread; unsigned long request_count; time_t time_last_spawned; int cleanup_delay; int spawn_flag; pthread_mutex_t wait_mutex; fr_hash_table_t *waiters; /* * All threads wait on this semaphore, for requests * to enter the queue. */ sem_t semaphore; /* * To ensure only one thread at a time touches the queue. */ pthread_mutex_t queue_mutex; int max_queue_size; int num_queued; fr_fifo_t *fifo[NUM_FIFOS];} THREAD_POOL;static THREAD_POOL thread_pool;static int pool_initialized = FALSE;static time_t last_cleaned = 0;static void thread_pool_manage(time_t now);/* * A mapping of configuration file names to internal integers */static const CONF_PARSER thread_config[] = { { "start_servers", PW_TYPE_INTEGER, 0, &thread_pool.start_threads, "5" }, { "max_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_threads, "32" }, { "min_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads, "3" }, { "max_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads, "10" }, { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" }, { "cleanup_delay", PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay, "5" }, { "max_queue_size", PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size, "65536" }, { NULL, -1, 0, NULL, NULL }};#ifdef HAVE_OPENSSL_CRYPTO_H/* * If we're linking against OpenSSL, then it is the * duty of the application, if it is multithreaded, * to provide OpenSSL with appropriate thread id * and mutex locking functions * * Note: this only implements static callbacks. * OpenSSL does not use dynamic locking callbacks * right now, but may in the futiure, so we will have * to add them at some point. */static pthread_mutex_t *ssl_mutexes = NULL;static unsigned long ssl_id_function(void){ return (unsigned long) pthread_self();}static void ssl_locking_function(int mode, int n, const char *file, int line){ file = file; /* -Wunused */ line = line; /* -Wunused */ if (mode & CRYPTO_LOCK) { pthread_mutex_lock(&(ssl_mutexes[n])); } else { pthread_mutex_unlock(&(ssl_mutexes[n])); }}static int setup_ssl_mutexes(void){ int i;#ifdef HAVE_OPENSSL_EVP_H /* * Enable all ciphers and digests. */ OpenSSL_add_all_algorithms();#endif ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t)); if (!ssl_mutexes) { radlog(L_ERR, "Error allocating memory for SSL mutexes!"); return 0; } for (i = 0; i < CRYPTO_num_locks(); i++) { pthread_mutex_init(&(ssl_mutexes[i]), NULL); } CRYPTO_set_id_callback(ssl_id_function); CRYPTO_set_locking_callback(ssl_locking_function); return 1;}#endif/* * We don't want to catch SIGCHLD for a host of reasons. * * - exec_wait means that someone, somewhere, somewhen, will * call waitpid(), and catch the child. * * - SIGCHLD is delivered to a random thread, not the one that * forked. * * - if another thread catches the child, we have to coordinate * with the thread doing the waiting. * * - if we don't waitpid() for non-wait children, they'll be zombies, * and will hang around forever. * */static void reap_children(void){ pid_t pid; int status; thread_fork_t mytf, *tf; pthread_mutex_lock(&thread_pool.wait_mutex); do { pid = waitpid(0, &status, WNOHANG); if (pid <= 0) break; mytf.pid = pid; tf = fr_hash_table_finddata(thread_pool.waiters, &mytf); if (!tf) continue; tf->status = status; tf->exited = 1; } while (fr_hash_table_num_elements(thread_pool.waiters) > 0); pthread_mutex_unlock(&thread_pool.wait_mutex);}/* * Add a request to the list of waiting requests. * This function gets called ONLY from the main handler thread... * * This function should never fail. */static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun){ request_queue_t *entry; pthread_mutex_lock(&thread_pool.queue_mutex); thread_pool.request_count++; if (thread_pool.num_queued >= thread_pool.max_queue_size) { pthread_mutex_unlock(&thread_pool.queue_mutex); /* * Mark the request as done. */ radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number); request->child_state = REQUEST_DONE; return 0; } entry = rad_malloc(sizeof(*entry)); entry->request = request; entry->fun = fun; /* * Push the request onto the appropriate fifo for that */ if (!fr_fifo_push(thread_pool.fifo[request->priority], entry)) { pthread_mutex_unlock(&thread_pool.queue_mutex); radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number); request->child_state = REQUEST_DONE; return 0; } thread_pool.num_queued++; pthread_mutex_unlock(&thread_pool.queue_mutex); /* * There's one more request in the queue. * * Note that we're not touching the queue any more, so * the semaphore post is outside of the mutex. This also * means that when the thread wakes up and tries to lock * the mutex, it will be unlocked, and there won't be * contention. */ sem_post(&thread_pool.semaphore); return 1;}/* * Remove a request from the queue. */static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun){ RAD_LISTEN_TYPE i, start; request_queue_t *entry; reap_children(); pthread_mutex_lock(&thread_pool.queue_mutex); /* * Clear old requests from all queues. * * We only do one pass over the queue, in order to * amortize the work across the child threads. Since we * do N checks for one request de-queued, the old * requests will be quickly cleared. */ for (i = 0; i < RAD_LISTEN_MAX; i++) { entry = fr_fifo_peek(thread_pool.fifo[i]); if (!entry || (entry->request->master_state != REQUEST_STOP_PROCESSING)) { continue;} /* * This entry was marked to be stopped. Acknowledge it. */ entry = fr_fifo_pop(thread_pool.fifo[i]); rad_assert(entry != NULL); entry->request->child_state = REQUEST_DONE; thread_pool.num_queued--; free(entry); } start = 0; retry: /* * Pop results from the top of the queue */ for (i = start; i < RAD_LISTEN_MAX; i++) { entry = fr_fifo_pop(thread_pool.fifo[i]); if (entry) { start = i; break; } } if (!entry) { pthread_mutex_unlock(&thread_pool.queue_mutex); *request = NULL; *fun = NULL; return 0; } rad_assert(thread_pool.num_queued > 0); thread_pool.num_queued--; *request = entry->request; *fun = entry->fun; free(entry); rad_assert(*request != NULL); rad_assert((*request)->magic == REQUEST_MAGIC); rad_assert(*fun != NULL); /* * If the request has sat in the queue for too long, * kill it. * * The main clean-up code can't delete the request from * the queue, and therefore won't clean it up until we * have acknowledged it as "done". */ if ((*request)->master_state == REQUEST_STOP_PROCESSING) { (*request)->child_state = REQUEST_DONE; goto retry; } /* * The thread is currently processing a request. */ thread_pool.active_threads++; pthread_mutex_unlock(&thread_pool.queue_mutex); return 1;}/* * The main thread handler for requests. * * Wait on the semaphore until we have it, and process the request. */static void *request_handler_thread(void *arg){ RAD_REQUEST_FUNP fun; THREAD_HANDLE *self = (THREAD_HANDLE *) arg; /* * Loop forever, until told to exit. */ do { /* * Wait to be signalled. */ DEBUG2("Thread %d waiting to be assigned a request", self->thread_num); re_wait: if (sem_wait(&thread_pool.semaphore) != 0) { /* * Interrupted system call. Go back to * waiting, but DON'T print out any more * text. */ if (errno == EINTR) { DEBUG2("Re-wait %d", self->thread_num); goto re_wait; } radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n", self->thread_num, strerror(errno)); break; } DEBUG2("Thread %d got semaphore", self->thread_num); /* * Try to grab a request from the queue. * * It may be empty, in which case we fail * gracefully. */ if (!request_dequeue(&self->request, &fun)) continue; self->request->child_pid = self->pthread_id; self->request_count++; DEBUG2("Thread %d handling request %d, (%d handled so far)", self->thread_num, self->request->number, self->request_count); radius_handle_request(self->request, fun); /* * Update the active threads. */ pthread_mutex_lock(&thread_pool.queue_mutex); rad_assert(thread_pool.active_threads > 0); thread_pool.active_threads--; pthread_mutex_unlock(&thread_pool.queue_mutex); } while (self->status != THREAD_CANCELLED); DEBUG2("Thread %d exiting...", self->thread_num);#ifdef HAVE_OPENSSL_ERR_H /* * If we linked with OpenSSL, the application * must remove the thread's error queue before * exiting to prevent memory leaks. */ ERR_remove_state(0);#endif /* * Do this as the LAST thing before exiting. */ self->request = NULL; self->status = THREAD_EXITED; return NULL;}/* * Take a THREAD_HANDLE, delete it from the thread pool and * free its resources. * * This function is called ONLY from the main server thread, * ONLY after the thread has exited. */static void delete_thread(THREAD_HANDLE *handle){ THREAD_HANDLE *prev; THREAD_HANDLE *next; rad_assert(handle->request == NULL); DEBUG2("Deleting thread %d", handle->thread_num); prev = handle->prev; next = handle->next; rad_assert(thread_pool.total_threads > 0); thread_pool.total_threads--; /* * Remove the handle from the list. */ if (prev == NULL) { rad_assert(thread_pool.head == handle); thread_pool.head = next; } else { prev->next = next; } if (next == NULL) { rad_assert(thread_pool.tail == handle); thread_pool.tail = prev; } else { next->prev = prev; } /* * Free the handle, now that it's no longer referencable. */ free(handle);}/* * Spawn a new thread, and place it in the thread pool. * * The thread is started initially in the blocked state, waiting * for the semaphore. */static THREAD_HANDLE *spawn_thread(time_t now){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -