📄 threads.c
字号:
/* * And return the new handle to the caller. */ return handle;}/* * Temporary function to prevent server from executing a SIGHUP * until all threads are finished handling requests. This returns * the number of active threads to 'radiusd.c'. */int total_active_threads(void){ int rcode = 0; THREAD_HANDLE *handle; for (handle = thread_pool.head; handle != NULL; handle = handle->next){ if (handle->request != NULL) { rcode ++; } } return (rcode);}/* * Allocate the thread pool, and seed it with an initial number * of threads. * * FIXME: What to do on a SIGHUP??? */int thread_pool_init(void){ int i, rcode; CONF_SECTION *pool_cf; time_t now; DEBUG("Initializing the thread pool..."); now = time(NULL); /* * After a SIGHUP, we don't over-write the previous values. */ if (!pool_initialized) { /* * Initialize the thread pool to some reasonable values. */ memset(&thread_pool, 0, sizeof(THREAD_POOL)); thread_pool.head = NULL; thread_pool.tail = NULL; thread_pool.total_threads = 0; thread_pool.max_thread_num = 1; thread_pool.cleanup_delay = 5; } pool_cf = cf_section_find("thread"); if (pool_cf != NULL) { cf_section_parse(pool_cf, NULL, thread_config); } /* * Limit the maximum number of threads to the maximum * number of forks we can do. * * FIXME: Make this code better... */ if (thread_pool.max_threads >= NUM_FORKERS) { thread_pool.max_threads = NUM_FORKERS; } /* * The pool has already been initialized. Don't spawn * new threads, and don't forget about forked children, */ if (pool_initialized) { return 0; } /* * Initialize the queue of requests. */ rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED); if (rcode != 0) { radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s", strerror(errno)); exit(1); } rcode = pthread_mutex_init(&thread_pool.mutex,NULL); if (rcode != 0) { radlog(L_ERR, "FATAL: Failed to initialize mutex: %s", strerror(errno)); exit(1); } /* * Queue head & tail are set to zero by the memset, * above. * * Allocate an initial queue, always as a power of 2. */ thread_pool.queue_size = 256; thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) * thread_pool.queue_size); memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) * thread_pool.queue_size)); /* * Create a number of waiting threads. * * If we fail while creating them, do something intelligent. */ for (i = 0; i < thread_pool.start_threads; i++) { if (spawn_thread(now) == NULL) { return -1; } } DEBUG2("Thread pool initialized"); pool_initialized = TRUE; return 0;}/* * Assign a new request to a free thread. * * If there isn't a free thread, then try to create a new one, * up to the configured limits. */int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun){ /* * If the thread pool is busy handling requests, then * try to spawn another one. */ if (thread_pool.active_threads == thread_pool.total_threads) { if (spawn_thread(request->timestamp) == NULL) { radlog(L_INFO, "The maximum number of threads (%d) are active, cannot spawn new thread to handle request", thread_pool.max_threads); return 0; } } /* * Add the new request to the queue. */ request_enqueue(request, fun); return 1;}/* * Check the min_spare_threads and max_spare_threads. * * If there are too many or too few threads waiting, then we * either create some more, or delete some. */int thread_pool_clean(time_t now){ int spare; int i, total; THREAD_HANDLE *handle, *next; int active_threads; static time_t last_cleaned = 0; /* * Loop over the thread pool deleting exited threads. */ for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Maybe we've asked the thread to exit, and it * has agreed. */ if (handle->status == THREAD_EXITED) { delete_thread(handle); } } /* * We don't need a mutex lock here, as we're reading * the location, and not modifying it. We want a close * approximation of the number of active threads, and this * is good enough. */ active_threads = thread_pool.active_threads; spare = thread_pool.total_threads - active_threads; if (debug_flag) { static int old_total = -1; static int old_active = -1; if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) { DEBUG2("Threads: total/active/spare threads = %d/%d/%d", thread_pool.total_threads, active_threads, spare); old_total = thread_pool.total_threads; old_active = active_threads; } } /* * If there are too few spare threads, create some more. */ if (spare < thread_pool.min_spare_threads) { total = thread_pool.min_spare_threads - spare; DEBUG2("Threads: Spawning %d spares", total); /* * Create a number of spare threads. */ for (i = 0; i < total; i++) { handle = spawn_thread(now); if (handle == NULL) { return -1; } } /* * And exit, as there can't be too many spare threads. */ return 0; } /* * Only delete spare threads if we haven't already done * so this second. */ if (now == last_cleaned) { return 0; } last_cleaned = now; /* * Only delete the spare threads if sufficient time has * passed since we last created one. This helps to minimize * the amount of create/delete cycles. */ if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) { return 0; } /* * If there are too many spare threads, delete one. * * Note that we only delete ONE at a time, instead of * wiping out many. This allows the excess servers to * be slowly reaped, just in case the load spike comes again. */ if (spare > thread_pool.max_spare_threads) { spare -= thread_pool.max_spare_threads; DEBUG2("Threads: deleting 1 spare out of %d spares", spare); /* * Walk through the thread pool, deleting the * first idle thread we come across. */ for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) { next = handle->next; /* * If the thread is not handling a * request, but still live, then tell it * to exit. * * It will eventually wake up, and realize * it's been told to commit suicide. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING)) { handle->status = THREAD_CANCELLED; /* * Post an extra semaphore, as a * signal to wake up, and exit. */ sem_post(&thread_pool.semaphore); spare--; break; } } } /* * If the thread has handled too many requests, then make it * exit. */ if (thread_pool.max_requests_per_thread > 0) { for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Not handling a request, but otherwise * live, we can kill it. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING) && (handle->request_count > thread_pool.max_requests_per_thread)) { handle->status = THREAD_CANCELLED; sem_post(&thread_pool.semaphore); } } } /* * Otherwise everything's kosher. There are not too few, * or too many spare threads. Exit happily. */ return 0;}static int exec_initialized = FALSE;/* * Initialize the stuff for keeping track of child processes. */void rad_exec_init(void){ int i; /* * Initialize the mutex used to remember calls to fork. */ pthread_mutex_init(&fork_mutex, NULL); /* * Initialize the data structure where we remember the * mappings of thread ID && child PID to exit status. */ for (i = 0; i < NUM_FORKERS; i++) { forkers[i].thread_id = NO_SUCH_CHILD_PID; forkers[i].child_pid = -1; forkers[i].status = 0; } exec_initialized = TRUE;}/* * We use the PID number as a base for the array index, so that * we can quickly turn the PID into a free array entry, instead * of rooting blindly through the entire array. */#define PID_2_ARRAY(pid) (((int) pid ) & (NUM_FORKERS - 1))/* * Thread wrapper for fork(). */pid_t rad_fork(int exec_wait){ sigset_t set; pid_t child_pid; /* * The thread is NOT interested in waiting for the exit * status of the child process, so we don't bother * updating our kludgy array. * * Or, there no NO threads, so we can just do the fork * thing. */ if (!exec_wait || !exec_initialized) { return fork(); } /* * Block SIGCLHD until such time as we've saved the PID. * * Note that we block SIGCHLD for ALL threads associated * with this process! This is to prevent race conditions! */ sigemptyset(&set); sigaddset(&set, SIGCHLD); sigprocmask(SIG_BLOCK, &set, NULL); /* * Do the fork. */ child_pid = fork(); /* * We managed to fork. Let's see if we have a free * array entry. */ if (child_pid > 0) { /* parent */ int i; int found; time_t now = time(NULL); /* * We store the information in the array * indexed by PID. This means that we have * on average an O(1) lookup to find the element, * instead of rooting through the entire array. */ i = PID_2_ARRAY(child_pid); found = -1; /* * We may have multiple threads trying to find an * empty position, so we lock the array until * we've found an entry. */ pthread_mutex_lock(&fork_mutex); do { if (forkers[i].thread_id == NO_SUCH_CHILD_PID) { found = i; break; } /* * Clean up any stale forked sessions. * * This sometimes happens, for crazy reasons. */ if ((now - forkers[i].time_forked) > 30) { forkers[i].thread_id = NO_SUCH_CHILD_PID; /* * Grab the child's exit condition, * just in case... */ waitpid(forkers[i].child_pid, &forkers[i].status, WNOHANG); sem_destroy(&forkers[i].child_done); found = i; break; } /* * Increment it, within the array. */ i++; i &= (NUM_FORKERS - 1); } while (i != PID_2_ARRAY(child_pid)); pthread_mutex_unlock(&fork_mutex); /* * Arg. We did a fork, and there was nowhere to * put the answer. */ if (found < 0) { return (pid_t) -1; } /* * In the parent, set the status, and create the * semaphore. */ forkers[found].status = -1; forkers[found].child_pid = child_pid; forkers[i].thread_id = pthread_self(); forkers[i].time_forked = now; sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED); } /* * Unblock SIGCHLD, now that there's no chance of bad entries * in the array. */ sigprocmask(SIG_UNBLOCK, &set, NULL); /* * Return whatever we were told. */ return child_pid;}/* * Thread wrapper for waitpid(), so threads can wait for * the PID they forked. */pid_t rad_waitpid(pid_t pid, int *status, int options){ int i, rcode; int found; pthread_t self = pthread_self(); /* * We're only allowed to wait for a SPECIFIC pid. */ if (pid <= 0) { return -1; } /* * Find the PID to wait for, starting at an index within * the array. This makes the lookups O(1) on average, * instead of O(n), when the array is filling up. */ found = -1; i = PID_2_ARRAY(pid); do { /* * We were the ones who forked this specific * child. */ if ((forkers[i].thread_id == self) && (forkers[i].child_pid == pid)) { found = i; break; } i++; i &= (NUM_FORKERS - 1); } while (i != PID_2_ARRAY(pid)); /* * No thread ID found: we're trying to wait for a child * we've never forked! */ if (found < 0) { return -1; } /* * Wait for the signal that the child's status has been * returned. */ if (options == WNOHANG) { rcode = sem_trywait(&forkers[found].child_done); if (rcode != 0) { return 0; /* no child available */ } } else { /* wait forever */ re_wait: rcode = sem_wait(&forkers[found].child_done); if ((rcode != 0) && (errno == EINTR)) { goto re_wait; } } /* * We've got the semaphore. Now destroy it. * * FIXME: Maybe we want to set up the semaphores in advance, * to prevent the creation && deletion of lots of them, * if creating and deleting them is expensive. */ sem_destroy(&forkers[found].child_done); /* * Save the status BEFORE we re-set the thread ID. */ *status = forkers[found].status; /* * This next line taints the other array entries, * due to other threads re-using the data structure. */ forkers[found].thread_id = NO_SUCH_CHILD_PID; return pid;}/* * Called by the main signal handler, to save the status of the child */int rad_savepid(pid_t pid, int status){ int i; /* * Find the PID to wait for, starting at an index within * the array. This makes the lookups O(1) on average, * instead of O(n), when the array is filling up. */ i = PID_2_ARRAY(pid); /* * Do NOT lock the array, as nothing else sets the * status and posts the semaphore. */ do { /* * Any thread can get the sigchild... */ if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) && (forkers[i].child_pid == pid)) { /* * Save the status, THEN post the * semaphore. */ forkers[i].status = status; sem_post(&forkers[i].child_done); /* * FIXME: If the child is more than 60 * seconds out of date, then delete it. * * That is, we've forked, and the forker * is waiting nearly forever */ return 0; } i++; i &= (NUM_FORKERS - 1); } while (i != PID_2_ARRAY(pid)); return -1;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -