📄 conn.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 */

#include "usocklnd.h"

/* Return 1 if the conn is timed out, 0 else
 *
 * A conn is timed out when a send or a receive is in progress and its
 * corresponding deadline has passed 'current_time'. */
int
usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
{
        if (conn->uc_tx_flag && /* sending is in progress */
            cfs_time_aftereq(current_time, conn->uc_tx_deadline))
                return 1;

        if (conn->uc_rx_flag && /* receiving is in progress */
            cfs_time_aftereq(current_time, conn->uc_rx_deadline))
                return 1;

        return 0;
}

/* Kill the conn unless it is already DEAD; takes uc_lock itself.
 * Safe to call more than once: the UC_DEAD check makes it idempotent. */
void
usocklnd_conn_kill(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD)
                usocklnd_conn_kill_locked(conn);
        pthread_mutex_unlock(&conn->uc_lock);
}

/* Mark the conn as DEAD and schedule its deletion
 *
 * Caller must hold conn->uc_lock.  Clears both rx and tx in-progress
 * flags so the timeout check above stops firing for this conn. */
void
usocklnd_conn_kill_locked(usock_conn_t *conn)
{
        conn->uc_rx_flag = conn->uc_tx_flag = 0;
        conn->uc_state = UC_DEAD;
        usocklnd_add_killrequest(conn);
}

/* Allocate a zeroed conn together with its poll request and the buffer
 * for an incoming hello message (sized for the worst case of
 * LNET_MAX_INTERFACES advertised IPs).
 *
 * Returns the new conn, or NULL on allocation failure; on failure every
 * partial allocation is freed before returning. */
usock_conn_t *
usocklnd_conn_allocate()
{
        usock_conn_t        *conn;
        usock_pollrequest_t *pr;

        LIBCFS_ALLOC (pr, sizeof(*pr));
        if (pr == NULL)
                return NULL;

        LIBCFS_ALLOC (conn, sizeof(*conn));
        if (conn == NULL) {
                LIBCFS_FREE (pr, sizeof(*pr));
                return NULL;
        }
        memset(conn, 0, sizeof(*conn));
        conn->uc_preq = pr;

        LIBCFS_ALLOC (conn->uc_rx_hello,
                      offsetof(ksock_hello_msg_t,
                               kshm_ips[LNET_MAX_INTERFACES]));
        if (conn->uc_rx_hello == NULL) {
                LIBCFS_FREE (pr, sizeof(*pr));
                LIBCFS_FREE (conn, sizeof(*conn));
                return NULL;
        }

        return conn;
}

/* Free a conn and the resources owned by it (poll request and rx hello
 * buffer).  Counterpart of usocklnd_conn_allocate(); the hello buffer is
 * freed with the same offsetof() size it was allocated with. */
void
usocklnd_conn_free(usock_conn_t *conn)
{
        usock_pollrequest_t *pr = conn->uc_preq;

        if (pr != NULL)
                LIBCFS_FREE (pr, sizeof(*pr));

        if (conn->uc_rx_hello != NULL)
                LIBCFS_FREE (conn->uc_rx_hello,
                             offsetof(ksock_hello_msg_t,
                                      kshm_ips[LNET_MAX_INTERFACES]));

        LIBCFS_FREE (conn, sizeof(*conn));
}

/* Detach the conn from its peer: finalize any lnet message being
 * received, destroy queued tx's, clear the peer's slot for this conn
 * type and drop the references conn and peer held on each other.
 *
 * Lock order: peer->up_lock is taken before conn->uc_lock.
 * If this conn errored out, all other conns of the peer are killed too
 * (killall_flag) while up_lock is still held. */
void
usocklnd_tear_peer_conn(usock_conn_t *conn)
{
        usock_peer_t     *peer = conn->uc_peer;
        int               idx = usocklnd_type2idx(conn->uc_type);
        lnet_ni_t        *ni;
        lnet_process_id_t id;
        int               decref_flag = 0;
        int               killall_flag = 0;

        if (peer == NULL) /* nothing to tear */
                return;

        pthread_mutex_lock(&peer->up_lock);
        pthread_mutex_lock(&conn->uc_lock);

        ni = peer->up_ni;
        id = peer->up_peerid;

        if (peer->up_conns[idx] == conn) {
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
                }

                usocklnd_destroy_txlist(peer->up_ni, &conn->uc_tx_list);

                peer->up_conns[idx] = NULL;
                conn->uc_peer = NULL;
                decref_flag = 1;

                if(conn->uc_errored && !peer->up_errored)
                        peer->up_errored = killall_flag = 1;
        }

        pthread_mutex_unlock(&conn->uc_lock);

        if (killall_flag)
                usocklnd_del_conns_locked(peer);

        pthread_mutex_unlock(&peer->up_lock);

        if (!decref_flag)
                return;

        /* drop the mutual conn<->peer references taken at bind time,
         * then see whether the peer became stale */
        usocklnd_conn_decref(conn);
        usocklnd_peer_decref(peer);
        usocklnd_check_peer_stale(ni, id);
}

/* Remove peer from hash list if all up_conns[i] is NULL &&
 * hash table is the only consumer of the peer
 *
 * refcount == 2 here means: the hash table's reference plus the one
 * taken by usocklnd_find_peer_locked() above — i.e. nobody else is
 * using the peer.  An errored non-userspace peer is reported dead to
 * LNET via lnet_notify() before being dropped. */
void
usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
{
        usock_peer_t *peer;

        pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
        peer = usocklnd_find_peer_locked(ni, id);

        if (peer == NULL) {
                pthread_rwlock_unlock(&usock_data.ud_peers_lock);
                return;
        }

        if (cfs_atomic_read(&peer->up_refcount) == 2) {
                int i;
                for (i = 0; i < N_CONN_TYPES; i++)
                        LASSERT (peer->up_conns[i] == NULL);

                list_del(&peer->up_list);

                if (peer->up_errored &&
                    (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
                        lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
                                     cfs_time_seconds(peer->up_last_alive));

                /* drop the hash table's reference */
                usocklnd_peer_decref(peer);
        }

        /* drop the reference taken by find_peer_locked() */
        usocklnd_peer_decref(peer);
        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
}

/* Returns 0 on success, <0 else
 *
 * Build a conn object around an already-accepted socket 'fd' and start
 * it in UC_RECEIVING_HELLO state, waiting for the remote hello.
 * NOTE(review): on failure of set_sock_options() the fd is not closed
 * here — presumably the caller owns and closes it; confirm at call sites. */
int
usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
{
        int           rc;
        __u32         peer_ip;
        __u16         peer_port;
        usock_conn_t *conn;

        rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
        if (rc)
                return rc;

        rc = usocklnd_set_sock_options(fd);
        if (rc)
                return rc;

        conn = usocklnd_conn_allocate();
        if (conn == NULL)
                return -ENOMEM;

        usocklnd_rx_hellomagic_state_transition(conn);

        conn->uc_fd = fd;
        conn->uc_peer_ip = peer_ip;
        conn->uc_peer_port = peer_port;
        conn->uc_state = UC_RECEIVING_HELLO;
        conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
        conn->uc_ni = ni;
        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);

        pthread_mutex_init(&conn->uc_lock, NULL);
        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */

        *connp = conn;
        return 0;
}

/* Returns 0 on success, <0 else
 *
 * Create an active conn to 'peer': allocate the conn, prepare the hello
 * tx, connect (from a reserved port unless running as a userspace LNET
 * pid) and leave the conn in UC_CONNECTING state with its tx deadline
 * armed.  Takes a reference on 'peer' that teardown later drops. */
int
usocklnd_create_active_conn(usock_peer_t *peer, int type,
                            usock_conn_t **connp)
{
        int           rc;
        int           fd;
        usock_conn_t *conn;
        __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
        __u16         dst_port = lnet_acceptor_port();

        conn = usocklnd_conn_allocate();
        if (conn == NULL)
                return -ENOMEM;

        conn->uc_tx_hello =
                usocklnd_create_cr_hello_tx(peer->up_ni, type,
                                            peer->up_peerid.nid);
        if (conn->uc_tx_hello == NULL) {
                usocklnd_conn_free(conn);
                return -ENOMEM;
        }

        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
        else
                rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);

        if (rc) {
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                usocklnd_conn_free(conn);
                return rc;
        }

        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;

        conn->uc_fd = fd;
        conn->uc_peer_ip = dst_ip;
        conn->uc_peer_port = dst_port;
        conn->uc_type = type;
        conn->uc_activeflag = 1;
        conn->uc_state = UC_CONNECTING;
        conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
        conn->uc_ni = NULL;
        conn->uc_peerid = peer->up_peerid;
        conn->uc_peer = peer;
        usocklnd_peer_addref(peer);
        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);

        pthread_mutex_init(&conn->uc_lock, NULL);
        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */

        *connp = conn;
        return 0;
}

/* Returns 0 on success, <0 else
 *
 * Connect to dst_ip:dst_port binding the local end to a privileged
 * (reserved) port, walking down from MAX to MIN reserved port.
 * EADDRINUSE / EADDRNOTAVAIL from connect() means "try the next port";
 * any other error aborts.  If every port is exhausted, the last error
 * code is returned. */
int
usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
{
        __u16 port;
        int   fd;
        int   rc;

        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
             port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
             port--) {
                /* Iterate through reserved ports. */

                rc = libcfs_sock_create(&fd);
                if (rc)
                        return rc;

                rc = libcfs_sock_bind_to_port(fd, port);
                if (rc) {
                        close(fd);
                        continue;
                }

                rc = usocklnd_set_sock_options(fd);
                if (rc) {
                        close(fd);
                        return rc;
                }

                rc = libcfs_sock_connect(fd, dst_ip, dst_port);
                if (rc == 0) {
                        *fdp = fd;
                        return 0;
                }

                if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
                        close(fd);
                        return rc;
                }

                close(fd);
        }

        CERROR("Can't bind to any reserved port\n");
        return rc;
}

/* Returns 0 on success, <0 else
 *
 * Connect to dst_ip:dst_port from an ephemeral (non-reserved) local
 * port — used when running with a userspace LNET pid. */
int
usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
{
        int fd;
        int rc;

        rc = libcfs_sock_create(&fd);
        if (rc)
                return rc;

        rc = usocklnd_set_sock_options(fd);
        if (rc) {
                close(fd);
                return rc;
        }

        rc = libcfs_sock_connect(fd, dst_ip, dst_port);
        if (rc) {
                close(fd);
                return rc;
        }

        *fdp = fd;
        return 0;
}

/* Apply tunable socket options to fd: Nagle on/off per ut_socknagle
 * and, when ut_sockbufsiz is non-zero, the socket buffer size.
 * (Function body continues beyond this chunk of the file.) */
int
usocklnd_set_sock_options(int fd)
{
        int rc;

        rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
        if (rc)
                return rc;

        if (usock_tuns.ut_sockbufsiz) {
                rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
                if (rc)
                        return rc;
        }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -