📄 fs.c
字号:
#endif
return coreAPI->cs_send_value (sock, ret);
}
/**
* Process a client request unindex content.
*/
static int
handle_cs_unindex_request (struct GNUNET_ClientHandle *sock,
const GNUNET_MessageHeader * req)
{
int ret;
const CS_fs_request_unindex_MESSAGE *ru;
struct GNUNET_GE_Context *cectx;
cectx = coreAPI->cs_log_context_create (sock);
if (ntohs (req->size) != sizeof (CS_fs_request_unindex_MESSAGE))
{
GNUNET_GE_BREAK (ectx, 0);
GNUNET_GE_BREAK (cectx, 0);
GNUNET_GE_free_context (cectx);
return GNUNET_SYSERR;
}
ru = (const CS_fs_request_unindex_MESSAGE *) req;
#if DEBUG_FS
GNUNET_GE_LOG (ectx,
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
"FS received REQUEST UNINDEX\n");
#endif
ret = GNUNET_FS_ONDEMAND_delete_indexed_content (cectx,
datastore,
ntohl (ru->blocksize),
&ru->fileId);
GNUNET_GE_free_context (cectx);
return coreAPI->cs_send_value (sock, ret);
}
/**
* Process a client request to test if certain
* data is indexed.
*/
static int
handle_cs_test_indexed_request (struct GNUNET_ClientHandle *sock,
const GNUNET_MessageHeader * req)
{
int ret;
const CS_fs_request_test_index_MESSAGE *ru;
if (ntohs (req->size) != sizeof (CS_fs_request_test_index_MESSAGE))
{
GNUNET_GE_BREAK (ectx, 0);
return GNUNET_SYSERR;
}
ru = (const CS_fs_request_test_index_MESSAGE *) req;
#if DEBUG_FS
GNUNET_GE_LOG (ectx,
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
"FS received REQUEST TESTINDEXED\n");
#endif
ret = GNUNET_FS_ONDEMAND_test_indexed_file (datastore, &ru->fileId);
return coreAPI->cs_send_value (sock, ret);
}
struct FPPClosure
{
struct GNUNET_ClientHandle *sock;
struct ResponseList *seen;
unsigned int processed;
int have_more;
};
/**
* Any response that we get should be passed
* back to the client. If the response is unique,
* we should about the iteration (return GNUNET_SYSERR).
*/
static int
fast_path_processor (const GNUNET_HashCode * key,
const GNUNET_DatastoreValue *
value, void *closure, unsigned long long uid)
{
struct FPPClosure *cls = closure;
GNUNET_HashCode hc;
struct ResponseList *rl;
unsigned int type;
int ret;
if (cls->processed > GNUNET_GAP_MAX_SYNC_PROCESSED)
{
cls->have_more = GNUNET_YES;
return GNUNET_SYSERR;
}
type = ntohl (((const GNUNET_EC_DBlock *) &value[1])->type);
ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
key, value, cls->sock, NULL, &hc);
if (ret == GNUNET_NO)
return GNUNET_NO; /* delete + continue */
cls->processed++;
if (ret != GNUNET_OK)
cls->have_more = GNUNET_YES; /* switch to async processing */
if ((type == GNUNET_ECRS_BLOCKTYPE_DATA) || (ret != GNUNET_OK))
return GNUNET_SYSERR; /* unique response or client can take no more */
rl = GNUNET_malloc (sizeof (struct ResponseList));
rl->hash = hc;
rl->next = cls->seen;
cls->seen = rl;
return GNUNET_OK;
}
/**
* Process a query from the client. Forwards to the network.
*
* @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
*/
static int
handle_cs_query_start_request (struct GNUNET_ClientHandle *sock,
const GNUNET_MessageHeader * req)
{
static GNUNET_PeerIdentity all_zeros;
struct FPPClosure fpp;
struct ResponseList *pos;
const CS_fs_request_search_MESSAGE *rs;
unsigned int keyCount;
unsigned int type;
unsigned int anonymityLevel;
int have_target;
#if DEBUG_FS
GNUNET_EncName enc;
#endif
if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
{
GNUNET_GE_BREAK (ectx, 0);
return GNUNET_SYSERR;
}
rs = (const CS_fs_request_search_MESSAGE *) req;
type = ntohl (rs->type);
/* try "fast path" avoiding gap/dht if unique reply is locally available */
#if DEBUG_FS
IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
GNUNET_hash_to_enc (&rs->query[0], &enc));
GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
"FS received QUERY (query: `%s', type: %u)\n", &enc, type);
#endif
fpp.sock = sock;
fpp.seen = NULL;
fpp.have_more = GNUNET_NO;
fpp.processed = 0;
if (GNUNET_OK ==
coreAPI->cs_send_message_now_test (sock,
GNUNET_GAP_ESTIMATED_DATA_SIZE,
GNUNET_NO))
{
if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
{
if (((1 == datastore->get (&rs->query[0],
type, &fast_path_processor, &fpp)) ||
(1 == datastore->get (&rs->query[0],
GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
&fast_path_processor, &fpp))) &&
(fpp.have_more == GNUNET_NO))
goto CLEANUP;
}
else
datastore->get (&rs->query[0], type, &fast_path_processor, &fpp);
}
else
fpp.have_more = GNUNET_YES;
anonymityLevel = ntohl (rs->anonymity_level);
keyCount =
1 + (ntohs (req->size) -
sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
have_target =
memcmp (&all_zeros, &rs->target, sizeof (GNUNET_PeerIdentity)) != 0;
GNUNET_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount, anonymityLevel,
type, sock,
have_target ? &rs->target : NULL,
fpp.seen, fpp.have_more);
CLEANUP:
while (fpp.seen != NULL)
{
pos = fpp.seen;
fpp.seen = pos->next;
GNUNET_free (pos);
}
return GNUNET_OK;
}
/**
* Process a stop request from the client.
*
* @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
*/
static int
handle_cs_query_stop_request (struct GNUNET_ClientHandle *sock,
const GNUNET_MessageHeader * req)
{
const CS_fs_request_search_MESSAGE *rs;
unsigned int keyCount;
unsigned int type;
unsigned int anonymityLevel;
if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
{
GNUNET_GE_BREAK (ectx, 0);
return GNUNET_SYSERR;
}
rs = (const CS_fs_request_search_MESSAGE *) req;
type = ntohl (rs->type);
anonymityLevel = ntohl (rs->anonymity_level);
keyCount =
1 + (ntohs (req->size) -
sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
GNUNET_FS_QUERYMANAGER_stop_query (&rs->query[0], keyCount, anonymityLevel,
type, sock);
return GNUNET_OK;
}
/**
* Return 1 if the current network (upstream) or CPU load is
* (far) too high, 0 if the load is ok.
*/
static int
test_load_too_high ()
{
return ((hardCPULimit > 0) &&
(GNUNET_cpu_get_load (ectx,
coreAPI->cfg) >= hardCPULimit)) ||
((hardUpLimit > 0) &&
(GNUNET_network_monitor_get_load (coreAPI->load_monitor,
GNUNET_ND_UPLOAD) >= hardUpLimit));
}
/**
* Handle P2P query for content.
*/
static int
handle_p2p_query (const GNUNET_PeerIdentity * sender,
const GNUNET_MessageHeader * msg)
{
const P2P_gap_query_MESSAGE *req;
unsigned int query_count;
unsigned short size;
unsigned int bloomfilter_size;
int ttl;
unsigned int prio;
unsigned int type;
unsigned int netLoad;
enum GNUNET_FS_RoutingPolicy policy;
double preference;
if (stats != NULL)
stats->change (stat_gap_query_received, 1);
if (test_load_too_high ())
{
#if DEBUG_GAP
if (sender != NULL)
{
IF_GELOG (ectx,
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
}
GNUNET_GE_LOG (ectx,
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
"Dropping query from %s, this peer is too busy.\n",
sender == NULL ? "localhost" : (char *) &enc);
#endif
if (stats != NULL)
stats->change (stat_gap_query_drop_busy, 1);
return GNUNET_OK;
}
size = ntohs (msg->size);
if (size < sizeof (P2P_gap_query_MESSAGE))
{
GNUNET_GE_BREAK_OP (ectx, 0);
return GNUNET_SYSERR; /* malformed query */
}
req = (const P2P_gap_query_MESSAGE *) msg;
query_count = ntohl (req->number_of_queries);
if ((query_count == 0) ||
(query_count > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)) ||
(size <
sizeof (P2P_gap_query_MESSAGE) + (query_count -
1) * sizeof (GNUNET_HashCode))
|| (0 ==
memcmp (&req->returnTo, coreAPI->my_identity,
sizeof (GNUNET_PeerIdentity))))
{
GNUNET_GE_BREAK_OP (ectx, 0);
return GNUNET_SYSERR; /* malformed query */
}
bloomfilter_size =
size - (sizeof (P2P_gap_query_MESSAGE) +
(query_count - 1) * sizeof (GNUNET_HashCode));
GNUNET_GE_ASSERT (NULL, bloomfilter_size < size);
prio = ntohl (req->priority);
netLoad =
GNUNET_network_monitor_get_load (coreAPI->load_monitor, GNUNET_ND_UPLOAD);
if ((netLoad == (unsigned int) -1)
|| (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD))
{
prio = 0; /* minimum priority, no charge! */
policy = GNUNET_FS_RoutingPolicy_ALL;
}
else
{
prio = -identity->changeHostTrust (sender, -prio);
if (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD + prio)
{
policy = GNUNET_FS_RoutingPolicy_ALL;
}
else if (netLoad < 90 + 10 * prio)
{
policy =
GNUNET_FS_RoutingPolicy_ANSWER | GNUNET_FS_RoutingPolicy_FORWARD;
}
else if (netLoad < 100)
{
policy = GNUNET_FS_RoutingPolicy_ANSWER;
}
else
{
if (stats != NULL)
stats->change (stat_gap_query_drop_busy, 1);
return GNUNET_OK; /* drop */
}
}
if ((policy & GNUNET_FS_RoutingPolicy_INDIRECT) == 0)
/* kill the priority (since we cannot benefit) */
prio = 0;
ttl = GNUNET_FS_HELPER_bound_ttl (ntohl (req->ttl), prio);
type = ntohl (req->type);
/* decrement ttl (always) */
if (ttl < 0)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -