📄 rpc.c
字号:
/*
      This file is part of GNUnet
      (C) 2003, 2005, 2008 Christian Grothoff (and other contributing authors)

      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
      by the Free Software Foundation; either version 2, or (at your
      option) any later version.

      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      General Public License for more details.

      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
*/

/**
 * @file rpc/module/rpc.c
 * @brief Implementation of RPCs
 * @author Antti Salonen, Christian Grothoff
 */

#include "platform.h"
#include "gnunet_util.h"
#include "gnunet_protocols.h"
#include "gnunet_rpc_service.h"
#include "gnunet_rpc_lib.h"

/**
 * Maximum number of concurrent RPCs that we support per peer.
 */
#define RPC_MAX_REQUESTS_PER_PEER 16

/**
 * Maximum number of retries done for sending of responses.
 */
#define RPC_MAX_REPLY_ATTEMPTS 3

/**
 * Granularity for the RPC cron job.
 */
#define RPC_CRON_FREQUENCY (500 * GNUNET_CRON_MILLISECONDS)

/**
 * Initial minimum delay between retry attempts for RPC messages
 * (before we figure out how fast the connection really is).
 */
#define RPC_INITIAL_ROUND_TRIP_TIME (15 * GNUNET_CRON_SECONDS)

/**
 * After what time do we time-out every request (if it is not
 * repeated)?
 */
#define RPC_INTERNAL_PROCESSING_TIMEOUT (2 * GNUNET_CRON_MINUTES)


/**
 * @brief Request to execute a function call on the remote peer.  The
 * message is of variable size to pass arguments.  Request and reply
 * messages use the same struct, the only difference is in the value
 * of the header.type field.
 */
typedef struct
{
  GNUNET_MessageHeader header;

  /**
   * Timestamp (of the sender of this message).
   */
  GNUNET_Int32Time timestamp;

  /**
   * Sequence number (of the initiator).
   */
  unsigned int sequenceNumber;

  /**
   * How important is this message?
   */
  unsigned int importance;

  /**
   * Number of arguments or return values.  Must be 0
   * if this message communicates an error.
   */
  unsigned int argumentCount;

  /**
   * For the request, this is the length of the
   * name of the function.  For a response,
   * this is the status.
   */
  unsigned int functionNameLength;
} P2P_rpc_MESSAGE;

/**
 * An ACK message.  An ACK acknowledges the receipt of a reply to an
 * RPC call (three-way handshake).  Without an ACK, the receiver of an
 * RPC request is supposed to repeatedly send the RPC reply (until it
 * times out).
 */
typedef struct
{
  GNUNET_MessageHeader header;

  /**
   * The number of the original request for which this is the
   * ACK.
   */
  unsigned int sequenceNumber;
} RPC_ACK_Message;

/**
 * These structures are allocated while a peer
 * is handling an RPC request.
 */
struct GNUNET_RPC_CallHandle
{
  struct GNUNET_RPC_CallHandle *next;

  struct GNUNET_RPC_CallHandle *prev;

  /**
   * The message we are transmitting.  NULL
   * if our local RPC invocation has not
   * yet completed.  NON-NULL if we are
   * waiting for the ACK.
   */
  P2P_rpc_MESSAGE *msg;

  /**
   * Name of the local RPC function that we
   * have been calling.
   */
  char *function_name;

  /**
   * For which peer is this response?
   */
  GNUNET_PeerIdentity initiator;

  /**
   * Time where this record times out (timeout value for original
   * request, fixed timeout for reply if no further requests are
   * received; once we send the ACK the record of the sender is
   * discarded; we always send additional ACKs even if we don't have a
   * matching record anymore).
   */
  GNUNET_CronTime expirationTime;

  /**
   * Frequency at which we currently repeat the message.  Initially
   * set to the round-trip estimate, with exponential back-off.
   */
  GNUNET_CronTime repetitionFrequency;

  /**
   * Last time the message was sent.
   */
  GNUNET_CronTime lastAttempt;

  /**
   * Number of times we have attempted to transmit.
   */
  unsigned int attempts;

  /**
   * Error code for the response.
   */
  unsigned int errorCode;

  /**
   * The sequence number of this RPC.
   */
  unsigned int sequenceNumber;

  /**
   * How important is this RPC?
   */
  unsigned int importance;
};

/**
 * These structures are allocated while a peer
 * is waiting for a remote RPC to return a result.
 */
struct GNUNET_RPC_RequestHandle
{
  struct GNUNET_RPC_RequestHandle *next;

  struct GNUNET_RPC_RequestHandle *prev;

  /**
   * The message we are transmitting.
   */
  P2P_rpc_MESSAGE *msg;

  /**
   * Function to call once we get a reply.
   */
  GNUNET_RPC_AsynchronousCompletionCallback callback;

  void *cls;

  /**
   * To which peer are we sending the request?
   */
  GNUNET_PeerIdentity receiver;

  /**
   * Time where this record times out (timeout value for original
   * request, fixed timeout for reply if no further requests are
   * received; once we send the ACK the record of the sender is
   * discarded; we always send additional ACKs even if we don't have a
   * matching record anymore).
   */
  GNUNET_CronTime expirationTime;

  /**
   * Frequency at which we currently repeat the message.  Initially
   * set to the round-trip estimate, with exponential back-off.
   */
  GNUNET_CronTime repetitionFrequency;

  /**
   * Last time the message was sent.
   */
  GNUNET_CronTime lastAttempt;

  /**
   * The sequence number of this RPC.
   */
  unsigned int sequenceNumber;

  /**
   * Number of times we have attempted to transmit.
   */
  unsigned int attempts;

  /**
   * How important is this RPC?
   */
  unsigned int importance;

  /**
   * Error code for the response.
   */
  unsigned int errorCode;
};

/**
 * List of RPC handlers registered by the local node.
 */
struct RegisteredRPC
{
  struct RegisteredRPC *next;

  /**
   * Name of the RPC.
   */
  char *name;

  /**
   * Callback for an asynchronous RPC.
   */
  GNUNET_RPC_AsynchronousFunction async_callback;

  /**
   * Extra argument to async_callback.
   */
  void *cls;
};

/**
 * A set of RegisteredRPC structures, one for each RPC registered by the
 * local node.
 */
static struct RegisteredRPC *list_of_callbacks;

/**
 * A set of GNUNET_RPC_CallHandle structures for active incoming RPC calls
 * (requests without a reply).
 */
static struct GNUNET_RPC_CallHandle *incomingCalls;

/**
 * Linked list of active outgoing RPC calls
 * (waiting for function and reply messages without an ACK).
 */
static struct GNUNET_RPC_RequestHandle *outgoingCalls;

/**
 * A counter whose value is used for identifying the RPCs originating
 * from the local node.  The value of the counter is incremented after each
 * RPC and thus its value also tells the number of RPCs originated from the
 * local node (modulo integer overflow).
 */
static unsigned int rpcIdentifier;

/**
 * Access to GNUnet core API.
 */
static GNUNET_CoreAPIForPlugins *coreAPI;

/**
 * A mutex for synchronous access to all module-wide data structures.  This
 * lock must be held by the thread that accesses any module-wide accessible
 * data structures.
 */
static struct GNUNET_Mutex *lock;


/**
 * Registers an async RPC callback under the given name.  The name is
 * duplicated, so the caller keeps ownership of its string.
 *
 * @param name the name of the callback, must not be NULL
 * @param callback the function to call, must not be NULL
 * @param cls extra argument stored together with the callback
 * @return GNUNET_OK on success, GNUNET_SYSERR on error
 *      (typically if a callback of that name is already in use).
 */
static int
RPC_register (const char *name,
              GNUNET_RPC_AsynchronousFunction callback, void *cls)
{
  struct RegisteredRPC *rrpc;
  GNUNET_RPC_AsynchronousFunction existing;

  GNUNET_GE_ASSERT (coreAPI->ectx, name != NULL);
  GNUNET_GE_ASSERT (coreAPI->ectx, callback != NULL);
  GNUNET_mutex_lock (lock);
  for (rrpc = list_of_callbacks; rrpc != NULL; rrpc = rrpc->next)
    {
      if (0 != strcmp (rrpc->name, name))
        continue;
      /* Copy the conflicting callback while the lock still protects the
         entry: once we unlock, RPC_unregister may free the node, so it
         must not be dereferenced for the log message below. */
      existing = rrpc->async_callback;
      GNUNET_mutex_unlock (lock);
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("%s:%d - RPC %s:%p could not be registered:"
                       " another callback is already using this name (%p)\n"),
                     __FILE__, __LINE__, name, callback, existing);
      return GNUNET_SYSERR;
    }
  /* Name is unused: prepend a new entry to the list. */
  rrpc = GNUNET_malloc (sizeof (struct RegisteredRPC));
  rrpc->name = GNUNET_strdup (name);
  rrpc->async_callback = callback;
  rrpc->cls = cls;
  rrpc->next = list_of_callbacks;
  list_of_callbacks = rrpc;
  GNUNET_mutex_unlock (lock);
  return GNUNET_OK;
}

/**
 * Unregisters an asynchronous RPC callback of the given name.  An entry
 * is only removed if its name, callback and cls all match exactly; a NULL
 * callback is compared like any other pointer and does not act as a
 * wildcard.
 *
 * @param name the name of the callback, must not be NULL
 * @param callback the function to unregister (exact pointer match)
 * @param cls the extra argument given at registration (exact match)
 * @return GNUNET_OK on success, GNUNET_SYSERR on error
 *      (typically if a callback of that name does not exist or is
 *      bound to a different function).
 */
static int
RPC_unregister (const char *name,
                GNUNET_RPC_AsynchronousFunction callback, void *cls)
{
  struct RegisteredRPC *pos;
  struct RegisteredRPC *prev;

  /* No incoming call may be pending when a handler is removed.
     NOTE(review): this reads incomingCalls before the lock is taken. */
  GNUNET_GE_ASSERT (NULL, NULL == incomingCalls);
  GNUNET_GE_ASSERT (coreAPI->ectx, name != NULL);
  GNUNET_mutex_lock (lock);
  prev = NULL;
  pos = list_of_callbacks;
  while (pos != NULL)
    {
      if ((0 == strcmp (pos->name, name)) &&
          (pos->async_callback == callback) && (pos->cls == cls))
        {
          /* Unlink from the singly-linked list, then release the entry. */
          if (prev == NULL)
            list_of_callbacks = pos->next;
          else
            prev->next = pos->next;
          GNUNET_free (pos->name);
          GNUNET_free (pos);
          GNUNET_mutex_unlock (lock);
          return GNUNET_OK;
        }
      prev = pos;
      pos = pos->next;
    }
  GNUNET_mutex_unlock (lock);
  GNUNET_GE_LOG (coreAPI->ectx,
                 GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
                 _
                 ("%s:%d - async RPC %s:%p could not be unregistered: not found\n"),
                 __FILE__, __LINE__, name, callback);
  return GNUNET_SYSERR;
}

/**
 * Get the name of the RPC function.
 *
 * @param req the request; the name is read from the bytes that
 *        directly follow the fixed-size message struct
 * @return 0-terminated copy of the name allocated with GNUNET_malloc
 *         (the caller owns it), or NULL if the advertised name length
 *         does not fit into the message size given in the header
 */
static char *
RPC_get_function_name (const P2P_rpc_MESSAGE * req)
{
  char *ret;
  unsigned int slen;

  slen = ntohl (req->functionNameLength);
  /* The second test catches wrap-around of the addition on platforms
     where size_t is not wider than unsigned int. */
  if ((ntohs (req->header.size) < sizeof (P2P_rpc_MESSAGE) + slen) ||
      (sizeof (P2P_rpc_MESSAGE) + slen < sizeof (P2P_rpc_MESSAGE)))
    return NULL;                /* invalid! */
  ret = GNUNET_malloc (slen + 1);
  memcpy (ret, &req[1], slen);
  ret[slen] = '\0';
  return ret;
}

/**
 * Get the arguments (or return value) from
 * the request.
 *
 * For a message of type GNUNET_P2P_PROTO_RPC_REQ the serialized
 * parameters start after the function name; for any other type they
 * start directly after the fixed-size message struct.
 *
 * @param req the request or reply message
 * @return the deserialized parameters, or NULL if the message is
 *         malformed (sizes inconsistent, deserialization failed, or
 *         parameter count does not match the header)
 */
static struct GNUNET_RPC_CallParameters *
RPC_deserialize_arguments (const P2P_rpc_MESSAGE * req)
{
  unsigned int slen;
  struct GNUNET_RPC_CallParameters *ret;

  if (ntohs (req->header.type) == GNUNET_P2P_PROTO_RPC_REQ)
    slen = ntohl (req->functionNameLength);
  else
    slen = 0;
  /* The second test catches wrap-around of the addition on platforms
     where size_t is not wider than unsigned int. */
  if ((ntohs (req->header.size) < sizeof (P2P_rpc_MESSAGE) + slen) ||
      (sizeof (P2P_rpc_MESSAGE) + slen < sizeof (P2P_rpc_MESSAGE)))
    return NULL;                /* invalid! */
  ret = GNUNET_RPC_parameters_deserialize (&((char *) &req[1])[slen],
                                           ntohs (req->header.size) -
                                           sizeof (P2P_rpc_MESSAGE) - slen);
  /* Do not hand a failed deserialization to the helpers below; the
     outcome for the caller (NULL) is the same either way. */
  if (ret == NULL)
    return NULL;                /* invalid! */
  /* NOTE(review): argumentCount is a 32-bit field but is converted with
     ntohs here (functionNameLength uses ntohl).  Confirm against the
     code that fills in argumentCount before changing this. */
  if (GNUNET_RPC_parameters_count (ret) != ntohs (req->argumentCount))
    {
      GNUNET_RPC_parameters_destroy (ret);
      return NULL;              /* invalid! */
    }
  return ret;
}

/**
 * Send an ACK message.
 *
 * @param receiver peer the ACK is sent to
 * @param sequenceNumber number of the request being acknowledged
 *        (converted to network byte order for transmission)
 * @param importance passed through to the core's ciphertext_send
 * @param maxDelay passed through to the core's ciphertext_send
 */
static void
RPC_send_ack (const GNUNET_PeerIdentity * receiver,
              unsigned int sequenceNumber,
              unsigned int importance, unsigned int maxDelay)
{
  RPC_ACK_Message msg;

  msg.header.size = htons (sizeof (RPC_ACK_Message));
  msg.header.type = htons (GNUNET_P2P_PROTO_RPC_ACK);
  msg.sequenceNumber = htonl (sequenceNumber);
  coreAPI->ciphertext_send (receiver, &msg.header, importance, maxDelay);
}

/**
 * Build an RPC message serializing the name and values
 * properly.
 *
 * @param errorCode the status code for the message, if non-NULL
 *        values will be NULL
 * @param name the name of the target method, NULL for a reply.
 * @param sequenceNumber the unique ID of the message
 * @param values the arguments or return values, maybe NULL
 * @return the RPC message to transmit, caller must free
 */
static P2P_rpc_MESSAGE *
RPC_build_message (unsigned short errorCode,
                   const char *name,
                   unsigned int sequenceNumber,
                   unsigned int importance,
                   const struct GNUNET_RPC_CallParameters *values)
{
  P2P_rpc_MESSAGE *ret;
  size_t size = sizeof (P2P_rpc_MESSAGE);
  int slen;

  if (name != NULL)
    slen = strlen (name);
  else
    slen = 0;
  size += slen;
  if (values != NULL)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -