⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ralnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 4 页
字号:
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);}intkranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,                       __u32 peer_ip, int peer_port){        kra_device_t  *dev = conn->rac_device;        unsigned long  flags;        RAP_RETURN     rrc;        /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive         * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */        conn->rac_last_tx = jiffies;        conn->rac_keepalive = 0;        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);        if (rrc != RAP_SUCCESS) {                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer_ip), peer_port, rrc);                return -ECONNABORTED;        }        /* Schedule conn on rad_new_conns */        kranal_conn_addref(conn);        spin_lock_irqsave(&dev->rad_lock, flags);        list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);        wake_up(&dev->rad_waitq);        spin_unlock_irqrestore(&dev->rad_lock, flags);        rrc = RapkWaitToConnect(conn->rac_rihandle);        if (rrc != RAP_SUCCESS) {                CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer_ip), peer_port, rrc);                return -ECONNABORTED;        }        /* Scheduler doesn't touch conn apart from to deschedule and decref it         * after RapkCompleteSync() return success, so conn is all mine */        conn->rac_peerstamp = connreq->racr_peerstamp;        conn->rac_peer_connstamp = connreq->racr_connstamp;        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);        kranal_update_reaper_timeout(conn->rac_keepalive);        return 0;}intkranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,                               lnet_nid_t *dst_nidp, kra_conn_t **connp){        __u32                peer_ip;        unsigned int         peer_port;        kra_connreq_t        rx_connreq;        kra_connreq_t        tx_connreq;        kra_conn_t          *conn;        kra_device_t        *dev;        int                  rc;        int                  i;        rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);        if (rc != 0) {                CERROR("Can't get peer's IP: %d\n", rc);                return rc;        }        rc = kranal_recv_connreq(sock, &rx_connreq, 0);        if (rc < 0) {                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer_ip), peer_port, rc);                return rc;        }        if (rc > 0) {                /* Request from "new" peer: send reply with my MAGIC/VERSION to                 * tell her I'm old... */                kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);                rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),                                       lnet_acceptor_timeout());                if (rc != 0)                        CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",                               HIPQUAD(peer_ip), peer_port, rc);                return -EPROTO;        }        for (i = 0;;i++) {                if (i == kranal_data.kra_ndevs) {                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",                               rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);                        return -ENODEV;                }                dev = &kranal_data.kra_devices[i];                if (dev->rad_id == rx_connreq.racr_devid)                        break;        }        rc = kranal_create_conn(&conn, dev);        if (rc != 0)                return rc;        kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);        rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),                               lnet_acceptor_timeout());        if (rc != 0) {                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer_ip), peer_port, rc);                kranal_conn_decref(conn);                return rc;        }        rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);        if (rc != 0) {                kranal_conn_decref(conn);                return rc;        }        *connp = conn;        *src_nidp = rx_connreq.racr_srcnid;        *dst_nidp = rx_connreq.racr_dstnid;        return 0;}intkranal_active_conn_handshake(kra_peer_t *peer,                             lnet_nid_t *dst_nidp, kra_conn_t **connp){        kra_connreq_t       connreq;        kra_conn_t         *conn;        kra_device_t       *dev;        struct socket      *sock;        int                 rc;        unsigned int        idx;        /* spread connections over all devices using both peer NIDs to ensure         * all nids use all devices */        idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];        rc = kranal_create_conn(&conn, dev);        if (rc != 0)                return rc;        kranal_pack_connreq(&connreq, conn, peer->rap_nid);        if (the_lnet.ln_testprotocompat != 0) {                /* single-shot proto test */                LNET_LOCK();                if ((the_lnet.ln_testprotocompat & 1) != 0) {                        connreq.racr_version++;                        the_lnet.ln_testprotocompat &= ~1;                }                if ((the_lnet.ln_testprotocompat & 2) != 0) {                        connreq.racr_magic = LNET_PROTO_MAGIC;                        the_lnet.ln_testprotocompat &= ~2;                }                LNET_UNLOCK();        }        rc = lnet_connect(&sock, peer->rap_nid,                         0, peer->rap_ip, peer->rap_port);        if (rc != 0)                goto failed_0;        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout         * immediately after accepting a connection, so we connect and then         * send immediately. */        rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),                               lnet_acceptor_timeout());        if (rc != 0) {                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);                goto failed_2;        }        rc = kranal_recv_connreq(sock, &connreq, 1);        if (rc != 0) {                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);                goto failed_2;        }        libcfs_sock_release(sock);        rc = -EPROTO;        if (connreq.racr_srcnid != peer->rap_nid) {                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "                       "received %s expected %s\n",                       HIPQUAD(peer->rap_ip), peer->rap_port,                       libcfs_nid2str(connreq.racr_srcnid),                        libcfs_nid2str(peer->rap_nid));                goto failed_1;        }        if (connreq.racr_devid != dev->rad_id) {                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "                       "received %d expected %d\n",                       HIPQUAD(peer->rap_ip), peer->rap_port,                       connreq.racr_devid, dev->rad_id);                goto failed_1;        }        rc = kranal_set_conn_params(conn, &connreq,                                    peer->rap_ip, peer->rap_port);        if (rc != 0)                goto failed_1;        *connp = conn;        *dst_nidp = connreq.racr_dstnid;        return 0; failed_2:        libcfs_sock_release(sock); failed_1:        lnet_connect_console_error(rc, peer->rap_nid,                                  peer->rap_ip, peer->rap_port); failed_0:        kranal_conn_decref(conn);        return rc;}intkranal_conn_handshake (struct socket *sock, kra_peer_t *peer){        kra_peer_t        *peer2;        kra_tx_t          *tx;        lnet_nid_t          peer_nid;        lnet_nid_t          dst_nid;        unsigned long      flags;        kra_conn_t        *conn;        int                rc;        int                nstale;        int                new_peer = 0;        if (sock == NULL) {                /* active: connd wants to connect to 'peer' */                LASSERT (peer != NULL);                LASSERT (peer->rap_connecting);                rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);                if (rc != 0)                        return rc;                write_lock_irqsave(&kranal_data.kra_global_lock, flags);                if (!kranal_peer_active(peer)) {                        /* raced with peer getting unlinked */                        write_unlock_irqrestore(&kranal_data.kra_global_lock,                                                flags);                        kranal_conn_decref(conn);                        return -ESTALE;                }                peer_nid = peer->rap_nid;        } else {                /* passive: listener accepted 'sock' */                LASSERT (peer == NULL);                rc = kranal_passive_conn_handshake(sock, &peer_nid,                                                   &dst_nid, &conn);                if (rc != 0)                        return rc;                /* assume this is a new peer */                rc = kranal_create_peer(&peer, peer_nid);                if (rc != 0) {                        CERROR("Can't create conn for %s\n",                                libcfs_nid2str(peer_nid));                        kranal_conn_decref(conn);                        return -ENOMEM;                }                write_lock_irqsave(&kranal_data.kra_global_lock, flags);                peer2 = kranal_find_peer_locked(peer_nid);                if (peer2 == NULL) {                        new_peer = 1;                } else {                        /* peer_nid already in the peer table */                        kranal_peer_decref(peer);                        peer = peer2;                }        }        LASSERT ((!new_peer) != (!kranal_peer_active(peer)));        /* Refuse connection if peer thinks we are a different NID.  We check         * this while holding the global lock, to synch with connection         * destruction on NID change. */        if (!lnet_ptlcompat_matchnid(kranal_data.kra_ni->ni_nid, dst_nid)) {                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);                CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",                       libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid),                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));                rc = -ESTALE;                goto failed;        }        /* Refuse to duplicate an existing connection (both sides might try to         * connect at once).  NB we return success!  We _are_ connected so we         * _don't_ have any blocked txs to complete with failure. */        rc = kranal_conn_isdup_locked(peer, conn);        if (rc != 0) {                LASSERT (!list_empty(&peer->rap_conns));                LASSERT (list_empty(&peer->rap_tx_queue));                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);                CWARN("Not creating duplicate connection to %s: %d\n",                      libcfs_nid2str(peer_nid), rc);                rc = 0;                goto failed;        }        if (new_peer) {                /* peer table takes my ref on the new peer */                list_add_tail(&peer->rap_list,                              kranal_nid2peerlist(peer_nid));        }        /* initialise timestamps before reaper looks at them */        conn->rac_last_tx = conn->rac_last_rx = jiffies;        kranal_peer_addref(peer);               /* +1 ref for conn */        conn->rac_peer = peer;        list_add_tail(&conn->rac_list, &peer->rap_conns);        kranal_conn_addref(conn);               /* +1 ref for conn table */        list_add_tail(&conn->rac_hashlist,                      kranal_cqid2connlist(conn->rac_cqid));        /* Schedule all packets blocking for a connection */        while (!list_empty(&peer->rap_tx_queue)) {                tx = list_entry(peer->rap_tx_queue.next,                                kra_tx_t, tx_list);                list_del(&tx->tx_list);                kranal_post_fma(conn, tx);        }        nstale = kranal_close_stale_conns_locked(peer, conn);        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);        /* CAVEAT EMPTOR: passive peer can disappear NOW */        if (nstale != 0)                CWARN("Closed %d stale conns to %s\n", nstale,                       libcfs_nid2str(peer_nid));        CWARN("New connection to %s on devid[%d] = %d\n",               libcfs_nid2str(peer_nid),                conn->rac_device->rad_idx, conn->rac_device->rad_id);        /* Ensure conn gets checked.  Transmits may have been queued and an         * FMA event may have happened before it got in the cq hash table */        kranal_schedule_conn(conn);        return 0; failed:        if (new_peer)                kranal_peer_decref(peer);        kranal_conn_decref(conn);        return rc;}voidkranal_connect (kra_peer_t *peer){        kra_tx_t          *tx;        unsigned long      flags;        struct list_head   zombies;        int                rc;        LASSERT (peer->rap_connecting);        CDEBUG(D_NET, "About to handshake %s\n",                libcfs_nid2str(peer->rap_nid));        rc = kranal_conn_handshake(NULL, peer);        CDEBUG(D_NET, "Done handshake %s:%d \n",                libcfs_nid2str(peer->rap_nid), rc);        write_lock_irqsave(&kranal_data.kra_global_lock, flags);        LASSERT (peer->rap_connecting);        peer->rap_connecting = 0;        if (rc == 0) {                /* kranal_conn_handshake() queues blocked txs immediately on                 * success to avoid messages jumping the queue */                LASSERT (list_empty(&peer->rap_tx_queue));                peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);                return;        }        peer->rap_reconnect_interval *= 2;        peer->rap_reconnect_interval =                MAX(peer->rap_reconnect_interval,                    *kranal_tunables.kra_min_reconnect_interval);        peer->rap_reconnect_interval =                MIN(peer->rap_reconnect_interval,                    *kranal_tunables.kra_max_reconnect_interval);        peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;        /* Grab all blocked packets while we have the global lock */        list_add(&zombies, &peer->rap_tx_queue);        list_del_init(&peer->rap_tx_queue);        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);        if (list_empty(&zombies))                return;        CDEBUG(D_NETERROR, "Dropping packets for %s: connection failed\n",               libcfs_nid2str(peer->rap_nid));        do {                tx = list_entry(zombies.next, kra_tx_t, tx_list);                list_del(&tx->tx_list);                kranal_tx_done(tx, -EHOSTUNREACH);        } while (!list_empty(&zombies));}void

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -