📄 conn.c
字号:
CERROR("Can't create peer: network shutdown\n"); return -ESHUTDOWN; } /* peer table will take 1 of my refs on peer */ usocklnd_peer_addref(peer); list_add_tail (&peer->up_list, usocklnd_nid2peerlist(id.nid)); } else { usocklnd_peer_decref(peer); /* should destroy peer */ peer = peer2; } pthread_rwlock_unlock(&usock_data.ud_peers_lock); find_or_create_peer_done: *peerp = peer; return 0;}/* NB: both peer and conn locks are held */static intusocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack){ if (conn->uc_state == UC_READY && list_empty(&conn->uc_tx_list) && list_empty(&conn->uc_zcack_list) && !conn->uc_sending) { int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT); if (rc != 0) return rc; } list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list); return 0;}/* NB: both peer and conn locks are held * NB: if sending isn't in progress. the caller *MUST* send tx * immediately after we'll return */static voidusocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx, int *send_immediately){ if (conn->uc_state == UC_READY && list_empty(&conn->uc_tx_list) && list_empty(&conn->uc_zcack_list) && !conn->uc_sending) { conn->uc_sending = 1; *send_immediately = 1; return; } *send_immediately = 0; list_add_tail(&tx->tx_list, &conn->uc_tx_list);}/* Safely create new conn if needed. Save result in *connp. * Returns 0 on success, <0 else */intusocklnd_find_or_create_conn(usock_peer_t *peer, int type, usock_conn_t **connp, usock_tx_t *tx, usock_zc_ack_t *zc_ack, int *send_immediately){ usock_conn_t *conn; int idx; int rc; lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG; if (userflag) type = SOCKLND_CONN_ANY; idx = usocklnd_type2idx(type); pthread_mutex_lock(&peer->up_lock); if (peer->up_conns[idx] != NULL) { conn = peer->up_conns[idx]; LASSERT(conn->uc_type == type); } else { if (userflag) { CERROR("Refusing to create a connection to " "userspace process %s\n", libcfs_id2str(peer->up_peerid)); rc = -EHOSTUNREACH; goto find_or_create_conn_failed; } rc = usocklnd_create_active_conn(peer, type, &conn); if (rc) { peer->up_errored = 1; usocklnd_del_conns_locked(peer); goto find_or_create_conn_failed; } /* peer takes 1 of conn refcount */ usocklnd_link_conn_to_peer(conn, peer, idx); rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT); if (rc) { peer->up_conns[idx] = NULL; usocklnd_conn_decref(conn); /* should destroy conn */ goto find_or_create_conn_failed; } usocklnd_wakeup_pollthread(conn->uc_pt_idx); } pthread_mutex_lock(&conn->uc_lock); LASSERT(conn->uc_peer == peer); LASSERT(tx == NULL || zc_ack == NULL); if (tx != NULL) { usocklnd_enqueue_tx(conn, tx, send_immediately); } else { rc = usocklnd_enqueue_zcack(conn, zc_ack); if (rc != 0) { usocklnd_conn_kill_locked(conn); pthread_mutex_unlock(&conn->uc_lock); goto find_or_create_conn_failed; } } pthread_mutex_unlock(&conn->uc_lock); usocklnd_conn_addref(conn); pthread_mutex_unlock(&peer->up_lock); *connp = conn; return 0; find_or_create_conn_failed: pthread_mutex_unlock(&peer->up_lock); return rc;}voidusocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx){ peer->up_conns[idx] = conn; peer->up_errored = 0; /* this new fresh conn will try * revitalize even stale errored peer */}intusocklnd_invert_type(int type){ switch (type) { case SOCKLND_CONN_ANY: case SOCKLND_CONN_CONTROL: return (type); case SOCKLND_CONN_BULK_IN: return SOCKLND_CONN_BULK_OUT; case SOCKLND_CONN_BULK_OUT: return SOCKLND_CONN_BULK_IN; default: return SOCKLND_CONN_NONE; }}voidusocklnd_conn_new_state(usock_conn_t *conn, int new_state){ pthread_mutex_lock(&conn->uc_lock); if (conn->uc_state != UC_DEAD) conn->uc_state = new_state; pthread_mutex_unlock(&conn->uc_lock);}/* NB: peer is locked by caller */voidusocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn, usock_conn_t *skip_conn){ int i; if (!peer->up_incrn_is_set) { peer->up_incarnation = incrn; peer->up_incrn_is_set = 1; return; } if (peer->up_incarnation == incrn) return; peer->up_incarnation = incrn; for (i = 0; i < N_CONN_TYPES; i++) { usock_conn_t *conn = peer->up_conns[i]; if (conn == NULL || conn == skip_conn) continue; pthread_mutex_lock(&conn->uc_lock); LASSERT (conn->uc_peer == peer); conn->uc_peer = NULL; peer->up_conns[i] = NULL; if (conn->uc_state != UC_DEAD) usocklnd_conn_kill_locked(conn); pthread_mutex_unlock(&conn->uc_lock); usocklnd_conn_decref(conn); usocklnd_peer_decref(peer); }}/* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive * MAGIC part of hello and set uc_rx_state */voidusocklnd_rx_hellomagic_state_transition(usock_conn_t *conn){ LASSERT(conn->uc_rx_hello != NULL); conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = sizeof(conn->uc_rx_hello->kshm_magic); conn->uc_rx_state = UC_RX_HELLO_MAGIC; conn->uc_rx_flag = 1; /* waiting for incoming hello */ conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);}/* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive * VERSION part of hello and set uc_rx_state */voidusocklnd_rx_helloversion_state_transition(usock_conn_t *conn){ LASSERT(conn->uc_rx_hello != NULL); conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = sizeof(conn->uc_rx_hello->kshm_version); conn->uc_rx_state = UC_RX_HELLO_VERSION;}/* RX state transition to UC_RX_HELLO_BODY: update RX part to receive * the rest of hello and set uc_rx_state */voidusocklnd_rx_hellobody_state_transition(usock_conn_t *conn){ LASSERT(conn->uc_rx_hello != NULL); conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = offsetof(ksock_hello_msg_t, kshm_ips) - offsetof(ksock_hello_msg_t, kshm_src_nid); conn->uc_rx_state = UC_RX_HELLO_BODY;}/* RX state transition to UC_RX_HELLO_IPS: update RX part to receive * array of IPs and set uc_rx_state */voidusocklnd_rx_helloIPs_state_transition(usock_conn_t *conn){ LASSERT(conn->uc_rx_hello != NULL); conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = conn->uc_rx_hello->kshm_nips * sizeof(conn->uc_rx_hello->kshm_ips[0]); conn->uc_rx_state = UC_RX_HELLO_IPS;}/* RX state transition to UC_RX_LNET_HEADER: update RX part to receive * LNET header and set uc_rx_state */voidusocklnd_rx_lnethdr_state_transition(usock_conn_t *conn){ conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = sizeof(ksock_lnet_msg_t); conn->uc_rx_state = UC_RX_LNET_HEADER; conn->uc_rx_flag = 1;}/* RX state transition to UC_RX_KSM_HEADER: update RX part to receive * KSM header and set uc_rx_state */voidusocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn){ conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = offsetof(ksock_msg_t, ksm_u); conn->uc_rx_state = UC_RX_KSM_HEADER; conn->uc_rx_flag = 0;}/* RX state transition to UC_RX_SKIPPING: update RX part for * skipping and set uc_rx_state */voidusocklnd_rx_skipping_state_transition(usock_conn_t *conn){ static char skip_buffer[4096]; int nob; unsigned int niov = 0; int skipped = 0; int nob_to_skip = conn->uc_rx_nob_left; LASSERT(nob_to_skip != 0); conn->uc_rx_iov = conn->uc_rx_iova; /* Set up to skip as much as possible now. If there's more left * (ran out of iov entries) we'll get called again */ do { nob = MIN (nob_to_skip, sizeof(skip_buffer)); conn->uc_rx_iov[niov].iov_base = skip_buffer; conn->uc_rx_iov[niov].iov_len = nob; niov++; skipped += nob; nob_to_skip -=nob; } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */ niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec)); conn->uc_rx_niov = niov; conn->uc_rx_nob_wanted = skipped; conn->uc_rx_state = UC_RX_SKIPPING;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -