⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 select.c

📁 GNUnet是一个安全的点对点网络框架
💻 C
📖 第 1 页 / 共 3 页
字号:
    }
  session->lastUse = GNUNET_get_time ();
  return GNUNET_OK;
}

/**
 * Thread that selects until it is signaled to shut down.
 */
static void *
selectThread (void *ctx)
{
  struct GNUNET_SelectHandle *sh = ctx;
  GNUNET_CronTime now;
  GNUNET_CronTime timeout;
  char *clientAddr;
  fd_set readSet;
  fd_set errorSet;
  fd_set writeSet;
  struct stat buf;
  socklen_t lenOfIncomingAddr;
  int i;
  int max;
  int ret;
  int s;
  void *sctx;
  SocketHandle *sock;
  Session *session;
  size_t size;
  int old_errno;
  struct timeval tv;

  if (sh->max_addr_len != 0)
    clientAddr = GNUNET_malloc (sh->max_addr_len);
  else
    clientAddr = NULL;
  GNUNET_mutex_lock (sh->lock);
  while (sh->shutdown == GNUNET_NO)
    {
      FD_ZERO (&readSet);
      FD_ZERO (&errorSet);
      FD_ZERO (&writeSet);
      if (sh->signal_pipe[0] != -1)
        {
          if (-1 == FSTAT (sh->signal_pipe[0], &buf))
            {
              GNUNET_GE_LOG_STRERROR (sh->ectx,
                                      GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
                                      GNUNET_GE_USER | GNUNET_GE_BULK,
                                      "fstat");
              sh->signal_pipe[0] = -1;  /* prevent us from error'ing all the time */
            }
          else
            {
              FD_SET (sh->signal_pipe[0], &readSet);
            }
        }
      max = sh->signal_pipe[0];
      if (sh->listen_sock != NULL)
        {
          if (!GNUNET_socket_test_valid (sh->listen_sock))
            {
              GNUNET_socket_destroy (sh->listen_sock);
              GNUNET_GE_LOG (sh->ectx,
                             GNUNET_GE_USER | GNUNET_GE_ERROR |
                             GNUNET_GE_BULK,
                             _("select listen socket for `%s' not valid!\n"),
                             sh->description);
              sh->listen_sock = NULL;   /* prevent us from error'ing all the time */
            }
          else
            {
              add_to_select_set (sh->listen_sock, &readSet, &max);
            }
        }
      for (i = 0; i < sh->sessionCount; i++)
        {
          Session *session = sh->sessions[i];
          struct GNUNET_SocketHandle *sock = session->sock;

          if (!GNUNET_socket_test_valid (sock))
            {
#if DEBUG_SELECT
              GNUNET_GE_LOG (sh->ectx,
                             GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
                             GNUNET_GE_BULK,
                             "Select %p destroys invalid client handle %p\n",
                             sh, session);
#endif
              destroySession (sh, session);
            }
          else
            {
              add_to_select_set (sock, &errorSet, &max);
              if (session->no_read != GNUNET_YES)
                add_to_select_set (sock, &readSet, &max);
              GNUNET_GE_ASSERT (NULL, session->wapos >= session->wspos);
              if (session->wapos > session->wspos)
                add_to_select_set (sock, &writeSet, &max);      /* do we have a pending write request? */
            }
        }
      timeout = -1;
      now = GNUNET_get_time ();
      for (i = 0; i < sh->sessionCount; i++)
        {
          session = sh->sessions[i];
          if (session->timeout != 0)
            {
              if (now > session->lastUse + session->timeout)
                timeout = 0;
              else
                timeout =
                  GNUNET_MIN (timeout,
                              session->lastUse + session->timeout - now);
            }
        }
      GNUNET_mutex_unlock (sh->lock);
      tv.tv_sec = timeout / GNUNET_CRON_SECONDS;
      tv.tv_usec = (timeout % GNUNET_CRON_SECONDS) * 1000;
      ret =
        SELECT (max + 1, &readSet, &writeSet, &errorSet,
                (timeout == -1) ? NULL : &tv);
      old_errno = errno;
      GNUNET_mutex_lock (sh->lock);
      if ((ret == -1) && ((old_errno == EAGAIN) || (old_errno == EINTR)))
        continue;
      if (ret == -1)
        {
          errno = old_errno;
          if (errno == EBADF)
            {
              GNUNET_GE_LOG_STRERROR (sh->ectx,
                                      GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
                                      GNUNET_GE_BULK, "select");
            }
          else
            {
              GNUNET_GE_DIE_STRERROR (sh->ectx,
                                      GNUNET_GE_FATAL | GNUNET_GE_ADMIN |
                                      GNUNET_GE_USER | GNUNET_GE_IMMEDIATE,
                                      "select");
            }
          continue;
        }
      if (sh->is_udp == GNUNET_NO)
        {
          if ((sh->listen_sock != NULL) &&
              (FD_ISSET (sh->listen_sock->handle, &readSet)))
            {
              lenOfIncomingAddr = sh->max_addr_len;
              memset (clientAddr, 0, lenOfIncomingAddr);
              /* make sure this is non-blocking */
              GNUNET_socket_set_blocking (sh->listen_sock, GNUNET_NO);
              s = ACCEPT (sh->listen_sock->handle,
                          (struct sockaddr *) clientAddr, &lenOfIncomingAddr);
              if (s == -1)
                {
                  GNUNET_GE_LOG_STRERROR (sh->ectx,
                                          GNUNET_GE_WARNING | GNUNET_GE_ADMIN
                                          | GNUNET_GE_BULK, "accept");
                  GNUNET_GE_LOG (sh->ectx,
                                 GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
                                 GNUNET_GE_BULK,
                                 "Select %s failed to accept!\n",
                                 sh->description);
                  if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                    continue;   /* not good, but not fatal either */
                  break;
                }
              if (sh->socket_quota <= 0)
                {
                  SHUTDOWN (s, SHUT_WR);
                  if (0 != CLOSE (s))
                    GNUNET_GE_LOG_STRERROR (sh->ectx,
                                            GNUNET_GE_WARNING |
                                            GNUNET_GE_ADMIN | GNUNET_GE_BULK,
                                            "close");
                  s = -1;
                  continue;
                }
              sh->socket_quota--;
#if DEBUG_SELECT
              GNUNET_GE_LOG (sh->ectx,
                             GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
                             GNUNET_GE_BULK,
                             "Select %p is accepting connection: %d\n", sh,
                             s);
#endif
              sock = GNUNET_socket_create (sh->ectx, sh->load_monitor, s);
              GNUNET_mutex_unlock (sh->lock);
              sctx = sh->ah (sh->ah_cls,
                             sh, sock, clientAddr, lenOfIncomingAddr);
              GNUNET_mutex_lock (sh->lock);
#if DEBUG_SELECT
              GNUNET_GE_LOG (sh->ectx,
                             GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
                             GNUNET_GE_BULK,
                             "Select %p is accepting connection: %p\n", sh,
                             sctx);
#endif
              if (sctx == NULL)
                {
                  GNUNET_socket_destroy (sock);
                  sh->socket_quota++;
                }
              else
                {
                  session = GNUNET_malloc (sizeof (Session));
                  memset (session, 0, sizeof (Session));
                  session->timeout = sh->timeout;
                  session->sock = sock;
                  session->sock_ctx = sctx;
                  session->lastUse = GNUNET_get_time ();
                  if (sh->sessionArrayLength == sh->sessionCount)
                    GNUNET_array_grow (sh->sessions,
                                       sh->sessionArrayLength,
                                       sh->sessionArrayLength + 4);
                  sh->sessions[sh->sessionCount++] = session;
                }
            }
        }
      else
        {                       /* is_udp == GNUNET_YES */
          if ((sh->listen_sock != NULL) &&
              (FD_ISSET (sh->listen_sock->handle, &readSet)))
            {
              int pending;
              int udp_sock;
              int error;
              socklen_t optlen;

              udp_sock = sh->listen_sock->handle;
              lenOfIncomingAddr = sh->max_addr_len;
              memset (clientAddr, 0, lenOfIncomingAddr);
              pending = 0;
              optlen = sizeof (pending);
#ifdef OSX
              error = GETSOCKOPT (udp_sock,
                                  SOL_SOCKET, SO_NREAD, &pending, &optlen);
#elif MINGW
              error = ioctlsocket (udp_sock, FIONREAD, &pending);
#else
              error = ioctl (udp_sock, FIONREAD, &pending);
#endif
              if ((error != 0) || (optlen != sizeof (pending)))
                {
                  GNUNET_GE_LOG_STRERROR (sh->ectx,
                                          GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
                                          GNUNET_GE_BULK, "ioctl");
                  pending = 65535;      /* max */
                }
#if DEBUG_SELECT
              GNUNET_GE_LOG (sh->ectx,
                             GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER |
                             GNUNET_GE_BULK,
                             "Select %p is preparing to receive %u bytes from UDP\n",
                             sh, pending);
#endif
              GNUNET_GE_ASSERT (sh->ectx, pending >= 0);
              if (pending >= 65536)
                pending = 65536;
              if (pending == 0)
                {
                  /* maybe empty UDP packet was sent (see report on bug-gnunet,
                     5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
                  GNUNET_socket_recv_from (sh->listen_sock,
                                           GNUNET_NC_NONBLOCKING,
                                           NULL,
                                           0, &size, clientAddr,
                                           &lenOfIncomingAddr);
                }
              else
                {
                  char *msg;

                  msg = GNUNET_malloc (pending);
                  size = 0;
                  ret = GNUNET_socket_recv_from (sh->listen_sock,
                                                 GNUNET_NC_NONBLOCKING,
                                                 msg,
                                                 pending,
                                                 &size,
                                                 clientAddr,
                                                 &lenOfIncomingAddr);
                  if (ret == GNUNET_SYSERR)
                    {
                      GNUNET_socket_close (sh->listen_sock);
                    }
                  else if (ret == GNUNET_OK)
                    {
                      /* validate msg format! */
                      const GNUNET_MessageHeader *hdr;

                      /* if size < pending, set pending to size */
                      if (size < pending)
                        pending = size;
                      hdr = (const GNUNET_MessageHeader *) msg;
                      if ((size == pending) &&
                          (size >= sizeof (GNUNET_MessageHeader)) &&
                          (ntohs (hdr->size) == size))
                        {
                          void *sctx;

                          GNUNET_mutex_unlock (sh->lock);
                          sctx = sh->ah (sh->ah_cls,
                                         sh,
                                         NULL, clientAddr, lenOfIncomingAddr);
                          GNUNET_mutex_lock (sh->lock);
                          if (sctx != NULL)
                            {
#if DEBUG_SELECT
                              GNUNET_GE_LOG (sh->ectx,
                                             GNUNET_GE_DEBUG |
                                             GNUNET_GE_DEVELOPER |
                                             GNUNET_GE_BULK,
                                             "Select %p is passing %u bytes from UDP to handler\n",
                                             sh, size);
#endif
                              sh->mh (sh->mh_cls, sh, NULL, sctx, hdr);
                              sh->ch (sh->ch_cls, sh, NULL, sctx);
                            }
                          else
                            {
#if DEBUG_SELECT
                              GNUNET_GE_LOG (sh->ectx,
                                             GNUNET_GE_DEBUG |
                                             GNUNET_GE_DEVELOPER |
                                             GNUNET_GE_BULK,
                                             "Error in select %p -- connection refused\n",
                                             sh);
#endif
                            }
                        }
                      else
                        {
#if DEBUG_SELECT
                          GNUNET_GE_BREAK (sh->ectx, size == pending);
                          GNUNET_GE_BREAK (sh->ectx,
                                           size >=
                                           sizeof (GNUNET_MessageHeader));
                          GNUNET_GE_BREAK (sh->ectx,
                                           (size >=
                                            sizeof (GNUNET_MessageHeader))
                                           && (ntohs (hdr->size) == size));
#endif
                        }
                    }
                  GNUNET_free (msg);
                }
            }
        }                       /* end UDP processing */
      if (FD_ISSET (sh->signal_pipe[0], &readSet))
        {
          /* allow reading multiple signals in one go in case we get many
             in one shot... */
#define MAXSIG_BUF 128
          char buf[MAXSIG_BUF];
          /* just a signal to refresh sets, eat and continue */
          if (0 >= READ (sh->signal_pipe[0], buf, MAXSIG_BUF))
            {
              GNUNET_GE_LOG_STRERROR (sh->ectx,
                                      GNUNET_GE_WARNING | GNUNET_GE_USER |
                                      GNUNET_GE_BULK, "read");
            }
        }
      now = GNUNET_get_time ();
      for (i = 0; i < sh->sessionCount; i++)
        {
          session = sh->sessions[i];
          sock = session->sock;
          if ((FD_ISSET (sock->handle, &readSet)) &&
              (GNUNET_SYSERR == readAndProcess (sh, session)))
            {
              i--;
              continue;
            }
          if ((FD_ISSET (sock->handle, &writeSet)) &&
              (GNUNET_SYSERR == writeAndProcess (sh, session)))
            {
              i--;
              continue;
            }
          if (FD_ISSET (sock->handle, &errorSet))
            {
              destroySession (sh, session);
              i--;
              continue;
            }
          if ((session->timeout != 0) &&
              (now > session->lastUse + session->timeout))
            {
              destroySession (sh, session);
              i--;
              continue;
            }
        }
    }
  sh->description = "DEAD";
  GNUNET_mutex_unlock (sh->lock);
  GNUNET_free_non_null (clientAddr);
  return NULL;
}

int
GNUNET_pipe_make_nonblocking (struct GNUNET_GE_Context *ectx, int handle)
{
#if MINGW
  DWORD mode;

  mode = PIPE_NOWAIT;

  if (SetNamedPipeHandleState ((HANDLE) handle, &mode, NULL, NULL))
#if HAVE_PLIBC_FD
    plibc_fd_set_blocking (handle, 0);
#else
    __win_SetHandleBlockingMode (handle, 0);
#endif
  /* don't report errors because Win9x doesn't support SetNamedPipeHandleState() */
#else
  int flags = fcntl (handle, F_GETFL);
  flags |= O_NONBLOCK;
  if (-1 == fcntl (handle, F_SETFL, flags))
    {
      GNUNET_GE_LOG_STRERROR (ectx,
                              GNUNET_GE_WARNING | GNUNET_GE_USER |
                              GNUNET_GE_ADMIN | GNUNET_GE_IMMEDIATE, "fcntl");
      return GNUNET_SYSERR;
    }
#endif
  return GNUNET_OK;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -