📄 acceptor.c
字号:
"%u.%u.%u.%u\n", libcfs_nid2str(blind_ni->ni_nid), HIPQUAD(peer_ip)); goto failed; } continue; } rc = libcfs_sock_read(newsock, &magic, sizeof(magic), accept_timeout); if (rc != 0) { CERROR("Error %d reading connection request from " "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); goto failed; } rc = lnet_accept(NULL, newsock, magic); if (rc != 0) goto failed; continue; failed: libcfs_sock_release(newsock); } libcfs_sock_release(lnet_acceptor_state.pta_sock); lnet_acceptor_state.pta_sock = NULL; if (blind_ni != NULL) lnet_ni_decref(blind_ni); LCONSOLE(0,"Acceptor stopping\n"); /* unblock lnet_acceptor_stop() */ mutex_up(&lnet_acceptor_state.pta_signal); return 0;}intlnet_acceptor_start(void){ long pid; long secure; LASSERT (lnet_acceptor_state.pta_sock == NULL); init_mutex_locked(&lnet_acceptor_state.pta_signal); if (!strcmp(accept, "secure")) { secure = 1; } else if (!strcmp(accept, "all")) { secure = 0; } else if (!strcmp(accept, "none")) { return 0; } else { LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n", accept); return -EINVAL; } if (lnet_count_acceptor_nis(NULL) == 0) /* not required */ return 0; pid = cfs_kernel_thread(lnet_acceptor, (void *)secure, 0); if (pid < 0) { CERROR("Can't start acceptor thread: %ld\n", pid); return -ESRCH; } mutex_down(&lnet_acceptor_state.pta_signal); /* wait for acceptor to startup */ if (!lnet_acceptor_state.pta_shutdown) { /* started OK */ LASSERT (lnet_acceptor_state.pta_sock != NULL); return 0; } LASSERT (lnet_acceptor_state.pta_sock == NULL); return -ENETDOWN;}voidlnet_acceptor_stop(void){ if (lnet_acceptor_state.pta_sock == NULL) /* not running */ return; lnet_acceptor_state.pta_shutdown = 1; libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock); /* block until acceptor signals exit */ mutex_down(&lnet_acceptor_state.pta_signal);}#else /* __KERNEL__ */#ifdef HAVE_LIBPTHREADstatic char *accept_type;static int accept_port = 988;static int accept_backlog;static int accept_timeout;struct { int pta_shutdown; int pta_sock; struct cfs_completion pta_completion;} lnet_acceptor_state;intlnet_acceptor_port(void){ return accept_port;}intlnet_parse_int_tunable(int *value, char *name, int dflt){ char *env = getenv(name); char *end; if (env == NULL) { *value = dflt; return 0; } *value = strtoull(env, &end, 0); if (*end == 0) return 0; CERROR("Can't parse tunable %s=%s\n", name, env); return -EINVAL;}intlnet_parse_string_tunable(char **value, char *name, char *dflt){ char *env = getenv(name); if (env == NULL) *value = dflt; else *value = env; return 0;}intlnet_acceptor_get_tunables(){ int rc; rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure"); if (rc != 0) return rc; rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988); if (rc != 0) return rc; rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127); if (rc != 0) return rc; rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5); if (rc != 0) return rc; CDEBUG(D_NET, "accept_type = %s\n", accept_type); CDEBUG(D_NET, "accept_port = %d\n", accept_port); CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog); CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout); return 0;}static inline intlnet_accept_magic(__u32 magic, __u32 constant){ return (magic == constant || magic == __swab32(constant));}/* user-land lnet_accept() isn't used by any LND's directly. So, we don't * do it visible outside acceptor.c and we can change its prototype * freely */static intlnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port){ int rc, flip; lnet_acceptor_connreq_t cr; lnet_ni_t *ni; if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) { LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: " "unsupported acceptor protocol\n", HIPQUAD(peer_ip), magic); return -EPROTO; } flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC); rc = libcfs_sock_read(sock, &cr.acr_version, sizeof(cr.acr_version), accept_timeout); if (rc != 0) { CERROR("Error %d reading connection request version from " "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); return -EIO; } if (flip) __swab32s(&cr.acr_version); if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) return -EPROTO; rc = libcfs_sock_read(sock, &cr.acr_nid, sizeof(cr) - offsetof(lnet_acceptor_connreq_t, acr_nid), accept_timeout); if (rc != 0) { CERROR("Error %d reading connection request from " "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); return -EIO; } if (flip) __swab64s(&cr.acr_nid); ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid)); if (ni == NULL || /* no matching net */ ni->ni_nid != cr.acr_nid) { /* right NET, wrong NID! */ if (ni != NULL) lnet_ni_decref(ni); LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: " " No matching NI\n", HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid)); return -EPERM; } if (ni->ni_lnd->lnd_accept == NULL) { lnet_ni_decref(ni); LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: " " NI doesn not accept IP connections\n", HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid)); return -EPERM; } CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n", libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip)); rc = ni->ni_lnd->lnd_accept(ni, sock); lnet_ni_decref(ni); return rc;}intlnet_acceptor(void *arg){ char name[16]; int secure = (int)((unsigned long)arg); int rc; int newsock; __u32 peer_ip; int peer_port; __u32 magic; snprintf(name, sizeof(name), "acceptor_%03d", accept_port); cfs_daemonize(name); cfs_block_allsigs(); rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock, 0, accept_port, accept_backlog); if (rc != 0) { if (rc == -EADDRINUSE) LCONSOLE_ERROR("Can't start acceptor on port %d: " "port already in use\n", accept_port); else LCONSOLE_ERROR("Can't start acceptor on port %d: " "unexpected error %d\n", accept_port, rc); } else { LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port); } /* set init status and unblock parent */ lnet_acceptor_state.pta_shutdown = rc; cfs_complete(&lnet_acceptor_state.pta_completion); if (rc != 0) return rc; while (!lnet_acceptor_state.pta_shutdown) { rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock, &peer_ip, &peer_port); if (rc != 0) continue; /* maybe we're waken up with libcfs_sock_abort_accept() */ if (lnet_acceptor_state.pta_shutdown) { close(newsock); break; } if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { CERROR("Refusing connection from %u.%u.%u.%u: " "insecure port %d\n", HIPQUAD(peer_ip), peer_port); goto failed; } rc = libcfs_sock_read(newsock, &magic, sizeof(magic), accept_timeout); if (rc != 0) { CERROR("Error %d reading connection request from " "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); goto failed; } rc = lnet_accept(newsock, magic, peer_ip, peer_port); if (rc != 0) goto failed; continue; failed: close(newsock); } close(lnet_acceptor_state.pta_sock); LCONSOLE(0,"Acceptor stopping\n"); /* unblock lnet_acceptor_stop() */ cfs_complete(&lnet_acceptor_state.pta_completion); return 0;}static int skip_waiting_for_completion;intlnet_acceptor_start(void){ long secure; int rc; rc = lnet_acceptor_get_tunables(); if (rc != 0) return rc; /* Do nothing if we're liblustre clients */ if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0) return 0; cfs_init_completion(&lnet_acceptor_state.pta_completion); if (!strcmp(accept_type, "secure")) { secure = 1; } else if (!strcmp(accept_type, "all")) { secure = 0; } else if (!strcmp(accept_type, "none")) { skip_waiting_for_completion = 1; return 0; } else { LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type); cfs_fini_completion(&lnet_acceptor_state.pta_completion); return -EINVAL; } if (lnet_count_acceptor_nis(NULL) == 0) { /* not required */ skip_waiting_for_completion = 1; return 0; } rc = cfs_create_thread(lnet_acceptor, (void *)secure); if (rc != 0) { CERROR("Can't start acceptor thread: %d\n", rc); cfs_fini_completion(&lnet_acceptor_state.pta_completion); return rc; } /* wait for acceptor to startup */ cfs_wait_for_completion(&lnet_acceptor_state.pta_completion); if (!lnet_acceptor_state.pta_shutdown) return 0; cfs_fini_completion(&lnet_acceptor_state.pta_completion); return -ENETDOWN;}voidlnet_acceptor_stop(void){ /* Do nothing if we're liblustre clients */ if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0) return; if (!skip_waiting_for_completion) { lnet_acceptor_state.pta_shutdown = 1; libcfs_sock_abort_accept(accept_port); /* block until acceptor signals exit */ cfs_wait_for_completion(&lnet_acceptor_state.pta_completion); } cfs_fini_completion(&lnet_acceptor_state.pta_completion);}#elseintlnet_acceptor_start(void){ return 0;}voidlnet_acceptor_stop(void){}#endif /* !HAVE_LIBPTHREAD */#endif /* !__KERNEL__ */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -