⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threads.c

📁 使用最广泛的radius的linux的源码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * threads.c	request threading support * * Version:	$Id: threads.c,v 1.142 2008/04/11 08:58:52 aland Exp $ * *   This program is free software; you can redistribute it and/or modify *   it under the terms of the GNU General Public License as published by *   the Free Software Foundation; either version 2 of the License, or *   (at your option) any later version. * *   This program is distributed in the hope that it will be useful, *   but WITHOUT ANY WARRANTY; without even the implied warranty of *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   GNU General Public License for more details. * *   You should have received a copy of the GNU General Public License *   along with this program; if not, write to the Free Software *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA * * Copyright 2000,2006  The FreeRADIUS server project * Copyright 2000  Alan DeKok <aland@ox.org> */#include <freeradius-devel/ident.h>RCSID("$Id: threads.c,v 1.142 2008/04/11 08:58:52 aland Exp $")#include <freeradius-devel/radiusd.h>#include <freeradius-devel/rad_assert.h>/* *	Other OS's have sem_init, OS X doesn't. */#ifdef HAVE_SEMAPHORE_H#include <semaphore.h>#endif#ifdef DARWIN#include <mach/task.h>#include <mach/semaphore.h>#undef sem_t#define sem_t semaphore_t#undef sem_init#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)#undef sem_wait#define sem_wait(s) semaphore_wait(*s)#undef sem_post#define sem_post(s) semaphore_signal(*s)#endif#ifdef HAVE_SYS_WAIT_H#include <sys/wait.h>#endif#ifdef HAVE_PTHREAD_H#ifdef HAVE_OPENSSL_CRYPTO_H#include <openssl/crypto.h>#endif#ifdef HAVE_OPENSSL_ERR_H#include <openssl/err.h>#endif#ifdef HAVE_OPENSSL_EVP_H#include <openssl/evp.h>#endif#define SEMAPHORE_LOCKED	(0)#define SEMAPHORE_UNLOCKED	(1)#define THREAD_RUNNING		(1)#define THREAD_CANCELLED	(2)#define THREAD_EXITED		(3)#define NUM_FIFOS               RAD_LISTEN_MAX/* *  A data structure which contains the information about *  the current thread. * *  pthread_id     pthread id *  thread_num     server thread number, 1...number of threads *  semaphore     used to block the thread until a request comes in *  status        is the thread running or exited? *  request_count the number of requests that this thread has handled *  timestamp     when the thread started executing. */typedef struct THREAD_HANDLE {	struct THREAD_HANDLE *prev;	struct THREAD_HANDLE *next;	pthread_t            pthread_id;	int                  thread_num;	int                  status;	unsigned int         request_count;	time_t               timestamp;	REQUEST		     *request;} THREAD_HANDLE;/* *	For the request queue. */typedef struct request_queue_t {	REQUEST	    	  *request;	RAD_REQUEST_FUNP  fun;} request_queue_t;typedef struct thread_fork_t {	pid_t		pid;	int		status;	int		exited;} thread_fork_t;/* *	A data structure to manage the thread pool.  There's no real *	need for a data structure, but it makes things conceptually *	easier. */typedef struct THREAD_POOL {	THREAD_HANDLE *head;	THREAD_HANDLE *tail;	int total_threads;	int active_threads;	/* protected by queue_mutex */	int max_thread_num;	int start_threads;	int max_threads;	int min_spare_threads;	int max_spare_threads;	unsigned int max_requests_per_thread;	unsigned long request_count;	time_t time_last_spawned;	int cleanup_delay;	int spawn_flag;	pthread_mutex_t	wait_mutex;	fr_hash_table_t *waiters;	/*	 *	All threads wait on this semaphore, for requests	 *	to enter the queue.	 */	sem_t		semaphore;	/*	 *	To ensure only one thread at a time touches the queue.	 */	pthread_mutex_t	queue_mutex;	int		max_queue_size;	int		num_queued;	fr_fifo_t	*fifo[NUM_FIFOS];} THREAD_POOL;static THREAD_POOL thread_pool;static int pool_initialized = FALSE;static time_t last_cleaned = 0;static void thread_pool_manage(time_t now);/* *	A mapping of configuration file names to internal integers */static const CONF_PARSER thread_config[] = {	{ "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },	{ "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },	{ "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },	{ "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },	{ "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },	{ "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },	{ "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },	{ NULL, -1, 0, NULL, NULL }};#ifdef HAVE_OPENSSL_CRYPTO_H/* *	If we're linking against OpenSSL, then it is the *	duty of the application, if it is multithreaded, *	to provide OpenSSL with appropriate thread id *	and mutex locking functions * *	Note: this only implements static callbacks. *	OpenSSL does not use dynamic locking callbacks *	right now, but may in the futiure, so we will have *	to add them at some point. */static pthread_mutex_t *ssl_mutexes = NULL;static unsigned long ssl_id_function(void){	return (unsigned long) pthread_self();}static void ssl_locking_function(int mode, int n, const char *file, int line){	file = file;		/* -Wunused */	line = line;		/* -Wunused */	if (mode & CRYPTO_LOCK) {		pthread_mutex_lock(&(ssl_mutexes[n]));	} else {		pthread_mutex_unlock(&(ssl_mutexes[n]));	}}static int setup_ssl_mutexes(void){	int i;#ifdef HAVE_OPENSSL_EVP_H	/*	 *	Enable all ciphers and digests.	 */	OpenSSL_add_all_algorithms();#endif	ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));	if (!ssl_mutexes) {		radlog(L_ERR, "Error allocating memory for SSL mutexes!");		return 0;	}	for (i = 0; i < CRYPTO_num_locks(); i++) {		pthread_mutex_init(&(ssl_mutexes[i]), NULL);	}	CRYPTO_set_id_callback(ssl_id_function);	CRYPTO_set_locking_callback(ssl_locking_function);	return 1;}#endif/* *	We don't want to catch SIGCHLD for a host of reasons. * *	- exec_wait means that someone, somewhere, somewhen, will *	call waitpid(), and catch the child. * *	- SIGCHLD is delivered to a random thread, not the one that *	forked. * *	- if another thread catches the child, we have to coordinate *	with the thread doing the waiting. * *	- if we don't waitpid() for non-wait children, they'll be zombies, *	and will hang around forever. * */static void reap_children(void){	pid_t pid;	int status;	thread_fork_t mytf, *tf;	pthread_mutex_lock(&thread_pool.wait_mutex);	do {		pid = waitpid(0, &status, WNOHANG);		if (pid <= 0) break;		mytf.pid = pid;		tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);		if (!tf) continue;		tf->status = status;		tf->exited = 1;	} while (fr_hash_table_num_elements(thread_pool.waiters) > 0);	pthread_mutex_unlock(&thread_pool.wait_mutex);}/* *	Add a request to the list of waiting requests. *	This function gets called ONLY from the main handler thread... * *	This function should never fail. */static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun){	request_queue_t *entry;	pthread_mutex_lock(&thread_pool.queue_mutex);	thread_pool.request_count++;	if (thread_pool.num_queued >= thread_pool.max_queue_size) {		pthread_mutex_unlock(&thread_pool.queue_mutex);		/*		 *	Mark the request as done.		 */		radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);		request->child_state = REQUEST_DONE;		return 0;	}	entry = rad_malloc(sizeof(*entry));	entry->request = request;	entry->fun = fun;	/*	 *	Push the request onto the appropriate fifo for that	 */	if (!fr_fifo_push(thread_pool.fifo[request->priority],			    entry)) {		pthread_mutex_unlock(&thread_pool.queue_mutex);		radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);		request->child_state = REQUEST_DONE;		return 0;	}	thread_pool.num_queued++;	pthread_mutex_unlock(&thread_pool.queue_mutex);	/*	 *	There's one more request in the queue.	 *	 *	Note that we're not touching the queue any more, so	 *	the semaphore post is outside of the mutex.  This also	 *	means that when the thread wakes up and tries to lock	 *	the mutex, it will be unlocked, and there won't be	 *	contention.	 */	sem_post(&thread_pool.semaphore);	return 1;}/* *	Remove a request from the queue. */static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun){	RAD_LISTEN_TYPE i, start;	request_queue_t *entry;	reap_children();	pthread_mutex_lock(&thread_pool.queue_mutex);	/*	 *	Clear old requests from all queues.	 *	 *	We only do one pass over the queue, in order to	 *	amortize the work across the child threads.  Since we	 *	do N checks for one request de-queued, the old	 *	requests will be quickly cleared.	 */	for (i = 0; i < RAD_LISTEN_MAX; i++) {		entry = fr_fifo_peek(thread_pool.fifo[i]);		if (!entry ||		    (entry->request->master_state != REQUEST_STOP_PROCESSING)) {			continue;}		/*		 *	This entry was marked to be stopped.  Acknowledge it.		 */		entry = fr_fifo_pop(thread_pool.fifo[i]);		rad_assert(entry != NULL);		entry->request->child_state = REQUEST_DONE;		thread_pool.num_queued--;		free(entry);	}	start = 0; retry:	/*	 *	Pop results from the top of the queue	 */	for (i = start; i < RAD_LISTEN_MAX; i++) {		entry = fr_fifo_pop(thread_pool.fifo[i]);		if (entry) {			start = i;			break;		}	}	if (!entry) {		pthread_mutex_unlock(&thread_pool.queue_mutex);		*request = NULL;		*fun = NULL;		return 0;	}	rad_assert(thread_pool.num_queued > 0);	thread_pool.num_queued--;	*request = entry->request;	*fun = entry->fun;	free(entry);	rad_assert(*request != NULL);	rad_assert((*request)->magic == REQUEST_MAGIC);	rad_assert(*fun != NULL);	/*	 *	If the request has sat in the queue for too long,	 *	kill it.	 *	 *	The main clean-up code can't delete the request from	 *	the queue, and therefore won't clean it up until we	 *	have acknowledged it as "done".	 */	if ((*request)->master_state == REQUEST_STOP_PROCESSING) {		(*request)->child_state = REQUEST_DONE;		goto retry;	}	/*	 *	The thread is currently processing a request.	 */	thread_pool.active_threads++;	pthread_mutex_unlock(&thread_pool.queue_mutex);	return 1;}/* *	The main thread handler for requests. * *	Wait on the semaphore until we have it, and process the request. */static void *request_handler_thread(void *arg){	RAD_REQUEST_FUNP  fun;	THREAD_HANDLE	  *self = (THREAD_HANDLE *) arg;	/*	 *	Loop forever, until told to exit.	 */	do {		/*		 *	Wait to be signalled.		 */		DEBUG2("Thread %d waiting to be assigned a request",		       self->thread_num);	re_wait:		if (sem_wait(&thread_pool.semaphore) != 0) {			/*			 *	Interrupted system call.  Go back to			 *	waiting, but DON'T print out any more			 *	text.			 */			if (errno == EINTR) {				DEBUG2("Re-wait %d", self->thread_num);				goto re_wait;			}			radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",			       self->thread_num, strerror(errno));			break;		}		DEBUG2("Thread %d got semaphore", self->thread_num);		/*		 *	Try to grab a request from the queue.		 *		 *	It may be empty, in which case we fail		 *	gracefully.		 */		if (!request_dequeue(&self->request, &fun)) continue;		self->request->child_pid = self->pthread_id;		self->request_count++;		DEBUG2("Thread %d handling request %d, (%d handled so far)",		       self->thread_num, self->request->number,		       self->request_count);		radius_handle_request(self->request, fun);		/*		 *	Update the active threads.		 */		pthread_mutex_lock(&thread_pool.queue_mutex);		rad_assert(thread_pool.active_threads > 0);		thread_pool.active_threads--;		pthread_mutex_unlock(&thread_pool.queue_mutex);	} while (self->status != THREAD_CANCELLED);	DEBUG2("Thread %d exiting...", self->thread_num);#ifdef HAVE_OPENSSL_ERR_H	/*	 *	If we linked with OpenSSL, the application	 *	must remove the thread's error queue before	 *	exiting to prevent memory leaks.	 */	ERR_remove_state(0);#endif	/*	 *  Do this as the LAST thing before exiting.	 */	self->request = NULL;	self->status = THREAD_EXITED;	return NULL;}/* *	Take a THREAD_HANDLE, delete it from the thread pool and *	free its resources. * *	This function is called ONLY from the main server thread, *	ONLY after the thread has exited. */static void delete_thread(THREAD_HANDLE *handle){	THREAD_HANDLE *prev;	THREAD_HANDLE *next;	rad_assert(handle->request == NULL);	DEBUG2("Deleting thread %d", handle->thread_num);	prev = handle->prev;	next = handle->next;	rad_assert(thread_pool.total_threads > 0);	thread_pool.total_threads--;	/*	 *	Remove the handle from the list.	 */	if (prev == NULL) {		rad_assert(thread_pool.head == handle);		thread_pool.head = next;	} else {		prev->next = next;	}	if (next == NULL) {		rad_assert(thread_pool.tail == handle);		thread_pool.tail = prev;	} else {		next->prev = prev;	}	/*	 *	Free the handle, now that it's no longer referencable.	 */	free(handle);}/* *	Spawn a new thread, and place it in the thread pool. * *	The thread is started initially in the blocked state, waiting *	for the semaphore. */static THREAD_HANDLE *spawn_thread(time_t now){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -