/* events.c
 * NOTE(review): extraction residue from a web code viewer, translated:
 * "From 'lustre 1.6.5 source code' - C code - 735 lines total - page 1 of 2".
 * Only page 1 is present; the file is truncated partway through
 * server_bulk_callback() below. */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 */

/* LNet event callbacks for the ptlrpc layer.  Every callback below
 * recovers its ptlrpc object through the struct ptlrpc_cb_id stashed in
 * ev->md.user_ptr, and each is careful to wake waiters BEFORE dropping
 * the relevant spinlock, since the woken object may be freed as soon as
 * the lock is released. */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
# include <linux/kernel.h>
# endif
#endif

#include <obd_class.h>
#include <lustre_net.h>
#include "ptlrpc_internal.h"

/* Global handle for ptlrpc's LNet event queue.
 * NOTE(review): its initialization is not visible in this chunk
 * (presumably on page 2 of the extraction). */
lnet_handle_eq_t ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 *
 *  Fires on SEND completion (or UNLINK) of an outgoing request MD.
 *  On failure, marks the request with rq_net_err and wakes it so the
 *  client path treats it like a reply timeout; always drops the
 *  network's reference on the request via ptlrpc_req_finished().
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        /* Drop the network's ref on the request taken when it was sent. */
        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 *
 * Fires when a reply PUT lands in rq_repbuf (or on UNLINK).  Handles
 * both adaptive-timeout "early replies" (which land at offset 0 and
 * leave the MD posted for the real reply) and real replies (which set
 * rq_replied and record where in the buffer the message landed).
 * Wakes the request under rq_lock in all cases.
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->md.start == req->rq_repbuf);
        LASSERT(ev->mlength <= req->rq_replen);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
        if (ev->status)
                goto out_wake;
        if (ev->type == LNET_EVENT_UNLINK) {
                /* MD was unlinked without a (further) reply arriving. */
                req->rq_must_unlink = 0;
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);
                if (unlikely(ev->mlength != lustre_msg_early_size()))
                        CERROR("early reply sized %u, expect %u\n",
                               ev->mlength, lustre_msg_early_size());

                req->rq_early_count++; /* number received, client side */
                if (req->rq_replied) {
                        /* If we already got the real reply, then we need to
                         * check if lnet_finalize() unlinked the md.  In that
                         * case, there will be no further callback of type
                         * LNET_EVENT_UNLINK.
                         */
                        if (ev->unlinked)
                                req->rq_must_unlink = 0;
                        else
                                DEBUG_REQ(D_RPCTRACE, req, "unlinked in reply");
                        goto out_wake;
                }
                req->rq_early = 1;
                req->rq_nob_received = ev->mlength;
                /* repmsg points to early reply */
                req->rq_repmsg = req->rq_repbuf;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_replied = 1;
                req->rq_nob_received = ev->mlength;
                /* repmsg points to real reply */
                req->rq_repmsg = (struct lustre_msg *)((char *)req->rq_repbuf +
                                                       ev->offset);
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);
        spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 *
 * Fires when the client-side bulk MD completes (peer PUT into a sink,
 * peer GET from a source, or UNLINK).  Clears bd_network_rw, records
 * success/bytes/sender on a good event, and wakes the owning request
 * under bd_lock.  OBD_FAIL_PTLRPC_CLIENT_BULK_CB lets tests inject an
 * -EIO status here.
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB))
                ev->status = -EIO;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }
        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 *
 * Fires when a request PUT lands in a posted request buffer (rqbd), or
 * when the buffer's MD is unlinked.  Builds a ptlrpc_request for the
 * incoming message — using the descriptor embedded in the rqbd when the
 * buffer is complete (ev->unlinked), else an atomic allocation (the
 * request is DROPPED if that allocation fails, since the rqbd cannot be
 * re-posted from event context) — then queues it on srv_req_in_queue
 * and signals the service waitqueue.
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqmsg = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqlen = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                DEBUG_REQ(D_RPCTRACE, req, "incoming req");

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO,
                       "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}

/*
 *  Server's outgoing reply callback
 *
 *  Fires on SEND/ACK/UNLINK of an outgoing reply MD.  "Easy" replies
 *  need no further processing, so the net's ref on the reply state is
 *  dropped immediately; "difficult" replies keep the ref and are
 *  rescheduled for ptlrpc_server_handle_reply() once the last network
 *  callback (ev->unlinked) arrives.
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

/*
 * Server's bulk completion callback
 *
 * NOTE(review): this function is TRUNCATED in the extracted source —
 * page 1 of 2 ends mid-comment below; the remainder is on page 2.
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
/* NOTE(review): trailing extraction residue from the web code viewer
 * (keyboard-shortcut help), translated:
 *   Keyboard shortcuts — Copy code: Ctrl+C · Search code: Ctrl+F ·
 *   Fullscreen: F11 · Increase font size: Ctrl+= ·
 *   Decrease font size: Ctrl+- · Show shortcuts: ?
 * Not part of the original events.c. */