⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 4 页
字号:
        rpc->crpc_aborted = 1;        rpc->crpc_status  = why;        if (peer != NULL) {                spin_lock(&peer->stp_lock);                if (!list_empty(&rpc->crpc_privl)) { /* still queued */                        list_del_init(&rpc->crpc_privl);                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */                        rpc->crpc_peer = NULL;       /* no credit taken */                }                spin_unlock(&peer->stp_lock);        }        swi_schedule_workitem(&rpc->crpc_wi);        return;}/* called with rpc->crpc_lock held */voidsrpc_post_rpc (srpc_client_rpc_t *rpc){        srpc_peer_t *peer;        LASSERT (!rpc->crpc_aborted);        LASSERT (rpc->crpc_peer == NULL);        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,                rpc->crpc_timeout);        srpc_add_client_rpc_timer(rpc);        peer = srpc_nid2peer(rpc->crpc_dest.nid);        if (peer == NULL) {                srpc_abort_rpc(rpc, -ENOMEM);                return;        }        srpc_queue_rpc(peer, rpc);        spin_unlock(&rpc->crpc_lock);        srpc_check_sends(peer, 0);        spin_lock(&rpc->crpc_lock);        return;}intsrpc_send_reply (srpc_server_rpc_t *rpc){        srpc_event_t   *ev = &rpc->srpc_ev;        srpc_msg_t     *msg = &rpc->srpc_replymsg;        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;        srpc_service_t *sv = rpc->srpc_service;        __u64           rpyid;        int             rc;        LASSERT (buffer != NULL);        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;        spin_lock(&sv->sv_lock);        if (!sv->sv_shuttingdown &&            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {                /* Repost buffer before replying since test client                 * might send me another RPC once it gets the reply */                if (srpc_service_post_buffer(sv, buffer) != 0)                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);                rpc->srpc_reqstbuf = NULL;        }        spin_unlock(&sv->sv_lock);        ev->ev_fired = 0;        ev->ev_data  = rpc;        ev->ev_type  = SRPC_REPLY_SENT;        msg->msg_magic   = SRPC_MSG_MAGIC;        msg->msg_version = SRPC_MSG_VERSION;        msg->msg_type    = srpc_service2reply(sv->sv_id);        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,                                   sizeof(*msg), LNET_MD_OP_PUT,                                   rpc->srpc_peer, rpc->srpc_self,                                   &rpc->srpc_replymdh, ev);        if (rc != 0)                ev->ev_fired = 1;  /* no more event expected */        return rc;}/* when in kernel always called with LNET_LOCK() held, and in thread context */void srpc_lnet_ev_handler (lnet_event_t *ev){        srpc_event_t      *rpcev = ev->md.user_ptr;        srpc_client_rpc_t *crpc;        srpc_server_rpc_t *srpc;        srpc_buffer_t     *buffer;        srpc_service_t    *sv;        srpc_msg_t        *msg;        srpc_msg_type_t    type;        LASSERT (!in_interrupt());        if (ev->status != 0) {                spin_lock(&srpc_data.rpc_glock);                srpc_data.rpc_counters.errors++;                spin_unlock(&srpc_data.rpc_glock);        }        rpcev->ev_lnet = ev->type;        switch (rpcev->ev_type) {        default:                LBUG ();        case SRPC_REQUEST_SENT:                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {                        spin_lock(&srpc_data.rpc_glock);                        srpc_data.rpc_counters.rpcs_sent++;                        spin_unlock(&srpc_data.rpc_glock);                }        case SRPC_REPLY_RCVD:        case SRPC_BULK_REQ_RCVD:                crpc = rpcev->ev_data;                LASSERT (rpcev == &crpc->crpc_reqstev ||                         rpcev == &crpc->crpc_replyev ||                         rpcev == &crpc->crpc_bulkev);                spin_lock(&crpc->crpc_lock);                LASSERT (rpcev->ev_fired == 0);                rpcev->ev_fired  = 1;                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?                                                 -EINTR : ev->status;                swi_schedule_workitem(&crpc->crpc_wi);                spin_unlock(&crpc->crpc_lock);                break;        case SRPC_REQUEST_RCVD:                sv = rpcev->ev_data;                LASSERT (rpcev == &sv->sv_ev);                spin_lock(&sv->sv_lock);                LASSERT (ev->unlinked);                LASSERT (ev->type == LNET_EVENT_PUT ||                         ev->type == LNET_EVENT_UNLINK);                LASSERT (ev->type != LNET_EVENT_UNLINK ||                         sv->sv_shuttingdown);                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);                buffer->buf_peer = ev->initiator;                buffer->buf_self = ev->target.nid;                sv->sv_nposted_msg--;                LASSERT (sv->sv_nposted_msg >= 0);                if (sv->sv_shuttingdown) {                        /* Leave buffer on sv->sv_posted_msgq since                          * srpc_finish_service needs to traverse it. */                        spin_unlock(&sv->sv_lock);                        break;                }                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */                msg = &buffer->buf_msg;                type = srpc_service2request(sv->sv_id);                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||                    (msg->msg_type != type &&                      msg->msg_type != __swab32(type)) ||                    (msg->msg_magic != SRPC_MSG_MAGIC &&                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {                        CERROR ("Dropping RPC (%s) from %s: "                                "status %d mlength %d type %u magic %u.\n",                                sv->sv_name, libcfs_id2str(ev->initiator),                                ev->status, ev->mlength,                                msg->msg_type, msg->msg_magic);                        /* NB might drop sv_lock in srpc_service_recycle_buffer,                         * sv_nposted_msg++ as an implicit reference to prevent                         * sv from disappearing under me */                        sv->sv_nposted_msg++;                        srpc_service_recycle_buffer(sv, buffer);                        sv->sv_nposted_msg--;                        spin_unlock(&sv->sv_lock);                        if (ev->status == 0) { /* status!=0 counted already */                                spin_lock(&srpc_data.rpc_glock);                                srpc_data.rpc_counters.errors++;                                spin_unlock(&srpc_data.rpc_glock);                        }                        break;                }                if (!list_empty(&sv->sv_free_rpcq)) {                        srpc = list_entry(sv->sv_free_rpcq.next,                                          srpc_server_rpc_t, srpc_list);                        list_del(&srpc->srpc_list);                        srpc_init_server_rpc(srpc, sv, buffer);                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);                        srpc_schedule_server_rpc(srpc);                } else {                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);                }                spin_unlock(&sv->sv_lock);                spin_lock(&srpc_data.rpc_glock);                srpc_data.rpc_counters.rpcs_rcvd++;                spin_unlock(&srpc_data.rpc_glock);                break;        case SRPC_BULK_GET_RPLD:                LASSERT (ev->type == LNET_EVENT_SEND ||                         ev->type == LNET_EVENT_REPLY ||                         ev->type == LNET_EVENT_UNLINK);                if (ev->type == LNET_EVENT_SEND &&                     ev->status == 0 && !ev->unlinked)                        break; /* wait for the final LNET_EVENT_REPLY */        case SRPC_BULK_PUT_SENT:                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {                        spin_lock(&srpc_data.rpc_glock);                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)                                srpc_data.rpc_counters.bulk_get += ev->mlength;                        else                                srpc_data.rpc_counters.bulk_put += ev->mlength;                        spin_unlock(&srpc_data.rpc_glock);                }        case SRPC_REPLY_SENT:                srpc = rpcev->ev_data;                sv = srpc->srpc_service;                LASSERT (rpcev == &srpc->srpc_ev);                spin_lock(&sv->sv_lock);                rpcev->ev_fired  = 1;                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?                                                 -EINTR : ev->status;                srpc_schedule_server_rpc(srpc);                spin_unlock(&sv->sv_lock);                break;        }        return;}#ifndef __KERNEL__intsrpc_check_event (int timeout){        lnet_event_t ev;        int          rc;        int          i;        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,                        timeout * 1000, &ev, &i);        if (rc == 0) return 0;                LASSERT (rc == -EOVERFLOW || rc == 1);                /* We can't affort to miss any events... */        if (rc == -EOVERFLOW) {                CERROR ("Dropped an event!!!\n");                abort();        }                        srpc_lnet_ev_handler(&ev);        return 1;}#endifintsrpc_startup (void){        int i;        int rc;        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));        spin_lock_init(&srpc_data.rpc_glock);        /* 1 second pause to avoid timestamp reuse */        cfs_pause(cfs_time_seconds(1));        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;        srpc_data.rpc_state = SRPC_STATE_NONE;        LIBCFS_ALLOC(srpc_data.rpc_peers,                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);        if (srpc_data.rpc_peers == NULL) {                CERROR ("Failed to alloc peer hash.\n");                return -ENOMEM;        }        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);#ifdef __KERNEL__        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);#else        if (the_lnet.ln_server_mode_flag)                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);        else                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);#endif        if (rc < 0) {                CERROR ("LNetNIInit() has failed: %d\n", rc);                LIBCFS_FREE(srpc_data.rpc_peers,                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);                return rc;        }        srpc_data.rpc_state = SRPC_STATE_NI_INIT;        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;#ifdef __KERNEL__        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);#else        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);#endif        if (rc != 0) {                CERROR("LNetEQAlloc() has failed: %d\n", rc);                goto bail;        }        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);        LASSERT (rc == 0);        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;        rc = swi_startup();        if (rc != 0)                goto bail;        srpc_data.rpc_state = SRPC_STATE_WI_INIT;        rc = stt_startup();bail:        if (rc != 0)                srpc_shutdown();        else                srpc_data.rpc_state = SRPC_STATE_RUNNING;        return rc;}voidsrpc_shutdown (void){        int i;        int rc;        int state;        state = srpc_data.rpc_state;        srpc_data.rpc_state = SRPC_STATE_STOPPING;        switch (state) {        default:                LBUG ();        case SRPC_STATE_RUNNING:                spin_lock(&srpc_data.rpc_glock);                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {                        srpc_service_t *sv = srpc_data.rpc_services[i];                        LASSERTF (sv == NULL,                                  "service not empty: id %d, name %s\n",                                  i, sv->sv_name);                }                spin_unlock(&srpc_data.rpc_glock);                stt_shutdown();        case SRPC_STATE_WI_INIT:                swi_shutdown();        case SRPC_STATE_EQ_INIT:                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);                LASSERT (rc == 0);                rc = LNetEQFree(srpc_data.rpc_lnet_eq);                LASSERT (rc == 0); /* the EQ should have no user by now */        case SRPC_STATE_NI_INIT:                LNetNIFini();                break;        }        /* srpc_peer_t's are kept in hash until shutdown */        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {                srpc_peer_t *peer;                while (!list_empty(&srpc_data.rpc_peers[i])) {                        peer = list_entry(srpc_data.rpc_peers[i].next,                                          srpc_peer_t, stp_list);                        list_del(&peer->stp_list);                        LASSERT (list_empty(&peer->stp_rpcq));                        LASSERT (list_empty(&peer->stp_ctl_rpcq));                        LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));                }        }        LIBCFS_FREE(srpc_data.rpc_peers,                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);        return;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -