niobuf.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 642 行 · 第 1/2 页

C
642
字号
        LASSERT (req->rq_reqmsg != NULL);        LASSERT (req->rq_repmsg != NULL);        LASSERT (rs != NULL);        LASSERT (req->rq_repmsg == rs->rs_msg);        LASSERT ((flags & PTLRPC_REPLY_MAYBE_DIFFICULT) || !rs->rs_difficult);        LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);        LASSERT (rs->rs_cb_id.cbid_arg == rs);        /* There may be no rq_export during failover */        if (req->rq_export && req->rq_export->exp_obd &&            req->rq_export->exp_obd->obd_fail) {                /* Failed obd's only send ENODEV */                req->rq_type = PTL_RPC_MSG_ERR;                req->rq_status = -ENODEV;                CDEBUG(D_HA, "sending ENODEV from failed obd %d\n",                       req->rq_export->exp_obd->obd_minor);        }        if (req->rq_type != PTL_RPC_MSG_ERR)                req->rq_type = PTL_RPC_MSG_REPLY;        lustre_msg_set_type(req->rq_repmsg, req->rq_type);        lustre_msg_set_status(req->rq_repmsg, req->rq_status);        lustre_msg_set_opc(req->rq_repmsg, lustre_msg_get_opc(req->rq_reqmsg));                service_time = max_t(int, cfs_time_current_sec() -                             req->rq_arrival_time.tv_sec, 1);        if (!(flags & PTLRPC_REPLY_EARLY) &&             (req->rq_type != PTL_RPC_MSG_ERR)) {                /* early replies and errors don't count toward our service                   time estimate */                int oldse = at_add(&svc->srv_at_estimate, service_time);                if (oldse != 0)                        DEBUG_REQ(D_ADAPTTO, req,                                  "svc %s changed estimate from %d to %d",                                  svc->srv_name, oldse,                                   at_get(&svc->srv_at_estimate));        }        /* Report actual service time for client latency calc */        lustre_msg_set_service_time(req->rq_repmsg, service_time);        /* Report service time estimate for future client reqs */        lustre_msg_set_timeout(req->rq_repmsg, at_get(&svc->srv_at_estimate));        if (req->rq_export && req->rq_export->exp_obd)                target_pack_pool_reply(req);        if (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) {                /* early replies go to offset 0, regular replies go after that*/                if (flags & PTLRPC_REPLY_EARLY) {                        offset = 0;                        /* Include a checksum on early replies - must be done                           after all other lustre_msg_set_* */                        lustre_msg_set_cksum(req->rq_repmsg,                                          lustre_msg_calc_cksum(req->rq_repmsg));                } else {                        offset = lustre_msg_early_size();                }        } else {                CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x "                       "req_flags=%#x magic=%d:%x/%x len=%d\n",                       flags, lustre_msg_get_flags(req->rq_reqmsg),                      lustre_msg_is_v1(req->rq_reqmsg),                      lustre_msg_get_magic(req->rq_reqmsg),                      lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);        }        if (req->rq_export == NULL || req->rq_export->exp_connection == NULL)                conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL);        else                conn = ptlrpc_connection_addref(req->rq_export->exp_connection);        if (conn == NULL) {                CERROR("not replying on NULL connection\n"); /* bug 9635 */                return -ENOTCONN;        }                atomic_inc (&svc->srv_outstanding_replies);        ptlrpc_rs_addref(rs);                   /* +1 ref for the network */        req->rq_sent = cfs_time_current_sec();                rc = ptl_send_buf (&rs->rs_md_h, req->rq_repmsg, req->rq_replen,                           rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ,                           &rs->rs_cb_id, conn, svc->srv_rep_portal,                           req->rq_xid, offset);        if (rc != 0) {                atomic_dec (&svc->srv_outstanding_replies);                ptlrpc_req_drop_rs(req);        }        ptlrpc_put_connection(conn);        return rc;}int ptlrpc_reply (struct ptlrpc_request *req){        return (ptlrpc_send_reply (req, 0));}int ptlrpc_error(struct ptlrpc_request *req){        int rc;        ENTRY;        if (!req->rq_repmsg) {                rc = lustre_pack_reply(req, 1, NULL, NULL);                if (rc)                        RETURN(rc);        }        req->rq_type = PTL_RPC_MSG_ERR;        rc = ptlrpc_send_reply(req, 0);        RETURN(rc);}int ptl_send_rpc(struct ptlrpc_request *request, int noreply){        int rc;        int rc2;        struct ptlrpc_connection *connection;        lnet_handle_me_t  reply_me_h;        lnet_md_t         reply_md;        struct obd_device *obd = request->rq_import->imp_obd;        ENTRY;        OBD_FAIL_RETURN(OBD_FAIL_PTLRPC_DROP_RPC, 0);         LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);        /* If this is a re-transmit, we're required to have disengaged         * cleanly from the previous attempt */        LASSERT (!request->rq_receiving_reply);        if (request->rq_import->imp_obd &&            request->rq_import->imp_obd->obd_fail) {                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",                       request->rq_import->imp_obd->obd_name);                /* this prevents us from waiting in ptlrpc_queue_wait */                request->rq_err = 1;                RETURN(-ENODEV);        }        connection = request->rq_import->imp_connection;        if (request->rq_bulk != NULL) {                rc = ptlrpc_register_bulk (request);                if (rc != 0)                        RETURN(rc);        }        lustre_msg_set_handle(request->rq_reqmsg,                              &request->rq_import->imp_remote_handle);        lustre_msg_set_type(request->rq_reqmsg, PTL_RPC_MSG_REQUEST);        lustre_msg_set_conn_cnt(request->rq_reqmsg,                                request->rq_import->imp_conn_cnt);        lustre_msghdr_set_flags(request->rq_reqmsg,                                request->rq_import->imp_msghdr_flags);        if (!noreply) {                LASSERT (request->rq_replen != 0);                if (request->rq_repbuf == NULL)                        OBD_ALLOC(request->rq_repbuf, request->rq_replen);                if (request->rq_repbuf == NULL)                        GOTO(cleanup_bulk, rc = -ENOMEM);                request->rq_repmsg = NULL;                rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/                                  connection->c_peer, request->rq_xid, 0,                                  LNET_UNLINK, LNET_INS_AFTER, &reply_me_h);                if (rc != 0) {                        CERROR("LNetMEAttach failed: %d\n", rc);                        LASSERT (rc == -ENOMEM);                        GOTO(cleanup_repmsg, rc = -ENOMEM);                }        }        spin_lock(&request->rq_lock);        /* If the MD attach succeeds, there _will_ be a reply_in callback */        request->rq_receiving_reply = !noreply;        /* We are responsible for unlinking the reply buffer */        request->rq_must_unlink = !noreply;        /* Clear any flags that may be present from previous sends. */        request->rq_replied = 0;        request->rq_err = 0;        request->rq_timedout = 0;        request->rq_net_err = 0;        request->rq_resend = 0;        request->rq_restart = 0;        spin_unlock(&request->rq_lock);        if (!noreply) {                reply_md.start     = request->rq_repbuf;                reply_md.length    = request->rq_replen;                /* Allow multiple early replies */                reply_md.threshold = LNET_MD_THRESH_INF;                /* Manage remote for early replies */                reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |                        LNET_MD_MANAGE_REMOTE;                reply_md.user_ptr  = &request->rq_reply_cbid;                reply_md.eq_handle = ptlrpc_eq_h;                /* We must see the unlink callback to unset rq_must_unlink,                   so we can't auto-unlink */                rc = LNetMDAttach(reply_me_h, reply_md, LNET_RETAIN,                                  &request->rq_reply_md_h);                if (rc != 0) {                        CERROR("LNetMDAttach failed: %d\n", rc);                        LASSERT (rc == -ENOMEM);                        spin_lock(&request->rq_lock);                        /* ...but the MD attach didn't succeed... */                        request->rq_receiving_reply = 0;                        spin_unlock(&request->rq_lock);                        GOTO(cleanup_me, rc -ENOMEM);                }                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64                       ", portal %u\n",                       request->rq_replen, request->rq_xid,                       request->rq_reply_portal);        }        /* add references on request for request_out_callback */        ptlrpc_request_addref(request);        if (obd->obd_svc_stats != NULL)                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,                                    request->rq_import->imp_inflight.counter);        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);        request->rq_sent = cfs_time_current_sec();        do_gettimeofday(&request->rq_arrival_time);        /* We give the server rq_timeout secs to process the req, and            add the network latency for our local timeout. */        request->rq_deadline = request->rq_sent + request->rq_timeout +                     ptlrpc_at_get_net_latency(request);        ptlrpc_pinger_sending_on_import(request->rq_import);                DEBUG_REQ(D_INFO, request, "send flg=%x",                  lustre_msg_get_flags(request->rq_reqmsg));        rc = ptl_send_buf(&request->rq_req_md_h,                          request->rq_reqmsg, request->rq_reqlen,                          LNET_NOACK_REQ, &request->rq_req_cbid,                          connection,                          request->rq_request_portal,                          request->rq_xid, 0);        if (rc == 0) {                ptlrpc_lprocfs_rpc_sent(request);                RETURN(rc);        }        ptlrpc_req_finished(request);        if (noreply)                RETURN(rc); cleanup_me:        /* MEUnlink is safe; the PUT didn't even get off the ground, and         * nobody apart from the PUT's target has the right nid+XID to         * access the reply buffer. */        rc2 = LNetMEUnlink(reply_me_h);        LASSERT (rc2 == 0);        /* UNLINKED callback called synchronously */        LASSERT (!request->rq_receiving_reply); cleanup_repmsg:        OBD_FREE(request->rq_repbuf, request->rq_replen);        request->rq_repbuf = NULL;        request->rq_repmsg = NULL; //remove cleanup_bulk:        if (request->rq_bulk != NULL)                ptlrpc_unregister_bulk(request);        return rc;}int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd){        struct ptlrpc_service   *service = rqbd->rqbd_service;        static lnet_process_id_t  match_id = {LNET_NID_ANY, LNET_PID_ANY};        int                      rc;        lnet_md_t                 md;        lnet_handle_me_t          me_h;        CDEBUG(D_NET, "LNetMEAttach: portal %d\n",               service->srv_req_portal);        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD))                return (-ENOMEM);        rc = LNetMEAttach(service->srv_req_portal,                          match_id, 0, ~0, LNET_UNLINK, LNET_INS_AFTER, &me_h);        if (rc != 0) {                CERROR("LNetMEAttach failed: %d\n", rc);                return (-ENOMEM);        }        LASSERT(rqbd->rqbd_refcount == 0);        rqbd->rqbd_refcount = 1;        md.start     = rqbd->rqbd_buffer;        md.length    = service->srv_buf_size;        md.max_size  = service->srv_max_req_size;        md.threshold = LNET_MD_THRESH_INF;        md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;        md.user_ptr  = &rqbd->rqbd_cbid;        md.eq_handle = ptlrpc_eq_h;                rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h);        if (rc == 0)                return (0);        CERROR("LNetMDAttach failed: %d; \n", rc);        LASSERT (rc == -ENOMEM);        rc = LNetMEUnlink (me_h);        LASSERT (rc == 0);        rqbd->rqbd_refcount = 0;                return (-ENOMEM);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?