📄 threads.c
字号:
*/ self->status = THREAD_EXITED; return NULL;}/* * Take a THREAD_HANDLE, and delete it from the thread pool. * * This function is called ONLY from the main server thread. */static void delete_thread(THREAD_HANDLE *handle){ THREAD_HANDLE *prev; THREAD_HANDLE *next; rad_assert(handle->request == NULL); prev = handle->prev; next = handle->next; rad_assert(thread_pool.total_threads > 0); thread_pool.total_threads--; /* * Remove the handle from the list. */ if (prev == NULL) { rad_assert(thread_pool.head == handle); thread_pool.head = next; } else { prev->next = next; } if (next == NULL) { rad_assert(thread_pool.tail == handle); thread_pool.tail = prev; } else { next->prev = prev; } DEBUG2("Deleting thread %d", handle->thread_num); /* * This thread has exited. Delete any additional * resources associated with it. */ /* * Free the memory, now that we're sure the thread * exited. */ free(handle);}/* * Spawn a new thread, and place it in the thread pool. * * The thread is started initially in the blocked state, waiting * for the semaphore. */static THREAD_HANDLE *spawn_thread(time_t now){ int rcode; THREAD_HANDLE *handle; pthread_attr_t attr; /* * Ensure that we don't spawn too many threads. */ if (thread_pool.total_threads >= thread_pool.max_threads) { DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads); return NULL; } /* * Allocate a new thread handle. */ handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE)); memset(handle, 0, sizeof(THREAD_HANDLE)); handle->prev = NULL; handle->next = NULL; handle->pthread_id = NO_SUCH_CHILD_PID; handle->thread_num = thread_pool.max_thread_num++; handle->request_count = 0; handle->status = THREAD_RUNNING; handle->timestamp = time(NULL); /* * Initialize the thread's attributes to detached. * * We could call pthread_detach() later, but if the thread * exits between the create & detach calls, it will need to * be joined, which will never happen. */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); /* * Create the thread detached, so that it cleans up it's * own memory when it exits. * * Note that the function returns non-zero on error, NOT * -1. The return code is the error, and errno isn't set. */ rcode = pthread_create(&handle->pthread_id, &attr, request_handler_thread, handle); if (rcode != 0) { radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s", strerror(rcode)); exit(1); } pthread_attr_destroy(&attr); /* * One more thread to go into the list. */ thread_pool.total_threads++; DEBUG2("Thread spawned new child %d. Total threads in pool: %d", handle->thread_num, thread_pool.total_threads); /* * Add the thread handle to the tail of the thread pool list. */ if (thread_pool.tail) { thread_pool.tail->next = handle; handle->prev = thread_pool.tail; thread_pool.tail = handle; } else { rad_assert(thread_pool.head == NULL); thread_pool.head = thread_pool.tail = handle; } /* * Update the time we last spawned a thread. */ thread_pool.time_last_spawned = now; /* * And return the new handle to the caller. */ return handle;}/* * Temporary function to prevent server from executing a SIGHUP * until all threads are finished handling requests. This returns * the number of active threads to 'radiusd.c'. */int total_active_threads(void){ int rcode = 0; THREAD_HANDLE *handle; for (handle = thread_pool.head; handle != NULL; handle = handle->next){ if (handle->request != NULL) { rcode ++; } } return (rcode);}/* * Allocate the thread pool, and seed it with an initial number * of threads. * * FIXME: What to do on a SIGHUP??? */int thread_pool_init(void){ int i, rcode; CONF_SECTION *pool_cf; time_t now; DEBUG("Initializing the thread pool..."); now = time(NULL); /* * After a SIGHUP, we don't over-write the previous values. */ if (!pool_initialized) { /* * Initialize the thread pool to some reasonable values. */ memset(&thread_pool, 0, sizeof(THREAD_POOL)); thread_pool.head = NULL; thread_pool.tail = NULL; thread_pool.total_threads = 0; thread_pool.max_thread_num = 1; thread_pool.cleanup_delay = 5; thread_pool.wait_head = thread_pool.wait_tail = 0; if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) { radlog(L_ERR, "FATAL: Failed to initialize mutex: %s", strerror(errno)); exit(1); } } pool_cf = cf_section_find("thread"); if (pool_cf != NULL) { cf_section_parse(pool_cf, NULL, thread_config); } /* * The pool has already been initialized. Don't spawn * new threads, and don't forget about forked children, */ if (pool_initialized) { return 0; } /* * Initialize the queue of requests. */ rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED); if (rcode != 0) { radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s", strerror(errno)); exit(1); } rcode = pthread_mutex_init(&thread_pool.mutex,NULL); if (rcode != 0) { radlog(L_ERR, "FATAL: Failed to initialize mutex: %s", strerror(errno)); exit(1); } /* * Queue head & tail are set to zero by the memset, * above. * * Allocate an initial queue, always as a power of 2. */ thread_pool.queue_size = 256; thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) * thread_pool.queue_size); memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) * thread_pool.queue_size)); /* * Create a number of waiting threads. * * If we fail while creating them, do something intelligent. */ for (i = 0; i < thread_pool.start_threads; i++) { if (spawn_thread(now) == NULL) { return -1; } } DEBUG2("Thread pool initialized"); pool_initialized = TRUE; return 0;}/* * Assign a new request to a free thread. * * If there isn't a free thread, then try to create a new one, * up to the configured limits. */int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun){ /* * If the thread pool is busy handling requests, then * try to spawn another one. */ if (thread_pool.active_threads == thread_pool.total_threads) { if (spawn_thread(request->timestamp) == NULL) { radlog(L_INFO, "The maximum number of threads (%d) are active, cannot spawn new thread to handle request", thread_pool.max_threads); return 0; } } /* * Add the new request to the queue. */ request_enqueue(request, fun); return 1;}/* * Check the min_spare_threads and max_spare_threads. * * If there are too many or too few threads waiting, then we * either create some more, or delete some. */int thread_pool_clean(time_t now){ int spare; int i, total; THREAD_HANDLE *handle, *next; int active_threads; static time_t last_cleaned = 0; /* * Loop over the thread pool deleting exited threads. */ for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Maybe we've asked the thread to exit, and it * has agreed. */ if (handle->status == THREAD_EXITED) { delete_thread(handle); } } /* * We don't need a mutex lock here, as we're reading * the location, and not modifying it. We want a close * approximation of the number of active threads, and this * is good enough. */ active_threads = thread_pool.active_threads; spare = thread_pool.total_threads - active_threads; if (debug_flag) { static int old_total = -1; static int old_active = -1; if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) { DEBUG2("Threads: total/active/spare threads = %d/%d/%d", thread_pool.total_threads, active_threads, spare); old_total = thread_pool.total_threads; old_active = active_threads; } } /* * If there are too few spare threads, create some more. */ if (spare < thread_pool.min_spare_threads) { total = thread_pool.min_spare_threads - spare; DEBUG2("Threads: Spawning %d spares", total); /* * Create a number of spare threads. */ for (i = 0; i < total; i++) { handle = spawn_thread(now); if (handle == NULL) { return -1; } } /* * And exit, as there can't be too many spare threads. */ return 0; } /* * Only delete spare threads if we haven't already done * so this second. */ if (now == last_cleaned) { return 0; } last_cleaned = now; /* * Only delete the spare threads if sufficient time has * passed since we last created one. This helps to minimize * the amount of create/delete cycles. */ if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) { return 0; } /* * If there are too many spare threads, delete one. * * Note that we only delete ONE at a time, instead of * wiping out many. This allows the excess servers to * be slowly reaped, just in case the load spike comes again. */ if (spare > thread_pool.max_spare_threads) { spare -= thread_pool.max_spare_threads; DEBUG2("Threads: deleting 1 spare out of %d spares", spare); /* * Walk through the thread pool, deleting the * first idle thread we come across. */ for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) { next = handle->next; /* * If the thread is not handling a * request, but still live, then tell it * to exit. * * It will eventually wake up, and realize * it's been told to commit suicide. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING)) { handle->status = THREAD_CANCELLED; /* * Post an extra semaphore, as a * signal to wake up, and exit. */ sem_post(&thread_pool.semaphore); spare--; break; } } } /* * If the thread has handled too many requests, then make it * exit. */ if (thread_pool.max_requests_per_thread > 0) { for (handle = thread_pool.head; handle; handle = next) { next = handle->next; /* * Not handling a request, but otherwise * live, we can kill it. */ if ((handle->request == NULL) && (handle->status == THREAD_RUNNING) && (handle->request_count > thread_pool.max_requests_per_thread)) { handle->status = THREAD_CANCELLED; sem_post(&thread_pool.semaphore); } } } /* * Otherwise everything's kosher. There are not too few, * or too many spare threads. Exit happily. */ return 0;}/* * Thread wrapper for fork(). */pid_t rad_fork(int exec_wait){ pid_t child_pid; if (exec_wait) return fork(); /* * Lock the mutex. */ pthread_mutex_lock(&thread_pool.wait_mutex); /* * No room to save the PID: die. */ if (((thread_pool.wait_tail + 1) % MAX_WAITERS) == thread_pool.wait_head) { rad_assert(0 == 1); } /* * Fork & save the PID for later reaping. */ child_pid = fork(); if (child_pid != 0) { thread_pool.wait[thread_pool.wait_tail] = child_pid; thread_pool.wait_tail++; thread_pool.wait_tail %= MAX_WAITERS; /* * Unlock the mutex. */ pthread_mutex_unlock(&thread_pool.wait_mutex); } /* * Return whatever we were told. */ return child_pid;}/* * We may not need this any more... */pid_t rad_waitpid(pid_t pid, int *status, int options){ reap_children(); /* be nice to non-wait thingies */ return waitpid(pid, status, options); }#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -