}
/* insert into ranking list */
rank->next = rpc->rankings;
rpc->rankings = rank;
}
/**
* Plan the transmission of the given request. Use the history of the
* request and the client to schedule the request for transmission.<p>
*
* This method is probably the most important function in the
* anonymous file-sharing module. It determines for each query where
* it should be forwarded (to which peers, to how many peers) and what
* its TTL and priority values should be.<p>
*
* @param client may be NULL, in which case peer is significant
* @param peer sender of the request (if not a local client)
* @param request the request to plan
* @return GNUNET_YES if the request is being planned, GNUNET_NO if not,
* GNUNET_SYSERR on error
*/
int
GNUNET_FS_PLAN_request (struct GNUNET_ClientHandle *client,
PID_INDEX peer, struct RequestList *request)
{
struct ClientInfoList *info;
struct PeerRankings *rank;
struct RankingPeerContext rpc;
GNUNET_PeerIdentity peerId;
unsigned int target_count;
unsigned int i;
unsigned int total_peers;
unsigned long long total_score;
unsigned long long selector;
double entropy;
double prob;
info = clients;
while ((info != NULL) && ((info->client != client) || (info->peer != peer)))
info = info->next;
/* for all connected peers compute ranking */
rpc.info = info;
rpc.request = request;
rpc.rankings = NULL;
total_peers = coreAPI->p2p_connections_iterate (rank_peers, &rpc);
/* use request type, priority, system load and
entropy of ranking to determine number of peers
to queue */
/* use biased random selection to select
peers according to ranking; add requests */
total_score = 0;
rank = rpc.rankings;
while (rank != NULL)
{
GNUNET_GE_ASSERT (NULL, rank->score > 0);
total_score += rank->score;
rank = rank->next;
}
if (total_score == 0)
return GNUNET_NO; /* no peers available */
entropy = 0;
rank = rpc.rankings;
while (rank != NULL)
{
prob = 1.0 * rank->score / total_score;
if (prob > 0.000000001)
entropy -= prob * log (prob) / LOG_2;
rank = rank->next;
}
if (entropy < 0.001)
entropy = 0.001; /* should only be possible if we have virtually only one choice */
target_count = (unsigned int) ceil (entropy);
/* limit target count based on the value of the request */
if (target_count > 2 * request->value + 3)
target_count = 2 * request->value + 3;
if (target_count > total_peers)
target_count = total_peers;
/* select target_count peers */
for (i = 0; i < target_count; i++)
{
selector = GNUNET_random_u64 (GNUNET_RANDOM_QUALITY_WEAK, total_score);
rank = rpc.rankings;
while (rank != NULL)
{
if (rank->score > selector)
{
if (request->response_client == NULL)
{
if (rank->prio > request->remaining_value)
{
if ((i == target_count - 1) ||
(request->remaining_value == 0))
rank->prio = request->remaining_value;
else
rank->prio =
GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
request->remaining_value);
}
request->remaining_value -= rank->prio;
}
queue_request (rank->peer, request, rank->ttl, rank->prio);
total_score -= rank->score;
rank->score = 0; /* mark as used */
break;
}
selector -= rank->score;
rank = rank->next;
}
}
/* free rpc.rankings list */
while (rpc.rankings != NULL)
{
rank = rpc.rankings;
rpc.rankings = rank->next;
GNUNET_FS_PT_resolve (rank->peer, &peerId);
if (rank->score != 0)
coreAPI->p2p_bandwidth_downstream_reserve (&peerId,
-rank->reserved_bandwidth);
GNUNET_FS_PT_change_rc (rank->peer, -1);
GNUNET_free (rank);
}
return target_count > 0 ? GNUNET_YES : GNUNET_NO;
}
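/*
 * Illustrative sketch, not part of the module: the fan-out decision above
 * computes the Shannon entropy (in bits) of the normalized peer-score
 * distribution and targets ceil(entropy) peers.  Two equally ranked peers
 * give 1 bit (one target), four equally ranked peers give 2 bits (two
 * targets), and a single dominant peer gives close to 0 bits (one target
 * after the 0.001 floor).  The helper below redoes that calculation for a
 * plain array of scores; its name and signature are hypothetical and it
 * exists only to make the math in GNUNET_FS_PLAN_request easier to follow.
 */
#if 0
static unsigned int
example_entropy_fanout (const unsigned long long *scores, unsigned int n)
{
  unsigned long long total = 0;
  double entropy = 0.0;
  double prob;
  unsigned int i;

  for (i = 0; i < n; i++)
    total += scores[i];
  if (total == 0)
    return 0;                   /* no peers available */
  for (i = 0; i < n; i++)
    {
      prob = 1.0 * scores[i] / total;
      if (prob > 0.000000001)
        entropy -= prob * log (prob) / log (2);
    }
  if (entropy < 0.001)
    entropy = 0.001;            /* essentially only one choice */
  return (unsigned int) ceil (entropy);
}
#endif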
/**
* Try to add the given request to the buffer.
*
* @param req the request to serialize
* @param prio priority to use for the query
* @param ttl time-to-live to use for the query
* @param buf buffer to write the message into
* @param available size of the buffer
* @return number of bytes written to the buffer, 0 if the request does not fit
*/
static unsigned int
try_add_request (struct RequestList *req,
unsigned int prio,
int ttl, void *buf, unsigned int available)
{
P2P_gap_query_MESSAGE *msg = buf;
unsigned int size;
GNUNET_CronTime now;
GNUNET_GE_ASSERT (NULL, req->key_count > 0);
size = sizeof (P2P_gap_query_MESSAGE)
+ req->bloomfilter_size + (req->key_count - 1) * sizeof (GNUNET_HashCode);
if (size > available)
return 0;
if ((prio > req->remaining_value) && (req->response_client == NULL))
prio = req->remaining_value;
ttl = GNUNET_FS_HELPER_bound_ttl (ttl, prio);
msg->header.size = htons (size);
msg->header.type = htons (GNUNET_P2P_PROTO_GAP_QUERY);
msg->type = htonl (req->type);
msg->priority = htonl (prio);
msg->ttl = htonl (ttl);
msg->filter_mutator = htonl (req->bloomfilter_mutator);
msg->number_of_queries = htonl (req->key_count);
if (0 != (req->policy & GNUNET_FS_RoutingPolicy_INDIRECT))
msg->returnTo = *coreAPI->my_identity;
else
GNUNET_FS_PT_resolve (req->response_target, &msg->returnTo);
memcpy (&msg->queries[0],
&req->queries[0], req->key_count * sizeof (GNUNET_HashCode));
if (req->bloomfilter != NULL)
GNUNET_bloomfilter_get_raw_data (req->bloomfilter,
(char *) &msg->queries[req->key_count],
req->bloomfilter_size);
now = GNUNET_get_time ();
if (now + ttl > req->last_request_time + req->last_ttl_used)
{
req->last_request_time = now;
req->last_prio_used = prio;
req->last_ttl_used = ttl;
}
req->remaining_value -= prio;
if (stats != NULL)
{
stats->change (stat_gap_query_sent, 1);
stats->change (stat_trust_spent, prio);
}
return size;
}
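/*
 * Illustrative note, not part of the module: the size computed in
 * try_add_request assumes that P2P_gap_query_MESSAGE already reserves room
 * for one GNUNET_HashCode in its queries[] member, which is why only
 * (key_count - 1) additional hashes are counted before the raw Bloom
 * filter bytes.  Schematically the wire format is:
 *
 *   [ P2P_gap_query_MESSAGE header, including queries[0]  ]
 *   [ (key_count - 1) further GNUNET_HashCode entries     ]
 *   [ bloomfilter_size raw Bloom filter bytes             ]
 *
 * A caller that only wants to know whether a request would fit into a
 * given buffer could use a check like the one below; the helper name is
 * made up for illustration.
 */
#if 0
static int
example_request_fits (const struct RequestList *req, unsigned int available)
{
  unsigned int size;

  size = sizeof (P2P_gap_query_MESSAGE)
    + req->bloomfilter_size
    + (req->key_count - 1) * sizeof (GNUNET_HashCode);
  return (size <= available) ? GNUNET_YES : GNUNET_NO;
}
#endif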
/**
* The core has space for a query, find one!
*
* @param receiver the receiver of the message
* @param position is the reference to the
* first unused position in the buffer where GNUnet is building
* the message
* @param padding is the number of bytes left in that buffer.
* @return the number of bytes written to
* that buffer (0 if nothing could be added).
*/
static unsigned int
query_fill_callback (const GNUNET_PeerIdentity *
receiver, void *position, unsigned int padding)
{
char *buf = position;
struct QueryPlanList *pl;
struct QueryPlanEntry *e;
struct QueryPlanEntry *n;
struct QueryPlanEntry *pos;
struct QueryPlanEntry *prev;
struct PeerHistoryList *hl;
struct ClientInfoList *cl;
PID_INDEX peer;
unsigned int off;
unsigned int ret;
/* no locking required: this method will be
called by the core only while the core
lock is held (which is the same as the
FS lock) */
off = 0;
peer = GNUNET_FS_PT_intern (receiver);
pl = queries;
while ((pl != NULL) && (pl->peer != peer))
pl = pl->next;
if (pl != NULL)
{
e = pl->head;
while ((e != NULL) && (padding - off >= sizeof (P2P_gap_query_MESSAGE)))
{
ret = try_add_request (e->request,
e->prio, e->ttl, &buf[off], padding - off);
n = e->next;
if (ret != 0)
{
/* remove e from e's doubly-linked list */
if (e->prev != NULL)
e->prev->next = e->next;
else
pl->head = e->next;
if (e->next != NULL)
e->next->prev = e->prev;
else
pl->tail = e->prev;
/* remove e from the request's singly-linked list of plan entries */
prev = NULL;
pos = e->request->plan_entries;
while (pos != e)
{
prev = pos;
pos = pos->plan_entries_next;
}
if (prev == NULL)
e->request->plan_entries = e->plan_entries_next;
else
prev->plan_entries_next = e->plan_entries_next;
cl = find_or_create_client_entry (e->request->response_client,
e->request->response_target);
GNUNET_free (e);
hl = find_or_create_history_entry (cl, peer);
hl->last_request_time = GNUNET_get_time ();
hl->request_count++;
}
off += ret;
e = n;
}
}
GNUNET_FS_PT_change_rc (peer, -1);
return off;
}
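/*
 * Illustrative sketch, not part of the module: the core invokes send
 * callbacks such as query_fill_callback while assembling an outgoing
 * message for a peer, passing the remaining free space ("padding") and
 * advancing its write position by the returned byte count.  Roughly:
 */
#if 0
static void
example_core_fill (const GNUNET_PeerIdentity * receiver,
                   char *buffer, unsigned int buffer_size)
{
  unsigned int used;

  used = query_fill_callback (receiver, buffer, buffer_size);
  /* the first 'used' bytes of buffer now hold zero or more
     GNUNET_P2P_PROTO_GAP_QUERY messages; the core would append output
     from other registered callbacks starting at &buffer[used] */
}
#endif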
static void
free_client_info_list (struct ClientInfoList *pos)
{
struct PeerHistoryList *ph;
while (pos->history != NULL)
{
ph = pos->history;
pos->history = ph->next;
GNUNET_FS_PT_change_rc (ph->peer, -1);
GNUNET_free (ph);
}
GNUNET_FS_PT_change_rc (pos->peer, -1);
GNUNET_free (pos);
}
/**
* Method called whenever a given client disconnects.
* Frees all of the associated data structures.
*/
static void
handle_client_exit (struct GNUNET_ClientHandle *client)
{
struct ClientInfoList *pos;
struct ClientInfoList *prev;
GNUNET_mutex_lock (GNUNET_FS_lock);
pos = clients;
prev = NULL;
while (pos != NULL)
{
if (pos->client == client)
{
if (prev == NULL)
clients = pos->next;
else
prev->next = pos->next;
free_client_info_list (pos);
if (prev == NULL)
pos = clients;
else
pos = prev->next;
}
else
{
prev = pos;
pos = pos->next;
}
}
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
/**
* Notify the plan that a request succeeded.
*/
void
GNUNET_FS_PLAN_success (PID_INDEX responder,
struct GNUNET_ClientHandle *client,
PID_INDEX peer, const struct RequestList *success)
{
struct ClientInfoList *cl;
struct PeerHistoryList *hl;
GNUNET_mutex_lock (GNUNET_FS_lock);
cl = find_or_create_client_entry (client, peer);
hl = find_or_create_history_entry (cl, responder);
hl->response_count++;
hl->last_good_ttl = success->last_ttl_used;
hl->last_good_prio = success->last_prio_used;
hl->last_response_time = GNUNET_get_time ();
GNUNET_mutex_unlock (GNUNET_FS_lock);
if (stats != NULL)
stats->change (stat_gap_query_success, 1);
}
/**
* Free the given query plan list and all of its entries.
*/
static void
free_query_plan_list (struct QueryPlanList *qpl)
{
struct QueryPlanEntry *el;
struct QueryPlanEntry *pred;
while (qpl->head != NULL)
{
el = qpl->head;
qpl->head = el->next;
pred = el->request->plan_entries;
if (pred == el)
el->request->plan_entries = el->plan_entries_next;
else
{
while (pred->plan_entries_next != el)
pred = pred->plan_entries_next;
pred->plan_entries_next = el->plan_entries_next;
}
GNUNET_free (el);
}
GNUNET_FS_PT_change_rc (qpl->peer, -1);
GNUNET_free (qpl);
}
/**
* Connection to another peer was cut. Clean up
* all state associated with that peer (except for
* active requests, that's not our job).
*/
static void
peer_disconnect_handler (const GNUNET_PeerIdentity * peer, void *unused)
{
PID_INDEX pid;
struct QueryPlanList *qpos;
struct QueryPlanList *qprev;
struct ClientInfoList *cpos;
struct ClientInfoList *cprev;
GNUNET_mutex_lock (GNUNET_FS_lock);
pid = GNUNET_FS_PT_intern (peer);
qprev = NULL;
qpos = queries;
while (qpos != NULL)
{
if (qpos->peer == pid)
{
if (qprev != NULL)
qprev->next = qpos->next;
else
queries = qpos->next;
free_query_plan_list (qpos);
if (qprev != NULL)
qpos = qprev->next;
else
qpos = queries;
continue;
}
qprev = qpos;
qpos = qpos->next;
}
cprev = NULL;
cpos = clients;
while (cpos != NULL)
{
if ((cpos->peer == pid) && (cpos->client == NULL))
{
if (cprev == NULL)
clients = cpos->next;
else
cprev->next = cpos->next;
free_client_info_list (cpos);
if (cprev == NULL)
cpos = clients;
else
cpos = cprev->next;
continue;
}
cprev = cpos;
cpos = cpos->next;
}
GNUNET_FS_PT_change_rc (pid, -1);
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
int
GNUNET_FS_PLAN_init (GNUNET_CoreAPIForPlugins * capi)
{
LOG_2 = log (2);
coreAPI = capi;
GNUNET_GE_ASSERT (capi->ectx,
GNUNET_SYSERR !=
capi->cs_disconnect_handler_register
(&handle_client_exit));
GNUNET_GE_ASSERT (capi->ectx,
GNUNET_SYSERR !=
capi->peer_disconnect_notification_register
(&peer_disconnect_handler, NULL));
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->send_callback_register (sizeof
(P2P_gap_query_MESSAGE),
GNUNET_FS_GAP_QUERY_POLL_PRIORITY,
&query_fill_callback));
stats = capi->service_request ("stats");
if (stats != NULL)
{
stat_gap_query_sent =
stats->create (gettext_noop ("# gap requests total sent"));
stat_gap_query_planned =
stats->create (gettext_noop ("# gap content total planned"));
stat_gap_query_success =
stats->create (gettext_noop ("# gap routes succeeded"));
stat_trust_spent = stats->create (gettext_noop ("# trust spent"));
}
return 0;
}
int
GNUNET_FS_PLAN_done ()
{
struct QueryPlanList *qpl;
while (queries != NULL)
{
qpl = queries;
queries = qpl->next;
free_query_plan_list (qpl);
}
/* clean up clients */
while (clients != NULL)
handle_client_exit (clients->client);
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->cs_disconnect_handler_unregister
(&handle_client_exit));
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->peer_disconnect_notification_unregister
(&peer_disconnect_handler, NULL));
GNUNET_GE_ASSERT (coreAPI->ectx,
GNUNET_SYSERR !=
coreAPI->send_callback_unregister (sizeof
(P2P_gap_query_MESSAGE),
&query_fill_callback));
if (stats != NULL)
{
coreAPI->service_release (stats);
stats = NULL;
}
return 0;
}
/* end of plan.c */