⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mcast.cpp

📁 这是一个被广泛使用的开源通信项目，完全能够胜任大容量、高并发的通讯要求，它可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
      sender->connect (sub ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // To receive events we need to setup an event handler:
      TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
      receiver = TAO_ECG_UDP_Receiver::create();

      TAO_ECG_Mcast_EH mcast_eh (&*receiver);

      // The event handler uses the ORB reactor to wait for multicast
      // traffic:
      mcast_eh.reactor (orb->orb_core ()->reactor ());

      // The multicast Event Handler needs to know which multicast
      // groups it should listen to.  To do so it becomes an observer
      // of the event channel, to determine the list of events
      // required by all the local consumers.
      // Then it registers for the multicast groups that carry those
      // events:
      mcast_eh.open (event_channel.in ()
                     ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Again the receiver connects to the event channel as a
      // supplier of events, using the Observer features to detect
      // local consumers and their interests:
      receiver->init (event_channel.in (),
                      endpoint,
                      address_server.in ()
                      ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // The Receiver is also a supplier of events.  The exact type of
      // events is only known to the application, because it depends
      // on the traffic carried by all the multicast groups that the
      // different event handlers subscribe to.
      // In this example we choose to simply describe our publications
      // using wildcards: any event from any source.  More advanced
      // applications could use the Observer features in the event
      // channel to update this information (and reduce the number of
      // multicast groups that each receiver subscribes to).
      // In a future version the event channel could perform some of
      // those tasks automatically
      RtecEventChannelAdmin::SupplierQOS pub;
      pub.publications.length (1);
      pub.publications[0].event.header.type   = ACE_ES_EVENT_ANY;
      pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY;
      pub.is_gateway = 1;

      receiver->connect (pub ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // **************** THAT COMPLETES THE FEDERATION SETUP

      // **************** HERE IS THE CLIENT SETUP

      // First let us create consumers and connect them to the event
      // channel
      Consumer consumer1;
      Consumer consumer2;
      RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
        event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
      consumer1.connect (consumer_admin.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      consumer2.connect (consumer_admin.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // And now create a supplier
      Supplier supplier;
      RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
        event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
      supplier.connect (supplier_admin.in ()
                        ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // **************** THAT COMPLETES THE CLIENT SETUP

      // **************** HERE IS THE EVENT LOOP

      // creating thread pool
      ACE_Thread_Manager the_ace_manager;
      the_ace_manager.open ();
      int thread_pool_id = the_ace_manager.spawn_n (
        pool_size, ACE_THR_FUNC (run_orb_within_thread), 0, THR_DETACHED | THR_NEW_LWP);
      if (thread_pool_id == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "Cannot spawn thread pool\n"), 1);
      }
      ACE_OS::sleep (1); // simple solution ensures ready thread pool

      for (int i = 0; i < data_items; i++)
        {
          supplier.perform_push (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }

      ACE_OS::sleep (2); // simple solution ensures ready receivers
      terminate_threads = true; // terminate thread pool

      the_ace_manager.wait(); // wait until all threads in the pool are stopped

      the_ace_manager.close ();

      // **************** THAT COMPLETES THE EVENT LOOP

      // **************** HERE IS THE CLEANUP CODE

      // First the easy ones
      supplier.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
      consumer1.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
      consumer2.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Now let us disconnect the Receiver
      receiver->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      int r = mcast_eh.shutdown ();

      if (r == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Closing MCast event handler\n"), 1);
        }

      // And also disconnect the sender of events
      sender->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // The event channel must be destroyed, so it can release its
      // resources, and inform all the clients that are still
      // connected that it is going away.
      event_channel->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Deactivating the event channel implementation is not strictly
      // required, the POA will do it for us, but it is good manners:
      {
        // Using _this() activates with the default POA, we must gain
        // access to that POA to deactivate the object.
        // Notice that we 'know' that the default POA for this servant
        // is the root POA, but the code is more robust if we don't
        // rely on that.
        PortableServer::POA_var poa =
          ec_impl._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
        ACE_TRY_CHECK;
        // Get the Object Id used for the servant..
        PortableServer::ObjectId_var oid =
          poa->servant_to_id (&ec_impl ACE_ENV_ARG_PARAMETER);
        ACE_TRY_CHECK;
        // Deactivate the object
        poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER);
        ACE_TRY_CHECK;
      }

      // Now we can destroy the POA, the flags mean that we want to
      // wait until the POA is really destroyed
      poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Finally destroy the ORB
      orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // **************** THAT COMPLETES THE CLEANUP CODE

      ACE_DEBUG ((LM_DEBUG,
                  "MCast example finished\n"));
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service");
      return 1;
    }
  ACE_ENDTRY;
  return 0;
}

// ****************************************************************

int parse_args (int argc, char *argv[])
{
  ACE_Get_Opt get_opts (argc, argv, "m:");
  int c;

  while ((c = get_opts ()) != -1)
    switch (c)
      {
      case 'm':
        udp_mcast_address = get_opts.opt_arg ();
        break;

      case '?':
      default:
        ACE_ERROR_RETURN ((LM_ERROR,
                           "usage:  %s "
                           "[-m udp_mcast_address]"
                           "\n",
                           argv [0]),
                          -1);
      }
  // Indicates sucessful parsing of the command line
  return 0;
}

// ****************************************************************

// Explicit template instantiation section.  Both preprocessor branches
// are empty, so nothing is instantiated here under either macro.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -