events.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 735 行 · 第 1/2 页

C
735
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * *  Copyright (c) 2002, 2003 Cluster File Systems, Inc. * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. * */#define DEBUG_SUBSYSTEM S_RPC#ifndef __KERNEL__# include <liblustre.h>#else# ifdef __mips64__#  include <linux/kernel.h># endif#endif#include <obd_class.h>#include <lustre_net.h>#include "ptlrpc_internal.h"lnet_handle_eq_t   ptlrpc_eq_h;/*   *  Client's outgoing request callback */void request_out_callback(lnet_event_t *ev){        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;        struct ptlrpc_request *req = cbid->cbid_arg;        ENTRY;        LASSERT (ev->type == LNET_EVENT_SEND ||                 ev->type == LNET_EVENT_UNLINK);        LASSERT (ev->unlinked);        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,                  "type %d, status %d", ev->type, ev->status);        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {                /* Failed send: make it seem like the reply timed out, just                 * like failing sends in client.c does currently...  */                spin_lock(&req->rq_lock);                req->rq_net_err = 1;                spin_unlock(&req->rq_lock);                ptlrpc_wake_client_req(req);        }        ptlrpc_req_finished(req);        EXIT;}/* * Client's incoming reply callback */void reply_in_callback(lnet_event_t *ev){        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;        struct ptlrpc_request *req = cbid->cbid_arg;        ENTRY;        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,                  "type %d, status %d", ev->type, ev->status);                LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);        LASSERT(ev->md.start == req->rq_repbuf);        LASSERT(ev->mlength <= req->rq_replen);        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests           for adaptive timeouts' early reply. */        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);        spin_lock(&req->rq_lock);        req->rq_receiving_reply = 0;        req->rq_early = 0;                if (ev->status)                goto out_wake;        if (ev->type == LNET_EVENT_UNLINK) {                req->rq_must_unlink = 0;                DEBUG_REQ(D_RPCTRACE, req, "unlink");                goto out_wake;        }        if ((ev->offset == 0) &&             (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {                /* Early reply */                DEBUG_REQ(D_ADAPTTO, req,                          "Early reply received: mlen=%u offset=%d replen=%d "                          "replied=%d unlinked=%d", ev->mlength, ev->offset,                          req->rq_replen, req->rq_replied, ev->unlinked);                if (unlikely(ev->mlength != lustre_msg_early_size()))                        CERROR("early reply sized %u, expect %u\n",                               ev->mlength, lustre_msg_early_size());                req->rq_early_count++; /* number received, client side */                if (req->rq_replied) {                        /* If we already got the real reply, then we need to                         * check if lnet_finalize() unlinked the md.  In that                         * case, there will be no further callback of type                         * LNET_EVENT_UNLINK.                         */                        if (ev->unlinked)                                req->rq_must_unlink = 0;                        else                                DEBUG_REQ(D_RPCTRACE, req, "unlinked in reply");                        goto out_wake;                }                req->rq_early = 1;                req->rq_nob_received = ev->mlength;                /* repmsg points to early reply */                req->rq_repmsg = req->rq_repbuf;                /* And we're still receiving */                req->rq_receiving_reply = 1;        } else {                /* Real reply */                req->rq_replied = 1;                req->rq_nob_received = ev->mlength;                /* repmsg points to real reply */                req->rq_repmsg = (struct lustre_msg *)((char *)req->rq_repbuf +                                                       ev->offset);                /* LNetMDUnlink can't be called under the LNET_LOCK,                   so we must unlink in ptlrpc_unregister_reply */                DEBUG_REQ(D_INFO, req,                           "reply in flags=%x mlen=%u offset=%d replen=%d",                          lustre_msg_get_flags(req->rq_reqmsg),                          ev->mlength, ev->offset, req->rq_replen);        }        req->rq_import->imp_last_reply_time = cfs_time_current_sec();out_wake:        /* NB don't unlock till after wakeup; req can disappear under us         * since we don't have our own ref */        ptlrpc_wake_client_req(req);        spin_unlock(&req->rq_lock);        EXIT;}/*  * Client's bulk has been written/read */void client_bulk_callback (lnet_event_t *ev){        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;        ENTRY;        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB))                ev->status = -EIO;        LASSERT ((desc->bd_type == BULK_PUT_SINK &&                   ev->type == LNET_EVENT_PUT) ||                 (desc->bd_type == BULK_GET_SOURCE &&                  ev->type == LNET_EVENT_GET) ||                 ev->type == LNET_EVENT_UNLINK);        LASSERT (ev->unlinked);        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,               "event type %d, status %d, desc %p\n",                ev->type, ev->status, desc);        spin_lock(&desc->bd_lock);        LASSERT(desc->bd_network_rw);        desc->bd_network_rw = 0;        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {                desc->bd_success = 1;                desc->bd_nob_transferred = ev->mlength;                desc->bd_sender = ev->sender;        }        /* NB don't unlock till after wakeup; desc can disappear under us         * otherwise */        ptlrpc_wake_client_req(desc->bd_req);        spin_unlock(&desc->bd_lock);        EXIT;}/*  * Server's incoming request callback */void request_in_callback(lnet_event_t *ev){        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;        struct ptlrpc_service             *service = rqbd->rqbd_service;        struct ptlrpc_request             *req;        ENTRY;        LASSERT (ev->type == LNET_EVENT_PUT ||                 ev->type == LNET_EVENT_UNLINK);        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=                 rqbd->rqbd_buffer + service->srv_buf_size);        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,               "event type %d, status %d, service %s\n",                ev->type, ev->status, service->srv_name);        if (ev->unlinked) {                /* If this is the last request message to fit in the                 * request buffer we can use the request object embedded in                 * rqbd.  Note that if we failed to allocate a request,                 * we'd have to re-post the rqbd, which we can't do in this                 * context. */                req = &rqbd->rqbd_req;                memset(req, 0, sizeof (*req));        } else {                LASSERT (ev->type == LNET_EVENT_PUT);                if (ev->status != 0) {                        /* We moaned above already... */                        return;                }                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);                if (req == NULL) {                        CERROR("Can't allocate incoming request descriptor: "                               "Dropping %s RPC from %s\n",                               service->srv_name,                                libcfs_id2str(ev->initiator));                        return;                }        }        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,         * flags are reset and scalars are zero.  We only set the message         * size to non-zero if this was a successful receive. */        req->rq_xid = ev->match_bits;        req->rq_reqmsg = ev->md.start + ev->offset;        if (ev->type == LNET_EVENT_PUT && ev->status == 0)                req->rq_reqlen = ev->mlength;        do_gettimeofday(&req->rq_arrival_time);        req->rq_peer = ev->initiator;        req->rq_self = ev->target.nid;        req->rq_rqbd = rqbd;        req->rq_phase = RQ_PHASE_NEW;#ifdef CRAY_XT3        req->rq_uid = ev->uid;#endif        spin_lock_init(&req->rq_lock);        CFS_INIT_LIST_HEAD(&req->rq_timed_list);        atomic_set(&req->rq_refcount, 1);        if (ev->type == LNET_EVENT_PUT)                DEBUG_REQ(D_RPCTRACE, req, "incoming req");        spin_lock(&service->srv_lock);        req->rq_history_seq = service->srv_request_seq++;        list_add_tail(&req->rq_history_list, &service->srv_request_history);        if (ev->unlinked) {                service->srv_nrqbd_receiving--;                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",                       service->srv_nrqbd_receiving);                /* Normally, don't complain about 0 buffers posted; LNET won't                 * drop incoming reqs since we set the portal lazy */                if (test_req_buffer_pressure &&                    ev->type != LNET_EVENT_UNLINK &&                    service->srv_nrqbd_receiving == 0)                        CWARN("All %s request buffers busy\n",                              service->srv_name);                /* req takes over the network's ref on rqbd */        } else {                /* req takes a ref on rqbd */                rqbd->rqbd_refcount++;        }        list_add_tail(&req->rq_list, &service->srv_req_in_queue);        service->srv_n_queued_reqs++;        /* NB everything can disappear under us once the request         * has been queued and we unlock, so do the wake now... */        cfs_waitq_signal(&service->srv_waitq);        spin_unlock(&service->srv_lock);        EXIT;}/* *  Server's outgoing reply callback */void reply_out_callback(lnet_event_t *ev){        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;        struct ptlrpc_reply_state *rs = cbid->cbid_arg;        struct ptlrpc_service     *svc = rs->rs_service;        ENTRY;        LASSERT (ev->type == LNET_EVENT_SEND ||                 ev->type == LNET_EVENT_ACK ||                 ev->type == LNET_EVENT_UNLINK);        if (!rs->rs_difficult) {                /* 'Easy' replies have no further processing so I drop the                 * net's ref on 'rs' */                LASSERT (ev->unlinked);                ptlrpc_rs_decref(rs);                atomic_dec (&svc->srv_outstanding_replies);                EXIT;                return;        }        LASSERT (rs->rs_on_net);        if (ev->unlinked) {                /* Last network callback.  The net's ref on 'rs' stays put                 * until ptlrpc_server_handle_reply() is done with it */                spin_lock(&svc->srv_lock);                rs->rs_on_net = 0;                ptlrpc_schedule_difficult_reply (rs);                spin_unlock(&svc->srv_lock);        }        EXIT;}/* * Server's bulk completion callback */void server_bulk_callback (lnet_event_t *ev){        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;        ENTRY;        LASSERT (ev->type == LNET_EVENT_SEND ||                 ev->type == LNET_EVENT_UNLINK ||                 (desc->bd_type == BULK_PUT_SOURCE &&                  ev->type == LNET_EVENT_ACK) ||                 (desc->bd_type == BULK_GET_SINK &&                  ev->type == LNET_EVENT_REPLY));        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,               "event type %d, status %d, desc %p\n",                ev->type, ev->status, desc);        spin_lock(&desc->bd_lock);                if ((ev->type == LNET_EVENT_ACK ||             ev->type == LNET_EVENT_REPLY) &&            ev->status == 0) {                /* We heard back from the peer, so even if we get this

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?