load_balancer_i.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 1,042 行 · 第 1/3 页

CPP
1,042
字号
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("(%N|%l) The factory does not have any references "),
                             ACE_TEXT ("to the group that you have sought \n\n")),
                            0);

        }
    }

  if (!this->flags_)
    {
      this->mem_pool_->find (flags_name_bind,
                             (void *&)this->flags_);
      this->update_objects (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (Load_Balancer::Object_Group::_nil ());
    }

  char *ior = 0;

  if (rr_groups_->find (ACE_const_cast (char *, id),
                        ior,
                        this->mem_pool_) == -1
      && random_groups_->find (ACE_const_cast (char *, id),
                               ior,
                               this->mem_pool_) == -1)
    ACE_THROW_RETURN (Load_Balancer::no_such_group (),
                      0);

  CORBA::Object_var objref =
    this->orb_->string_to_object (ior
                                  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (Load_Balancer::Object_Group::_nil ());

  Load_Balancer::Object_Group_ptr
    object_group = Load_Balancer::Object_Group::_narrow (objref.in ()
                                                         ACE_ENV_ARG_PARAMETER);

  ACE_CHECK_RETURN (Load_Balancer::Object_Group::_nil ());


#if defined (DOORS_MEASURE_STATS)
  // Grab timestamp again.
  ACE_hrtime_t now = ACE_OS::gethrtime ();

  this->throughput_.sample (0,
                            now - latency_base);

  ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
  ACE_OS::printf ("*=*=*=*=Aggregated result *=*=*=*=*= \n");
  this->throughput_.dump_results ("Aggregated", gsf);

#endif /*DOORS_MEASURE_STATS*/

  return object_group;
}

// Build a newly allocated list holding the ids of every group of one
// kind known to this factory.
//
// <random> selects the source map: non-zero reads <random_groups_>,
// zero reads <rr_groups_>.
//
// Returns the populated list.  Throws CORBA::NO_MEMORY if the list
// itself cannot be allocated.
Load_Balancer::Group_List *
Object_Group_Factory_i::list_groups (int random
                                     ACE_ENV_ARG_DECL)
{
  Load_Balancer::Group_List * list = 0;

  // Figure out the length of the list.
  CORBA::ULong len;
  if (random)
    len = random_groups_->current_size ();
  else
    len = rr_groups_->current_size ();

  // Allocate the list of <len> length.
  ACE_NEW_THROW_EX (list,
                    Load_Balancer::Group_List (len),
                    CORBA::NO_MEMORY ());
  ACE_CHECK_RETURN (0);
  list->length (len);

  // Create an iterator for group structure to populate the list.
  HASH_MAP::ITERATOR *group_iter;
  HASH_MAP::ITERATOR random_iter (*(this->random_groups_));
  HASH_MAP::ITERATOR rr_iter (*(this->rr_groups_));
  if (random)
    group_iter = &random_iter;
  else
    group_iter = &rr_iter;

  // Iterate over groups and populate the list.
  HASH_MAP::ENTRY *hash_entry = 0;
  for (CORBA::ULong i = 0; i < len; i++)
    {
      if (group_iter->next (hash_entry) == 0 || hash_entry == 0)
        {
          // The iterator ran out before <len> entries were seen.
          // Trim the list to what was actually copied instead of
          // reading an entry pointer that was never set.
          list->length (i);
          break;
        }
      group_iter->advance ();

      // The sequence element adopts the pointer and releases it with
      // CORBA::string_free, so the copy must be made with
      // CORBA::string_dup rather than a malloc-based strdup.
      (*list)[i] = CORBA::string_dup (hash_entry->ext_id_);
    }

  return list;
}

// Report the ids of the groups kept in the round robin map by
// delegating to list_groups () with its <random> selector cleared.
Load_Balancer::Group_List *
Object_Group_Factory_i::round_robin_groups (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  const int use_random_map = 0;

  return this->list_groups (use_random_map ACE_ENV_ARG_PARAMETER);
}

// Report the ids of the groups kept in the random map by delegating
// to list_groups () with its <random> selector set.
Load_Balancer::Group_List *
Object_Group_Factory_i::random_groups (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  const int use_random_map = 1;

  return this->list_groups (use_random_map ACE_ENV_ARG_PARAMETER);
}


void
Object_Group_Factory_i::update_flags (int random
                                      ACE_ENV_ARG_DECL)
{
  //First check whether we have memory for flags_
  if (!this->flags_)
    {
      if (this->mem_pool_->find (flags_name_bind,
                                 (void *&) this->flags_) == -1)
        {
          void *value =
            this->mem_pool_->malloc (sizeof (CORBA::Short));
          ACE_NEW_THROW_EX (this->flags_,
                            (value) CORBA::Short (0),
                            CORBA::NO_MEMORY ());
          ACE_CHECK;

          // Initialize the variable
          this->mem_pool_->bind (flags_name_bind,
                                 (void *)this->flags_);
        }
    }

  CORBA::Short val = *(this->flags_);
  switch (val)
    {
    case 0:
      if (random)
        *(this->flags_) = 2;
      else
        *(this->flags_) = 1;
      break;
    case 1:
      if (random)
        *(this->flags_) = 3;
      break;
    case 2:
      if (!random)
        *(this->flags_) = 3;
      break;

    }
}

// Instantiate the group servants that <flags_> says have been
// requested and obtain an object reference for each via _this ().
//
// The value in <flags_> selects what is created:
//   1 - an RR_Object_Group named "Round Robin group"
//   2 - a Random_Object_Group named "Random group"
//   3 - both, the random one first
// Any other value, or a null <flags_>, creates nothing.
//
// Throws CORBA::NO_MEMORY if a servant cannot be allocated.
void
Object_Group_Factory_i::update_objects (ACE_ENV_SINGLE_ARG_DECL)
{
  // The caller may reach this point without <flags_> having been
  // located in the memory pool; there is nothing to restore then.
  if (this->flags_ == 0)
    return;

  // Check the value of flags_ & do the instantiation and
  // registration
  const CORBA::Short state = *(this->flags_);
  const int want_random = (state == 2 || state == 3);
  const int want_round_robin = (state == 1 || state == 3);

  if (want_random)
    {
      Object_Group_i *random_servant = 0;
      ACE_NEW_THROW_EX (random_servant,
                        Random_Object_Group ("Random group",
                                             this->poa_.in ()),
                        CORBA::NO_MEMORY ());
      // Do not touch the servant if its allocation failed.
      ACE_CHECK;

      // _this () hands back a reference owned by the caller; hold it
      // in a _var so it is released instead of leaked.
      CORBA::Object_var random_ref =
        random_servant->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }

  if (want_round_robin)
    {
      Object_Group_i *rr_servant = 0;
      ACE_NEW_THROW_EX (rr_servant,
                        RR_Object_Group ("Round Robin group",
                                         this->poa_.in ()),
                        CORBA::NO_MEMORY ());
      // Do not touch the servant if its allocation failed.
      ACE_CHECK;

      // Same ownership rule as above for the returned reference.
      CORBA::Object_var rr_ref =
        rr_servant->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
}


// Construct a group servant with the given <id>, keeping a duplicate
// of <poa>.  The member containers start out null, and an ALLOCATOR
// named "Mem_Pool" is created with options built from
// ACE_DEFAULT_BASE_ADDR.
Object_Group_i::Object_Group_i (const char * id,
                                PortableServer::POA_ptr poa)
  :poa_ (PortableServer::POA::_duplicate (poa)),
   member_id_list_ (0),
   members_ (0),
   id_ (id),
   allocator_ (0)
{
  // An allocator is only created when none is present yet.
  if (this->allocator_ != 0)
    return;

  ACE_MMAP_Memory_Pool::OPTIONS pool_options (ACE_DEFAULT_BASE_ADDR);

  ACE_NEW (this->allocator_,
           ALLOCATOR ("Mem_Pool", "Mem_Pool", &pool_options));
}


// Release the items referenced from <member_id_list_>, if the list
// exists, and then destroy the allocator.
Object_Group_i::~Object_Group_i (void)
{
  // <member_id_list_> is initialised to null by the constructor and,
  // in the code visible in this file, is only set inside bind ().  A
  // group destroyed before any member was bound therefore has no
  // list to walk, and dereferencing it would crash.
  if (this->member_id_list_ != 0)
    {
      // Need to delete all the items from the member_id_list, to avoid
      // memory leaks.
      //
      // NOTE(review): bind () obtains these items from
      // <allocator_>->malloc (), so releasing them with operator
      // delete looks mismatched.  Confirm whether they should be
      // returned through the allocator or left in the pool.
      Object_Group_i::ITERATOR iter (*this->member_id_list_);

      do
        {
          delete (iter.next ());
        } while (iter.advance ());
    }

  delete this->allocator_;
}


// Hand out a duplicate of the POA reference stored in <poa_>.
PortableServer::POA_ptr
Object_Group_i::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  PortableServer::POA_ptr stored_poa = this->poa_.in ();

  return PortableServer::POA::_duplicate (stored_poa);
}


// Return a copy of this group's id made with CORBA::string_dup.
char *
Object_Group_i::id (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  const char *stored_id = this->id_.c_str ();

  return CORBA::string_dup (stored_id);
}

void
Object_Group_i::bind (const Load_Balancer::Member & member
                      ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   Load_Balancer::duplicate_member))
{

  if (this->members_ == 0)
    {
      ACE_CString id = this->id (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      id += server_id_name_bind;

      if (this->allocator_->find (id.c_str (), (void *&)this->members_) == -1)
        {
          void *hash_map = this->allocator_->malloc (sizeof (HASH_MAP));
          ACE_NEW_THROW_EX (this->members_,
                            (hash_map) HASH_MAP (this->allocator_),
                            CORBA::NO_MEMORY ());
          ACE_CHECK;

          // Bind it in the mem pool with a name
          if (this->allocator_->bind (id.c_str (),
                                      (void *)this->members_) != 0)
            {
              ACE_ERROR ((LM_ERROR,
                          "Unable to bind \n"));

            }
        }
    }

  // Check whether the element already exists..
  if (this->members_->find (ACE_const_cast (char *,
                                            (const char *) member.id),
                            this->allocator_) == 0)
    ACE_THROW (Load_Balancer::duplicate_member ());

  size_t id_len = ACE_OS::strlen (member.id) + 1;
  size_t ref_len = ACE_OS::strlen (member.obj) + 1;

  char *mem_alloc = (char *)this->allocator_->malloc (id_len + ref_len);

  if (mem_alloc == 0)
     ACE_THROW (CORBA::NO_MEMORY ());

  char **id_ptr = (char **)this->allocator_->malloc (sizeof (char *));
  *id_ptr = mem_alloc;
  char *ior_ptr = mem_alloc + id_len;

  ACE_OS::strcpy (*id_ptr, member.id);
  ACE_OS::strcpy (ior_ptr, member.obj);


  // Insert new member into <members_> and check for duplicates/failures.
  int result = this->members_->trybind (*id_ptr,
                                        ior_ptr);

  if (result == 1)
    ACE_THROW (Load_Balancer::duplicate_member ());
  else if (result == -1)
    ACE_THROW (CORBA::INTERNAL ());

  // Search the list first from the mem mapp pool and then Insert new
  // member's id into <member_id_list_>.

  ACE_CString id = dll_name_bind;
  id += this->id (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;


  if (this->allocator_->find (id.c_str (),
                              (void *&)this->member_id_list_)
      == -1)
    {
      void *dll_list = this->allocator_->malloc (sizeof (LIST));
      ACE_NEW_THROW_EX (this->member_id_list_,
                        (dll_list) LIST (this->allocator_),
                        CORBA::NO_MEMORY ());
      ACE_CHECK;

      // Bind it in the mem pool with a name
      if (this->allocator_->bind (id.c_str (),
                                 (void *)this->member_id_list_) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "Unable to bind \n"));
          return;
        }
    }

  if (member_id_list_->insert_tail (id_ptr) == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // Theoretically, we should deal with memory failures more
  // thoroughly.  But, practically, the whole system is going to be

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?