📄 gr_single_threaded_scheduler.cc
字号:
/* -*- c++ -*- *//* * Copyright 2004 Free Software Foundation, Inc. * * This file is part of GNU Radio * * GNU Radio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2, or (at your option) * any later version. * * GNU Radio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with GNU Radio; see the file COPYING. If not, write to * the Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. */#ifdef HAVE_CONFIG_H#include "config.h"#endif#include <gr_single_threaded_scheduler.h>#include <gr_block.h>#include <gr_block_detail.h>#include <gr_buffer.h>#include <iostream>#include <limits>#include <assert.h>gr_single_threaded_scheduler::gr_single_threaded_scheduler ( const std::vector<gr_block_sptr> &blocks) : d_blocks (blocks), d_enabled (true){#if 0 std::cout << "gr_single_threaded_scheduler: " << d_blocks.size () << " blocks\n";#endif}gr_single_threaded_scheduler_sptrgr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks){ return gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks));}gr_single_threaded_scheduler::~gr_single_threaded_scheduler (){}voidgr_single_threaded_scheduler::run (){ d_enabled = true; main_loop ();}inline static unsigned intround_up (unsigned int n, unsigned int multiple){ return ((n + multiple - 1) / multiple) * multiple;}inline static unsigned intround_down (unsigned int n, unsigned int multiple){ return (n / multiple) * multiple;}//// Return minimum available write space in all our downstream buffers// or -1 if we're output blocked and the output we're blocked// on is done.//static intmin_available_space (gr_block_detail *d, int output_multiple){ int min_space = std::numeric_limits<int>::max(); for (int i = 0; i < d->noutputs (); i++){ int n = round_down (d->output(i)->space_available (), output_multiple); if (n == 0){ // We're blocked on output. if (d->output(i)->done()){ // Downstream is done, therefore we're done. return -1; } return 0; } min_space = std::min (min_space, n); } return min_space;}voidgr_single_threaded_scheduler::main_loop (){ static const int DEFAULT_CAPACITY = 16; int noutput_items; gr_vector_int ninput_items_required (DEFAULT_CAPACITY); gr_vector_int ninput_items (DEFAULT_CAPACITY); gr_vector_const_void_star input_items (DEFAULT_CAPACITY); gr_vector_void_star output_items (DEFAULT_CAPACITY); unsigned int bi; unsigned int nalive; int max_items_avail; for (unsigned i = 0; i < d_blocks.size (); i++) d_blocks[i]->detail()->set_done (false); // reset any done flags nalive = d_blocks.size (); // Loop while there are still blocks alive bi = 0; while (d_enabled && nalive > 0){ gr_block *m = d_blocks[bi].get (); gr_block_detail *d = m->detail().get (); if (d->done ()) goto next_block; if (d->source_p ()){ ninput_items_required.resize (0); ninput_items.resize (0); input_items.resize (0); output_items.resize (d->noutputs ()); // determine the minimum available output space noutput_items = min_available_space (d, m->output_multiple ()); if (noutput_items == -1) // we're done goto were_done; if (noutput_items == 0) // we're output blocked goto next_block; goto setup_call_to_work; // jump to common code } else if (d->sink_p ()){ ninput_items_required.resize (d->ninputs ()); ninput_items.resize (d->ninputs ()); input_items.resize (d->ninputs ()); output_items.resize (0); max_items_avail = 0; for (int i = 0; i < d->ninputs (); i++){ ninput_items[i] = d->input(i)->items_available(); if (ninput_items[i] == 0 && d->input(i)->done()) goto were_done; max_items_avail = std::max (max_items_avail, ninput_items[i]); } // take a swag at how much output we can sink noutput_items = (int) (max_items_avail * m->relative_rate ()); noutput_items = round_up (noutput_items, m->output_multiple ()); if (noutput_items == 0) // we're blocked on input goto next_block; goto try_again; // Jump to code shared with regular case. } else { // do the regular thing ninput_items_required.resize (d->ninputs ()); ninput_items.resize (d->ninputs ()); input_items.resize (d->ninputs ()); output_items.resize (d->noutputs ()); max_items_avail = 0; for (int i = 0; i < d->ninputs (); i++){ ninput_items[i] = d->input(i)->items_available (); max_items_avail = std::max (max_items_avail, ninput_items[i]); } // determine the minimum available output space noutput_items = min_available_space (d, m->output_multiple ()); if (noutput_items == -1) // we're done goto were_done; if (noutput_items == 0) // we're output blocked goto next_block; // This isn't necessary for correctness, but saves us time below. // FIXME benchmark to confirm this is worth the trouble if (m->relative_rate() < (1.0/8)){ // substantial decimator noutput_items = std::min ((unsigned) noutput_items, std::max ((unsigned) m->output_multiple(), round_up ((unsigned) (max_items_avail * m->relative_rate()), m->output_multiple ()))); } try_again: // ask the block how much input they need to produce noutput_items m->forecast (noutput_items, ninput_items_required); // See if we've got sufficient input available int i; for (i = 0; i < d->ninputs (); i++) if (ninput_items_required[i] > ninput_items[i]) // not enough break; if (i < d->ninputs ()){ // not enough input on input[i] // if we can, try reducing the size of our output request if (noutput_items > m->output_multiple ()){ noutput_items /= 2; noutput_items = round_up (noutput_items, m->output_multiple ()); goto try_again; } // We're blocked on input if (d->input(i)->done()) // If the upstream block is done, we're done goto were_done; // Is it possible to ever fulfill this request? if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){ // Nope, never going to happen... std::cerr << "\nsched: <gr_block " << m->name() << " (" << m->unique_id() << ")>" << " is requesting more input data\n" << " than we can provide.\n" << " ninput_items_required = " << ninput_items_required[i] << "\n" << " max_possible_items_available = " << d->input(i)->max_possible_items_available() << "\n" << " If this is a filter, consider reducing the number of taps.\n"; goto were_done; } goto next_block; } // We've got enough data on each input to produce noutput_items. // Finish setting up the call to work. for (int i = 0; i < d->ninputs (); i++) input_items[i] = d->input(i)->read_pointer(); setup_call_to_work: for (int i = 0; i < d->noutputs (); i++) output_items[i] = d->output(i)->write_pointer(); // Do the actual work of the block int n = m->general_work (noutput_items, ninput_items, input_items, output_items); if (n == -1) // block is done goto were_done; d->produce_each (n); // advance write pointers goto next_block; } assert (0); were_done: d->set_done (true); nalive--; next_block: if (++bi >= d_blocks.size ()) bi = 0; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -