📄 rpc.c
字号:
size += GNUNET_RPC_parameters_get_serialized_size (values); if (size >= GNUNET_MAX_BUFFER_SIZE) return NULL; /* message to big! */ ret = GNUNET_malloc (size); ret->header.size = htons (size); ret->header.type = htons ((name == NULL) ? GNUNET_P2P_PROTO_RPC_RES : GNUNET_P2P_PROTO_RPC_REQ); ret->timestamp = htonl (GNUNET_get_time_int32 (NULL)); ret->sequenceNumber = htonl (sequenceNumber); ret->importance = htonl (importance); if (name == NULL) ret->functionNameLength = htonl (errorCode); else ret->functionNameLength = htonl (slen); ret->argumentCount = htonl ((values == NULL) ? 0 : GNUNET_RPC_parameters_count (values)); if (name != NULL) memcpy (&ret[1], name, slen); GNUNET_RPC_parameters_serialize (values, &((char *) &ret[1])[slen]); return ret;}/* ***************** RPC P2P message handlers **************** *//** * Function called to communicate the return value of * an RPC to the peer that initiated it. */static voidRPC_complete (const struct GNUNET_RPC_CallParameters *results, int errorCode, struct GNUNET_RPC_CallHandle *call){ GNUNET_mutex_lock (lock); GNUNET_GE_ASSERT (NULL, call->msg == NULL); call->msg = RPC_build_message (errorCode, NULL, call->sequenceNumber, call->importance, results); if (call->msg == NULL) call->msg = RPC_build_message (GNUNET_RPC_ERROR_RETURN_VALUE_TOO_LARGE, NULL, call->sequenceNumber, call->importance, results); call->lastAttempt = GNUNET_get_time (); call->repetitionFrequency = RPC_INITIAL_ROUND_TRIP_TIME; call->attempts = 1; call->errorCode = errorCode; coreAPI->ciphertext_send (&call->initiator, &call->msg->header, call->importance, RPC_INITIAL_ROUND_TRIP_TIME / 2); GNUNET_mutex_unlock (lock);}/** * Handle request for remote function call. Checks if message * has been seen before, if not performs the call and sends * reply. */static inthandleRPCMessageReq (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * message){ const P2P_rpc_MESSAGE *req; struct GNUNET_RPC_CallHandle *pos; struct GNUNET_RPC_CallParameters *argumentValues; const struct RegisteredRPC *rpc; unsigned int sq; unsigned int total; char *functionName; if (ntohs (message->size) < sizeof (P2P_rpc_MESSAGE)) { GNUNET_GE_BREAK_OP (NULL, 0); return GNUNET_SYSERR; } req = (const P2P_rpc_MESSAGE *) message; functionName = RPC_get_function_name (req); if (functionName == NULL) { GNUNET_GE_BREAK_OP (NULL, 0); return GNUNET_SYSERR; } argumentValues = RPC_deserialize_arguments (req); if (argumentValues == NULL) { GNUNET_free (functionName); GNUNET_GE_BREAK_OP (NULL, 0); return GNUNET_SYSERR; /* message malformed */ } sq = ntohl (req->sequenceNumber); /* check if message is already in incomingCalls! */ GNUNET_mutex_lock (lock); pos = incomingCalls; total = 0; while ((pos != NULL) && ((pos->sequenceNumber != sq) || (0 != memcmp (&pos->initiator, sender, sizeof (GNUNET_PeerIdentity))))) { if (0 == memcmp (&pos->initiator, sender, sizeof (GNUNET_PeerIdentity))) total++; pos = pos->next; } if ((pos != NULL) || (total > RPC_MAX_REQUESTS_PER_PEER)) { /* already pending or too many pending */ GNUNET_free (functionName); GNUNET_RPC_parameters_destroy (argumentValues); GNUNET_mutex_unlock (lock); return GNUNET_SYSERR; } /* find matching RPC function */ rpc = list_of_callbacks; while (rpc != NULL) { if (0 == strcmp (functionName, rpc->name)) break; rpc = rpc->next; } /* create call handle */ pos = GNUNET_malloc (sizeof (struct GNUNET_RPC_CallHandle)); memset (pos, 0, sizeof (struct GNUNET_RPC_CallHandle)); pos->function_name = functionName; pos->sequenceNumber = sq; pos->initiator = *sender; pos->expirationTime = GNUNET_get_time () + RPC_INTERNAL_PROCESSING_TIMEOUT; pos->importance = ntohl (req->importance); pos->next = incomingCalls; if (incomingCalls != NULL) incomingCalls->prev = pos; incomingCalls = pos; if (rpc == NULL) RPC_complete (NULL, GNUNET_RPC_ERROR_UNKNOWN_FUNCTION, pos); else rpc->async_callback (rpc->cls, sender, argumentValues, pos); GNUNET_RPC_parameters_destroy (argumentValues); GNUNET_mutex_unlock (lock); return GNUNET_OK;}/** * Handle reply for request for remote function call. Checks * if we are waiting for a reply, if so triggers the callback. * Also always sends an ACK. */static inthandleRPCMessageRes (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * message){ const P2P_rpc_MESSAGE *res; struct GNUNET_RPC_RequestHandle *pos; struct GNUNET_RPC_CallParameters *reply; unsigned int error; if (ntohs (message->size) < sizeof (P2P_rpc_MESSAGE)) { GNUNET_GE_BREAK_OP (NULL, 0); return GNUNET_SYSERR; } res = (const P2P_rpc_MESSAGE *) message; RPC_send_ack (sender, ntohl (res->sequenceNumber), ntohl (res->importance), 0); /* Locate the GNUNET_RPC_CallHandle structure. */ GNUNET_mutex_lock (lock); pos = outgoingCalls; while (pos != NULL) { if ((0 == memcmp (&pos->receiver, sender, sizeof (GNUNET_PeerIdentity))) && (pos->sequenceNumber == ntohl (res->sequenceNumber))) break; pos = pos->next; } if (pos == NULL) { /* duplicate reply */ GNUNET_mutex_unlock (lock); return GNUNET_OK; } /* remove pos from linked list */ GNUNET_mutex_unlock (lock); /* call callback */ reply = NULL; error = ntohl (res->functionNameLength); if (error == GNUNET_RPC_ERROR_OK) reply = GNUNET_RPC_parameters_deserialize ((char *) &res[1], ntohs (message->size) - sizeof (P2P_rpc_MESSAGE)); if (ntohl (res->argumentCount) != GNUNET_RPC_parameters_count (reply)) { GNUNET_RPC_parameters_destroy (reply); reply = NULL; error = GNUNET_RPC_ERROR_REPLY_MALFORMED; } if (pos->callback != NULL) { pos->callback (sender, reply, error, pos->cls); pos->callback = NULL; pos->errorCode = error; } if (reply != NULL) GNUNET_RPC_parameters_destroy (reply); return GNUNET_OK;}/** * Handle a peer-to-peer message of type GNUNET_P2P_PROTO_RPC_ACK. */static inthandleRPCMessageAck (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * message){ const RPC_ACK_Message *ack; struct GNUNET_RPC_CallHandle *pos; if (ntohs (message->size) != sizeof (RPC_ACK_Message)) { GNUNET_GE_BREAK_OP (NULL, 0); return GNUNET_SYSERR; } ack = (const RPC_ACK_Message *) message; GNUNET_mutex_lock (lock); /* Locate the GNUNET_RPC_CallHandle structure. */ pos = incomingCalls; while (pos != NULL) { if ((0 == memcmp (&pos->initiator, sender, sizeof (GNUNET_PeerIdentity))) && (pos->sequenceNumber == ntohl (ack->sequenceNumber))) break; pos = pos->next; } if (pos == NULL) { /* duplicate ACK, ignore */ GNUNET_mutex_unlock (lock); return GNUNET_OK; } /* remove from list */ if (pos->prev == NULL) incomingCalls = pos->next; else pos->prev->next = pos->next; if (pos->next != NULL) pos->next->prev = pos->prev; GNUNET_free (pos->msg); GNUNET_free (pos->function_name); GNUNET_free (pos); GNUNET_mutex_unlock (lock); return GNUNET_OK;}/** * Start an asynchronous RPC. * * @param timeout when should we stop trying the RPC * @param callback function to call with the return value from * the RPC * @param closure extra argument to callback * @return value required to stop the RPC (and the RPC must * be explicitly stopped to free resources!) */static struct GNUNET_RPC_RequestHandle *RPC_start (const GNUNET_PeerIdentity * receiver, const char *name, const struct GNUNET_RPC_CallParameters *request_param, unsigned int importance, GNUNET_CronTime timeout, GNUNET_RPC_AsynchronousCompletionCallback callback, void *closure){ struct GNUNET_RPC_RequestHandle *ret; if (timeout > 1 * GNUNET_CRON_HOURS) timeout = 1 * GNUNET_CRON_HOURS; ret = GNUNET_malloc (sizeof (struct GNUNET_RPC_RequestHandle)); memset (ret, 0, sizeof (struct GNUNET_RPC_RequestHandle)); ret->receiver = *receiver; ret->callback = callback; ret->cls = closure; ret->expirationTime = GNUNET_get_time () + timeout; ret->lastAttempt = 0; ret->attempts = 0; ret->sequenceNumber = rpcIdentifier++; ret->msg = RPC_build_message (GNUNET_RPC_ERROR_OK, name, ret->sequenceNumber, importance, request_param); ret->repetitionFrequency = RPC_INITIAL_ROUND_TRIP_TIME; GNUNET_mutex_lock (lock); ret->next = outgoingCalls; outgoingCalls = ret; if (ret->next != NULL) ret->next->prev = ret; GNUNET_mutex_unlock (lock); coreAPI->ciphertext_send (receiver, &ret->msg->header, importance, RPC_INITIAL_ROUND_TRIP_TIME / 2); return ret;}/** * Stop an asynchronous RPC (and free associated resources) * * @param record the return value from RPC_start * @return GNUNET_RPC_ERROR_OK if the RPC was successful, * another RPC_ERROR code if it was aborted */static intRPC_stop (struct GNUNET_RPC_RequestHandle *record){ int ret; GNUNET_mutex_lock (lock); if (record->prev == NULL) outgoingCalls = record->next; else record->prev->next = record->next; if (record->next != NULL) record->next->prev = record->prev; GNUNET_free (record->msg); GNUNET_mutex_unlock (lock); ret = (record->callback == NULL) ? record->errorCode : GNUNET_RPC_ERROR_ABORTED; GNUNET_free (record); return ret;}/** * Cron-job that processes the RPC queues. This job is responsible * for retransmission of requests and un-ACKed responses. It is also * there to trigger timeouts. */static voidRPC_retry_job (void *unused){ GNUNET_CronTime now; struct GNUNET_RPC_CallHandle *ipos; struct GNUNET_RPC_RequestHandle *opos; GNUNET_mutex_lock (lock); now = GNUNET_get_time (); ipos = incomingCalls; while (ipos != NULL) { if ((ipos->expirationTime < now) || (ipos->attempts >= RPC_MAX_REPLY_ATTEMPTS)) { GNUNET_free_non_null (ipos->msg); GNUNET_free (ipos->function_name); if (ipos->prev == NULL) incomingCalls = ipos->next; else ipos->prev->next = ipos->next; if (ipos->next != NULL) ipos->next = ipos->prev; GNUNET_free (ipos); ipos = incomingCalls; continue; } if ((ipos->msg != NULL) && (ipos->lastAttempt + ipos->repetitionFrequency < now)) { ipos->lastAttempt = now; ipos->attempts++; ipos->repetitionFrequency *= 2; coreAPI->ciphertext_send (&ipos->initiator, &ipos->msg->header, ipos->repetitionFrequency / 2, ipos->importance); } ipos = ipos->next; } opos = outgoingCalls; while (opos != NULL) { if (opos->expirationTime < now) { if (opos->callback != NULL) { opos->callback (&opos->receiver, NULL, GNUNET_RPC_ERROR_TIMEOUT, opos->cls); opos->callback = NULL; } GNUNET_free_non_null (opos->msg); if (opos->prev == NULL) outgoingCalls = opos->next; else opos->prev->next = opos->next; if (opos->next != NULL) opos->next = opos->prev; GNUNET_free (opos); opos = outgoingCalls; continue; } if (opos->lastAttempt + opos->repetitionFrequency < now) { opos->lastAttempt = now; opos->attempts++; opos->repetitionFrequency *= 2; coreAPI->ciphertext_send (&opos->receiver, &opos->msg->header, opos->repetitionFrequency / 2, opos->importance); } opos = opos->next; } GNUNET_mutex_unlock (lock);}/* ******************* Exported functions ******************* *//** * Shutdown RPC service. */voidrelease_module_rpc (){ coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_REQ, &handleRPCMessageReq); coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_RES, &handleRPCMessageRes); coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_ACK, &handleRPCMessageAck); GNUNET_GE_ASSERT (NULL, NULL == incomingCalls); GNUNET_GE_ASSERT (NULL, NULL == outgoingCalls); GNUNET_GE_ASSERT (NULL, NULL == list_of_callbacks); GNUNET_cron_del_job (coreAPI->cron, &RPC_retry_job, RPC_CRON_FREQUENCY, NULL); coreAPI = NULL; lock = NULL;}/** * Initialize the RPC service. */GNUNET_RPC_ServiceAPI *provide_module_rpc (GNUNET_CoreAPIForPlugins * capi){ static GNUNET_RPC_ServiceAPI rpcAPI; int rvalue; lock = capi->global_lock_get (); coreAPI = capi; GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER, _("`%s' registering handlers %d %d %d\n"), "rpc", GNUNET_P2P_PROTO_RPC_REQ, GNUNET_P2P_PROTO_RPC_RES, GNUNET_P2P_PROTO_RPC_ACK); rvalue = GNUNET_OK; if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_REQ, &handleRPCMessageReq) == GNUNET_SYSERR) rvalue = GNUNET_SYSERR; if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_RES, &handleRPCMessageRes) == GNUNET_SYSERR) rvalue = GNUNET_SYSERR; if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_ACK, &handleRPCMessageAck) == GNUNET_SYSERR) rvalue = GNUNET_SYSERR; if (rvalue == GNUNET_SYSERR) { release_module_rpc (); GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER, _("Failed to initialize `%s' service.\n"), "rpc"); return NULL; } GNUNET_cron_add_job (coreAPI->cron, &RPC_retry_job, RPC_CRON_FREQUENCY, RPC_CRON_FREQUENCY, NULL); rpcAPI.RPC_register = &RPC_register; rpcAPI.RPC_unregister = &RPC_unregister; rpcAPI.RPC_complete = &RPC_complete; rpcAPI.RPC_start = &RPC_start; rpcAPI.RPC_stop = &RPC_stop; return &rpcAPI;}/* end of rpc.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -