⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 openiblnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        LIBCFS_ALLOC(msg, sizeof(*msg));        if (msg == NULL) {                CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",                       HIPQUAD(peer_ip), peer_port);                return;        }                rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic),                              lnet_acceptor_timeout());        if (rc != 0) {                CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n",                       rc, HIPQUAD(peer_ip), peer_port);                goto out;        }        if (msg->ibm_magic != IBNAL_MSG_MAGIC &&            msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {                /* Unexpected magic! */                if (the_lnet.ln_ptlcompat == 0) {                        if (msg->ibm_magic == LNET_PROTO_MAGIC ||                            msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) {                                /* future protocol version compatibility!                                 * When LNET unifies protocols over all LNDs,                                 * the first thing sent will be a version                                 * query.  I send back a reply in my current                                 * protocol to tell her I'm "old" */                                kibnal_init_msg(msg, 0, 0);                                kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0,                                                 LNET_NID_ANY, 0);                                reject = 1;                                goto reply;                        }                        CERROR ("Bad magic(1) %#08x (%#08x expected) from "                                "%u.%u.%u.%u/%d\n", msg->ibm_magic,                                IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);                        goto out;                }                /* When portals compatibility is set, I may be passed a new                 * connection "blindly" by the acceptor, and I have to                 * determine if my peer has sent an acceptor connection request                 * or not. */                rc = lnet_accept(kibnal_data.kib_ni, sock, msg->ibm_magic);                if (rc != 0)                        goto out;                /* It was an acceptor connection request!                 * Now I should see my magic... */                rc = libcfs_sock_read(sock, &msg->ibm_magic,                                      sizeof(msg->ibm_magic),                                      lnet_acceptor_timeout());                if (rc != 0) {                        CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n",                               rc, HIPQUAD(peer_ip), peer_port);                        goto out;                }                if (msg->ibm_magic != IBNAL_MSG_MAGIC &&                    msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {                        CERROR ("Bad magic(2) %#08x (%#08x expected) from "                                "%u.%u.%u.%u/%d\n", msg->ibm_magic,                                IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);                        goto out;                }        }        /* Now check version */        rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version),                              lnet_acceptor_timeout());        if (rc != 0) {                CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n",                       rc, HIPQUAD(peer_ip), peer_port);                goto out;        }        version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ?                  msg->ibm_version : __swab16(msg->ibm_version);        /* Peer is a different protocol version: reply in my current protocol         * to tell her I'm "old" */        if (version != IBNAL_MSG_VERSION &&            version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {                kibnal_init_msg(msg, 0, 0);                kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0);                reject = 1;                goto reply;        }                /* Now read in all the rest */        rc = libcfs_sock_read(sock, &msg->ibm_type,                              offsetof(kib_msg_t, ibm_u) -                              offsetof(kib_msg_t, ibm_type),                              lnet_acceptor_timeout());        if (rc != 0) {                CERROR("Error %d receiving svcqry(4) from %u.%u.%u.%u/%d\n",                       rc, HIPQUAD(peer_ip), peer_port);                goto out;        }                rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u));        if (rc != 0) {                CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",                       rc, HIPQUAD(peer_ip), peer_port);                goto out;        }                if (msg->ibm_type != IBNAL_MSG_SVCQRY) {                CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",                       msg->ibm_type, HIPQUAD(peer_ip), peer_port);                goto out;        }                if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,                                     msg->ibm_dstnid)) {                CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n",                       libcfs_nid2str(msg->ibm_dstnid),                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid),                       HIPQUAD(peer_ip), peer_port);                goto out;        }        srcnid = msg->ibm_srcnid;        srcstamp = msg->ibm_srcstamp;                kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));        msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;        memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,               sizeof(kibnal_data.kib_svc_gid));        msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;        kibnal_pack_msg(msg, version, 0, srcnid, srcstamp); reply:        rc = libcfs_sock_write (sock, msg, msg->ibm_nob,                                lnet_acceptor_timeout());        if (!reject && rc != 0) {                /* Only complain if we're not rejecting */                CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",                       rc, HIPQUAD(peer_ip), peer_port);                goto out;        }         out:        LIBCFS_FREE(msg, sizeof(*msg));}voidkibnal_free_acceptsock (kib_acceptsock_t *as){        libcfs_sock_release(as->ibas_sock);        LIBCFS_FREE(as, sizeof(*as));}intkibnal_accept(lnet_ni_t *ni, struct socket *sock){        kib_acceptsock_t  *as;        unsigned long      flags;        LIBCFS_ALLOC(as, sizeof(*as));        if (as == NULL) {                CERROR("Out of Memory\n");                return -ENOMEM;        }        as->ibas_sock = sock;                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);                        list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq);        wake_up(&kibnal_data.kib_connd_waitq);        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);        return 0;}intkibnal_start_ib_listener (void) {        int    rc;        LASSERT (kibnal_data.kib_listen_handle == NULL);        kibnal_data.kib_svc_id = ib_cm_service_assign();        CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);        rc = ib_cached_gid_get(kibnal_data.kib_device,                               kibnal_data.kib_port, 0,                               kibnal_data.kib_svc_gid);        if (rc != 0) {                CERROR("Can't get port %d GID: %d\n",                       kibnal_data.kib_port, rc);                return rc;        }                rc = ib_cached_pkey_get(kibnal_data.kib_device,                                kibnal_data.kib_port, 0,                                &kibnal_data.kib_svc_pkey);        if (rc != 0) {                CERROR ("Can't get port %d PKEY: %d\n",                        kibnal_data.kib_port, rc);                return rc;        }        rc = ib_cm_listen(kibnal_data.kib_svc_id,                          TS_IB_CM_SERVICE_EXACT_MASK,                          kibnal_passive_conn_callback, NULL,                          &kibnal_data.kib_listen_handle);        if (rc != 0) {                kibnal_data.kib_listen_handle = NULL;                CERROR ("Can't create IB listener: %d\n", rc);                return rc;        }                LASSERT (kibnal_data.kib_listen_handle != NULL);        return 0;}voidkibnal_stop_ib_listener (void) {        int    rc;                LASSERT (kibnal_data.kib_listen_handle != NULL);        rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);        if (rc != 0)                CERROR("Error stopping IB listener: %d\n", rc);                        kibnal_data.kib_listen_handle = NULL;}intkibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid){        kib_peer_t     *peer;        unsigned long   flags;        int             rc;        LASSERT (nid != LNET_NID_ANY);        LIBCFS_ALLOC(peer, sizeof (*peer));        if (peer == NULL) {                CERROR("Cannot allocate peer\n");                return -ENOMEM;        }        memset(peer, 0, sizeof(*peer));         /* zero flags etc */        peer->ibp_nid = nid;        atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */        INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */        INIT_LIST_HEAD (&peer->ibp_conns);        INIT_LIST_HEAD (&peer->ibp_tx_queue);        INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */        peer->ibp_error = 0;        peer->ibp_last_alive = cfs_time_current();        peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);        if (atomic_read(&kibnal_data.kib_npeers) >=            *kibnal_tunables.kib_concurrent_peers) {                rc = -EOVERFLOW;        /* !! but at least it distinguishes */        } else if (kibnal_data.kib_nonewpeers) {                rc = -ESHUTDOWN;        /* shutdown has started */        } else {                rc = 0;                /* npeers only grows with kib_global_lock held */                atomic_inc(&kibnal_data.kib_npeers);        }                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        if (rc != 0) {                CERROR("Can't create peer: %s\n",                        (rc == -ESHUTDOWN) ? "shutting down" :                        "too many peers");                LIBCFS_FREE(peer, sizeof(*peer));        } else {                *peerp = peer;        }                return rc;}voidkibnal_destroy_peer (kib_peer_t *peer){        CDEBUG (D_NET, "peer %s %p deleted\n",                 libcfs_nid2str(peer->ibp_nid), peer);        LASSERT (atomic_read (&peer->ibp_refcount) == 0);        LASSERT (peer->ibp_persistence == 0);        LASSERT (!kibnal_peer_active(peer));        LASSERT (peer->ibp_connecting == 0);        LASSERT (peer->ibp_accepting == 0);        LASSERT (list_empty (&peer->ibp_connd_list));        LASSERT (list_empty (&peer->ibp_conns));        LASSERT (list_empty (&peer->ibp_tx_queue));        LIBCFS_FREE (peer, sizeof (*peer));        /* NB a peer's connections keep a reference on their peer until         * they are destroyed, so we can be assured that _all_ state to do         * with this peer has been cleaned up when its refcount drops to         * zero. */        atomic_dec(&kibnal_data.kib_npeers);}kib_peer_t *kibnal_find_peer_locked (lnet_nid_t nid){        struct list_head *peer_list = kibnal_nid2peerlist (nid);        struct list_head *tmp;        kib_peer_t       *peer;        list_for_each (tmp, peer_list) {                peer = list_entry (tmp, kib_peer_t, ibp_list);                LASSERT (peer->ibp_persistence != 0 || /* persistent peer */                         peer->ibp_connecting != 0 || /* creating conns */                         peer->ibp_accepting != 0 ||                         !list_empty (&peer->ibp_conns));  /* active conn */                if (peer->ibp_nid != nid)                        continue;                return (peer);        }        return (NULL);}kib_peer_t *kibnal_get_peer (lnet_nid_t nid){        kib_peer_t     *peer;        unsigned long   flags;        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);        peer = kibnal_find_peer_locked (nid);        if (peer != NULL)                       /* +1 ref for caller? */                kibnal_peer_addref(peer);        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        return (peer);}voidkibnal_unlink_peer_locked (kib_peer_t *peer){        LASSERT (peer->ibp_persistence == 0);        LASSERT (list_empty(&peer->ibp_conns));        LASSERT (kibnal_peer_active(peer));        list_del_init (&peer->ibp_list);        /* lose peerlist's ref */        kibnal_peer_decref(peer);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -