⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.c

📁 GNUnet是一个安全的点对点网络框架
💻 C
📖 第 1 页 / 共 2 页
字号:
    size += GNUNET_RPC_parameters_get_serialized_size (values);  if (size >= GNUNET_MAX_BUFFER_SIZE)    return NULL;                /* message to big! */  ret = GNUNET_malloc (size);  ret->header.size = htons (size);  ret->header.type =    htons ((name ==            NULL) ? GNUNET_P2P_PROTO_RPC_RES : GNUNET_P2P_PROTO_RPC_REQ);  ret->timestamp = htonl (GNUNET_get_time_int32 (NULL));  ret->sequenceNumber = htonl (sequenceNumber);  ret->importance = htonl (importance);  if (name == NULL)    ret->functionNameLength = htonl (errorCode);  else    ret->functionNameLength = htonl (slen);  ret->argumentCount =    htonl ((values == NULL) ? 0 : GNUNET_RPC_parameters_count (values));  if (name != NULL)    memcpy (&ret[1], name, slen);  GNUNET_RPC_parameters_serialize (values, &((char *) &ret[1])[slen]);  return ret;}/* ***************** RPC P2P message handlers **************** *//** * Function called to communicate the return value of * an RPC to the peer that initiated it. */static voidRPC_complete (const struct GNUNET_RPC_CallParameters *results,              int errorCode, struct GNUNET_RPC_CallHandle *call){  GNUNET_mutex_lock (lock);  GNUNET_GE_ASSERT (NULL, call->msg == NULL);  call->msg = RPC_build_message (errorCode,                                 NULL,                                 call->sequenceNumber,                                 call->importance, results);  if (call->msg == NULL)    call->msg = RPC_build_message (GNUNET_RPC_ERROR_RETURN_VALUE_TOO_LARGE,                                   NULL,                                   call->sequenceNumber,                                   call->importance, results);  call->lastAttempt = GNUNET_get_time ();  call->repetitionFrequency = RPC_INITIAL_ROUND_TRIP_TIME;  call->attempts = 1;  call->errorCode = errorCode;  coreAPI->ciphertext_send (&call->initiator,                            &call->msg->header,                            call->importance,                            RPC_INITIAL_ROUND_TRIP_TIME / 2);  GNUNET_mutex_unlock (lock);}/** * Handle request for remote function call.  Checks if message * has been seen before, if not performs the call and sends * reply. */static inthandleRPCMessageReq (const GNUNET_PeerIdentity * sender,                     const GNUNET_MessageHeader * message){  const P2P_rpc_MESSAGE *req;  struct GNUNET_RPC_CallHandle *pos;  struct GNUNET_RPC_CallParameters *argumentValues;  const struct RegisteredRPC *rpc;  unsigned int sq;  unsigned int total;  char *functionName;  if (ntohs (message->size) < sizeof (P2P_rpc_MESSAGE))    {      GNUNET_GE_BREAK_OP (NULL, 0);      return GNUNET_SYSERR;    }  req = (const P2P_rpc_MESSAGE *) message;  functionName = RPC_get_function_name (req);  if (functionName == NULL)    {      GNUNET_GE_BREAK_OP (NULL, 0);      return GNUNET_SYSERR;    }  argumentValues = RPC_deserialize_arguments (req);  if (argumentValues == NULL)    {      GNUNET_free (functionName);      GNUNET_GE_BREAK_OP (NULL, 0);      return GNUNET_SYSERR;     /* message malformed */    }  sq = ntohl (req->sequenceNumber);  /* check if message is already in incomingCalls! */  GNUNET_mutex_lock (lock);  pos = incomingCalls;  total = 0;  while ((pos != NULL) &&         ((pos->sequenceNumber != sq) ||          (0 != memcmp (&pos->initiator,                        sender, sizeof (GNUNET_PeerIdentity)))))    {      if (0 == memcmp (&pos->initiator, sender, sizeof (GNUNET_PeerIdentity)))        total++;      pos = pos->next;    }  if ((pos != NULL) || (total > RPC_MAX_REQUESTS_PER_PEER))    {      /* already pending or too many pending */      GNUNET_free (functionName);      GNUNET_RPC_parameters_destroy (argumentValues);      GNUNET_mutex_unlock (lock);      return GNUNET_SYSERR;    }  /* find matching RPC function */  rpc = list_of_callbacks;  while (rpc != NULL)    {      if (0 == strcmp (functionName, rpc->name))        break;      rpc = rpc->next;    }  /* create call handle */  pos = GNUNET_malloc (sizeof (struct GNUNET_RPC_CallHandle));  memset (pos, 0, sizeof (struct GNUNET_RPC_CallHandle));  pos->function_name = functionName;  pos->sequenceNumber = sq;  pos->initiator = *sender;  pos->expirationTime = GNUNET_get_time () + RPC_INTERNAL_PROCESSING_TIMEOUT;  pos->importance = ntohl (req->importance);  pos->next = incomingCalls;  if (incomingCalls != NULL)    incomingCalls->prev = pos;  incomingCalls = pos;  if (rpc == NULL)    RPC_complete (NULL, GNUNET_RPC_ERROR_UNKNOWN_FUNCTION, pos);  else    rpc->async_callback (rpc->cls, sender, argumentValues, pos);  GNUNET_RPC_parameters_destroy (argumentValues);  GNUNET_mutex_unlock (lock);  return GNUNET_OK;}/** * Handle reply for request for remote function call.  Checks * if we are waiting for a reply, if so triggers the callback. * Also always sends an ACK. */static inthandleRPCMessageRes (const GNUNET_PeerIdentity * sender,                     const GNUNET_MessageHeader * message){  const P2P_rpc_MESSAGE *res;  struct GNUNET_RPC_RequestHandle *pos;  struct GNUNET_RPC_CallParameters *reply;  unsigned int error;  if (ntohs (message->size) < sizeof (P2P_rpc_MESSAGE))    {      GNUNET_GE_BREAK_OP (NULL, 0);      return GNUNET_SYSERR;    }  res = (const P2P_rpc_MESSAGE *) message;  RPC_send_ack (sender,                ntohl (res->sequenceNumber), ntohl (res->importance), 0);  /* Locate the GNUNET_RPC_CallHandle structure. */  GNUNET_mutex_lock (lock);  pos = outgoingCalls;  while (pos != NULL)    {      if ((0 == memcmp (&pos->receiver,                        sender,                        sizeof (GNUNET_PeerIdentity))) &&          (pos->sequenceNumber == ntohl (res->sequenceNumber)))        break;      pos = pos->next;    }  if (pos == NULL)    {      /* duplicate reply */      GNUNET_mutex_unlock (lock);      return GNUNET_OK;    }  /* remove pos from linked list */  GNUNET_mutex_unlock (lock);  /* call callback */  reply = NULL;  error = ntohl (res->functionNameLength);  if (error == GNUNET_RPC_ERROR_OK)    reply = GNUNET_RPC_parameters_deserialize ((char *) &res[1],                                               ntohs (message->size) -                                               sizeof (P2P_rpc_MESSAGE));  if (ntohl (res->argumentCount) != GNUNET_RPC_parameters_count (reply))    {      GNUNET_RPC_parameters_destroy (reply);      reply = NULL;      error = GNUNET_RPC_ERROR_REPLY_MALFORMED;    }  if (pos->callback != NULL)    {      pos->callback (sender, reply, error, pos->cls);      pos->callback = NULL;      pos->errorCode = error;    }  if (reply != NULL)    GNUNET_RPC_parameters_destroy (reply);  return GNUNET_OK;}/** * Handle a peer-to-peer message of type GNUNET_P2P_PROTO_RPC_ACK. */static inthandleRPCMessageAck (const GNUNET_PeerIdentity * sender,                     const GNUNET_MessageHeader * message){  const RPC_ACK_Message *ack;  struct GNUNET_RPC_CallHandle *pos;  if (ntohs (message->size) != sizeof (RPC_ACK_Message))    {      GNUNET_GE_BREAK_OP (NULL, 0);      return GNUNET_SYSERR;    }  ack = (const RPC_ACK_Message *) message;  GNUNET_mutex_lock (lock);  /* Locate the GNUNET_RPC_CallHandle structure. */  pos = incomingCalls;  while (pos != NULL)    {      if ((0 == memcmp (&pos->initiator,                        sender,                        sizeof (GNUNET_PeerIdentity))) &&          (pos->sequenceNumber == ntohl (ack->sequenceNumber)))        break;      pos = pos->next;    }  if (pos == NULL)    {      /* duplicate ACK, ignore */      GNUNET_mutex_unlock (lock);      return GNUNET_OK;    }  /* remove from list */  if (pos->prev == NULL)    incomingCalls = pos->next;  else    pos->prev->next = pos->next;  if (pos->next != NULL)    pos->next->prev = pos->prev;  GNUNET_free (pos->msg);  GNUNET_free (pos->function_name);  GNUNET_free (pos);  GNUNET_mutex_unlock (lock);  return GNUNET_OK;}/** * Start an asynchronous RPC. * * @param timeout when should we stop trying the RPC * @param callback function to call with the return value from *        the RPC * @param closure extra argument to callback * @return value required to stop the RPC (and the RPC must *  be explicitly stopped to free resources!) */static struct GNUNET_RPC_RequestHandle *RPC_start (const GNUNET_PeerIdentity * receiver,           const char *name,           const struct GNUNET_RPC_CallParameters *request_param,           unsigned int importance,           GNUNET_CronTime timeout,           GNUNET_RPC_AsynchronousCompletionCallback callback, void *closure){  struct GNUNET_RPC_RequestHandle *ret;  if (timeout > 1 * GNUNET_CRON_HOURS)    timeout = 1 * GNUNET_CRON_HOURS;  ret = GNUNET_malloc (sizeof (struct GNUNET_RPC_RequestHandle));  memset (ret, 0, sizeof (struct GNUNET_RPC_RequestHandle));  ret->receiver = *receiver;  ret->callback = callback;  ret->cls = closure;  ret->expirationTime = GNUNET_get_time () + timeout;  ret->lastAttempt = 0;  ret->attempts = 0;  ret->sequenceNumber = rpcIdentifier++;  ret->msg = RPC_build_message (GNUNET_RPC_ERROR_OK,                                name,                                ret->sequenceNumber,                                importance, request_param);  ret->repetitionFrequency = RPC_INITIAL_ROUND_TRIP_TIME;  GNUNET_mutex_lock (lock);  ret->next = outgoingCalls;  outgoingCalls = ret;  if (ret->next != NULL)    ret->next->prev = ret;  GNUNET_mutex_unlock (lock);  coreAPI->ciphertext_send (receiver,                            &ret->msg->header,                            importance, RPC_INITIAL_ROUND_TRIP_TIME / 2);  return ret;}/** * Stop an asynchronous RPC (and free associated resources) * * @param record the return value from RPC_start * @return GNUNET_RPC_ERROR_OK if the RPC was successful, *  another RPC_ERROR code if it was aborted */static intRPC_stop (struct GNUNET_RPC_RequestHandle *record){  int ret;  GNUNET_mutex_lock (lock);  if (record->prev == NULL)    outgoingCalls = record->next;  else    record->prev->next = record->next;  if (record->next != NULL)    record->next->prev = record->prev;  GNUNET_free (record->msg);  GNUNET_mutex_unlock (lock);  ret =    (record->callback == NULL) ? record->errorCode : GNUNET_RPC_ERROR_ABORTED;  GNUNET_free (record);  return ret;}/** * Cron-job that processes the RPC queues.  This job is responsible * for retransmission of requests and un-ACKed responses.  It is also * there to trigger timeouts. */static voidRPC_retry_job (void *unused){  GNUNET_CronTime now;  struct GNUNET_RPC_CallHandle *ipos;  struct GNUNET_RPC_RequestHandle *opos;  GNUNET_mutex_lock (lock);  now = GNUNET_get_time ();  ipos = incomingCalls;  while (ipos != NULL)    {      if ((ipos->expirationTime < now) ||          (ipos->attempts >= RPC_MAX_REPLY_ATTEMPTS))        {          GNUNET_free_non_null (ipos->msg);          GNUNET_free (ipos->function_name);          if (ipos->prev == NULL)            incomingCalls = ipos->next;          else            ipos->prev->next = ipos->next;          if (ipos->next != NULL)            ipos->next = ipos->prev;          GNUNET_free (ipos);          ipos = incomingCalls;          continue;        }      if ((ipos->msg != NULL) &&          (ipos->lastAttempt + ipos->repetitionFrequency < now))        {          ipos->lastAttempt = now;          ipos->attempts++;          ipos->repetitionFrequency *= 2;          coreAPI->ciphertext_send (&ipos->initiator,                                    &ipos->msg->header,                                    ipos->repetitionFrequency / 2,                                    ipos->importance);        }      ipos = ipos->next;    }  opos = outgoingCalls;  while (opos != NULL)    {      if (opos->expirationTime < now)        {          if (opos->callback != NULL)            {              opos->callback (&opos->receiver,                              NULL, GNUNET_RPC_ERROR_TIMEOUT, opos->cls);              opos->callback = NULL;            }          GNUNET_free_non_null (opos->msg);          if (opos->prev == NULL)            outgoingCalls = opos->next;          else            opos->prev->next = opos->next;          if (opos->next != NULL)            opos->next = opos->prev;          GNUNET_free (opos);          opos = outgoingCalls;          continue;        }      if (opos->lastAttempt + opos->repetitionFrequency < now)        {          opos->lastAttempt = now;          opos->attempts++;          opos->repetitionFrequency *= 2;          coreAPI->ciphertext_send (&opos->receiver,                                    &opos->msg->header,                                    opos->repetitionFrequency / 2,                                    opos->importance);        }      opos = opos->next;    }  GNUNET_mutex_unlock (lock);}/* ******************* Exported functions ******************* *//** * Shutdown RPC service. */voidrelease_module_rpc (){  coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_REQ,                                              &handleRPCMessageReq);  coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_RES,                                              &handleRPCMessageRes);  coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_RPC_ACK,                                              &handleRPCMessageAck);  GNUNET_GE_ASSERT (NULL, NULL == incomingCalls);  GNUNET_GE_ASSERT (NULL, NULL == outgoingCalls);  GNUNET_GE_ASSERT (NULL, NULL == list_of_callbacks);  GNUNET_cron_del_job (coreAPI->cron,                       &RPC_retry_job, RPC_CRON_FREQUENCY, NULL);  coreAPI = NULL;  lock = NULL;}/** * Initialize the RPC service. */GNUNET_RPC_ServiceAPI *provide_module_rpc (GNUNET_CoreAPIForPlugins * capi){  static GNUNET_RPC_ServiceAPI rpcAPI;  int rvalue;  lock = capi->global_lock_get ();  coreAPI = capi;  GNUNET_GE_LOG (coreAPI->ectx,                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,                 _("`%s' registering handlers %d %d %d\n"), "rpc",                 GNUNET_P2P_PROTO_RPC_REQ, GNUNET_P2P_PROTO_RPC_RES,                 GNUNET_P2P_PROTO_RPC_ACK);  rvalue = GNUNET_OK;  if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_REQ,                                             &handleRPCMessageReq) ==      GNUNET_SYSERR)    rvalue = GNUNET_SYSERR;  if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_RES,                                             &handleRPCMessageRes) ==      GNUNET_SYSERR)    rvalue = GNUNET_SYSERR;  if (capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_RPC_ACK,                                             &handleRPCMessageAck) ==      GNUNET_SYSERR)    rvalue = GNUNET_SYSERR;  if (rvalue == GNUNET_SYSERR)    {      release_module_rpc ();      GNUNET_GE_LOG (coreAPI->ectx,                     GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,                     _("Failed to initialize `%s' service.\n"), "rpc");      return NULL;    }  GNUNET_cron_add_job (coreAPI->cron,                       &RPC_retry_job,                       RPC_CRON_FREQUENCY, RPC_CRON_FREQUENCY, NULL);  rpcAPI.RPC_register = &RPC_register;  rpcAPI.RPC_unregister = &RPC_unregister;  rpcAPI.RPC_complete = &RPC_complete;  rpcAPI.RPC_start = &RPC_start;  rpcAPI.RPC_stop = &RPC_stop;  return &rpcAPI;}/* end of rpc.c */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -