⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ralnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */
#include "ralnd.h"

/* Device IDs, indexed by device slot */
static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
                                                  RAPK_EXPANSION_DEVICE_ID};

/* LND operations table for this driver */
lnd_t the_kralnd = {
        .lnd_type       = RALND,
        .lnd_startup    = kranal_startup,
        .lnd_shutdown   = kranal_shutdown,
        .lnd_ctl        = kranal_ctl,
        .lnd_send       = kranal_send,
        .lnd_recv       = kranal_recv,
        .lnd_eager_recv = kranal_eager_recv,
        .lnd_accept     = kranal_accept,
};

/* Module-wide state */
kra_data_t              kranal_data;

/*
 * Fill in '*connreq' ready to be sent to 'dstnid'.
 *
 * The whole structure is zeroed first, then magic and version are set.
 * If 'conn' is NULL only magic/version are filled in (a "stub" reply).
 * Otherwise the device id, source/destination NIDs, peer/connection
 * stamps and timeout are copied from 'conn' and 'kranal_data', and the
 * RI parameters are fetched with RapkGetRiParams(), which is asserted
 * to succeed.
 */
void
kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
{
        RAP_RETURN   rrc;

        memset(connreq, 0, sizeof(*connreq));

        connreq->racr_magic     = RANAL_MSG_MAGIC;
        connreq->racr_version   = RANAL_MSG_VERSION;

        if (conn == NULL)                       /* prepping a "stub" reply */
                return;

        connreq->racr_devid     = conn->rac_device->rad_id;
        connreq->racr_srcnid    = lnet_ptlcompat_srcnid(kranal_data.kra_ni->ni_nid,
                                                        dstnid);
        connreq->racr_dstnid    = dstnid;
        connreq->racr_peerstamp = kranal_data.kra_peerstamp;
        connreq->racr_connstamp = conn->rac_my_connstamp;
        connreq->racr_timeout   = conn->rac_timeout;

        rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
        LASSERT(rrc == RAP_SUCCESS);
}

/*
 * Read a connection request from 'sock' into '*connreq'.
 *
 * 'active' is non-zero when this side initiated the connection; it
 * selects the read timeout (the kra_timeout tunable when active,
 * lnet_acceptor_timeout() otherwise) and how unexpected magic/version
 * values are treated.
 *
 * The message is read in three pieces: magic, version, then everything
 * from 'racr_devid' to the end of the structure.  If the magic arrives
 * byte-swapped, every field is converted so that on successful return
 * all of '*connreq' is in host byte order.
 *
 * Returns 0 on success, -ve on error (-EIO for a failed read, -EPROTO
 * for a bad message), +ve (EPROTO) to tell the peer I'm "old".
 */
int
kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
{
        int         timeout = active ? *kranal_tunables.kra_timeout :
                                        lnet_acceptor_timeout();
        int         swab;
        int         rc;

        /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */

        rc = libcfs_sock_read(sock, &connreq->racr_magic,
                              sizeof(connreq->racr_magic), timeout);
        if (rc != 0) {
                CERROR("Read(magic) failed(1): %d\n", rc);
                return -EIO;
        }

        if (connreq->racr_magic != RANAL_MSG_MAGIC &&
            connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
                /* Unexpected magic! */
                if (!active &&
                    the_lnet.ln_ptlcompat == 0 &&
                    (connreq->racr_magic == LNET_PROTO_MAGIC ||
                     connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
                        /* future protocol version compatibility!
                         * When LNET unifies protocols over all LNDs, the first
                         * thing sent will be a version query.  +ve rc means I
                         * reply with my current magic/version */
                        return EPROTO;
                }

                if (active ||
                    the_lnet.ln_ptlcompat == 0) {
                        CERROR("Unexpected magic %08x (1)\n",
                               connreq->racr_magic);
                        return -EPROTO;
                }

                /* When portals compatibility is set, I may be passed a new
                 * connection "blindly" by the acceptor, and I have to
                 * determine if my peer has sent an acceptor connection request
                 * or not.  This isn't a connreq, so I'll get the acceptor to
                 * look at it... */
                rc = lnet_accept(kranal_data.kra_ni, sock, connreq->racr_magic);
                if (rc != 0)
                        return -EPROTO;

                /* ...and if it's OK I'm back to looking for a connreq... */
                rc = libcfs_sock_read(sock, &connreq->racr_magic,
                                      sizeof(connreq->racr_magic), timeout);
                if (rc != 0) {
                        CERROR("Read(magic) failed(2): %d\n", rc);
                        return -EIO;
                }

                if (connreq->racr_magic != RANAL_MSG_MAGIC &&
                    connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
                        CERROR("Unexpected magic %08x(2)\n",
                               connreq->racr_magic);
                        return -EPROTO;
                }
        }

        swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));

        rc = libcfs_sock_read(sock, &connreq->racr_version,
                              sizeof(connreq->racr_version), timeout);
        if (rc != 0) {
                CERROR("Read(version) failed: %d\n", rc);
                return -EIO;
        }

        /* The version is converted to host order here, as soon as it is
         * read, so it can be checked before the body is read. */
        if (swab)
                __swab16s(&connreq->racr_version);

        if (connreq->racr_version != RANAL_MSG_VERSION) {
                if (active) {
                        CERROR("Unexpected version %d\n", connreq->racr_version);
                        return -EPROTO;
                }
                /* If this is a future version of the ralnd protocol, and I'm
                 * passive (accepted the connection), tell my peer I'm "old"
                 * (+ve rc) */
                return EPROTO;
        }

        /* Magic and version are already in; read the remainder of the
         * message, i.e. everything from 'racr_devid' to the end of the
         * structure.
         * NOTE(review): assumes the sender writes a whole kra_connreq_t,
         * as kranal_pack_connreq() prepares one - confirm against the
         * send side. */
        rc = libcfs_sock_read(sock, &connreq->racr_devid,
                              sizeof(*connreq) -
                              offsetof(kra_connreq_t, racr_devid),
                              timeout);
        if (rc != 0) {
                CERROR("Read(body) failed: %d\n", rc);
                return -EIO;
        }

        if (swab) {
                /* 'racr_version' is deliberately absent: it was already
                 * swabbed above and a second swab would put it back into
                 * the peer's byte order. */
                __swab32s(&connreq->racr_magic);
                __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_srcnid);
                __swab64s(&connreq->racr_dstnid);
                __swab64s(&connreq->racr_peerstamp);
                __swab64s(&connreq->racr_connstamp);
                __swab32s(&connreq->racr_timeout);

                __swab32s(&connreq->racr_riparams.HostId);
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
                __swab32s(&connreq->racr_riparams.PTag);
                __swab32s(&connreq->racr_riparams.CompletionCookie);
        }

        if (connreq->racr_srcnid == LNET_NID_ANY ||
            connreq->racr_dstnid == LNET_NID_ANY) {
                CERROR("Received LNET_NID_ANY\n");
                return -EPROTO;
        }

        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
                CERROR("Received timeout %d < MIN %d\n",
                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                return -EPROTO;
        }

        return 0;
}

/*
 * Close every connection of 'peer' that 'newconn' makes stale and
 * return how many were closed.
 *
 * A connection other than 'newconn' is closed (with -ESTALE) when:
 *  - its peerstamp differs from newconn's (asserted to be older), or
 *  - it has the same peerstamp and the same device as 'newconn', and it
 *    is not the other end of a loopback connection (asserted to have an
 *    older peer connstamp).
 *
 * NOTE(review): the "_locked" suffix suggests the caller holds
 * kranal_data.kra_global_lock - confirm against callers.
 */
int
kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
{
        kra_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 loopback;
        int                 count = 0;

        loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;

        /* "safe" iteration: the current entry may be closed in the loop */
        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_list);

                if (conn == newconn)
                        continue;

                if (conn->rac_peerstamp != newconn->rac_peerstamp) {
                        CDEBUG(D_NET, "Closing stale conn nid: %s "
                               " peerstamp:"LPX64"("LPX64")\n",
                               libcfs_nid2str(peer->rap_nid),
                               conn->rac_peerstamp, newconn->rac_peerstamp);
                        LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
                        count++;
                        kranal_close_conn_locked(conn, -ESTALE);
                        continue;
                }

                if (conn->rac_device != newconn->rac_device)
                        continue;

                /* the two ends of a loopback connection mirror each
                 * other's stamps; neither is stale */
                if (loopback &&
                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                        continue;

                LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);

                CDEBUG(D_NET, "Closing stale conn nid: %s"
                       " connstamp:"LPX64"("LPX64")\n",
                       libcfs_nid2str(peer->rap_nid),
                       conn->rac_peer_connstamp, newconn->rac_peer_connstamp);

                count++;
                kranal_close_conn_locked(conn, -ESTALE);
        }

        return count;
}

/*
 * Check 'newconn' against the existing connections of 'peer'.
 *
 * Returns 0 if 'newconn' is acceptable, otherwise a non-zero reason:
 *   1  'newconn' carries an older peerstamp than an existing conn
 *   2  'newconn' carries an older peer connstamp than an existing conn
 *      on the same device
 *   3  'newconn' carries the same peer connstamp as an existing conn
 *      on the same device
 *
 * NOTE(review): the "_locked" suffix suggests the caller holds
 * kranal_data.kra_global_lock - confirm against callers.
 */
int
kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
{
        kra_conn_t       *conn;
        struct list_head *tmp;
        int               loopback;

        loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;

        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);

                /* 'newconn' is from an earlier version of 'peer'!!! */
                if (newconn->rac_peerstamp < conn->rac_peerstamp)
                        return 1;

                /* 'conn' is from an earlier version of 'peer': it will be
                 * removed when we cull stale conns later on... */
                if (newconn->rac_peerstamp > conn->rac_peerstamp)
                        continue;

                /* Different devices are OK */
                if (conn->rac_device != newconn->rac_device)
                        continue;

                /* It's me connecting to myself */
                if (loopback &&
                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                        continue;

                /* 'newconn' is an earlier connection from 'peer'!!! */
                if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
                        return 2;

                /* 'conn' is an earlier connection from 'peer': it will be
                 * removed when we cull stale conns later on... */
                if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
                        continue;

                /* 'newconn' has the SAME connection stamp; 'peer' isn't
                 * playing the game... */
                return 3;
        }

        return 0;
}

/*
 * Give 'conn' its own connection stamp and completion-queue id.
 *
 * Done under the global write lock: the connstamp is taken from a
 * running counter, and cqids are drawn from a second counter until one
 * is found that kranal_cqid2conn_locked() does not map to a conn.
 */
void
kranal_set_conn_uniqueness (kra_conn_t *conn)
{
        unsigned long  flags;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        conn->rac_my_connstamp = kranal_data.kra_connstamp++;

        do {    /* allocate a unique cqid */
                conn->rac_cqid = kranal_data.kra_next_cqid++;
        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}

/*
 * Allocate and initialise a new connection on device 'dev'.
 *
 * On success stores the new conn (holding one reference) in '*connp',
 * bumps kranal_data.kra_nconns and returns 0.  Returns -ENOMEM if the
 * allocation fails, or -ENETDOWN if RapkCreateRi() fails (the conn is
 * freed again in that case).  Must not be called in interrupt context.
 */
int
kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
{
        kra_conn_t    *conn;
        RAP_RETURN     rrc;

        LASSERT (!in_interrupt());
        LIBCFS_ALLOC(conn, sizeof(*conn));

        if (conn == NULL)
                return -ENOMEM;

        memset(conn, 0, sizeof(*conn));
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
        INIT_LIST_HEAD(&conn->rac_schedlist);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
        spin_lock_init(&conn->rac_lock);

        kranal_set_conn_uniqueness(conn);

        conn->rac_device = dev;
        /* never run with less than the minimum timeout */
        conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
        kranal_update_reaper_timeout(conn->rac_timeout);

        rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
                           &conn->rac_rihandle);
        if (rrc != RAP_SUCCESS) {
                CERROR("RapkCreateRi failed: %d\n", rrc);
                LIBCFS_FREE(conn, sizeof(*conn));
                return -ENETDOWN;
        }

        atomic_inc(&kranal_data.kra_nconns);
        *connp = conn;
        return 0;
}

/*
 * Release a connection whose reference count has reached zero.
 *
 * Asserts that the conn is not scheduled, is on no list and has empty
 * queues; then destroys its RI handle (asserted to succeed), drops the
 * reference on its peer if it has one, frees it and decrements
 * kranal_data.kra_nconns.  Must not be called in interrupt context.
 */
void
kranal_destroy_conn(kra_conn_t *conn)
{
        RAP_RETURN         rrc;

        LASSERT (!in_interrupt());
        LASSERT (!conn->rac_scheduled);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_schedlist));
        LASSERT (atomic_read(&conn->rac_refcount) == 0);
        LASSERT (list_empty(&conn->rac_fmaq));
        LASSERT (list_empty(&conn->rac_rdmaq));
        LASSERT (list_empty(&conn->rac_replyq));

        rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                            conn->rac_rihandle);
        LASSERT (rrc == RAP_SUCCESS);

        if (conn->rac_peer != NULL)
                kranal_peer_decref(conn->rac_peer);

        LIBCFS_FREE(conn, sizeof(*conn));
        atomic_dec(&kranal_data.kra_nconns);
}

/*
 * Move a CLOSING connection to CLOSED.
 *
 * The conn must still be in the conn hash table but no longer on its
 * peer's list.  It is removed from the hash table, the reference
 * dropped, and the conn is scheduled so that outstanding comms are
 * cleared out in the context of the device's scheduler.
 */
void
kranal_terminate_conn_locked (kra_conn_t *conn)
{
        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_list));

        /* Remove from conn hash table: no new callbacks */
        list_del_init(&conn->rac_hashlist);
        kranal_conn_decref(conn);

        conn->rac_state = RANAL_CONN_CLOSED;

        /* schedule to clear out all uncompleted comms in context of dev's
         * scheduler */
        kranal_schedule_conn(conn);
}

/*
 * Start closing an ESTABLISHED connection; 'error' is only used for
 * the debug message (logged at D_NET when 0, D_NETERROR otherwise).
 *
 * The conn is taken off its peer's list (unlinking the peer if it is
 * non-persistent and this was its last conn), its RX timestamp is
 * reset, its state becomes CLOSING, it is scheduled so that a CLOSE
 * gets sent, and the peer's reference on it is dropped.
 */
void
kranal_close_conn_locked (kra_conn_t *conn, int error)
{
        kra_peer_t        *peer = conn->rac_peer;

        CDEBUG(error == 0 ? D_NET : D_NETERROR,
               "closing conn to %s: error %d\n",
               libcfs_nid2str(peer->rap_nid), error);

        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (!list_empty(&conn->rac_list));

        list_del_init(&conn->rac_list);

        if (list_empty(&peer->rap_conns) &&
            peer->rap_persistence == 0) {
                /* Non-persistent peer with no more conns... */
                kranal_unlink_peer_locked(peer);
        }

        /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
         * full timeout.  If we get a CLOSE we know the peer has stopped all
         * RDMA.  Otherwise if we wait for the full timeout we can also be sure
         * all RDMA has stopped. */
        conn->rac_last_rx = jiffies;
        mb();

        conn->rac_state = RANAL_CONN_CLOSING;
        kranal_schedule_conn(conn);             /* schedule sending CLOSE */

        kranal_conn_decref(conn);               /* lose peer's ref */
}

void
kranal_close_conn (kra_conn_t *conn, int error)
{
        unsigned long    flags;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                kranal_close_conn_locked(conn, error);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -