epmd_srv.c
来自「OTP是开放电信平台的简称」· C语言 代码 · 共 1,254 行 · 第 1/3 页
C
1,254 行
/* -*- c-indent-level: 2; c-continued-statement-offset: 2 -*- */ /* ``The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved via the world wide web at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * The Initial Developer of the Original Code is Ericsson Utvecklings AB. * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings * AB. All Rights Reserved.'' * * $Id$ */#ifdef HAVE_CONFIG_H# include "config.h"#endif#include "epmd.h" /* Renamed from 'epmd_r4.h' */#include "epmd_int.h"/* * * This server is a local name server for Erlang nodes. Erlang nodes can * ask this server for the listening port of other Erlang nodes on the * machine EPMD is running on. New distributed nodes that are started * register their names with this server. * * To be accessible to all Erlang nodes this server listens to a well * known port EPMD_PORT_NO (curently port 4369) where requests * for connections can be sent. * * To keep track of when registered Erlang nodes are terminated this * server keeps the socket open where the request for registration was * made. * * The protocol is briefly documented in "erl_ext_dist.txt". All requests * to this server are done with a packet * * 2 n * +--------+---------+ * | Length | Request | * +--------+---------+ * * In all but one case there is only one request for each connection made * to this server so we can safely close the socket after sending the * reply. The exception is ALIVE_REQ where we keep the connection * open without sending any data. When we receive a "close" this is * an indication that the Erlang node was terminated. The termination * may have been "normal" or caused by a crash. The operating system * ensure that the connection is closed either way. * * Reading is done non-blocking, i.e. we call a "read" only if we are * told by the "select" function that there are data to read. * * Two databases are used: One node database where the registered names * of the nodes are stored, and one connection database where the state * of sockets and the data buffers is stored. * * Incomplete packets are thrown away after a timout. The Erlang node * doing the request is responsible for completing in it in a reasonable time. * * Note that if the server gets busy it may not have time to * process all requests for connection. The "accept()" function * will on most operating systems silently refuse to accept more * than 5 outstanding requests. It is the client's responsibility * to retry the request a number of times with random time interval. * The "-debug" flag will insert a delay so you can test this * behaviour. * * FIXME: In this code we assume that the packets we send on each * socket is so small that a "write()" never block * * FIXME: We never restarts a read or write that was terminated * by an interrupt. Do we need to? * *//* We use separate data structures for node names and connections so that a request will not use a slot with a name that we want to resuse later incrementing the "creation" *//* forward declarations */static void do_request(EpmdVars*,int,Connection*,char*,int);static int do_accept(EpmdVars*,int);static void do_read(EpmdVars*,Connection*);static time_t current_time(EpmdVars*);static Connection *conn_init(EpmdVars*);static int conn_open(EpmdVars*,int);static int conn_close_fd(EpmdVars*,int);static void node_init(EpmdVars*);static Node *node_reg(EpmdVars*,char*,int,int);static Node *node_reg2(EpmdVars*,char*, int, int, unsigned char, unsigned char, int, int, char*);static int node_unreg(EpmdVars*,char*);static int node_unreg_sock(EpmdVars*,int);static int reply(EpmdVars*,int,char *,int);static void dbg_print_buf(EpmdVars*,char *,int);static void print_names(EpmdVars*);void run(EpmdVars *g){ int listensock; int i; int opt; struct SOCKADDR_IN iserv_addr; node_init(g); g->conn = conn_init(g); dbg_printf(g,2,"try to initiate listening port %d", g->port); if ((listensock = socket(FAMILY,SOCK_STREAM,0)) < 0) { dbg_perror(g,"error opening stream socket"); epmd_cleanup_exit(g,1); } g->listenfd = listensock; /* * Initialize number of active file descriptors. * Stdin, stdout, and stderr are still open. * One for the listen socket. */ g->active_conn = 3+1; /* * Note that we must not enable the SO_REUSEADDR on Windows, * because addresses will be reused even if they are still in use. */ #if (!defined(__WIN32__) && !defined(_OSE_)) /* We ignore the SIGPIPE signal that is raised when we call write twice on a socket closed by the other end. */ signal(SIGPIPE, SIG_IGN); opt = 1; /* Set this option */ if (setsockopt(listensock,SOL_SOCKET,SO_REUSEADDR,(char* ) &opt, sizeof(opt)) <0) { dbg_perror(g,"can't set sockopt"); epmd_cleanup_exit(g,1); }#endif /* In rare cases select returns because there is someone to accept but the request is withdrawn before the accept function is called. We set the listen socket to be non blocking to prevent us from being hanging in accept() waiting for the next request. */#ifdef _OSE_ opt = 1; if (ioctl(listensock, FIONBIO, (char*)&opt) != 0)#else#if (defined(__WIN32__) || defined(NO_FCNTL)) opt = 1; if (ioctl(listensock, FIONBIO, &opt) != 0) /* Gives warning in VxWorks */#else opt = fcntl(listensock, F_GETFL, 0); if (fcntl(listensock, F_SETFL, opt | O_NONBLOCK) == -1)#endif /* __WIN32__ || VXWORKS */#endif /* _OSE_ */ dbg_perror(g,"failed to set non-blocking mode of listening socket %d", listensock); { /* store port number in unsigned short */ unsigned short sport = g->port; SET_ADDR_ANY(iserv_addr, FAMILY, sport); } #ifdef _OSE_ { int optlen = sizeof(opt); opt = 1; if(getsockopt(listensock, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, &optlen) < 0) fprintf(stderr, "\n\nGETSOCKOPT FAILS! %d\n\n", errno); else if(opt == 1) fprintf(stderr, "SO_REUSEADDR is set!\n"); }#endif if(bind(listensock,(struct sockaddr*) &iserv_addr, sizeof(iserv_addr)) < 0 ) { if (errno == EADDRINUSE) { dbg_tty_printf(g,1,"there is already a epmd running at port %d", g->port); epmd_cleanup_exit(g,0); } else { dbg_perror(g,"failed to bind socket"); epmd_cleanup_exit(g,1); } } dbg_printf(g,2,"starting"); listen(listensock, SOMAXCONN); FD_ZERO(&g->orig_read_mask); FD_SET(listensock,&g->orig_read_mask); dbg_tty_printf(g,2,"entering the main select() loop"); select_again: while(1) { fd_set read_mask = g->orig_read_mask; struct timeval timeout; int ret; /* If we are idle we time out now and then to enable the code below to close connections that are old and probably hanging. Make sure that select will return often enough. */ timeout.tv_sec = (g->packet_timeout < IDLE_TIMEOUT) ? 1 : IDLE_TIMEOUT; timeout.tv_usec = 0; if ((ret = select(g->max_conn,&read_mask,(fd_set *)0,(fd_set *)0,&timeout)) < 0) dbg_perror(g,"error in select "); else { time_t now; if (ret == 0) { FD_ZERO(&read_mask); } if (g->delay_accept) { /* Test of busy server */ sleep(g->delay_accept); } if (FD_ISSET(listensock,&read_mask)) { if (do_accept(g, listensock) && g->active_conn < g->max_conn) { /* * The accept() succeeded, and we have at least one file * descriptor still free, which means that another accept() * could succeed. Go do do another select(), in case there * are more incoming connections waiting to be accepted. */ goto select_again; } } /* Check all open streams marked by select for data or a close. We also close all open sockets except ALIVE with no activity for a long period */ now = current_time(g); for (i = 0; i < g->max_conn; i++) { if (g->conn[i].open == TRUE) { if (FD_ISSET(g->conn[i].fd,&read_mask)) do_read(g,&g->conn[i]); else if ((g->conn[i].keep == FALSE) && ((g->conn[i].mod_time + g->packet_timeout) < now)) { dbg_tty_printf(g,1,"closing because timed out on receive"); epmd_conn_close(g,&g->conn[i]); } } } } }}/* * This routine read as much of the packet as possible and * if completed calls "do_request()" to fullfill the request. * */static void do_read(EpmdVars *g,Connection *s){ int val, pack_size; if (s->open == FALSE) { dbg_printf(g,0,"read on unknown socket"); return; } /* Check if we already got the whole packet but we keep the connection alive to find out when a node is terminated. We then want to check for a close */ if (s->keep == TRUE) { val = read(s->fd, s->buf, INBUF_SIZE); if (val == 0) { node_unreg_sock(g,s->fd); epmd_conn_close(g,s); } else if (val < 0) { dbg_tty_printf(g,1,"error on ALIVE socket %d (%d; errno=0x%x)", s->fd, val, errno); node_unreg_sock(g,s->fd); epmd_conn_close(g,s); } else { dbg_tty_printf(g,1,"got more than expected on ALIVE socket %d (%d)", s->fd,val); dbg_print_buf(g,s->buf,val); /* FIXME: Shouldn't be needed to close down.... */ node_unreg_sock(g,s->fd); epmd_conn_close(g,s); } /* FIXME: We always close, probably the right thing to do */ return; } /* If unknown size we request the whole buffer - what we got - 1 We subtract 1 because we will add a "\0" in "do_request()". This is not needed for R3A or higher versions of Erlang, because the '\0' is included in the request, but is kept for backwards compatibility to allow R2D to use this epmd. */ pack_size = s->want ? s->want : INBUF_SIZE - 1; val = read(s->fd, s->buf + s->got, pack_size - s->got); if (val == 0) { /* A close when we haven't got all data */ dbg_printf(g,0,"got partial packet only on file descriptor %d (%d)", s->fd,s->got); epmd_conn_close(g,s); return; } if (val < 0) { dbg_perror(g,"error in read"); epmd_conn_close(g,s); return; } dbg_print_buf(g,s->buf,val); s->got += val; if ((s->want == 0) && (s->got >= 2)) { /* The two byte header that specify the length of the packet doesn't count the header as part of the packet so we add 2 to "s->want" to make us talk about all bytes we get. */ s->want = get_int16(s->buf) + 2; if ((s->want < 3) || (s->want >= INBUF_SIZE)) { dbg_printf(g,0,"invalid packet size (%d)",s->want - 2); epmd_conn_close(g,s); return; } if (s->got > s->want) { dbg_printf(g,0,"got %d bytes in packet, expected %d", s->got - 2, s->want - 2); epmd_conn_close(g,s); return; } } s->mod_time = current_time(g); /* Note activity */ if (s->want == s->got) { /* Do action and close up */ /* Skip header bytes */ do_request(g, s->fd, s, s->buf + 2, s->got - 2); if (!s->keep) epmd_conn_close(g,s); /* Normal close */ }}static int do_accept(EpmdVars *g,int listensock){ int msgsock; struct SOCKADDR_IN icli_addr; /* workaround for QNX bug - cannot */ int icli_addr_len; /* handle NULL pointers to accept. */ icli_addr_len = sizeof(icli_addr); msgsock = accept(listensock,(struct sockaddr*) &icli_addr, (unsigned int*) &icli_addr_len); if (msgsock < 0) { dbg_perror(g,"error in accept"); return FALSE; } return conn_open(g,msgsock);}static void do_request(g, fd, s, buf, bsize) EpmdVars *g; int fd; Connection *s; char *buf; int bsize;{ char wbuf[OUTBUF_SIZE]; /* Buffer for writing */ int i;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?