📄 ralnd.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2004 Cluster File Systems, Inc. * Author: Eric Barton <eric@bartonsoftware.com> * * This file is part of Lustre, http://www.lustre.org. * * Lustre is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public * License as published by the Free Software Foundation. * * Lustre is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Lustre; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * */#include "ralnd.h"static int kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID, RAPK_EXPANSION_DEVICE_ID};lnd_t the_kralnd = { .lnd_type = RALND, .lnd_startup = kranal_startup, .lnd_shutdown = kranal_shutdown, .lnd_ctl = kranal_ctl, .lnd_send = kranal_send, .lnd_recv = kranal_recv, .lnd_eager_recv = kranal_eager_recv, .lnd_accept = kranal_accept,};kra_data_t kranal_data;voidkranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid){ RAP_RETURN rrc; memset(connreq, 0, sizeof(*connreq)); connreq->racr_magic = RANAL_MSG_MAGIC; connreq->racr_version = RANAL_MSG_VERSION; if (conn == NULL) /* prepping a "stub" reply */ return; connreq->racr_devid = conn->rac_device->rad_id; connreq->racr_srcnid = lnet_ptlcompat_srcnid(kranal_data.kra_ni->ni_nid, dstnid); connreq->racr_dstnid = dstnid; connreq->racr_peerstamp = kranal_data.kra_peerstamp; connreq->racr_connstamp = conn->rac_my_connstamp; connreq->racr_timeout = conn->rac_timeout; rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); LASSERT(rrc == RAP_SUCCESS);}intkranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active){ int timeout = active ? *kranal_tunables.kra_timeout : lnet_acceptor_timeout(); int swab; int rc; /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */ rc = libcfs_sock_read(sock, &connreq->racr_magic, sizeof(connreq->racr_magic), timeout); if (rc != 0) { CERROR("Read(magic) failed(1): %d\n", rc); return -EIO; } if (connreq->racr_magic != RANAL_MSG_MAGIC && connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) { /* Unexpected magic! */ if (!active && the_lnet.ln_ptlcompat == 0 && (connreq->racr_magic == LNET_PROTO_MAGIC || connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) { /* future protocol version compatibility! * When LNET unifies protocols over all LNDs, the first * thing sent will be a version query. +ve rc means I * reply with my current magic/version */ return EPROTO; } if (active || the_lnet.ln_ptlcompat == 0) { CERROR("Unexpected magic %08x (1)\n", connreq->racr_magic); return -EPROTO; } /* When portals compatibility is set, I may be passed a new * connection "blindly" by the acceptor, and I have to * determine if my peer has sent an acceptor connection request * or not. This isn't a connreq, so I'll get the acceptor to * look at it... */ rc = lnet_accept(kranal_data.kra_ni, sock, connreq->racr_magic); if (rc != 0) return -EPROTO; /* ...and if it's OK I'm back to looking for a connreq... */ rc = libcfs_sock_read(sock, &connreq->racr_magic, sizeof(connreq->racr_magic), timeout); if (rc != 0) { CERROR("Read(magic) failed(2): %d\n", rc); return -EIO; } if (connreq->racr_magic != RANAL_MSG_MAGIC && connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) { CERROR("Unexpected magic %08x(2)\n", connreq->racr_magic); return -EPROTO; } } swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC)); rc = libcfs_sock_read(sock, &connreq->racr_version, sizeof(connreq->racr_version), timeout); if (rc != 0) { CERROR("Read(version) failed: %d\n", rc); return -EIO; } if (swab) __swab16s(&connreq->racr_version); if (connreq->racr_version != RANAL_MSG_VERSION) { if (active) { CERROR("Unexpected version %d\n", connreq->racr_version); return -EPROTO; } /* If this is a future version of the ralnd protocol, and I'm * passive (accepted the connection), tell my peer I'm "old" * (+ve rc) */ return EPROTO; } rc = libcfs_sock_read(sock, &connreq->racr_devid, sizeof(connreq->racr_version) - offsetof(kra_connreq_t, racr_devid), timeout); if (rc != 0) { CERROR("Read(body) failed: %d\n", rc); return -EIO; } if (swab) { __swab32s(&connreq->racr_magic); __swab16s(&connreq->racr_version); __swab16s(&connreq->racr_devid); __swab64s(&connreq->racr_srcnid); __swab64s(&connreq->racr_dstnid); __swab64s(&connreq->racr_peerstamp); __swab64s(&connreq->racr_connstamp); __swab32s(&connreq->racr_timeout); __swab32s(&connreq->racr_riparams.HostId); __swab32s(&connreq->racr_riparams.FmaDomainHndl); __swab32s(&connreq->racr_riparams.PTag); __swab32s(&connreq->racr_riparams.CompletionCookie); } if (connreq->racr_srcnid == LNET_NID_ANY || connreq->racr_dstnid == LNET_NID_ANY) { CERROR("Received LNET_NID_ANY\n"); return -EPROTO; } if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) { CERROR("Received timeout %d < MIN %d\n", connreq->racr_timeout, RANAL_MIN_TIMEOUT); return -EPROTO; } return 0;}intkranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn){ kra_conn_t *conn; struct list_head *ctmp; struct list_head *cnxt; int loopback; int count = 0; loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid; list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { conn = list_entry(ctmp, kra_conn_t, rac_list); if (conn == newconn) continue; if (conn->rac_peerstamp != newconn->rac_peerstamp) { CDEBUG(D_NET, "Closing stale conn nid: %s " " peerstamp:"LPX64"("LPX64")\n", libcfs_nid2str(peer->rap_nid), conn->rac_peerstamp, newconn->rac_peerstamp); LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp); count++; kranal_close_conn_locked(conn, -ESTALE); continue; } if (conn->rac_device != newconn->rac_device) continue; if (loopback && newconn->rac_my_connstamp == conn->rac_peer_connstamp && newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp); CDEBUG(D_NET, "Closing stale conn nid: %s" " connstamp:"LPX64"("LPX64")\n", libcfs_nid2str(peer->rap_nid), conn->rac_peer_connstamp, newconn->rac_peer_connstamp); count++; kranal_close_conn_locked(conn, -ESTALE); } return count;}intkranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn){ kra_conn_t *conn; struct list_head *tmp; int loopback; loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid; list_for_each(tmp, &peer->rap_conns) { conn = list_entry(tmp, kra_conn_t, rac_list); /* 'newconn' is from an earlier version of 'peer'!!! */ if (newconn->rac_peerstamp < conn->rac_peerstamp) return 1; /* 'conn' is from an earlier version of 'peer': it will be * removed when we cull stale conns later on... */ if (newconn->rac_peerstamp > conn->rac_peerstamp) continue; /* Different devices are OK */ if (conn->rac_device != newconn->rac_device) continue; /* It's me connecting to myself */ if (loopback && newconn->rac_my_connstamp == conn->rac_peer_connstamp && newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; /* 'newconn' is an earlier connection from 'peer'!!! */ if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp) return 2; /* 'conn' is an earlier connection from 'peer': it will be * removed when we cull stale conns later on... */ if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp) continue; /* 'newconn' has the SAME connection stamp; 'peer' isn't * playing the game... */ return 3; } return 0;}voidkranal_set_conn_uniqueness (kra_conn_t *conn){ unsigned long flags; write_lock_irqsave(&kranal_data.kra_global_lock, flags); conn->rac_my_connstamp = kranal_data.kra_connstamp++; do { /* allocate a unique cqid */ conn->rac_cqid = kranal_data.kra_next_cqid++; } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);}intkranal_create_conn(kra_conn_t **connp, kra_device_t *dev){ kra_conn_t *conn; RAP_RETURN rrc; LASSERT (!in_interrupt()); LIBCFS_ALLOC(conn, sizeof(*conn)); if (conn == NULL) return -ENOMEM; memset(conn, 0, sizeof(*conn)); atomic_set(&conn->rac_refcount, 1); INIT_LIST_HEAD(&conn->rac_list); INIT_LIST_HEAD(&conn->rac_hashlist); INIT_LIST_HEAD(&conn->rac_schedlist); INIT_LIST_HEAD(&conn->rac_fmaq); INIT_LIST_HEAD(&conn->rac_rdmaq); INIT_LIST_HEAD(&conn->rac_replyq); spin_lock_init(&conn->rac_lock); kranal_set_conn_uniqueness(conn); conn->rac_device = dev; conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT); kranal_update_reaper_timeout(conn->rac_timeout); rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid, &conn->rac_rihandle); if (rrc != RAP_SUCCESS) { CERROR("RapkCreateRi failed: %d\n", rrc); LIBCFS_FREE(conn, sizeof(*conn)); return -ENETDOWN; } atomic_inc(&kranal_data.kra_nconns); *connp = conn; return 0;}voidkranal_destroy_conn(kra_conn_t *conn){ RAP_RETURN rrc; LASSERT (!in_interrupt()); LASSERT (!conn->rac_scheduled); LASSERT (list_empty(&conn->rac_list)); LASSERT (list_empty(&conn->rac_hashlist)); LASSERT (list_empty(&conn->rac_schedlist)); LASSERT (atomic_read(&conn->rac_refcount) == 0); LASSERT (list_empty(&conn->rac_fmaq)); LASSERT (list_empty(&conn->rac_rdmaq)); LASSERT (list_empty(&conn->rac_replyq)); rrc = RapkDestroyRi(conn->rac_device->rad_handle, conn->rac_rihandle); LASSERT (rrc == RAP_SUCCESS); if (conn->rac_peer != NULL) kranal_peer_decref(conn->rac_peer); LIBCFS_FREE(conn, sizeof(*conn)); atomic_dec(&kranal_data.kra_nconns);}voidkranal_terminate_conn_locked (kra_conn_t *conn){ LASSERT (!in_interrupt()); LASSERT (conn->rac_state == RANAL_CONN_CLOSING); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (list_empty(&conn->rac_list)); /* Remove from conn hash table: no new callbacks */ list_del_init(&conn->rac_hashlist); kranal_conn_decref(conn); conn->rac_state = RANAL_CONN_CLOSED; /* schedule to clear out all uncompleted comms in context of dev's * scheduler */ kranal_schedule_conn(conn);}voidkranal_close_conn_locked (kra_conn_t *conn, int error){ kra_peer_t *peer = conn->rac_peer; CDEBUG(error == 0 ? D_NET : D_NETERROR, "closing conn to %s: error %d\n", libcfs_nid2str(peer->rap_nid), error); LASSERT (!in_interrupt()); LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (!list_empty(&conn->rac_list)); list_del_init(&conn->rac_list); if (list_empty(&peer->rap_conns) && peer->rap_persistence == 0) { /* Non-persistent peer with no more conns... */ kranal_unlink_peer_locked(peer); } /* Reset RX timeout to ensure we wait for an incoming CLOSE for the * full timeout. If we get a CLOSE we know the peer has stopped all * RDMA. Otherwise if we wait for the full timeout we can also be sure * all RDMA has stopped. */ conn->rac_last_rx = jiffies; mb(); conn->rac_state = RANAL_CONN_CLOSING; kranal_schedule_conn(conn); /* schedule sending CLOSE */ kranal_conn_decref(conn); /* lose peer's ref */}voidkranal_close_conn (kra_conn_t *conn, int error){ unsigned long flags; write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, error);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -