📄 handlers.c
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                       POLL_TX_SET_REQUEST, 0);
                        if (ret)
                                rc = ret;
                }

                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}

/* Return the first tx from tx_list with piggybacked zc_ack
 * from zcack_list when possible. If tx_list is empty, return
 * brand new noop tx for zc_ack from zcack_list. Return NULL
 * if an error happened */
usock_tx_t *
usocklnd_try_piggyback(struct list_head *tx_list_p,
                       struct list_head *zcack_list_p)
{
        usock_tx_t     *tx;
        usock_zc_ack_t *zc_ack;

        /* assign tx and zc_ack */
        if (list_empty(tx_list_p))
                tx = NULL;
        else {
                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
                list_del(&tx->tx_list);

                /* already piggybacked or partially sent */
                if (tx->tx_msg.ksm_zc_ack_cookie ||
                    tx->tx_resid != tx->tx_nob)
                        return tx;
        }

        if (list_empty(zcack_list_p)) {
                /* nothing to piggyback */
                return tx;
        } else {
                zc_ack = list_entry(zcack_list_p->next,
                                    usock_zc_ack_t, zc_list);
                list_del(&zc_ack->zc_list);
        }

        if (tx != NULL)
                /* piggyback the zc-ack cookie */
                tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;
        else
                /* cannot piggyback, need noop */
                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);

        LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
        return tx;
}

/* All actions that we need after sending hello on active conn:
 * 1) update RX iov to receive hello
 * 2) state transition to UC_RECEIVING_HELLO
 * 3) notify poll_thread that we're waiting for incoming hello */
int
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
        int rc = 0;

        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_hellomagic_state_transition(conn);
                conn->uc_state = UC_RECEIVING_HELLO;
                conn->uc_tx_flag = 0;
                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
        }

        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}

/* All actions that we need after sending hello on passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find the peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t    *conn2;
        usock_peer_t    *peer;
        struct list_head tx_list;
        struct list_head zcack_list;
        int              idx;
        int              rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * passive conn is linked to peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
           so its tx and zc_ack lists have to be empty */
        LASSERT (list_empty(&conn->uc_tx_list) &&
                 list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access the conn,
                 * nevertheless we use the mutex (if we are wrong after all,
                 * a deadlock is easier to see than a corrupted list) */

                list_add(&tx_list, &conn2->uc_tx_list);
                list_del_init(&conn2->uc_tx_list);
                list_add(&zcack_list, &conn2->uc_zcack_list);
                list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                list_add_tail(&conn->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add_tail(&conn->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
        }

        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

  passive_hellosent_done:
        /* safely transit to UC_READY state */
        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and maybe
                   already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if ( list_empty(&conn->uc_tx_list) &&
                     list_empty(&conn->uc_zcack_list) ) {
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN);
                } else {
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;

  passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}

/* Send as much tx data as possible.
 * Returns 0 or 1 on success, <0 if fatal error.
 * 0 means partial send or non-fatal error, 1 - complete.
 * Rely on libcfs_sock_writev() for differentiating fatal and
 * non-fatal errors. An error should be considered non-fatal if:
 * 1) it still makes sense to continue reading &&
 * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
int
usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
{
        struct iovec *iov;
        int           nob;
        int           fd = conn->uc_fd;
        cfs_time_t    t;

        LASSERT (tx->tx_resid != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (tx->tx_niov > 0);

                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
                if (nob < 0)
                        conn->uc_errored = 1;
                if (nob <= 0) /* write queue is flow-controlled or error */
                        return nob;

                LASSERT (nob <= tx->tx_resid);
                tx->tx_resid -= nob;
                t = cfs_time_current();
                conn->uc_tx_deadline =
                        cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = tx->tx_iov;
                do {
                        LASSERT (tx->tx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        tx->tx_iov = ++iov;
                        tx->tx_niov--;
                } while (nob != 0);

        } while (tx->tx_resid != 0);

        return 1; /* send complete */
}

/* Read from wire as much data as possible.
 * Returns 0 or 1 on success, <0 if error or EOF.
 * 0 means partial read, 1 - complete */
int
usocklnd_read_data(usock_conn_t *conn)
{
        struct iovec *iov;
        int           nob;
        cfs_time_t    t;

        LASSERT (conn->uc_rx_nob_wanted != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (conn->uc_rx_niov > 0);

                nob = libcfs_sock_readv(conn->uc_fd,
                                        conn->uc_rx_iov, conn->uc_rx_niov);
                if (nob <= 0) { /* read nothing or error */
                        conn->uc_errored = 1;
                        return nob;
                }

                LASSERT (nob <= conn->uc_rx_nob_wanted);
                conn->uc_rx_nob_wanted -= nob;
                conn->uc_rx_nob_left -= nob;
                t = cfs_time_current();
                conn->uc_rx_deadline =
                        cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = conn->uc_rx_iov;
                do {
                        LASSERT (conn->uc_rx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        conn->uc_rx_iov = ++iov;
                        conn->uc_rx_niov--;
                } while (nob != 0);

        } while (conn->uc_rx_nob_wanted != 0);

        return 1; /* read complete */
}
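
Both usocklnd_send_tx() and usocklnd_read_data() above finish each pass with the same "consume" loop: after a partial writev()/readv() that moved nob bytes, the iovec array is advanced so the next call resumes exactly where the previous one stopped. The following is a minimal, self-contained sketch of that idiom, not part of usocklnd; the helper name consume_iov() and the sample data are illustrative only.

/* Sketch of the iov-consumption idiom used by usocklnd_send_tx() and
 * usocklnd_read_data(): advance (*iovp, *niovp) past `nob` bytes that a
 * partial writev()/readv() has already transferred. */
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

static void
consume_iov(struct iovec **iovp, int *niovp, size_t nob)
{
        struct iovec *iov = *iovp;

        while (nob != 0) {
                assert(*niovp > 0);

                if (nob < iov->iov_len) {
                        /* partial fragment: shift base, shrink length */
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        break;
                }

                /* whole fragment consumed: move to the next one */
                nob -= iov->iov_len;
                iov++;
                (*niovp)--;
        }

        *iovp = iov;
}

int
main(void)
{
        char a[] = "hello, ";
        char b[] = "world";
        struct iovec vec[2] = {
                { .iov_base = a, .iov_len = strlen(a) },
                { .iov_base = b, .iov_len = strlen(b) },
        };
        struct iovec *iov  = vec;
        int           niov = 2;

        /* pretend writev() managed to push only 9 of the 12 bytes */
        consume_iov(&iov, &niov, 9);

        /* 3 bytes of "world" remain in a single fragment: prints
         * niov=1 next="rld" */
        printf("niov=%d next=\"%.*s\"\n", niov,
               (int)iov->iov_len, (char *)iov->iov_base);
        return 0;
}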