ei_connect.c
来自「OTP是开放电信平台的简称」· C语言 代码 · 共 1,724 行 · 第 1/3 页
C
1,724 行
#endif /* !win32 */

    /* extract the host and alive parts from nodename */
    if (!(hostname = strchr(nodename,'@'))) {
        EI_TRACE_ERR0("ei_connect","Node name has no @ in name");
        return ERL_ERROR;
    } else {
        /* NOTE(review): the copy length depends only on nodename; the
         * declaration of alivename is outside this excerpt, so confirm
         * that it can hold the longest alive part a caller may pass. */
        strncpy(alivename, nodename, hostname - nodename);
        /* strncpy() stops before any NUL here, so terminate explicitly */
        alivename[hostname - nodename] = 0x0;
        /* step past the '@' so hostname points at the host part */
        hostname++;
    }

#ifndef __WIN32__
    hp = ei_gethostbyname_r(hostname,&host,buffer,1024,&ei_h_errno);
    if (hp == NULL) {
        char thishostname[EI_MAXHOSTNAMELEN+1];
        if (gethostname(thishostname,EI_MAXHOSTNAMELEN) < 0) {
            EI_TRACE_ERR0("ei_connect_tmo", "Failed to get name of this host");
            erl_errno = EHOSTUNREACH;
            return ERL_ERROR;
        } else {
            char *ct;
            /* We use a short node name */
            if ((ct = strchr(thishostname, '.')) != NULL) *ct = '\0';
        }
        if (strcmp(hostname,thishostname) == 0)
            /* Both nodes on same standalone host, use loopback */
            hp = ei_gethostbyname_r("localhost",&host,buffer,1024,&ei_h_errno);
        /* still NULL: either the names differ or the loopback lookup failed */
        if (hp == NULL) {
            EI_TRACE_ERR2("ei_connect", "Can't find host for %s: %d\n",nodename,ei_h_errno);
            erl_errno = EHOSTUNREACH;
            return ERL_ERROR;
        }
    }
#else /* __WIN32__ */
    if ((hp = ei_gethostbyname(hostname)) == NULL) {
        char thishostname[EI_MAXHOSTNAMELEN+1];
        if (gethostname(thishostname,EI_MAXHOSTNAMELEN) < 0) {
            EI_TRACE_ERR1("ei_connect_tmo", "Failed to get name of this host: %d", WSAGetLastError());
            erl_errno = EHOSTUNREACH;
            return ERL_ERROR;
        } else {
            char *ct;
            /* We use a short node name */
            if ((ct = strchr(thishostname, '.')) != NULL) *ct = '\0';
        }
        if (strcmp(hostname,thishostname) == 0)
            /* Both nodes on same standalone host, use loopback */
            hp = ei_gethostbyname("localhost");
        /* still NULL: either the names differ or the loopback lookup failed */
        if (hp == NULL) {
            char reason[1024];
            win32_error(reason,sizeof(reason));
            EI_TRACE_ERR2("ei_connect", "Can't find host for %s: %s",nodename,reason);
            erl_errno = EHOSTUNREACH;
            return ERL_ERROR;
        }
    }
#endif /* win32 */

    /* hand the first resolved address and the alive name to the
     * address-based connect */
    return ei_xconnect_tmo(ec, (Erl_IpAddr) *hp->h_addr_list, alivename, ms);
} /* ei_connect */

/*
 * Connect to the node called nodename ("alive@host").
 * Forwards to ei_connect_tmo() with a timeout argument of 0 and
 * returns whatever that call returns.
 */
int ei_connect(ei_cnode* ec, char *nodename)
{
    return ei_connect_tmo(ec, nodename, 0);
}

/* ip_addr is now in network byte
order * * first we have to get hold of the portnumber to * the node through epmd at that host **/int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr adr, char *alivename, unsigned ms){ struct in_addr *ip_addr=(struct in_addr *) adr; int rport = 0; /*uint16 rport = 0;*/ int sockd; int one = 1; int dist = 0; ErlConnect her_name; unsigned her_flags, her_version; erl_errno = EIO; /* Default error code */ EI_TRACE_CONN1("ei_xconnect","-> CONNECT attempt to connect to %s", alivename); if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, ms)) < 0) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT can't get remote port"); /* ei_epmd_port_tmo() has set erl_errno */ return ERL_NO_PORT; } /* we now have port number to enode, try to connect */ if((sockd = cnct((uint16)rport, ip_addr, sizeof(struct in_addr),ms)) < 0) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT socket connect failed"); /* cnct() has set erl_errno */ return ERL_CONNECT_FAIL; } EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote"); /* FIXME why connect before checking 'dist' output from ei_epmd_port() ?! 
*/ if (dist <= 4) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT remote version not compatible"); goto error; } else { unsigned our_challenge, her_challenge; unsigned char our_digest[16]; if (send_name(sockd, ec->thisnodename, (unsigned) dist, ms)) goto error; if (recv_status(sockd, ms)) goto error; if (recv_challenge(sockd, &her_challenge, &her_version, &her_flags, &her_name, ms)) goto error; our_challenge = gen_challenge(); gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); if (send_challenge_reply(sockd, our_digest, our_challenge, ms)) goto error; if (recv_challenge_ack(sockd, our_challenge, ec->ei_connect_cookie, ms)) goto error; put_ei_socket_info(sockd, dist, null_cookie, ec); /* FIXME check == 0 */ } setsockopt(sockd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); setsockopt(sockd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename); erl_errno = 0; return sockd; error: EI_TRACE_ERR0("ei_xconnect","-> CONNECT failed"); closesocket(sockd); return ERL_ERROR;} /* ei_xconnect */int ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename){ return ei_xconnect_tmo(ec, adr, alivename, 0);} /* * For symmetry reasons*/#if 0int ei_close_connection(int fd){ return closesocket(fd);} /* ei_close_connection */#endif /* * Accept and initiate a connection from an other * Erlang node. 
Return a file descriptor at success, * otherwise -1;*/int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp){ return ei_accept_tmo(ec, lfd, conp, 0);}int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms){ int fd; struct sockaddr_in cli_addr; int cli_addr_len=sizeof(struct sockaddr_in); unsigned her_version, her_flags; ErlConnect her_name; erl_errno = EIO; /* Default error code */ EI_TRACE_CONN0("ei_accept","<- ACCEPT waiting for connection"); if ((fd = ei_accept_t(lfd, (struct sockaddr*) &cli_addr, &cli_addr_len, ms )) < 0) { EI_TRACE_ERR0("ei_accept","<- ACCEPT socket accept failed"); erl_errno = (fd == -2) ? ETIMEDOUT : EIO; goto error; } EI_TRACE_CONN0("ei_accept","<- ACCEPT connected to remote"); if (recv_name(fd, &her_version, &her_flags, &her_name, ms)) { EI_TRACE_ERR0("ei_accept","<- ACCEPT initial ident failed"); goto error; } if (her_version <= 4) { EI_TRACE_ERR0("ei_accept","<- ACCEPT remote version not compatible"); goto error; } else { unsigned our_challenge; unsigned her_challenge; unsigned char our_digest[16]; if (send_status(fd,"ok", ms)) goto error; our_challenge = gen_challenge(); if (send_challenge(fd, ec->thisnodename, our_challenge, her_version, ms)) goto error; if (recv_challenge_reply(fd, our_challenge, ec->ei_connect_cookie, &her_challenge, ms)) goto error; gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); if (send_challenge_ack(fd, our_digest, ms)) goto error; put_ei_socket_info(fd, her_version, null_cookie, ec); } if (conp) *conp = her_name; EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name.nodename); erl_errno = 0; /* No error */ return fd; error: EI_TRACE_ERR0("ei_accept","<- ACCEPT failed"); closesocket(fd); return ERL_ERROR;} /* ei_accept *//* Receives a message from an Erlang socket. * If the message was a TICK it is immediately * answered. Returns: ERL_ERROR, ERL_TICK or * the number of bytes read. 
*/int ei_receive_tmo(int fd, unsigned char *bufp, int bufsize, unsigned ms) { int len; unsigned char fourbyte[4]={0,0,0,0}; int res; if ((res = ei_read_fill_t(fd, (char *) bufp, 4, ms)) != 4) { erl_errno = (res == -2) ? ETIMEDOUT : EIO; return ERL_ERROR; } /* Tick handling */ if ((len = get_int32(bufp)) == ERL_TICK) { ei_write_fill_t(fd, (char *) fourbyte, 4, ms); /* FIXME ok to ignore error or timeout? */ erl_errno = EAGAIN; return ERL_TICK; } else if (len > bufsize) { /* FIXME: We should drain the message. */ erl_errno = EMSGSIZE; return ERL_ERROR; } else if ((res = ei_read_fill_t(fd, (char *) bufp, len, ms)) != len) { erl_errno = (res == -2) ? ETIMEDOUT : EIO; return ERL_ERROR; } return len; }int ei_receive(int fd, unsigned char *bufp, int bufsize) { return ei_receive_tmo(fd, bufp, bufsize, 0);} int ei_reg_send_tmo(ei_cnode* ec, int fd, char *server_name, char* buf, int len, unsigned ms){ erlang_pid *self = ei_self(ec); self->num = fd; /* erl_errno and return code is set by ei_reg_send_encoded_tmo() */ return ei_send_reg_encoded_tmo(fd, self, server_name, buf, len, ms);}int ei_reg_send(ei_cnode* ec, int fd, char *server_name, char* buf, int len){ return ei_reg_send_tmo(ec,fd,server_name,buf,len,0);}/* * Sends an Erlang message to a process at an Erlang node*/int ei_send_tmo(int fd, erlang_pid* to, char* buf, int len, unsigned ms){ /* erl_errno and return code is set by ei_reg_send_encoded_tmo() */ return ei_send_encoded_tmo(fd, to, buf, len, ms);}int ei_send(int fd, erlang_pid* to, char* buf, int len){ return ei_send_tmo(fd, to, buf, len, 0);}/* * Try to receive an Erlang message on a given socket. Returns* ERL_TICK, ERL_MSG, or ERL_ERROR. 
Sets `erl_errno' on ERL_ERROR and* ERL_TICK (to EAGAIN in the latter case).*/int ei_do_receive_msg(int fd, int staticbuffer_p, erlang_msg* msg, ei_x_buff* x, unsigned ms){ int msglen; int i; if (!(i=ei_recv_internal(fd, &x->buff, &x->buffsz, msg, &msglen, staticbuffer_p, ms))) { erl_errno = EAGAIN; return ERL_TICK; } if (i<0) { /* erl_errno set by ei_recv_internal() */ return ERL_ERROR; } if (staticbuffer_p && msglen > x->buffsz) { erl_errno = EMSGSIZE; return ERL_ERROR; } x->index = x->buffsz; switch (msg->msgtype) { /* FIXME are these all? */ case ERL_SEND: case ERL_REG_SEND: case ERL_LINK: case ERL_UNLINK: case ERL_GROUP_LEADER: case ERL_EXIT: case ERL_EXIT2: case ERL_NODE_LINK: return ERL_MSG; default: /*if (emsg->to) 'erl'_free_term(emsg->to); if (emsg->from) 'erl'_free_term(emsg->from); if (emsg->msg) 'erl'_free_term(emsg->msg); emsg->to = NULL; emsg->from = NULL; emsg->msg = NULL;*/ erl_errno = EIO; return ERL_ERROR; }} /* do_receive_msg */int ei_receive_msg(int fd, erlang_msg* msg, ei_x_buff* x){ return ei_do_receive_msg(fd, 1, msg, x, 0);}int ei_xreceive_msg(int fd, erlang_msg *msg, ei_x_buff *x){ return ei_do_receive_msg(fd, 0, msg, x, 0);}int ei_receive_msg_tmo(int fd, erlang_msg* msg, ei_x_buff* x, unsigned ms){ return ei_do_receive_msg(fd, 1, msg, x, ms);}int ei_xreceive_msg_tmo(int fd, erlang_msg *msg, ei_x_buff *x, unsigned ms){ return ei_do_receive_msg(fd, 0, msg, x, ms);}/* * The RPC consists of two parts, send and receive.* Here is the send part ! 
* { PidFrom, { call, Mod, Fun, Args, user }} *//** Now returns non-negative number for success, negative for failure.*/int ei_rpc_to(ei_cnode *ec, int fd, char *mod, char *fun, const char *buf, int len){ ei_x_buff x; erlang_pid *self = ei_self(ec); self->num = fd; /* encode header */ ei_x_new_with_version(&x); ei_x_encode_tuple_header(&x, 2); /* A */ self->num = fd; ei_x_encode_pid(&x, self); /* A 1 */ ei_x_encode_tuple_header(&x, 5); /* B A 2 */ ei_x_encode_atom(&x, "call"); /* B 1 */ ei_x_encode_atom(&x, mod); /* B 2 */ ei_x_encode_atom(&x, fun); /* B 3 */ ei_x_append_buf(&x, buf, len); /* B 4 */ ei_x_encode_atom(&x, "user"); /* B 5 */ /* ei_x_encode_atom(&x,"user"); */ ei_send_reg_encoded(fd, self, "rex", x.buff, x.index); ei_x_free(&x); return 0;} /* rpc_to */ /* * And here is the rpc receiving part. A negative * timeout means 'infinity'. Returns either of: ERL_MSG, * ERL_TICK, ERL_ERROR or ERL_TIMEOUT.*/int ei_rpc_from(ei_cnode *ec, int fd, int timeout, erlang_msg *msg, ei_x_buff *x) { fd_set readmask; struct timeval tv; struct timeval *t = NULL; if (timeout >= 0) { tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout % 1000) * 1000; t = &tv; } FD_ZERO(&readmask); FD_SET(fd,&readmask); switch (select(fd+1, &readmask, NULL, NULL, t)) { case -1: erl_errno = EIO; return ERL_ERROR; case 0: erl_errno = ETIMEDOUT; return ERL_TIMEOUT; default: if (FD_ISSET(fd, &readmask)) { return ei_xreceive_msg(fd, msg, x); } else { erl_errno = EIO; return ERL_ERROR; } }} /* rpc_from */ /* * A true RPC. It return a NULL pointer * in case of failure, otherwise a valid * (ETERM *) pointer containing the reply */int ei_rpc(ei_cnode* ec, int fd, char *mod, char *fun, const char* inbuf, int inbuflen, ei_x_buff* x){ int i, index; ei_term t; erlang_msg msg; char rex[MAXATOMLEN+1]; if (ei_rpc_to(ec, fd, mod, fun, inbuf, inbuflen) < 0) { return -1; } /* FIXME are we not to reply to the tick? 
*/ while ((i = ei_rpc_from(ec, fd, ERL_NO_TIMEOUT, &msg, x)) == ERL_TICK) ; if (i == ERL_ERROR) return -1; /*ep = 'erl'_element(2,emsg.msg);*/ /* {RPC_Tag, RPC_Reply} */ index = 0; if (ei_decode_version(x->buff, &index, &i) < 0 || ei_decode_ei_term(x->buff, &index, &t) < 0) return -1; /* FIXME ei_decode_version don't set erl_errno as before */ /* FIXME this is strange, we don't check correct "rex" atom and we let it pass if not ERL_SMALL_TUPLE_EXT and arity == 2 */ if (t.ei_type == ERL_SMALL_TUPLE_EXT && t.arity == 2) if (ei_decode_atom(x->buff, &index, rex) < 0) return -1; /* remove header */ x->index -= index; memmove(x->buff, &x->buff[index], x->index); return 0;} /* ** Handshake*//* FROM RTP RFC 1889 (except that we use all bits, bug in RFC?) */static unsigned int md_32(char* string, int length){ MD5_CTX ctx; union { char c[16]; unsigned x[4]; } digest; ei_MD5Init(&ctx); ei_MD5Update(&ctx, (unsigned char *) string, (unsigned) length); ei_MD5Final((unsigned char *) digest.c, &ctx); return (digest.x[0] ^ digest.x[1] ^ digest.x[2] ^ digest.x[3]);}#if defined(__WIN32__)unsigned int gen_challenge(void){ struct { SYSTEMTIME tv; DWORD cpu; int pid; } s; GetSystemTime(&s.tv); s.cpu = GetTickCount(); s.pid = getpid(); return md_32((char*) &s, sizeof(s));}#elif defined(VXWORKS)static unsigned int gen_challenge(void){ struct { struct timespec tv; clock_t cpu; int pid; } s; s.cpu = clock(); clock_gettime(CLOCK_REALTIME, &s.tv); s.pid = getpid(); return md_32((char*) &s, sizeof(s));}#else /* some unix */static unsigned int gen_challenge(void){ struct { struct timeval tv; clock_t cpu; pid_t pid; u_long hid; uid_t uid; gid_t gid; struct utsname name; } s; gettimeofday(&s.tv, 0); uname(&s.name); s.cpu = clock(); s.pid = getpid(); s.hid = gethostid(); s.uid = getuid(); s.gid = getgid(); return md_32((char*) &s, sizeof(s));}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?