📄 sched.c
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

static void __rpc_atrun(struct rpc_task *task)
{
        rpc_wake_up_task(task);
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
        lock_kernel();
        task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
        unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
        task->tk_action = NULL;
        if (task->tk_ops->rpc_call_done != NULL) {
                lock_kernel();
                task->tk_ops->rpc_call_done(task, task->tk_calldata);
                unlock_kernel();
                if (task->tk_action != NULL) {
                        WARN_ON(RPC_ASSASSINATED(task));
                        /* Always release the RPC slot and buffer memory */
                        xprt_release(task);
                }
        }
}
EXPORT_SYMBOL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
        if (ops->rpc_release != NULL) {
                lock_kernel();
                ops->rpc_release(calldata);
                unlock_kernel();
        }
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
        int status = 0;

        dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
                        task->tk_pid, task->tk_flags);

        BUG_ON(RPC_IS_QUEUED(task));

        for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
                rpc_delete_timer(task);

                /*
                 * Execute any pending callback.
                 */
                if (RPC_DO_CALLBACK(task)) {
                        /* Define a callback save pointer */
                        void (*save_callback)(struct rpc_task *);

                        /*
                         * If a callback exists, save it, reset it,
                         * call it.
                         * The save is needed to stop from resetting
                         * another callback set within the callback handler
                         *   - Dave
                         */
                        save_callback = task->tk_callback;
                        task->tk_callback = NULL;
                        save_callback(task);
                }

                /*
                 * Perform the next FSM step.
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
                        if (task->tk_action == NULL)
                                break;
                        task->tk_action(task);
                }

                /*
                 * Lockless check for whether task is sleeping or not.
                 */
                if (!RPC_IS_QUEUED(task))
                        continue;
                rpc_clear_running(task);
                if (RPC_IS_ASYNC(task)) {
                        /* Careful! we may have raced... */
                        if (RPC_IS_QUEUED(task))
                                return;
                        if (rpc_test_and_set_running(task))
                                return;
                        continue;
                }

                /* sync task: sleep here */
                dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
                /* Note: Caller should be using rpc_clnt_sigmask() */
                status = out_of_line_wait_on_bit(&task->tk_runstate,
                                RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
                                TASK_INTERRUPTIBLE);
                if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
                        dprintk("RPC: %5u got signal\n", task->tk_pid);
                        task->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(task, -ERESTARTSYS);
                        rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
        }

        dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
                        task->tk_status);
        /* Release all resources associated with the task */
        rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that tk_release() will have
 *       been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
        rpc_set_active(task);
        rpc_set_running(task);
        __rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
        __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}
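/*
 * Illustrative sketch, not part of sched.c itself: tk_action is the state
 * machine that __rpc_execute() steps through above.  Each step does some
 * work and then names the next step; pointing tk_action at rpc_exit_task()
 * ends the chain, because rpc_exit_task() clears tk_action and the for (;;)
 * loop breaks.  The demo_* functions below are invented for illustration;
 * the real call_* states used by the RPC client live elsewhere (clnt.c).
 */
static void demo_step_two(struct rpc_task *task)
{
        /* Final step: hand off to rpc_exit_task(), which invokes
         * ->rpc_call_done() and clears tk_action. */
        task->tk_action = rpc_exit_task;
}

static void demo_step_one(struct rpc_task *task)
{
        /* Do one unit of work, then schedule the next step. */
        task->tk_action = demo_step_two;
}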
struct rpc_buffer {
        size_t  len;
        char    data[];
};

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
        struct rpc_buffer *buf;
        gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

        size += sizeof(struct rpc_buffer);
        if (size <= RPC_BUFFER_MAXSIZE)
                buf = mempool_alloc(rpc_buffer_mempool, gfp);
        else
                buf = kmalloc(size, gfp);

        if (!buf)
                return NULL;

        buf->len = size;
        dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
                        task->tk_pid, size, buf);
        return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
        size_t size;
        struct rpc_buffer *buf;

        if (!buffer)
                return;

        buf = container_of(buffer, struct rpc_buffer, data);
        size = buf->len;

        dprintk("RPC: freeing buffer of size %zu at %p\n", size, buf);

        if (size <= RPC_BUFFER_MAXSIZE)
                mempool_free(buf, rpc_buffer_mempool);
        else
                kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);
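/*
 * Illustrative sketch, not part of sched.c itself: how a caller might pair
 * rpc_malloc()/rpc_free() under the contract documented above -- the
 * allocator never sleeps and may return NULL, small requests come from the
 * mempool, and rpc_free() recovers the rpc_buffer header via container_of()
 * to pick the right release path.  demo_fill_args() and its parameters are
 * hypothetical.
 */
static int demo_fill_args(struct rpc_task *task, size_t argsize)
{
        void *p = rpc_malloc(task, argsize);

        if (p == NULL)
                return -ENOMEM; /* retry later, in a context safe for rpciod */

        memset(p, 0, argsize);  /* ... marshal the call arguments into p ... */
        rpc_free(p);
        return 0;
}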
/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags,
                const struct rpc_call_ops *tk_ops, void *calldata)
{
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
        atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
        task->tk_ops = tk_ops;
        if (tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
        task->tk_calldata = calldata;
        INIT_LIST_HEAD(&task->tk_task);

        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;

        task->tk_priority = RPC_PRIORITY_NORMAL;
        task->tk_cookie = (unsigned long)current;

        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;

        if (clnt) {
                kref_get(&clnt->cl_kref);
                if (clnt->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
                if (!clnt->cl_intr)
                        task->tk_flags |= RPC_TASK_NOINTR;
        }

        BUG_ON(task->tk_ops == NULL);

        /* starting timestamp */
        task->tk_start = jiffies;

        dprintk("RPC: new task initialized, procpid %u\n",
                        task_pid_nr(current));
}

static struct rpc_task *rpc_alloc_task(void)
{
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rcu_head *rcu)
{
        struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
        dprintk("RPC: %5u freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags,
                const struct rpc_call_ops *tk_ops, void *calldata)
{
        struct rpc_task *task;

        task = rpc_alloc_task();
        if (!task)
                goto out;

        rpc_init_task(task, clnt, flags, tk_ops, calldata);

        dprintk("RPC: allocated task %p\n", task);
        task->tk_flags |= RPC_TASK_DYNAMIC;
out:
        return task;
}

void rpc_put_task(struct rpc_task *task)
{
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;

        if (!atomic_dec_and_test(&task->tk_count))
                return;
        /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }
        if (task->tk_flags & RPC_TASK_DYNAMIC)
                call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
        rpc_release_calldata(tk_ops, calldata);
}
EXPORT_SYMBOL(rpc_put_task);

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
        dprintk("RPC: %5u release task\n", task->tk_pid);

        if (!list_empty(&task->tk_task)) {
                struct rpc_clnt *clnt = task->tk_client;
                /* Remove from client task list */
                spin_lock(&clnt->cl_lock);
                list_del(&task->tk_task);
                spin_unlock(&clnt->cl_lock);
        }
        BUG_ON(RPC_IS_QUEUED(task));

        /* Synchronously delete any running timer */
        rpc_delete_timer(task);

#ifdef RPC_DEBUG
        task->tk_magic = 0;
#endif
        /* Wake up anyone who is waiting for task completion */
        rpc_mark_complete_task(task);

        rpc_put_task(task);
}

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task *rovr;

        if (list_empty(&clnt->cl_tasks))
                return;
        dprintk("RPC: killing all tasks for client %p\n", clnt);
        /*
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&clnt->cl_lock);
        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
                if (!RPC_IS_ACTIVATED(rovr))
                        continue;
                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        }
        spin_unlock(&clnt->cl_lock);
}

int rpciod_up(void)
{
        return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
        module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
        struct workqueue_struct *wq;

        /*
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC: creating workqueue rpciod\n");
        wq = create_workqueue("rpciod");
        rpciod_workqueue = wq;
        return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
        struct workqueue_struct *wq = NULL;

        if (rpciod_workqueue == NULL)
                return;
        dprintk("RPC: destroying workqueue rpciod\n");

        wq = rpciod_workqueue;
        rpciod_workqueue = NULL;
        destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
        rpciod_stop();
        if (rpc_buffer_mempool)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
        if (rpc_task_slabp)
                kmem_cache_destroy(rpc_task_slabp);
        if (rpc_buffer_slabp)
                kmem_cache_destroy(rpc_buffer_slabp);
}

int
rpc_init_mempool(void)
{
        rpc_task_slabp = kmem_cache_create("rpc_tasks",
                                        sizeof(struct rpc_task),
                                        0, SLAB_HWCACHE_ALIGN,
                                        NULL);
        if (!rpc_task_slabp)
                goto err_nomem;
        rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
                                        RPC_BUFFER_MAXSIZE,
                                        0, SLAB_HWCACHE_ALIGN,
                                        NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
        rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
                                                    rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
        rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
                                                      rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        if (!rpciod_start())
                goto err_nomem;
        return 0;
err_nomem:
        rpc_destroy_mempool();
        return -ENOMEM;
}