⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lowcomms.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP address or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. 
It shouldbe configured using a * cluster-wide mechanism as it must be the same on all nodes of the cluster * for the DLM to function. * */#include <asm/ioctls.h>#include <net/sock.h>#include <net/tcp.h>#include <linux/pagemap.h>#include <linux/idr.h>#include <linux/file.h>#include <linux/sctp.h>#include <net/sctp/user.h>#include "dlm_internal.h"#include "lowcomms.h"#include "midcomms.h"#include "config.h"#define NEEDED_RMEM (4*1024*1024)struct cbuf {	unsigned int base;	unsigned int len;	unsigned int mask;};static void cbuf_add(struct cbuf *cb, int n){	cb->len += n;}static int cbuf_data(struct cbuf *cb){	return ((cb->base + cb->len) & cb->mask);}static void cbuf_init(struct cbuf *cb, int size){	cb->base = cb->len = 0;	cb->mask = size-1;}static void cbuf_eat(struct cbuf *cb, int n){	cb->len  -= n;	cb->base += n;	cb->base &= cb->mask;}static bool cbuf_empty(struct cbuf *cb){	return cb->len == 0;}struct connection {	struct socket *sock;	/* NULL if not connected */	uint32_t nodeid;	/* So we know who we are in the list */	struct mutex sock_mutex;	unsigned long flags;#define CF_READ_PENDING 1#define CF_WRITE_PENDING 2#define CF_CONNECT_PENDING 3#define CF_INIT_PENDING 4#define CF_IS_OTHERCON 5	struct list_head writequeue;  /* List of outgoing writequeue_entries */	spinlock_t writequeue_lock;	int (*rx_action) (struct connection *);	/* What to do when active */	void (*connect_action) (struct connection *);	/* What to do to connect */	struct page *rx_page;	struct cbuf cb;	int retries;#define MAX_CONNECT_RETRIES 3	int sctp_assoc;	struct connection *othercon;	struct work_struct rwork; /* Receive workqueue */	struct work_struct swork; /* Send workqueue */};#define sock2con(x) ((struct connection *)(x)->sk_user_data)/* An entry waiting to be sent */struct writequeue_entry {	struct list_head list;	struct page *page;	int offset;	int len;	int end;	int users;	struct connection *con;};static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];static int dlm_local_count;/* 
Work queues */static struct workqueue_struct *recv_workqueue;static struct workqueue_struct *send_workqueue;static DEFINE_IDR(connections_idr);static DECLARE_MUTEX(connections_lock);static int max_nodeid;static struct kmem_cache *con_cache;static void process_recv_sockets(struct work_struct *work);static void process_send_sockets(struct work_struct *work);/* * If 'allocation' is zero then we don't attempt to create a new * connection structure for this node. */static struct connection *__nodeid2con(int nodeid, gfp_t alloc){	struct connection *con = NULL;	int r;	int n;	con = idr_find(&connections_idr, nodeid);	if (con || !alloc)		return con;	r = idr_pre_get(&connections_idr, alloc);	if (!r)		return NULL;	con = kmem_cache_zalloc(con_cache, alloc);	if (!con)		return NULL;	r = idr_get_new_above(&connections_idr, con, nodeid, &n);	if (r) {		kmem_cache_free(con_cache, con);		return NULL;	}	if (n != nodeid) {		idr_remove(&connections_idr, n);		kmem_cache_free(con_cache, con);		return NULL;	}	con->nodeid = nodeid;	mutex_init(&con->sock_mutex);	INIT_LIST_HEAD(&con->writequeue);	spin_lock_init(&con->writequeue_lock);	INIT_WORK(&con->swork, process_send_sockets);	INIT_WORK(&con->rwork, process_recv_sockets);	/* Setup action pointers for child sockets */	if (con->nodeid) {		struct connection *zerocon = idr_find(&connections_idr, 0);		con->connect_action = zerocon->connect_action;		if (!con->rx_action)			con->rx_action = zerocon->rx_action;	}	if (nodeid > max_nodeid)		max_nodeid = nodeid;	return con;}static struct connection *nodeid2con(int nodeid, gfp_t allocation){	struct connection *con;	down(&connections_lock);	con = __nodeid2con(nodeid, allocation);	up(&connections_lock);	return con;}/* This is a bit drastic, but only called when things go wrong */static struct connection *assoc2con(int assoc_id){	int i;	struct connection *con;	down(&connections_lock);	for (i=0; i<=max_nodeid; i++) {		con = __nodeid2con(i, 0);		if (con && con->sctp_assoc == assoc_id) {			
up(&connections_lock);			return con;		}	}	up(&connections_lock);	return NULL;}static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr){	struct sockaddr_storage addr;	int error;	if (!dlm_local_count)		return -1;	error = dlm_nodeid_to_addr(nodeid, &addr);	if (error)		return error;	if (dlm_local_addr[0]->ss_family == AF_INET) {		struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;	} else {		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;		memcpy(&ret6->sin6_addr, &in6->sin6_addr,		       sizeof(in6->sin6_addr));	}	return 0;}/* Data available on socket or listen socket received a connect */static void lowcomms_data_ready(struct sock *sk, int count_unused){	struct connection *con = sock2con(sk);	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))		queue_work(recv_workqueue, &con->rwork);}static void lowcomms_write_space(struct sock *sk){	struct connection *con = sock2con(sk);	if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))		queue_work(send_workqueue, &con->swork);}static inline void lowcomms_connect_sock(struct connection *con){	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))		queue_work(send_workqueue, &con->swork);}static void lowcomms_state_change(struct sock *sk){	if (sk->sk_state == TCP_ESTABLISHED)		lowcomms_write_space(sk);}/* Make a socket active */static int add_sock(struct socket *sock, struct connection *con){	con->sock = sock;	/* Install a data_ready callback */	con->sock->sk->sk_data_ready = lowcomms_data_ready;	con->sock->sk->sk_write_space = lowcomms_write_space;	con->sock->sk->sk_state_change = lowcomms_state_change;	con->sock->sk->sk_user_data = con;	return 0;}/* Add the port number to an IPv6 or 4 sockaddr and return the address   length */static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,			  int *addr_len){	
saddr->ss_family =  dlm_local_addr[0]->ss_family;	if (saddr->ss_family == AF_INET) {		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;		in4_addr->sin_port = cpu_to_be16(port);		*addr_len = sizeof(struct sockaddr_in);		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));	} else {		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;		in6_addr->sin6_port = cpu_to_be16(port);		*addr_len = sizeof(struct sockaddr_in6);	}	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);}/* Close a remote connection and tidy up */static void close_connection(struct connection *con, bool and_other){	mutex_lock(&con->sock_mutex);	if (con->sock) {		sock_release(con->sock);		con->sock = NULL;	}	if (con->othercon && and_other) {		/* Will only re-enter once. */		close_connection(con->othercon, false);	}	if (con->rx_page) {		__free_page(con->rx_page);		con->rx_page = NULL;	}	con->retries = 0;	mutex_unlock(&con->sock_mutex);}/* We only send shutdown messages to nodes that are not part of the cluster */static void sctp_send_shutdown(sctp_assoc_t associd){	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];	struct msghdr outmessage;	struct cmsghdr *cmsg;	struct sctp_sndrcvinfo *sinfo;	int ret;	struct connection *con;	con = nodeid2con(0,0);	BUG_ON(con == NULL);	outmessage.msg_name = NULL;	outmessage.msg_namelen = 0;	outmessage.msg_control = outcmsg;	outmessage.msg_controllen = sizeof(outcmsg);	outmessage.msg_flags = MSG_EOR;	cmsg = CMSG_FIRSTHDR(&outmessage);	cmsg->cmsg_level = IPPROTO_SCTP;	cmsg->cmsg_type = SCTP_SNDRCV;	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));	outmessage.msg_controllen = cmsg->cmsg_len;	sinfo = CMSG_DATA(cmsg);	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));	sinfo->sinfo_flags |= MSG_EOF;	sinfo->sinfo_assoc_id = associd;	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);	if (ret != 0)		log_print("send EOF to node failed: %d", ret);}/* INIT failed but we don't know which node...  
 restart INIT on all pending nodes */static void sctp_init_failed(void){	int i;	struct connection *con;	down(&connections_lock);	for (i=1; i<=max_nodeid; i++) {		con = __nodeid2con(i, 0);		if (!con)			continue;		con->sctp_assoc = 0;		if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {			if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {				queue_work(send_workqueue, &con->swork);			}		}	}	up(&connections_lock);}/* Something happened to an association */static void process_sctp_notification(struct connection *con,				      struct msghdr *msg, char *buf){	union sctp_notification *sn = (union sctp_notification *)buf;	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {		switch (sn->sn_assoc_change.sac_state) {		case SCTP_COMM_UP:		case SCTP_RESTART:		{			/* Check that the new node is in the lockspace */			struct sctp_prim prim;			int nodeid;			int prim_len, ret;			int addr_len;			struct connection *new_con;			struct file *file;			sctp_peeloff_arg_t parg;			int parglen = sizeof(parg);			/*			 * We get this before any data for an association.			 * We verify that the node is in the cluster and			 * then peel off a socket for it.			 
*/			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {				log_print("COMM_UP for invalid assoc ID %d",					 (int)sn->sn_assoc_change.sac_assoc_id);				sctp_init_failed();				return;			}			memset(&prim, 0, sizeof(struct sctp_prim));			prim_len = sizeof(struct sctp_prim);			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;			ret = kernel_getsockopt(con->sock,						IPPROTO_SCTP,						SCTP_PRIMARY_ADDR,						(char*)&prim,						&prim_len);			if (ret < 0) {				log_print("getsockopt/sctp_primary_addr on "					  "new assoc %d failed : %d",					  (int)sn->sn_assoc_change.sac_assoc_id,					  ret);				/* Retry INIT later */				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);				if (new_con)					clear_bit(CF_CONNECT_PENDING, &con->flags);				return;			}			make_sockaddr(&prim.ssp_addr, 0, &addr_len);			if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {				int i;				unsigned char *b=(unsigned char *)&prim.ssp_addr;				log_print("reject connect from unknown addr");				for (i=0; i<sizeof(struct sockaddr_storage);i++)					printk("%02x ", b[i]);				printk("\n");				sctp_send_shutdown(prim.ssp_assoc_id);				return;			}			new_con = nodeid2con(nodeid, GFP_KERNEL);			if (!new_con)				return;			/* Peel off a new sock */			parg.associd = sn->sn_assoc_change.sac_assoc_id;			ret = kernel_getsockopt(con->sock, IPPROTO_SCTP,						SCTP_SOCKOPT_PEELOFF,						(void *)&parg, &parglen);			if (ret) {				log_print("Can't peel off a socket for "					  "connection %d to node %d: err=%d\n",					  parg.associd, nodeid, ret);			}			file = fget(parg.sd);			new_con->sock = SOCKET_I(file->f_dentry->d_inode);			add_sock(new_con->sock, new_con);			fput(file);			put_unused_fd(parg.sd);			log_print("got new/restarted association %d nodeid %d",				 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);			/* Send any pending writes */			clear_bit(CF_CONNECT_PENDING, &new_con->flags);			clear_bit(CF_INIT_PENDING, &con->flags);			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {				
queue_work(send_workqueue, &new_con->swork);			}			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -