/* client.c
 * (extraction note) From "lustre 1.6.5 source code" · C code · 1,808 lines
 * total · page 1 of 5
 */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
* */#define DEBUG_SUBSYSTEM S_RPC#ifndef __KERNEL__#include <errno.h>#include <signal.h>#include <liblustre.h>#endif#include <obd_support.h>#include <obd_class.h>#include <lustre_lib.h>#include <lustre_ha.h>#include <lustre_import.h>#include "ptlrpc_internal.h"void ptlrpc_init_client(int req_portal, int rep_portal, char *name, struct ptlrpc_client *cl){ cl->cli_request_portal = req_portal; cl->cli_reply_portal = rep_portal; cl->cli_name = name;}struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid){ struct ptlrpc_connection *c; lnet_nid_t self; lnet_process_id_t peer; int err; err = ptlrpc_uuid_to_peer(uuid, &peer, &self); if (err != 0) { CERROR("cannot find peer %s!\n", uuid->uuid); return NULL; } c = ptlrpc_get_connection(peer, self, uuid); if (c) { memcpy(c->c_remote_uuid.uuid, uuid->uuid, sizeof(c->c_remote_uuid.uuid)); } CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c); return c;}void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, struct obd_uuid *uuid){ lnet_nid_t self; lnet_process_id_t peer; int err; err = ptlrpc_uuid_to_peer(uuid, &peer, &self); if (err != 0) { CERROR("cannot find peer %s!\n", uuid->uuid); return; } conn->c_peer = peer; conn->c_self = self; return;}static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal){ struct ptlrpc_bulk_desc *desc; OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages])); if (!desc) return NULL; spin_lock_init(&desc->bd_lock); cfs_waitq_init(&desc->bd_waitq); desc->bd_max_iov = npages; desc->bd_iov_count = 0; desc->bd_md_h = LNET_INVALID_HANDLE; desc->bd_portal = portal; desc->bd_type = type; return desc;}struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req, int npages, int type, int portal){ struct obd_import *imp = req->rq_import; struct ptlrpc_bulk_desc *desc; ENTRY; LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE); desc = new_bulk(npages, type, portal); if (desc == NULL) RETURN(NULL); desc->bd_import_generation = 
req->rq_import_generation; desc->bd_import = class_import_get(imp); desc->bd_req = req; desc->bd_cbid.cbid_fn = client_bulk_callback; desc->bd_cbid.cbid_arg = desc; /* This makes req own desc, and free it when she frees herself */ req->rq_bulk = desc; return desc;}struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req, int npages, int type, int portal){ struct obd_export *exp = req->rq_export; struct ptlrpc_bulk_desc *desc; ENTRY; LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK); desc = new_bulk(npages, type, portal); if (desc == NULL) RETURN(NULL); desc->bd_export = class_export_get(exp); desc->bd_req = req; desc->bd_cbid.cbid_fn = server_bulk_callback; desc->bd_cbid.cbid_arg = desc; /* NB we don't assign rq_bulk here; server-side requests are * re-used, and the handler frees the bulk desc explicitly. */ return desc;}void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, int pageoffset, int len){ LASSERT(desc->bd_iov_count < desc->bd_max_iov); LASSERT(page != NULL); LASSERT(pageoffset >= 0); LASSERT(len > 0); LASSERT(pageoffset + len <= CFS_PAGE_SIZE); desc->bd_nob += len; ptlrpc_add_bulk_page(desc, page, pageoffset, len);}void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc){ ENTRY; LASSERT(desc != NULL); LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ LASSERT(!desc->bd_network_rw); /* network hands off or */ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); if (desc->bd_export) class_export_put(desc->bd_export); else class_import_put(desc->bd_import); OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[desc->bd_max_iov])); EXIT;}/* Set server timelimit for this req */void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req){ __u32 serv_est; int idx; struct imp_at *at; LASSERT(req->rq_import); if (AT_OFF) { /* non-AT settings */ req->rq_timeout = req->rq_import->imp_server_timeout ? 
obd_timeout / 2 : obd_timeout; lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); return; } at = &req->rq_import->imp_at; idx = import_at_get_index(req->rq_import, req->rq_request_portal); serv_est = at_get(&at->iat_service_estimate[idx]); /* add an arbitrary minimum: 125% +5 sec */ req->rq_timeout = serv_est + (serv_est >> 2) + 5; /* We could get even fancier here, using history to predict increased loading... */ /* Let the server know what this RPC timeout is by putting it in the reqmsg*/ lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);}/* Adjust max service estimate based on server value */static void ptlrpc_at_adj_service(struct ptlrpc_request *req) { int idx; unsigned int serv_est, oldse; struct imp_at *at = &req->rq_import->imp_at; LASSERT(req->rq_import); /* service estimate is returned in the repmsg timeout field, may be 0 on err */ serv_est = lustre_msg_get_timeout(req->rq_repmsg); idx = import_at_get_index(req->rq_import, req->rq_request_portal); /* max service estimates are tracked on the server side, so just keep minimal history here */ oldse = at_add(&at->iat_service_estimate[idx], serv_est); if (oldse != 0) CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d " "has changed from %d to %d\n", req->rq_import->imp_obd->obd_name,req->rq_request_portal, oldse, at_get(&at->iat_service_estimate[idx]));}/* Expected network latency per remote node (secs) */int ptlrpc_at_get_net_latency(struct ptlrpc_request *req){ return AT_OFF ? 
0 : at_get(&req->rq_import->imp_at.iat_net_latency);}/* Adjust expected network latency */static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req){ unsigned int st, nl, oldnl; struct imp_at *at = &req->rq_import->imp_at; time_t now = cfs_time_current_sec(); LASSERT(req->rq_import); st = lustre_msg_get_service_time(req->rq_repmsg); /* Network latency is total time less server processing time */ nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/; if (st > now - req->rq_sent + 2 /* rounding */) CERROR("Reported service time %u > total measured time %ld\n", st, now - req->rq_sent); oldnl = at_add(&at->iat_net_latency, nl); if (oldnl != 0) CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) " "has changed from %d to %d\n", req->rq_import->imp_obd->obd_name, obd_uuid2str( &req->rq_import->imp_connection->c_remote_uuid), oldnl, at_get(&at->iat_net_latency));}static int unpack_reply(struct ptlrpc_request *req){ int rc; /* Clear reply swab mask; we may have already swabbed an early reply */ req->rq_rep_swab_mask = 0; rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received); if (rc) { DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc); return(-EPROTO); } rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF); if (rc) { DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc); return(-EPROTO); } return 0;}/* Handle an early reply message. We can't risk the real reply coming in and changing rq_repmsg, so this fn must be called under the rq_lock */static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) { struct lustre_msg *oldmsg, *msgcpy; time_t olddl; int oldlen, rc; ENTRY; req->rq_early = 0; rc = unpack_reply(req); if (rc) /* Let's just ignore it - same as if it never got here */ RETURN(rc); /* We've got to make sure another early reply doesn't land on top of our current repbuf. Make a copy and verify checksum. 
*/ oldlen = req->rq_replen; spin_unlock(&req->rq_lock); OBD_ALLOC(msgcpy, oldlen); if (!msgcpy) { spin_lock(&req->rq_lock); RETURN(-ENOMEM); } spin_lock(&req->rq_lock); /* Another reply might have changed the repmsg and replen while we dropped the lock; doesn't really matter, just use the latest. If it doesn't fit in oldlen, checksum will be wrong. */ oldmsg = req->rq_repmsg; memcpy(msgcpy, oldmsg, oldlen); if (lustre_msg_get_cksum(msgcpy) != lustre_msg_calc_cksum(msgcpy)) { CDEBUG(D_ADAPTTO, "Early reply checksum mismatch, " "discarding %x != %x\n", lustre_msg_get_cksum(msgcpy), lustre_msg_calc_cksum(msgcpy)); GOTO(out, rc = -EINVAL); } /* Our copied msg is valid, now we can adjust the timeouts without worrying that a new reply will land on the copy. */ req->rq_repmsg = msgcpy; /* Expecting to increase the service time estimate here */ ptlrpc_at_adj_service(req); ptlrpc_at_adj_net_latency(req); /* Adjust the local timeout for this req */ ptlrpc_at_set_req_timeout(req); olddl = req->rq_deadline; /* server assumes it now has rq_timeout from when it sent the early reply, so client should give it at least that long. */ req->rq_deadline = cfs_time_current_sec() + req->rq_timeout + ptlrpc_at_get_net_latency(req); DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %lds (%+lds)", req->rq_early_count, req->rq_deadline - cfs_time_current_sec(), req->rq_deadline - olddl); req->rq_repmsg = oldmsg;
/* (code-viewer UI chrome, not part of client.c)
 * Keyboard shortcuts: copy code Ctrl+C · search code Ctrl+F · fullscreen F11 ·
 * larger font Ctrl+= · smaller font Ctrl+- · show shortcuts ?
 */