📄 routing.c
字号:
#endif GNUNET_mutex_unlock (lock); if (stats != NULL) stats->change (stat_requests_routed, 1); return GNUNET_OK;}/** * Handle GET message. */static inthandleGet (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * msg){ GNUNET_PeerIdentity next[GET_TRIES + 1]; const DHT_MESSAGE *get; DHT_MESSAGE aget; unsigned int target_value; unsigned int hop_count; int total; int i; int j;#if DEBUG_ROUTING GNUNET_EncName enc; GNUNET_EncName henc;#endif if (ntohs (msg->size) != sizeof (DHT_MESSAGE)) { GNUNET_GE_BREAK (NULL, 0); return GNUNET_SYSERR; } get = (const DHT_MESSAGE *) msg;#if DEBUG_ROUTING GNUNET_hash_to_enc (&get->key, &enc); if (sender != NULL) GNUNET_hash_to_enc (&sender->hashPubKey, &henc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Received DHT GET for key `%s' from `%s'.\n", &enc, sender == NULL ? "me" : (char *) &henc);#endif if (stats != NULL) stats->change (stat_get_requests_received, 1); if ((sender != NULL) && (GNUNET_OK != addRoute (sender, NULL, NULL, get))) {#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Failed to add entry in routing table for request.\n");#endif return GNUNET_OK; /* could not route */ } total = dstore->get (&get->key, ntohl (get->type), &routeResult, NULL); if (total > MAX_RESULTS) {#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Found %d results locally, will not route GET any further\n", total);#endif return GNUNET_OK; } aget = *get; hop_count = ntohl (get->hop_count); target_value = get_forward_count (hop_count, GET_TRIES); aget.hop_count = htonl (1 + hop_count); aget.network_size = htonl (ntohl (get->network_size) + GNUNET_DHT_estimate_network_diameter ()); if (target_value > GET_TRIES) target_value = GET_TRIES; j = 0; if (sender != NULL) next[j++] = *sender; /* do not send back to sender! */ for (i = 0; i < target_value; i++) { if (GNUNET_OK != GNUNET_DHT_select_peer (&next[j], &get->key, &next[0], j)) {#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Failed to select peer for fowarding in round %d/%d\n", i + 1, GET_TRIES);#endif break; }#if DEBUG_ROUTING GNUNET_hash_to_enc (&next[j].hashPubKey, &enc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Forwarding DHT GET request to peer `%s'.\n", &enc);#endif coreAPI->ciphertext_send (&next[j], &aget.header, DHT_PRIORITY, DHT_DELAY); j++; } return GNUNET_OK;}/** * Handle PUT message. */static inthandlePut (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * msg){ GNUNET_PeerIdentity next[PUT_TRIES + 1]; const DHT_MESSAGE *put; DHT_MESSAGE *aput; GNUNET_CronTime now; unsigned int hop_count; unsigned int target_value; int store; int i; unsigned int j;#if DEBUG_ROUTING GNUNET_EncName enc;#endif if (ntohs (msg->size) < sizeof (DHT_MESSAGE)) { GNUNET_GE_BREAK (NULL, 0); return GNUNET_SYSERR; } if (stats != NULL) stats->change (stat_put_requests_received, 1); put = (const DHT_MESSAGE *) msg;#if DEBUG_ROUTING GNUNET_hash_to_enc (&put->key, &enc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Received DHT PUT for key `%s'.\n", &enc);#endif store = 0; hop_count = htons (put->hop_count); target_value = get_forward_count (hop_count, PUT_TRIES); aput = GNUNET_malloc (ntohs (msg->size)); memcpy (aput, put, ntohs (msg->size)); aput->hop_count = htons (hop_count + 1); aput->network_size = htonl (ntohl (put->network_size) + GNUNET_DHT_estimate_network_diameter ()); if (target_value > PUT_TRIES) target_value = PUT_TRIES; j = 0; if (sender != NULL) next[j++] = *sender; /* do not send back to sender! */ for (i = 0; i < target_value; i++) { if (GNUNET_OK != GNUNET_DHT_select_peer (&next[j], &put->key, &next[0], j)) {#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Failed to select peer for PUT fowarding in round %d/%d\n", i + 1, PUT_TRIES);#endif store = 1; continue; } if (1 == GNUNET_hash_xorcmp (&next[j].hashPubKey, &coreAPI->my_identity->hashPubKey, &put->key)) store = 1; /* we're closer than the selected target */#if DEBUG_ROUTING GNUNET_hash_to_enc (&next[j].hashPubKey, &enc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Forwarding DHT PUT request to peer `%s'.\n", &enc);#endif coreAPI->ciphertext_send (&next[j], &aput->header, DHT_PRIORITY, DHT_DELAY); j++; } GNUNET_free (aput); if (store != 0) { now = GNUNET_get_time ();#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Decided to cache data `%.*s' locally until %llu (for %llu ms)\n", ntohs (put->header.size) - sizeof (DHT_MESSAGE), &put[1], CONTENT_LIFETIME + now, CONTENT_LIFETIME);#endif dstore->put (&put->key, ntohl (put->type), CONTENT_LIFETIME + now, ntohs (put->header.size) - sizeof (DHT_MESSAGE), (const char *) &put[1]); } else {#if DEBUG_ROUTING GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Decided NOT to cache data `%.*s' locally\n", ntohs (put->header.size) - sizeof (DHT_MESSAGE), &put[1]);#endif } return GNUNET_OK;}/** * Handle RESULT message. */static inthandleResult (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * msg){ const DHT_MESSAGE *result;#if DEBUG_ROUTING GNUNET_EncName enc;#endif if (ntohs (msg->size) < sizeof (DHT_MESSAGE)) { GNUNET_GE_BREAK (NULL, 0); return GNUNET_SYSERR; } if (stats != NULL) stats->change (stat_results_received, 1); result = (const DHT_MESSAGE *) msg;#if DEBUG_ROUTING GNUNET_hash_to_enc (&result->key, &enc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Received REMOTE DHT RESULT for key `%s'.\n", &enc);#endif routeResult (&result->key, ntohl (result->type), ntohs (result->header.size) - sizeof (DHT_MESSAGE), (const char *) &result[1], (void *) msg); return GNUNET_OK;}/** * Start a DHT get operation. */intGNUNET_DHT_get_start (const GNUNET_HashCode * key, unsigned int type, GNUNET_ResultProcessor handler, void *cls){ DHT_MESSAGE get;#if DEBUG_ROUTING GNUNET_EncName enc;#endif get.header.size = htons (sizeof (DHT_MESSAGE)); get.header.type = htons (GNUNET_P2P_PROTO_DHT_GET); get.type = htonl (type); get.hop_count = htonl (0); get.network_size = htonl (GNUNET_DHT_estimate_network_diameter ()); get.key = *key;#if DEBUG_ROUTING GNUNET_hash_to_enc (&get.key, &enc); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER, "Initiating DHT GET (based on local request) for key `%s'.\n", &enc);#endif if (GNUNET_OK != addRoute (NULL, handler, cls, &get)) return GNUNET_SYSERR; handleGet (NULL, &get.header); return GNUNET_OK;}/** * Stop a DHT get operation (prevents calls to * the given iterator). */intGNUNET_DHT_get_stop (const GNUNET_HashCode * key, unsigned int type, GNUNET_ResultProcessor handler, void *cls){ unsigned int i; struct DHT_Source_Route *pos; struct DHT_Source_Route *prev; int done; done = GNUNET_NO; GNUNET_mutex_lock (lock); for (i = 0; i < rt_size; i++) { prev = NULL; pos = records[i].sources; while (pos != NULL) { if ((pos->receiver == handler) && (pos->receiver_closure == cls) && (0 == memcmp (key, &records[i].get.key, sizeof (GNUNET_HashCode)))) { if (prev == NULL) records[i].sources = pos->next; else prev->next = pos->next; GNUNET_free (pos); done = GNUNET_YES; break; } prev = pos; pos = prev->next; } if (records[i].sources == NULL) { GNUNET_array_grow (records[i].results, records[i].result_count, 0); records[i].expire = 0; } if (done == GNUNET_YES) break; } GNUNET_mutex_unlock (lock); if (done != GNUNET_YES) return GNUNET_SYSERR; return GNUNET_OK;}/** * Perform a DHT put operation. Note that PUT operations always * expire after a period of time and the client is responsible for * doing periodic refreshs. The given expiration time is ONLY used to * ensure that the datum is certainly deleted by that time (it maybe * deleted earlier). * * @param expiration_time absolute expiration time */intGNUNET_DHT_put (const GNUNET_HashCode * key, unsigned int type, unsigned int size, const char *data){ DHT_MESSAGE *put; put = GNUNET_malloc (sizeof (DHT_MESSAGE) + size); put->header.size = htons (sizeof (DHT_MESSAGE) + size); put->header.type = htons (GNUNET_P2P_PROTO_DHT_PUT); put->key = *key; put->type = htonl (type); put->hop_count = htonl (0); put->network_size = htonl (GNUNET_DHT_estimate_network_diameter ()); memcpy (&put[1], data, size); handlePut (NULL, &put->header); GNUNET_free (put); return GNUNET_OK;}/** * We have additional "free" bandwidth available. * Possibly find a good query to add to the message * to the given receiver. * * @param padding maximum number of bytes available * @return number of bytes added at position */static unsigned intextra_get_callback (const GNUNET_PeerIdentity * receiver, void *position, unsigned int padding){ /* FIXME */ return 0;}/** * Initialize routing DHT component. * * @param capi the core API * @return GNUNET_OK on success */intGNUNET_DHT_init_routing (GNUNET_CoreAPIForPlugins * capi){ unsigned long long rts; coreAPI = capi; rts = 65536; GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "DHT", "TABLESIZE", 128, 1024 * 1024, 1024, &rts); dstore = coreAPI->service_request ("dstore"); if (dstore == NULL) return GNUNET_SYSERR; GNUNET_array_grow (records, rt_size, rts); lock = GNUNET_mutex_create (GNUNET_NO); stats = capi->service_request ("stats"); if (stats != NULL) { stat_replies_routed = stats->create (gettext_noop ("# dht replies routed")); stat_requests_routed = stats->create (gettext_noop ("# dht requests routed")); stat_get_requests_received = stats->create (gettext_noop ("# dht get requests received")); stat_put_requests_received = stats->create (gettext_noop ("# dht put requests received")); stat_results_received = stats->create (gettext_noop ("# dht results received")); } GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER, _("`%s' registering p2p handlers: %d %d %d\n"), "dht", GNUNET_P2P_PROTO_DHT_GET, GNUNET_P2P_PROTO_DHT_PUT, GNUNET_P2P_PROTO_DHT_RESULT); coreAPI->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_GET, &handleGet); coreAPI->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_PUT, &handlePut); coreAPI->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_RESULT, &handleResult); coreAPI->send_callback_register (sizeof (DHT_MESSAGE), 0, &extra_get_callback); return GNUNET_OK;}/** * Shutdown routing DHT component. * * @return GNUNET_OK on success */intGNUNET_DHT_done_routing (){ unsigned int i; struct DHT_Source_Route *pos; coreAPI->send_callback_unregister (sizeof (DHT_MESSAGE), &extra_get_callback); coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_GET, &handleGet); coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_PUT, &handlePut); coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_RESULT, &handleResult); if (stats != NULL) { coreAPI->service_release (stats); stats = NULL; } GNUNET_mutex_destroy (lock); for (i = 0; i < rt_size; i++) { while (records[i].sources != NULL) { pos = records[i].sources; records[i].sources = pos->next; GNUNET_free (pos); } GNUNET_array_grow (records[i].results, records[i].result_count, 0); } GNUNET_array_grow (records, rt_size, 0); coreAPI->service_release (dstore); return GNUNET_OK;}/* end of routing.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -