📄 threads.c
字号:
int rcode; THREAD_HANDLE *handle; pthread_attr_t attr; /* * Ensure that we don't spawn too many threads. */ if (thread_pool.total_threads >= thread_pool.max_threads) { DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads); return NULL; } /* * Allocate a new thread handle. */ handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE)); memset(handle, 0, sizeof(THREAD_HANDLE)); handle->prev = NULL; handle->next = NULL; handle->pthread_id = NO_SUCH_CHILD_PID; handle->thread_num = thread_pool.max_thread_num++; handle->request_count = 0; handle->status = THREAD_RUNNING; handle->timestamp = time(NULL); /* * Initialize the thread's attributes to detached. * * We could call pthread_detach() later, but if the thread * exits between the create & detach calls, it will need to * be joined, which will never happen. */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); /* * Create the thread detached, so that it cleans up it's * own memory when it exits. * * Note that the function returns non-zero on error, NOT * -1. The return code is the error, and errno isn't set. */ rcode = pthread_create(&handle->pthread_id, &attr, request_handler_thread, handle); if (rcode != 0) { radlog(L_ERR, "Thread create failed: %s", strerror(rcode)); return NULL; } pthread_attr_destroy(&attr); /* * One more thread to go into the list. */ thread_pool.total_threads++; DEBUG2("Thread spawned new child %d. Total threads in pool: %d", handle->thread_num, thread_pool.total_threads); /* * Add the thread handle to the tail of the thread pool list. */ if (thread_pool.tail) { thread_pool.tail->next = handle; handle->prev = thread_pool.tail; thread_pool.tail = handle; } else { rad_assert(thread_pool.head == NULL); thread_pool.head = thread_pool.tail = handle; } /* * Update the time we last spawned a thread. */ thread_pool.time_last_spawned = now; /* * And return the new handle to the caller. */ return handle;}/* * Temporary function to prevent server from executing a SIGHUP * until all threads are finished handling requests. This returns * the number of active threads to 'radiusd.c'. */int total_active_threads(void){ /* * We don't acquire the mutex, so this is just an estimate. * We can't return with the lock held, so there's no point * in getting the guaranteed correct value; by the time * the caller sees it, it can be wrong again. */ return thread_pool.active_threads;}static uint32_t pid_hash(const void *data){ const thread_fork_t *tf = data; return fr_hash(&tf->pid, sizeof(tf->pid));}static int pid_cmp(const void *one, const void *two){ const thread_fork_t *a = one; const thread_fork_t *b = two; return (a->pid - b->pid);}/* * Allocate the thread pool, and seed it with an initial number * of threads. * * FIXME: What to do on a SIGHUP??? */int thread_pool_init(CONF_SECTION *cs, int spawn_flag){ int i, rcode; CONF_SECTION *pool_cf; time_t now; now = time(NULL); /* * We're not spawning new threads, don't do * anything. */ if (!spawn_flag) return 0; /* * After a SIGHUP, we don't over-write the previous values. */ if (!pool_initialized) { /* * Initialize the thread pool to some reasonable values. */ memset(&thread_pool, 0, sizeof(THREAD_POOL)); thread_pool.head = NULL; thread_pool.tail = NULL; thread_pool.total_threads = 0; thread_pool.max_thread_num = 1; thread_pool.cleanup_delay = 5; thread_pool.spawn_flag = spawn_flag; if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) { radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s", strerror(errno)); return -1; } /* * Create the hash table of child PID's */ thread_pool.waiters = fr_hash_table_create(pid_hash, pid_cmp, free); if (!thread_pool.waiters) { radlog(L_ERR, "FATAL: Failed to set up wait hash"); return -1; } } pool_cf = cf_subsection_find_next(cs, NULL, "thread"); if (!pool_cf) { radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf"); return -1; } if (cf_section_parse(pool_cf, NULL, thread_config) < 0) { return -1; } /* * Catch corner cases. */ if (thread_pool.min_spare_threads < 1) thread_pool.min_spare_threads = 1; if (thread_pool.max_spare_threads < 1) thread_pool.max_spare_threads = 1; if (thread_pool.max_spare_threads < thread_pool.min_spare_threads) thread_pool.max_spare_threads = thread_pool.min_spare_threads; /* * The pool has already been initialized. Don't spawn * new threads, and don't forget about forked children, */ if (pool_initialized) { return 0; } /* * Initialize the queue of requests. */ memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore)); rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED); if (rcode != 0) { radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s", strerror(errno)); return -1; } rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL); if (rcode != 0) { radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s", strerror(errno)); return -1; } /* * Allocate multiple fifos. */ for (i = 0; i < RAD_LISTEN_MAX; i++) { thread_pool.fifo[i] = fr_fifo_create(65536, NULL); if (!thread_pool.fifo[i]) { radlog(L_ERR, "FATAL: Failed to set up request fifo"); return -1; } }#ifdef HAVE_OPENSSL_CRYPTO_H /* * If we're linking with OpenSSL too, then we need * to set up the mutexes and enable the thread callbacks. */ if (!setup_ssl_mutexes()) { radlog(L_ERR, "FATAL: Failed to set up SSL mutexes"); return -1; }#endif /* * Create a number of waiting threads. * * If we fail while creating them, do something intelligent. */ for (i = 0; i < thread_pool.start_threads; i++) { if (spawn_thread(now) == NULL) { return -1; } } DEBUG2("Thread pool initialized"); pool_initialized = TRUE; return 0;}/* * Assign a new request to a free thread. * * If there isn't a free thread, then try to create a new one, * up to the configured limits. */int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun){ time_t now = request->timestamp; /* * We've been told not to spawn threads, so don't. */ if (!thread_pool.spawn_flag) { radius_handle_request(request, fun); /* * Requests that care about child process exit * codes have already either called * rad_waitpid(), or they've given up. */ wait(NULL); return 1; } /* * Add the new request to the queue. */ if (!request_enqueue(request, fun)) return 0; /* * If we haven't checked the number of child threads * in a while, OR if the thread pool appears to be full, * go manage it. */ if ((last_cleaned < now) || (thread_pool.active_threads == thread_pool.total_threads)) { thread_pool_manage(now); } return 1;}/* * Check the min_spare_threads and max_spare_threads. * * If there are too many or too few threads waiting, then we * either create some more, or delete some. */static void thread_pool_manage(time_t now){ int spare; int i, total; THREAD_HANDLE *handle, *next; int active_threads; /* * We don't need a mutex lock here, as we're reading * active_threads, and not modifying it. We want a close * approximation of the number of active threads, and this * is good enough. */ active_threads = thread_pool.active_threads; spare = thread_pool.total_threads - active_threads; if (debug_flag) { static int old_total = -1; static int old_active = -1; if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) { DEBUG2("Threads: total/active/spare threads = %d/%d/%d", thread_pool.total_threads, active_threads, spare); old_total = thread_pool.total_threads; old_active = active_threads; } } /* * If there are too few spare threads. Go create some more. */ if (spare < thread_pool.min_spare_threads) { total = thread_pool.min_spare_threads - spare; DEBUG2("Threads: Spawning %d spares", total); /* * Create a number of spare threads. */ for (i = 0; i < total; i++) { handle = spawn_thread(now); if (handle == NULL) { return; } } return; /* there aren't too many spare threads */ } /* * Only delete spare threads if we haven't already done * so this second. */ if (now == last_cleaned) { return; } last_cleaned = now; /* * Loop over the thread pool, deleting exited threads. */ for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Maybe we've asked the thread to exit, and it * has agreed. */ if (handle->status == THREAD_EXITED) { delete_thread(handle); } } /* * Only delete the spare threads if sufficient time has * passed since we last created one. This helps to minimize * the amount of create/delete cycles. */ if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) { return; } /* * If there are too many spare threads, delete one. * * Note that we only delete ONE at a time, instead of * wiping out many. This allows the excess servers to * be slowly reaped, just in case the load spike comes again. */ if (spare > thread_pool.max_spare_threads) { spare -= thread_pool.max_spare_threads; DEBUG2("Threads: deleting 1 spare out of %d spares", spare); /* * Walk through the thread pool, deleting the * first idle thread we come across. */ for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) { next = handle->next; /* * If the thread is not handling a * request, but still live, then tell it * to exit. * * It will eventually wake up, and realize * it's been told to commit suicide. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING)) { handle->status = THREAD_CANCELLED; /* * Post an extra semaphore, as a * signal to wake up, and exit. */ sem_post(&thread_pool.semaphore); spare--; break; } } } /* * If the thread has handled too many requests, then make it * exit. */ if (thread_pool.max_requests_per_thread > 0) { for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Not handling a request, but otherwise * live, we can kill it. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING) && (handle->request_count > thread_pool.max_requests_per_thread)) { handle->status = THREAD_CANCELLED; sem_post(&thread_pool.semaphore); } } } /* * Otherwise everything's kosher. There are not too few, * or too many spare threads. Exit happily. */ return;}/* * Thread wrapper for fork(). */pid_t rad_fork(void){ pid_t child_pid; if (!pool_initialized) return fork(); reap_children(); /* be nice to non-wait thingies */ if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) { return -1; } /* * Fork & save the PID for later reaping. */ child_pid = fork(); if (child_pid > 0) { int rcode; thread_fork_t *tf; tf = rad_malloc(sizeof(*tf)); memset(tf, 0, sizeof(*tf)); tf->pid = child_pid; pthread_mutex_lock(&thread_pool.wait_mutex); rcode = fr_hash_table_insert(thread_pool.waiters, tf); pthread_mutex_unlock(&thread_pool.wait_mutex); if (!rcode) { radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d", (int) child_pid); } } /* * Return whatever we were told. */ return child_pid;}/* * Wait 10 seconds at most for a child to exit, then give up. */pid_t rad_waitpid(pid_t pid, int *status){ int i; thread_fork_t mytf, *tf; if (!pool_initialized) return waitpid(pid, status, 0); if (pid <= 0) return -1; mytf.pid = pid; pthread_mutex_lock(&thread_pool.wait_mutex); tf = fr_hash_table_finddata(thread_pool.waiters, &mytf); pthread_mutex_unlock(&thread_pool.wait_mutex); if (!tf) return -1; for (i = 0; i < 100; i++) { reap_children(); if (tf->exited) { *status = tf->status; pthread_mutex_lock(&thread_pool.wait_mutex); fr_hash_table_delete(thread_pool.waiters, &mytf); pthread_mutex_unlock(&thread_pool.wait_mutex); return pid; } usleep(100000); /* sleep for 1/10 of a second */ } /* * 10 seconds have passed, give up on the child. */ pthread_mutex_lock(&thread_pool.wait_mutex); fr_hash_table_delete(thread_pool.waiters, &mytf); pthread_mutex_unlock(&thread_pool.wait_mutex); return 0;}void thread_pool_lock(void){ pthread_mutex_lock(&thread_pool.queue_mutex);}void thread_pool_unlock(void){ pthread_mutex_unlock(&thread_pool.queue_mutex);}#elseint thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun){ radius_handle_request(request, fun); return 1;}#endif /* HAVE_PTHREAD_H */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -