⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 select.c

📁 GNUnet是一个安全的点对点网络框架
💻 C
📖 第 1 页 / 共 3 页
字号:
/*
     This file is part of GNUnet.
     (C) 2003, 2006, 2007 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 2, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
*/

/**
 * @file util/network/select.c
 * @brief (network) input/output operations
 * @author Christian Grothoff
 */

#include "platform.h"
#include "gnunet_util.h"
#include "network.h"

#define DEBUG_SELECT GNUNET_NO

/**
 * Select Session handle.
 *
 * Per-connection state kept by the select loop: the socket, the
 * client's context pointer, and the read and write buffers together
 * with their cursors.
 */
typedef struct
{

  /**
   * the socket
   */
  struct GNUNET_SocketHandle *sock;

  /**
   * Client connection context.  Passed unchanged (together with
   * the socket) to the message handler and to the close handler.
   */
  void *sock_ctx;

  /**
   * The read buffer; rsize bytes are allocated, the first
   * pos bytes hold received data.
   */
  char *rbuff;

  /**
   * The write buffer; wsize bytes are allocated, the bytes in
   * [wspos, wapos) are still waiting to be sent.
   */
  char *wbuff;

  /**
   * Refreshed with the current time at the end of readAndProcess
   * whenever the session survives the read.
   */
  GNUNET_CronTime lastUse;

  /**
   * Set to 0 initially, set to a much lower value
   * if a "fast timeout" is desired.
   *
   * NOTE(review): "much lower" presumably means lower than the
   * select-wide timeout; the code reading this field is not
   * visible here -- confirm.
   */
  GNUNET_CronTime timeout;

  /**
   * 0 : can be destroyed
   * 1 : if destruction is required, it must be delayed
   * -1: delayed destruction required
   * 2 : destruction in progress
   *
   * readAndProcess sets this to 1 while the message handler runs
   * without the lock; destroySession turns 1 into -1 (and returns)
   * and sets 2 while it tears the session down.
   */
  int locked;

  /**
   * Do not read from this socket until the
   * current write is complete.  Reset to GNUNET_NO by
   * writeAndProcess once the write buffer has been fully sent.
   */
  int no_read;

  /**
   * Current read position in the buffer: number of bytes currently
   * held in rbuff; the next receive stores its data at rbuff[pos].
   */
  unsigned int pos;

  /**
   * Current size of the read buffer.
   */
  unsigned int rsize;

  /**
   * Position in the write buffer (for sending)
   */
  unsigned int wspos;

  /**
   * Position in the write buffer (for appending)
   */
  unsigned int wapos;

  /**
   * Size of the write buffer
   */
  unsigned int wsize;

} Session;

/**
 * State of one select instance: the sessions it serves, the
 * handlers to call back into, and the bookkeeping needed by the
 * functions in this file.
 */
typedef struct GNUNET_SelectHandle
{

  /**
   * Human-readable name of this select; printed in debug log
   * messages.
   */
  const char *description;

  /**
   * mutex for synchronized access.  The session functions in this
   * file expect it to be held by the caller and release it only
   * while a handler callback is running.
   */
  struct GNUNET_Mutex *lock;

  /**
   * one thread for listening for new connections,
   * and for reading on all open sockets
   */
  struct GNUNET_ThreadHandle *thread;

  /**
   * sock is the tcp socket that we listen on for new inbound
   * connections.  May be NULL if we are not listening.
   */
  struct GNUNET_SocketHandle *listen_sock;

  /**
   * Error/logging context handed to the GNUNET_GE_* logging calls.
   */
  struct GNUNET_GE_Context *ectx;

  /**
   * NOTE(review): not used in the visible part of this file;
   * presumably used for load accounting -- confirm.
   */
  struct GNUNET_LoadMonitor *load_monitor;

  /**
   * Array of currently active TCP sessions.  The first sessionCount
   * entries are in use; sessionArrayLength entries are allocated.
   */
  Session **sessions;

  /**
   * Message handler; called by readAndProcess for every complete
   * message found in a session's read buffer.
   */
  GNUNET_SelectMessageHandler mh;

  /**
   * Accept handler.
   * NOTE(review): call site is not visible here; presumably invoked
   * for new inbound connections -- confirm.
   */
  GNUNET_SelectAcceptHandler ah;

  /**
   * Close handler; called by destroySession before the session's
   * socket is destroyed.
   */
  GNUNET_SelectCloseHandler ch;

  /**
   * Closure passed as first argument to mh.
   */
  void *mh_cls;

  /**
   * Closure for ah.
   */
  void *ah_cls;

  /**
   * Closure passed as first argument to ch.
   */
  void *ch_cls;

  /**
   * NOTE(review): presumably the default inactivity timeout for
   * sessions; the code reading it is not visible here -- confirm.
   */
  GNUNET_CronTime timeout;

  /**
   * signal_pipe is used to signal the thread that is
   * blocked in a select call that the set of sockets to listen
   * to has changed.  signalSelect writes one byte to
   * signal_pipe[1].
   */
  int signal_pipe[2];

  /**
   * NOTE(review): presumably non-zero if this select serves a UDP
   * socket rather than TCP connections -- confirm.
   */
  int is_udp;

  /**
   * Number of entries of sessions that are in use.
   */
  unsigned int sessionCount;

  /**
   * Allocated length of the sessions array.
   */
  unsigned int sessionArrayLength;

  /**
   * GNUNET_NO while running; the processing loops in
   * readAndProcess and writeAndProcess stop as soon as this
   * is no longer GNUNET_NO.
   */
  int shutdown;

  /**
   * NOTE(review): not used in the visible part of this file;
   * presumably the maximum size of a socket address -- confirm.
   */
  unsigned int max_addr_len;

  /**
   * Limit for a session's write buffer size: once a write buffer
   * larger than this has been fully sent, writeAndProcess shrinks
   * it back to this size.
   */
  unsigned int memory_quota;

  /**
   * Incremented by destroySession whenever a session's socket is
   * destroyed.
   * NOTE(review): presumably the number of sockets that may still
   * be opened -- confirm against the accept path.
   */
  int socket_quota;

} SelectHandle;

/**
 * Add the descriptor of the given socket to a select set and
 * keep track of the largest descriptor seen so far.
 *
 * @param s socket whose handle is added
 * @param set descriptor set to extend
 * @param max in/out: raised to the socket's handle if that is larger
 */
static void
add_to_select_set (struct GNUNET_SocketHandle *s, fd_set * set, int *max)
{
  FD_SET (s->handle, set);
  if (s->handle > *max)
    {
      *max = s->handle;
    }
}

/**
 * Wake up the select thread by writing a single byte to the write
 * end of the signal pipe (the set of files to watch has changed).
 * If the byte could not be written, the error is logged.
 *
 * @param sh select handle whose signal pipe is written to
 */
static void
signalSelect (SelectHandle * sh)
{
  static char wake_byte = '\0';
  int written;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Signaling select %p.\n", sh);
#endif
  written = WRITE (sh->signal_pipe[1], &wake_byte, sizeof (wake_byte));
  if (written == sizeof (wake_byte))
    return;                     /* byte delivered, nothing to report */
  GNUNET_GE_LOG_STRERROR (sh->ectx,
                          GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
                          GNUNET_GE_BULK, "write");
}

/**
 * Destroy the given session: remove it from the select's session
 * array, notify the close handler, destroy the socket and release
 * the buffers and the session itself.
 *
 * If the session is currently locked (locked == 1), destruction is
 * only recorded (locked becomes -1) and nothing else happens; if
 * destruction is already in progress (locked == 2) the call is a
 * no-op.
 *
 * This function may only be called if sh->lock is already held by
 * the caller.  The lock is released while the close handler runs
 * and re-acquired afterwards.
 *
 * @param sh select handle the session belongs to
 * @param s session to destroy; freed on return unless destruction
 *        was deferred or was already in progress
 */
static void
destroySession (SelectHandle * sh, Session * s)
{
  unsigned int i;               /* same type as sh->sessionCount */

  if (s->locked == 1)
    {
      /* someone is using the session; ask them to destroy it later */
      s->locked = -1;
      return;
    }
  if (s->locked == 2)
    return;                     /* already in process of destroying! */
  s->locked = 2;
#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Destroying session %p of select %p with %u in read and %u in write buffer.\n",
                 s, sh, s->rsize, s->wsize);
#endif
  /* unlink: overwrite the slot with the last active entry */
  for (i = 0; i < sh->sessionCount; i++)
    {
      if (sh->sessions[i] == s)
        {
          sh->sessions[i] = sh->sessions[sh->sessionCount - 1];
          sh->sessionCount--;
          break;
        }
    }
  /* give memory back once less than half of the array is in use */
  if (sh->sessionCount * 2 < sh->sessionArrayLength)
    GNUNET_array_grow (sh->sessions, sh->sessionArrayLength,
                       sh->sessionCount);
  /* the close handler is called without holding the lock */
  GNUNET_mutex_unlock (sh->lock);
  sh->ch (sh->ch_cls, sh, s->sock, s->sock_ctx);
  GNUNET_mutex_lock (sh->lock);
  GNUNET_socket_destroy (s->sock);
  sh->socket_quota++;
  GNUNET_array_grow (s->rbuff, s->rsize, 0);
  GNUNET_array_grow (s->wbuff, s->wsize, 0);
  GNUNET_free (s);
}

/**
 * The socket of a session has data waiting, read and
 * process!
 *
 * Receives (non-blocking) into the session's read buffer, growing
 * the buffer when it is full, and then hands every complete message
 * at the front of the buffer to the message handler.  The lock is
 * released while the handler runs; the session is marked as locked
 * during that time so that a concurrent destroySession is deferred
 * until the handler has returned.
 *
 * This function may only be called if the lock is
 * already held by the caller.
 *
 * @param sh select handle the session belongs to
 * @param session session whose socket is readable
 * @return GNUNET_OK for success, GNUNET_SYSERR if session was
 *         destroyed (receive failure, malformed message, handler
 *         failure, or a destruction that was requested while the
 *         handler was running); the session must not be touched
 *         after GNUNET_SYSERR
 */
static int
readAndProcess (SelectHandle * sh, Session * session)
{
  const GNUNET_MessageHeader *pack;
  int ret;
  size_t recvd;
  unsigned short len;

  if (session->rsize == session->pos)
    {
      /* read buffer too small, grow */
      GNUNET_array_grow (session->rbuff, session->rsize,
                         session->rsize + 1024);
    }
  ret = GNUNET_socket_recv (session->sock,
                            GNUNET_NC_NONBLOCKING | GNUNET_NC_IGNORE_INT,
                            &session->rbuff[session->pos],
                            session->rsize - session->pos, &recvd);
#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Receiving from session %p of select %p return %d-%u (%s).\n",
                 session, sh, ret, (unsigned int) recvd, STRERROR (errno));
#endif
  if (ret != GNUNET_OK)
    {
      destroySession (sh, session);
      return GNUNET_SYSERR;     /* other side closed connection */
    }
  session->pos += recvd;
  while ((sh->shutdown == GNUNET_NO)
         && (session->pos >= sizeof (GNUNET_MessageHeader)))
    {
      pack = (const GNUNET_MessageHeader *) &session->rbuff[0];
      len = ntohs (pack->size);
      /* check minimum size */
      if (len < sizeof (GNUNET_MessageHeader))
        {
          GNUNET_GE_LOG (sh->ectx,
                         GNUNET_GE_WARNING | GNUNET_GE_USER | GNUNET_GE_BULK,
                         _
                         ("Received malformed message (too small) from connection. Closing.\n"));
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      /* If message larger than read buffer, grow!  This may move
         rbuff and invalidate pack, but in that case pos < len holds
         and we leave the loop without using pack again. */
      if (len > session->rsize)
        GNUNET_array_grow (session->rbuff, session->rsize, len);

      /* do we have the entire message? */
      if (session->pos < len)
        break;                  /* wait for more */
      /* defer any destruction while the handler runs unlocked */
      if (session->locked == 0)
        session->locked = 1;
      GNUNET_mutex_unlock (sh->lock);
      if (GNUNET_OK != sh->mh (sh->mh_cls,
                               sh, session->sock, session->sock_ctx, pack))
        {
          GNUNET_mutex_lock (sh->lock);
          if (session->locked == 1)
            session->locked = 0;
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      GNUNET_mutex_lock (sh->lock);
      if (session->locked == -1)
        {
          /* destruction was requested while the handler ran; do it
             now and report it, the session is gone afterwards */
          session->locked = 0;
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      if (session->locked == 1)
        session->locked = 0;
      /* drop the processed message from the front of the buffer */
      memmove (&session->rbuff[0], &session->rbuff[len], session->pos - len);
      session->pos -= len;
    }
  session->lastUse = GNUNET_get_time ();
  return GNUNET_OK;
}

/**
 * The socket of a session has data waiting that can be
 * transmitted, do it!
 *
 * This function may only be called if the lock is
 * already held by the caller.
 * @return GNUNET_OK for success, GNUNET_SYSERR if session was destroyed
 */
static int
writeAndProcess (SelectHandle * sh, Session * session)
{
  SocketHandle *sock;
  int ret;
  size_t size;

#if DEBUG_SELECT
  GNUNET_GE_LOG (sh->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                 "Write and process called for session %p of select %p status %d.\n",
                 sh, session, sh->shutdown);
#endif
  sock = session->sock;
  while (sh->shutdown == GNUNET_NO)
    {
      ret = GNUNET_socket_send (sock,
                                GNUNET_NC_NONBLOCKING,
                                &session->wbuff[session->wspos],
                                session->wapos - session->wspos, &size);
#if DEBUG_SELECT
      GNUNET_GE_LOG (sh->ectx,
                     GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_BULK,
                     "Sending %d bytes from session %p of select %s return %d.\n",
                     session->wapos - session->wspos, session,
                     sh->description, ret);
#endif
      if (ret == GNUNET_SYSERR)
        {
          if ((errno == EPIPE) || (errno == ECONNRESET))
            GNUNET_GE_LOG_STRERROR (sh->ectx,
                                    GNUNET_GE_DEBUG | GNUNET_GE_USER |
                                    GNUNET_GE_ADMIN | GNUNET_GE_BULK, "send");
          else
            GNUNET_GE_LOG_STRERROR (sh->ectx,
                                    GNUNET_GE_WARNING | GNUNET_GE_USER |
                                    GNUNET_GE_ADMIN | GNUNET_GE_BULK, "send");
          destroySession (sh, session);
          return GNUNET_SYSERR;
        }
      if (ret == GNUNET_OK)
        {
          if (size == 0)
            {
              /* send only returns 0 on error (happens if
                 other side closed connection), so close
                 the session */
              destroySession (sh, session);
              return GNUNET_SYSERR;
            }
          session->wspos += size;
          if (session->wspos == session->wapos)
            {
              /* free compaction! */
              session->wspos = 0;
              session->wapos = 0;
              session->no_read = GNUNET_NO;
              if (session->wsize > sh->memory_quota)
                {
                  /* if we went over quota before because of
                     force, use this opportunity to shrink
                     back to size! */
                  GNUNET_array_grow (session->wbuff, session->wsize,
                                     sh->memory_quota);
                }
            }
          break;
        }
      GNUNET_GE_ASSERT (sh->ectx, ret == GNUNET_NO);
      /* this should only happen under Win9x because
         of a bug in the socket implementation (KB177346).
         Let's sleep and try again. */
      GNUNET_thread_sleep (20 * GNUNET_CRON_MILLISECONDS);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -