⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threads.c

📁 radius服务器
💻 C
📖 第 1 页 / 共 2 页
字号:
	/*	 *	And return the new handle to the caller.	 */	return handle;}/* *      Temporary function to prevent server from executing a SIGHUP *      until all threads are finished handling requests.  This returns *      the number of active threads to 'radiusd.c'. */int total_active_threads(void){        int rcode = 0;	THREAD_HANDLE *handle;	for (handle = thread_pool.head; handle != NULL; handle = handle->next){		if (handle->request != NULL) {			rcode ++;		}	}	return (rcode);}/* *	Allocate the thread pool, and seed it with an initial number *	of threads. * *	FIXME: What to do on a SIGHUP??? */int thread_pool_init(void){	int		i, rcode;	CONF_SECTION	*pool_cf;	time_t		now;	DEBUG("Initializing the thread pool...");	now = time(NULL);	/*	 *	After a SIGHUP, we don't over-write the previous values.	 */	if (!pool_initialized) {		/*		 *	Initialize the thread pool to some reasonable values.		 */		memset(&thread_pool, 0, sizeof(THREAD_POOL));		thread_pool.head = NULL;		thread_pool.tail = NULL;		thread_pool.total_threads = 0;		thread_pool.max_thread_num = 1;		thread_pool.cleanup_delay = 5;	}	pool_cf = cf_section_find("thread");	if (pool_cf != NULL) {		cf_section_parse(pool_cf, NULL, thread_config);	}	/*	 *	Limit the maximum number of threads to the maximum	 *	number of forks we can do.	 *	 *	FIXME: Make this code better...	 */	if (thread_pool.max_threads >= NUM_FORKERS) {		thread_pool.max_threads = NUM_FORKERS;	}	/*	 *	The pool has already been initialized.  Don't spawn	 *	new threads, and don't forget about forked children,	 */	if (pool_initialized) {		return 0;	}	/*	 *	Initialize the queue of requests.	 
*/	rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);	if (rcode != 0) {		radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",		       strerror(errno));		exit(1);	}	rcode = pthread_mutex_init(&thread_pool.mutex,NULL);	if (rcode != 0) {		radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",		       strerror(errno));		exit(1);	}	/*	 *	Queue head & tail are set to zero by the memset,	 *	above.	 *	 *	Allocate an initial queue, always as a power of 2.	 */	thread_pool.queue_size = 256;	thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) *				       thread_pool.queue_size);	memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) *				      thread_pool.queue_size));	/*	 *	Create a number of waiting threads.	 *	 *	If we fail while creating them, do something intelligent.	 */	for (i = 0; i < thread_pool.start_threads; i++) {		if (spawn_thread(now) == NULL) {			return -1;		}	}	DEBUG2("Thread pool initialized");	pool_initialized = TRUE;	return 0;}/* *	Assign a new request to a free thread. * *	If there isn't a free thread, then try to create a new one, *	up to the configured limits. */int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun){	/*	 *	If the thread pool is busy handling requests, then	 *	try to spawn another one.	 */	if (thread_pool.active_threads == thread_pool.total_threads) {		if (spawn_thread(request->timestamp) == NULL) {			radlog(L_INFO,			       "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",			       thread_pool.max_threads);			return 0;		}	}	/*	 *	Add the new request to the queue.	 */	request_enqueue(request, fun);	return 1;}/* *	Check the min_spare_threads and max_spare_threads. * *	If there are too many or too few threads waiting, then we *	either create some more, or delete some. 
 */
int thread_pool_clean(time_t now)
{
	int spare;
	int i, total;
	THREAD_HANDLE *handle, *next;
	int active_threads;
	static time_t last_cleaned = 0;

	/*
	 *	Loop over the thread pool deleting exited threads.
	 */
	for (handle = thread_pool.head; handle; handle = next) {
		next = handle->next;

		/*
		 *	Maybe we've asked the thread to exit, and it
		 *	has agreed.
		 */
		if (handle->status == THREAD_EXITED) {
			delete_thread(handle);
		}
	}

	/*
	 *	We don't need a mutex lock here, as we're reading
	 *	the location, and not modifying it.  We want a close
	 *	approximation of the number of active threads, and this
	 *	is good enough.
	 */
	active_threads = thread_pool.active_threads;
	spare = thread_pool.total_threads - active_threads;
	if (debug_flag) {
		static int old_total = -1;
		static int old_active = -1;

		/* Only log the counters when they have actually changed. */
		if ((old_total != thread_pool.total_threads) ||
				(old_active != active_threads)) {
			DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
					thread_pool.total_threads, active_threads, spare);
			old_total = thread_pool.total_threads;
			old_active = active_threads;
		}
	}

	/*
	 *	If there are too few spare threads, create some more.
	 */
	if (spare < thread_pool.min_spare_threads) {
		total = thread_pool.min_spare_threads - spare;

		DEBUG2("Threads: Spawning %d spares", total);

		/*
		 *	Create a number of spare threads.
		 */
		for (i = 0; i < total; i++) {
			handle = spawn_thread(now);
			if (handle == NULL) {
				return -1;
			}
		}

		/*
		 *	And exit, as there can't be too many spare threads.
		 */
		return 0;
	}

	/*
	 *	Only delete spare threads if we haven't already done
	 *	so this second.
	 */
	if (now == last_cleaned) {
		return 0;
	}
	last_cleaned = now;

	/*
	 *	Only delete the spare threads if sufficient time has
	 *	passed since we last created one.  This helps to minimize
	 *	the amount of create/delete cycles.
	 */
	if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
		return 0;
	}

	/*
	 *	If there are too many spare threads, delete one.
	 *
	 *	Note that we only delete ONE at a time, instead of
	 *	wiping out many.  This allows the excess servers to
	 *	be slowly reaped, just in case the load spike comes again.
	 */
	if (spare > thread_pool.max_spare_threads) {
		spare -= thread_pool.max_spare_threads;

		DEBUG2("Threads: deleting 1 spare out of %d spares", spare);

		/*
		 *	Walk through the thread pool, deleting the
		 *	first idle thread we come across.  The 'break'
		 *	below guarantees at most one is marked.
		 */
		for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
			next = handle->next;

			/*
			 *	If the thread is not handling a
			 *	request, but still live, then tell it
			 *	to exit.
			 *
			 *	It will eventually wake up, and realize
			 *	it's been told to commit suicide.
			 */
			if ((handle->request == NULL) &&
			    (handle->status == THREAD_RUNNING)) {
				handle->status = THREAD_CANCELLED;

				/*
				 *	Post an extra semaphore, as a
				 *	signal to wake up, and exit.
				 */
				sem_post(&thread_pool.semaphore);
				spare--;
				break;
			}
		}
	}

	/*
	 *	If the thread has handled too many requests, then make it
	 *	exit.
	 */
	if (thread_pool.max_requests_per_thread > 0) {
		for (handle = thread_pool.head; handle; handle = next) {
			next = handle->next;

			/*
			 *	Not handling a request, but otherwise
			 *	live, we can kill it.
			 */
			if ((handle->request == NULL) &&
			    (handle->status == THREAD_RUNNING) &&
			    (handle->request_count > thread_pool.max_requests_per_thread)) {
				handle->status = THREAD_CANCELLED;
				sem_post(&thread_pool.semaphore);
			}
		}
	}

	/*
	 *	Otherwise everything's kosher.  There are not too few,
	 *	or too many spare threads.  Exit happily.
	 */
	return 0;
}

/* TRUE once rad_exec_init() has set up the fork-tracking structures. */
static int exec_initialized = FALSE;

/*
 *	Initialize the stuff for keeping track of child processes.
 */
void rad_exec_init(void)
{
	int i;

	/*
	 *	Initialize the mutex used to remember calls to fork.
	 */
	pthread_mutex_init(&fork_mutex, NULL);

	/*
	 *	Initialize the data structure where we remember the
	 *	mappings of thread ID && child PID to exit status.
*/	for (i = 0; i < NUM_FORKERS; i++) {		forkers[i].thread_id = NO_SUCH_CHILD_PID;		forkers[i].child_pid = -1;		forkers[i].status = 0;	}	exec_initialized = TRUE;}/* *	We use the PID number as a base for the array index, so that *	we can quickly turn the PID into a free array entry, instead *	of rooting blindly through the entire array. */#define PID_2_ARRAY(pid) (((int) pid ) & (NUM_FORKERS - 1))/* *	Thread wrapper for fork(). */pid_t rad_fork(int exec_wait){	sigset_t set;	pid_t child_pid;	/*	 *	The thread is NOT interested in waiting for the exit	 *	status of the child process, so we don't bother	 *	updating our kludgy array.	 *	 *	Or, there no NO threads, so we can just do the fork	 *	thing.	 */	if (!exec_wait || !exec_initialized) {		return fork();	}	/*	 *	Block SIGCLHD until such time as we've saved the PID.	 *	 *	Note that we block SIGCHLD for ALL threads associated	 *	with this process!  This is to prevent race conditions!	 */	sigemptyset(&set);	sigaddset(&set, SIGCHLD);	sigprocmask(SIG_BLOCK, &set, NULL);	/*	 *	Do the fork.	 */	child_pid = fork();	/*	 *	We managed to fork.  Let's see if we have a free	 *	array entry.	 */	if (child_pid > 0) { /* parent */		int i;		int found;		time_t now = time(NULL);		/*		 *	We store the information in the array		 *	indexed by PID.  This means that we have		 *	on average an O(1) lookup to find the element,		 *	instead of rooting through the entire array.		 */		i = PID_2_ARRAY(child_pid);		found = -1;		/*		 *	We may have multiple threads trying to find an		 *	empty position, so we lock the array until		 *	we've found an entry.		 */		pthread_mutex_lock(&fork_mutex);		do {			if (forkers[i].thread_id == NO_SUCH_CHILD_PID) {				found = i;				break;			}			/*			 *	Clean up any stale forked sessions.			 *			 *	This sometimes happens, for crazy reasons.			 */			if ((now - forkers[i].time_forked) > 30) {				forkers[i].thread_id = NO_SUCH_CHILD_PID;				/*				 *	Grab the child's exit condition,				 *	just in case...				 
*/				waitpid(forkers[i].child_pid,					&forkers[i].status,					WNOHANG);				sem_destroy(&forkers[i].child_done);				found = i;				break;			}			/*			 *  Increment it, within the array.			 */			i++;			i &= (NUM_FORKERS - 1);		} while (i != PID_2_ARRAY(child_pid));		pthread_mutex_unlock(&fork_mutex);		/*		 *	Arg.  We did a fork, and there was nowhere to		 *	put the answer.		 */		if (found < 0) {			return (pid_t) -1;		}		/*		 *	In the parent, set the status, and create the		 *	semaphore.		 */		forkers[found].status = -1;		forkers[found].child_pid = child_pid;		forkers[i].thread_id = pthread_self();		forkers[i].time_forked = now;		sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED);	}	/*	 *	Unblock SIGCHLD, now that there's no chance of bad entries	 *	in the array.	 */	sigprocmask(SIG_UNBLOCK, &set, NULL);	/*	 *	Return whatever we were told.	 */	return child_pid;}/* *	Thread wrapper for waitpid(), so threads can wait for *	the PID they forked. */pid_t rad_waitpid(pid_t pid, int *status, int options){	int i, rcode;	int found;	pthread_t self = pthread_self();	/*	 *	We're only allowed to wait for a SPECIFIC pid.	 */	if (pid <= 0) {		return -1;	}	/*	 *	Find the PID to wait for, starting at an index within	 *	the array.  This makes the lookups O(1) on average,	 *	instead of O(n), when the array is filling up.	 */	found = -1;	i = PID_2_ARRAY(pid);	do {		/*		 *	We were the ones who forked this specific		 *	child.		 */		if ((forkers[i].thread_id == self) &&		    (forkers[i].child_pid == pid)) {			found = i;			break;		}		i++;		i &= (NUM_FORKERS - 1);	} while (i != PID_2_ARRAY(pid));	/*	 *	No thread ID found: we're trying to wait for a child	 *	we've never forked!	 */	if (found < 0) {		return -1;	}	/*	 *	Wait for the signal that the child's status has been	 *	returned.	 
*/	if (options == WNOHANG) {		rcode = sem_trywait(&forkers[found].child_done);		if (rcode != 0) {			return 0; /* no child available */		}	} else {		/* wait forever */	re_wait:		rcode = sem_wait(&forkers[found].child_done);		if ((rcode != 0) && (errno == EINTR)) {			goto re_wait;		}	}	/*	 *	We've got the semaphore.  Now destroy it.	 *	 *	FIXME: Maybe we want to set up the semaphores in advance,	 *	to prevent the creation && deletion of lots of them,	 *	if creating and deleting them is expensive.	 */	sem_destroy(&forkers[found].child_done);	/*	 *	Save the status BEFORE we re-set the thread ID.	 */	*status = forkers[found].status;	/*	 *	This next line taints the other array entries,	 *	due to other threads re-using the data structure.	 */	forkers[found].thread_id = NO_SUCH_CHILD_PID;	return pid;}/* *	Called by the main signal handler, to save the status of the child */int rad_savepid(pid_t pid, int status){	int i;	/*	 *	Find the PID to wait for, starting at an index within	 *	the array.  This makes the lookups O(1) on average,	 *	instead of O(n), when the array is filling up.	 */	i = PID_2_ARRAY(pid);	/*	 *	Do NOT lock the array, as nothing else sets the	 *	status and posts the semaphore.	 */	do {		/*		 *	Any thread can get the sigchild...		 */		if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) &&		    (forkers[i].child_pid == pid)) {			/*			 *	Save the status, THEN post the			 *	semaphore.			 */			forkers[i].status = status;			sem_post(&forkers[i].child_done);			/*			 *	FIXME: If the child is more than 60			 *	seconds out of date, then delete it.			 *			 *	That is, we've forked, and the forker			 *	is waiting nearly forever			 */			return 0;		}		i++;		i &= (NUM_FORKERS - 1);	} while (i != PID_2_ARRAY(pid));	return -1;}#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -