📄 querymanager.c
字号:
GNUNET_FS_PLAN_success (sender, client, 0, rl);
if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
return GNUNET_OK; /* the end */
/* update bloom filter */
rl->bloomfilter_entry_count++;
bf_size = compute_bloomfilter_size (rl->bloomfilter_entry_count);
if (rl->bloomfilter == NULL)
{
rl->bloomfilter_mutator
= GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
rl->bloomfilter_size = bf_size;
rl->bloomfilter = GNUNET_bloomfilter_init (NULL,
NULL,
rl->bloomfilter_size,
GNUNET_GAP_BLOOMFILTER_K);
if (stats != NULL)
stats->change (stat_gap_client_bf_updates, 1);
}
else if (rl->bloomfilter_size != bf_size)
{
rl->bloomfilter_mutator
= GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
ic.pos = rl->responses;
ic.mingle_number = rl->bloomfilter_mutator;
GNUNET_bloomfilter_resize (rl->bloomfilter,
&response_bf_iterator,
&ic, bf_size, GNUNET_GAP_BLOOMFILTER_K);
if (stats != NULL)
stats->change (stat_gap_client_bf_updates, 1);
}
GNUNET_FS_SHARED_mark_response_seen (rl, &hc);
/* we want more */
return GNUNET_NO;
}
/**
* Handle the given response (by forwarding it to
* other peers as necessary).
*
* @param sender who send the response (good too know
* for future routing decisions)
* @param primary_query hash code used for lookup
* (note that namespace membership may
* require additional verification that has
* not yet been performed; checking the
* signature has already been done)
* @param size size of the data
* @param data the data itself (a GNUNET_EC_DBlock)
* @return how much was this content worth to us?
*/
unsigned int
GNUNET_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
const GNUNET_HashCode * primary_query,
GNUNET_CronTime expirationTime,
unsigned int size,
const GNUNET_EC_DBlock * data)
{
struct ClientDataList *cl;
struct RequestList *rl;
struct RequestList *prev;
unsigned int value;
PID_INDEX rid;
rid = GNUNET_FS_PT_intern (sender);
GNUNET_mutex_lock (GNUNET_FS_lock);
value = 0;
cl = clients;
while (cl != NULL)
{
rl = cl->requests;
prev = NULL;
while (rl != NULL)
{
if (GNUNET_OK ==
handle_response (rid,
cl->client,
rl,
primary_query,
expirationTime, size, data, &value))
{
if (prev != NULL)
prev->next = rl->next;
else
cl->requests = rl->next;
if (rl == cl->request_tail)
cl->request_tail = prev;
GNUNET_FS_SHARED_free_request_list (rl);
if (stats != NULL)
stats->change (stat_gap_client_query_tracked, -1);
if (prev == NULL)
rl = cl->requests;
else
rl = prev->next;
}
else
{
prev = rl;
rl = rl->next;
}
}
cl = cl->next;
}
GNUNET_mutex_unlock (GNUNET_FS_lock);
GNUNET_FS_PT_change_rc (rid, -1);
return value;
}
/**
* Method called whenever a given client disconnects.
* Frees all of the associated data structures.
*/
static void
handle_client_exit (struct GNUNET_ClientHandle *client)
{
struct ClientDataList *cl;
struct ClientDataList *prev;
struct RequestList *rl;
GNUNET_mutex_lock (GNUNET_FS_lock);
cl = clients;
prev = NULL;
while ((cl != NULL) && (cl->client != client))
{
prev = cl;
cl = cl->next;
}
if (cl == clients_tail)
clients_tail = prev;
if (cl != NULL)
{
while (cl->requests != NULL)
{
rl = cl->requests;
cl->requests = rl->next;
GNUNET_FS_SHARED_free_request_list (rl);
if (stats != NULL)
stats->change (stat_gap_client_query_tracked, -1);
}
if (prev == NULL)
clients = cl->next;
else
prev->next = cl->next;
GNUNET_free (cl);
}
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
struct HMClosure
{
struct RequestList *request;
unsigned int processed;
int have_more;
};
/**
* Any response that we get should be passed
* back to the client. If the response is unique,
* we should about the iteration (return GNUNET_SYSERR).
*/
static int
have_more_processor (const GNUNET_HashCode * key,
const GNUNET_DatastoreValue *
value, void *closure, unsigned long long uid)
{
struct HMClosure *cls = closure;
GNUNET_HashCode hc;
int ret;
ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
key, value,
cls->request->response_client,
cls->request, &hc);
if (ret != GNUNET_OK)
{
/* client can take no more right now */
cls->have_more = GNUNET_YES;
return ret; /* NO: delete, SYSERR: abort */
}
GNUNET_FS_SHARED_mark_response_seen (cls->request, &hc);
cls->processed++;
if (cls->processed > GNUNET_GAP_MAX_ASYNC_PROCESSED)
{
cls->have_more = GNUNET_YES;
return GNUNET_SYSERR;
}
return GNUNET_OK;
}
/**
* Cron-job to periodically check if we should
* repeat requests.
*/
static void
repeat_requests_job (void *unused)
{
struct HMClosure hmc;
struct ClientDataList *client;
struct RequestList *request;
struct RequestList *prev;
GNUNET_CronTime now;
GNUNET_mutex_lock (GNUNET_FS_lock);
if (clients == NULL)
{
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
now = GNUNET_get_time ();
client = clients;
if (clients_tail != client)
{
/* move client to tail of list */
GNUNET_GE_ASSERT (NULL, clients_tail->next == NULL);
clients = clients->next;
clients_tail->next = client;
clients_tail = client;
client->next = NULL;
}
request = client->requests;
if (request == NULL)
{
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
if (client->request_tail != request)
{
/* move request to tail of list */
GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
client->requests = request->next;
client->request_tail->next = request;
prev = client->request_tail;
client->request_tail = request;
request->next = NULL;
}
else
{
prev = NULL;
}
GNUNET_GE_ASSERT (NULL, request->next == NULL);
GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
if ((client->client != NULL) &&
(GNUNET_OK !=
coreAPI->cs_send_message_now_test (client->client,
GNUNET_GAP_ESTIMATED_DATA_SIZE,
GNUNET_NO)))
{
GNUNET_mutex_unlock (GNUNET_FS_lock);
return;
}
if (request->have_more > 0)
{
request->have_more--;
hmc.request = request;
hmc.processed = 0;
hmc.have_more = GNUNET_NO;
if (request->type == GNUNET_ECRS_BLOCKTYPE_DATA)
{
if (((1 == datastore->get (&request->queries[0], request->type,
&have_more_processor, &hmc)) ||
(1 == datastore->get (&request->queries[0],
GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
&have_more_processor, &hmc))) &&
(hmc.have_more == GNUNET_NO))
{
if (prev == NULL)
{
client->request_tail = NULL;
client->requests = NULL;
}
else
{
prev->next = NULL;
if (client->request_tail == request)
client->request_tail = prev;
}
GNUNET_FS_SHARED_free_request_list (request);
if (stats != NULL)
stats->change (stat_gap_client_query_tracked, -1);
}
}
else
{
datastore->get (&request->queries[0], request->type,
&have_more_processor, &hmc);
}
if (hmc.have_more)
request->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
}
else
{
if ((NULL == request->plan_entries) &&
((client->client != NULL) ||
(request->expiration > now)) &&
(request->last_ttl_used * GNUNET_CRON_SECONDS +
request->last_request_time < now))
{
if ((GNUNET_OK ==
GNUNET_FS_PLAN_request (client->client, 0, request))
&& (stats != NULL))
stats->change (stat_gap_client_query_injected, 1);
}
if ((request->anonymityLevel == 0) &&
(request->last_dht_get + request->dht_back_off < now))
{
if (request->dht_back_off * 2 > request->dht_back_off)
request->dht_back_off *= 2;
request->last_dht_get = now;
GNUNET_FS_DHT_execute_query (request->type, &request->queries[0]);
}
}
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
int
GNUNET_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi)
{
coreAPI = capi;
GNUNET_GE_ASSERT (capi->ectx,
GNUNET_SYSERR !=
capi->cs_disconnect_handler_register
(&handle_client_exit));
datastore = capi->service_request ("datastore");
stats = capi->service_request ("stats");
if (stats != NULL)
{
stat_gap_client_query_received =
stats->create (gettext_noop ("# gap client queries received"));
stat_gap_client_response_sent =
stats->create (gettext_noop ("# gap replies sent to clients"));
stat_gap_client_query_tracked =
stats->create (gettext_noop ("# gap client requests tracked"));
stat_gap_client_query_injected =
stats->create (gettext_noop ("# gap client requests injected"));
stat_gap_client_bf_updates =
stats->create (gettext_noop
("# gap query bloomfilter resizing updates"));
}
GNUNET_cron_add_job (capi->cron,
&repeat_requests_job,
CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
return 0;
}
int
GNUNET_FS_QUERYMANAGER_done ()
{
GNUNET_cron_del_job (coreAPI->cron,
&repeat_requests_job, CHECK_REPEAT_FREQUENCY, NULL);
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->cs_disconnect_handler_unregister
(&handle_client_exit));
while (clients != NULL)
handle_client_exit (clients->client);
coreAPI->service_release (datastore);
datastore = NULL;
if (stats != NULL)
{
coreAPI->service_release (stats);
stats = NULL;
}
return 0;
}
/* end of querymanager.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -