libcom_tcp.c
来自「SRI international 发布的OAA框架软件」· C语言 代码 · 共 2,071 行 · 第 1/5 页
C
2,071 行
icl_AddToList(*sockets, icl_NewInt(socket), TRUE);
numberOfSockets++;
}
idlist = icl_ListNextElement(idlist);
}
return numberOfSockets;
} // end of com_find_all_connected_sockets
static int com_find_all_connection_ids(ICLTerm **ids) {
int answer = FALSE;
ICLTerm *result = NULL;
ICLListType *resultlist;
static ICLTerm *info = NULL;
if (!icl_IsValid(info)) {
info = icl_NewTermFromData(
"com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
}
if ( (answer = db_Solve(commdb, info, ICL_EMPTY, &result)) ) {
resultlist = icl_List(result);
*ids = icl_NewList(NULL);
while(icl_ListHasMoreElements(resultlist)) {
ICLTerm *item = icl_ListElement(resultlist);
ICLTerm *id = icl_NthTerm(item,1);
icl_AddToList(*ids, icl_CopyTerm(id), FALSE);
resultlist = icl_ListNextElement(resultlist);
}
}
icl_Free(result);
return answer;
} // end of com_find_all_connection_ids
static char* com_get_connection_id_from_socket(int socket) {
char *answer = NULL;
ICLTerm *cids = NULL;
ICLListType *cidslist = NULL;
com_find_all_connection_ids(&cids);
cidslist = icl_List(cids);
while (icl_ListHasMoreElements(cidslist)) {
char *cid = icl_Str(icl_ListElement(cidslist));
if (socket == com_get_socket_from_connection_id(cid)) {
answer = strdup(cid);
break;
}
cidslist = icl_ListNextElement(cidslist);
}
icl_Free(cids);
return answer;
} // end of com_get_connection_id_from_socket
/**
* Get an event from a specific connection.
*/
EXPORT_MSCPP
int EXPORT_BORLAND
com_GetEventFromConnection(char *ConnectionId, double timeout, ICLTerm **event) {
int socketNumber = com_get_socket_from_connection_id(ConnectionId);
return com_select_event_from_socket(socketNumber, timeout, event);
} // end of com_GetEventFromConnection
/**
* Retrieve the dotted-decimal string
* IP address for the localhost. Returns the
* first address if the machine is multi-homed.
* (This behavior is consistent with
* InetAddress.getByName() in Java.)
*/
static char *com_get_localhost_ip_address(void) {
struct hostent *h = NULL;
char name[MAXHOSTNAMELEN + 1];
if ( (gethostname(name, sizeof(name)) != 0) ||
((h = gethostbyname(name)) == NULL) ) {
fprintf(stderr,"Warning: could not determine address for localhost\n");
return NULL;
}
return strdup(inet_ntoa(*((struct in_addr *) *(h->h_addr_list))));
}
static int com_is_binary_listener(int socket) {
return (socket == binaryListener);
} // end of com_is_binary_listener
static int com_is_listening() {
return stringListener > 0;
} // end of com_is_listening
/**
* Checks all sockets for incoming events.
* Also, since the main select() call
* is performed here, connection requests
* a handled on the listener sockets, if any.
*/
static int com_read_events_into_buffer(double timeout) {
int numberOfEventsAdded = 0;
ICLTerm *ids = NULL;
ICLTerm *connectedSockets = NULL;
ICLListType *connectedSocketList = NULL;
struct timeval time;
struct timeval* timep = NULL;
int width = 0;
fd_set read_set;
// timeout = 0.0 means select should block
if(timeout > 0) {
long i = (long)floor(timeout);
time.tv_sec = i;
time.tv_usec = (long)floor((timeout - i) * 1e6);
timep = &time;
}
FD_ZERO(&read_set);
// if we have listener sockets, add them to the fd_set
if (com_is_listening()) {
FD_SET(stringListener, &read_set);
FD_SET(binaryListener, &read_set);
}
com_find_all_connection_ids(&ids);
com_find_all_connected_sockets(ids, &connectedSockets);
connectedSocketList = icl_List(connectedSockets);
// add connected sockets to fd_set
while(icl_ListHasMoreElements(connectedSocketList)) {
int socket = icl_Int(icl_ListElement(connectedSocketList));
FD_SET(socket, &read_set);
width = MAX(width, socket+1);
connectedSocketList = icl_ListNextElement(connectedSocketList);
}
if (select(width, &read_set, NULL, NULL, timep) <= 0) {
// nothing to do...clean up and return
icl_Free(ids);
icl_Free(connectedSockets);
return 0;
}
if (com_is_listening()) {
// handle requests to connect, if any
if (FD_ISSET(stringListener, &read_set)) {
com_accept_new_connection(stringListener);
FD_CLR(stringListener, &read_set);
}
if (FD_ISSET(binaryListener, &read_set)) {
com_accept_new_connection(binaryListener);
FD_CLR(stringListener, &read_set);
}
}
// check connected sockets
connectedSocketList = icl_List(connectedSockets);
while(icl_ListHasMoreElements(connectedSocketList)) {
int socket = icl_Int(icl_ListElement(connectedSocketList));
// read incoming events to the eventbuffer
if (FD_ISSET(socket, &read_set)) {
ICLTerm *event = NULL;
double totalTime = 0;
while(((timeout == 0) || (totalTime < timeout)) &&
com_select_event_from_socket(socket, 0.01, &event)) {
com_add_event_to_buffer(event);
++numberOfEventsAdded;
totalTime += 0.01;
if(icl_IsStruct(event) &&
(icl_NumTerms(event) == 1) &&
(strcmp(icl_Functor(event), "event") == 0)) {
ICLTerm* firstArg = icl_NthTerm(event, 1);
if(icl_IsStr(firstArg) &&
(strcmp(icl_Str(firstArg), "timeout") == 0)) {
break;
}
}
}
}
connectedSocketList = icl_ListNextElement(connectedSocketList);
}
icl_Free(ids);
icl_Free(connectedSockets);
return numberOfEventsAdded;
} // end of com_read_events_into_buffer
static int com_select_event_from_buffer(ICLTerm **event) {
ICLListType *eventList = NULL;
// bail quickly if buffer is empty
checkEventBuffer();
if (icl_ListLen(eventBuffer) == 0 ) {
*event = NULL;
return FALSE;
}
// select and remove from the head of the list...
eventList = icl_List(eventBuffer);
*event = eventList->elt;
eventBuffer->p = eventList->next;
#ifdef NORMAL_GC
memset(eventList, 0, sizeof(ICLListType));
#endif
free(eventList);
return TRUE;
} // end of com_select_event_from_buffer
/**
* Based on com_SelectEvent(), this function
* selects from the specified socket.
*/
static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event){
ICLTerm* readerTerm;
ICLTerm* query;
TermReader* reader;
char* readerString;
char *inConnectionId;
inConnectionId = com_get_connection_id_from_socket(socketNumber);
query = icl_NewTermFromData("reader(_)",9);
if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
icl_Free(query);
fprintf(stderr, "com_select_event_from_socket could not find reader\n");
icl_stFree(inConnectionId);
return FALSE;
}
readerString = icl_Str(icl_NthTerm(readerTerm, 1));
readerString = strdup(readerString);
icl_stRemoveQuotes(readerString);
reader = (TermReader*)stringToPointer(readerString);
free(readerString);
CHECK_LEAKS();
*event = termReader_getNextTerm(reader, timeout);
CHECK_LEAKS();
icl_Free(query);
icl_Free(readerTerm);
if(*event == NULL) {
int te = termReader_getError(reader);
// print an error message if it is not an "ordinary" error
if ( (te != TERMREADER_OKAY) &&
(te != TERMREADER_TIMEOUT) &&
(te != TERMREADER_READERR) &&
(te != TERMREADER_NOCONN) ) {
fprintf(stderr,
"com_select_event_from_socket error in reading term: %i\n", te);
}
// disconnect if it was not a timeout error
if ( (te != TERMREADER_OKAY) &&
(te != TERMREADER_TIMEOUT) ) {
com_Disconnect(inConnectionId);
// fprintf(stderr, "disconnected connection_id: %s socket: %d\n",
// inConnectionId, socketNumber);fflush(stderr);
}
icl_stFree(inConnectionId);
return FALSE;
}
else {
// if present, copy connection id into params (for direct_connect)
if (!strcmp(icl_Functor(*event),"term")) {
ICLTerm *CID = icl_NewStruct("connection_id",1,icl_NewStr(inConnectionId));
if (icl_NumTerms(*event) == 1) {
icl_AddToList(icl_NthTerm(icl_NthTerm(*event,1),2), CID, TRUE);
}
else if (icl_NumTerms(*event) == 2) {
icl_AddToList(icl_NthTerm(icl_NthTerm(*event,2),2), CID, TRUE);
}
}
icl_stFree(inConnectionId);
return TRUE;
}
} // end of com_select_event_from_socket
/**
* Similar to the original com_SelectEvent(), this function
* selects from all the connections. It also checks
* the listen sockets, if any, for incoming connection
* requests.
*
* The main select() statement is contained
* in com_read_events_into_buffer().
*/
EXPORT_MSCPP
int EXPORT_BORLAND
com_SelectEventFromAllIds(double timeout, ICLTerm **event) {
int answer = FALSE;
checkEventBuffer();
if (com_select_event_from_buffer(event)) {
CHECK_LEAKS();
answer = TRUE;
}
else if (com_read_events_into_buffer(timeout)) {
CHECK_LEAKS();
answer = com_select_event_from_buffer(event);
CHECK_LEAKS();
}
else {
answer = FALSE;
}
CHECK_LEAKS();
return answer;
} // end of com_SelectEventFromAllIds
/**
* Generates a unique ConnectionId for a direct connection.
*/
static char* new_direct_client_id() {
static int counter = 1;
char temp[255];
sprintf(temp, "direct_client%d", counter++);
return strdup(temp);
} // end of new_direct_client_id
/**
* Opens and binds a TCP socket to a port number.
* @param port TCP port number
* @return a file descriptor to the open socket, or error status
*/
static int opentcpport(int port) {
int socketFd; /* File descriptor for the socket */
struct sockaddr_in addr; /* Address to receive data from */
// Open the socket
socketFd = socket(AF_INET, SOCK_STREAM, 0);
if (socketFd < 0) {
printf("opentcpport: socket open failure: %m\n");
return -1;
}
// Clear the address
memset(&addr, 0, sizeof(addr));
// Set the address family to INET
addr.sin_family = AF_INET;
// Set the address mask to any address using network byte order
addr.sin_addr.s_addr = htonl(INADDR_ANY);
// Set the port using network byte order
addr.sin_port = htons(port);
// Bind the port to the socket
if (bind(socketFd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
printf("opentcpport: bind failure on port %d: %m\n", port);
return -1;
}
// Return the file descriptor
return socketFd;
} // end of opentcpport
/**
* Sets up listen (server) socket at the specified port.
*/
static int tcpListenAtPort(int port) {
char listenAtString[600];
ICLTerm *listenAtTerm;
int socketFd;
// Make sure database is initialized
if (!commdb)
commdb = db_NewDB();
socketFd = opentcpport(port);
if (listen(socketFd, 5) < 0) // backlog = 5
{
printf("listen() error: %m\n");
}
if (socketFd >= 0) {
sprintf(listenAtString, "tcp_listener(%d)", socketFd);
}
listenAtTerm = icl_NewTermFromString(listenAtString);
if (listenAtTerm) {
db_Assert(commdb, listenAtTerm, ICL_EMPTY);
icl_Free(listenAtTerm);
}
else {
printf("Error! tcp_listener could not be constructed.\n");
}
return socketFd;
} // end of tcpListenAtPort
/**
* @defgroup Communications Communications
*
* TCP instantiation of lowlevel communication primitives for OAA.
*
* @{
*/
/**
* @file libcom_tcp.h
*/
/**
* @file libcom_tcp.c
* TCP instantiation of lowlevel communication primitives for OAA.
*/
/**
* @}
*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?