📄 lowcomms.c
字号:
return sock;}/* Get local addresses */static void init_local(void){ struct sockaddr_storage sas, *addr; int i; dlm_local_count = 0; for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { if (dlm_our_addr(&sas, i)) break; addr = kmalloc(sizeof(*addr), GFP_KERNEL); if (!addr) break; memcpy(addr, &sas, sizeof(*addr)); dlm_local_addr[dlm_local_count++] = addr; }}/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */static int add_sctp_bind_addr(struct connection *sctp_con, struct sockaddr_storage *addr, int addr_len, int num){ int result = 0; if (num == 1) result = kernel_bind(sctp_con->sock, (struct sockaddr *) addr, addr_len); else result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len); if (result < 0) log_print("Can't bind to port %d addr number %d", dlm_config.ci_tcp_port, num); return result;}/* Initialise SCTP socket and bind to all interfaces */static int sctp_listen_for_all(void){ struct socket *sock = NULL; struct sockaddr_storage localaddr; struct sctp_event_subscribe subscribe; int result = -EINVAL, num = 1, i, addr_len; struct connection *con = nodeid2con(0, GFP_KERNEL); int bufsize = NEEDED_RMEM; if (!con) return -ENOMEM; log_print("Using SCTP for communications"); result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, IPPROTO_SCTP, &sock); if (result < 0) { log_print("Can't create comms socket, check SCTP is loaded"); goto out; } /* Listen for events */ memset(&subscribe, 0, sizeof(subscribe)); subscribe.sctp_data_io_event = 1; subscribe.sctp_association_event = 1; subscribe.sctp_send_failure_event = 1; subscribe.sctp_shutdown_event = 1; subscribe.sctp_partial_delivery_event = 1; result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, (char *)&bufsize, sizeof(bufsize)); if (result) log_print("Error increasing buffer space on socket %d", result); result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe, sizeof(subscribe)); if (result < 0) { log_print("Failed to set SCTP_EVENTS on socket: result=%d", result); goto create_delsock; } /* Init con struct */ sock->sk->sk_user_data = con; con->sock = sock; con->sock->sk->sk_data_ready = lowcomms_data_ready; con->rx_action = receive_from_sock; con->connect_action = sctp_init_assoc; /* Bind to all interfaces. */ for (i = 0; i < dlm_local_count; i++) { memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); result = add_sctp_bind_addr(con, &localaddr, addr_len, num); if (result) goto create_delsock; ++num; } result = sock->ops->listen(sock, 5); if (result < 0) { log_print("Can't set socket listening"); goto create_delsock; } return 0;create_delsock: sock_release(sock); con->sock = NULL;out: return result;}static int tcp_listen_for_all(void){ struct socket *sock = NULL; struct connection *con = nodeid2con(0, GFP_KERNEL); int result = -EINVAL; if (!con) return -ENOMEM; /* We don't support multi-homed hosts */ if (dlm_local_addr[1] != NULL) { log_print("TCP protocol can't handle multi-homed hosts, " "try SCTP"); return -EINVAL; } log_print("Using TCP for communications"); sock = tcp_create_listen_sock(con, dlm_local_addr[0]); if (sock) { add_sock(sock, con); result = 0; } else { result = -EADDRINUSE; } return result;}static struct writequeue_entry *new_writequeue_entry(struct connection *con, gfp_t allocation){ struct writequeue_entry *entry; entry = kmalloc(sizeof(struct writequeue_entry), allocation); if (!entry) return NULL; entry->page = alloc_page(allocation); if (!entry->page) { kfree(entry); return NULL; } entry->offset = 0; entry->len = 0; entry->end = 0; entry->users = 0; entry->con = con; return entry;}void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc){ struct connection *con; struct writequeue_entry *e; int offset = 0; int users = 0; con = nodeid2con(nodeid, allocation); if (!con) return NULL; spin_lock(&con->writequeue_lock); e = list_entry(con->writequeue.prev, struct writequeue_entry, list); if ((&e->list == &con->writequeue) || (PAGE_CACHE_SIZE - e->end < len)) { e = NULL; } else { offset = e->end; e->end += len; users = e->users++; } spin_unlock(&con->writequeue_lock); if (e) { got_one: if (users == 0) kmap(e->page); *ppc = page_address(e->page) + offset; return e; } e = new_writequeue_entry(con, allocation); if (e) { spin_lock(&con->writequeue_lock); offset = e->end; e->end += len; users = e->users++; list_add_tail(&e->list, &con->writequeue); spin_unlock(&con->writequeue_lock); goto got_one; } return NULL;}void dlm_lowcomms_commit_buffer(void *mh){ struct writequeue_entry *e = (struct writequeue_entry *)mh; struct connection *con = e->con; int users; spin_lock(&con->writequeue_lock); users = --e->users; if (users) goto out; e->len = e->end - e->offset; kunmap(e->page); spin_unlock(&con->writequeue_lock); if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { queue_work(send_workqueue, &con->swork); } return;out: spin_unlock(&con->writequeue_lock); return;}/* Send a message */static void send_to_sock(struct connection *con){ int ret = 0; ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; struct writequeue_entry *e; int len, offset; mutex_lock(&con->sock_mutex); if (con->sock == NULL) goto out_connect; sendpage = con->sock->ops->sendpage; spin_lock(&con->writequeue_lock); for (;;) { e = list_entry(con->writequeue.next, struct writequeue_entry, list); if ((struct list_head *) e == &con->writequeue) break; len = e->len; offset = e->offset; BUG_ON(len == 0 && e->users == 0); spin_unlock(&con->writequeue_lock); kmap(e->page); ret = 0; if (len) { ret = sendpage(con->sock, e->page, offset, len, msg_flags); if (ret == -EAGAIN || ret == 0) { cond_resched(); goto out; } if (ret <= 0) goto send_error; } /* Don't starve people filling buffers */ cond_resched(); spin_lock(&con->writequeue_lock); e->offset += ret; e->len -= ret; if (e->len == 0 && e->users == 0) { list_del(&e->list); kunmap(e->page); free_entry(e); continue; } } spin_unlock(&con->writequeue_lock);out: mutex_unlock(&con->sock_mutex); return;send_error: mutex_unlock(&con->sock_mutex); close_connection(con, false); lowcomms_connect_sock(con); return;out_connect: mutex_unlock(&con->sock_mutex); if (!test_bit(CF_INIT_PENDING, &con->flags)) lowcomms_connect_sock(con); return;}static void clean_one_writequeue(struct connection *con){ struct list_head *list; struct list_head *temp; spin_lock(&con->writequeue_lock); list_for_each_safe(list, temp, &con->writequeue) { struct writequeue_entry *e = list_entry(list, struct writequeue_entry, list); list_del(&e->list); free_entry(e); } spin_unlock(&con->writequeue_lock);}/* Called from recovery when it knows that a node has left the cluster */int dlm_lowcomms_close(int nodeid){ struct connection *con; log_print("closing connection to node %d", nodeid); con = nodeid2con(nodeid, 0); if (con) { clean_one_writequeue(con); close_connection(con, true); } return 0;}/* Receive workqueue function */static void process_recv_sockets(struct work_struct *work){ struct connection *con = container_of(work, struct connection, rwork); int err; clear_bit(CF_READ_PENDING, &con->flags); do { err = con->rx_action(con); } while (!err);}/* Send workqueue function */static void process_send_sockets(struct work_struct *work){ struct connection *con = container_of(work, struct connection, swork); if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { con->connect_action(con); } clear_bit(CF_WRITE_PENDING, &con->flags); send_to_sock(con);}/* Discard all entries on the write queues */static void clean_writequeues(void){ int nodeid; for (nodeid = 1; nodeid <= max_nodeid; nodeid++) { struct connection *con = __nodeid2con(nodeid, 0); if (con) clean_one_writequeue(con); }}static void work_stop(void){ destroy_workqueue(recv_workqueue); destroy_workqueue(send_workqueue);}static int work_start(void){ int error; recv_workqueue = create_workqueue("dlm_recv"); error = IS_ERR(recv_workqueue); if (error) { log_print("can't start dlm_recv %d", error); return error; } send_workqueue = create_singlethread_workqueue("dlm_send"); error = IS_ERR(send_workqueue); if (error) { log_print("can't start dlm_send %d", error); destroy_workqueue(recv_workqueue); return error; } return 0;}void dlm_lowcomms_stop(void){ int i; struct connection *con; /* Set all the flags to prevent any socket activity. */ down(&connections_lock); for (i = 0; i <= max_nodeid; i++) { con = __nodeid2con(i, 0); if (con) { con->flags |= 0x0F; if (con->sock) con->sock->sk->sk_user_data = NULL; } } up(&connections_lock); work_stop(); down(&connections_lock); clean_writequeues(); for (i = 0; i <= max_nodeid; i++) { con = __nodeid2con(i, 0); if (con) { close_connection(con, true); kmem_cache_free(con_cache, con); } } max_nodeid = 0; up(&connections_lock); kmem_cache_destroy(con_cache); idr_init(&connections_idr);}int dlm_lowcomms_start(void){ int error = -EINVAL; struct connection *con; init_local(); if (!dlm_local_count) { error = -ENOTCONN; log_print("no local IP address has been set"); goto out; } error = -ENOMEM; con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), __alignof__(struct connection), 0, NULL); if (!con_cache) goto out; /* Start listening */ if (dlm_config.ci_protocol == 0) error = tcp_listen_for_all(); else error = sctp_listen_for_all(); if (error) goto fail_unlisten; error = work_start(); if (error) goto fail_unlisten; return 0;fail_unlisten: con = nodeid2con(0,0); if (con) { close_connection(con, false); kmem_cache_free(con_cache, con); } kmem_cache_destroy(con_cache);out: return error;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -