dt_creator.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 677 行 · 第 1/2 页

CPP
677
字号

    } /* while */

  ACE_DEBUG ((LM_DEBUG,
              "Activated Job List\n"));
}

void
DT_Creator::activate_schedule (ACE_ENV_SINGLE_ARG_DECL)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"Activating schedule, task count = %d\n",
		dt_count_));

  Thread_Task* task;

  for (int i = 0; i < dt_count_; ++i)
    {
      task = dt_list_[i];

      if (task->dist ())
	{
	  // resolve the object from the naming service
	  CosNaming::Name name (1);
	  name.length (1);
	  name[0].id = CORBA::string_dup (task->job ());

	  CORBA::Object_var obj =
	    this->naming_->resolve (name ACE_ENV_ARG_PARAMETER);
	  ACE_CHECK;

	  Job_var job = Job::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
	  ACE_CHECK;

	  //	  if (TAO_debug_level > 0)
	  // {
	      // Check that the object is configured with some
	      // PriorityModelPolicy.
	      CORBA::Policy_var policy =
		job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
				  ACE_ENV_ARG_PARAMETER);
	      ACE_CHECK;

	      RTCORBA::PriorityModelPolicy_var priority_policy =
		RTCORBA::PriorityModelPolicy::_narrow (policy.in ()
						       ACE_ENV_ARG_PARAMETER);
	      ACE_CHECK;

	      if (CORBA::is_nil (priority_policy.in ()))
		ACE_DEBUG ((LM_DEBUG,
			    "ERROR: Priority Model Policy not exposed!\n"));
	      else
		{
                  /*
		  RTCORBA::PriorityModel priority_model =
		    priority_policy->priority_model (ACE_ENV_SINGLE_ARG_PARAMETER);
		  ACE_CHECK;

		  if (priority_model == RTCORBA::CLIENT_PROPAGATED)
		    ACE_DEBUG ((LM_DEBUG,
				"%s priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
		  else
		    ACE_DEBUG ((LM_DEBUG,
				"%s priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
		  */
		}
	      //} /*  if (TAO_debug_level > 0) */

	  task->job (job.in ());
	}
    }
  if (TAO_debug_level > 0 && dt_count_ > 0)
    ACE_DEBUG ((LM_DEBUG,
		"Activated schedule, task count = %d\n",
		dt_count_));

}

int
DT_Creator::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL)
{
  CORBA::Object_var naming_obj =
    this->orb_->resolve_initial_references ("NameService"
                                            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // Need to check return value for errors.
  if (CORBA::is_nil (naming_obj.in ()))
    ACE_ERROR_RETURN ((LM_ERROR,
                       " (%P|%t) Unable to resolve the Naming Service.\n"),
                      -1);

  this->naming_ =
    CosNaming::NamingContextExt::_narrow (naming_obj.in ()
                                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  //@@tmp hack, otherwise crashes on exit!..??
  CosNaming::NamingContextExt::_duplicate (this->naming_.in());
  return 0;
}

void
DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current
					  ACE_ENV_ARG_DECL)
{
  current_ = RTScheduling::Current::_duplicate (current);

  long flags;
  flags = THR_NEW_LWP | THR_JOINABLE;
  flags |=
    orb_->orb_core ()->orb_params ()->scope_policy () |
    orb_->orb_core ()->orb_params ()->sched_policy ();

  ACE_DEBUG ((LM_DEBUG,
              "Waiting to Synch\n"));
  
  while (!this->synch ()->synched ())
    {
      this->orb_->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
  
  CORBA::Policy_var sched_param;
  sched_param = CORBA::Policy::_duplicate (this->sched_param (100));
  const char * name = 0;
  current_->begin_scheduling_segment (name,
				      sched_param.in (),
				      sched_param.in ()
				      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_NEW (base_time_,
	   ACE_Time_Value (*(this->synch ()->base_time ())));

  for (int i = 0; i < this->dt_count_; i++)
    {
      ACE_Time_Value now (ACE_OS::gettimeofday ());

      ACE_Time_Value elapsed_time =  now - *base_time_;

      char buf [BUFSIZ];
      ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
		       (int) elapsed_time.sec (),
		       (int) now.sec (),
		       (int) base_time_->sec());

      log [log_index++] = ACE_OS::strdup (buf) ;

      ACE_hthread_t curr_thr;
      ACE_Thread::self (curr_thr);

      if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ()))
	{
	  int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
	  ACE_OS::sprintf (buf,"suspension_tome = %d\n",
			   suspension_time);
	  log [log_index++] = ACE_OS::strdup (buf);
	  yield (suspension_time,
		 dt_list_[i]);
	}

      sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ()));
      dt_list_ [i]->activate_task (current,
				   sched_param.in (),
				   flags,
				   base_time_
				   ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

    }

  this->wait ();

  current_->end_scheduling_segment (name
				    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->check_ifexit ();
}

void
DT_Creator::dt_ended (void)
{
  {
    ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
    --active_dt_count_;
    if (TAO_debug_level > 0)
      ACE_DEBUG ((LM_DEBUG, "Active dt count = %d\n",active_dt_count_));
    char buf [BUFSIZ];
    ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_);
    log [log_index++] = ACE_OS::strdup (buf);
  }
  this->check_ifexit ();
}

void
DT_Creator::job_ended (void)
{
  {
    ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
    --active_job_count_;
    char buf [BUFSIZ];
    ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_);
    log [log_index++] = ACE_OS::strdup (buf);
  }

 this->check_ifexit ();
}

void
DT_Creator::check_ifexit (void)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"Checking exit status Job# = %d DT# = %d\n",
		active_job_count_,
		active_dt_count_));

  static int shutdown = 0;

  {
    ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);

    if (!shutdown)
      {
	// All tasks have finished and all jobs have been shutdown.
	if (active_dt_count_ == 0 && active_job_count_ == 0)
	  {

	    ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));

	    /*
	    for (int i = 0; i < dt_count_; i++)
	      {
		dt_list_[i]->dump_stats ();
	      }

	    for (int i = 0; i < job_count_; i ++)
	      {
		job_list_[i]->dump_stats ();
	      }
	    */
	    TASK_STATS::instance ()->dump_samples (file_name_,
						   "#Schedule Output",
						   ACE_High_Res_Timer::global_scale_factor ());

	    shutdown = 1;

	    FILE* log_file = ACE_OS::fopen (log_file_name_, "w");

	    if (log_file != NULL)
	      {
		// first dump what the caller has to say.
		ACE_OS::fprintf (log_file, "Log File\n");

		for (int i = 0; i < log_index; i++)
		  {
		    ACE_OS::fprintf (log_file, "%s\n", log [i]);
		  }

		ACE_OS::fclose (log_file);
	      }
	    ACE_DEBUG ((LM_DEBUG,
			"Log File Ready\n"));

	  }
      }
  }
}

int
DT_Creator::dt_count (void)
{
  return dt_count_;
}

DT_Creator::~DT_Creator (void)
{
  // for (int i = 0; i < (BUFSIZ * 100); i++)
  delete[] log;

  delete[] dt_list_;
  delete[] poa_list_;
  delete[] job_list_;

  delete base_time_;

  delete state_lock_;
  delete shutdown_lock_;
}


void
DT_Creator::log_msg (char* msg)
{
  log [log_index++] = ACE_OS::strdup (msg);
}

CORBA::ORB_ptr
DT_Creator::orb (void)
{
  return this->orb_.in ();
}

void
DT_Creator::orb (CORBA::ORB_ptr orb)
{
  this->orb_ = CORBA::ORB::_duplicate (orb);
}

ACE_Time_Value*
DT_Creator::base_time (void)
{
  return this->base_time_;
}

void
DT_Creator::base_time (ACE_Time_Value* base_time)
{
  this->base_time_ = base_time;
}


RTScheduling::Current_ptr
DT_Creator::current (void)
{
  return current_.in ();
}


Synch_i*
DT_Creator::synch (void)
{
  return this->synch_;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?