handlers.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 */

#include "usocklnd.h"
#include <unistd.h>
#include <syscall.h>

int
usocklnd_notifier_handler(int fd)
{
        int notification;

        /* consume one pending notification (an int) with a raw read syscall */
        return syscall(SYS_read, fd, &notification, sizeof(notification));
}

void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}

int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

  read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* usocklnd_recv() hasn't been called yet */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);

        /* From here on the conn cannot be changed asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;

                        if (rc < 0) { /* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY)
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}
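/*
 * Rough sketch of the RX state machine as it appears from the cases handled
 * in usocklnd_read_msg() below (the *_state_transition() helpers live
 * elsewhere in usocklnd, so the exact target states are inferred here):
 *
 *   UC_RX_KSM_HEADER   -- ksocknal msg header read; byte-swap if needed,
 *                         then advance to reading the LNET header
 *   UC_RX_LNET_HEADER  -- hand the header to lnet_parse(); the state becomes
 *                         UC_RX_PARSE until usocklnd_recv() supplies buffers
 *   UC_RX_LNET_PAYLOAD -- payload complete; lnet_finalize() and, if the
 *                         sender asked for it, queue a zero-copy ack
 *   UC_RX_SKIPPING     -- discard unwanted bytes, then rearm for the next
 *                         ksocknal msg header
 */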
/* Switch on rx_state.
 * Return 0 on success, 1 if the whole packet has been read, else return <0.
 * Always set cont_flag: 1 if we're ready to continue reading, else 0.
 * NB: if the whole packet has been read, cont_flag is set to zero to take
 * care of fairness */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new arrived in the RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when the lnet payload
                 * has been received */
                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t *lhdr =
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT(conn->uc_rx_state == UC_RX_PARSE ||
                        conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's an error to be here, because this special
                         * case is handled by the caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's an error to be here, because the conn
                         * shouldn't wait for a POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* the whole payload has been received */
                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return -EPROTO;
                }

                /* Fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* the whole packet has been read */
                }

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
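/*
 * Zero-copy ack path, in brief (a reading of the code below, not an
 * authoritative protocol description): when a received LNET message carried
 * a non-zero ksm_zc_req_cookie, the sender expects that cookie to be
 * acknowledged.  usocklnd_handle_zc_req() allocates a usock_zc_ack_t holding
 * the cookie and hands it to usocklnd_find_or_create_conn(), which is
 * expected to queue it for transmission on a suitable connection back to
 * the peer.
 */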
/* Handle an incoming ZC request from the sender.
 * NB: it's called only from read_handler, so we're sure that
 * the conn cannot become a zombie in the middle of processing */
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t   *conn;
        usock_zc_ack_t *zc_ack;
        int             type;
        int             rc;
        int             dummy;

        LIBCFS_ALLOC(zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for zcack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          &dummy);
        if (rc != 0) {
                LIBCFS_FREE(zc_ack, sizeof(*zc_ack));
                return rc;
        }
        usocklnd_conn_decref(conn);

        return 0;
}

/* Switch on rx_state.
 * Return 0 on success, else return <0.
 * Always set cont_flag: 1 if we're ready to continue reading, else 0 */
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* something new arrived in the hello - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_HELLO_MAGIC:
                if (hello->kshm_magic == LNET_PROTO_MAGIC)
                        conn->uc_flip = 0;
                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
                        conn->uc_flip = 1;
                else
                        return -EPROTO;

                usocklnd_rx_helloversion_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_VERSION:
                if ((!conn->uc_flip &&
                     (hello->kshm_version != KSOCK_PROTO_V2)) ||
                    (conn->uc_flip &&
                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
                        return -EPROTO;

                usocklnd_rx_hellobody_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_BODY:
                if (conn->uc_flip) {
                        ksock_hello_msg_t *hello = conn->uc_rx_hello;

                        __swab32s(&hello->kshm_src_pid);
                        __swab64s(&hello->kshm_src_nid);
                        __swab32s(&hello->kshm_dst_pid);
                        __swab64s(&hello->kshm_dst_nid);
                        __swab64s(&hello->kshm_src_incarnation);
                        __swab64s(&hello->kshm_dst_incarnation);
                        __swab32s(&hello->kshm_ctype);
                        __swab32s(&hello->kshm_nips);
                }

                if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
                               conn->uc_rx_hello->kshm_nips,
                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
                        return -EPROTO;
                }

                if (conn->uc_rx_hello->kshm_nips) {
                        usocklnd_rx_helloIPs_state_transition(conn);
                        *cont_flag = 1;
                        break;
                }
                /* fall through */

        case UC_RX_HELLO_IPS:
                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellorecv(conn);
                else /* passive conn */