⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 4 页
字号:
}/* handles an incoming RPC */intsrpc_handle_rpc (swi_workitem_t *wi){        srpc_server_rpc_t *rpc = wi->wi_data;        srpc_service_t    *sv = rpc->srpc_service;        srpc_event_t      *ev = &rpc->srpc_ev;        int                rc = 0;        LASSERT (wi == &rpc->srpc_wi);        spin_lock(&sv->sv_lock);        if (sv->sv_shuttingdown) {                spin_unlock(&sv->sv_lock);                if (rpc->srpc_bulk != NULL)                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);                LNetMDUnlink(rpc->srpc_replymdh);                if (ev->ev_fired) { /* no more event, OK to finish */                        srpc_server_rpc_done(rpc, -ESHUTDOWN);                        return 1;                }                return 0;        }        spin_unlock(&sv->sv_lock);        switch (wi->wi_state) {        default:                LBUG ();        case SWI_STATE_NEWBORN: {                srpc_msg_t           *msg;                srpc_generic_reply_t *reply;                msg = &rpc->srpc_reqstbuf->buf_msg;                reply = &rpc->srpc_replymsg.msg_body.reply;                if (msg->msg_version != SRPC_MSG_VERSION &&                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {                        CWARN ("Version mismatch: %u, %u expected, from %s\n",                               msg->msg_version, SRPC_MSG_VERSION,                               libcfs_id2str(rpc->srpc_peer));                        reply->status = EPROTO;                } else {                        reply->status = 0;                        rc = (*sv->sv_handler) (rpc);                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);                }                if (rc != 0) {                        srpc_server_rpc_done(rpc, rc);                        return 1;                }                wi->wi_state = SWI_STATE_BULK_STARTED;                if (rpc->srpc_bulk != NULL) {                        rc = srpc_do_bulk(rpc);                        if (rc == 0)                                return 0; /* wait for bulk */                        LASSERT (ev->ev_fired);                        ev->ev_status = rc;                }        }        case SWI_STATE_BULK_STARTED:                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);                if (rpc->srpc_bulk != NULL) {                        rc = ev->ev_status;                        if (sv->sv_bulk_ready != NULL)                                rc = (*sv->sv_bulk_ready) (rpc, rc);                        if (rc != 0) {                                srpc_server_rpc_done(rpc, rc);                                return 1;                        }                }                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;                rc = srpc_send_reply(rpc);                if (rc == 0)                        return 0; /* wait for reply */                srpc_server_rpc_done(rpc, rc);                return 1;        case SWI_STATE_REPLY_SUBMITTED:                LASSERT (ev->ev_fired);                wi->wi_state = SWI_STATE_DONE;                srpc_server_rpc_done(rpc, ev->ev_status);                return 1;        }        return 0;}voidsrpc_client_rpc_expired (void *data){        srpc_client_rpc_t *rpc = data;        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),               rpc->crpc_timeout);        spin_lock(&rpc->crpc_lock);        rpc->crpc_timeout = 0;        srpc_abort_rpc(rpc, -ETIMEDOUT);        spin_unlock(&rpc->crpc_lock);        spin_lock(&srpc_data.rpc_glock);        srpc_data.rpc_counters.rpcs_expired++;        spin_unlock(&srpc_data.rpc_glock);        return;}inline voidsrpc_add_client_rpc_timer (srpc_client_rpc_t *rpc){        stt_timer_t *timer = &rpc->crpc_timer;        if (rpc->crpc_timeout == 0) return;        CFS_INIT_LIST_HEAD(&timer->stt_list);        timer->stt_data    = rpc;        timer->stt_func    = srpc_client_rpc_expired;        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,                                           cfs_time_current_sec());        stt_add_timer(timer);        return;}/*  * Called with rpc->crpc_lock held. * * Upon exit the RPC expiry timer is not queued and the handler is not * running on any CPU. */voidsrpc_del_client_rpc_timer (srpc_client_rpc_t *rpc){             /* timer not planted or already exploded */        if (rpc->crpc_timeout == 0) return;        /* timer sucessfully defused */        if (stt_del_timer(&rpc->crpc_timer)) return;#ifdef __KERNEL__        /* timer detonated, wait for it to explode */        while (rpc->crpc_timeout != 0) {                spin_unlock(&rpc->crpc_lock);                cfs_schedule();                 spin_lock(&rpc->crpc_lock);        }#else        LBUG(); /* impossible in single-threaded runtime */#endif        return;}voidsrpc_check_sends (srpc_peer_t *peer, int credits){        struct list_head  *q;        srpc_client_rpc_t *rpc;        LASSERT (credits >= 0);        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);        spin_lock(&peer->stp_lock);        peer->stp_credits += credits;        while (peer->stp_credits) {                if (!list_empty(&peer->stp_ctl_rpcq))                        q = &peer->stp_ctl_rpcq;                else if (!list_empty(&peer->stp_rpcq))                        q = &peer->stp_rpcq;                else                        break;                peer->stp_credits--;                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);                list_del_init(&rpc->crpc_privl);                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */                swi_schedule_workitem(&rpc->crpc_wi);        }        spin_unlock(&peer->stp_lock);        return;}voidsrpc_client_rpc_done (srpc_client_rpc_t *rpc, int status){        swi_workitem_t *wi = &rpc->crpc_wi;        srpc_peer_t    *peer = rpc->crpc_peer;        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);        spin_lock(&rpc->crpc_lock);        rpc->crpc_closed = 1;        if (rpc->crpc_status == 0)                rpc->crpc_status = status;        srpc_del_client_rpc_timer(rpc);        CDEBUG ((status == 0) ? D_NET : D_NETERROR,                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);        /*         * No one can schedule me now since:         * - RPC timer has been defused.         * - all LNet events have been fired.         * - crpc_closed has been set, preventing srpc_abort_rpc from          *   scheduling me.         * Cancel pending schedules and prevent future schedule attempts:         */        LASSERT (!srpc_event_pending(rpc));        swi_kill_workitem(wi);        spin_unlock(&rpc->crpc_lock);        (*rpc->crpc_done) (rpc);        if (peer != NULL)                srpc_check_sends(peer, 1);        return;}/* sends an outgoing RPC */intsrpc_send_rpc (swi_workitem_t *wi){        int                rc = 0;        srpc_client_rpc_t *rpc = wi->wi_data;        srpc_msg_t        *reply = &rpc->crpc_replymsg;        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;        LASSERT (rpc != NULL);        LASSERT (wi == &rpc->crpc_wi);        spin_lock(&rpc->crpc_lock);        if (rpc->crpc_aborted) {                spin_unlock(&rpc->crpc_lock);                goto abort;        }        spin_unlock(&rpc->crpc_lock);        switch (wi->wi_state) {        default:                LBUG ();        case SWI_STATE_NEWBORN:                LASSERT (!srpc_event_pending(rpc));                rc = srpc_prepare_reply(rpc);                if (rc != 0) {                        srpc_client_rpc_done(rpc, rc);                        return 1;                }                rc = srpc_prepare_bulk(rpc);                if (rc != 0) break;                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;                rc = srpc_send_request(rpc);                break;        case SWI_STATE_REQUEST_SUBMITTED:                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any                 * order; however, they're processed in a strict order:                  * rqt, rpy, and bulk. */                if (!rpc->crpc_reqstev.ev_fired) break;                rc = rpc->crpc_reqstev.ev_status;                if (rc != 0) break;                wi->wi_state = SWI_STATE_REQUEST_SENT;                /* perhaps more events, fall thru */        case SWI_STATE_REQUEST_SENT: {                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);                if (!rpc->crpc_replyev.ev_fired) break;                rc = rpc->crpc_replyev.ev_status;                if (rc != 0) break;                if ((reply->msg_type != type &&                      reply->msg_type != __swab32(type)) ||                    (reply->msg_magic != SRPC_MSG_MAGIC &&                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {                        CWARN ("Bad message from %s: type %u (%d expected),"                               " magic %u (%d expected).\n",                               libcfs_id2str(rpc->crpc_dest),                               reply->msg_type, type,                               reply->msg_magic, SRPC_MSG_MAGIC);                        rc = -EBADMSG;                        break;                }                if (do_bulk && reply->msg_body.reply.status != 0) {                        CWARN ("Remote error %d at %s, unlink bulk buffer in "                               "case peer didn't initiate bulk transfer\n",                               reply->msg_body.reply.status,                               libcfs_id2str(rpc->crpc_dest));                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);                }                wi->wi_state = SWI_STATE_REPLY_RECEIVED;        }        case SWI_STATE_REPLY_RECEIVED:                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;                /* Bulk buffer was unlinked due to remote error. Clear error                 * since reply buffer still contains valid data.                 * NB rpc->crpc_done shouldn't look into bulk data in case of                 * remote error. */                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)                        rc = 0;                wi->wi_state = SWI_STATE_DONE;                srpc_client_rpc_done(rpc, rc);                return 1;        }        if (rc != 0) {                spin_lock(&rpc->crpc_lock);                srpc_abort_rpc(rpc, rc);                spin_unlock(&rpc->crpc_lock);        }abort:        if (rpc->crpc_aborted) {                LNetMDUnlink(rpc->crpc_reqstmdh);                LNetMDUnlink(rpc->crpc_replymdh);                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);                if (!srpc_event_pending(rpc)) {                        srpc_client_rpc_done(rpc, -EINTR);                        return 1;                }        }        return 0;}srpc_client_rpc_t *srpc_create_client_rpc (lnet_process_id_t peer, int service,                        int nbulkiov, int bulklen,                        void (*rpc_done)(srpc_client_rpc_t *),                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv){        srpc_client_rpc_t *rpc;	LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,                                   crpc_bulk.bk_iovs[nbulkiov]));        if (rpc == NULL)                return NULL;        srpc_init_client_rpc(rpc, peer, service, nbulkiov,                             bulklen, rpc_done, rpc_fini, priv);        return rpc;}/* called with rpc->crpc_lock held */static inline voidsrpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc){        int service = rpc->crpc_service;        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);        rpc->crpc_peer = peer;        spin_lock(&peer->stp_lock);        /* Framework RPCs that alter session state shall take precedence         * over test RPCs and framework query RPCs */        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&            service != SRPC_SERVICE_DEBUG &&            service != SRPC_SERVICE_QUERY_STAT)                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);        else                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */        spin_unlock(&peer->stp_lock);        return;}/* called with rpc->crpc_lock held */voidsrpc_abort_rpc (srpc_client_rpc_t *rpc, int why){        srpc_peer_t *peer = rpc->crpc_peer;        LASSERT (why != 0);        if (rpc->crpc_aborted || /* already aborted */            rpc->crpc_closed)    /* callback imminent */                return;        CDEBUG (D_NET,                "Aborting RPC: service %d, peer %s, state %s, why %d\n",                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),                swi_state2str(rpc->crpc_wi.wi_state), why);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -