⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 retransmit.cpp

📁 最新的版本ACE-5.6.8,刚从外文网上搬下,与大家分享.
💻 CPP
字号:
// file      : ace/RMCast/Retransmit.cpp
// author    : Boris Kolpackov <boris@kolpackov.net>
// cvs-id    : $Id: Retransmit.cpp 80826 2008-03-04 14:51:23Z wotte $

#include "ace/Time_Value.h"     // ACE_Time_Value
#include "ace/OS_NS_stdlib.h"   // abort
#include "ace/OS_NS_sys_time.h" // gettimeofday

#include "Retransmit.h"

/*
#include <iostream>
using std::cerr;
using std::endl;
*/

namespace ACE_RMCast
{
  Retransmit::
  Retransmit (Parameters const& params)
      : params_ (params),
        cond_ (mutex_),
        stop_ (false)
  {
  }

  void Retransmit::
  out_start (Out_Element* out)
  {
    Element::out_start (out);

    tracker_mgr_.spawn (track_thunk, this);
  }

  void Retransmit::
  out_stop ()
  {
    {
      Lock l (mutex_);
      stop_ = true;
      cond_.signal ();
    }

    tracker_mgr_.wait ();

    Element::out_stop ();
  }

  void Retransmit::send (Message_ptr m)
  {
    if (m->find (Data::id) != 0)
    {
      SN const* sn = static_cast<SN const*> (m->find (SN::id));

      Lock l (mutex_);
      queue_.bind (sn->num (), Descr (m->clone ()));
    }

    out_->send (m);
  }

  void Retransmit::recv (Message_ptr m)
  {
    if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
    {
      Address to (static_cast<To const*> (m->find (To::id))->address ());

      if (nak->address () == to)
      {
        Lock l (mutex_);

        for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
             !j.done ();
             j.advance ())
        {
          u64* psn;
          j.next (psn);

          Message_ptr m;

          Queue::ENTRY* pair;

          if (queue_.find (*psn, pair) == 0)
          {
            //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;

            m = pair->int_id_.message ();

            pair->int_id_.reset ();
          }
          else
          {
            //cerr << 4 << "message " << *psn << " not available" << endl;

            m = Message_ptr (new Message);
            m->add (Profile_ptr (new SN (*psn)));
            m->add (Profile_ptr (new NoData));
          }

          out_->send (m);
        }
      }
    }

    in_->recv (m);
  }

  ACE_THR_FUNC_RETURN Retransmit::
  track_thunk (void* obj)
  {
    reinterpret_cast<Retransmit*> (obj)->track ();
    return 0;
  }

  void Retransmit::
  track ()
  {
    while (true)
    {
      Lock l (mutex_);

      for (Queue::iterator i (queue_); !i.done ();)
      {
        if ((*i).int_id_.inc () >= params_.retention_timeout ())
        {
          u64 sn ((*i).ext_id_);
          i.advance ();
          queue_.unbind (sn);
        }
        else
        {
          i.advance ();
        }
      }

      //FUZZ: disable check_for_lack_ACE_OS
      // Go to sleep but watch for "manual cancellation" request.
      //
      ACE_Time_Value time (ACE_OS::gettimeofday ());
      //FUZZ: enable check_for_lack_ACE_OS

      time += params_.tick ();

      while (!stop_)
      {
        if (cond_.wait (&time) == -1)
        {
          if (errno != ETIME)
            ACE_OS::abort ();
          else
            break;
        }
      }

      if (stop_)
        break;
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -