⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 select.c

📁 GNUnet是一个安全的点对点网络框架
💻 C
📖 第 1 页 / 共 3 页
字号:
}

/**
 * Start a select thread that will accept connections
 * from the given socket and pass messages read to the
 * given message handler.
 *
 * @param description label for this select; must not be NULL.  Only
 *        the pointer is stored (no copy is made), so the string has
 *        to stay valid for as long as the handle exists
 * @param is_udp stored in the handle; if GNUNET_NO and sock != -1,
 *        listen (with a backlog of 5) is called on sock first
 * @param ectx context used for error reporting
 * @param mon maybe NULL
 * @param sock the listen socket, or -1 if there is none
 * @param max_addr_len maximum expected length of addresses for
 *        connections accepted on the given socket
 * @param timeout default timeout value stored in the handle
 * @param mh message handler stored in the handle
 * @param mh_cls closure stored alongside mh
 * @param ah accept handler stored in the handle
 * @param ah_cls closure stored alongside ah
 * @param ch close handler stored in the handle
 * @param ch_cls closure stored alongside ch
 * @param memory_quota amount of memory available for
 *        queueing messages (in bytes)
 * @param socket_quota initial socket quota stored in the handle
 * @return NULL on error
 */
SelectHandle *
GNUNET_select_create (const char *description,
                      int is_udp,
                      struct GNUNET_GE_Context * ectx,
                      struct GNUNET_LoadMonitor * mon,
                      int sock,
                      unsigned int max_addr_len,
                      GNUNET_CronTime timeout,
                      GNUNET_SelectMessageHandler mh,
                      void *mh_cls,
                      GNUNET_SelectAcceptHandler ah,
                      void *ah_cls,
                      GNUNET_SelectCloseHandler ch,
                      void *ch_cls, unsigned int memory_quota,
                      int socket_quota)
{
  SelectHandle *sh;

  if ((is_udp == GNUNET_NO) && (sock != -1) && (0 != LISTEN (sock, 5)))
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_USER |
                              GNUNET_GE_IMMEDIATE, "listen");
      return NULL;
    }
  GNUNET_GE_ASSERT (ectx, description != NULL);
  sh = GNUNET_malloc (sizeof (SelectHandle));
  memset (sh, 0, sizeof (SelectHandle));
  sh->is_udp = is_udp;
  sh->description = description;
  if (0 != PIPE (sh->signal_pipe))
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_USER |
                              GNUNET_GE_IMMEDIATE, "pipe");
      GNUNET_free (sh);
      return NULL;
    }
  /* Use the caller's ectx directly: sh->ectx has not been assigned yet
     at this point (it is still NULL from the memset above). */
  if ((GNUNET_OK !=
       GNUNET_pipe_make_nonblocking (ectx, sh->signal_pipe[0])) ||
      (GNUNET_OK !=
       GNUNET_pipe_make_nonblocking (ectx, sh->signal_pipe[1])))
    {
      /* Close each end in its own statement so that a failure to close
         the first descriptor cannot skip closing the second one. */
      if (0 != CLOSE (sh->signal_pipe[0]))
        GNUNET_GE_LOG_STRERROR (ectx,
                                GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                                GNUNET_GE_ADMIN, "close");
      if (0 != CLOSE (sh->signal_pipe[1]))
        GNUNET_GE_LOG_STRERROR (ectx,
                                GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                                GNUNET_GE_ADMIN, "close");
      GNUNET_free (sh);
      return NULL;
    }

  sh->shutdown = GNUNET_NO;
  sh->ectx = ectx;
  sh->load_monitor = mon;
  sh->max_addr_len = max_addr_len;
  sh->mh = mh;
  sh->mh_cls = mh_cls;
  sh->ah = ah;
  sh->ah_cls = ah_cls;
  sh->ch = ch;
  sh->ch_cls = ch_cls;
  sh->memory_quota = memory_quota;
  sh->socket_quota = socket_quota;
  sh->timeout = timeout;
  sh->lock = GNUNET_mutex_create (GNUNET_YES);
  if (sock != -1)
    sh->listen_sock = GNUNET_socket_create (ectx, mon, sock);
  else
    sh->listen_sock = NULL;
  sh->thread = GNUNET_thread_create (&selectThread, sh, 256 * 1024);
  if (sh->thread == NULL)
    {
      /* Thread creation failed: undo everything set up above. */
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                              GNUNET_GE_ADMIN, "pthread_create");
      if (sh->listen_sock != NULL)
        GNUNET_socket_destroy (sh->listen_sock);
      /* As above, both pipe ends are always closed independently. */
      if (0 != CLOSE (sh->signal_pipe[0]))
        GNUNET_GE_LOG_STRERROR (ectx,
                                GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                                GNUNET_GE_ADMIN, "close");
      if (0 != CLOSE (sh->signal_pipe[1]))
        GNUNET_GE_LOG_STRERROR (ectx,
                                GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE |
                                GNUNET_GE_ADMIN, "close");
      GNUNET_mutex_destroy (sh->lock);
      GNUNET_free (sh);
      return NULL;
    }
  return sh;
}

/**
 * Shut down a select handle: stop and join its thread, destroy
 * every remaining session, close both ends of the signal pipe,
 * destroy the listen socket (if any) and free the handle itself.
 *
 * @param sh handle to tear down; must not be used afterwards
 */
void
GNUNET_select_destroy (struct GNUNET_SelectHandle *sh)
{
  void *thread_result;
  int pipe_end;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Destroying select %p\n", sh);
#endif
  /* raise the shutdown flag, then signal the select thread and wait
     for it to finish */
  sh->shutdown = GNUNET_YES;
  signalSelect (sh);
  GNUNET_thread_stop_sleep (sh->thread);
  GNUNET_thread_join (sh->thread, &thread_result);
  sh->thread = NULL;

  /* drop all sessions that are still registered */
  GNUNET_mutex_lock (sh->lock);
  while (0 < sh->sessionCount)
    {
      destroySession (sh, sh->sessions[0]);
    }
  GNUNET_array_grow (sh->sessions, sh->sessionArrayLength, 0);
  GNUNET_mutex_unlock (sh->lock);
  GNUNET_mutex_destroy (sh->lock);

  /* close signal_pipe[1] first and signal_pipe[0] second */
  for (pipe_end = 1; pipe_end >= 0; pipe_end--)
    {
      if (0 == CLOSE (sh->signal_pipe[pipe_end]))
        continue;
      GNUNET_GE_LOG_STRERROR (sh->ectx,
                              GNUNET_GE_ERROR | GNUNET_GE_USER |
                              GNUNET_GE_ADMIN | GNUNET_GE_BULK, "close");
    }

  if (NULL != sh->listen_sock)
    {
      GNUNET_socket_destroy (sh->listen_sock);
    }
  GNUNET_free (sh);
}

/**
 * Queue the given message with the select thread.
 *
 * The message bytes are copied to the end of the write buffer
 * (wbuff) of the session that owns sock.  If there is not enough
 * room behind the append position, the pending bytes are first
 * moved to the front of the buffer, or the buffer is replaced by
 * a larger one.  The select thread is signalled only when the
 * session had no pending bytes before this call.
 *
 * @param sh select handle whose session list is searched for sock
 * @param sock socket to queue the message for
 * @param msg message to copy; its length is taken from
 *        ntohs (msg->size)
 * @param mayBlock if non-zero, the session's no_read flag is set to
 *        GNUNET_YES after queueing.  NOTE(review): the original
 *        description said "blocks this thread until message has
 *        been sent"; no blocking is visible in this function --
 *        confirm against the select thread
 * @param force message is important, queue even if
 *        there is not enough space
 * @return GNUNET_OK if the message was sent or queued,
 *         GNUNET_NO if there was not enough memory to queue it
 *         (quota exceeded without force, or the required buffer
 *         would exceed GNUNET_MAX_GNUNET_malloc_CHECKED),
 *         GNUNET_SYSERR if the sock does not belong with this select
 */
int
GNUNET_select_write (struct GNUNET_SelectHandle *sh,
                     struct GNUNET_SocketHandle *sock,
                     const GNUNET_MessageHeader * msg, int mayBlock,
                     int force)
{
  Session *session;
  int i;
  unsigned short len;
  char *newBuffer;
  unsigned int newBufferSize;
  int do_sig;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Adding message of size %u to %p of select %p\n",
                 ntohs (msg->size), sock, sh);
#endif
  session = NULL;
  len = ntohs (msg->size);
  GNUNET_mutex_lock (sh->lock);
  /* linear search for the session that wraps this socket */
  for (i = 0; i < sh->sessionCount; i++)
    if (sh->sessions[i]->sock == sock)
      {
        session = sh->sessions[i];
        break;
      }
  if (session == NULL)
    {
      GNUNET_mutex_unlock (sh->lock);
      return GNUNET_SYSERR;
    }
  /* the bytes in wbuff[wspos .. wapos) are the ones still pending */
  GNUNET_GE_ASSERT (NULL, session->wapos >= session->wspos);
  if ((force == GNUNET_NO) &&
      (((sh->memory_quota > 0) &&
        (session->wapos - session->wspos + len > sh->memory_quota))))
    {
      /* not enough free space, not allowed to grow that much */
      GNUNET_mutex_unlock (sh->lock);
      return GNUNET_NO;
    }
  /* only signal the select thread if nothing was pending before */
  if (session->wspos == session->wapos)
    do_sig = GNUNET_YES;
  else
    do_sig = GNUNET_NO;
  if (session->wsize - session->wapos < len)
    {
      /* need to make space in some way or other */
      if (session->wapos - session->wspos + len <= session->wsize)
        {
          /* can compact buffer to get space */
          memmove (session->wbuff,
                   &session->wbuff[session->wspos],
                   session->wapos - session->wspos);
          session->wapos -= session->wspos;
          session->wspos = 0;
        }
      else
        {
          /* need to grow buffer */
          newBufferSize = session->wsize;
          if (session->wsize == 0)
            newBufferSize = 4092;
          /* double until pending bytes plus the new message fit */
          while (newBufferSize < len + session->wapos - session->wspos)
            newBufferSize *= 2;
          /* without force, never allocate more than the quota */
          if ((sh->memory_quota > 0) &&
              (newBufferSize > sh->memory_quota) && (force == GNUNET_NO))
            newBufferSize = sh->memory_quota;
          if (newBufferSize > GNUNET_MAX_GNUNET_malloc_CHECKED)
            {
              /* not enough free space, not allowed to grow that much,
                 even with forcing! */
              GNUNET_mutex_unlock (sh->lock);
              return GNUNET_NO;
            }
          GNUNET_GE_ASSERT (NULL,
                            newBufferSize >=
                            len + session->wapos - session->wspos);
          if (newBufferSize != session->wsize)
            {
              /* allocate the new buffer and copy only the pending
                 bytes to its start */
              newBuffer = GNUNET_malloc (newBufferSize);
              memcpy (newBuffer,
                      &session->wbuff[session->wspos],
                      session->wapos - session->wspos);
              GNUNET_free_non_null (session->wbuff);
              session->wbuff = newBuffer;
            }
          else
            {
              /* size unchanged: just move pending bytes to the front */
              if (session->wspos != 0)
                memmove (session->wbuff,
                         &session->wbuff[session->wspos],
                         session->wapos - session->wspos);
            }
          /* in both cases the pending bytes now start at offset 0 */
          session->wsize = newBufferSize;
          session->wapos = session->wapos - session->wspos;
          session->wspos = 0;
        }
    }
  GNUNET_GE_ASSERT (NULL, session->wapos + len <= session->wsize);
  /* append the message behind the pending bytes */
  memcpy (&session->wbuff[session->wapos], msg, len);
  session->wapos += len;
  if (mayBlock)
    session->no_read = GNUNET_YES;
  GNUNET_mutex_unlock (sh->lock);
  /* signal after releasing the lock */
  if (do_sig)
    signalSelect (sh);
  return GNUNET_OK;
}


/**
 * Replace the context pointer stored with the session of the
 * given socket.
 *
 * @param sh select handle whose sessions are searched
 * @param sock socket identifying the session
 * @param old_sock_ctx value the session is expected to hold right
 *        now (checked with an assertion)
 * @param new_sock_ctx value to store in the session
 * @return GNUNET_OK if the session was found and updated,
 *         GNUNET_SYSERR if sock has no session in this select
 */
int
GNUNET_select_update_closure (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              void *old_sock_ctx, void *new_sock_ctx)
{
  Session *match = NULL;
  int idx = 0;
  int ret;

  GNUNET_mutex_lock (sh->lock);
  /* stop scanning at the first session that wraps sock */
  while ((match == NULL) && (idx < sh->sessionCount))
    {
      if (sh->sessions[idx]->sock == sock)
        match = sh->sessions[idx];
      idx++;
    }
  if (match != NULL)
    {
      GNUNET_GE_ASSERT (NULL, match->sock_ctx == old_sock_ctx);
      match->sock_ctx = new_sock_ctx;
      ret = GNUNET_OK;
    }
  else
    {
      ret = GNUNET_SYSERR;
    }
  GNUNET_mutex_unlock (sh->lock);
  return ret;
}

/**
 * Register an additional (already connected) socket with the
 * select.  A new zero-initialised session is created for it,
 * appended to the session array (which grows in steps of 4),
 * the socket quota is decremented and the select thread is
 * signalled.
 *
 * @param sh select handle to add the socket to
 * @param sock socket to manage
 * @param sock_ctx context pointer stored with the new session
 * @return GNUNET_OK (this function has no failure return)
 */
int
GNUNET_select_connect (struct GNUNET_SelectHandle *sh,
                       struct GNUNET_SocketHandle *sock, void *sock_ctx)
{
  Session *fresh;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Adding connection %p to selector %p\n", sock, sh);
#endif
  /* build the session before taking the lock */
  fresh = GNUNET_malloc (sizeof (*fresh));
  memset (fresh, 0, sizeof (*fresh));
  fresh->lastUse = GNUNET_get_time ();
  fresh->sock_ctx = sock_ctx;
  fresh->sock = sock;

  GNUNET_mutex_lock (sh->lock);
  if (sh->sessionCount == sh->sessionArrayLength)
    {
      /* array is full, make room for four more entries */
      GNUNET_array_grow (sh->sessions, sh->sessionArrayLength,
                         sh->sessionArrayLength + 4);
    }
  sh->sessions[sh->sessionCount] = fresh;
  sh->sessionCount++;
  sh->socket_quota -= 1;
  GNUNET_mutex_unlock (sh->lock);

  signalSelect (sh);
  return GNUNET_OK;
}

static Session *
findSession (struct GNUNET_SelectHandle *sh, struct GNUNET_SocketHandle *sock)
{
  int i;

  for (i = 0; i < sh->sessionCount; i++)
    if (sh->sessions[i]->sock == sock)
      return sh->sessions[i];
  return NULL;
}

/**
 * Remove a socket from the set managed by the select by
 * destroying its session, then signal the select thread.
 *
 * @param sh select handle that manages the socket
 * @param sock socket whose session should be destroyed
 * @return GNUNET_OK if the session was found and destroyed,
 *         GNUNET_SYSERR if sock has no session in this select
 */
int
GNUNET_select_disconnect (struct GNUNET_SelectHandle *sh,
                          struct GNUNET_SocketHandle *sock)
{
  Session *victim;
  int found;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Removing connection %p from selector %p\n", sock, sh);
#endif
  found = 0;
  GNUNET_mutex_lock (sh->lock);
  victim = findSession (sh, sock);
  if (victim != NULL)
    {
      destroySession (sh, victim);
      found = 1;
    }
  GNUNET_mutex_unlock (sh->lock);

  /* the select thread is only signalled when something was removed */
  if (!found)
    return GNUNET_SYSERR;
  signalSelect (sh);
  return GNUNET_OK;
}

/**
 * Set a custom timeout on the session of the given socket.
 * Use 0 to use the default timeout for this select.
 *
 * @param sh select handle that manages the socket
 * @param sock socket whose session timeout is changed
 * @param timeout new timeout value to store in the session
 * @return GNUNET_OK if the session was found and updated,
 *         GNUNET_SYSERR if sock has no session in this select
 */
int
GNUNET_select_change_timeout (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              GNUNET_CronTime timeout)
{
  Session *target;
  int ret;

  GNUNET_mutex_lock (sh->lock);
  target = findSession (sh, sock);
  if (target != NULL)
    {
      target->timeout = timeout;
      ret = GNUNET_OK;
    }
  else
    {
      ret = GNUNET_SYSERR;
    }
  GNUNET_mutex_unlock (sh->lock);
  return ret;
}


/**
 * Check whether a message of the given size would currently be
 * accepted for the given socket.  Only the memory quota is
 * tested; nothing is queued and no buffer is modified.
 *
 * @param sh select handle that manages the socket
 * @param sock socket the message would be queued for
 * @param size size of the message
 * @param mayBlock not consulted by this function
 * @param force message is important, accept even if the quota
 *        would be exceeded
 * @return GNUNET_YES if the message would be accepted,
 *         GNUNET_NO if the pending bytes plus size exceed the
 *         memory quota and force is GNUNET_NO,
 *         GNUNET_SYSERR if the sock does not belong with this select
 */
int
GNUNET_select_test_write_now (struct GNUNET_SelectHandle *sh,
                              struct GNUNET_SocketHandle *sock,
                              unsigned int size, int mayBlock, int force)
{
  Session *target;
  int verdict;

  GNUNET_mutex_lock (sh->lock);
  target = findSession (sh, sock);
  if (target == NULL)
    {
      verdict = GNUNET_SYSERR;
    }
  else
    {
      GNUNET_GE_ASSERT (NULL, target->wapos >= target->wspos);
      verdict = GNUNET_YES;
      if ((sh->memory_quota > 0) &&
          (target->wapos - target->wspos + size > sh->memory_quota) &&
          (force == GNUNET_NO))
        {
          /* quota would be exceeded and the caller did not force */
          verdict = GNUNET_NO;
        }
    }
  GNUNET_mutex_unlock (sh->lock);
  return verdict;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -