/* File: select.c  (web viewer header; "font size" control omitted) */
}
/**
 * Close both ends of the signalling pipe of a select handle.
 * Each end is closed independently, so a failure to close one
 * end never prevents the attempt to close the other.
 *
 * @param ectx context used for reporting close errors
 * @param sh handle whose signal_pipe descriptors are closed
 */
static void
closeSignalPipe (struct GNUNET_GE_Context *ectx, SelectHandle * sh)
{
  int i;

  for (i = 0; i < 2; i++)
    if (0 != CLOSE (sh->signal_pipe[i]))
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                              GNUNET_GE_ADMIN, "close");
}

/**
 * Start a select thread that will accept connections
 * from the given socket and pass messages read to the
 * given message handler.
 *
 * @param description name for this select; must not be NULL.  The
 *        pointer itself is stored in the handle (the string is not
 *        copied), so it has to stay valid while the handle exists
 * @param is_udp if GNUNET_NO and sock is not -1, listen() is called
 *        on sock before anything else is set up
 * @param ectx context used for error reporting
 * @param mon load monitor, maybe NULL
 * @param sock the listen socket, or -1 to create a select without
 *        a listen socket
 * @param max_addr_len maximum expected length of addresses for
 *        connections accepted on the given socket
 * @param timeout default timeout stored in the handle
 * @param mh message handler
 * @param mh_cls closure stored alongside mh
 * @param ah accept handler
 * @param ah_cls closure stored alongside ah
 * @param ch close handler
 * @param ch_cls closure stored alongside ch
 * @param memory_quota amount of memory available for
 *        queueing messages (in bytes)
 * @param socket_quota initial socket quota stored in the handle
 * @return NULL on error (listen, pipe setup or thread creation failed)
 */
SelectHandle *
GNUNET_select_create (const char *description,
                      int is_udp,
                      struct GNUNET_GE_Context * ectx,
                      struct GNUNET_LoadMonitor * mon,
                      int sock,
                      unsigned int max_addr_len,
                      GNUNET_CronTime timeout,
                      GNUNET_SelectMessageHandler mh,
                      void *mh_cls,
                      GNUNET_SelectAcceptHandler ah,
                      void *ah_cls,
                      GNUNET_SelectCloseHandler ch,
                      void *ch_cls, unsigned int memory_quota,
                      int socket_quota)
{
  SelectHandle *sh;

  /* validate arguments before touching the socket */
  GNUNET_GE_ASSERT (ectx, description != NULL);
  if ((is_udp == GNUNET_NO) && (sock != -1) && (0 != LISTEN (sock, 5)))
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_USER |
                              GNUNET_GE_IMMEDIATE, "listen");
      return NULL;
    }
  sh = GNUNET_malloc (sizeof (SelectHandle));
  memset (sh, 0, sizeof (SelectHandle));
  /* set the error context first so that every later call that
     reports through sh->ectx sees the caller's context */
  sh->ectx = ectx;
  sh->is_udp = is_udp;
  sh->description = description;
  if (0 != PIPE (sh->signal_pipe))
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_USER |
                              GNUNET_GE_IMMEDIATE, "pipe");
      GNUNET_free (sh);
      return NULL;
    }
  if ((GNUNET_OK !=
       GNUNET_pipe_make_nonblocking (ectx, sh->signal_pipe[0])) ||
      (GNUNET_OK !=
       GNUNET_pipe_make_nonblocking (ectx, sh->signal_pipe[1])))
    {
      closeSignalPipe (ectx, sh);
      GNUNET_free (sh);
      return NULL;
    }
  sh->shutdown = GNUNET_NO;
  sh->load_monitor = mon;
  sh->max_addr_len = max_addr_len;
  sh->mh = mh;
  sh->mh_cls = mh_cls;
  sh->ah = ah;
  sh->ah_cls = ah_cls;
  sh->ch = ch;
  sh->ch_cls = ch_cls;
  sh->memory_quota = memory_quota;
  sh->socket_quota = socket_quota;
  sh->timeout = timeout;
  sh->lock = GNUNET_mutex_create (GNUNET_YES);
  if (sock != -1)
    sh->listen_sock = GNUNET_socket_create (ectx, mon, sock);
  else
    sh->listen_sock = NULL;
  sh->thread = GNUNET_thread_create (&selectThread, sh, 256 * 1024);
  if (sh->thread == NULL)
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                              GNUNET_GE_ADMIN, "pthread_create");
      /* undo everything acquired above */
      if (sh->listen_sock != NULL)
        GNUNET_socket_destroy (sh->listen_sock);
      closeSignalPipe (ectx, sh);
      GNUNET_mutex_destroy (sh->lock);
      GNUNET_free (sh);
      return NULL;
    }
  return sh;
}
/**
 * Terminate the select thread, close the socket and
 * all associated connections.
 *
 * The thread is stopped and joined first; afterwards the remaining
 * sessions, the lock, both ends of the signalling pipe, the listen
 * socket (if any) and finally the handle itself are released.
 *
 * @param sh the select to destroy; must not be used afterwards
 */
void
GNUNET_select_destroy (struct GNUNET_SelectHandle *sh)
{
  void *thread_result;
  int pipe_end;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Destroying select %p\n", sh);
#endif
  /* ask the select thread to exit and wait for it */
  sh->shutdown = GNUNET_YES;
  signalSelect (sh);
  GNUNET_thread_stop_sleep (sh->thread);
  GNUNET_thread_join (sh->thread, &thread_result);
  sh->thread = NULL;

  /* drop every session that is still registered */
  GNUNET_mutex_lock (sh->lock);
  while (sh->sessionCount > 0)
    {
      destroySession (sh, sh->sessions[0]);
    }
  GNUNET_array_grow (sh->sessions, sh->sessionArrayLength, 0);
  GNUNET_mutex_unlock (sh->lock);
  GNUNET_mutex_destroy (sh->lock);

  /* close the signalling pipe: end 1 first, then end 0 */
  for (pipe_end = 1; pipe_end >= 0; pipe_end--)
    {
      if (0 != CLOSE (sh->signal_pipe[pipe_end]))
        GNUNET_GE_LOG_STRERROR (sh->ectx,
                                GNUNET_GE_ERROR | GNUNET_GE_USER |
                                GNUNET_GE_ADMIN | GNUNET_GE_BULK, "close");
    }
  if (NULL != sh->listen_sock)
    GNUNET_socket_destroy (sh->listen_sock);
  GNUNET_free (sh);
}
/**
* Queue the given message with the select thread.
*
* The message is appended to the write buffer of the session that
* belongs to the given socket; the buffer is compacted or grown as
* needed.  The select thread is signalled only if the write buffer
* was empty before this call.
*
* @param sh the select handle
* @param sock socket of the session the message is queued for
* @param msg message to queue; msg->size (network byte order) gives
* the number of bytes copied, starting at msg
* @param mayBlock if non-zero, the session's no_read flag is set to
* GNUNET_YES.  NOTE(review): this function itself never blocks
* the calling thread; confirm the intended semantics against
* the select thread's handling of no_read.
* @param force message is important, queue even if
* there is not enough space (the memory_quota checks are skipped)
* @return GNUNET_OK if the message was sent or queued,
* GNUNET_NO if there was not enough memory to queue it,
* GNUNET_SYSERR if the sock does not belong with this select
*/
int
GNUNET_select_write (struct GNUNET_SelectHandle *sh,
struct GNUNET_SocketHandle *sock,
const GNUNET_MessageHeader * msg, int mayBlock,
int force)
{
Session *session;
int i;
unsigned short len;
char *newBuffer;
unsigned int newBufferSize;
int do_sig;
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
"Adding message of size %u to %p of select %p\n",
ntohs (msg->size), sock, sh);
#endif
session = NULL;
len = ntohs (msg->size);
GNUNET_mutex_lock (sh->lock);
/* locate the session that owns the given socket */
for (i = 0; i < sh->sessionCount; i++)
if (sh->sessions[i]->sock == sock)
{
session = sh->sessions[i];
break;
}
if (session == NULL)
{
GNUNET_mutex_unlock (sh->lock);
return GNUNET_SYSERR;
}
/* pending bytes live in wbuff[wspos .. wapos); new data is appended at wapos */
GNUNET_GE_ASSERT (NULL, session->wapos >= session->wspos);
/* quota check: only applies when not forced and a quota (> 0) is configured */
if ((force == GNUNET_NO) &&
(((sh->memory_quota > 0) &&
(session->wapos - session->wspos + len > sh->memory_quota))))
{
/* not enough free space, not allowed to grow that much */
GNUNET_mutex_unlock (sh->lock);
return GNUNET_NO;
}
/* remember whether the buffer was empty before queueing; only then
   is the select thread signalled at the end */
if (session->wspos == session->wapos)
do_sig = GNUNET_YES;
else
do_sig = GNUNET_NO;
if (session->wsize - session->wapos < len)
{
/* need to make space in some way or other */
if (session->wapos - session->wspos + len <= session->wsize)
{
/* can compact buffer to get space */
memmove (session->wbuff,
&session->wbuff[session->wspos],
session->wapos - session->wspos);
session->wapos -= session->wspos;
session->wspos = 0;
}
else
{
/* need to grow buffer */
/* start from the current size (4092 bytes if there is no buffer yet)
   and double until pending data plus the new message fit */
newBufferSize = session->wsize;
if (session->wsize == 0)
newBufferSize = 4092;
while (newBufferSize < len + session->wapos - session->wspos)
newBufferSize *= 2;
/* unless forced, never allocate more than the quota; the quota check
   above already established that the data fits within the quota */
if ((sh->memory_quota > 0) &&
(newBufferSize > sh->memory_quota) && (force == GNUNET_NO))
newBufferSize = sh->memory_quota;
if (newBufferSize > GNUNET_MAX_GNUNET_malloc_CHECKED)
{
/* not enough free space, not allowed to grow that much,
even with forcing! */
GNUNET_mutex_unlock (sh->lock);
return GNUNET_NO;
}
GNUNET_GE_ASSERT (NULL,
newBufferSize >=
len + session->wapos - session->wspos);
if (newBufferSize != session->wsize)
{
/* allocate the larger buffer and move the pending bytes to its start */
newBuffer = GNUNET_malloc (newBufferSize);
memcpy (newBuffer,
&session->wbuff[session->wspos],
session->wapos - session->wspos);
GNUNET_free_non_null (session->wbuff);
session->wbuff = newBuffer;
}
else
{
/* size unchanged: just shift the pending bytes to the start */
if (session->wspos != 0)
memmove (session->wbuff,
&session->wbuff[session->wspos],
session->wapos - session->wspos);
}
/* in both cases the pending bytes now start at offset 0 */
session->wsize = newBufferSize;
session->wapos = session->wapos - session->wspos;
session->wspos = 0;
}
}
/* append the message (len bytes starting at msg) behind the pending data */
GNUNET_GE_ASSERT (NULL, session->wapos + len <= session->wsize);
memcpy (&session->wbuff[session->wapos], msg, len);
session->wapos += len;
if (mayBlock)
session->no_read = GNUNET_YES;
GNUNET_mutex_unlock (sh->lock);
/* signal outside of the lock */
if (do_sig)
signalSelect (sh);
return GNUNET_OK;
}
/**
 * Replace the context pointer associated with a socket that is
 * managed by this select.
 *
 * @param sh the select handle
 * @param sock socket whose session should be updated
 * @param old_sock_ctx context the session is expected to hold right
 *        now (checked with an assertion)
 * @param new_sock_ctx context to store in the session
 * @return GNUNET_OK if the session was found and updated,
 *         GNUNET_SYSERR if the sock does not belong with this select
 */
int
GNUNET_select_update_closure (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              void *old_sock_ctx, void *new_sock_ctx)
{
  Session *match = NULL;
  int result = GNUNET_SYSERR;
  int pos = 0;

  GNUNET_mutex_lock (sh->lock);
  /* stop at the first session whose socket matches */
  while ((match == NULL) && (pos < sh->sessionCount))
    {
      if (sh->sessions[pos]->sock == sock)
        match = sh->sessions[pos];
      pos++;
    }
  if (match != NULL)
    {
      GNUNET_GE_ASSERT (NULL, match->sock_ctx == old_sock_ctx);
      match->sock_ctx = new_sock_ctx;
      result = GNUNET_OK;
    }
  GNUNET_mutex_unlock (sh->lock);
  return result;
}
/**
* Add another (already connected) socket to the set of
* sockets managed by the select.
*/
int
GNUNET_select_connect (struct GNUNET_SelectHandle *sh,
struct GNUNET_SocketHandle *sock, void *sock_ctx)
{
Session *session;
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
"Adding connection %p to selector %p\n", sock, sh);
#endif
session = GNUNET_malloc (sizeof (Session));
memset (session, 0, sizeof (Session));
session->sock = sock;
session->sock_ctx = sock_ctx;
session->lastUse = GNUNET_get_time ();
GNUNET_mutex_lock (sh->lock);
if (sh->sessionArrayLength == sh->sessionCount)
GNUNET_array_grow (sh->sessions, sh->sessionArrayLength,
sh->sessionArrayLength + 4);
sh->sessions[sh->sessionCount++] = session;
sh->socket_quota--;
GNUNET_mutex_unlock (sh->lock);
signalSelect (sh);
return GNUNET_OK;
}
static Session *
findSession (struct GNUNET_SelectHandle *sh, struct GNUNET_SocketHandle *sock)
{
int i;
for (i = 0; i < sh->sessionCount; i++)
if (sh->sessions[i]->sock == sock)
return sh->sessions[i];
return NULL;
}
/**
* Close the associated socket and remove it from the
* set of sockets managed by select.
*/
int
GNUNET_select_disconnect (struct GNUNET_SelectHandle *sh,
struct GNUNET_SocketHandle *sock)
{
Session *session;
#if DEBUG_SELECT
GNUNET_GE_LOG (sh->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
"Removing connection %p from selector %p\n", sock, sh);
#endif
GNUNET_mutex_lock (sh->lock);
session = findSession (sh, sock);
if (session == NULL)
{
GNUNET_mutex_unlock (sh->lock);
return GNUNET_SYSERR;
}
destroySession (sh, session);
GNUNET_mutex_unlock (sh->lock);
signalSelect (sh);
return GNUNET_OK;
}
/**
 * Change the timeout for this socket to a custom
 * value.  Use 0 to use the default timeout for
 * this select.
 *
 * @param sh the select handle
 * @param sock socket whose session timeout is changed
 * @param timeout new timeout value stored in the session
 * @return GNUNET_OK on success,
 *         GNUNET_SYSERR if the sock does not belong with this select
 */
int
GNUNET_select_change_timeout (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              GNUNET_CronTime timeout)
{
  Session *target;
  int result = GNUNET_SYSERR;

  GNUNET_mutex_lock (sh->lock);
  target = findSession (sh, sock);
  if (target != NULL)
    {
      target->timeout = timeout;
      result = GNUNET_OK;
    }
  GNUNET_mutex_unlock (sh->lock);
  return result;
}
/**
 * Would select queue or send the given message at this time?
 *
 * Only the memory quota is examined: the message is rejected when a
 * quota is configured (memory_quota > 0), the call is not forced and
 * the bytes already pending plus size would exceed the quota.
 *
 * @param sh the select handle
 * @param sock socket the message would be queued for
 * @param size size of the message
 * @param mayBlock not consulted by this function
 * @param force message is important, queue even if
 *        there is not enough space
 * @return GNUNET_YES if the message would be sent or queued,
 *         GNUNET_NO if there was not enough memory to queue it,
 *         GNUNET_SYSERR if the sock does not belong with this select
 */
int
GNUNET_select_test_write_now (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              unsigned int size, int mayBlock, int force)
{
  Session *target;
  int verdict;

  GNUNET_mutex_lock (sh->lock);
  target = findSession (sh, sock);
  if (NULL == target)
    {
      GNUNET_mutex_unlock (sh->lock);
      return GNUNET_SYSERR;
    }
  GNUNET_GE_ASSERT (NULL, target->wapos >= target->wspos);
  verdict = GNUNET_YES;
  if ((force == GNUNET_NO) &&
      (sh->memory_quota > 0) &&
      (target->wapos - target->wspos + size > sh->memory_quota))
    {
      /* not enough free space, not allowed to grow that much */
      verdict = GNUNET_NO;
    }
  GNUNET_mutex_unlock (sh->lock);
  return verdict;
}
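/*
 * Keyboard shortcuts (web viewer residue, not part of the source):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */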