📄 conn.c
字号:
return libcfs_fcntl_nonblock(fd);}voidusocklnd_init_msg(ksock_msg_t *msg, int type){ msg->ksm_type = type; msg->ksm_csum = 0; msg->ksm_zc_req_cookie = 0; msg->ksm_zc_ack_cookie = 0;}usock_tx_t *usocklnd_create_noop_tx(__u64 cookie){ usock_tx_t *tx; LIBCFS_ALLOC (tx, sizeof(usock_tx_t)); if (tx == NULL) return NULL; tx->tx_size = sizeof(usock_tx_t); tx->tx_lnetmsg = NULL; usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP); tx->tx_msg.ksm_zc_ack_cookie = cookie; tx->tx_iova[0].iov_base = (void *)&tx->tx_msg; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); tx->tx_iov = tx->tx_iova; tx->tx_niov = 1; return tx;} usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg){ usock_tx_t *tx; unsigned int payload_niov = lntmsg->msg_niov; struct iovec *payload_iov = lntmsg->msg_iov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; int size = offsetof(usock_tx_t, tx_iova[1 + payload_niov]); LIBCFS_ALLOC (tx, size); if (tx == NULL) return NULL; tx->tx_size = size; tx->tx_lnetmsg = lntmsg; tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) + payload_nob; usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET); tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr; tx->tx_iova[0].iov_base = (void *)&tx->tx_msg; tx->tx_iova[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload); tx->tx_iov = tx->tx_iova; tx->tx_niov = 1 + lnet_extract_iov(payload_niov, &tx->tx_iov[1], payload_niov, payload_iov, payload_offset, payload_nob); return tx;}voidusocklnd_init_hello_msg(ksock_hello_msg_t *hello, lnet_ni_t *ni, int type, lnet_nid_t peer_nid){ usock_net_t *net = (usock_net_t *)ni->ni_data; hello->kshm_magic = LNET_PROTO_MAGIC; hello->kshm_version = KSOCK_PROTO_V2; hello->kshm_nips = 0; hello->kshm_ctype = type; hello->kshm_dst_incarnation = 0; /* not used */ hello->kshm_src_incarnation = net->un_incarnation; hello->kshm_src_pid = the_lnet.ln_pid; hello->kshm_src_nid = ni->ni_nid; hello->kshm_dst_nid = peer_nid; hello->kshm_dst_pid = 0; /* not used */}usock_tx_t *usocklnd_create_hello_tx(lnet_ni_t *ni, int type, lnet_nid_t peer_nid){ usock_tx_t *tx; int size; ksock_hello_msg_t *hello; size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips); LIBCFS_ALLOC (tx, size); if (tx == NULL) return NULL; tx->tx_size = size; tx->tx_lnetmsg = NULL; hello = (ksock_hello_msg_t *)&tx->tx_iova[1]; usocklnd_init_hello_msg(hello, ni, type, peer_nid); tx->tx_iova[0].iov_base = (void *)hello; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = offsetof(ksock_hello_msg_t, kshm_ips); tx->tx_iov = tx->tx_iova; tx->tx_niov = 1; return tx;}usock_tx_t *usocklnd_create_cr_hello_tx(lnet_ni_t *ni, int type, lnet_nid_t peer_nid){ usock_tx_t *tx; int size; lnet_acceptor_connreq_t *cr; ksock_hello_msg_t *hello; size = sizeof(usock_tx_t) + sizeof(lnet_acceptor_connreq_t) + offsetof(ksock_hello_msg_t, kshm_ips); LIBCFS_ALLOC (tx, size); if (tx == NULL) return NULL; tx->tx_size = size; tx->tx_lnetmsg = NULL; cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1]; memset(cr, 0, sizeof(*cr)); cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC; cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION; cr->acr_nid = peer_nid; hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr)); usocklnd_init_hello_msg(hello, ni, type, peer_nid); tx->tx_iova[0].iov_base = (void *)cr; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = sizeof(lnet_acceptor_connreq_t) + offsetof(ksock_hello_msg_t, kshm_ips); tx->tx_iov = tx->tx_iova; tx->tx_niov = 1; return tx;}voidusocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx){ lnet_msg_t *lnetmsg = tx->tx_lnetmsg; int rc = (tx->tx_resid == 0) ? 0 : -EIO; LASSERT (ni != NULL || lnetmsg == NULL); LIBCFS_FREE (tx, tx->tx_size); if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */ lnet_finalize(ni, lnetmsg, rc);}voidusocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist){ usock_tx_t *tx; while (!list_empty(txlist)) { tx = list_entry(txlist->next, usock_tx_t, tx_list); list_del(&tx->tx_list); usocklnd_destroy_tx(ni, tx); }}voidusocklnd_destroy_zcack_list(struct list_head *zcack_list){ usock_zc_ack_t *zcack; while (!list_empty(zcack_list)) { zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list); list_del(&zcack->zc_list); LIBCFS_FREE (zcack, sizeof(*zcack)); }}voidusocklnd_destroy_peer(usock_peer_t *peer){ usock_net_t *net = peer->up_ni->ni_data; int i; for (i = 0; i < N_CONN_TYPES; i++) LASSERT (peer->up_conns[i] == NULL); LIBCFS_FREE (peer, sizeof (*peer)); pthread_mutex_lock(&net->un_lock); if(--net->un_peercount == 0) pthread_cond_signal(&net->un_cond); pthread_mutex_unlock(&net->un_lock);}voidusocklnd_destroy_conn(usock_conn_t *conn){ LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL); if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) { LASSERT (conn->uc_peer != NULL); lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO); } if (!list_empty(&conn->uc_tx_list)) { LASSERT (conn->uc_peer != NULL); usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list); } usocklnd_destroy_zcack_list(&conn->uc_zcack_list); if (conn->uc_peer != NULL) usocklnd_peer_decref(conn->uc_peer); if (conn->uc_ni != NULL) lnet_ni_decref(conn->uc_ni); if (conn->uc_tx_hello) usocklnd_destroy_tx(NULL, conn->uc_tx_hello); usocklnd_conn_free(conn);}intusocklnd_get_conn_type(lnet_msg_t *lntmsg){ int nob; if (the_lnet.ln_pid & LNET_PID_USERFLAG) return SOCKLND_CONN_ANY; nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) + lntmsg->msg_len; if (nob >= usock_tuns.ut_min_bulk) return SOCKLND_CONN_BULK_OUT; else return SOCKLND_CONN_CONTROL;}int usocklnd_type2idx(int type){ switch (type) { case SOCKLND_CONN_ANY: case SOCKLND_CONN_CONTROL: return 0; case SOCKLND_CONN_BULK_IN: return 1; case SOCKLND_CONN_BULK_OUT: return 2; default: LBUG(); }}usock_peer_t *usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id){ struct list_head *peer_list = usocklnd_nid2peerlist(id.nid); struct list_head *tmp; usock_peer_t *peer; list_for_each (tmp, peer_list) { peer = list_entry (tmp, usock_peer_t, up_list); if (peer->up_ni != ni) continue; if (peer->up_peerid.nid != id.nid || peer->up_peerid.pid != id.pid) continue; usocklnd_peer_addref(peer); return peer; } return (NULL);}intusocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id, usock_peer_t **peerp){ usock_net_t *net = ni->ni_data; usock_peer_t *peer; int i; LIBCFS_ALLOC (peer, sizeof (*peer)); if (peer == NULL) return -ENOMEM; for (i = 0; i < N_CONN_TYPES; i++) peer->up_conns[i] = NULL; peer->up_peerid = id; peer->up_ni = ni; peer->up_incrn_is_set = 0; peer->up_errored = 0; peer->up_last_alive = 0; cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */ pthread_mutex_init(&peer->up_lock, NULL); pthread_mutex_lock(&net->un_lock); net->un_peercount++; pthread_mutex_unlock(&net->un_lock); *peerp = peer; return 0;}/* Safely create new peer if needed. Save result in *peerp. * Returns 0 on success, <0 else */intusocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, usock_peer_t **peerp){ int rc; usock_peer_t *peer; usock_peer_t *peer2; usock_net_t *net = ni->ni_data; pthread_rwlock_rdlock(&usock_data.ud_peers_lock); peer = usocklnd_find_peer_locked(ni, id); pthread_rwlock_unlock(&usock_data.ud_peers_lock); if (peer != NULL) goto find_or_create_peer_done; rc = usocklnd_create_peer(ni, id, &peer); if (rc) return rc; pthread_rwlock_wrlock(&usock_data.ud_peers_lock); peer2 = usocklnd_find_peer_locked(ni, id); if (peer2 == NULL) { if (net->un_shutdown) { pthread_rwlock_unlock(&usock_data.ud_peers_lock); usocklnd_peer_decref(peer); /* should destroy peer */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -