📄 lowcomms.c
字号:
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level consists of two kernel threads (implemented with the
 * receive and send workqueues): a receive thread that deals mainly with
 * the receiving of messages from other nodes and passing them up to the
 * mid-level comms layer (which understands the message format) for
 * execution by the locking core, and a send thread which does all the
 * setting up of connections to remote nodes and the sending of data.
 * Threads are not allowed to send their own data because it may cause
 * them to wait in times of high load. Also, this way, the sending thread
 * can collect together messages bound for one node and send them in one
 * block.
 *
 * lowcomms will use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
* */
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/idr.h>
#include <linux/file.h>
#include <linux/sctp.h>
#include <net/sctp/user.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

/*
 * 4 MiB. Not referenced in this part of the file; presumably the socket
 * receive-buffer size lowcomms asks for -- NOTE(review): confirm at its users.
 */
#define NEEDED_RMEM (4*1024*1024)

/*
 * Index bookkeeping for a circular buffer. The buffer storage itself is not
 * part of this struct (NOTE(review): presumably connection::rx_page, which
 * sits next to connection::cb -- confirm in the receive path).
 *
 * @base: offset of the first buffered byte
 * @len:  number of bytes currently buffered
 * @mask: buffer size - 1; offsets wrap with "& mask", so the size given to
 *        cbuf_init() must be a power of two for the wrap to be correct
 */
struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

/* Account for @n bytes appended after the currently buffered data. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

/* Return the wrapped offset immediately following the buffered data. */
static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

/* Reset @cb to empty for a buffer of @size bytes (@size: a power of two). */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

/* Drop @n bytes from the front of the buffered data; base wraps via mask. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

/* True when no bytes are buffered. */
static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}

/*
 * Per-node connection state. Instances are created by __nodeid2con() and
 * stored in connections_idr, indexed by nodeid.
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* Held by close_connection() while it tears down sock and rx_page. */
	struct mutex sock_mutex;
	/* Bit numbers below; used with test_and_set_bit() and friends. */
	unsigned long flags;
#define CF_READ_PENDING 1	/* set by lowcomms_data_ready() when it queues rwork */
#define CF_WRITE_PENDING 2	/* set when swork is queued for sending */
#define CF_CONNECT_PENDING 3	/* set by lowcomms_connect_sock() when it queues swork */
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue; /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;	/* presumably guards writequeue -- confirm */
	int (*rx_action) (struct connection *); /* What to do when active */
	void (*connect_action) (struct connection *); /* What to do to connect */
	struct page *rx_page;	/* freed by close_connection() */
	struct cbuf cb;
	int retries;		/* reset to 0 by close_connection() */
#define MAX_CONNECT_RETRIES 3
	/* Compared with an SCTP assoc id by assoc2con(); zeroed by sctp_init_failed(). */
	int sctp_assoc;
	/*
	 * Presumably a second connection to the same node (see CF_IS_OTHERCON);
	 * close_connection() closes it too when asked to.
	 */
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};

/* Map a struct sock back to its connection (sk_user_data is set by add_sock()). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

/*
 * Local addresses. In this part of the file only the address family of
 * dlm_local_addr[0] is consulted (nodeid_to_addr(), make_sockaddr()).
 */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;	/* nodeid_to_addr() fails while this is 0 */

/* Work queues */
static struct workqueue_struct *recv_workqueue;	/* runs connection::rwork */
static struct workqueue_struct *send_workqueue;	/* runs connection::swork */

/* nodeid -> struct connection; populated by __nodeid2con(). */
static DEFINE_IDR(connections_idr);
/* Taken around __nodeid2con() by nodeid2con(), assoc2con() and sctp_init_failed(). */
static DECLARE_MUTEX(connections_lock);
/* Highest nodeid allocated so far; upper bound for the linear scans below. */
static int max_nodeid;
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

/*
 * Look up the connection for @nodeid, creating it on demand.
 *
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 *
 * Returns the connection; NULL if it does not exist and @alloc is zero, or
 * if allocation / idr insertion fails. The callers visible in this file hold
 * connections_lock around this call.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;
	int n;

	con = idr_find(&connections_idr, nodeid);
	if (con || !alloc)
		return con;

	r = idr_pre_get(&connections_idr, alloc);
	if (!r)
		return NULL;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = idr_get_new_above(&connections_idr, con, nodeid, &n);
	if (r) {
		kmem_cache_free(con_cache, con);
		return NULL;
	}

	/* The idr handed out an id other than @nodeid: undo and fail. */
	if (n != nodeid) {
		idr_remove(&connections_idr, n);
		kmem_cache_free(con_cache, con);
		return NULL;
	}

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		/*
		 * Non-zero nodes inherit their callbacks from connection 0.
		 * NOTE(review): zerocon is not NULL-checked, so this assumes
		 * connection 0 already exists. Also, con came from
		 * kmem_cache_zalloc(), so rx_action is always NULL here and the
		 * test below always succeeds.
		 */
		struct connection *zerocon = idr_find(&connections_idr, 0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	if (nodeid > max_nodeid)
		max_nodeid = nodeid;

	return con;
}

/* __nodeid2con() wrapped in connections_lock; same lookup/allocate semantics. */
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	down(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	up(&connections_lock);

	return con;
}

/*
 * This is a bit drastic, but only called when things go wrong.
 *
 * Find the connection whose sctp_assoc equals @assoc_id by scanning every
 * nodeid from 0 to max_nodeid under connections_lock. Never allocates.
 * Returns NULL if no connection matches.
 */
static struct connection *assoc2con(int assoc_id)
{
	int i;
	struct connection *con;

	down(&connections_lock);
	for (i=0; i<=max_nodeid; i++) {
		con = __nodeid2con(i, 0);
		if (con && con->sctp_assoc == assoc_id) {
			up(&connections_lock);
			return con;
		}
	}
	up(&connections_lock);
	return NULL;
}

/*
 * Resolve @nodeid with dlm_nodeid_to_addr() and copy only the IP address
 * into @retaddr (sin_addr or sin6_addr, chosen by the family of
 * dlm_local_addr[0]). The family and port fields of @retaddr are not
 * written here.
 *
 * Returns 0 on success, -1 if no local address is configured, or the error
 * returned by dlm_nodeid_to_addr().
 */
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
	struct sockaddr_storage addr;
	int error;

	if (!dlm_local_count)
		return -1;

	error = dlm_nodeid_to_addr(nodeid, &addr);
	if (error)
		return error;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
		memcpy(&ret6->sin6_addr, &in6->sin6_addr,
		       sizeof(in6->sin6_addr));
	}

	return 0;
}

/*
 * Data available on socket or listen socket received a connect.
 *
 * Installed as sk_data_ready by add_sock(). Queues the connection's receive
 * work unless CF_READ_PENDING was already set. count_unused is ignored.
 */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

/*
 * Installed as sk_write_space by add_sock(). Queues the connection's send
 * work unless CF_WRITE_PENDING was already set.
 */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

/* Queue the send work to connect @con unless CF_CONNECT_PENDING was already set. */
static inline void lowcomms_connect_sock(struct connection *con)
{
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

/*
 * Installed as sk_state_change by add_sock(). Once the socket reaches
 * TCP_ESTABLISHED, treat it like write space becoming available.
 */
static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

/*
 * Make a socket active: attach @sock to @con and point the socket's
 * data_ready / write_space / state_change callbacks and sk_user_data at
 * this layer. Always returns 0.
 */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	return 0;
}

/*
 * Add the port number to an IPv6 or 4 sockaddr and return the address length.
 *
 * The family is copied from dlm_local_addr[0] and @port is stored in network
 * byte order. The address bytes already present in @saddr are kept. For
 * IPv4, sin_zero is cleared; in all cases every byte of @saddr past
 * *addr_len is zeroed.
 */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family =  dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

/*
 * Close a remote connection and tidy up.
 *
 * Under con->sock_mutex: release the socket, free rx_page and reset
 * retries. When @and_other is true, con->othercon is closed the same way
 * (with and_other = false, so the recursion is at most one level deep).
 * NOTE(review): othercon's sock_mutex is taken while con's is still held.
 */
static void close_connection(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/*
 * We only send shutdown messages to nodes that are not part of the cluster.
 *
 * Sends a zero-length message on connection 0's socket with MSG_EOF set in
 * the SCTP_SNDRCV ancillary data for association @associd. BUG()s if
 * connection 0 does not exist; a non-zero kernel_sendmsg() result is only
 * logged.
 * NOTE(review): outcmsg is static and therefore shared by all calls;
 * concurrent callers would race on it -- confirm calls are serialized.
 */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	con = nodeid2con(0,0);
	BUG_ON(con == NULL);

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}

/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
/*
 * For every nodeid 1..max_nodeid (under connections_lock): zero sctp_assoc
 * and, if CF_CONNECT_PENDING was set, clear it and queue the send work
 * (unless CF_WRITE_PENDING was already set).
 */
static void sctp_init_failed(void)
{
	int i;
	struct connection *con;

	down(&connections_lock);
	for (i=1; i<=max_nodeid; i++) {
		con = __nodeid2con(i, 0);
		if (!con)
			continue;
		con->sctp_assoc = 0;
		if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
			if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
				queue_work(send_workqueue, &con->swork);
			}
		}
	}
	up(&connections_lock);
}

/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
				      struct msghdr *msg, char *buf)
{
	union sctp_notification *sn = (union sctp_notification *)buf;

	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
		switch (sn->sn_assoc_change.sac_state) {

		case SCTP_COMM_UP:
		case SCTP_RESTART:
		{
			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;
			int nodeid;
			int prim_len, ret;
			int addr_len;
			struct connection *new_con;
			struct file *file;
			sctp_peeloff_arg_t parg;
			int parglen = sizeof(parg);

			/*
			 * We get this before any data for an association.
			 * We verify that the node is in the cluster and
			 * then peel off a socket for it.
*/ if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { log_print("COMM_UP for invalid assoc ID %d", (int)sn->sn_assoc_change.sac_assoc_id); sctp_init_failed(); return; } memset(&prim, 0, sizeof(struct sctp_prim)); prim_len = sizeof(struct sctp_prim); prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR, (char*)&prim, &prim_len); if (ret < 0) { log_print("getsockopt/sctp_primary_addr on " "new assoc %d failed : %d", (int)sn->sn_assoc_change.sac_assoc_id, ret); /* Retry INIT later */ new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); if (new_con) clear_bit(CF_CONNECT_PENDING, &con->flags); return; } make_sockaddr(&prim.ssp_addr, 0, &addr_len); if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { int i; unsigned char *b=(unsigned char *)&prim.ssp_addr; log_print("reject connect from unknown addr"); for (i=0; i<sizeof(struct sockaddr_storage);i++) printk("%02x ", b[i]); printk("\n"); sctp_send_shutdown(prim.ssp_assoc_id); return; } new_con = nodeid2con(nodeid, GFP_KERNEL); if (!new_con) return; /* Peel off a new sock */ parg.associd = sn->sn_assoc_change.sac_assoc_id; ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF, (void *)&parg, &parglen); if (ret) { log_print("Can't peel off a socket for " "connection %d to node %d: err=%d\n", parg.associd, nodeid, ret); } file = fget(parg.sd); new_con->sock = SOCKET_I(file->f_dentry->d_inode); add_sock(new_con->sock, new_con); fput(file); put_unused_fd(parg.sd); log_print("got new/restarted association %d nodeid %d", (int)sn->sn_assoc_change.sac_assoc_id, nodeid); /* Send any pending writes */ clear_bit(CF_CONNECT_PENDING, &new_con->flags); clear_bit(CF_INIT_PENDING, &con->flags); if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { queue_work(send_workqueue, &new_con->swork); } if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -