📄 svcsock.c
                 * We have to be able to interrupt this wait
                 * to bring down the daemons ...
                 */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&rqstp->rq_wait, &wait);
                spin_unlock_bh(&pool->sp_lock);

                schedule_timeout(timeout);

                try_to_freeze();

                spin_lock_bh(&pool->sp_lock);
                remove_wait_queue(&rqstp->rq_wait, &wait);

                if (!(svsk = rqstp->rq_sock)) {
                        svc_thread_dequeue(pool, rqstp);
                        spin_unlock_bh(&pool->sp_lock);
                        dprintk("svc: server %p, no data yet\n", rqstp);
                        return signalled()? -EINTR : -EAGAIN;
                }
        }
        spin_unlock_bh(&pool->sp_lock);

        dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
                 rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
        len = svsk->sk_recvfrom(rqstp);
        dprintk("svc: got len=%d\n", len);

        /* No data, incomplete (TCP) read, or accept() */
        if (len == 0 || len == -EAGAIN) {
                rqstp->rq_res.len = 0;
                svc_sock_release(rqstp);
                return -EAGAIN;
        }
        svsk->sk_lastrecv = get_seconds();
        clear_bit(SK_OLD, &svsk->sk_flags);

        rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
        rqstp->rq_chandle.defer = svc_defer;

        if (serv->sv_stats)
                serv->sv_stats->netcnt++;
        return len;
}

/*
 * Drop request
 */
void
svc_drop(struct svc_rqst *rqstp)
{
        dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
        svc_sock_release(rqstp);
}

/*
 * Return reply to client.
 */
int
svc_send(struct svc_rqst *rqstp)
{
        struct svc_sock *svsk;
        int             len;
        struct xdr_buf  *xb;

        if ((svsk = rqstp->rq_sock) == NULL) {
                printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
                                __FILE__, __LINE__);
                return -EFAULT;
        }

        /* release the receive skb before sending the reply */
        svc_release_skb(rqstp);

        /* calculate over-all length */
        xb = &rqstp->rq_res;
        xb->len = xb->head[0].iov_len +
                xb->page_len +
                xb->tail[0].iov_len;

        /* Grab svsk->sk_mutex to serialize outgoing data. */
        mutex_lock(&svsk->sk_mutex);
        if (test_bit(SK_DEAD, &svsk->sk_flags))
                len = -ENOTCONN;
        else
                len = svsk->sk_sendto(rqstp);
        mutex_unlock(&svsk->sk_mutex);
        svc_sock_release(rqstp);

        if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
                return 0;
        return len;
}

/*
 * Timer function to close old temporary sockets, using
 * a mark-and-sweep algorithm.
 */
static void
svc_age_temp_sockets(unsigned long closure)
{
        struct svc_serv *serv = (struct svc_serv *)closure;
        struct svc_sock *svsk;
        struct list_head *le, *next;
        LIST_HEAD(to_be_aged);

        dprintk("svc_age_temp_sockets\n");

        if (!spin_trylock_bh(&serv->sv_lock)) {
                /* busy, try again 1 sec later */
                dprintk("svc_age_temp_sockets: busy\n");
                mod_timer(&serv->sv_temptimer, jiffies + HZ);
                return;
        }

        list_for_each_safe(le, next, &serv->sv_tempsocks) {
                svsk = list_entry(le, struct svc_sock, sk_list);

                if (!test_and_set_bit(SK_OLD, &svsk->sk_flags))
                        continue;
                if (atomic_read(&svsk->sk_inuse) > 1 ||
                    test_bit(SK_BUSY, &svsk->sk_flags))
                        continue;
                atomic_inc(&svsk->sk_inuse);
                list_move(le, &to_be_aged);
                set_bit(SK_CLOSE, &svsk->sk_flags);
                set_bit(SK_DETACHED, &svsk->sk_flags);
        }
        spin_unlock_bh(&serv->sv_lock);

        while (!list_empty(&to_be_aged)) {
                le = to_be_aged.next;
                /* fiddling the sk_list node is safe 'cos we're SK_DETACHED */
                list_del_init(le);
                svsk = list_entry(le, struct svc_sock, sk_list);

                dprintk("queuing svsk %p for closing, %lu seconds old\n",
                        svsk, get_seconds() - svsk->sk_lastrecv);

                /* a thread will dequeue and close it soon */
                svc_sock_enqueue(svsk);
                svc_sock_put(svsk);
        }

        mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}
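/*
 * Note on the mark-and-sweep above (editorial summary of this code):
 * the first timer pass over a temporary socket merely sets SK_OLD;
 * svc_recv() clears SK_OLD again whenever data arrives.  A socket that
 * is still marked SK_OLD on the next pass has therefore been idle for a
 * full svc_conn_age_period and is queued for closing.  Assuming the
 * default svc_conn_age_period of 6*60 seconds, an idle temporary
 * connection is torn down after roughly 6 to 12 minutes of silence.
 */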
/*
 * Initialize socket for RPC use and create svc_sock struct
 * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
 */
static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
                                                struct socket *sock,
                                                int *errp, int flags)
{
        struct svc_sock *svsk;
        struct sock     *inet;
        int             pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
        int             is_temporary = flags & SVC_SOCK_TEMPORARY;

        dprintk("svc: svc_setup_socket %p\n", sock);
        if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) {
                *errp = -ENOMEM;
                return NULL;
        }

        inet = sock->sk;

        /* Register socket with portmapper */
        if (*errp >= 0 && pmap_register)
                *errp = svc_register(serv, inet->sk_protocol,
                                     ntohs(inet_sk(inet)->sport));

        if (*errp < 0) {
                kfree(svsk);
                return NULL;
        }

        set_bit(SK_BUSY, &svsk->sk_flags);
        inet->sk_user_data = svsk;
        svsk->sk_sock = sock;
        svsk->sk_sk = inet;
        svsk->sk_ostate = inet->sk_state_change;
        svsk->sk_odata = inet->sk_data_ready;
        svsk->sk_owspace = inet->sk_write_space;
        svsk->sk_server = serv;
        atomic_set(&svsk->sk_inuse, 1);
        svsk->sk_lastrecv = get_seconds();
        spin_lock_init(&svsk->sk_lock);
        INIT_LIST_HEAD(&svsk->sk_deferred);
        INIT_LIST_HEAD(&svsk->sk_ready);
        mutex_init(&svsk->sk_mutex);

        /* Initialize the socket */
        if (sock->type == SOCK_DGRAM)
                svc_udp_init(svsk);
        else
                svc_tcp_init(svsk);

        spin_lock_bh(&serv->sv_lock);
        if (is_temporary) {
                set_bit(SK_TEMP, &svsk->sk_flags);
                list_add(&svsk->sk_list, &serv->sv_tempsocks);
                serv->sv_tmpcnt++;
                if (serv->sv_temptimer.function == NULL) {
                        /* setup timer to age temp sockets */
                        setup_timer(&serv->sv_temptimer, svc_age_temp_sockets,
                                        (unsigned long)serv);
                        mod_timer(&serv->sv_temptimer,
                                        jiffies + svc_conn_age_period * HZ);
                }
        } else {
                clear_bit(SK_TEMP, &svsk->sk_flags);
                list_add(&svsk->sk_list, &serv->sv_permsocks);
        }
        spin_unlock_bh(&serv->sv_lock);

        dprintk("svc: svc_setup_socket created %p (inet %p)\n",
                                svsk, svsk->sk_sk);

        return svsk;
}

int svc_addsock(struct svc_serv *serv,
                int fd,
                char *name_return,
                int *proto)
{
        int err = 0;
        struct socket *so = sockfd_lookup(fd, &err);
        struct svc_sock *svsk = NULL;

        if (!so)
                return err;
        if (so->sk->sk_family != AF_INET)
                err = -EAFNOSUPPORT;
        else if (so->sk->sk_protocol != IPPROTO_TCP &&
            so->sk->sk_protocol != IPPROTO_UDP)
                err = -EPROTONOSUPPORT;
        else if (so->state > SS_UNCONNECTED)
                err = -EISCONN;
        else {
                svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS);
                if (svsk) {
                        svc_sock_received(svsk);
                        err = 0;
                }
        }
        if (err) {
                sockfd_put(so);
                return err;
        }
        if (proto)
                *proto = so->sk->sk_protocol;
        return one_sock_name(name_return, svsk);
}
EXPORT_SYMBOL_GPL(svc_addsock);
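/*
 * Usage sketch for svc_addsock() (illustrative only; "serv" and "fd" are
 * assumed to come from the caller, they are not defined in this file):
 *
 *      char name[RPC_MAX_ADDRBUFLEN];
 *      int proto;
 *      int err = svc_addsock(serv, fd, name, &proto);
 *
 * A negative return reports why the fd was rejected (-EAFNOSUPPORT,
 * -EPROTONOSUPPORT, -EISCONN, ...); on success the socket has been
 * wrapped in a svc_sock, name_return is filled in via one_sock_name(),
 * and *proto holds IPPROTO_TCP or IPPROTO_UDP.
 */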
/*
 * Create socket for RPC service.
 */
static int svc_create_socket(struct svc_serv *serv, int protocol,
                                struct sockaddr *sin, int len, int flags)
{
        struct svc_sock *svsk;
        struct socket   *sock;
        int             error;
        int             type;
        char            buf[RPC_MAX_ADDRBUFLEN];

        dprintk("svc: svc_create_socket(%s, %d, %s)\n",
                        serv->sv_program->pg_name, protocol,
                        __svc_print_addr(sin, buf, sizeof(buf)));

        if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
                printk(KERN_WARNING "svc: only UDP and TCP "
                                "sockets supported\n");
                return -EINVAL;
        }
        type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        error = sock_create_kern(sin->sa_family, type, protocol, &sock);
        if (error < 0)
                return error;

        svc_reclassify_socket(sock);

        if (type == SOCK_STREAM)
                sock->sk->sk_reuse = 1;         /* allow address reuse */
        error = kernel_bind(sock, sin, len);
        if (error < 0)
                goto bummer;

        if (protocol == IPPROTO_TCP) {
                if ((error = kernel_listen(sock, 64)) < 0)
                        goto bummer;
        }

        if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) {
                svc_sock_received(svsk);
                return ntohs(inet_sk(svsk->sk_sk)->sport);
        }

bummer:
        dprintk("svc: svc_create_socket error = %d\n", -error);
        sock_release(sock);
        return error;
}

/*
 * Remove a dead socket
 */
static void
svc_delete_socket(struct svc_sock *svsk)
{
        struct svc_serv *serv;
        struct sock     *sk;

        dprintk("svc: svc_delete_socket(%p)\n", svsk);

        serv = svsk->sk_server;
        sk = svsk->sk_sk;

        sk->sk_state_change = svsk->sk_ostate;
        sk->sk_data_ready = svsk->sk_odata;
        sk->sk_write_space = svsk->sk_owspace;

        spin_lock_bh(&serv->sv_lock);

        if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
                list_del_init(&svsk->sk_list);
        /*
         * We used to delete the svc_sock from whichever list
         * it's sk_ready node was on, but we don't actually
         * need to.  This is because the only time we're called
         * while still attached to a queue, the queue itself
         * is about to be destroyed (in svc_destroy).
         */
        if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) {
                BUG_ON(atomic_read(&svsk->sk_inuse) < 2);
                atomic_dec(&svsk->sk_inuse);
                if (test_bit(SK_TEMP, &svsk->sk_flags))
                        serv->sv_tmpcnt--;
        }

        spin_unlock_bh(&serv->sv_lock);
}

static void svc_close_socket(struct svc_sock *svsk)
{
        set_bit(SK_CLOSE, &svsk->sk_flags);
        if (test_and_set_bit(SK_BUSY, &svsk->sk_flags))
                /* someone else will have to effect the close */
                return;

        atomic_inc(&svsk->sk_inuse);
        svc_delete_socket(svsk);
        clear_bit(SK_BUSY, &svsk->sk_flags);
        svc_sock_put(svsk);
}

void svc_force_close_socket(struct svc_sock *svsk)
{
        set_bit(SK_CLOSE, &svsk->sk_flags);
        if (test_bit(SK_BUSY, &svsk->sk_flags)) {
                /* Waiting to be processed, but no threads left,
                 * So just remove it from the waiting list
                 */
                list_del_init(&svsk->sk_ready);
                clear_bit(SK_BUSY, &svsk->sk_flags);
        }
        svc_close_socket(svsk);
}

/**
 * svc_makesock - Make a socket for nfsd and lockd
 * @serv: RPC server structure
 * @protocol: transport protocol to use
 * @port: port to use
 * @flags: requested socket characteristics
 *
 */
int svc_makesock(struct svc_serv *serv, int protocol, unsigned short port,
                        int flags)
{
        struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = INADDR_ANY,
                .sin_port               = htons(port),
        };

        dprintk("svc: creating socket proto = %d\n", protocol);
        return svc_create_socket(serv, protocol, (struct sockaddr *) &sin,
                                                        sizeof(sin), flags);
}

/*
 * Handle defer and revisit of requests
 */
static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
        struct svc_deferred_req *dr =
                container_of(dreq, struct svc_deferred_req, handle);
        struct svc_sock *svsk;

        if (too_many) {
                svc_sock_put(dr->svsk);
                kfree(dr);
                return;
        }
        dprintk("revisit queued\n");
        svsk = dr->svsk;
        dr->svsk = NULL;
        spin_lock(&svsk->sk_lock);
        list_add(&dr->handle.recent, &svsk->sk_deferred);
        spin_unlock(&svsk->sk_lock);
        set_bit(SK_DEFERRED, &svsk->sk_flags);
        svc_sock_enqueue(svsk);
        svc_sock_put(svsk);
}
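/*
 * Defer/revisit life cycle, tying svc_revisit() above to the helpers
 * below (a summary of this code, not a new interface):
 *
 *      svc_defer()             snapshot the request into a svc_deferred_req
 *                              and pin the socket (sk_inuse++)
 *      svc_revisit()           when the cache item becomes usable, requeue
 *                              the snapshot on svsk->sk_deferred and enqueue
 *                              the socket so a server thread notices it; on
 *                              too_many, drop the snapshot and the socket ref
 *      svc_deferred_dequeue()  a thread pulls the snapshot back off the list
 *      svc_deferred_recv()     replays the saved arguments into a live rqstp
 */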
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
        struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
        int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
        struct svc_deferred_req *dr;

        if (rqstp->rq_arg.page_len)
                return NULL; /* if more than a page, give up FIXME */
        if (rqstp->rq_deferred) {
                dr = rqstp->rq_deferred;
                rqstp->rq_deferred = NULL;
        } else {
                int skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
                /* FIXME maybe discard if size too large */
                dr = kmalloc(size, GFP_KERNEL);
                if (dr == NULL)
                        return NULL;

                dr->handle.owner = rqstp->rq_server;
                dr->prot = rqstp->rq_prot;
                memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
                dr->addrlen = rqstp->rq_addrlen;
                dr->daddr = rqstp->rq_daddr;
                dr->argslen = rqstp->rq_arg.len >> 2;
                memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
                       dr->argslen << 2);
        }
        atomic_inc(&rqstp->rq_sock->sk_inuse);
        dr->svsk = rqstp->rq_sock;

        dr->handle.revisit = svc_revisit;
        return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
        struct svc_deferred_req *dr = rqstp->rq_deferred;

        rqstp->rq_arg.head[0].iov_base = dr->args;
        rqstp->rq_arg.head[0].iov_len = dr->argslen << 2;
        rqstp->rq_arg.page_len = 0;
        rqstp->rq_arg.len = dr->argslen << 2;
        rqstp->rq_prot = dr->prot;
        memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
        rqstp->rq_addrlen = dr->addrlen;
        rqstp->rq_daddr = dr->daddr;
        rqstp->rq_respages = rqstp->rq_pages;
        return dr->argslen << 2;
}

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
{
        struct svc_deferred_req *dr = NULL;

        if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
                return NULL;
        spin_lock(&svsk->sk_lock);
        clear_bit(SK_DEFERRED, &svsk->sk_flags);
        if (!list_empty(&svsk->sk_deferred)) {
                dr = list_entry(svsk->sk_deferred.next,
                                struct svc_deferred_req,
                                handle.recent);
                list_del_init(&dr->handle.recent);
                set_bit(SK_DEFERRED, &svsk->sk_flags);
        }
        spin_unlock(&svsk->sk_lock);
        return dr;
}