⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lowcomms.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
	return sock;}/* Get local addresses */static void init_local(void){	struct sockaddr_storage sas, *addr;	int i;	dlm_local_count = 0;	for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {		if (dlm_our_addr(&sas, i))			break;		addr = kmalloc(sizeof(*addr), GFP_KERNEL);		if (!addr)			break;		memcpy(addr, &sas, sizeof(*addr));		dlm_local_addr[dlm_local_count++] = addr;	}}/* Bind to an IP address. SCTP allows multiple address so it can do   multi-homing */static int add_sctp_bind_addr(struct connection *sctp_con,			      struct sockaddr_storage *addr,			      int addr_len, int num){	int result = 0;	if (num == 1)		result = kernel_bind(sctp_con->sock,				     (struct sockaddr *) addr,				     addr_len);	else		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,					   SCTP_SOCKOPT_BINDX_ADD,					   (char *)addr, addr_len);	if (result < 0)		log_print("Can't bind to port %d addr number %d",			  dlm_config.ci_tcp_port, num);	return result;}/* Initialise SCTP socket and bind to all interfaces */static int sctp_listen_for_all(void){	struct socket *sock = NULL;	struct sockaddr_storage localaddr;	struct sctp_event_subscribe subscribe;	int result = -EINVAL, num = 1, i, addr_len;	struct connection *con = nodeid2con(0, GFP_KERNEL);	int bufsize = NEEDED_RMEM;	if (!con)		return -ENOMEM;	log_print("Using SCTP for communications");	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,				  IPPROTO_SCTP, &sock);	if (result < 0) {		log_print("Can't create comms socket, check SCTP is loaded");		goto out;	}	/* Listen for events */	memset(&subscribe, 0, sizeof(subscribe));	subscribe.sctp_data_io_event = 1;	subscribe.sctp_association_event = 1;	subscribe.sctp_send_failure_event = 1;	subscribe.sctp_shutdown_event = 1;	subscribe.sctp_partial_delivery_event = 1;	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,				 (char *)&bufsize, sizeof(bufsize));	if (result)		log_print("Error increasing buffer space on socket %d", result);	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,				   (char *)&subscribe, sizeof(subscribe));	if (result < 0) {		log_print("Failed to set SCTP_EVENTS on socket: result=%d",			  result);		goto create_delsock;	}	/* Init con struct */	sock->sk->sk_user_data = con;	con->sock = sock;	con->sock->sk->sk_data_ready = lowcomms_data_ready;	con->rx_action = receive_from_sock;	con->connect_action = sctp_init_assoc;	/* Bind to all interfaces. */	for (i = 0; i < dlm_local_count; i++) {		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);		if (result)			goto create_delsock;		++num;	}	result = sock->ops->listen(sock, 5);	if (result < 0) {		log_print("Can't set socket listening");		goto create_delsock;	}	return 0;create_delsock:	sock_release(sock);	con->sock = NULL;out:	return result;}static int tcp_listen_for_all(void){	struct socket *sock = NULL;	struct connection *con = nodeid2con(0, GFP_KERNEL);	int result = -EINVAL;	if (!con)		return -ENOMEM;	/* We don't support multi-homed hosts */	if (dlm_local_addr[1] != NULL) {		log_print("TCP protocol can't handle multi-homed hosts, "			  "try SCTP");		return -EINVAL;	}	log_print("Using TCP for communications");	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);	if (sock) {		add_sock(sock, con);		result = 0;	}	else {		result = -EADDRINUSE;	}	return result;}static struct writequeue_entry *new_writequeue_entry(struct connection *con,						     gfp_t allocation){	struct writequeue_entry *entry;	entry = kmalloc(sizeof(struct writequeue_entry), allocation);	if (!entry)		return NULL;	entry->page = alloc_page(allocation);	if (!entry->page) {		kfree(entry);		return NULL;	}	entry->offset = 0;	entry->len = 0;	entry->end = 0;	entry->users = 0;	entry->con = con;	return entry;}void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc){	struct connection *con;	struct writequeue_entry *e;	int offset = 0;	int users = 0;	con = nodeid2con(nodeid, allocation);	if (!con)		return NULL;	spin_lock(&con->writequeue_lock);	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);	if ((&e->list == &con->writequeue) ||	    (PAGE_CACHE_SIZE - e->end < len)) {		e = NULL;	} else {		offset = e->end;		e->end += len;		users = e->users++;	}	spin_unlock(&con->writequeue_lock);	if (e) {	got_one:		if (users == 0)			kmap(e->page);		*ppc = page_address(e->page) + offset;		return e;	}	e = new_writequeue_entry(con, allocation);	if (e) {		spin_lock(&con->writequeue_lock);		offset = e->end;		e->end += len;		users = e->users++;		list_add_tail(&e->list, &con->writequeue);		spin_unlock(&con->writequeue_lock);		goto got_one;	}	return NULL;}void dlm_lowcomms_commit_buffer(void *mh){	struct writequeue_entry *e = (struct writequeue_entry *)mh;	struct connection *con = e->con;	int users;	spin_lock(&con->writequeue_lock);	users = --e->users;	if (users)		goto out;	e->len = e->end - e->offset;	kunmap(e->page);	spin_unlock(&con->writequeue_lock);	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {		queue_work(send_workqueue, &con->swork);	}	return;out:	spin_unlock(&con->writequeue_lock);	return;}/* Send a message */static void send_to_sock(struct connection *con){	int ret = 0;	ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;	struct writequeue_entry *e;	int len, offset;	mutex_lock(&con->sock_mutex);	if (con->sock == NULL)		goto out_connect;	sendpage = con->sock->ops->sendpage;	spin_lock(&con->writequeue_lock);	for (;;) {		e = list_entry(con->writequeue.next, struct writequeue_entry,			       list);		if ((struct list_head *) e == &con->writequeue)			break;		len = e->len;		offset = e->offset;		BUG_ON(len == 0 && e->users == 0);		spin_unlock(&con->writequeue_lock);		kmap(e->page);		ret = 0;		if (len) {			ret = sendpage(con->sock, e->page, offset, len,				       msg_flags);			if (ret == -EAGAIN || ret == 0) {				cond_resched();				goto out;			}			if (ret <= 0)				goto send_error;		}			/* Don't starve people filling buffers */			cond_resched();		spin_lock(&con->writequeue_lock);		e->offset += ret;		e->len -= ret;		if (e->len == 0 && e->users == 0) {			list_del(&e->list);			kunmap(e->page);			free_entry(e);			continue;		}	}	spin_unlock(&con->writequeue_lock);out:	mutex_unlock(&con->sock_mutex);	return;send_error:	mutex_unlock(&con->sock_mutex);	close_connection(con, false);	lowcomms_connect_sock(con);	return;out_connect:	mutex_unlock(&con->sock_mutex);	if (!test_bit(CF_INIT_PENDING, &con->flags))		lowcomms_connect_sock(con);	return;}static void clean_one_writequeue(struct connection *con){	struct list_head *list;	struct list_head *temp;	spin_lock(&con->writequeue_lock);	list_for_each_safe(list, temp, &con->writequeue) {		struct writequeue_entry *e =			list_entry(list, struct writequeue_entry, list);		list_del(&e->list);		free_entry(e);	}	spin_unlock(&con->writequeue_lock);}/* Called from recovery when it knows that a node has   left the cluster */int dlm_lowcomms_close(int nodeid){	struct connection *con;	log_print("closing connection to node %d", nodeid);	con = nodeid2con(nodeid, 0);	if (con) {		clean_one_writequeue(con);		close_connection(con, true);	}	return 0;}/* Receive workqueue function */static void process_recv_sockets(struct work_struct *work){	struct connection *con = container_of(work, struct connection, rwork);	int err;	clear_bit(CF_READ_PENDING, &con->flags);	do {		err = con->rx_action(con);	} while (!err);}/* Send workqueue function */static void process_send_sockets(struct work_struct *work){	struct connection *con = container_of(work, struct connection, swork);	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {		con->connect_action(con);	}	clear_bit(CF_WRITE_PENDING, &con->flags);	send_to_sock(con);}/* Discard all entries on the write queues */static void clean_writequeues(void){	int nodeid;	for (nodeid = 1; nodeid <= max_nodeid; nodeid++) {		struct connection *con = __nodeid2con(nodeid, 0);		if (con)			clean_one_writequeue(con);	}}static void work_stop(void){	destroy_workqueue(recv_workqueue);	destroy_workqueue(send_workqueue);}static int work_start(void){	int error;	recv_workqueue = create_workqueue("dlm_recv");	error = IS_ERR(recv_workqueue);	if (error) {		log_print("can't start dlm_recv %d", error);		return error;	}	send_workqueue = create_singlethread_workqueue("dlm_send");	error = IS_ERR(send_workqueue);	if (error) {		log_print("can't start dlm_send %d", error);		destroy_workqueue(recv_workqueue);		return error;	}	return 0;}void dlm_lowcomms_stop(void){	int i;	struct connection *con;	/* Set all the flags to prevent any	   socket activity.	*/	down(&connections_lock);	for (i = 0; i <= max_nodeid; i++) {		con = __nodeid2con(i, 0);		if (con) {			con->flags |= 0x0F;			if (con->sock)				con->sock->sk->sk_user_data = NULL;		}	}	up(&connections_lock);	work_stop();	down(&connections_lock);	clean_writequeues();	for (i = 0; i <= max_nodeid; i++) {		con = __nodeid2con(i, 0);		if (con) {			close_connection(con, true);			kmem_cache_free(con_cache, con);		}	}	max_nodeid = 0;	up(&connections_lock);	kmem_cache_destroy(con_cache);	idr_init(&connections_idr);}int dlm_lowcomms_start(void){	int error = -EINVAL;	struct connection *con;	init_local();	if (!dlm_local_count) {		error = -ENOTCONN;		log_print("no local IP address has been set");		goto out;	}	error = -ENOMEM;	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),				      __alignof__(struct connection), 0,				      NULL);	if (!con_cache)		goto out;	/* Start listening */	if (dlm_config.ci_protocol == 0)		error = tcp_listen_for_all();	else		error = sctp_listen_for_all();	if (error)		goto fail_unlisten;	error = work_start();	if (error)		goto fail_unlisten;	return 0;fail_unlisten:	con = nodeid2con(0,0);	if (con) {		close_connection(con, false);		kmem_cache_free(con_cache, con);	}	kmem_cache_destroy(con_cache);out:	return error;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -