📄 svc.c
字号:
if (rqstp->rq_pages[i])
	put_page(rqstp->rq_pages[i]);
}

/*
 * Create a thread in the given pool.  Caller must hold BKL.
 * On a NUMA or SMP machine, with a multi-pool serv, the thread
 * will be restricted to run on the cpus belonging to the pool.
 *
 * @func: thread entry point, started via kernel_thread() with the
 *        freshly allocated svc_rqst as its argument
 * @serv: service the thread will belong to
 * @pool: pool the thread is accounted to and linked into
 *
 * Returns 0 on success, -ENOMEM if any allocation fails, or the
 * negative value returned by kernel_thread().
 *
 * Every failure after the svc_rqst itself has been allocated is
 * unwound through svc_exit_thread(), which unconditionally takes
 * rqstp->rq_pool->sp_lock, decrements pool->sp_nrthreads and unlinks
 * rqstp->rq_all.  The rqstp is therefore registered with the pool and
 * the serv *before* the remaining allocations are attempted, so that
 * the unwind never sees a NULL rq_pool or an unlinked list entry and
 * the counters stay balanced.
 */
static int
__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
		    struct svc_pool *pool)
{
	struct svc_rqst	*rqstp;
	int		error = -ENOMEM;
	int		have_oldmask = 0;
	cpumask_t	oldmask;

	rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
	if (!rqstp)
		goto out;

	init_waitqueue_head(&rqstp->rq_wait);

	/* Account for and link the new request first (see header comment). */
	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_argp)
		goto out_thread;

	rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_resp)
		goto out_thread;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg))
		goto out_thread;

	/*
	 * With more than one pool, temporarily switch the creating task's
	 * cpumask; the previous mask is put back right after the new
	 * thread has been started.
	 */
	if (serv->sv_nrpools > 1)
		have_oldmask = svc_pool_map_set_cpumask(pool->sp_id, &oldmask);

	error = kernel_thread((int (*)(void *)) func, rqstp, 0);

	if (have_oldmask)
		set_cpus_allowed(current, oldmask);

	if (error < 0)
		goto out_thread;
	svc_sock_update_bufs(serv);
	error = 0;
out:
	return error;
out_thread:
	svc_exit_thread(rqstp);
	goto out;
}

/*
 * Create a thread in the default pool.  Caller must hold BKL.
*/intsvc_create_thread(svc_thread_fn func, struct svc_serv *serv){ return __svc_create_thread(func, serv, &serv->sv_pools[0]);}/* * Choose a pool in which to create a new thread, for svc_set_num_threads */static inline struct svc_pool *choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state){ if (pool != NULL) return pool; return &serv->sv_pools[(*state)++ % serv->sv_nrpools];}/* * Choose a thread to kill, for svc_set_num_threads */static inline struct task_struct *choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state){ unsigned int i; struct task_struct *task = NULL; if (pool != NULL) { spin_lock_bh(&pool->sp_lock); } else { /* choose a pool in round-robin fashion */ for (i = 0; i < serv->sv_nrpools; i++) { pool = &serv->sv_pools[--(*state) % serv->sv_nrpools]; spin_lock_bh(&pool->sp_lock); if (!list_empty(&pool->sp_all_threads)) goto found_pool; spin_unlock_bh(&pool->sp_lock); } return NULL; }found_pool: if (!list_empty(&pool->sp_all_threads)) { struct svc_rqst *rqstp; /* * Remove from the pool->sp_all_threads list * so we don't try to kill it again. */ rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all); list_del_init(&rqstp->rq_all); task = rqstp->rq_task; } spin_unlock_bh(&pool->sp_lock); return task;}/* * Create or destroy enough new threads to make the number * of threads the given number. If `pool' is non-NULL, applies * only to threads in that pool, otherwise round-robins between * all pools. Must be called with a svc_get() reference and * the BKL held. * * Destroying threads relies on the service threads filling in * rqstp->rq_task, which only the nfs ones do. Assumes the serv * has been created using svc_create_pooled(). * * Based on code that used to be in nfsd_svc() but tweaked * to be pool-aware. 
*/intsvc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs){ struct task_struct *victim; int error = 0; unsigned int state = serv->sv_nrthreads-1; if (pool == NULL) { /* The -1 assumes caller has done a svc_get() */ nrservs -= (serv->sv_nrthreads-1); } else { spin_lock_bh(&pool->sp_lock); nrservs -= pool->sp_nrthreads; spin_unlock_bh(&pool->sp_lock); } /* create new threads */ while (nrservs > 0) { nrservs--; __module_get(serv->sv_module); error = __svc_create_thread(serv->sv_function, serv, choose_pool(serv, pool, &state)); if (error < 0) { module_put(serv->sv_module); break; } } /* destroy old threads */ while (nrservs < 0 && (victim = choose_victim(serv, pool, &state)) != NULL) { send_sig(serv->sv_kill_signal, victim, 1); nrservs++; } return error;}/* * Called from a server thread as it's exiting. Caller must hold BKL. */voidsvc_exit_thread(struct svc_rqst *rqstp){ struct svc_serv *serv = rqstp->rq_server; struct svc_pool *pool = rqstp->rq_pool; svc_release_buffer(rqstp); kfree(rqstp->rq_resp); kfree(rqstp->rq_argp); kfree(rqstp->rq_auth_data); spin_lock_bh(&pool->sp_lock); pool->sp_nrthreads--; list_del(&rqstp->rq_all); spin_unlock_bh(&pool->sp_lock); kfree(rqstp); /* Release the server */ if (serv) svc_destroy(serv);}/* * Register an RPC service with the local portmapper. * To unregister a service, call this routine with * proto and port == 0. */intsvc_register(struct svc_serv *serv, int proto, unsigned short port){ struct svc_program *progp; unsigned long flags; int i, error = 0, dummy; if (!port) clear_thread_flag(TIF_SIGPENDING); for (progp = serv->sv_program; progp; progp = progp->pg_next) { for (i = 0; i < progp->pg_nvers; i++) { if (progp->pg_vers[i] == NULL) continue; dprintk("svc: svc_register(%s, %s, %d, %d)%s\n", progp->pg_name, proto == IPPROTO_UDP? "udp" : "tcp", port, i, progp->pg_vers[i]->vs_hidden? 
" (but not telling portmap)" : ""); if (progp->pg_vers[i]->vs_hidden) continue; error = rpcb_register(progp->pg_prog, i, proto, port, &dummy); if (error < 0) break; if (port && !dummy) { error = -EACCES; break; } } } if (!port) { spin_lock_irqsave(¤t->sighand->siglock, flags); recalc_sigpending(); spin_unlock_irqrestore(¤t->sighand->siglock, flags); } return error;}/* * Printk the given error with the address of the client that caused it. */static int__attribute__ ((format (printf, 2, 3)))svc_printk(struct svc_rqst *rqstp, const char *fmt, ...){ va_list args; int r; char buf[RPC_MAX_ADDRBUFLEN]; if (!net_ratelimit()) return 0; printk(KERN_WARNING "svc: %s: ", svc_print_addr(rqstp, buf, sizeof(buf))); va_start(args, fmt); r = vprintk(fmt, args); va_end(args); return r;}/* * Process the RPC request. */intsvc_process(struct svc_rqst *rqstp){ struct svc_program *progp; struct svc_version *versp = NULL; /* compiler food */ struct svc_procedure *procp = NULL; struct kvec * argv = &rqstp->rq_arg.head[0]; struct kvec * resv = &rqstp->rq_res.head[0]; struct svc_serv *serv = rqstp->rq_server; kxdrproc_t xdr; __be32 *statp; u32 dir, prog, vers, proc; __be32 auth_stat, rpc_stat; int auth_res; __be32 *reply_statp; rpc_stat = rpc_success; if (argv->iov_len < 6*4) goto err_short_len; /* setup response xdr_buf. * Initially it has just one page */ rqstp->rq_resused = 1; resv->iov_base = page_address(rqstp->rq_respages[0]); resv->iov_len = 0; rqstp->rq_res.pages = rqstp->rq_respages + 1; rqstp->rq_res.len = 0; rqstp->rq_res.page_base = 0; rqstp->rq_res.page_len = 0; rqstp->rq_res.buflen = PAGE_SIZE; rqstp->rq_res.tail[0].iov_base = NULL; rqstp->rq_res.tail[0].iov_len = 0; /* Will be turned off only in gss privacy case: */ rqstp->rq_splice_ok = 1; /* tcp needs a space for the record length... 
*/ if (rqstp->rq_prot == IPPROTO_TCP) svc_putnl(resv, 0); rqstp->rq_xid = svc_getu32(argv); svc_putu32(resv, rqstp->rq_xid); dir = svc_getnl(argv); vers = svc_getnl(argv); /* First words of reply: */ svc_putnl(resv, 1); /* REPLY */ if (dir != 0) /* direction != CALL */ goto err_bad_dir; if (vers != 2) /* RPC version number */ goto err_bad_rpc; /* Save position in case we later decide to reject: */ reply_statp = resv->iov_base + resv->iov_len; svc_putnl(resv, 0); /* ACCEPT */ rqstp->rq_prog = prog = svc_getnl(argv); /* program number */ rqstp->rq_vers = vers = svc_getnl(argv); /* version number */ rqstp->rq_proc = proc = svc_getnl(argv); /* procedure number */ progp = serv->sv_program; for (progp = serv->sv_program; progp; progp = progp->pg_next) if (prog == progp->pg_prog) break; /* * Decode auth data, and add verifier to reply buffer. * We do this before anything else in order to get a decent * auth verifier. */ auth_res = svc_authenticate(rqstp, &auth_stat); /* Also give the program a chance to reject this call: */ if (auth_res == SVC_OK && progp) { auth_stat = rpc_autherr_badcred; auth_res = progp->pg_authenticate(rqstp); } switch (auth_res) { case SVC_OK: break; case SVC_GARBAGE: rpc_stat = rpc_garbage_args; goto err_bad; case SVC_SYSERR: rpc_stat = rpc_system_err; goto err_bad; case SVC_DENIED: goto err_bad_auth; case SVC_DROP: goto dropit; case SVC_COMPLETE: goto sendit; } if (progp == NULL) goto err_bad_prog; if (vers >= progp->pg_nvers || !(versp = progp->pg_vers[vers])) goto err_bad_vers; procp = versp->vs_proc + proc; if (proc >= versp->vs_nproc || !procp->pc_func) goto err_bad_proc; rqstp->rq_server = serv; rqstp->rq_procinfo = procp; /* Syntactic check complete */ serv->sv_stats->rpccnt++; /* Build the reply header. 
*/ statp = resv->iov_base +resv->iov_len; svc_putnl(resv, RPC_SUCCESS); /* Bump per-procedure stats counter */ procp->pc_count++; /* Initialize storage for argp and resp */ memset(rqstp->rq_argp, 0, procp->pc_argsize); memset(rqstp->rq_resp, 0, procp->pc_ressize); /* un-reserve some of the out-queue now that we have a * better idea of reply size */ if (procp->pc_xdrressize) svc_reserve_auth(rqstp, procp->pc_xdrressize<<2); /* Call the function that processes the request. */ if (!versp->vs_dispatch) { /* Decode arguments */ xdr = procp->pc_decode; if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp)) goto err_garbage; *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp); /* Encode reply */ if (*statp == rpc_drop_reply) { if (procp->pc_release) procp->pc_release(rqstp, NULL, rqstp->rq_resp); goto dropit; } if (*statp == rpc_success && (xdr = procp->pc_encode) && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) { dprintk("svc: failed to encode reply\n"); /* serv->sv_stats->rpcsystemerr++; */ *statp = rpc_system_err; } } else { dprintk("svc: calling dispatcher\n"); if (!versp->vs_dispatch(rqstp, statp)) { /* Release reply info */ if (procp->pc_release) procp->pc_release(rqstp, NULL, rqstp->rq_resp); goto dropit; } } /* Check RPC status result */ if (*statp != rpc_success) resv->iov_len = ((void*)statp) - resv->iov_base + 4; /* Release reply info */ if (procp->pc_release) procp->pc_release(rqstp, NULL, rqstp->rq_resp); if (procp->pc_encode == NULL) goto dropit; sendit: if (svc_authorise(rqstp)) goto dropit; return svc_send(rqstp); dropit: svc_authorise(rqstp); /* doesn't hurt to call this twice */ dprintk("svc: svc_process dropit\n"); svc_drop(rqstp); return 0;err_short_len: svc_printk(rqstp, "short len %Zd, dropping request\n", argv->iov_len); goto dropit; /* drop request */err_bad_dir: svc_printk(rqstp, "bad direction %d, dropping request\n", dir); serv->sv_stats->rpcbadfmt++; goto dropit; /* drop request */err_bad_rpc: 
serv->sv_stats->rpcbadfmt++; svc_putnl(resv, 1); /* REJECT */ svc_putnl(resv, 0); /* RPC_MISMATCH */ svc_putnl(resv, 2); /* Only RPCv2 supported */ svc_putnl(resv, 2); goto sendit;err_bad_auth: dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat)); serv->sv_stats->rpcbadauth++; /* Restore write pointer to location of accept status: */ xdr_ressize_check(rqstp, reply_statp); svc_putnl(resv, 1); /* REJECT */ svc_putnl(resv, 1); /* AUTH_ERROR */ svc_putnl(resv, ntohl(auth_stat)); /* status */ goto sendit;err_bad_prog: dprintk("svc: unknown program %d\n", prog); serv->sv_stats->rpcbadfmt++; svc_putnl(resv, RPC_PROG_UNAVAIL); goto sendit;err_bad_vers: svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n", vers, prog, progp->pg_name); serv->sv_stats->rpcbadfmt++; svc_putnl(resv, RPC_PROG_MISMATCH); svc_putnl(resv, progp->pg_lovers); svc_putnl(resv, progp->pg_hivers); goto sendit;err_bad_proc: svc_printk(rqstp, "unknown procedure (%d)\n", proc); serv->sv_stats->rpcbadfmt++; svc_putnl(resv, RPC_PROC_UNAVAIL); goto sendit;err_garbage: svc_printk(rqstp, "failed to decode args\n"); rpc_stat = rpc_garbage_args;err_bad: serv->sv_stats->rpcbadfmt++; svc_putnl(resv, ntohl(rpc_stat)); goto sendit;}/* * Return (transport-specific) limit on the rpc payload. */u32 svc_max_payload(const struct svc_rqst *rqstp){ int max = RPCSVC_MAXPAYLOAD_TCP; if (rqstp->rq_sock->sk_sock->type == SOCK_DGRAM) max = RPCSVC_MAXPAYLOAD_UDP; if (rqstp->rq_server->sv_max_payload < max) max = rqstp->rq_server->sv_max_payload; return max;}EXPORT_SYMBOL_GPL(svc_max_payload);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -