📄 gap.c
字号:
GNUNET_malloc (sizeof (struct RequestList) +
(query_count - 1) * sizeof (GNUNET_HashCode));
memset (rl, 0, sizeof (struct RequestList));
memcpy (&rl->queries[0], queries, query_count * sizeof (GNUNET_HashCode));
rl->key_count = query_count;
if (filter_size > 0)
{
rl->bloomfilter_size = filter_size;
rl->bloomfilter_mutator = filter_mutator;
rl->bloomfilter = GNUNET_bloomfilter_init (coreAPI->ectx,
bloomfilter_data,
filter_size,
GNUNET_GAP_BLOOMFILTER_K);
}
rl->anonymityLevel = 1;
rl->type = type;
rl->value = priority;
rl->remaining_value = priority > 0 ? priority - 1 : 0;
rl->value_offered = original_priority;
rl->expiration = newTTL;
rl->next = table[index];
rl->response_target = peer;
rl->policy = policy;
table[index] = rl;
if (stats != NULL)
stats->change (stat_gap_query_routed, 1);
/* check local data store */
CHECK:
cls.request = rl;
cls.iteration_count = 0;
cls.result_count = 0;
ret = datastore->get (&queries[0], type, &datastore_value_processor, &cls);
if ((type == GNUNET_ECRS_BLOCKTYPE_DATA) && (ret != 1))
ret = datastore->get (&queries[0],
GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
&datastore_value_processor, &cls);
/* if not found or not unique, forward */
if (((ret != 1) || (type != GNUNET_ECRS_BLOCKTYPE_DATA)) &&
(0 != (policy & GNUNET_FS_RoutingPolicy_FORWARD)) &&
(rl->plan_entries == NULL))
GNUNET_FS_PLAN_request (NULL, peer, rl);
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
/**
* Handle the given response (by forwarding it to
* other peers as necessary).
*
* @param sender who send the response (good too know
* for future routing decisions)
* @param primary_query hash code used for lookup
* (note that namespace membership may
* require additional verification that has
* not yet been performed; checking the
* signature has already been done)
* @param expiration relative time until the content
* will expire
* @param size size of the data
* @param data the data itself
* @return how much was this content worth to us?
*/
unsigned int
GNUNET_FS_GAP_handle_response (const GNUNET_PeerIdentity * sender,
const GNUNET_HashCode * primary_query,
GNUNET_CronTime expiration,
unsigned int size,
const GNUNET_EC_DBlock * data)
{
GNUNET_HashCode hc;
GNUNET_PeerIdentity target;
struct RequestList *rl;
struct RequestList *prev;
unsigned int value;
P2P_gap_reply_MESSAGE *msg;
PID_INDEX rid;
unsigned int index;
PID_INDEX blocked[MAX_ENTRIES_PER_SLOT + 1];
unsigned int block_count;
int was_new;
unsigned int rl_value;
value = 0;
GNUNET_mutex_lock (GNUNET_FS_lock);
rid = GNUNET_FS_PT_intern (sender);
index = get_table_index (primary_query);
rl = table[index];
prev = NULL;
if (rid != 0)
{
blocked[0] = rid;
block_count = 1;
}
else
{
block_count = 0;
}
was_new = GNUNET_NO;
while (rl != NULL)
{
if (GNUNET_OK == GNUNET_FS_SHARED_test_valid_new_response (rl,
primary_query,
size,
data, &hc))
{
was_new = GNUNET_YES;
GNUNET_GE_ASSERT (NULL, rl->response_target != 0);
GNUNET_FS_PT_resolve (rl->response_target, &target);
GNUNET_GE_ASSERT (NULL, block_count <= MAX_ENTRIES_PER_SLOT);
blocked[block_count++] = rl->response_target;
GNUNET_FS_PT_change_rc (rl->response_target, 1);
rl->value_offered = 0;
if (stats != NULL)
stats->change (stat_trust_earned, rl->value_offered);
if (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
GNUNET_FS_SHARED_mark_response_seen (rl, &hc);
GNUNET_FS_PLAN_success (rid, NULL, rl->response_target, rl);
value += rl->value;
rl_value = rl->value;
rl->value = 0;
if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
{
if (prev == NULL)
table[index] = rl->next;
else
prev->next = rl->next;
GNUNET_FS_SHARED_free_request_list (rl);
if (prev == NULL)
rl = table[index];
else
rl = prev->next;
continue;
}
/* queue response (do this last since ciphertext_send may
cause the core to detect that the connection died which
may result in changes to the request list!) */
msg = GNUNET_malloc (sizeof (P2P_gap_reply_MESSAGE) + size);
msg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
msg->header.size = htons (sizeof (P2P_gap_reply_MESSAGE) + size);
msg->reserved = 0;
msg->expiration = GNUNET_htonll (expiration);
memcpy (&msg[1], data, size);
coreAPI->ciphertext_send (&target,
&msg->header,
GNUNET_GAP_BASE_REPLY_PRIORITY * (1 +
rl_value),
GNUNET_GAP_MAX_GAP_DELAY);
GNUNET_free (msg);
/* since the linked list may have changed, start again
from the beginning! */
rl = table[index];
continue;
}
prev = rl;
rl = rl->next;
}
if (was_new == GNUNET_YES)
GNUNET_FS_MIGRATION_inject (primary_query,
size, data, expiration, block_count, blocked);
GNUNET_mutex_unlock (GNUNET_FS_lock);
GNUNET_FS_PT_decrement_rcs (blocked, block_count); /* includes rid */
return value;
}
/**
* Compute the average priority of inbound requests
* (rounded up).
*/
unsigned int
GNUNET_FS_GAP_get_average_priority ()
{
struct RequestList *rl;
unsigned long long tot;
unsigned int i;
unsigned int active;
tot = 0;
active = 0;
GNUNET_mutex_lock (GNUNET_FS_lock);
for (i = 0; i < table_size; i++)
{
rl = table[i];
while (rl != NULL)
{
tot += rl->value;
active++;
rl = rl->next;
}
}
GNUNET_mutex_unlock (GNUNET_FS_lock);
if (active == 0)
return 0;
if (active * (tot / active) < tot)
return (unsigned int) (tot / active) + 1;
return (unsigned int) (tot / active);
}
/**
* We were disconnected from another peer.
* Remove all of its pending queries.
*/
static void
cleanup_on_peer_disconnect (const GNUNET_PeerIdentity * peer, void *unused)
{
unsigned int i;
struct RequestList *rl;
struct RequestList *prev;
PID_INDEX pid;
GNUNET_mutex_lock (GNUNET_FS_lock);
pid = GNUNET_FS_PT_intern (peer);
for (i = 0; i < table_size; i++)
{
rl = table[i];
prev = NULL;
while (rl != NULL)
{
if (pid == rl->response_target)
{
if (prev == NULL)
table[i] = rl->next;
else
prev->next = rl->next;
GNUNET_FS_SHARED_free_request_list (rl);
if (prev == NULL)
rl = table[i];
else
rl = prev->next;
}
else
{
prev = rl;
rl = rl->next;
}
}
}
GNUNET_FS_PT_change_rc (pid, -1);
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
/**
* Cron-job to find and transmit more results (beyond
* the initial batch) over time -- assuming the entry
* is still valid and we have more data.
*/
static void
have_more_processor (void *unused)
{
static unsigned int pos;
struct RequestList *req;
GNUNET_CronTime now;
struct DVPClosure cls;
GNUNET_mutex_lock (GNUNET_FS_lock);
now = GNUNET_get_time ();
if (pos >= table_size)
pos = 0;
req = table[pos];
while (req != NULL)
{
if ((GNUNET_cpu_get_load (coreAPI->ectx,
coreAPI->cfg) > 50) ||
(GNUNET_disk_get_load (coreAPI->ectx, coreAPI->cfg) > 25))
break;
if (req->have_more > 0)
{
req->have_more--;
cls.request = req;
cls.iteration_count = 0;
cls.result_count = 0;
datastore->get (&req->queries[0], req->type,
&datastore_value_processor, &cls);
}
req = req->next;
}
if (req == NULL)
pos++;
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
int
GNUNET_FS_GAP_init (GNUNET_CoreAPIForPlugins * capi)
{
unsigned long long ts;
coreAPI = capi;
datastore = capi->service_request ("datastore");
random_qsel = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 0xFFFF);
if (-1 ==
GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "GAP",
"TABLESIZE",
GNUNET_GAP_MIN_INDIRECTION_TABLE_SIZE,
GNUNET_MAX_GNUNET_malloc_CHECKED
/
sizeof (struct RequestList *),
GNUNET_GAP_MIN_INDIRECTION_TABLE_SIZE,
&ts))
return GNUNET_SYSERR;
table_size = ts;
table = GNUNET_malloc (sizeof (struct RequestList *) * table_size);
memset (table, 0, sizeof (struct RequestList *) * table_size);
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->peer_disconnect_notification_register
(&cleanup_on_peer_disconnect, NULL));
GNUNET_cron_add_job (capi->cron,
&have_more_processor,
HAVE_MORE_FREQUENCY, HAVE_MORE_FREQUENCY, NULL);
stats = capi->service_request ("stats");
if (stats != NULL)
{
stat_gap_query_dropped =
stats->create (gettext_noop ("# gap queries dropped (table full)"));
stat_gap_query_dropped_redundant =
stats->create (gettext_noop ("# gap queries dropped (redundant)"));
stat_gap_query_routed =
stats->create (gettext_noop ("# gap queries routed"));
stat_gap_content_found_locally =
stats->create (gettext_noop ("# gap content found locally"));
stat_gap_query_refreshed =
stats->create (gettext_noop
("# gap queries refreshed existing record"));
stat_trust_earned = stats->create (gettext_noop ("# trust earned"));
}
cron = GNUNET_cron_create (coreAPI->ectx);
GNUNET_cron_start (cron);
return 0;
}
int
GNUNET_FS_GAP_done ()
{
unsigned int i;
struct RequestList *rl;
GNUNET_cron_del_job (coreAPI->cron,
&have_more_processor, HAVE_MORE_FREQUENCY, NULL);
GNUNET_cron_stop (cron);
GNUNET_cron_destroy (cron);
for (i = 0; i < table_size; i++)
{
while (NULL != (rl = table[i]))
{
table[i] = rl->next;
GNUNET_FS_SHARED_free_request_list (rl);
}
}
GNUNET_free (table);
table = NULL;
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->peer_disconnect_notification_unregister
(&cleanup_on_peer_disconnect, NULL));
coreAPI->service_release (datastore);
datastore = NULL;
if (stats != NULL)
{
coreAPI->service_release (stats);
stats = NULL;
}
return 0;
}
/* end of gap.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -