ei_connect.c

来自「OTP是开放电信平台的简称」· C语言 代码 · 共 1,724 行 · 第 1/3 页

C
1,724
字号
#endif /* !win32 */        /* extract the host and alive parts from nodename */    if (!(hostname = strchr(nodename,'@'))) {	EI_TRACE_ERR0("ei_connect","Node name has no @ in name");	return ERL_ERROR;    } else {	strncpy(alivename, nodename, hostname - nodename);	alivename[hostname - nodename] = 0x0;	hostname++;    }    #ifndef __WIN32__    hp = ei_gethostbyname_r(hostname,&host,buffer,1024,&ei_h_errno);    if (hp == NULL) {	char thishostname[EI_MAXHOSTNAMELEN+1];	if (gethostname(thishostname,EI_MAXHOSTNAMELEN) < 0) {	    EI_TRACE_ERR0("ei_connect_tmo",			  "Failed to get name of this host");	    erl_errno = EHOSTUNREACH;	    return ERL_ERROR;	} else {	    char *ct;	    /* We use a short node name */    	    if ((ct = strchr(thishostname, '.')) != NULL) *ct = '\0';	}	if (strcmp(hostname,thishostname) == 0)	    /* Both nodes on same standalone host, use loopback */	    hp = ei_gethostbyname_r("localhost",&host,buffer,1024,&ei_h_errno);	if (hp == NULL) {	    EI_TRACE_ERR2("ei_connect",			  "Can't find host for %s: %d\n",nodename,ei_h_errno);	    erl_errno = EHOSTUNREACH;	    return ERL_ERROR;	}    }#else /* __WIN32__ */    if ((hp = ei_gethostbyname(hostname)) == NULL) {	char thishostname[EI_MAXHOSTNAMELEN+1];	if (gethostname(thishostname,EI_MAXHOSTNAMELEN) < 0) {	    EI_TRACE_ERR1("ei_connect_tmo",			  "Failed to get name of this host: %d", 			  WSAGetLastError());	    erl_errno = EHOSTUNREACH;	    return ERL_ERROR;	} else {	    char *ct;	    /* We use a short node name */    	    if ((ct = strchr(thishostname, '.')) != NULL) *ct = '\0';	}	if (strcmp(hostname,thishostname) == 0)	    /* Both nodes on same standalone host, use loopback */	    hp = ei_gethostbyname("localhost");	if (hp == NULL) {	    char reason[1024];	    win32_error(reason,sizeof(reason));	    EI_TRACE_ERR2("ei_connect",			  "Can't find host for %s: %s",nodename,reason);	    erl_errno = EHOSTUNREACH;	    return ERL_ERROR;	}    }#endif /* win32 */    return ei_xconnect_tmo(ec, (Erl_IpAddr) *hp->h_addr_list, alivename, ms);} /* ei_connect */int ei_connect(ei_cnode* ec, char *nodename){    return ei_connect_tmo(ec, nodename, 0);} /* ip_addr is now in network byte order   *  * first we have to get hold of the portnumber to  *  the node through epmd at that host   **/int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr adr, char *alivename, unsigned ms){    struct in_addr *ip_addr=(struct in_addr *) adr;    int rport = 0; /*uint16 rport = 0;*/    int sockd;    int one = 1;    int dist = 0;    ErlConnect her_name;    unsigned her_flags, her_version;    erl_errno = EIO;		/* Default error code */        EI_TRACE_CONN1("ei_xconnect","-> CONNECT attempt to connect to %s",		   alivename);        if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, ms)) < 0) {	EI_TRACE_ERR0("ei_xconnect","-> CONNECT can't get remote port");	/* ei_epmd_port_tmo() has set erl_errno */	return ERL_NO_PORT;    }        /* we now have port number to enode, try to connect */    if((sockd = cnct((uint16)rport, ip_addr, sizeof(struct in_addr),ms)) < 0) {	EI_TRACE_ERR0("ei_xconnect","-> CONNECT socket connect failed");	/* cnct() has set erl_errno */	return ERL_CONNECT_FAIL;    }        EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote");    /* FIXME why connect before checking 'dist' output from ei_epmd_port() ?! */    if (dist <= 4) {	EI_TRACE_ERR0("ei_xconnect","-> CONNECT remote version not compatible");	goto error;    }    else {	unsigned our_challenge, her_challenge;	unsigned char our_digest[16];		if (send_name(sockd, ec->thisnodename, (unsigned) dist, ms))	    goto error;	if (recv_status(sockd, ms))	    goto error;	if (recv_challenge(sockd, &her_challenge, &her_version,	    &her_flags, &her_name, ms))	    goto error;	our_challenge = gen_challenge();	gen_digest(her_challenge, ec->ei_connect_cookie, our_digest);	if (send_challenge_reply(sockd, our_digest, our_challenge, ms))	    goto error;	if (recv_challenge_ack(sockd, our_challenge, 	    ec->ei_connect_cookie, ms))	    goto error;	put_ei_socket_info(sockd, dist, null_cookie, ec); /* FIXME check == 0 */    }        setsockopt(sockd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));    setsockopt(sockd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));    EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename);        erl_errno = 0;    return sockd;    error:    EI_TRACE_ERR0("ei_xconnect","-> CONNECT failed");    closesocket(sockd);    return ERL_ERROR;} /* ei_xconnect */int ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename){    return ei_xconnect_tmo(ec, adr, alivename, 0);}  /*   * For symmetry reasons*/#if 0int ei_close_connection(int fd){    return closesocket(fd);} /* ei_close_connection */#endif  /*  * Accept and initiate a connection from an other  * Erlang node. Return a file descriptor at success,  * otherwise -1;*/int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp){    return ei_accept_tmo(ec, lfd, conp, 0);}int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms){    int fd;    struct sockaddr_in cli_addr;    int cli_addr_len=sizeof(struct sockaddr_in);    unsigned her_version, her_flags;    ErlConnect her_name;    erl_errno = EIO;		/* Default error code */    EI_TRACE_CONN0("ei_accept","<- ACCEPT waiting for connection");        if ((fd = ei_accept_t(lfd, (struct sockaddr*) &cli_addr, 	&cli_addr_len, ms )) < 0) {	EI_TRACE_ERR0("ei_accept","<- ACCEPT socket accept failed");	erl_errno = (fd == -2) ? ETIMEDOUT : EIO;	goto error;    }        EI_TRACE_CONN0("ei_accept","<- ACCEPT connected to remote");        if (recv_name(fd, &her_version, &her_flags, &her_name, ms)) {	EI_TRACE_ERR0("ei_accept","<- ACCEPT initial ident failed");	goto error;    }        if (her_version <= 4) {	EI_TRACE_ERR0("ei_accept","<- ACCEPT remote version not compatible");	goto error;    }    else {	unsigned our_challenge;	unsigned her_challenge;	unsigned char our_digest[16];		if (send_status(fd,"ok", ms))	    goto error;	our_challenge = gen_challenge();	if (send_challenge(fd, ec->thisnodename, 	    our_challenge, her_version, ms))	    goto error;	if (recv_challenge_reply(fd, our_challenge, 	    ec->ei_connect_cookie, 	    &her_challenge, ms))	    goto error;	gen_digest(her_challenge, ec->ei_connect_cookie, our_digest);	if (send_challenge_ack(fd, our_digest, ms))	    goto error;	put_ei_socket_info(fd, her_version, null_cookie, ec);    }    if (conp) 	*conp = her_name;        EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name.nodename);    erl_errno = 0;		/* No error */    return fd;    error:    EI_TRACE_ERR0("ei_accept","<- ACCEPT failed");    closesocket(fd);    return ERL_ERROR;} /* ei_accept *//* Receives a message from an Erlang socket. * If the message was a TICK it is immediately * answered. Returns: ERL_ERROR, ERL_TICK or * the number of bytes read. */int ei_receive_tmo(int fd, unsigned char *bufp, int bufsize, unsigned ms) {    int len;    unsigned char fourbyte[4]={0,0,0,0};    int res;     if ((res = ei_read_fill_t(fd, (char *) bufp, 4, ms))  != 4) {	erl_errno = (res == -2) ? ETIMEDOUT : EIO;	return ERL_ERROR;    }        /* Tick handling */    if ((len = get_int32(bufp)) == ERL_TICK)     {	ei_write_fill_t(fd, (char *) fourbyte, 4, ms);	/* FIXME ok to ignore error or timeout? */	erl_errno = EAGAIN;	return ERL_TICK;    }    else if (len > bufsize)     {	/* FIXME: We should drain the message. */	erl_errno = EMSGSIZE;	return ERL_ERROR;    }    else if ((res = ei_read_fill_t(fd, (char *) bufp, len, ms)) != len)    {	erl_errno = (res == -2) ? ETIMEDOUT : EIO;	return ERL_ERROR;    }        return len;    }int ei_receive(int fd, unsigned char *bufp, int bufsize) {    return ei_receive_tmo(fd, bufp, bufsize, 0);} int ei_reg_send_tmo(ei_cnode* ec, int fd, char *server_name,		    char* buf, int len, unsigned ms){    erlang_pid *self = ei_self(ec);    self->num = fd;    /* erl_errno and return code is set by ei_reg_send_encoded_tmo() */    return ei_send_reg_encoded_tmo(fd, self, server_name, buf, len, ms);}int ei_reg_send(ei_cnode* ec, int fd, char *server_name, char* buf, int len){    return ei_reg_send_tmo(ec,fd,server_name,buf,len,0);}/* * Sends an Erlang message to a process at an Erlang node*/int ei_send_tmo(int fd, erlang_pid* to, char* buf, int len, unsigned ms){    /* erl_errno and return code is set by ei_reg_send_encoded_tmo() */    return ei_send_encoded_tmo(fd, to, buf, len, ms);}int ei_send(int fd, erlang_pid* to, char* buf, int len){    return ei_send_tmo(fd, to, buf, len, 0);}/* * Try to receive an Erlang message on a given socket. Returns* ERL_TICK, ERL_MSG, or ERL_ERROR. Sets `erl_errno' on ERL_ERROR and* ERL_TICK (to EAGAIN in the latter case).*/int ei_do_receive_msg(int fd, int staticbuffer_p, 		      erlang_msg* msg, ei_x_buff* x, unsigned ms){    int msglen;    int i;        if (!(i=ei_recv_internal(fd, &x->buff, &x->buffsz, msg, &msglen, 	staticbuffer_p, ms))) {	erl_errno = EAGAIN;	return ERL_TICK;    }    if (i<0) {	/* erl_errno set by ei_recv_internal() */	return ERL_ERROR;    }    if (staticbuffer_p && msglen > x->buffsz)    {	erl_errno = EMSGSIZE;	return ERL_ERROR;    }    x->index = x->buffsz;    switch (msg->msgtype) {	/* FIXME are these all? */    case ERL_SEND:    case ERL_REG_SEND:    case ERL_LINK:    case ERL_UNLINK:    case ERL_GROUP_LEADER:    case ERL_EXIT:    case ERL_EXIT2:    case ERL_NODE_LINK:	return ERL_MSG;	    default:	/*if (emsg->to) 'erl'_free_term(emsg->to);	  if (emsg->from) 'erl'_free_term(emsg->from);	  if (emsg->msg) 'erl'_free_term(emsg->msg);	  emsg->to = NULL;	  emsg->from = NULL;	  emsg->msg = NULL;*/		erl_errno = EIO;	return ERL_ERROR;    }} /* do_receive_msg */int ei_receive_msg(int fd, erlang_msg* msg, ei_x_buff* x){    return ei_do_receive_msg(fd, 1, msg, x, 0);}int ei_xreceive_msg(int fd, erlang_msg *msg, ei_x_buff *x){    return ei_do_receive_msg(fd, 0, msg, x, 0);}int ei_receive_msg_tmo(int fd, erlang_msg* msg, ei_x_buff* x, unsigned ms){    return ei_do_receive_msg(fd, 1, msg, x, ms);}int ei_xreceive_msg_tmo(int fd, erlang_msg *msg, ei_x_buff *x, unsigned ms){    return ei_do_receive_msg(fd, 0, msg, x, ms);}/* * The RPC consists of two parts, send and receive.* Here is the send part ! * { PidFrom, { call, Mod, Fun, Args, user }} *//** Now returns non-negative number for success, negative for failure.*/int ei_rpc_to(ei_cnode *ec, int fd, char *mod, char *fun,	      const char *buf, int len){    ei_x_buff x;    erlang_pid *self = ei_self(ec);    self->num = fd;    /* encode header */    ei_x_new_with_version(&x);    ei_x_encode_tuple_header(&x, 2);  /* A */        self->num = fd;    ei_x_encode_pid(&x, self);	      /* A 1 */        ei_x_encode_tuple_header(&x, 5);  /* B A 2 */    ei_x_encode_atom(&x, "call");     /* B 1 */    ei_x_encode_atom(&x, mod);	      /* B 2 */    ei_x_encode_atom(&x, fun);	      /* B 3 */    ei_x_append_buf(&x, buf, len);    /* B 4 */    ei_x_encode_atom(&x, "user");     /* B 5 */    /* ei_x_encode_atom(&x,"user"); */    ei_send_reg_encoded(fd, self, "rex", x.buff, x.index);    ei_x_free(&x);	    return 0;} /* rpc_to */  /*  * And here is the rpc receiving part. A negative  * timeout means 'infinity'. Returns either of: ERL_MSG,  * ERL_TICK, ERL_ERROR or ERL_TIMEOUT.*/int ei_rpc_from(ei_cnode *ec, int fd, int timeout, erlang_msg *msg,		ei_x_buff *x) {    fd_set readmask;    struct timeval tv;    struct timeval *t = NULL;        if (timeout >= 0) {	tv.tv_sec = timeout / 1000;	tv.tv_usec = (timeout % 1000) * 1000;	t = &tv;    }        FD_ZERO(&readmask);    FD_SET(fd,&readmask);        switch (select(fd+1, &readmask, NULL, NULL, t)) {    case -1: 	erl_errno = EIO;	return ERL_ERROR;	    case 0:	erl_errno = ETIMEDOUT;	return ERL_TIMEOUT;	    default:	if (FD_ISSET(fd, &readmask)) {	    return ei_xreceive_msg(fd, msg, x);	} else {	    erl_errno = EIO;	    return ERL_ERROR;	}    }} /* rpc_from */  /*  * A true RPC. It return a NULL pointer  * in case of failure, otherwise a valid  * (ETERM *) pointer containing the reply  */int ei_rpc(ei_cnode* ec, int fd, char *mod, char *fun,	   const char* inbuf, int inbuflen, ei_x_buff* x){    int i, index;    ei_term t;    erlang_msg msg;    char rex[MAXATOMLEN+1];    if (ei_rpc_to(ec, fd, mod, fun, inbuf, inbuflen) < 0) {	return -1;    }    /* FIXME are we not to reply to the tick? */    while ((i = ei_rpc_from(ec, fd, ERL_NO_TIMEOUT, &msg, x)) == ERL_TICK)	;    if (i == ERL_ERROR)  return -1;    /*ep = 'erl'_element(2,emsg.msg);*/ /* {RPC_Tag, RPC_Reply} */    index = 0;    if (ei_decode_version(x->buff, &index, &i) < 0	|| ei_decode_ei_term(x->buff, &index, &t) < 0)	return -1;		/* FIXME ei_decode_version don't set erl_errno as before */    /* FIXME this is strange, we don't check correct "rex" atom       and we let it pass if not ERL_SMALL_TUPLE_EXT and arity == 2 */    if (t.ei_type == ERL_SMALL_TUPLE_EXT && t.arity == 2)	if (ei_decode_atom(x->buff, &index, rex) < 0)	    return -1;    /* remove header */    x->index -= index;    memmove(x->buff, &x->buff[index], x->index);    return 0;}  /*  ** Handshake*//* FROM RTP RFC 1889  (except that we use all bits, bug in RFC?) */static unsigned int md_32(char* string, int length){    MD5_CTX ctx;    union {	char c[16];	unsigned x[4];    } digest;    ei_MD5Init(&ctx);    ei_MD5Update(&ctx, (unsigned char *) string, 	       (unsigned) length);    ei_MD5Final((unsigned char *) digest.c, &ctx);    return (digest.x[0] ^ digest.x[1] ^ digest.x[2] ^ digest.x[3]);}#if defined(__WIN32__)unsigned int gen_challenge(void){    struct {	SYSTEMTIME tv;	DWORD cpu;	int pid;    } s;    GetSystemTime(&s.tv);    s.cpu  = GetTickCount();    s.pid  = getpid();    return md_32((char*) &s, sizeof(s));}#elif  defined(VXWORKS)static unsigned int gen_challenge(void){    struct {	struct timespec tv;	clock_t cpu;	int pid;    } s;    s.cpu  = clock();    clock_gettime(CLOCK_REALTIME, &s.tv);    s.pid = getpid();    return md_32((char*) &s, sizeof(s));}#else  /* some unix */static unsigned int gen_challenge(void){    struct {	struct timeval tv;	clock_t cpu;	pid_t pid;	u_long hid;	uid_t uid;	gid_t gid;	struct utsname name;    } s;    gettimeofday(&s.tv, 0);    uname(&s.name);    s.cpu  = clock();    s.pid  = getpid();    s.hid  = gethostid();    s.uid  = getuid();    s.gid  = getgid();    return md_32((char*) &s, sizeof(s));}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?