📄 select.c
字号:
/*
This file is part of GNUnet.
(C) 2003, 2006, 2007 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*/
/**
* @file util/network/select.c
* @brief (network) input/output operations
* @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_util.h"
#include "network.h"
#define DEBUG_SELECT GNUNET_NO
/**
* Select Session handle.
*
* Per-connection state kept by the select code: the socket, its
* read and write buffers, and the bookkeeping needed to postpone
* destruction of the session while a handler is still using it.
*/
typedef struct
{
/**
* the socket (destroyed together with the session)
*/
struct GNUNET_SocketHandle *sock;
/**
* Client connection context; handed unchanged to the message
* handler and to the close handler.
*/
void *sock_ctx;
/**
* The read buffer ("rsize" bytes allocated, the first "pos"
* bytes hold received data).
*/
char *rbuff;
/**
* The write buffer ("wsize" bytes allocated, the bytes in
* [wspos, wapos) are still waiting to be sent).
*/
char *wbuff;
/**
* Time of the last activity on this session; refreshed with the
* current time after received data was processed successfully.
* NOTE(review): presumably compared against the timeout to expire
* idle sessions -- confirm against the select thread.
*/
GNUNET_CronTime lastUse;
/**
* Set to 0 initially, set to a much lower value
* if a "fast timeout" is desired.
* NOTE(review): presumably 0 means "use the timeout of the select
* handle" -- the code using this field is not visible here, confirm.
*/
GNUNET_CronTime timeout;
/**
* 0 : can be destroyed
* 1 : if destruction is required, it must be delayed
* -1: delayed destruction required
* 2 : destruction in progress
*/
int locked;
/**
* Do not read from this socket until the
* current write is complete.  Reset to GNUNET_NO once the
* write buffer has been sent completely.
*/
int no_read;
/**
* Current read position in the buffer, that is the number of
* received bytes currently stored at the start of "rbuff" (new
* data is received at this offset).
*/
unsigned int pos;
/**
* Current size of the read buffer.
*/
unsigned int rsize;
/**
* Position in the write buffer (for sending): offset of the
* first byte that has not been sent yet.
*/
unsigned int wspos;
/**
* Position in the write buffer (for appending): offset just
* past the last byte queued for sending.
*/
unsigned int wapos;
/**
* Size of the write buffer
*/
unsigned int wsize;
} Session;
/**
* State of one select instance: the sessions it manages, the
* handlers to call for them and the resources of the select thread.
*/
typedef struct GNUNET_SelectHandle
{
/**
* Human-readable name of this select instance (printed in
* debug log messages).
*/
const char *description;
/**
* mutex for synchronized access
*/
struct GNUNET_Mutex *lock;
/**
* one thread for listening for new connections,
* and for reading on all open sockets
*/
struct GNUNET_ThreadHandle *thread;
/**
* sock is the tcp socket that we listen on for new inbound
* connections. Maybe NULL if we are not listening.
*/
struct GNUNET_SocketHandle *listen_sock;
/**
* Context passed to all logging calls.
*/
struct GNUNET_GE_Context *ectx;
/**
* NOTE(review): not used in the visible part of the file;
* presumably the load monitor associated with the sockets -- confirm.
*/
struct GNUNET_LoadMonitor *load_monitor;
/**
* Array of currently active TCP sessions
* ("sessionArrayLength" entries allocated, the first
* "sessionCount" entries are in use).
*/
Session **sessions;
/**
* Message handler, called (without holding the lock) for every
* complete message that was received on a session.
*/
GNUNET_SelectMessageHandler mh;
/**
* Accept handler.
* NOTE(review): its call site is not visible here; presumably
* called for new inbound connections -- confirm.
*/
GNUNET_SelectAcceptHandler ah;
/**
* Close handler, called (without holding the lock) when a
* session is destroyed, before its socket is destroyed.
*/
GNUNET_SelectCloseHandler ch;
/**
* Closure (first argument) for the message handler.
*/
void *mh_cls;
/**
* Closure for the accept handler.
*/
void *ah_cls;
/**
* Closure (first argument) for the close handler.
*/
void *ch_cls;
/**
* NOTE(review): not used in the visible part of the file;
* presumably the default inactivity timeout for sessions -- confirm.
*/
GNUNET_CronTime timeout;
/**
* signal_pipe is used to signal the thread that is
* blocked in a select call that the set of sockets to listen
* to has changed.  A single byte is written to signal_pipe[1]
* to wake the thread up.
*/
int signal_pipe[2];
/**
* NOTE(review): not used in the visible part of the file;
* presumably non-zero if the listen socket is a UDP socket -- confirm.
*/
int is_udp;
/**
* Number of entries of "sessions" that are in use.
*/
unsigned int sessionCount;
/**
* Allocated length of the "sessions" array.
*/
unsigned int sessionArrayLength;
/**
* The read and write loops only continue while this is GNUNET_NO.
*/
int shutdown;
/**
* NOTE(review): not used in the visible part of the file;
* presumably the maximum size of a socket address -- confirm.
*/
unsigned int max_addr_len;
/**
* Size that the write buffer of a session is shrunk back to once
* it has been sent completely and had grown beyond this value.
*/
unsigned int memory_quota;
/**
* Incremented whenever the socket of a session is destroyed.
* NOTE(review): presumably the number of additional sockets that
* may still be opened -- confirm against the accept code.
*/
int socket_quota;
} SelectHandle;
/**
 * Add the descriptor of the given socket to a select set and keep
 * track of the largest descriptor seen so far.
 *
 * @param s socket whose handle should be added
 * @param set descriptor set to add the handle to
 * @param max in/out: raised to the handle of s if that handle is larger
 */
static void
add_to_select_set (struct GNUNET_SocketHandle *s, fd_set * set, int *max)
{
  FD_SET (s->handle, set);
  if (s->handle > *max)
    {
      /* new highest descriptor */
      *max = s->handle;
    }
}
/**
 * Wake up the select thread: write one byte to the write end of the
 * signal pipe because the set of files to watch has changed.  A
 * failed write is logged and otherwise ignored.
 *
 * @param sh select handle whose thread should be woken up
 */
static void
signalSelect (SelectHandle * sh)
{
  static char wakeup = '\0';
  int written;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Signaling select %p.\n", sh);
#endif
  written = WRITE (sh->signal_pipe[1], &wakeup, sizeof (char));
  if (written == sizeof (char))
    return;                     /* the byte went out, nothing to report */
  GNUNET_GE_LOG_STRERROR (sh->ectx,
                          GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
                          GNUNET_GE_BULK, "write");
}
/**
 * Tear down a session: take it out of the session array, notify the
 * close handler, destroy the socket and release the buffers and the
 * session itself.
 *
 * If the session is marked as in use (locked == 1) the destruction is
 * only requested (locked becomes -1) and nothing else happens; if a
 * destruction is already in progress (locked == 2) the call is a no-op.
 *
 * The caller must hold the lock of the select handle.  The lock is
 * released while the close handler runs and re-acquired afterwards.
 *
 * @param sh select handle that owns the session
 * @param s session to destroy
 */
static void
destroySession (SelectHandle * sh, Session * s)
{
  unsigned int idx;

  switch (s->locked)
    {
    case 1:
      /* still in use: ask the current user to destroy it later */
      s->locked = -1;
      return;
    case 2:
      /* already in process of destroying! */
      return;
    default:
      break;
    }
  s->locked = 2;
#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Destroying session %p of select %p with %u in read and %u in write buffer.\n",
                 s, sh, s->rsize, s->wsize);
#endif
#if 0
  if ((s->pos > 0) || (s->wapos > s->wspos))
    fprintf (stderr,
             "Destroying session %p of select %p with loss of %u in read and %u in write buffer.\n",
             s, sh, s->pos, s->wapos - s->wspos);
#endif
  /* unlink the session: the last entry takes over its slot */
  for (idx = 0; idx < sh->sessionCount; idx++)
    {
      if (sh->sessions[idx] != s)
        continue;
      sh->sessionCount--;
      sh->sessions[idx] = sh->sessions[sh->sessionCount];
      break;
    }
  /* give memory back once less than half of the array is in use */
  if (sh->sessionArrayLength > 2 * sh->sessionCount)
    GNUNET_array_grow (sh->sessions, sh->sessionArrayLength,
                       sh->sessionCount);
  /* the close handler is called without holding the lock */
  GNUNET_mutex_unlock (sh->lock);
  sh->ch (sh->ch_cls, sh, s->sock, s->sock_ctx);
  GNUNET_mutex_lock (sh->lock);
  GNUNET_socket_destroy (s->sock);
  sh->socket_quota++;
  GNUNET_array_grow (s->rbuff, s->rsize, 0);
  GNUNET_array_grow (s->wbuff, s->wsize, 0);
  GNUNET_free (s);
}
/**
 * The socket of a session has data waiting, read and
 * process!
 *
 * Receives as much as fits into the read buffer (growing the buffer
 * first if it is full) and then hands every complete message at the
 * start of the buffer to the message handler.  The lock is released
 * while the handler runs; the session is marked as in use for that
 * time so that a concurrent destruction request is delayed until the
 * handler has returned.
 *
 * This function may only be called if the lock is
 * already held by the caller.
 *
 * @param sh select handle that owns the session
 * @param session session whose socket is readable
 * @return GNUNET_OK for success, GNUNET_SYSERR if session was destroyed
 *         (the caller must not touch the session anymore in that case)
 */
static int
readAndProcess (SelectHandle * sh, Session * session)
{
  const GNUNET_MessageHeader *pack;
  int ret;
  size_t recvd;
  unsigned short len;

  if (session->rsize == session->pos)
    {
      /* read buffer too small, grow */
      GNUNET_array_grow (session->rbuff, session->rsize,
                         session->rsize + 1024);
    }
  ret = GNUNET_socket_recv (session->sock,
                            GNUNET_NC_NONBLOCKING | GNUNET_NC_IGNORE_INT,
                            &session->rbuff[session->pos],
                            session->rsize - session->pos, &recvd);
#if DEBUG_SELECT
  /* arguments in the order the format string names them; the size_t
     is converted explicitly to match "%u" */
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Receiving from session %p of select %p return %d-%u (%s).\n",
                 session, sh, ret, (unsigned int) recvd, STRERROR (errno));
#endif
  if (ret != GNUNET_OK)
    {
      destroySession (sh, session);
      return GNUNET_SYSERR;     /* other side closed connection */
    }
  session->pos += recvd;
  while ((sh->shutdown == GNUNET_NO)
         && (session->pos >= sizeof (GNUNET_MessageHeader)))
    {
      pack = (const GNUNET_MessageHeader *) &session->rbuff[0];
      len = ntohs (pack->size);
      /* check minimum size */
      if (len < sizeof (GNUNET_MessageHeader))
        {
          GNUNET_GE_LOG (sh->ectx,
                         GNUNET_GE_WARNING | GNUNET_GE_USER | GNUNET_GE_BULK,
                         _
                         ("Received malformed message (too small) from connection. Closing.\n"));
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      /* if message larger than read buffer, grow!  ("pack" is stale
         after this, but then pos < len holds and we leave the loop
         below without using it) */
      if (len > session->rsize)
        GNUNET_array_grow (session->rbuff, session->rsize, len);
      /* do we have the entire message? */
      if (session->pos < len)
        break;                  /* wait for more */
      /* mark the session as in use while the handler runs unlocked */
      if (session->locked == 0)
        session->locked = 1;
      GNUNET_mutex_unlock (sh->lock);
      if (GNUNET_OK != sh->mh (sh->mh_cls,
                               sh, session->sock, session->sock_ctx, pack))
        {
          GNUNET_mutex_lock (sh->lock);
          if (session->locked == 1)
            session->locked = 0;
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      GNUNET_mutex_lock (sh->lock);
      if (session->locked == -1)
        {
          /* destruction was requested while the handler was running;
             do it now and report it, the session is gone afterwards */
          session->locked = 0;
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      if (session->locked == 1)
        session->locked = 0;
      /* drop the processed message, move the remaining bytes to the
         start of the buffer */
      memmove (&session->rbuff[0], &session->rbuff[len], session->pos - len);
      session->pos -= len;
    }
  session->lastUse = GNUNET_get_time ();
  return GNUNET_OK;
}
/**
* The socket of a session has data waiting that can be
* transmitted, do it!
*
* This function may only be called if the lock is
* already held by the caller.
* @return GNUNET_OK for success, GNUNET_SYSERR if session was destroyed
*/
static int
writeAndProcess (SelectHandle * sh, Session * session)
{
SocketHandle *sock;
int ret;
size_t size;
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
"Write and process called for session %p of select %p status %d.\n",
sh, session, sh->shutdown);
#endif
sock = session->sock;
while (sh->shutdown == GNUNET_NO)
{
ret = GNUNET_socket_send (sock,
GNUNET_NC_NONBLOCKING,
&session->wbuff[session->wspos],
session->wapos - session->wspos, &size);
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
"Sending %d bytes from session %p of select %s return %d.\n",
session->wapos - session->wspos, session,
sh->description, ret);
#endif
if (ret == GNUNET_SYSERR)
{
if ((errno == EPIPE) || (errno == ECONNRESET))
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_USER |
GNUNET_GE_ADMIN | GNUNET_GE_BULK, "send");
else
GNUNET_GE_LOG_STRERROR (sh->ectx,
GNUNET_GE_WARNING | GNUNET_GE_USER |
GNUNET_GE_ADMIN | GNUNET_GE_BULK, "send");
destroySession (sh, session);
return GNUNET_SYSERR;
}
if (ret == GNUNET_OK)
{
if (size == 0)
{
/* send only returns 0 on error (happens if
other side closed connection), so close
the session */
destroySession (sh, session);
return GNUNET_SYSERR;
}
session->wspos += size;
if (session->wspos == session->wapos)
{
/* free compaction! */
session->wspos = 0;
session->wapos = 0;
session->no_read = GNUNET_NO;
if (session->wsize > sh->memory_quota)
{
/* if we went over quota before because of
force, use this opportunity to shrink
back to size! */
GNUNET_array_grow (session->wbuff, session->wsize,
sh->memory_quota);
}
}
break;
}
GNUNET_GE_ASSERT (sh->ectx, ret == GNUNET_NO);
/* this should only happen under Win9x because
of a bug in the socket implementation (KB177346).
Let's sleep and try again. */
GNUNET_thread_sleep (20 * GNUNET_CRON_MILLISECONDS);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -