📄 gap.c
字号:
/*
This file is part of GNUnet
(C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
/**
* @file fs/gap/gap.c
* @brief protocol that performs anonymous routing
* @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_util.h"
#include "gnunet_protocols.h"
#include "gnunet_datastore_service.h"
#include "gnunet_stats_service.h"
#include "gap.h"
#include "fs.h"
#include "ondemand.h"
#include "plan.h"
#include "pid_table.h"
#include "migration.h"
/**
* How many entries are allowed per slot in the
* collision list?
*/
#define MAX_ENTRIES_PER_SLOT 2
/**
* How often do we check have_more?
*/
#define HAVE_MORE_FREQUENCY (100 * GNUNET_CRON_MILLISECONDS)
/**
* The GAP routing table.
*/
static struct RequestList **table;
static GNUNET_CoreAPIForPlugins *coreAPI;
static GNUNET_Datastore_ServiceAPI *datastore;
static struct GNUNET_CronManager *cron;
/**
* Size of the routing table.
*/
static unsigned int table_size;
/**
* Constant but peer-dependent value that randomizes the construction
* of the indices into the routing table. See
* computeRoutingIndex.
*/
static unsigned int random_qsel;
static GNUNET_Stats_ServiceAPI *stats;
static int stat_gap_query_dropped;
static int stat_gap_query_dropped_redundant;
static int stat_gap_query_routed;
static int stat_gap_query_refreshed;
static int stat_gap_content_found_locally;
static int stat_trust_earned;
static unsigned int
get_table_index (const GNUNET_HashCode * key)
{
unsigned int res
= (((unsigned int *) key)[0] ^
((unsigned int *) key)[1] / (1 + random_qsel)) % table_size;
GNUNET_GE_ASSERT (coreAPI->ectx, res < table_size);
return res;
}
/**
* Cron-job to inject (artificially) delayed messages.
*/
static void
send_delayed (void *cls)
{
GNUNET_MessageHeader *msg = cls;
if (stats != NULL)
stats->change (stat_gap_content_found_locally, 1);
coreAPI->loopback_send (NULL,
(const char *) msg,
ntohs (msg->size), GNUNET_YES, NULL);
GNUNET_free (msg);
}
struct DVPClosure
{
struct RequestList *request;
unsigned int iteration_count;
unsigned int result_count;
};
/**
* An iterator over a set of Datastore items. This
* function is called whenever GAP is processing a
* request. It should
* 1) abort if the load is getting too high
* 2) try on-demand encoding (and if that fails,
* discard the entry)
* 3) assemble a response and inject it via
* loopback WITH a delay
*
* @param datum called with the next item
* @param closure user-defined extra argument
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
*
* @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
* GNUNET_NO to delete the item and continue (if supported)
*/
static int
datastore_value_processor (const GNUNET_HashCode * key,
const GNUNET_DatastoreValue *
value, void *closure, unsigned long long uid)
{
struct DVPClosure *cls = closure;
struct RequestList *req = cls->request;
P2P_gap_reply_MESSAGE *msg;
GNUNET_DatastoreValue *enc;
unsigned int size;
unsigned long long et;
GNUNET_CronTime now;
int ret;
GNUNET_HashCode hc;
GNUNET_HashCode mhc;
int want_more;
want_more = GNUNET_OK;
cls->iteration_count++;
if (cls->iteration_count > 10 * (1 + req->value))
{
if (cls->result_count > 0)
req->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
want_more = GNUNET_SYSERR;
}
enc = NULL;
if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND)
{
if (GNUNET_OK !=
GNUNET_FS_ONDEMAND_get_indexed_content (value, key, &enc))
return GNUNET_NO;
value = enc;
}
if (req->bloomfilter != NULL)
{
GNUNET_hash (&value[1],
ntohl (value->size) - sizeof (GNUNET_DatastoreValue), &hc);
GNUNET_FS_HELPER_mingle_hash (&hc, req->bloomfilter_mutator, &mhc);
if (GNUNET_YES == GNUNET_bloomfilter_test (req->bloomfilter, &mhc))
return want_more; /* not useful */
}
et = GNUNET_ntohll (value->expiration_time);
now = GNUNET_get_time ();
/* convert to relative expiration time */
if (now < et)
{
et -= now;
if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
et %= GNUNET_GAP_MAX_MIGRATION_EXP_KSK;
else
et %= GNUNET_GAP_MAX_MIGRATION_EXP;
}
else
{
if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
return want_more; /* expired KSK -- ignore! */
/* indicate entry has expired */
et = -1;
}
size =
sizeof (P2P_gap_reply_MESSAGE) + ntohl (value->size) -
sizeof (GNUNET_DatastoreValue);
msg = GNUNET_malloc (size);
msg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
msg->header.size = htons (size);
msg->reserved = htonl (0);
msg->expiration = GNUNET_htonll(et);
memcpy (&msg[1], &value[1], size - sizeof (P2P_gap_reply_MESSAGE));
cls->result_count++;
if (cls->result_count > 2 * (1 + req->value))
{
req->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
want_more = GNUNET_SYSERR;
}
if (stats != NULL)
{
stats->change (stat_trust_earned, req->value_offered);
req->value_offered = 0;
}
req->remaining_value = 0;
GNUNET_cron_add_job (cron,
send_delayed,
GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
GNUNET_GAP_TTL_DECREMENT), 0, msg);
ret =
(ntohl (value->type) ==
GNUNET_ECRS_BLOCKTYPE_DATA) ? GNUNET_SYSERR : want_more;
GNUNET_free_non_null (enc);
return ret;
}
/**
* Execute a GAP query. Determines where to forward
* the query and when (and captures state for the response).
* Also check the local datastore.
*
* @param respond_to where to send replies
* @param priority how important is the request for us?
* @param original_priority how important is the request to the sender?
* @param ttl how long should the query live?
* @param type type of content requested
* @param query_count how many queries are in the queries array?
* @param queries hash codes of the query
* @param filter_mutator how to map replies to the bloom filter
* @param filter_size size of the bloom filter
* @param bloomfilter_data the bloom filter bits
*/
void
GNUNET_FS_GAP_execute_query (const GNUNET_PeerIdentity * respond_to,
unsigned int priority,
unsigned int original_priority,
enum GNUNET_FS_RoutingPolicy policy,
int ttl,
unsigned int type,
unsigned int query_count,
const GNUNET_HashCode * queries,
int filter_mutator,
unsigned int filter_size,
const void *bloomfilter_data)
{
struct RequestList *rl;
struct RequestList *prev;
struct DVPClosure cls;
PID_INDEX peer;
unsigned int index;
GNUNET_CronTime now;
GNUNET_CronTime newTTL;
GNUNET_CronTime minTTL;
unsigned int total;
int ret;
GNUNET_GE_ASSERT (NULL, query_count > 0);
GNUNET_mutex_lock (GNUNET_FS_lock);
index = get_table_index (&queries[0]);
now = GNUNET_get_time ();
newTTL = now + ttl * GNUNET_CRON_SECONDS;
peer = GNUNET_FS_PT_intern (respond_to);
/* check if entry already exists and compute
maxTTL if not */
minTTL = -1;
total = 0;
rl = table[index];
while (rl != NULL)
{
if ((rl->type == type) &&
(rl->response_target == peer) &&
(0 == memcmp (&rl->queries[0], queries,
query_count * sizeof (GNUNET_HashCode))))
{
if (rl->expiration > newTTL)
{
/* ignore */
GNUNET_FS_PT_change_rc (peer, -1);
if (stats != NULL)
stats->change (stat_gap_query_dropped_redundant, 1);
if (type != GNUNET_ECRS_BLOCKTYPE_DATA)
goto CHECK; /* we may have more local results! */
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
if (stats != NULL)
stats->change (stat_gap_query_refreshed, 1);
rl->value += priority;
rl->remaining_value += priority;
rl->expiration = newTTL;
rl->policy = policy;
if ((rl->bloomfilter_size == filter_size) &&
(rl->bloomfilter_mutator == filter_mutator))
{
if (rl->bloomfilter_size > 0)
{
/* update ttl / BF */
GNUNET_bloomfilter_or (rl->bloomfilter,
bloomfilter_data, filter_size);
}
GNUNET_FS_PT_change_rc (peer, -1);
if (type != GNUNET_ECRS_BLOCKTYPE_DATA)
goto CHECK; /* we may have more local results! */
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
/* update BF */
if (rl->bloomfilter != NULL)
GNUNET_bloomfilter_free (rl->bloomfilter);
rl->bloomfilter_mutator = filter_mutator;
rl->bloomfilter_size = filter_size;
if (filter_size > 0)
rl->bloomfilter = GNUNET_bloomfilter_init (coreAPI->ectx,
bloomfilter_data,
filter_size,
GNUNET_GAP_BLOOMFILTER_K);
else
rl->bloomfilter = NULL;
GNUNET_FS_PT_change_rc (peer, -1);
if (type != GNUNET_ECRS_BLOCKTYPE_DATA)
goto CHECK; /* we may have more local results! */
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
if (rl->expiration < minTTL)
minTTL = rl->expiration;
total++;
rl = rl->next;
}
if ((total >= MAX_ENTRIES_PER_SLOT) && (minTTL > newTTL))
{
/* do not process */
GNUNET_FS_PT_change_rc (peer, -1);
GNUNET_mutex_unlock (GNUNET_FS_lock);
if (stats != NULL)
stats->change (stat_gap_query_dropped, 1);
return;
}
/* delete oldest table entry */
prev = NULL;
rl = table[index];
if (total >= MAX_ENTRIES_PER_SLOT)
{
while (rl->expiration != minTTL)
{
prev = rl;
rl = rl->next;
}
if (prev == NULL)
table[index] = rl->next;
else
prev->next = rl->next;
GNUNET_FS_SHARED_free_request_list (rl);
}
/* create new table entry */
rl =
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -