⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 querymanager.c

📁 GNUnet是一个安全的点对点网络框架
💻 C
📖 第 1 页 / 共 2 页
字号:
  GNUNET_FS_PLAN_success (sender, client, 0, rl);

  if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
    return GNUNET_OK;           /* the end */

  /* update bloom filter */
  rl->bloomfilter_entry_count++;
  bf_size = compute_bloomfilter_size (rl->bloomfilter_entry_count);
  if (rl->bloomfilter == NULL)
    {
      rl->bloomfilter_mutator
        = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
      rl->bloomfilter_size = bf_size;
      rl->bloomfilter = GNUNET_bloomfilter_init (NULL,
                                                 NULL,
                                                 rl->bloomfilter_size,
                                                 GNUNET_GAP_BLOOMFILTER_K);
      if (stats != NULL)
        stats->change (stat_gap_client_bf_updates, 1);
    }
  else if (rl->bloomfilter_size != bf_size)
    {
      rl->bloomfilter_mutator
        = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
      ic.pos = rl->responses;
      ic.mingle_number = rl->bloomfilter_mutator;
      GNUNET_bloomfilter_resize (rl->bloomfilter,
                                 &response_bf_iterator,
                                 &ic, bf_size, GNUNET_GAP_BLOOMFILTER_K);
      if (stats != NULL)
        stats->change (stat_gap_client_bf_updates, 1);
    }
  GNUNET_FS_SHARED_mark_response_seen (rl, &hc);

  /* we want more */
  return GNUNET_NO;
}

/**
 * Handle the given response (by forwarding it to
 * other peers as necessary).
 *
 * @param sender who send the response (good too know
 *        for future routing decisions)
 * @param primary_query hash code used for lookup
 *        (note that namespace membership may
 *        require additional verification that has
 *        not yet been performed; checking the
 *        signature has already been done)
 * @param size size of the data
 * @param data the data itself (a GNUNET_EC_DBlock)
 * @return how much was this content worth to us?
 */
unsigned int
GNUNET_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
                                        const GNUNET_HashCode * primary_query,
                                        GNUNET_CronTime expirationTime,
                                        unsigned int size,
                                        const GNUNET_EC_DBlock * data)
{
  struct ClientDataList *cl;
  struct RequestList *rl;
  struct RequestList *prev;
  unsigned int value;
  PID_INDEX rid;

  rid = GNUNET_FS_PT_intern (sender);
  GNUNET_mutex_lock (GNUNET_FS_lock);
  value = 0;
  cl = clients;
  while (cl != NULL)
    {
      rl = cl->requests;
      prev = NULL;
      while (rl != NULL)
        {
          if (GNUNET_OK ==
              handle_response (rid,
                               cl->client,
                               rl,
                               primary_query,
                               expirationTime, size, data, &value))
            {
              if (prev != NULL)
                prev->next = rl->next;
              else
                cl->requests = rl->next;
              if (rl == cl->request_tail)
                cl->request_tail = prev;
              GNUNET_FS_SHARED_free_request_list (rl);
              if (stats != NULL)
                stats->change (stat_gap_client_query_tracked, -1);
              if (prev == NULL)
                rl = cl->requests;
              else
                rl = prev->next;
            }
          else
            {
              prev = rl;
              rl = rl->next;
            }
        }
      cl = cl->next;
    }

  GNUNET_mutex_unlock (GNUNET_FS_lock);
  GNUNET_FS_PT_change_rc (rid, -1);
  return value;
}

/**
 * Method called whenever a given client disconnects.
 * Frees all of the associated data structures.
 */
static void
handle_client_exit (struct GNUNET_ClientHandle *client)
{
  struct ClientDataList *cl;
  struct ClientDataList *prev;
  struct RequestList *rl;

  GNUNET_mutex_lock (GNUNET_FS_lock);
  cl = clients;
  prev = NULL;
  while ((cl != NULL) && (cl->client != client))
    {
      prev = cl;
      cl = cl->next;
    }
  if (cl == clients_tail)
    clients_tail = prev;
  if (cl != NULL)
    {
      while (cl->requests != NULL)
        {
          rl = cl->requests;
          cl->requests = rl->next;
          GNUNET_FS_SHARED_free_request_list (rl);
          if (stats != NULL)
            stats->change (stat_gap_client_query_tracked, -1);
        }
      if (prev == NULL)
        clients = cl->next;
      else
        prev->next = cl->next;
      GNUNET_free (cl);
    }
  GNUNET_mutex_unlock (GNUNET_FS_lock);
}


struct HMClosure
{
  struct RequestList *request;
  unsigned int processed;
  int have_more;
};

/**
 * Any response that we get should be passed
 * back to the client.  If the response is unique,
 * we should about the iteration (return GNUNET_SYSERR).
 */
static int
have_more_processor (const GNUNET_HashCode * key,
                     const GNUNET_DatastoreValue *
                     value, void *closure, unsigned long long uid)
{
  struct HMClosure *cls = closure;
  GNUNET_HashCode hc;
  int ret;

  ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
                                         key, value,
                                         cls->request->response_client,
                                         cls->request, &hc);
  if (ret != GNUNET_OK)
    {
      /* client can take no more right now */
      cls->have_more = GNUNET_YES;
      return ret;               /* NO: delete, SYSERR: abort */
    }
  GNUNET_FS_SHARED_mark_response_seen (cls->request, &hc);
  cls->processed++;
  if (cls->processed > GNUNET_GAP_MAX_ASYNC_PROCESSED)
    {
      cls->have_more = GNUNET_YES;
      return GNUNET_SYSERR;
    }
  return GNUNET_OK;
}


/**
 * Cron-job to periodically check if we should
 * repeat requests.
 */
static void
repeat_requests_job (void *unused)
{
  struct HMClosure hmc;
  struct ClientDataList *client;
  struct RequestList *request;
  struct RequestList *prev;
  GNUNET_CronTime now;

  GNUNET_mutex_lock (GNUNET_FS_lock);
  if (clients == NULL)
    {
      GNUNET_mutex_unlock (GNUNET_FS_lock);
      return;
    }
  now = GNUNET_get_time ();
  client = clients;
  if (clients_tail != client)
    {
      /* move client to tail of list */
      GNUNET_GE_ASSERT (NULL, clients_tail->next == NULL);
      clients = clients->next;
      clients_tail->next = client;
      clients_tail = client;
      client->next = NULL;
    }
  request = client->requests;
  if (request == NULL)
    {
      GNUNET_mutex_unlock (GNUNET_FS_lock);
      return;
    }
  if (client->request_tail != request)
    {
      /* move request to tail of list */
      GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
      client->requests = request->next;
      client->request_tail->next = request;
      prev = client->request_tail;
      client->request_tail = request;
      request->next = NULL;
    }
  else
    {
      prev = NULL;
    }
  GNUNET_GE_ASSERT (NULL, request->next == NULL);
  GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
  if ((client->client != NULL) &&
      (GNUNET_OK !=
       coreAPI->cs_send_message_now_test (client->client,
                                          GNUNET_GAP_ESTIMATED_DATA_SIZE,
                                          GNUNET_NO)))
    {
      GNUNET_mutex_unlock (GNUNET_FS_lock);
      return;
    }
  if (request->have_more > 0)
    {
      request->have_more--;
      hmc.request = request;
      hmc.processed = 0;
      hmc.have_more = GNUNET_NO;

      if (request->type == GNUNET_ECRS_BLOCKTYPE_DATA)
        {
          if (((1 == datastore->get (&request->queries[0], request->type,
                                     &have_more_processor, &hmc)) ||
               (1 == datastore->get (&request->queries[0],
                                     GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
                                     &have_more_processor, &hmc))) &&
              (hmc.have_more == GNUNET_NO))
            {
              if (prev == NULL)
                {
                  client->request_tail = NULL;
                  client->requests = NULL;
                }
              else
                {
                  prev->next = NULL;
                  if (client->request_tail == request)
                    client->request_tail = prev;
                }
              GNUNET_FS_SHARED_free_request_list (request);
              if (stats != NULL)
                stats->change (stat_gap_client_query_tracked, -1);
            }
        }
      else
        {
          datastore->get (&request->queries[0], request->type,
                          &have_more_processor, &hmc);
        }
      if (hmc.have_more)
        request->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
    }
  else
    {
      if ((NULL == request->plan_entries) &&
          ((client->client != NULL) ||
           (request->expiration > now)) &&
          (request->last_ttl_used * GNUNET_CRON_SECONDS +
           request->last_request_time < now))
        {
          if ((GNUNET_OK ==
               GNUNET_FS_PLAN_request (client->client, 0, request))
              && (stats != NULL))
            stats->change (stat_gap_client_query_injected, 1);
        }

      if ((request->anonymityLevel == 0) &&
          (request->last_dht_get + request->dht_back_off < now))
        {
          if (request->dht_back_off * 2 > request->dht_back_off)
            request->dht_back_off *= 2;
          request->last_dht_get = now;
          GNUNET_FS_DHT_execute_query (request->type, &request->queries[0]);
        }
    }
  GNUNET_mutex_unlock (GNUNET_FS_lock);
}

int
GNUNET_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi)
{
  coreAPI = capi;
  GNUNET_GE_ASSERT (capi->ectx,
                    GNUNET_SYSERR !=
                    capi->cs_disconnect_handler_register
                    (&handle_client_exit));
  datastore = capi->service_request ("datastore");
  stats = capi->service_request ("stats");
  if (stats != NULL)
    {
      stat_gap_client_query_received =
        stats->create (gettext_noop ("# gap client queries received"));
      stat_gap_client_response_sent =
        stats->create (gettext_noop ("# gap replies sent to clients"));
      stat_gap_client_query_tracked =
        stats->create (gettext_noop ("# gap client requests tracked"));
      stat_gap_client_query_injected =
        stats->create (gettext_noop ("# gap client requests injected"));
      stat_gap_client_bf_updates =
        stats->create (gettext_noop
                       ("# gap query bloomfilter resizing updates"));
    }
  GNUNET_cron_add_job (capi->cron,
                       &repeat_requests_job,
                       CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
  return 0;
}

int
GNUNET_FS_QUERYMANAGER_done ()
{
  GNUNET_cron_del_job (coreAPI->cron,
                       &repeat_requests_job, CHECK_REPEAT_FREQUENCY, NULL);
  GNUNET_GE_ASSERT (coreAPI->ectx,
                    GNUNET_SYSERR !=
                    coreAPI->cs_disconnect_handler_unregister
                    (&handle_client_exit));
  while (clients != NULL)
    handle_client_exit (clients->client);
  coreAPI->service_release (datastore);
  datastore = NULL;
  if (stats != NULL)
    {
      coreAPI->service_release (stats);
      stats = NULL;
    }
  return 0;
}

/* end of querymanager.c */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -