📄 rvsocketselect.c
字号:
/******************************************************************************
Filename : rvsocketselect.c
Description:
******************************************************************************
Copyright (c) 1999 RADVision Inc.
************************************************************************
NOTICE:
This document contains information that is proprietary to RADVision LTD.
No part of this publication may be reproduced in any form whatsoever
without written prior approval by RADVision LTD..
RADVision LTD. reserves the right to revise this publication and make
changes without obligation to notify any person of such revisions or
changes.
******************************************************************************
$Revision:$
$Date:$
$Author: S. Cipolli$
*****************************************************************************/
#include <stddef.h>
#include "rvplatform.h"
#include "rvthread.h"
#include "rvsocket_.h"
#include "rvsocketselect.h"
/*********************************************************************
* RvSocketListener
*********************************************************************/
RvSocketListener* rvSocketListenerConstruct(RvSocketListener* sl, RvSocket* s,
void (*func)(struct RvSocketSelect_*, struct RvSocketListener_*, void*),
void* data) {
sl->socket = s;
sl->func = func;
sl->data = data;
return sl;
}
/*********************************************************************
* RvSocketSelect
*********************************************************************/
static void rvSocketSelectProcessCommand(RvSocketSelect* ss) {
char buf[32];
RvSocketAddr fromAddr;
char command;
rvSocketRecvFrom(&ss->commandSocket, buf, sizeof(buf), &fromAddr);
command = buf[0];
switch(command)
{
case 'T': /* Terminate */
{
ss->state = RV_SOCKETSELECTSTATE_TERMINATED;
break;
}
case 'R': /* Register */
{
/* Nothing needs to be done. This command is sent only
to exit the select statement so we can add the newly
registered socket into the select's fd_set */
break;
}
}
}
static void rvSocketSelectProcess(RvSocketSelect* ss) {
RvSocket maxFd = 0;
Rv_fd_set sockets;
RvSocketListener* sl;
RvSocket sock;
int result;
#if defined(RV_SOCKETS_PSOS)
RvSocket sharecommand;
Rv_fd_set sockets_copy;
#endif
/* Build list of sockets */
RV_FD_ZERO(&sockets);
/* Add the command socket */
sock = ss->commandSocket;
#if defined(RV_SOCKETS_PSOS)
/* Get a shared handle for the command socket and remember it */
sharecommand = rvSharerShareSocket(sock);
sock = sharecommand;
#endif
RV_FD_SET(sock, &sockets);
maxFd = rvMax(maxFd, sock);
if(ss->state == RV_SOCKETSELECTSTATE_ALIVE) {
rvMutexLock(&ss->listLock);
ss->postmsg = rvTrue; /* after rebuilding list, wake us up for changes */
/* Add the listener sockets */
for (ss->iter = rvPtrListBegin(&ss->listeners);
ss->iter != rvPtrListEnd(&ss->listeners);
ss->iter = rvPtrListIterNext(ss->iter)) {
sl = rvPtrListIterData(ss->iter);
rvSocketListenerActivate(sl);
sock = *(sl->socket);
#if defined(RV_SOCKETS_PSOS)
/* get a shared handle for each socket and remember it */
sl->sharesocket = rvSharerShareSocket(sock);
sock = sl->sharesocket;
#endif
RV_FD_SET(sock, &sockets);
maxFd = rvMax(maxFd, sock);
}
rvMutexUnlock(&ss->listLock);
}
#if defined(RV_SOCKETS_PSOS)
/* remember which sockets we shared */
memcpy(&sockets_copy, &sockets, sizeof(Rv_fd_set));
#endif
/* Wait for one or more of the sockets */
result = rvSocketSelect_(maxFd + 1, &sockets, NULL, NULL, NULL);
#if defined(RV_SOCKETS_PSOS)
/* close all the shared socket handles that we opened */
for(sock = 0; sock <= maxFd; sock++) {
if(RV_FD_ISSET(sock, &sockets_copy))
rvSharerClose(sock);
}
#endif
if(result > 0) {
/* Check the command socket */
#if defined(RV_SOCKETS_PSOS)
sock = sharecommand;
#else
sock = ss->commandSocket;
#endif
if (RV_FD_ISSET(sock, &sockets))
rvSocketSelectProcessCommand(ss);
rvMutexLock(&ss->listLock);
ss->postmsg = rvFalse; /* don't bother us, we'll rebuild list anyway */
if(ss->state == RV_SOCKETSELECTSTATE_ALIVE) {
/* For each listener */
ss->iter = rvPtrListBegin(&ss->listeners);
while(ss->iter != rvPtrListEnd(&ss->listeners)) {
/* Get listener */
sl = rvPtrListIterData(ss->iter);
/* Read next now in case listener gets deleted */
ss->iter = rvPtrListIterNext(ss->iter);
/* If listener's socket was signalled... call the callback */
#if defined(RV_SOCKETS_PSOS)
sock = sl->sharesocket;
#else
sock = *sl->socket;
#endif
if(RV_FD_ISSET(sock, &sockets))
if(rvSocketListenerIsActivated(sl))
sl->func(ss, sl, sl->data);
}
}
rvMutexUnlock(&ss->listLock);
}
}
RvSocketSelect* rvSocketSelectConstruct(RvSocketSelect* ss, RvAlloc* a) {
ss->state = RV_SOCKETSELECTSTATE_UNBORN;
rvPtrListConstruct(&ss->listeners, a);
rvSocketConstructUdpAnyPort(&ss->commandSocket); /* force port assignment now */
#ifdef RV_INET_IPV6
{
RvIpv6Addr ipv6;
rvIpv6AddrConstruct(&ipv6, &RV_INET_ADDRLOOPBACK);
rvSocketAddrConstructIpv6(&ss->commandSocketAddr, &ipv6,
rvSocketGetPort(&ss->commandSocket));
}
#else
{
RvIpv4Addr ipv4;
rvIpv4AddrConstruct(&ipv4, htonl(RV_INET_ADDRLOOPBACK));
rvSocketAddrConstructIpv4(&ss->commandSocketAddr, &ipv4,
rvSocketGetPort(&ss->commandSocket));
}
#endif
ss->postmsg = rvFalse; /* don't send messages since we'll rebuild on first time */
ss->iter = rvPtrListEnd(&ss->listeners);
rvMutexConstruct(&ss->listLock);
return ss;
}
void rvSocketSelectDestruct(RvSocketSelect* ss) {
rvSocketDestruct(&ss->commandSocket);
rvPtrListDestruct(&ss->listeners);
rvSocketAddrDestruct(&ss->commandSocketAddr);
ss->postmsg = rvFalse;
rvMutexDestruct(&ss->listLock);
}
void rvSocketSelectStart(RvSocketSelect* ss) {
ss->state = RV_SOCKETSELECTSTATE_ALIVE;
#if defined(RV_OS_OSE)
/* OSE requires a task to have an open socket in any task that uses select */
/* so we open a dummy one to make it happy. */
rvSocketConstructUdp(&ss->dummySocket, 0);
#endif
while(ss->state != RV_SOCKETSELECTSTATE_TERMINATED)
rvSocketSelectProcess(ss);
#if defined(RV_OS_OSE)
rvSocketDestruct(&ss->dummySocket);
#endif
}
void rvSocketSelectStop(RvSocketSelect* ss) {
char command[] = {'T'};
/* Send the terminate command */
rvSocketSendTo(&ss->commandSocket, command, 1, &ss->commandSocketAddr);
}
void rvSocketSelectRegisterListener(RvSocketSelect* ss,
RvSocketListener* sl) {
RvBool sendCommand;
char command[] = {'R'};
rvMutexLock(&ss->listLock);
rvSocketListenerDeactivate(sl);
rvPtrListPushBack(&ss->listeners,sl);
sendCommand = ss->postmsg;
if(sendCommand == rvTrue) { /* see if command message needs to be sent */
ss->postmsg = rvFalse;
}
rvMutexUnlock(&ss->listLock);
/* Send the register command */
if(sendCommand == rvTrue)
rvSocketSendTo(&ss->commandSocket, command, 1, &ss->commandSocketAddr);
}
void rvSocketSelectUnregisterListener(RvSocketSelect* ss,
RvSocketListener* sl) {
rvMutexLock(&ss->listLock);
rvSocketListenerDeactivate(sl);
/* If the listener being removed is next in line
for callbacks, push the iterator forward */
if(rvPtrListIterData(ss->iter) == sl)
ss->iter = rvPtrListIterNext(ss->iter);
rvPtrListRemove(&ss->listeners,sl);
rvMutexUnlock(&ss->listLock);
}
RvSocketListener *rvSocketSelectUnregisterSocket(RvSocketSelect* ss, RvSocket *socket)
{
RvPtrListIter iter;
RvSocketListener *found = NULL;
rvMutexLock(&ss->listLock);
for(iter = rvPtrListBegin(&ss->listeners);
iter != rvPtrListEnd(&ss->listeners);
iter = rvPtrListIterNext(iter))
{
RvSocketListener *sl = (RvSocketListener *)rvPtrListIterData(iter);
if(rvSocketEqual(rvSocketListenerGetSocket(sl), socket))
{
found = sl;
rvPtrListErase(&ss->listeners, iter);
break;
}
}
rvMutexUnlock(&ss->listLock);
return found;
}
#if defined(RV_SOCKETS_NUCLEUS)
int rvSocketSelect_(int nfds, Rv_fd_set *readfds, Rv_fd_set *writefds, Rv_fd_set *exceptfds, const RvTimespec *timeout)
{
UNSIGNED t; /* Nucleus time is in ticks */
int result;
if (timeout != NULL) {
/* Convert timeout requested to time format used by select */
t = (UNSIGNED)(rvTimeConvertHrtime(timeout) / ((RvHrtime)RV_TIME_NSECPERSEC / (RvHrtime)TICKS_PER_SECOND));
if (t == 0) t = 1;
result = NU_Select(nfds, readfds, writefds, exceptfds, t);
} else
result = NU_Select(nfds, readfds, writefds, exceptfds, NU_SUSPEND);
/* Nucleus doesn't return the count */
if (result == NU_SUCCESS)
result = nfds;
return result;
}
#elif defined(RV_SOCKETS_WSOCK)
int rvSocketSelect_(int nfds, Rv_fd_set *readfds, Rv_fd_set *writefds, Rv_fd_set *exceptfds, const RvTimespec *timeout)
{
struct timeval t;
int result;
if (timeout != NULL) {
/* Convert timeout requested to time format used by select */
t.tv_sec = rvTimespecSecs(timeout);
t.tv_usec = rvTimespecNsecs(timeout) / RV_TIME_NSECPERUSEC;
result = select(nfds, readfds, writefds, exceptfds, &t);
} else
result = select(nfds, readfds, writefds, exceptfds, NULL);
return result;
}
#elif defined(RV_SOCKETS_BSD)
#if !defined(RV_OS_VXWORKS) && !defined(RV_OS_OSE)
# include <sys/time.h>
#endif
int rvSocketSelect_(int nfds, Rv_fd_set *readfds, Rv_fd_set *writefds, Rv_fd_set *exceptfds, const RvTimespec *timeout)
{
struct timeval t;
int result;
if (timeout != NULL) {
/* Convert timeout requested to time format used by select */
t.tv_sec = rvTimespecSecs(timeout);
t.tv_usec = rvTimespecNsecs(timeout) / RV_TIME_NSECPERUSEC;
result = select(nfds, readfds, writefds, exceptfds, &t);
} else
result = select(nfds, readfds, writefds, exceptfds, NULL);
return result;
}
#elif defined(RV_SOCKETS_PSOS)
/* Socket descriptors must be ones that have been shared to the calling task */
int rvSocketSelect_(int nfds, Rv_fd_set *readfds, Rv_fd_set *writefds, Rv_fd_set *exceptfds, const RvTimespec *timeout)
{
struct timeval t;
int result;
if (timeout != NULL) {
/* Convert timeout requested to time format used by select */
t.tv_sec = rvTimespecSecs(timeout);
t.tv_usec = rvTimespecNsecs(timeout) / RV_TIME_NSECPERUSEC;
result = select(nfds, readfds, writefds, exceptfds, &t);
} else
result = select(nfds, readfds, writefds, exceptfds, NULL);
return result;
}
#else
# error Unknown sockets implementation
#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -