⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threads.c

📁 radius服务器
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * threads.c	request threading support
 *
 * Version:	$Id: threads.c,v 1.77 2004/05/07 19:37:57 aland Exp $
 *
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * Copyright 2000  The FreeRADIUS server project
 * Copyright 2000  Alan DeKok <aland@ox.org>
 */

#include "libradius.h"

#ifdef HAVE_PTHREAD_H

#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <signal.h>

#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif

#include "radiusd.h"
#include "rad_assert.h"
#include "conffile.h"

static const char rcsid[] =
"$Id: threads.c,v 1.77 2004/05/07 19:37:57 aland Exp $";

#define SEMAPHORE_LOCKED	(0)
#define SEMAPHORE_UNLOCKED	(1)

#define THREAD_RUNNING		(1)
#define THREAD_CANCELLED	(2)
#define THREAD_EXITED		(3)

/*
 *  A data structure which contains the information about
 *  the current thread.
 *
 *  prev, next     links in the thread pool's doubly linked list
 *  pthread_id     pthread id
 *  thread_num     server thread number
 *  status         THREAD_RUNNING, THREAD_CANCELLED or THREAD_EXITED
 *  request_count  the number of requests that this thread has handled
 *  timestamp      when the thread handle was created
 *  request        the request currently being handled, or NULL
 */
typedef struct THREAD_HANDLE {
	struct THREAD_HANDLE *prev;
	struct THREAD_HANDLE *next;
	pthread_t            pthread_id;
	int                  thread_num;
	int                  status;
	unsigned int         request_count;
	time_t               timestamp;
	REQUEST		     *request;
} THREAD_HANDLE;

/*
 *	For the request queue: a request, and the function which
 *	will process it.
 */
typedef struct request_queue_t {
	REQUEST	    	  *request;
	RAD_REQUEST_FUNP  fun;
} request_queue_t;

/*
 *	A data structure to manage the thread pool.  There's no real
 *	need for a data structure, but it makes things conceptually
 *	easier.
 */
typedef struct THREAD_POOL {
	THREAD_HANDLE *head;
	THREAD_HANDLE *tail;

	int total_threads;
	int max_thread_num;
	int start_threads;
	int max_threads;
	int min_spare_threads;
	int max_spare_threads;
	unsigned int max_requests_per_thread;
	unsigned long request_count;
	time_t time_last_spawned;
	int cleanup_delay;

	/*
	 *	All threads wait on this semaphore, for requests
	 *	to enter the queue.
	 */
	sem_t		semaphore;

	/*
	 *	To ensure only one thread at a time touches the queue.
	 */
	pthread_mutex_t	mutex;

	int		active_threads;

	int		queue_head; /* first filled entry */
	int		queue_tail; /* first empty entry */
	int		queue_size;
	request_queue_t *queue;
} THREAD_POOL;

static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;

/*
 *	Data structure to keep track of which child forked which
 *	request.  If we cared, we'd keep a list of "free" and "active"
 *	entries.
 *
 *	FIXME: Have a time out, so we clean up entries which haven't
 *	been picked up!
 */
typedef struct rad_fork_t {
	pthread_t	thread_id;
	pid_t		child_pid;
	sem_t	 	child_done;
	int		status;	/* exit status of the child */
	time_t		time_forked;
} rad_fork_t;

/*
 *  This MUST be a power of 2 for it to work properly!
 */
#define NUM_FORKERS (8192)
static rad_fork_t forkers[NUM_FORKERS];

/*
 *	This mutex ensures that only one thread is doing certain
 *	kinds of magic to the previous array.
 */
static pthread_mutex_t fork_mutex;

/*
 *	A mapping of configuration file names to internal integers
 */
static const CONF_PARSER thread_config[] = {
	{ "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
	{ "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
	{ "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
	{ "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
	{ "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
	{ "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
	{ NULL, -1, 0, NULL, NULL }
};

/*
 *	Add a request to the list of waiting requests.
 *	This function gets called ONLY from the main handler thread...
 *
 *	request		the request to queue
 *	fun		the function which will be handed the request
 *
 *	The queue is a circular buffer protected by thread_pool.mutex.
 *	When it fills up it is doubled in size, up to 65536 entries;
 *	past that limit the new request is logged, marked finished and
 *	NOT queued.  On success the pool semaphore is posted once.
 */
static void request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
	int num_entries;

	pthread_mutex_lock(&thread_pool.mutex);

	thread_pool.request_count++;

	/*
	 *	If the queue is empty, re-set the indices to zero,
	 *	for no particular reason...
	 */
	if ((thread_pool.queue_head == thread_pool.queue_tail) &&
	    (thread_pool.queue_head != 0)) {
		thread_pool.queue_head = thread_pool.queue_tail = 0;
	}

	/*
	 *	If the queue is full, grow it.
	 *
	 *	The math is to take into account the fact that it's a
	 *	circular queue.  One slot is always left unused, so
	 *	that "head == tail" unambiguously means "empty".
	 */
	num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
		       thread_pool.queue_head) % thread_pool.queue_size;
	if (num_entries == (thread_pool.queue_size - 1)) {
		int i;
		int old_size;
		int new_size;
		request_queue_t *new_queue;

		/*
		 *	If the queue becomes larger than 65536,
		 *	there's a serious problem.
		 */
		if (thread_pool.queue_size >= 65536) {
			pthread_mutex_unlock(&thread_pool.mutex);

			/*
			 *	Mark the request as done.
			 */
			radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
			request->finished = TRUE;
			return;
		}

		old_size = thread_pool.queue_size;
		new_size = old_size * 2;

		/*
		 *	Malloc a new queue, doubled in size, and copy
		 *	the live entries over IN QUEUE ORDER, starting
		 *	at the old head.  A flat memcpy() is wrong when
		 *	the old queue has wrapped around (tail < head):
		 *	with the larger size the entries would no
		 *	longer be contiguous between head and tail.
		 */
		new_queue = rad_malloc(sizeof(*new_queue) * new_size);
		for (i = 0; i < num_entries; i++) {
			new_queue[i] = thread_pool.queue[(thread_pool.queue_head + i) % old_size];
		}

		/*
		 *	Zero out the unused remainder.  Pointer
		 *	arithmetic is already in units of elements, so
		 *	the offset must NOT be multiplied by the
		 *	element size.
		 */
		memset(new_queue + num_entries, 0,
		       sizeof(*new_queue) * (new_size - num_entries));

		/*
		 *	Free the old queue, and replace it with the
		 *	new, linearized one.
		 */
		free(thread_pool.queue);
		thread_pool.queue = new_queue;
		thread_pool.queue_head = 0;
		thread_pool.queue_tail = num_entries;
		thread_pool.queue_size = new_size;
	}

	/*
	 *	Add the data to the queue tail, increment the tail,
	 *	and signal the semaphore that there's another request
	 *	in the queue.
	 */
	thread_pool.queue[thread_pool.queue_tail].request = request;
	thread_pool.queue[thread_pool.queue_tail].fun = fun;
	thread_pool.queue_tail++;
	thread_pool.queue_tail &= (thread_pool.queue_size - 1);

	pthread_mutex_unlock(&thread_pool.mutex);

	/*
	 *	There's one more request in the queue.
	 *
	 *	Note that we're not touching the queue any more, so
	 *	the semaphore post is outside of the mutex.  This also
	 *	means that when the thread wakes up and tries to lock
	 *	the mutex, it will be unlocked, and there won't be
	 *	contention.
	 */
	sem_post(&thread_pool.semaphore);

	return;
}

/*
 *	Remove a request from the queue.
 *
 *	request		set to the de-queued request, or NULL if the
 *			queue was empty
 *	fun		set to the function queued with it, or NULL
 *
 *	On success thread_pool.active_threads is incremented; the
 *	caller decrements it once the request has been handled.
 */
static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
{
	pthread_mutex_lock(&thread_pool.mutex);

	/*
	 *	Head & tail are the same.  There's nothing in
	 *	the queue.
	 */
	if (thread_pool.queue_head == thread_pool.queue_tail) {
		pthread_mutex_unlock(&thread_pool.mutex);
		*request = NULL;
		*fun = NULL;
		return;
	}

	*request = thread_pool.queue[thread_pool.queue_head].request;
	*fun = thread_pool.queue[thread_pool.queue_head].fun;

	rad_assert(*request != NULL);
	rad_assert((*request)->magic == REQUEST_MAGIC);
	rad_assert(*fun != NULL);

	thread_pool.queue_head++;
	thread_pool.queue_head &= (thread_pool.queue_size - 1);

	/*
	 *	FIXME: Check the request timestamp.  If it's more than
	 *	"clean_delay" seconds old, then discard the request,
	 *	log an error, and try to de-queue another request.
	 *
	 *	The main clean-up code won't delete the request from
	 *	the request list, because it's not marked "finished"
	 */

	/*
	 *	The thread is currently processing a request.
	 */
	thread_pool.active_threads++;

	pthread_mutex_unlock(&thread_pool.mutex);

	/*
	 *	If the request is currently being processed, then that
	 *	MAY be OK, if it's a proxy reply.  In that case,
	 *	sending the packet may result in a reply being
	 *	received before that thread clears the child_pid.
	 *
	 *	In that case, we busy-wait for the request to be free.
	 *
	 *	We COULD push it onto the queue and try to grab
	 *	another request, but what if this is the only request?
	 *	What if there are multiple such packets with race
	 *	conditions?  We don't want to thrash the queue...
	 *
	 *	This busy-wait is less than optimal, but it's simple,
	 *	fail-safe, and it works.
	 */
	if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
		int count, ok;
		struct timeval tv;
#ifdef HAVE_PTHREAD_SIGMASK
		sigset_t set, old_set;

		/*
		 *	Block a large number of signals which could
		 *	cause the select to return EINTR
		 */
		sigemptyset(&set);
		sigaddset(&set, SIGPIPE);
		sigaddset(&set, SIGCONT);
		sigaddset(&set, SIGSTOP);
		sigaddset(&set, SIGCHLD);
		pthread_sigmask(SIG_BLOCK, &set, &old_set);
#endif

		rad_assert((*request)->proxy_reply != NULL);

		ok = FALSE;

		/*
		 *	Sleep for 100 milliseconds.  If the other thread
		 *	doesn't get serviced in this time, to clear
		 *	the "child_pid" entry, then the server is too
		 *	busy, so we die.
		 */
		for (count = 0; count < 10; count++) {
			tv.tv_sec = 0;
			tv.tv_usec = 10000; /* sleep for 10 milliseconds */

			/*
			 *	Portable sleep that's thread-safe.
			 *
			 *	Don't worry about interrupts, as they're
			 *	blocked above.
			 */
			select(0, NULL, NULL, NULL, &tv);
			if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
				ok = TRUE;
				break;
			}
		}

#ifdef HAVE_PTHREAD_SIGMASK
		/*
		 *	Restore the original thread signal mask.
		 */
		pthread_sigmask(SIG_SETMASK, &old_set, NULL);
#endif

		if (!ok) {
			radlog(L_ERR, "FATAL!  Server is too busy to process requests");
			exit(1);
		}
	}

	return;
}

/*
 *	The main thread handler for requests.
 *
 *	Wait on the semaphore until we have it, and process the request.
 *
 *	arg	the THREAD_HANDLE of this thread
 *
 *	Always returns NULL.  The handle's status is set to
 *	THREAD_EXITED as the last action before returning.
 */
static void *request_handler_thread(void *arg)
{
	RAD_REQUEST_FUNP  fun;
	THREAD_HANDLE	  *self = (THREAD_HANDLE *) arg;
#ifdef HAVE_PTHREAD_SIGMASK
	sigset_t set;

	/*
	 *	Block SIGHUP handling for the child threads.
	 *
	 *	This ensures that only the main server thread will
	 *	process HUP signals.
	 *
	 *	If we don't have sigprocmask, then it shouldn't be
	 *	a problem, either, as the sig_hup handler should check
	 *	for this condition.
	 */
	sigemptyset(&set);
	sigaddset(&set, SIGHUP);
	sigaddset(&set, SIGINT);
	sigaddset(&set, SIGQUIT);
	sigaddset(&set, SIGTERM);
	pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

	/*
	 *	Loop forever, until told to exit.
	 */
	do {
		/*
		 *	Wait to be signalled.
		 */
		DEBUG2("Thread %d waiting to be assigned a request",
		       self->thread_num);
	re_wait:
		if (sem_wait(&thread_pool.semaphore) != 0) {
			/*
			 *	Interrupted system call.  Go back to
			 *	waiting, but DON'T print out any more
			 *	text.
			 */
			if (errno == EINTR) {
				DEBUG2("Re-wait %d", self->thread_num);
				goto re_wait;
			}
			radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
			       self->thread_num, strerror(errno));
			break;
		}

		DEBUG2("Thread %d got semaphore", self->thread_num);

		/*
		 *	Try to grab a request from the queue.
		 *
		 *	It may be empty, in which case we fail
		 *	gracefully.
		 */
		request_dequeue(&self->request, &fun);
		if (!self->request) continue;

		self->request->child_pid = self->pthread_id;
		self->request_count++;

		DEBUG2("Thread %d handling request %d, (%d handled so far)",
		       self->thread_num, self->request->number,
		       self->request_count);

		/*
		 *	Respond, and reset request->child_pid
		 */
		rad_respond(self->request, fun);
		self->request = NULL;

		/*
		 *	Update the active threads.
		 */
		pthread_mutex_lock(&thread_pool.mutex);
		rad_assert(thread_pool.active_threads > 0);
		thread_pool.active_threads--;
		pthread_mutex_unlock(&thread_pool.mutex);
	} while (self->status != THREAD_CANCELLED);

	DEBUG2("Thread %d exiting...", self->thread_num);

	/*
	 *  Do this as the LAST thing before exiting.
	 */
	self->status = THREAD_EXITED;

	return NULL;
}

/*
 *	Take a THREAD_HANDLE, and delete it from the thread pool.
 *
 *	This function is called ONLY from the main server thread.
 *
 *	handle	the handle to unlink from the pool list and free;
 *		it must not be processing a request
 */
static void delete_thread(THREAD_HANDLE *handle)
{
	THREAD_HANDLE *prev;
	THREAD_HANDLE *next;

	rad_assert(handle->request == NULL);

	prev = handle->prev;
	next = handle->next;
	rad_assert(thread_pool.total_threads > 0);
	thread_pool.total_threads--;

	/*
	 *	Remove the handle from the list.
	 */
	if (prev == NULL) {
		rad_assert(thread_pool.head == handle);
		thread_pool.head = next;
	} else {
		prev->next = next;
	}

	if (next == NULL) {
		rad_assert(thread_pool.tail == handle);
		thread_pool.tail = prev;
	} else {
		next->prev = prev;
	}

	DEBUG2("Deleting thread %d", handle->thread_num);

	/*
	 *	This thread has exited.  Delete any additional
	 *	resources associated with it.
	 */

	/*
	 *	Free the memory, now that we're sure the thread
	 *	exited.
	 */
	free(handle);
}

/*
 *	Spawn a new thread, and place it in the thread pool.
 *
 *	The thread is started initially in the blocked state, waiting
 *	for the semaphore.
 */
static THREAD_HANDLE *spawn_thread(time_t now)
{
	int rcode;
	THREAD_HANDLE *handle;
	pthread_attr_t attr;

	/*
	 *	Ensure that we don't spawn too many threads.
	 */
	if (thread_pool.total_threads >= thread_pool.max_threads) {
		DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
		return NULL;
	}

	/*
	 *	Allocate a new thread handle.
	 */
	handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
	memset(handle, 0, sizeof(THREAD_HANDLE));
	handle->prev = NULL;
	handle->next = NULL;
	handle->pthread_id = NO_SUCH_CHILD_PID;
	handle->thread_num = thread_pool.max_thread_num++;
	handle->request_count = 0;
	handle->status = THREAD_RUNNING;
	handle->timestamp = time(NULL);

	/*
	 *	Initialize the thread's attributes to detached.
	 *
	 *	We could call pthread_detach() later, but if the thread
	 *	exits between the create & detach calls, it will need to
	 *	be joined, which will never happen.
	 */
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

	/*
	 *	Create the thread detached, so that it cleans up it's
	 *	own memory when it exits.
	 *
	 *	Note that the function returns non-zero on error, NOT
	 *	-1.  The return code is the error, and errno isn't set.
	 */
	rcode = pthread_create(&handle->pthread_id, &attr,
			request_handler_thread, handle);
	if (rcode != 0) {
		radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
		       strerror(rcode));
		exit(1);
	}
	pthread_attr_destroy(&attr);

	/*
	 *	One more thread to go into the list.
	 */
	thread_pool.total_threads++;
	DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
			handle->thread_num, thread_pool.total_threads);

	/*
	 *	Add the thread handle to the tail of the thread pool list.
	 */
	if (thread_pool.tail) {
		thread_pool.tail->next = handle;
		handle->prev = thread_pool.tail;
		thread_pool.tail = handle;
	} else {
		rad_assert(thread_pool.head == NULL);
		thread_pool.head = thread_pool.tail = handle;
	}

	/*
	 *	Update the time we last spawned a thread.
	 */
	thread_pool.time_last_spawned = now;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -