libcom_tcp.c

来自「SRI international 发布的OAA框架软件」· C语言 代码 · 共 2,071 行 · 第 1/5 页

C
2,071
字号
      icl_AddToList(*sockets, icl_NewInt(socket), TRUE);
      numberOfSockets++;
    }
    idlist = icl_ListNextElement(idlist);
  }
  return numberOfSockets;
} // end of com_find_all_connected_sockets


static int com_find_all_connection_ids(ICLTerm **ids) {
  int answer = FALSE;
  ICLTerm *result = NULL;
  ICLListType *resultlist;
  static ICLTerm *info = NULL;

  if (!icl_IsValid(info)) {
    info = icl_NewTermFromData(
      "com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
  }

  if ( (answer = db_Solve(commdb, info, ICL_EMPTY, &result)) ) {
    resultlist = icl_List(result);
    *ids = icl_NewList(NULL);
    while(icl_ListHasMoreElements(resultlist)) {
      ICLTerm *item = icl_ListElement(resultlist);
      ICLTerm *id = icl_NthTerm(item,1);
      icl_AddToList(*ids, icl_CopyTerm(id), FALSE);
      resultlist = icl_ListNextElement(resultlist);
    }
  }
  icl_Free(result);
  return answer;
} // end of com_find_all_connection_ids

static char* com_get_connection_id_from_socket(int socket) {
  char *answer = NULL;
  ICLTerm *cids = NULL;
  ICLListType *cidslist = NULL;
  com_find_all_connection_ids(&cids);
  cidslist = icl_List(cids);
  while (icl_ListHasMoreElements(cidslist)) {
    char *cid = icl_Str(icl_ListElement(cidslist));
    if (socket == com_get_socket_from_connection_id(cid)) {
      answer = strdup(cid);
      break;
    }
    cidslist = icl_ListNextElement(cidslist);
  }
  icl_Free(cids);
  return answer;
} // end of com_get_connection_id_from_socket

/**
 * Get an event from a specific connection.
 */
EXPORT_MSCPP
int EXPORT_BORLAND
com_GetEventFromConnection(char *ConnectionId, double timeout, ICLTerm **event) {
  int socketNumber = com_get_socket_from_connection_id(ConnectionId);
  return com_select_event_from_socket(socketNumber, timeout, event);
} // end of com_GetEventFromConnection

/**
 * Retrieve the dotted-decimal string
 * IP address for the localhost. Returns the
 * first address if the machine is multi-homed.
 * (This behavior is consistent with
 * InetAddress.getByName() in Java.)
 */
static char *com_get_localhost_ip_address(void) {
    struct hostent *h = NULL;
    char name[MAXHOSTNAMELEN + 1];

    if ( (gethostname(name, sizeof(name)) != 0) ||
         ((h = gethostbyname(name)) == NULL) ) {
      fprintf(stderr,"Warning: could not determine address for localhost\n");
      return NULL;
    }
   return strdup(inet_ntoa(*((struct in_addr *) *(h->h_addr_list))));
}

static int com_is_binary_listener(int socket) {
  return (socket == binaryListener);
} // end of com_is_binary_listener

static int com_is_listening() {
  return stringListener > 0;
} // end of com_is_listening

/**
 * Checks all sockets for incoming events.
 * Also, since the main select() call
 * is performed here, connection requests
 * a handled on the listener sockets, if any.
 */
static int com_read_events_into_buffer(double timeout) {
  int numberOfEventsAdded = 0;
  ICLTerm *ids = NULL;
  ICLTerm *connectedSockets = NULL;
  ICLListType *connectedSocketList = NULL;

  struct timeval  time;
  struct timeval* timep = NULL;
  int width = 0;
  fd_set read_set;

  // timeout = 0.0 means select should block
  if(timeout > 0) {
    long i = (long)floor(timeout);
    time.tv_sec = i;
    time.tv_usec = (long)floor((timeout - i) * 1e6);
    timep = &time;
  }

  FD_ZERO(&read_set);

  // if we have listener sockets, add them to the fd_set
  if (com_is_listening()) {
    FD_SET(stringListener, &read_set);
    FD_SET(binaryListener, &read_set);
  }

  com_find_all_connection_ids(&ids);
  com_find_all_connected_sockets(ids, &connectedSockets);
  connectedSocketList = icl_List(connectedSockets);

  // add connected sockets to fd_set
  while(icl_ListHasMoreElements(connectedSocketList)) {
    int socket = icl_Int(icl_ListElement(connectedSocketList));
    FD_SET(socket, &read_set);
    width = MAX(width, socket+1);
    connectedSocketList = icl_ListNextElement(connectedSocketList);
  }

  if (select(width, &read_set, NULL, NULL, timep) <= 0) {
    // nothing to do...clean up and return
    icl_Free(ids);
    icl_Free(connectedSockets);
    return 0;
  }

  if (com_is_listening()) {
    // handle requests to connect, if any
    if (FD_ISSET(stringListener, &read_set)) {
      com_accept_new_connection(stringListener);
      FD_CLR(stringListener, &read_set);
    }

    if (FD_ISSET(binaryListener, &read_set)) {
      com_accept_new_connection(binaryListener);
      FD_CLR(stringListener, &read_set);
    }
  }

  // check connected sockets
  connectedSocketList = icl_List(connectedSockets);
  while(icl_ListHasMoreElements(connectedSocketList)) {
    int socket = icl_Int(icl_ListElement(connectedSocketList));

    // read incoming events to the eventbuffer
    if (FD_ISSET(socket, &read_set)) {
      ICLTerm *event = NULL;
      double totalTime = 0;
      while(((timeout == 0) || (totalTime < timeout)) &&
            com_select_event_from_socket(socket, 0.01, &event)) {
        com_add_event_to_buffer(event);
        ++numberOfEventsAdded;
        totalTime += 0.01;
        if(icl_IsStruct(event) &&
           (icl_NumTerms(event) == 1) &&
           (strcmp(icl_Functor(event), "event") == 0)) {
          ICLTerm* firstArg = icl_NthTerm(event, 1);
          if(icl_IsStr(firstArg) &&
             (strcmp(icl_Str(firstArg), "timeout") == 0)) {
            break;
          }
        }
      }
    }
    connectedSocketList = icl_ListNextElement(connectedSocketList);
  }

  icl_Free(ids);
  icl_Free(connectedSockets);
  return numberOfEventsAdded;
} // end of com_read_events_into_buffer

static int com_select_event_from_buffer(ICLTerm **event) {
  ICLListType *eventList = NULL;

  // bail quickly if buffer is empty
  checkEventBuffer();
  if (icl_ListLen(eventBuffer) == 0 ) {
    *event = NULL;
    return FALSE;
  }

  // select and remove from the head of the list...
  eventList = icl_List(eventBuffer);
  *event = eventList->elt;
  eventBuffer->p = eventList->next;
#ifdef NORMAL_GC
  memset(eventList, 0, sizeof(ICLListType));
#endif
  free(eventList);

  return TRUE;
} // end of com_select_event_from_buffer

/**
 * Based on com_SelectEvent(), this function
 * selects from the specified socket.
 */
static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event){
  ICLTerm* readerTerm;
  ICLTerm* query;
  TermReader* reader;
  char* readerString;
  char *inConnectionId;
  inConnectionId = com_get_connection_id_from_socket(socketNumber);
  query = icl_NewTermFromData("reader(_)",9);
  if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
    icl_Free(query);
    fprintf(stderr, "com_select_event_from_socket could not find reader\n");
    icl_stFree(inConnectionId);
    return FALSE;
  }
  readerString = icl_Str(icl_NthTerm(readerTerm, 1));
  readerString = strdup(readerString);
  icl_stRemoveQuotes(readerString);
  reader = (TermReader*)stringToPointer(readerString);
  free(readerString);
  CHECK_LEAKS();
  *event = termReader_getNextTerm(reader, timeout);
  CHECK_LEAKS();
  icl_Free(query);
  icl_Free(readerTerm);
  if(*event == NULL) {
    int te = termReader_getError(reader);
    // print an error message if it is not an "ordinary" error
    if ( (te != TERMREADER_OKAY) &&
         (te != TERMREADER_TIMEOUT) &&
         (te != TERMREADER_READERR) &&
         (te != TERMREADER_NOCONN) ) {
      fprintf(stderr,
        "com_select_event_from_socket error in reading term: %i\n", te);
    }
    // disconnect if it was not a timeout error
    if ( (te != TERMREADER_OKAY) &&
         (te != TERMREADER_TIMEOUT) ) {
      com_Disconnect(inConnectionId);
//      fprintf(stderr, "disconnected connection_id: %s socket: %d\n",
//              inConnectionId, socketNumber);fflush(stderr);
    }
    icl_stFree(inConnectionId);
    return FALSE;
  }
  else {

    // if present, copy connection id into params (for direct_connect)
    if (!strcmp(icl_Functor(*event),"term")) {
      ICLTerm *CID = icl_NewStruct("connection_id",1,icl_NewStr(inConnectionId));
      if (icl_NumTerms(*event) == 1) {
        icl_AddToList(icl_NthTerm(icl_NthTerm(*event,1),2), CID, TRUE);
      }
      else if (icl_NumTerms(*event) == 2) {
        icl_AddToList(icl_NthTerm(icl_NthTerm(*event,2),2), CID, TRUE);
      }
    }
    icl_stFree(inConnectionId);
    return TRUE;
  }
} // end of com_select_event_from_socket

/**
 * Similar to the original com_SelectEvent(), this function
 * selects from all the connections. It also checks
 * the listen sockets, if any, for incoming connection
 * requests.
 *
 * The main select() statement is contained
 * in com_read_events_into_buffer().
 */
EXPORT_MSCPP
int EXPORT_BORLAND
com_SelectEventFromAllIds(double timeout, ICLTerm **event) {
  int answer = FALSE;
  checkEventBuffer();

  if (com_select_event_from_buffer(event)) {
    CHECK_LEAKS();
    answer = TRUE;
  }
  else if (com_read_events_into_buffer(timeout)) {
    CHECK_LEAKS();
    answer = com_select_event_from_buffer(event);
    CHECK_LEAKS();
  }
  else {
    answer = FALSE;
  }
  CHECK_LEAKS();
  return answer;
} // end of com_SelectEventFromAllIds

/**
 * Generates a unique ConnectionId for a direct connection.
 */
static char* new_direct_client_id() {
    static int counter = 1;
    char temp[255];
    sprintf(temp, "direct_client%d", counter++);
    return strdup(temp);
} // end of new_direct_client_id


/**
 *  Opens and binds a TCP socket to a port number.
 *  @param port TCP port number
 *  @return a file descriptor to the open socket, or error status
 */
static int opentcpport(int port) {
   int socketFd; /* File descriptor for the socket */
   struct sockaddr_in addr; /* Address to receive data from */

   // Open the socket
   socketFd = socket(AF_INET, SOCK_STREAM, 0);
   if (socketFd < 0) {
      printf("opentcpport: socket open failure: %m\n");
      return -1;
   }

   // Clear the address
   memset(&addr, 0, sizeof(addr));

   // Set the address family to INET
   addr.sin_family = AF_INET;

   // Set the address mask to any address using network byte order
   addr.sin_addr.s_addr = htonl(INADDR_ANY);

   // Set the port using network byte order
   addr.sin_port = htons(port);

   // Bind the port to the socket
   if (bind(socketFd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
      printf("opentcpport: bind failure on port %d: %m\n", port);
      return -1;
   }

   // Return the file descriptor
   return socketFd;
} // end of opentcpport

/**
 * Sets up listen (server) socket at the specified port.
 */
static int tcpListenAtPort(int port) {
  char listenAtString[600];
  ICLTerm *listenAtTerm;
  int socketFd;

  // Make sure database is initialized
  if (!commdb)
    commdb = db_NewDB();

    socketFd = opentcpport(port);

    if (listen(socketFd, 5) < 0) // backlog = 5
    {
        printf("listen() error: %m\n");
    }

    if (socketFd >= 0) {
        sprintf(listenAtString, "tcp_listener(%d)", socketFd);
    }
    listenAtTerm = icl_NewTermFromString(listenAtString);

    if (listenAtTerm) {
        db_Assert(commdb, listenAtTerm, ICL_EMPTY);
        icl_Free(listenAtTerm);
    }
    else {
        printf("Error! tcp_listener could not be constructed.\n");
    }

    return socketFd;
} // end of tcpListenAtPort


/**
 * @defgroup Communications Communications
 *
 * TCP instantiation of lowlevel communication primitives for OAA.
 *
 * @{
 */

/**
 * @file libcom_tcp.h
 */

/**
 * @file libcom_tcp.c
 * TCP instantiation of lowlevel communication primitives for OAA.
 */

/**
 * @}
 */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?