svcsock.c
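/*
 * Context for the memcpy below (general RPC-over-TCP background, not
 * taken from this file): RFC 1831 record marking prefixes each reply
 * with a 4-byte record length whose top bit flags the final fragment.
 * reclen, presumably computed earlier in svc_tcp_sendto (not shown
 * here), holds that marker and is written into the first four bytes
 * of the reply head.
 */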
    memcpy(xbufp->head[0].iov_base, &reclen, 4);

    if (test_bit(SK_DEAD, &rqstp->rq_sock->sk_flags))
        return -ENOTCONN;

    sent = svc_sendto(rqstp, &rqstp->rq_res);
    if (sent != xbufp->len) {
        printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
               rqstp->rq_sock->sk_server->sv_name,
               (sent < 0) ? "got error" : "sent only",
               sent, xbufp->len);
        svc_delete_socket(rqstp->rq_sock);
        sent = -EAGAIN;
    }
    return sent;
}

static void
svc_tcp_init(struct svc_sock *svsk)
{
    struct sock *sk = svsk->sk_sk;
    struct tcp_opt *tp = tcp_sk(sk);

    svsk->sk_recvfrom = svc_tcp_recvfrom;
    svsk->sk_sendto = svc_tcp_sendto;

    if (sk->sk_state == TCP_LISTEN) {
        dprintk("setting up TCP socket for listening\n");
        sk->sk_data_ready = svc_tcp_listen_data_ready;
        set_bit(SK_CONN, &svsk->sk_flags);
    } else {
        dprintk("setting up TCP socket for reading\n");
        sk->sk_state_change = svc_tcp_state_change;
        sk->sk_data_ready = svc_tcp_data_ready;
        sk->sk_write_space = svc_write_space;

        svsk->sk_reclen = 0;
        svsk->sk_tcplen = 0;

        tp->nonagle = 1;        /* disable Nagle's algorithm */

        /* the initial setting must have enough space to
         * receive and respond to one request.
         * svc_tcp_recvfrom will re-adjust if necessary
         */
        svc_sock_setbufsize(svsk->sk_sock,
                            3 * svsk->sk_server->sv_bufsz,
                            3 * svsk->sk_server->sv_bufsz);

        set_bit(SK_CHNGBUF, &svsk->sk_flags);
        set_bit(SK_DATA, &svsk->sk_flags);
        if (sk->sk_state != TCP_ESTABLISHED)
            set_bit(SK_CLOSE, &svsk->sk_flags);
    }
}

void
svc_sock_update_bufs(struct svc_serv *serv)
{
    /*
     * The number of server threads has changed. Update
     * rcvbuf and sndbuf accordingly on all sockets
     */
    struct list_head *le;

    spin_lock_bh(&serv->sv_lock);
    list_for_each(le, &serv->sv_permsocks) {
        struct svc_sock *svsk =
            list_entry(le, struct svc_sock, sk_list);
        set_bit(SK_CHNGBUF, &svsk->sk_flags);
    }
    list_for_each(le, &serv->sv_tempsocks) {
        struct svc_sock *svsk =
            list_entry(le, struct svc_sock, sk_list);
        set_bit(SK_CHNGBUF, &svsk->sk_flags);
    }
    spin_unlock_bh(&serv->sv_lock);
}

/*
 * Receive the next request on any socket.
 */
int
svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
{
    struct svc_sock *svsk = NULL;
    int len;
    int pages;
    struct xdr_buf *arg;
    DECLARE_WAITQUEUE(wait, current);

    dprintk("svc: server %p waiting for data (to = %ld)\n",
            rqstp, timeout);

    if (rqstp->rq_sock)
        printk(KERN_ERR "svc_recv: service %p, socket not NULL!\n",
               rqstp);
    if (waitqueue_active(&rqstp->rq_wait))
        printk(KERN_ERR "svc_recv: service %p, wait queue active!\n",
               rqstp);

    /* Initialize the buffers */
    /* first reclaim pages that were moved to response list */
    svc_pushback_allpages(rqstp);
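    /*
     * Buffer layout that the assignments below establish (a summary
     * inferred from this function, not from separate documentation):
     * page 0 backs arg->head, pages 1..(pages-2) back arg->pages,
     * and the final page is held back so the reply can be built
     * without allocating again.
     */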
    /* now allocate needed pages.  If we get a failure, sleep briefly */
    pages = 2 + (serv->sv_bufsz + PAGE_SIZE - 1) / PAGE_SIZE;
    while (rqstp->rq_arghi < pages) {
        struct page *p = alloc_page(GFP_KERNEL);
        if (!p) {
            set_current_state(TASK_UNINTERRUPTIBLE);
            schedule_timeout(HZ/2);
            continue;
        }
        rqstp->rq_argpages[rqstp->rq_arghi++] = p;
    }

    /* Make arg->head point to first page and arg->pages point to rest */
    arg = &rqstp->rq_arg;
    arg->head[0].iov_base = page_address(rqstp->rq_argpages[0]);
    arg->head[0].iov_len = PAGE_SIZE;
    rqstp->rq_argused = 1;
    arg->pages = rqstp->rq_argpages + 1;
    arg->page_base = 0;
    /* save at least one page for response */
    arg->page_len = (pages-2)*PAGE_SIZE;
    arg->len = (pages-1)*PAGE_SIZE;
    arg->tail[0].iov_len = 0;

    if (signalled())
        return -EINTR;

    spin_lock_bh(&serv->sv_lock);
    if (!list_empty(&serv->sv_tempsocks)) {
        svsk = list_entry(serv->sv_tempsocks.next,
                          struct svc_sock, sk_list);
        /* apparently the "standard" is that clients close
         * idle connections after 5 minutes, servers after
         * 6 minutes
         *   http://www.connectathon.org/talks96/nfstcp.pdf
         */
        if (get_seconds() - svsk->sk_lastrecv < 6*60
            || test_bit(SK_BUSY, &svsk->sk_flags))
            svsk = NULL;
    }
    if (svsk) {
        set_bit(SK_BUSY, &svsk->sk_flags);
        set_bit(SK_CLOSE, &svsk->sk_flags);
        rqstp->rq_sock = svsk;
        svsk->sk_inuse++;
    } else if ((svsk = svc_sock_dequeue(serv)) != NULL) {
        rqstp->rq_sock = svsk;
        svsk->sk_inuse++;
        rqstp->rq_reserved = serv->sv_bufsz;
        svsk->sk_reserved += rqstp->rq_reserved;
    } else {
        /* No data pending. Go to sleep */
        svc_serv_enqueue(serv, rqstp);

        /*
         * We have to be able to interrupt this wait
         * to bring down the daemons ...
         */
        set_current_state(TASK_INTERRUPTIBLE);
        add_wait_queue(&rqstp->rq_wait, &wait);
        spin_unlock_bh(&serv->sv_lock);

        schedule_timeout(timeout);

        if (current->flags & PF_FREEZE)
            refrigerator(PF_FREEZE);

        spin_lock_bh(&serv->sv_lock);
        remove_wait_queue(&rqstp->rq_wait, &wait);

        if (!(svsk = rqstp->rq_sock)) {
            svc_serv_dequeue(serv, rqstp);
            spin_unlock_bh(&serv->sv_lock);
            dprintk("svc: server %p, no data yet\n", rqstp);
            return signalled() ? -EINTR : -EAGAIN;
        }
    }
    spin_unlock_bh(&serv->sv_lock);

    dprintk("svc: server %p, socket %p, inuse=%d\n",
            rqstp, svsk, svsk->sk_inuse);
    len = svsk->sk_recvfrom(rqstp);
    dprintk("svc: got len=%d\n", len);

    /* No data, incomplete (TCP) read, or accept() */
    if (len == 0 || len == -EAGAIN) {
        rqstp->rq_res.len = 0;
        svc_sock_release(rqstp);
        return -EAGAIN;
    }
    svsk->sk_lastrecv = get_seconds();
    if (test_bit(SK_TEMP, &svsk->sk_flags)) {
        /* push active sockets to end of list */
        spin_lock_bh(&serv->sv_lock);
        if (!list_empty(&svsk->sk_list))
            list_move_tail(&svsk->sk_list, &serv->sv_tempsocks);
        spin_unlock_bh(&serv->sv_lock);
    }

    rqstp->rq_secure = ntohs(rqstp->rq_addr.sin_port) < 1024;
    rqstp->rq_chandle.defer = svc_defer;

    if (serv->sv_stats)
        serv->sv_stats->netcnt++;
    return len;
}

/*
 * Drop request
 */
void
svc_drop(struct svc_rqst *rqstp)
{
    dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
    svc_sock_release(rqstp);
}
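/*
 * A note on svc_send() below (an observation about this code, not a
 * documented contract): connection-level failures (-ECONNREFUSED,
 * -ENOTCONN, -EAGAIN) are mapped to 0, presumably so a client that
 * has gone away is not reported to the caller as a server-side error.
 */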
/*
 * Return reply to client.
 */
int
svc_send(struct svc_rqst *rqstp)
{
    struct svc_sock *svsk;
    int len;
    struct xdr_buf *xb;

    if ((svsk = rqstp->rq_sock) == NULL) {
        printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
               __FILE__, __LINE__);
        return -EFAULT;
    }

    /* release the receive skb before sending the reply */
    svc_release_skb(rqstp);

    /* calculate over-all length */
    xb = &rqstp->rq_res;
    xb->len = xb->head[0].iov_len +
        xb->page_len +
        xb->tail[0].iov_len;

    /* Grab svsk->sk_sem to serialize outgoing data. */
    down(&svsk->sk_sem);
    if (test_bit(SK_DEAD, &svsk->sk_flags))
        len = -ENOTCONN;
    else
        len = svsk->sk_sendto(rqstp);
    up(&svsk->sk_sem);
    svc_sock_release(rqstp);

    if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
        return 0;
    return len;
}

/*
 * Initialize socket for RPC use and create svc_sock struct
 * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
 */
static struct svc_sock *
svc_setup_socket(struct svc_serv *serv, struct socket *sock,
                 int *errp, int pmap_register)
{
    struct svc_sock *svsk;
    struct sock *inet;

    dprintk("svc: svc_setup_socket %p\n", sock);
    if (!(svsk = kmalloc(sizeof(*svsk), GFP_KERNEL))) {
        *errp = -ENOMEM;
        return NULL;
    }
    memset(svsk, 0, sizeof(*svsk));

    inet = sock->sk;

    /* Register socket with portmapper */
    if (*errp >= 0 && pmap_register)
        *errp = svc_register(serv, inet->sk_protocol,
                             ntohs(inet_sk(inet)->sport));

    if (*errp < 0) {
        kfree(svsk);
        return NULL;
    }

    set_bit(SK_BUSY, &svsk->sk_flags);
    inet->sk_user_data = svsk;
    svsk->sk_sock = sock;
    svsk->sk_sk = inet;
    svsk->sk_ostate = inet->sk_state_change;
    svsk->sk_odata = inet->sk_data_ready;
    svsk->sk_owspace = inet->sk_write_space;
    svsk->sk_server = serv;
    svsk->sk_lastrecv = get_seconds();
    INIT_LIST_HEAD(&svsk->sk_deferred);
    INIT_LIST_HEAD(&svsk->sk_ready);
    sema_init(&svsk->sk_sem, 1);

    /* Initialize the socket */
    if (sock->type == SOCK_DGRAM)
        svc_udp_init(svsk);
    else
        svc_tcp_init(svsk);

    spin_lock_bh(&serv->sv_lock);
    if (!pmap_register) {
        set_bit(SK_TEMP, &svsk->sk_flags);
        list_add(&svsk->sk_list, &serv->sv_tempsocks);
        serv->sv_tmpcnt++;
    } else {
        clear_bit(SK_TEMP, &svsk->sk_flags);
        list_add(&svsk->sk_list, &serv->sv_permsocks);
    }
    spin_unlock_bh(&serv->sv_lock);

    dprintk("svc: svc_setup_socket created %p (inet %p)\n",
            svsk, svsk->sk_sk);

    clear_bit(SK_BUSY, &svsk->sk_flags);
    svc_sock_enqueue(svsk);
    return svsk;
}

/*
 * Create socket for RPC service.
 */
static int
svc_create_socket(struct svc_serv *serv, int protocol, struct sockaddr_in *sin)
{
    struct svc_sock *svsk;
    struct socket *sock;
    int error;
    int type;

    dprintk("svc: svc_create_socket(%s, %d, %u.%u.%u.%u:%d)\n",
            serv->sv_program->pg_name,
            protocol,
            NIPQUAD(sin->sin_addr.s_addr),
            ntohs(sin->sin_port));

    if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
        printk(KERN_WARNING "svc: only UDP and TCP "
               "sockets supported\n");
        return -EINVAL;
    }
    type = (protocol == IPPROTO_UDP) ? SOCK_DGRAM : SOCK_STREAM;

    if ((error = sock_create_kern(PF_INET, type, protocol, &sock)) < 0)
        return error;

    if (sin != NULL) {
        if (type == SOCK_STREAM)
            sock->sk->sk_reuse = 1; /* allow address reuse */
        error = sock->ops->bind(sock, (struct sockaddr *) sin,
                                sizeof(*sin));
        if (error < 0)
            goto bummer;
    }

    if (protocol == IPPROTO_TCP) {
        if ((error = sock->ops->listen(sock, 64)) < 0)
            goto bummer;
    }

    if ((svsk = svc_setup_socket(serv, sock, &error, 1)) != NULL)
        return 0;

bummer:
    dprintk("svc: svc_create_socket error = %d\n", -error);
    sock_release(sock);
    return error;
}

/*
 * Remove a dead socket
 */
void
svc_delete_socket(struct svc_sock *svsk)
{
    struct svc_serv *serv;
    struct sock *sk;

    dprintk("svc: svc_delete_socket(%p)\n", svsk);

    serv = svsk->sk_server;
    sk = svsk->sk_sk;

    sk->sk_state_change = svsk->sk_ostate;
    sk->sk_data_ready = svsk->sk_odata;
    sk->sk_write_space = svsk->sk_owspace;

    spin_lock_bh(&serv->sv_lock);

    list_del_init(&svsk->sk_list);
    list_del_init(&svsk->sk_ready);
    if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags))
        if (test_bit(SK_TEMP, &svsk->sk_flags))
            serv->sv_tmpcnt--;

    if (!svsk->sk_inuse) {
        spin_unlock_bh(&serv->sv_lock);
        sock_release(svsk->sk_sock);
        kfree(svsk);
    } else {
        spin_unlock_bh(&serv->sv_lock);
        dprintk(KERN_NOTICE "svc: server socket destroy delayed\n");
        /* svsk->sk_server = NULL; */
    }
}

/*
 * Make a socket for nfsd and lockd
 */
int
svc_makesock(struct svc_serv *serv, int protocol, unsigned short port)
{
    struct sockaddr_in sin;

    dprintk("svc: creating socket proto = %d\n", protocol);
    sin.sin_family      = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port        = htons(port);
    return svc_create_socket(serv, protocol, &sin);
}

/*
 * Handle defer and revisit of requests
 */
static void
svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
    struct svc_deferred_req *dr =
        container_of(dreq, struct svc_deferred_req, handle);
    struct svc_serv *serv = dreq->owner;
    struct svc_sock *svsk;

    if (too_many) {
        svc_sock_put(dr->svsk);
        kfree(dr);
        return;
    }
    dprintk("revisit queued\n");
    svsk = dr->svsk;
    dr->svsk = NULL;
    spin_lock_bh(&serv->sv_lock);
    list_add(&dr->handle.recent, &svsk->sk_deferred);
    spin_unlock_bh(&serv->sv_lock);
    set_bit(SK_DEFERRED, &svsk->sk_flags);
    svc_sock_enqueue(svsk);
    svc_sock_put(svsk);
}

static struct cache_deferred_req *
svc_defer(struct cache_req *req)
{
    struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
    int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
    struct svc_deferred_req *dr;

    if (rqstp->rq_arg.page_len)
        return NULL; /* if more than a page, give up FIXME */
    if (rqstp->rq_deferred) {
        dr = rqstp->rq_deferred;
        rqstp->rq_deferred = NULL;
    } else {
        int skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
        /* FIXME maybe discard if size too large */
        dr = kmalloc(size, GFP_KERNEL);
        if (dr == NULL)
            return NULL;

        dr->handle.owner = rqstp->rq_server;
        dr->prot = rqstp->rq_prot;
        dr->addr = rqstp->rq_addr;
        dr->argslen = rqstp->rq_arg.len >> 2;
        memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
               dr->argslen << 2);
    }
    spin_lock_bh(&rqstp->rq_server->sv_lock);
    rqstp->rq_sock->sk_inuse++;
    dr->svsk = rqstp->rq_sock;
    spin_unlock_bh(&rqstp->rq_server->sv_lock);

    dr->handle.revisit = svc_revisit;
    return &dr->handle;
}
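/*
 * The replay path below is the counterpart of svc_defer() above: the
 * request words saved in dr->args are copied back into rq_arg so the
 * request can be processed as if it had just arrived.  (The argument
 * length is kept in 32-bit XDR words, hence the >>2 / <<2 shifts.)
 */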
/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
    struct svc_deferred_req *dr = rqstp->rq_deferred;

    rqstp->rq_arg.head[0].iov_base = dr->args;
    rqstp->rq_arg.head[0].iov_len = dr->argslen << 2;
    rqstp->rq_arg.page_len = 0;
    rqstp->rq_arg.len = dr->argslen << 2;
    rqstp->rq_prot = dr->prot;
    rqstp->rq_addr = dr->addr;
    return dr->argslen << 2;
}

static struct svc_deferred_req *
svc_deferred_dequeue(struct svc_sock *svsk)
{
    struct svc_deferred_req *dr = NULL;
    struct svc_serv *serv = svsk->sk_server;

    if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
        return NULL;
    spin_lock_bh(&serv->sv_lock);
    clear_bit(SK_DEFERRED, &svsk->sk_flags);
    if (!list_empty(&svsk->sk_deferred)) {
        dr = list_entry(svsk->sk_deferred.next,
                        struct svc_deferred_req,
                        handle.recent);
        list_del_init(&dr->handle.recent);
        set_bit(SK_DEFERRED, &svsk->sk_flags);
    }
    spin_unlock_bh(&serv->sv_lock);
    return dr;
}
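/*
 * Usage sketch (hypothetical caller, not part of this file): a service
 * such as nfsd or lockd would typically bring up one permanent listener
 * per transport via svc_makesock(), e.g.
 *
 *      error = svc_makesock(serv, IPPROTO_UDP, port);
 *      if (!error)
 *              error = svc_makesock(serv, IPPROTO_TCP, port);
 *
 * svc_create_socket() then binds the socket to INADDR_ANY, registers it
 * with the portmapper through svc_setup_socket(), and, for TCP, starts
 * listening.
 */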