📄 select.c
字号:
}
session->lastUse = GNUNET_get_time ();
return GNUNET_OK;
}
/**
* Thread that selects until it is signaled to shut down.
*/
static void *
selectThread (void *ctx)
{
struct GNUNET_SelectHandle *sh = ctx;
GNUNET_CronTime now;
GNUNET_CronTime timeout;
char *clientAddr;
fd_set readSet;
fd_set errorSet;
fd_set writeSet;
struct stat buf;
socklen_t lenOfIncomingAddr;
int i;
int max;
int ret;
int s;
void *sctx;
SocketHandle *sock;
Session *session;
size_t size;
int old_errno;
struct timeval tv;
if (sh->max_addr_len != 0)
clientAddr = GNUNET_malloc (sh->max_addr_len);
else
clientAddr = NULL;
GNUNET_mutex_lock (sh->lock);
while (sh->shutdown == GNUNET_NO)
{
FD_ZERO (&readSet);
FD_ZERO (&errorSet);
FD_ZERO (&writeSet);
if (sh->signal_pipe[0] != -1)
{
if (-1 == FSTAT (sh->signal_pipe[0], &buf))
{
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
GNUNET_GE_USER | GNUNET_GE_BULK,
"fstat");
sh->signal_pipe[0] = -1; /* prevent us from error'ing all the time */
}
else
{
FD_SET (sh->signal_pipe[0], &readSet);
}
}
max = sh->signal_pipe[0];
if (sh->listen_sock != NULL)
{
if (!GNUNET_socket_test_valid (sh->listen_sock))
{
GNUNET_socket_destroy (sh->listen_sock);
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_USER | GNUNET_GE_ERROR |
GNUNET_GE_BULK,
_("select listen socket for `%s' not valid!\n"),
sh->description);
sh->listen_sock = NULL; /* prevent us from error'ing all the time */
}
else
{
add_to_select_set (sh->listen_sock, &readSet, &max);
}
}
for (i = 0; i < sh->sessionCount; i++)
{
Session *session = sh->sessions[i];
struct GNUNET_SocketHandle *sock = session->sock;
if (!GNUNET_socket_test_valid (sock))
{
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Select %p destroys invalid client handle %p\n",
sh, session);
#endif
destroySession (sh, session);
}
else
{
add_to_select_set (sock, &errorSet, &max);
if (session->no_read != GNUNET_YES)
add_to_select_set (sock, &readSet, &max);
GNUNET_GE_ASSERT (NULL, session->wapos >= session->wspos);
if (session->wapos > session->wspos)
add_to_select_set (sock, &writeSet, &max); /* do we have a pending write request? */
}
}
timeout = -1;
now = GNUNET_get_time ();
for (i = 0; i < sh->sessionCount; i++)
{
session = sh->sessions[i];
if (session->timeout != 0)
{
if (now > session->lastUse + session->timeout)
timeout = 0;
else
timeout =
GNUNET_MIN (timeout,
session->lastUse + session->timeout - now);
}
}
GNUNET_mutex_unlock (sh->lock);
tv.tv_sec = timeout / GNUNET_CRON_SECONDS;
tv.tv_usec = (timeout % GNUNET_CRON_SECONDS) * 1000;
ret =
SELECT (max + 1, &readSet, &writeSet, &errorSet,
(timeout == -1) ? NULL : &tv);
old_errno = errno;
GNUNET_mutex_lock (sh->lock);
if ((ret == -1) && ((old_errno == EAGAIN) || (old_errno == EINTR)))
continue;
if (ret == -1)
{
errno = old_errno;
if (errno == EBADF)
{
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK, "select");
}
else
{
GNUNET_GE_DIE_STRERROR (sh->ectx,
GNUNET_GE_FATAL | GNUNET_GE_ADMIN |
GNUNET_GE_USER | GNUNET_GE_IMMEDIATE,
"select");
}
continue;
}
if (sh->is_udp == GNUNET_NO)
{
if ((sh->listen_sock != NULL) &&
(FD_ISSET (sh->listen_sock->handle, &readSet)))
{
lenOfIncomingAddr = sh->max_addr_len;
memset (clientAddr, 0, lenOfIncomingAddr);
/* make sure this is non-blocking */
GNUNET_socket_set_blocking (sh->listen_sock, GNUNET_NO);
s = ACCEPT (sh->listen_sock->handle,
(struct sockaddr *) clientAddr, &lenOfIncomingAddr);
if (s == -1)
{
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_WARNING | GNUNET_GE_ADMIN
| GNUNET_GE_BULK, "accept");
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
GNUNET_GE_BULK,
"Select %s failed to accept!\n",
sh->description);
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
continue; /* not good, but not fatal either */
break;
}
if (sh->socket_quota <= 0)
{
SHUTDOWN (s, SHUT_WR);
if (0 != CLOSE (s))
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_WARNING |
GNUNET_GE_ADMIN | GNUNET_GE_BULK,
"close");
s = -1;
continue;
}
sh->socket_quota--;
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Select %p is accepting connection: %d\n", sh,
s);
#endif
sock = GNUNET_socket_create (sh->ectx, sh->load_monitor, s);
GNUNET_mutex_unlock (sh->lock);
sctx = sh->ah (sh->ah_cls,
sh, sock, clientAddr, lenOfIncomingAddr);
GNUNET_mutex_lock (sh->lock);
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Select %p is accepting connection: %p\n", sh,
sctx);
#endif
if (sctx == NULL)
{
GNUNET_socket_destroy (sock);
sh->socket_quota++;
}
else
{
session = GNUNET_malloc (sizeof (Session));
memset (session, 0, sizeof (Session));
session->timeout = sh->timeout;
session->sock = sock;
session->sock_ctx = sctx;
session->lastUse = GNUNET_get_time ();
if (sh->sessionArrayLength == sh->sessionCount)
GNUNET_array_grow (sh->sessions,
sh->sessionArrayLength,
sh->sessionArrayLength + 4);
sh->sessions[sh->sessionCount++] = session;
}
}
}
else
{ /* is_udp == GNUNET_YES */
if ((sh->listen_sock != NULL) &&
(FD_ISSET (sh->listen_sock->handle, &readSet)))
{
int pending;
int udp_sock;
int error;
socklen_t optlen;
udp_sock = sh->listen_sock->handle;
lenOfIncomingAddr = sh->max_addr_len;
memset (clientAddr, 0, lenOfIncomingAddr);
pending = 0;
optlen = sizeof (pending);
#ifdef OSX
error = GETSOCKOPT (udp_sock,
SOL_SOCKET, SO_NREAD, &pending, &optlen);
#elif MINGW
error = ioctlsocket (udp_sock, FIONREAD, &pending);
#else
error = ioctl (udp_sock, FIONREAD, &pending);
#endif
if ((error != 0) || (optlen != sizeof (pending)))
{
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
GNUNET_GE_BULK, "ioctl");
pending = 65535; /* max */
}
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Select %p is preparing to receive %u bytes from UDP\n",
sh, pending);
#endif
GNUNET_GE_ASSERT (sh->ectx, pending >= 0);
if (pending >= 65536)
pending = 65536;
if (pending == 0)
{
/* maybe empty UDP packet was sent (see report on bug-gnunet,
5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
GNUNET_socket_recv_from (sh->listen_sock,
GNUNET_NC_NONBLOCKING,
NULL,
0, &size, clientAddr,
&lenOfIncomingAddr);
}
else
{
char *msg;
msg = GNUNET_malloc (pending);
size = 0;
ret = GNUNET_socket_recv_from (sh->listen_sock,
GNUNET_NC_NONBLOCKING,
msg,
pending,
&size,
clientAddr,
&lenOfIncomingAddr);
if (ret == GNUNET_SYSERR)
{
GNUNET_socket_close (sh->listen_sock);
}
else if (ret == GNUNET_OK)
{
/* validate msg format! */
const GNUNET_MessageHeader *hdr;
/* if size < pending, set pending to size */
if (size < pending)
pending = size;
hdr = (const GNUNET_MessageHeader *) msg;
if ((size == pending) &&
(size >= sizeof (GNUNET_MessageHeader)) &&
(ntohs (hdr->size) == size))
{
void *sctx;
GNUNET_mutex_unlock (sh->lock);
sctx = sh->ah (sh->ah_cls,
sh,
NULL, clientAddr, lenOfIncomingAddr);
GNUNET_mutex_lock (sh->lock);
if (sctx != NULL)
{
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG |
GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Select %p is passing %u bytes from UDP to handler\n",
sh, size);
#endif
sh->mh (sh->mh_cls, sh, NULL, sctx, hdr);
sh->ch (sh->ch_cls, sh, NULL, sctx);
}
else
{
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG |
GNUNET_GE_DEVELOPER |
GNUNET_GE_BULK,
"Error in select %p -- connection refused\n",
sh);
#endif
}
}
else
{
#if DEBUG_SELECT
GNUNET_GE_BREAK (sh->ectx, size == pending);
GNUNET_GE_BREAK (sh->ectx,
size >=
sizeof (GNUNET_MessageHeader));
GNUNET_GE_BREAK (sh->ectx,
(size >=
sizeof (GNUNET_MessageHeader))
&& (ntohs (hdr->size) == size));
#endif
}
}
GNUNET_free (msg);
}
}
} /* end UDP processing */
if (FD_ISSET (sh->signal_pipe[0], &readSet))
{
/* allow reading multiple signals in one go in case we get many
in one shot... */
#define MAXSIG_BUF 128
char buf[MAXSIG_BUF];
/* just a signal to refresh sets, eat and continue */
if (0 >= READ (sh->signal_pipe[0], buf, MAXSIG_BUF))
{
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_WARNING | GNUNET_GE_USER |
GNUNET_GE_BULK, "read");
}
}
now = GNUNET_get_time ();
for (i = 0; i < sh->sessionCount; i++)
{
session = sh->sessions[i];
sock = session->sock;
if ((FD_ISSET (sock->handle, &readSet)) &&
(GNUNET_SYSERR == readAndProcess (sh, session)))
{
i--;
continue;
}
if ((FD_ISSET (sock->handle, &writeSet)) &&
(GNUNET_SYSERR == writeAndProcess (sh, session)))
{
i--;
continue;
}
if (FD_ISSET (sock->handle, &errorSet))
{
destroySession (sh, session);
i--;
continue;
}
if ((session->timeout != 0) &&
(now > session->lastUse + session->timeout))
{
destroySession (sh, session);
i--;
continue;
}
}
}
sh->description = "DEAD";
GNUNET_mutex_unlock (sh->lock);
GNUNET_free_non_null (clientAddr);
return NULL;
}
int
GNUNET_pipe_make_nonblocking (struct GNUNET_GE_Context *ectx, int handle)
{
#if MINGW
DWORD mode;
mode = PIPE_NOWAIT;
if (SetNamedPipeHandleState ((HANDLE) handle, &mode, NULL, NULL))
#if HAVE_PLIBC_FD
plibc_fd_set_blocking (handle, 0);
#else
__win_SetHandleBlockingMode (handle, 0);
#endif
/* don't report errors because Win9x doesn't support SetNamedPipeHandleState() */
#else
int flags = fcntl (handle, F_GETFL);
flags |= O_NONBLOCK;
if (-1 == fcntl (handle, F_SETFL, flags))
{
GNUNET_GE_LOG_STRERROR (ectx,
GNUNET_GE_WARNING | GNUNET_GE_USER |
GNUNET_GE_ADMIN | GNUNET_GE_IMMEDIATE, "fcntl");
return GNUNET_SYSERR;
}
#endif
return GNUNET_OK;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -