📄 mxlnd_cb.c
字号:
/* tail of mxlnd_conn_cancel_pending_rxs() -- the function's opening lines
 * are above this chunk; code below is unchanged */
                        CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n",
                               mx_strerror(mxret), mxret);
                }
                if (result == 1) {
                        /* the request really was cancelled; complete the rx
                         * here with -ECONNABORTED */
                        ctx->mxc_status.code = -ECONNABORTED;
                        ctx->mxc_state = MXLND_CTX_CANCELED;
                        /* NOTE this calls lnet_finalize() and
                         * we cannot hold any locks when calling it.
                         * It also calls mxlnd_conn_decref(conn) */
                        spin_unlock(&conn->mxk_lock);
                        mxlnd_handle_rx_completion(ctx);
                        spin_lock(&conn->mxk_lock);
                }
                break;
        }
        }
        spin_unlock(&conn->mxk_lock);
        } while (found);

        return;
}

/**
 * mxlnd_conn_disconnect - shutdown a connection
 * @conn - a kmx_conn pointer
 * @mx_dis - if non-zero, also call mx_disconnect() on the conn's endpoint
 * @notify - if non-zero, tell LNET we are giving up on this peer
 *
 * This function sets the status to DISCONNECT, completes queued
 * txs with failure, calls mx_disconnect, which will complete
 * pending txs and matched rxs with failure.
 *
 * Drops the owning peer's reference on the conn before returning.
 */
void
mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
{
        struct list_head *tmp = NULL;

        spin_lock(&conn->mxk_lock);
        if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
                /* someone else is already tearing this conn down */
                spin_unlock(&conn->mxk_lock);
                return;
        }
        conn->mxk_status = MXLND_CONN_DISCONNECT;
        conn->mxk_timeout = 0;

        /* fail all queued txs; the lock is dropped around each completion
         * because mxlnd_put_idle_tx() may finalize the LNET message */
        while (!list_empty(&conn->mxk_tx_free_queue) ||
               !list_empty(&conn->mxk_tx_credit_queue)) {
                struct kmx_ctx *tx = NULL;

                if (!list_empty(&conn->mxk_tx_free_queue)) {
                        tmp = &conn->mxk_tx_free_queue;
                } else {
                        tmp = &conn->mxk_tx_credit_queue;
                }
                tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
                list_del_init(&tx->mxc_list);
                tx->mxc_status.code = -ECONNABORTED;
                spin_unlock(&conn->mxk_lock);
                mxlnd_put_idle_tx(tx);
                mxlnd_conn_decref(conn); /* for this tx */
                spin_lock(&conn->mxk_lock);
        }
        spin_unlock(&conn->mxk_lock);

        /* cancel pending rxs */
        mxlnd_conn_cancel_pending_rxs(conn);

        if (kmxlnd_data.kmx_shutdown != 1) {
                if (mx_dis)
                        mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
                if (notify) {
                        time_t last_alive = 0;
                        unsigned long last_msg = 0;

                        /* notify LNET that we are giving up on this peer */
                        if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
                                last_msg = conn->mxk_last_rx;
                        } else {
                                last_msg = conn->mxk_last_tx;
                        }
                        last_alive = cfs_time_current_sec() -
                                     cfs_duration_sec(cfs_time_current() -
                                                      last_msg);
                        lnet_notify(kmxlnd_data.kmx_ni,
                                    conn->mxk_peer->mxp_nid, 0, last_alive);
                }
        }
        mxlnd_conn_decref(conn); /* drop the owning peer's reference */
        return;
}

/**
 * mxlnd_conn_alloc - allocate and initialize a new conn struct
 * @connp - address of a kmx_conn pointer
 * @peer - owning kmx_peer
 *
 * Takes a ref on @peer for the conn and adds the conn to the front of
 * the peer's conns list.
 *
 * Returns 0 on success and -ENOMEM on failure
 */
int
mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
{
        struct kmx_conn *conn = NULL;

        LASSERT(peer != NULL);

        MXLND_ALLOC(conn, sizeof (*conn));
        if (conn == NULL) {
                CDEBUG(D_NETERROR, "Cannot allocate conn\n");
                return -ENOMEM;
        }
        CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);

        memset(conn, 0, sizeof(*conn));

        /* conn->mxk_incarnation = 0 - will be set by peer */
        atomic_set(&conn->mxk_refcount, 1); /* ref for owning peer */
        conn->mxk_peer = peer;
        /* mxk_epa - to be set after mx_iconnect() */
        INIT_LIST_HEAD(&conn->mxk_list);
        spin_lock_init(&conn->mxk_lock);
        /* conn->mxk_timeout = 0 */
        conn->mxk_last_tx = jiffies;
        conn->mxk_last_rx = conn->mxk_last_tx;
        conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
        /* mxk_outstanding = 0 */
        conn->mxk_status = MXLND_CONN_INIT;
        INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
        INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
        /* conn->mxk_ntx_msgs = 0 */
        /* conn->mxk_ntx_data = 0 */
        /* conn->mxk_ntx_posted = 0 */
        /* conn->mxk_data_posted = 0 */
        INIT_LIST_HEAD(&conn->mxk_pending);

        *connp = conn;

        mxlnd_peer_addref(peer); /* add a ref for this conn */

        /* add to front of peer's conns list */
        spin_lock(&peer->mxp_lock);
        list_add(&conn->mxk_list, &peer->mxp_conns);
        peer->mxp_conn = conn;
        spin_unlock(&peer->mxp_lock);
        return 0;
}

/* Queue ctx on its conn's pending list and track the earliest deadline.
 * Returns 0 on success, -1 if the conn is no longer usable (in which case
 * the ctx is marked COMPLETED).  Continues on the next chunk line. */
int
mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
{
        int ret = 0;
        struct kmx_conn *conn = ctx->mxc_conn;

        ctx->mxc_state = MXLND_CTX_PENDING;
        if (conn != NULL) {
                spin_lock(&conn->mxk_lock);
                if (conn->mxk_status >= MXLND_CONN_INIT) {
                        list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
                        if (conn->mxk_timeout == 0 || ctx->mxc_deadline <
                            conn->mxk_timeout) {
                                /* new earliest deadline for this conn */
                                conn->mxk_timeout = ctx->mxc_deadline;
                        }
                } else {
                        /* conn is shutting down; fail the queue attempt */
                        ctx->mxc_state = MXLND_CTX_COMPLETED;
                        ret = -1;
                }
                spin_unlock(&conn->mxk_lock);
        }
        return ret;
}

/* Remove ctx from its conn's pending list and recompute the conn's
 * timeout from the next pending ctx (0 if none remain).  Always
 * returns 0. */
int
mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
{
        LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
                ctx->mxc_state == MXLND_CTX_COMPLETED);
        if (ctx->mxc_state != MXLND_CTX_PENDING &&
            ctx->mxc_state != MXLND_CTX_COMPLETED) {
                CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
                       mxlnd_ctxstate_to_str(ctx->mxc_state));
        }
        ctx->mxc_state = MXLND_CTX_COMPLETED;
        if (!list_empty(&ctx->mxc_list)) {
                struct kmx_conn *conn = ctx->mxc_conn;
                struct kmx_ctx *next = NULL;

                LASSERT(conn != NULL);
                spin_lock(&conn->mxk_lock);
                list_del_init(&ctx->mxc_list);
                conn->mxk_timeout = 0;
                if (!list_empty(&conn->mxk_pending)) {
                        /* pending list is deadline-ordered at the head */
                        next = list_entry(conn->mxk_pending.next,
                                          struct kmx_ctx, mxc_list);
                        conn->mxk_timeout = next->mxc_deadline;
                }
                /* same lock as taken above (conn == ctx->mxc_conn);
                 * the spelling is just inconsistent */
                spin_unlock(&ctx->mxc_conn->mxk_lock);
        }
        return 0;
}

/**
 * mxlnd_peer_free - free the peer
 * @peer - a kmx_peer pointer
 *
 * The calling function should decrement the rxs, drain the tx queues and
 * remove the peer from the peers list first then destroy it.
*/voidmxlnd_peer_free(struct kmx_peer *peer){ CDEBUG(D_NET, "freeing peer 0x%p\n", peer); LASSERT (atomic_read(&peer->mxp_refcount) == 0); if (peer->mxp_host != NULL) { spin_lock(&peer->mxp_host->mxh_lock); peer->mxp_host->mxh_peer = NULL; spin_unlock(&peer->mxp_host->mxh_lock); } if (!list_empty(&peer->mxp_peers)) { /* assume we are locked */ list_del_init(&peer->mxp_peers); } MXLND_FREE (peer, sizeof (*peer)); atomic_dec(&kmxlnd_data.kmx_npeers); return;}voidmxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer){ u64 nic_id = 0LL; char name[MX_MAX_HOSTNAME_LEN + 1]; mx_return_t mxret = MX_SUCCESS; memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board); mxret = mx_hostname_to_nic_id(name, &nic_id); if (mxret == MX_SUCCESS) { peer->mxp_nic_id = nic_id; } else { CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s " "with %s\n", mx_strerror(mxret), name); mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id); if (mxret == MX_SUCCESS) { peer->mxp_nic_id = nic_id; } else { CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s " "with %s\n", mx_strerror(mxret), peer->mxp_host->mxh_hostname); } } return;}/** * mxlnd_peer_alloc - allocate and initialize a new peer struct * @peerp - address of a kmx_peer pointer * @nid - LNET node id * * Returns 0 on success and -ENOMEM on failure */intmxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid){ int i = 0; int ret = 0; u32 addr = LNET_NIDADDR(nid); struct kmx_peer *peer = NULL; struct kmx_host *host = NULL; LASSERT (nid != LNET_NID_ANY && nid != 0LL); MXLND_ALLOC(peer, sizeof (*peer)); if (peer == NULL) { CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid); return -ENOMEM; } CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid); memset(peer, 0, sizeof(*peer)); list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) { if (addr == host->mxh_addr) { peer->mxp_host = host; spin_lock(&host->mxh_lock); 
/* continuation of mxlnd_peer_alloc() from the previous chunk line */
                        host->mxh_peer = peer;
                        spin_unlock(&host->mxh_lock);
                        break;
                }
        }
        LASSERT(peer->mxp_host != NULL);

        peer->mxp_nid = nid;
        /* peer->mxp_incarnation */
        atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
        mxlnd_peer_hostname_to_nic_id(peer);

        INIT_LIST_HEAD(&peer->mxp_peers);
        spin_lock_init(&peer->mxp_lock);
        INIT_LIST_HEAD(&peer->mxp_conns);
        ret = mxlnd_conn_alloc(&peer->mxp_conn, peer);
        if (ret != 0) {
                mxlnd_peer_decref(peer);
                return ret;
        }

        /* pre-allocate one rx per credit (minus one) for this peer */
        for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
                struct kmx_ctx *rx = NULL;
                ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
                if (ret != 0) {
                        /* undo the rxs allocated so far, then the peer */
                        mxlnd_reduce_idle_rxs(i);
                        mxlnd_peer_decref(peer);
                        return ret;
                }
                spin_lock(&kmxlnd_data.kmx_rxs_lock);
                list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
                spin_unlock(&kmxlnd_data.kmx_rxs_lock);
                rx->mxc_put = -1;
                mxlnd_put_idle_rx(rx);
        }
        /* peer->mxp_reconnect_time = 0 */
        /* peer->mxp_incompatible = 0 */

        *peerp = peer;
        return 0;
}

/**
 * mxlnd_nid_to_hash - hash the nid
 * @nid - msg pointer
 *
 * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
 */
static inline int
mxlnd_nid_to_hash(lnet_nid_t nid)
{
        return (nid & MXLND_HASH_MASK) ^
               ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >>
                MXLND_HASH_BITS);
}

/* Look up a peer by NID in the global hash table.  Returns the peer or
 * NULL if not found.
 * NOTE(review): the pointer is returned after the read lock is dropped
 * without taking a reference — presumably callers rely on another ref
 * keeping the peer alive; verify against the call sites. */
static inline struct kmx_peer *
mxlnd_find_peer_by_nid(lnet_nid_t nid)
{
        int found = 0;
        int hash = 0;
        struct kmx_peer *peer = NULL;

        hash = mxlnd_nid_to_hash(nid);

        read_lock(&kmxlnd_data.kmx_peers_lock);
        list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
                if (peer->mxp_nid == nid) {
                        found = 1;
                        break;
                }
        }
        read_unlock(&kmxlnd_data.kmx_peers_lock);
        return (found ? peer : NULL);
}

/* body of mxlnd_tx_requires_credit() continues beyond this chunk */
static inline int
mxlnd_tx_requires_credit(struct kmx_ctx *tx)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -