client.c

From "lustre 1.6.5 source code" · C code · 1,808 lines total · page 1/5

C
1,808
Font size
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * *  Copyright (c) 2002, 2003 Cluster File Systems, Inc. * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. 
* */#define DEBUG_SUBSYSTEM S_RPC#ifndef __KERNEL__#include <errno.h>#include <signal.h>#include <liblustre.h>#endif#include <obd_support.h>#include <obd_class.h>#include <lustre_lib.h>#include <lustre_ha.h>#include <lustre_import.h>#include "ptlrpc_internal.h"void ptlrpc_init_client(int req_portal, int rep_portal, char *name,                        struct ptlrpc_client *cl){        cl->cli_request_portal = req_portal;        cl->cli_reply_portal   = rep_portal;        cl->cli_name           = name;}struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid){        struct ptlrpc_connection *c;        lnet_nid_t                self;        lnet_process_id_t         peer;        int                       err;        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);        if (err != 0) {                CERROR("cannot find peer %s!\n", uuid->uuid);                return NULL;        }        c = ptlrpc_get_connection(peer, self, uuid);        if (c) {                memcpy(c->c_remote_uuid.uuid,                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));        }        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);        return c;}void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,                                 struct obd_uuid *uuid){        lnet_nid_t        self;        lnet_process_id_t peer;        int               err;        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);        if (err != 0) {                CERROR("cannot find peer %s!\n", uuid->uuid);                return;        }        conn->c_peer = peer;        conn->c_self = self;        return;}static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal){        struct ptlrpc_bulk_desc *desc;        OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));        if (!desc)                return NULL;        spin_lock_init(&desc->bd_lock);        cfs_waitq_init(&desc->bd_waitq);        desc->bd_max_iov = npages;        
desc->bd_iov_count = 0;        desc->bd_md_h = LNET_INVALID_HANDLE;        desc->bd_portal = portal;        desc->bd_type = type;        return desc;}struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,                                               int npages, int type, int portal){        struct obd_import *imp = req->rq_import;        struct ptlrpc_bulk_desc *desc;        ENTRY;        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);        desc = new_bulk(npages, type, portal);        if (desc == NULL)                RETURN(NULL);        desc->bd_import_generation = req->rq_import_generation;        desc->bd_import = class_import_get(imp);        desc->bd_req = req;        desc->bd_cbid.cbid_fn  = client_bulk_callback;        desc->bd_cbid.cbid_arg = desc;        /* This makes req own desc, and free it when she frees herself */        req->rq_bulk = desc;        return desc;}struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,                                               int npages, int type, int portal){        struct obd_export *exp = req->rq_export;        struct ptlrpc_bulk_desc *desc;        ENTRY;        LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);        desc = new_bulk(npages, type, portal);        if (desc == NULL)                RETURN(NULL);        desc->bd_export = class_export_get(exp);        desc->bd_req = req;        desc->bd_cbid.cbid_fn  = server_bulk_callback;        desc->bd_cbid.cbid_arg = desc;        /* NB we don't assign rq_bulk here; server-side requests are         * re-used, and the handler frees the bulk desc explicitly. 
*/        return desc;}void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,                           cfs_page_t *page, int pageoffset, int len){        LASSERT(desc->bd_iov_count < desc->bd_max_iov);        LASSERT(page != NULL);        LASSERT(pageoffset >= 0);        LASSERT(len > 0);        LASSERT(pageoffset + len <= CFS_PAGE_SIZE);        desc->bd_nob += len;        ptlrpc_add_bulk_page(desc, page, pageoffset, len);}void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc){        ENTRY;        LASSERT(desc != NULL);        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */        LASSERT(!desc->bd_network_rw);         /* network hands off or */        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));        if (desc->bd_export)                class_export_put(desc->bd_export);        else                class_import_put(desc->bd_import);        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,                                bd_iov[desc->bd_max_iov]));        EXIT;}/* Set server timelimit for this req */void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req){        __u32 serv_est;        int idx;        struct imp_at *at;        LASSERT(req->rq_import);        if (AT_OFF) {                /* non-AT settings */                req->rq_timeout = req->rq_import->imp_server_timeout ?                         obd_timeout / 2 : obd_timeout;                lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);                return;        }        at = &req->rq_import->imp_at;        idx = import_at_get_index(req->rq_import,                                   req->rq_request_portal);        serv_est = at_get(&at->iat_service_estimate[idx]);        /* add an arbitrary minimum: 125% +5 sec */        req->rq_timeout = serv_est + (serv_est >> 2) + 5;        /* We could get even fancier here, using history to predict increased           loading... 
*/                     /* Let the server know what this RPC timeout is by putting it in the            reqmsg*/        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);}/* Adjust max service estimate based on server value */static void ptlrpc_at_adj_service(struct ptlrpc_request *req) {        int idx;        unsigned int serv_est, oldse;        struct imp_at *at = &req->rq_import->imp_at;        LASSERT(req->rq_import);                /* service estimate is returned in the repmsg timeout field,           may be 0 on err */        serv_est = lustre_msg_get_timeout(req->rq_repmsg);        idx = import_at_get_index(req->rq_import, req->rq_request_portal);        /* max service estimates are tracked on the server side,           so just keep minimal history here */        oldse = at_add(&at->iat_service_estimate[idx], serv_est);        if (oldse != 0)                CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "                       "has changed from %d to %d\n",                        req->rq_import->imp_obd->obd_name,req->rq_request_portal,                       oldse, at_get(&at->iat_service_estimate[idx]));}/* Expected network latency per remote node (secs) */int ptlrpc_at_get_net_latency(struct ptlrpc_request *req){        return AT_OFF ? 
0 : at_get(&req->rq_import->imp_at.iat_net_latency);}/* Adjust expected network latency */static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req){        unsigned int st, nl, oldnl;        struct imp_at *at = &req->rq_import->imp_at;        time_t now = cfs_time_current_sec();        LASSERT(req->rq_import);                st = lustre_msg_get_service_time(req->rq_repmsg);                /* Network latency is total time less server processing time */        nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;        if (st > now - req->rq_sent + 2 /* rounding */)                CERROR("Reported service time %u > total measured time %ld\n",                       st, now - req->rq_sent);        oldnl = at_add(&at->iat_net_latency, nl);        if (oldnl != 0)                CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "                       "has changed from %d to %d\n",                        req->rq_import->imp_obd->obd_name,                       obd_uuid2str(                               &req->rq_import->imp_connection->c_remote_uuid),                       oldnl, at_get(&at->iat_net_latency));}static int unpack_reply(struct ptlrpc_request *req){        int rc;        /* Clear reply swab mask; we may have already swabbed an early reply */        req->rq_rep_swab_mask = 0;        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);        if (rc) {                DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);                return(-EPROTO);        }        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);        if (rc) {                DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);                return(-EPROTO);        }        return 0;}/* Handle an early reply message.   
We can't risk the real reply coming in and changing rq_repmsg,    so this fn must be called under the rq_lock */static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {        struct lustre_msg *oldmsg, *msgcpy;        time_t olddl;        int oldlen, rc;        ENTRY;        req->rq_early = 0;        rc = unpack_reply(req);        if (rc)                 /* Let's just ignore it - same as if it never got here */                 RETURN(rc);        /* We've got to make sure another early reply doesn't land on           top of our current repbuf.  Make a copy and verify checksum. */        oldlen = req->rq_replen;        spin_unlock(&req->rq_lock);        OBD_ALLOC(msgcpy, oldlen);        if (!msgcpy) {                spin_lock(&req->rq_lock);                RETURN(-ENOMEM);        }        spin_lock(&req->rq_lock);        /* Another reply might have changed the repmsg and replen while            we dropped the lock; doesn't really matter, just use the latest.           If it doesn't fit in oldlen, checksum will be wrong. */        oldmsg = req->rq_repmsg;        memcpy(msgcpy, oldmsg, oldlen);        if (lustre_msg_get_cksum(msgcpy) !=             lustre_msg_calc_cksum(msgcpy)) {                CDEBUG(D_ADAPTTO, "Early reply checksum mismatch, "                       "discarding %x != %x\n", lustre_msg_get_cksum(msgcpy),                       lustre_msg_calc_cksum(msgcpy));                GOTO(out, rc = -EINVAL);         }        /* Our copied msg is valid, now we can adjust the timeouts without            worrying that a new reply will land on the copy. 
*/        req->rq_repmsg = msgcpy;        /* Expecting to increase the service time estimate here */        ptlrpc_at_adj_service(req);        ptlrpc_at_adj_net_latency(req);        /* Adjust the local timeout for this req */        ptlrpc_at_set_req_timeout(req);        olddl = req->rq_deadline;        /* server assumes it now has rq_timeout from when it sent the            early reply, so client should give it at least that long. */        req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +                     ptlrpc_at_get_net_latency(req);        DEBUG_REQ(D_ADAPTTO, req,                   "Early reply #%d, new deadline in %lds (%+lds)",                   req->rq_early_count, req->rq_deadline -                  cfs_time_current_sec(), req->rq_deadline - olddl);                req->rq_repmsg = oldmsg;

⌨️ Keyboard shortcuts

Copy code: Ctrl + C
Search code: Ctrl + F
Full-screen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?