⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mxlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                                        CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);                                }                                if (result == 1) {                                        ctx->mxc_status.code = -ECONNABORTED;                                        ctx->mxc_state = MXLND_CTX_CANCELED;                                        /* NOTE this calls lnet_finalize() and                                         * we cannot hold any locks when calling it.                                         * It also calls mxlnd_conn_decref(conn) */                                        spin_unlock(&conn->mxk_lock);                                        mxlnd_handle_rx_completion(ctx);                                        spin_lock(&conn->mxk_lock);                                }                                break;                        }                }                spin_unlock(&conn->mxk_lock);        }        while (found);        return;}/** * mxlnd_conn_disconnect - shutdown a connection * @conn - a kmx_conn pointer * * This function sets the status to DISCONNECT, completes queued * txs with failure, calls mx_disconnect, which will complete * pending txs and matched rxs with failure. */voidmxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify){        struct list_head        *tmp    = NULL;        spin_lock(&conn->mxk_lock);        if (conn->mxk_status == MXLND_CONN_DISCONNECT) {                spin_unlock(&conn->mxk_lock);                return;        }        conn->mxk_status = MXLND_CONN_DISCONNECT;        conn->mxk_timeout = 0;        while (!list_empty(&conn->mxk_tx_free_queue) ||               !list_empty(&conn->mxk_tx_credit_queue)) {                struct kmx_ctx          *tx     = NULL;                if (!list_empty(&conn->mxk_tx_free_queue)) {                        tmp = &conn->mxk_tx_free_queue;                } else {                        tmp = &conn->mxk_tx_credit_queue;                }                tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);                list_del_init(&tx->mxc_list);                tx->mxc_status.code = -ECONNABORTED;                spin_unlock(&conn->mxk_lock);                mxlnd_put_idle_tx(tx);                mxlnd_conn_decref(conn); /* for this tx */                spin_lock(&conn->mxk_lock);        }        spin_unlock(&conn->mxk_lock);        /* cancel pending rxs */        mxlnd_conn_cancel_pending_rxs(conn);        if (kmxlnd_data.kmx_shutdown != 1) {                if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);                if (notify) {                        time_t          last_alive      = 0;                        unsigned long   last_msg        = 0;                        /* notify LNET that we are giving up on this peer */                        if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {                                last_msg = conn->mxk_last_rx;                        } else {                                last_msg = conn->mxk_last_tx;                        }                        last_alive = cfs_time_current_sec() -                                     cfs_duration_sec(cfs_time_current() - last_msg);                        lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);                }        }        mxlnd_conn_decref(conn); /* drop the owning peer's reference */                return;}/** * mxlnd_conn_alloc - allocate and initialize a new conn struct * @connp - address of a kmx_conn pointer * @peer - owning kmx_peer * * Returns 0 on success and -ENOMEM on failure */intmxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer){        struct kmx_conn *conn    = NULL;        LASSERT(peer != NULL);        MXLND_ALLOC(conn, sizeof (*conn));        if (conn == NULL) {                CDEBUG(D_NETERROR, "Cannot allocate conn\n");                return -ENOMEM;        }        CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);        memset(conn, 0, sizeof(*conn));        /* conn->mxk_incarnation = 0 - will be set by peer */        atomic_set(&conn->mxk_refcount, 1);     /* ref for owning peer */        conn->mxk_peer = peer;        /* mxk_epa - to be set after mx_iconnect() */        INIT_LIST_HEAD(&conn->mxk_list);        spin_lock_init(&conn->mxk_lock);        /* conn->mxk_timeout = 0 */        conn->mxk_last_tx = jiffies;        conn->mxk_last_rx = conn->mxk_last_tx;        conn->mxk_credits = *kmxlnd_tunables.kmx_credits;        /* mxk_outstanding = 0 */        conn->mxk_status = MXLND_CONN_INIT;        INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);        INIT_LIST_HEAD(&conn->mxk_tx_free_queue);        /* conn->mxk_ntx_msgs = 0 */        /* conn->mxk_ntx_data = 0 */        /* conn->mxk_ntx_posted = 0 */        /* conn->mxk_data_posted = 0 */        INIT_LIST_HEAD(&conn->mxk_pending);        *connp = conn;        mxlnd_peer_addref(peer);        /* add a ref for this conn */        /* add to front of peer's conns list */        spin_lock(&peer->mxp_lock);        list_add(&conn->mxk_list, &peer->mxp_conns);        peer->mxp_conn = conn;        spin_unlock(&peer->mxp_lock);        return 0;}intmxlnd_q_pending_ctx(struct kmx_ctx *ctx){        int             ret     = 0;        struct kmx_conn *conn   = ctx->mxc_conn;        ctx->mxc_state = MXLND_CTX_PENDING;        if (conn != NULL) {                spin_lock(&conn->mxk_lock);                if (conn->mxk_status >= MXLND_CONN_INIT) {                        list_add_tail(&ctx->mxc_list, &conn->mxk_pending);                        if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {                                conn->mxk_timeout = ctx->mxc_deadline;                        }                } else {                        ctx->mxc_state = MXLND_CTX_COMPLETED;                        ret = -1;                }                spin_unlock(&conn->mxk_lock);        }        return ret;}intmxlnd_deq_pending_ctx(struct kmx_ctx *ctx){        LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||                ctx->mxc_state == MXLND_CTX_COMPLETED);        if (ctx->mxc_state != MXLND_CTX_PENDING &&            ctx->mxc_state != MXLND_CTX_COMPLETED) {                CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",                        mxlnd_ctxstate_to_str(ctx->mxc_state));        }        ctx->mxc_state = MXLND_CTX_COMPLETED;        if (!list_empty(&ctx->mxc_list)) {                struct kmx_conn *conn = ctx->mxc_conn;                struct kmx_ctx *next = NULL;                LASSERT(conn != NULL);                spin_lock(&conn->mxk_lock);                list_del_init(&ctx->mxc_list);                conn->mxk_timeout = 0;                if (!list_empty(&conn->mxk_pending)) {                        next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);                        conn->mxk_timeout = next->mxc_deadline;                }                spin_unlock(&ctx->mxc_conn->mxk_lock);        }        return 0;}/** * mxlnd_peer_free - free the peer * @peer - a kmx_peer pointer * * The calling function should decrement the rxs, drain the tx queues and * remove the peer from the peers list first then destroy it. */voidmxlnd_peer_free(struct kmx_peer *peer){        CDEBUG(D_NET, "freeing peer 0x%p\n", peer);        LASSERT (atomic_read(&peer->mxp_refcount) == 0);        if (peer->mxp_host != NULL) {                spin_lock(&peer->mxp_host->mxh_lock);                peer->mxp_host->mxh_peer = NULL;                spin_unlock(&peer->mxp_host->mxh_lock);        }        if (!list_empty(&peer->mxp_peers)) {                /* assume we are locked */                list_del_init(&peer->mxp_peers);        }        MXLND_FREE (peer, sizeof (*peer));        atomic_dec(&kmxlnd_data.kmx_npeers);        return;}voidmxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer){        u64             nic_id  = 0LL;        char            name[MX_MAX_HOSTNAME_LEN + 1];        mx_return_t     mxret   = MX_SUCCESS;        memset(name, 0, sizeof(name));        snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);        mxret = mx_hostname_to_nic_id(name, &nic_id);        if (mxret == MX_SUCCESS) {                peer->mxp_nic_id = nic_id;        } else {                CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "                                   "with %s\n", mx_strerror(mxret), name);                mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);                if (mxret == MX_SUCCESS) {                        peer->mxp_nic_id = nic_id;                } else {                        CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "                                           "with %s\n", mx_strerror(mxret),                                            peer->mxp_host->mxh_hostname);                }        }        return;}/** * mxlnd_peer_alloc - allocate and initialize a new peer struct * @peerp - address of a kmx_peer pointer * @nid - LNET node id * * Returns 0 on success and -ENOMEM on failure */intmxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid){        int                     i       = 0;        int                     ret     = 0;        u32                     addr    = LNET_NIDADDR(nid);        struct kmx_peer        *peer    = NULL;        struct kmx_host        *host    = NULL;        LASSERT (nid != LNET_NID_ANY && nid != 0LL);        MXLND_ALLOC(peer, sizeof (*peer));        if (peer == NULL) {                CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);                return -ENOMEM;        }        CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);        memset(peer, 0, sizeof(*peer));        list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {                if (addr == host->mxh_addr) {                        peer->mxp_host = host;                        spin_lock(&host->mxh_lock);                        host->mxh_peer = peer;                        spin_unlock(&host->mxh_lock);                        break;                }        }        LASSERT(peer->mxp_host != NULL);        peer->mxp_nid = nid;        /* peer->mxp_incarnation */        atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */        mxlnd_peer_hostname_to_nic_id(peer);        INIT_LIST_HEAD(&peer->mxp_peers);        spin_lock_init(&peer->mxp_lock);        INIT_LIST_HEAD(&peer->mxp_conns);        ret = mxlnd_conn_alloc(&peer->mxp_conn, peer);        if (ret != 0) {                mxlnd_peer_decref(peer);                return ret;        }        for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {                struct kmx_ctx   *rx     = NULL;                ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);                if (ret != 0) {                        mxlnd_reduce_idle_rxs(i);                        mxlnd_peer_decref(peer);                        return ret;                }                spin_lock(&kmxlnd_data.kmx_rxs_lock);                list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);                spin_unlock(&kmxlnd_data.kmx_rxs_lock);                rx->mxc_put = -1;                mxlnd_put_idle_rx(rx);        }        /* peer->mxp_reconnect_time = 0 */        /* peer->mxp_incompatible = 0 */        *peerp = peer;        return 0;}/** * mxlnd_nid_to_hash - hash the nid * @nid - msg pointer * * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits. */static inline intmxlnd_nid_to_hash(lnet_nid_t nid){        return (nid & MXLND_HASH_MASK) ^               ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);}static inline struct kmx_peer *mxlnd_find_peer_by_nid(lnet_nid_t nid){        int                     found   = 0;        int                     hash    = 0;        struct kmx_peer         *peer   = NULL;        hash = mxlnd_nid_to_hash(nid);        read_lock(&kmxlnd_data.kmx_peers_lock);        list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {                if (peer->mxp_nid == nid) {                        found = 1;                        break;                }        }        read_unlock(&kmxlnd_data.kmx_peers_lock);        return (found ? peer : NULL);}static inline intmxlnd_tx_requires_credit(struct kmx_ctx *tx){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -