⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 4 页
字号:
        return 0;}intsrpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev){        int rc;        int portal;        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)                portal = SRPC_REQUEST_PORTAL;        else                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;        rc = srpc_post_active_rdma(portal, service, buf, len,                                    LNET_MD_OP_PUT, peer,                                   LNET_NID_ANY, mdh, ev);        return rc;}intsrpc_post_passive_rqtbuf(int service, void *buf, int len,                         lnet_handle_md_t *mdh, srpc_event_t *ev){        int               rc;        int               portal;        lnet_process_id_t any = {.nid = LNET_NID_ANY,                                 .pid = LNET_PID_ANY};        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)                portal = SRPC_REQUEST_PORTAL;        else                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;        rc = srpc_post_passive_rdma(portal, service, buf, len,                                    LNET_MD_OP_PUT, any, mdh, ev);        return rc;}intsrpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf){        srpc_msg_t *msg = &buf->buf_msg;        int         rc;        LASSERT (!sv->sv_shuttingdown);        buf->buf_mdh = LNET_INVALID_HANDLE;        list_add(&buf->buf_list, &sv->sv_posted_msgq);        sv->sv_nposted_msg++;        spin_unlock(&sv->sv_lock);        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),                                      &buf->buf_mdh, &sv->sv_ev);        /* At this point, a RPC (new or delayed) may have arrived in         * msg and its event handler has been called. So we must add         * buf to sv_posted_msgq _before_ dropping sv_lock */        spin_lock(&sv->sv_lock);        if (rc == 0) {                if (sv->sv_shuttingdown) {                        spin_unlock(&sv->sv_lock);                        /* srpc_shutdown_service might have tried to unlink me                         * when my buf_mdh was still invalid */                        LNetMDUnlink(buf->buf_mdh);                        spin_lock(&sv->sv_lock);                }                return 0;        }        sv->sv_nposted_msg--;        if (sv->sv_shuttingdown) return rc;        list_del(&buf->buf_list);        spin_unlock(&sv->sv_lock);        LIBCFS_FREE(buf, sizeof(*buf));        spin_lock(&sv->sv_lock);        return rc; }intsrpc_service_add_buffers (srpc_service_t *sv, int nbuffer){        int                rc;        int                posted;        srpc_buffer_t     *buf;        LASSERTF (nbuffer > 0,                  "nbuffer must be positive: %d\n", nbuffer);        for (posted = 0; posted < nbuffer; posted++) {                LIBCFS_ALLOC(buf, sizeof(*buf));                if (buf == NULL) break;                spin_lock(&sv->sv_lock);                rc = srpc_service_post_buffer(sv, buf);                spin_unlock(&sv->sv_lock);                if (rc != 0) break;        }        return posted;}voidsrpc_service_remove_buffers (srpc_service_t *sv, int nbuffer){        LASSERTF (nbuffer > 0,                  "nbuffer must be positive: %d\n", nbuffer);        spin_lock(&sv->sv_lock);        LASSERT (sv->sv_nprune >= 0);        LASSERT (!sv->sv_shuttingdown);        sv->sv_nprune += nbuffer;        spin_unlock(&sv->sv_lock);        return;}/* returns 1 if sv has finished, otherwise 0 */intsrpc_finish_service (srpc_service_t *sv){        srpc_server_rpc_t *rpc;        srpc_buffer_t     *buf;        spin_lock(&sv->sv_lock);        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {                CDEBUG (D_NET,                        "waiting for %d posted buffers to unlink and "                        "in-flight RPCs to die.\n",                        sv->sv_nposted_msg);                if (!list_empty(&sv->sv_active_rpcq)) {                        rpc = list_entry(sv->sv_active_rpcq.next,                                         srpc_server_rpc_t, srpc_list);                        CDEBUG (D_NETERROR,                                "Active RPC on shutdown: sv %s, peer %s, "                                "wi %s scheduled %d running %d, "                                "ev fired %d type %d status %d lnet %d\n",                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),                                swi_state2str(rpc->srpc_wi.wi_state),                                rpc->srpc_wi.wi_scheduled,                                rpc->srpc_wi.wi_running,                                rpc->srpc_ev.ev_fired,                                rpc->srpc_ev.ev_type,                                rpc->srpc_ev.ev_status,                                rpc->srpc_ev.ev_lnet);                }                spin_unlock(&sv->sv_lock);                return 0;        }        spin_unlock(&sv->sv_lock); /* no lock needed from now on */        for (;;) {                struct list_head *q;                if (!list_empty(&sv->sv_posted_msgq))                        q = &sv->sv_posted_msgq;                else if (!list_empty(&sv->sv_blocked_msgq))                        q = &sv->sv_blocked_msgq;                else                        break;                buf = list_entry(q->next, srpc_buffer_t, buf_list);                list_del(&buf->buf_list);                LIBCFS_FREE(buf, sizeof(*buf));        }        while (!list_empty(&sv->sv_free_rpcq)) {                rpc = list_entry(sv->sv_free_rpcq.next,                                 srpc_server_rpc_t, srpc_list);                list_del(&rpc->srpc_list);                LIBCFS_FREE(rpc, sizeof(*rpc));        }        return 1;}/* called with sv->sv_lock held */voidsrpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf){        if (sv->sv_shuttingdown) goto free;        if (sv->sv_nprune == 0) {                if (srpc_service_post_buffer(sv, buf) != 0)                        CWARN ("Failed to post %s buffer\n", sv->sv_name);                return;        }        sv->sv_nprune--;free:        spin_unlock(&sv->sv_lock);        LIBCFS_FREE(buf, sizeof(*buf));        spin_lock(&sv->sv_lock);}voidsrpc_shutdown_service (srpc_service_t *sv){        srpc_server_rpc_t *rpc;        srpc_buffer_t     *buf;        spin_lock(&sv->sv_lock);        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",                sv->sv_id, sv->sv_name);        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */        /* schedule in-flight RPCs to notice the shutdown */        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {                swi_schedule_workitem(&rpc->srpc_wi);        }        spin_unlock(&sv->sv_lock);        /* OK to traverse sv_posted_msgq without lock, since no one         * touches sv_posted_msgq now */        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)                LNetMDUnlink(buf->buf_mdh);        return;}intsrpc_send_request (srpc_client_rpc_t *rpc){        srpc_event_t *ev = &rpc->crpc_reqstev;        int           rc;        ev->ev_fired = 0;        ev->ev_data  = rpc;        ev->ev_type  = SRPC_REQUEST_SENT;        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),                                     &rpc->crpc_reqstmdh, ev);        if (rc != 0) {                LASSERT (rc == -ENOMEM);                ev->ev_fired = 1;  /* no more event expected */        }        return rc;}intsrpc_prepare_reply (srpc_client_rpc_t *rpc){        srpc_event_t *ev = &rpc->crpc_replyev;        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;        int           rc;        ev->ev_fired = 0;        ev->ev_data  = rpc;        ev->ev_type  = SRPC_REPLY_RCVD;        *id = srpc_next_id();        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),                                    LNET_MD_OP_PUT, rpc->crpc_dest,                                    &rpc->crpc_replymdh, ev);        if (rc != 0) {                LASSERT (rc == -ENOMEM);                ev->ev_fired = 1;  /* no more event expected */        }        return rc;}intsrpc_prepare_bulk (srpc_client_rpc_t *rpc){        srpc_bulk_t  *bk = &rpc->crpc_bulk;        srpc_event_t *ev = &rpc->crpc_bulkev;        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;        int           rc;        int           opt;        LASSERT (bk->bk_niov <= LNET_MAX_IOV);        if (bk->bk_niov == 0) return 0; /* nothing to do */        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;#ifdef __KERNEL__        opt |= LNET_MD_KIOV;#else        opt |= LNET_MD_IOVEC;#endif        ev->ev_fired = 0;        ev->ev_data  = rpc;        ev->ev_type  = SRPC_BULK_REQ_RCVD;        *id = srpc_next_id();        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,                                    &bk->bk_iovs[0], bk->bk_niov, opt,                                    rpc->crpc_dest, &bk->bk_mdh, ev);        if (rc != 0) {                LASSERT (rc == -ENOMEM);                ev->ev_fired = 1;  /* no more event expected */        }        return rc;}intsrpc_do_bulk (srpc_server_rpc_t *rpc){        srpc_event_t  *ev = &rpc->srpc_ev;        srpc_bulk_t   *bk = rpc->srpc_bulk;        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;        int            rc;        int            opt;        LASSERT (bk != NULL);        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;#ifdef __KERNEL__        opt |= LNET_MD_KIOV;#else        opt |= LNET_MD_IOVEC;#endif        ev->ev_fired = 0;        ev->ev_data  = rpc;        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,                                   &bk->bk_iovs[0], bk->bk_niov, opt,                                   rpc->srpc_peer, rpc->srpc_self,                                   &bk->bk_mdh, ev);        if (rc != 0)                ev->ev_fired = 1;  /* no more event expected */        return rc;}/* called with srpc_service_t::sv_lock held */inline voidsrpc_schedule_server_rpc (srpc_server_rpc_t *rpc){        srpc_service_t *sv = rpc->srpc_service;        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)                swi_schedule_workitem(&rpc->srpc_wi);        else    /* framework RPCs are handled one by one */                swi_schedule_serial_workitem(&rpc->srpc_wi);        return;}/* only called from srpc_handle_rpc */voidsrpc_server_rpc_done (srpc_server_rpc_t *rpc, int status){        srpc_service_t *sv = rpc->srpc_service;        srpc_buffer_t  *buffer;        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);        rpc->srpc_status = status;        CDEBUG (status == 0 ? D_NET : D_NETERROR,                "Server RPC done: service %s, peer %s, status %s:%d\n",                sv->sv_name, libcfs_id2str(rpc->srpc_peer),                swi_state2str(rpc->srpc_wi.wi_state), status);        if (status != 0) {                spin_lock(&srpc_data.rpc_glock);                srpc_data.rpc_counters.rpcs_dropped++;                spin_unlock(&srpc_data.rpc_glock);        }        if (rpc->srpc_done != NULL)                (*rpc->srpc_done) (rpc);        LASSERT (rpc->srpc_bulk == NULL);        spin_lock(&sv->sv_lock);        if (rpc->srpc_reqstbuf != NULL) {                /* NB might drop sv_lock in srpc_service_recycle_buffer, but                 * sv won't go away for sv_active_rpcq must not be empty */                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);                rpc->srpc_reqstbuf = NULL;        }        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */        /*         * No one can schedule me now since:         * - I'm not on sv_active_rpcq.         * - all LNet events have been fired.         * Cancel pending schedules and prevent future schedule attempts:         */        LASSERT (rpc->srpc_ev.ev_fired);        swi_kill_workitem(&rpc->srpc_wi);        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {                buffer = list_entry(sv->sv_blocked_msgq.next,                                    srpc_buffer_t, buf_list);                list_del(&buffer->buf_list);                srpc_init_server_rpc(rpc, sv, buffer);                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);                srpc_schedule_server_rpc(rpc);        } else {                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);        }        spin_unlock(&sv->sv_lock);        return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -