replicacontroller.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 541 行

CPP
541
字号
// file      : RolyPoly/ReplicaController.cpp
// author    : Boris Kolpackov <boris@dre.vanderbilt.edu>
// cvs-id    : ReplicaController.cpp,v 1.4 2003/12/23 20:52:20 bala Exp

#include "ace/UUID.h"
#include "ace/Thread_Manager.h"
#include "ace/TMCast/Group.hpp"

#include "tao/PortableServer/Servant_Base.h"
#include "tao/Any_Impl.h"
#include "tao/DynamicC.h"

#include "orbsvcs/FT_CORBA_ORBC.h"

#include "CrashPoint.h"
#include "StateUpdate.h"
#include "ReplicaController.h"


// State slot.
//
//

namespace
{
  // File-local storage for the slot id that is read and written through
  // the state_slot_id () accessors below.
  //
  PortableInterceptor::SlotId state_slot_id_;
}

// Returns the slot id most recently stored with
// state_slot_id (PortableInterceptor::SlotId).
//
PortableInterceptor::SlotId
state_slot_id ()
{
  return state_slot_id_;
}

// Stores slot_id so that later calls to state_slot_id () return it.
//
void
state_slot_id (PortableInterceptor::SlotId slot_id)
{
  state_slot_id_ = slot_id;
}

// Destructor. Nothing to release here.
//
Checkpointable::
~Checkpointable ()
{
}

// Default implementation: returns a null pointer, i.e. no state is
// provided.
//
CORBA::Any* Checkpointable::
get_state ()
{
  return 0;
}

// Stores `state` in the PICurrent slot identified by state_slot_id ().
//
// orb   - ORB used to resolve the "PICurrent" initial reference.
// state - value written into the slot.
//
// Any CORBA exception raised along the way is printed and swallowed;
// nothing is propagated to the caller.
//
void Checkpointable::
associate_state (CORBA::ORB_ptr orb, CORBA::Any const& state)
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
  {
    // Resolve and narrow the PortableInterceptor::Current object.
    //
    CORBA::Object_var current_obj =
      orb->resolve_initial_references ("PICurrent" ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    PortableInterceptor::Current_var current =
      PortableInterceptor::Current::_narrow (
        current_obj.in () ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    // Write the state into the slot.
    //
    PortableInterceptor::SlotId const slot = state_slot_id ();
    current->set_slot (slot, state ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;
  }
  ACE_CATCHANY
  {
    ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Caught exception:");
  }
  ACE_ENDTRY;
}

// ReplicaController
//
//

// Destructor. The body is empty; members clean up after themselves.
//
ReplicaController::
~ReplicaController ()
{
}

// Sets up the controller:
//
//   1. keeps a duplicated reference to `orb`;
//   2. resolves and narrows the "RootPOA" initial reference (the process
//      is aborted if that raises a CORBA exception);
//   3. generates a UUID and creates the TMCast::Group object with it as
//      the member id;
//   4. spawns the listener thread, shutting the ORB down if the spawn
//      fails.
//
ReplicaController::
ReplicaController (CORBA::ORB_ptr orb)
    : orb_ (CORBA::ORB::_duplicate (orb))
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
  {
    CORBA::Object_var root_obj =
      orb_->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    root_poa_ = PortableServer::POA::_narrow (
      root_obj.in () ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;
  }
  ACE_CATCHANY
  {
    ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Caught exception:");
    ACE_OS::abort ();
  }
  ACE_ENDTRY;

  // The member id is a freshly generated UUID.
  //
  ACE_Utils::UUID member_uuid;
  ACE_Utils::UUID_GENERATOR::instance ()->init ();
  ACE_Utils::UUID_GENERATOR::instance ()->generateUUID (member_uuid);

  ACE_INET_Addr group_address (10000, "239.255.0.1");

  ACE_DEBUG ((LM_DEBUG, "Becoming a member with id %s\n",
              member_uuid.to_string ()->c_str ()));

  group_.reset (
    new TMCast::Group (group_address,
                       member_uuid.to_string ()->c_str ()));

  // Start the thread that runs listener (); without it there is no point
  // in continuing, so shut the ORB down.
  //
  if (ACE_Thread_Manager::instance ()->spawn (
        &ReplicaController::listener_thunk, this) < 0)
  {
    orb_->shutdown (0);
  }
}

// Body of the listener thread (entered through listener_thunk ()).
//
// Repeatedly receives a message from the group, decodes it as a log
// record (object id, adapter name, client id, retention id, reply, state),
// stores the reply in log_ under (client id, retention id) and, when the
// record carries a non-null state, hands that state to the target servant
// if it is a Checkpointable.
//
// A message that cannot be decoded is reported and skipped. The loop ends
// when the group throws Failed or InsufficienSpace; the ORB is then shut
// down.
//
void ReplicaController::
listener ()
{
  try
  {
    for (char buffer[1024];;)
    {
      size_t n = group_->recv (buffer, sizeof (buffer));

      ACE_HEX_DUMP ((LM_DEBUG, buffer, n));

      TAO_InputCDR cdr (buffer, n);

      CORBA::OctetSeq object_id;
      PortableInterceptor::AdapterName adapter_name;
      CORBA::String_var client_id;
      CORBA::Long retention_id;
      CORBA::OctetSeq reply;
      CORBA::Any state;

      cdr >> object_id;
      cdr >> adapter_name;
      cdr >> client_id.out ();
      cdr >> retention_id;
      cdr >> reply;
      cdr >> state;

      if (!cdr.good_bit ())
      {
        ACE_DEBUG ((LM_DEBUG, "CDR failed\n"));

        // The record is only partially decoded: client_id may be null
        // and retention_id may be uninitialized. Using them below would
        // be undefined behavior, so drop this message and wait for the
        // next one.
        //
        continue;
      }

      ACE_DEBUG ((LM_DEBUG,
                  "Received log for %s with rid %i\n",
                  client_id.in (),
                  retention_id));


      RecordId rid (client_id.in (), retention_id);

      CORBA::OctetSeq_var tmp (new CORBA::OctetSeq (reply));
      log_.insert (rid, tmp);

      // Update state. A tk_null type code means the record carries no
      // state update.
      //
      CORBA::TypeCode_var tc = state.type ();

      if (tc->kind () != CORBA::tk_null)
      {
        PortableServer::POA_var poa = resolve_poa (adapter_name);

        PortableServer::ServantBase_var servant =
          poa->id_to_servant (object_id);

        Checkpointable* target =
          dynamic_cast<Checkpointable*> (servant.in ());

        if (target) target->set_state (state);
      }
    }
  }
  catch (TMCast::Group::Failed const&)
  {
    ACE_DEBUG ((LM_DEBUG,
                "Group failure. Perhaps, I am alone in the group.\n"));
  }
  catch (TMCast::Group::InsufficienSpace const&)
  {
    ACE_DEBUG ((LM_DEBUG, "Group::InsufficienSpace\n"));
  }

  orb_->shutdown (0);
}

// Returns a duplicated reference to the POA for the given adapter name.
// The adapter name is currently ignored and the root POA is always
// returned.
//
PortableServer::POA_ptr ReplicaController::
resolve_poa (PortableInterceptor::AdapterName const&)
{
  //@@ Assume for now it's a root poa.
  return PortableServer::POA::_duplicate (root_poa_.in ());
}


void* ReplicaController::
listener_thunk (void* p)
{
  ReplicaController* obj = reinterpret_cast<ReplicaController*> (p);
  obj->listener();
  return 0;
}

namespace
{
  // Forward declaration; the definition is further down in this file.
  // Extracts the FT request service context from the request `ri`.
  //
  FT::FTRequestServiceContext*
  extract_context (
    PortableInterceptor::ServerRequestInfo_ptr ri
    ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException));
}

#if TAO_HAS_EXTENDED_FT_INTERCEPTORS == 1
// Replays a previously logged reply, if there is one.
//
// ri  - the incoming request; its FT request service context supplies the
//       client id and retention id that key the log.
// ocs - receives a copy of the logged reply when log_ contains a record
//       for this request; left untouched otherwise.
//
void
ReplicaController::tao_ft_interception_point (
  PortableInterceptor::ServerRequestInfo_ptr ri,
  CORBA::OctetSeq_out ocs
  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  FT::FTRequestServiceContext_var context (
    extract_context (ri ACE_ENV_ARG_PARAMETER));

  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Received request from %s with rid %i\n",
              context->client_id.in (),
              context->retention_id));

  RecordId request_id (context->client_id.in (), context->retention_id);

  // Not in the log: this request is not eligible for replay.
  //
  if (!log_.contains (request_id)) return;

  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Replaying reply for %s with rid %i\n",
              context->client_id.in (),
              context->retention_id));

  // Hand a copy of the logged reply to the caller.
  //
  CORBA::OctetSeq_var logged_reply (log_.lookup (request_id));

  ocs = logged_reply._retn ();
}

#endif /*TAO_HAS_EXTENDED_FT_INTERCEPTORS*/

// Logs the reply that is about to be sent for request `ri`.
//
// Steps:
//
//   1. Marshal the result and every PARAM_INOUT argument into an octet
//      sequence (the "reply").
//   2. Obtain the state update: the value in the slot state_slot_id (), or,
//      when that is tk_null, whatever the target servant's get_state ()
//      returns (if the servant is a Checkpointable and returns non-null).
//   3. Send a log record (object id, adapter name, client id, retention id,
//      reply, state) to the group, retrying while the group reports
//      Aborted. A Failed group is reported and otherwise ignored.
//   4. Insert the reply into the local log_.
//
// The two "crash points" terminate the process with exit (1) when
// crash_point selects them and the retention id is greater than 2.
//
void
ReplicaController::send_reply (
  PortableInterceptor::ServerRequestInfo_ptr ri
  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  FT::FTRequestServiceContext_var ftr (
    extract_context (ri ACE_ENV_ARG_PARAMETER));
  ACE_CHECK;


  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Sending reply for %s with rid %i\n",
              ftr->client_id.in (),
              ftr->retention_id));


  // Prepare reply for logging.

  CORBA::Any_var result =
    ri->result (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  TAO_OutputCDR cdr;
  result->impl ()->marshal_value (cdr);

  Dynamic::ParameterList_var pl =
    ri->arguments (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::ULong len = pl->length ();

  for (CORBA::ULong index = 0; index != len ; ++index)
  {
    //@@ No chance for PARAM_OUT
    if ((*pl)[index].mode == CORBA::PARAM_INOUT)
    {
      (*pl)[index].argument.impl ()->marshal_value (cdr);
    }
  }

  CORBA::OctetSeq_var reply;

  // @@ What if this throws an exception??  We don't have any way to
  // check whether this succeeded
  ACE_NEW (reply, CORBA::OctetSeq (cdr.total_length ()));

  reply->length (cdr.total_length ());

  CORBA::Octet* buf = reply->get_buffer ();

  // Flatten the (possibly chained) CDR message blocks into the sequence.
  //
  for (ACE_Message_Block const* mb = cdr.begin ();
       mb != 0;
       mb = mb->cont ())
  {
    ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ());
    buf += mb->length ();
  }

  // Logging the reply and state update.
  //

  // First send message to members.
  //
  {
    // Extract state update.

    CORBA::OctetSeq_var oid = ri->object_id ();
    PortableInterceptor::AdapterName_var an = ri->adapter_name ();

    CORBA::Any_var state = ri->get_slot (state_slot_id ());

    CORBA::TypeCode_var tc = state->type ();

    if (tc->kind () == CORBA::tk_null)
    {
      ACE_DEBUG ((LM_DEBUG, "Slot update is void\n"));

      // Nothing in the slot: ask the servant for its state instead.
      //
      PortableServer::POA_var poa = resolve_poa (an);

      PortableServer::ServantBase_var servant =
        poa->id_to_servant (oid);

      Checkpointable* target =
        dynamic_cast<Checkpointable*> (servant.in ());

      if (target)
      {
        CORBA::Any_var tmp = target->get_state ();

        if (tmp != 0) state = tmp._retn ();
      }
    }

    // Marshal the log record. Named differently from the reply stream
    // above so that the two are not confused.
    //
    TAO_OutputCDR msg_cdr;

    msg_cdr << oid;
    msg_cdr << an;
    msg_cdr << ftr->client_id.in ();
    msg_cdr << ftr->retention_id;
    msg_cdr << reply.in ();
    msg_cdr << *state;

    size_t size = msg_cdr.total_length ();

    CORBA::OctetSeq_var msg;

    ACE_NEW (msg, CORBA::OctetSeq (size));

    msg->length (size);

    {
      CORBA::Octet* dst (msg->get_buffer ());

      for (ACE_Message_Block const* mb = msg_cdr.begin ();
           mb != 0;
           mb = mb->cont ())
      {
        ACE_OS::memcpy (dst, mb->rd_ptr (), mb->length ());
        dst += mb->length ();
      }
    }

    CORBA::Octet* msg_buf (msg->get_buffer ());

    // Crash point 1.
    //
    if (crash_point == 1 && ftr->retention_id > 2) exit (1);

    try
    {
      while (true)
      {
        try
        {
          group_->send (msg_buf, size);

          // %i consumes an int; passing a size_t through varargs is a
          // type mismatch where size_t is wider than int, so convert
          // explicitly.
          //
          ACE_DEBUG ((LM_DEBUG, "Sent log record of length %i\n",
                      static_cast<int> (size)));
          break;
        }
        catch (TMCast::Group::Aborted const&)
        {
          ACE_DEBUG ((LM_DEBUG, "Retrying to send log record.\n"));
        }
      }
    }
    catch (TMCast::Group::Failed const&)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Group failure. Perhaps, I am alone in the group.\n"));
    }
  }


  // Now perform local logging.
  //
  RecordId rid (ftr->client_id.in (), ftr->retention_id);

  // This is slow but eh-safe ;-).
  //
  log_.insert (rid, reply);


  // Crash point 2.
  //
  if (crash_point == 2 && ftr->retention_id > 2) exit (1);
}


namespace
{
  // Decodes the IOP::FT_REQUEST service context attached to request `ri`.
  //
  // Returns a newly allocated FT::FTRequestServiceContext; the caller
  // takes ownership (callers in this file wrap it in a _var).
  //
  // Throws CORBA::BAD_CONTEXT when the byte-order flag cannot be read,
  // CORBA::NO_MEMORY when allocation fails and CORBA::UNKNOWN when the
  // context body cannot be decoded.
  //
  // The function returns a pointer, so the *_RETURN forms of the ACE
  // exception macros are used; the plain forms are meant for functions
  // returning void.
  //
  FT::FTRequestServiceContext*
  extract_context (PortableInterceptor::ServerRequestInfo_ptr ri
                   ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
  {
    IOP::ServiceContext_var svc =
      ri->get_request_service_context (IOP::FT_REQUEST
                                       ACE_ENV_ARG_PARAMETER);
    ACE_CHECK_RETURN (0);

    TAO_InputCDR cdr (ACE_reinterpret_cast (const char*,
                                            svc->context_data.get_buffer ()),
                      svc->context_data.length ());

    // The encapsulation starts with its byte-order flag.
    //
    CORBA::Boolean byte_order;

    if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
    {
      //@@ what to throw?
      ACE_THROW_RETURN (CORBA::BAD_CONTEXT (), 0);
    }

    cdr.reset_byte_order (ACE_static_cast (int,byte_order));

    // Funny, the following two lines should normally translate
    // just to one ctor call. But because we have to use this
    // ACE_NEW macro hackery we have a default ctor call plus
    // assignment operator call. Yet another example how the
    // majority is being penalized by some broken platforms.
    //
    FT::FTRequestServiceContext_var req;

    //@@ completed status maybe wrong
    //
    ACE_NEW_THROW_EX (req,
                      FT::FTRequestServiceContext,
                      CORBA::NO_MEMORY (
                        CORBA::SystemException::_tao_minor_code (
                          TAO_DEFAULT_MINOR_CODE,
                          ENOMEM),
                        CORBA::COMPLETED_NO));
    ACE_CHECK_RETURN (0);

    cdr >> *req;

    if (!cdr.good_bit ())
    {
      //@@ what to throw?
      ACE_THROW_RETURN (CORBA::UNKNOWN (), 0);
    }

    return req._retn ();
  }
}


// Returns a newly duplicated copy of the string "ReplicaController".
//
char*
ReplicaController::name (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  return CORBA::string_dup ("ReplicaController");
}

// No-op: nothing is done at this interception point.
//
void
ReplicaController::send_exception (
    PortableInterceptor::ServerRequestInfo_ptr
    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
}

// No-op: nothing is done at this interception point.
//
void
ReplicaController::send_other (
    PortableInterceptor::ServerRequestInfo_ptr
    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
}

// No-op: this function performs no cleanup.
//
void
ReplicaController::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}

// No-op: nothing is done at this interception point.
//
void
ReplicaController::receive_request_service_contexts (
    PortableInterceptor::ServerRequestInfo_ptr
    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{

}

// No-op: nothing is done at this interception point.
//
void
ReplicaController::receive_request (
    PortableInterceptor::ServerRequestInfo_ptr
    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?