📄 flow_graph.py
#
# Copyright 2004 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
#

from gnuradio.gr.basic_flow_graph import remove_duplicates, endpoint, edge, \
     basic_flow_graph
from gnuradio.gr.exceptions import *
from gnuradio_swig_python import buffer, buffer_add_reader, block_detail, \
     single_threaded_scheduler
from gnuradio.gr.scheduler import scheduler

_WHITE = 0                              # graph coloring tags
_GRAY = 1
_BLACK = 2


class buffer_sizes (object):
    """compute buffer sizes to use"""
    def __init__ (self, flow_graph):
        # We could scan the graph here and determine individual sizes
        # based on relative_rate, number of readers, etc.

        # The simplest thing that could possibly work: all buffers same size
        self.fixed_buffer_size = 32*1024

    def allocate (self, m, index):
        """allocate buffer for output index of block m"""
        item_size = m.output_signature().sizeof_stream_item (index)
        nitems = self.fixed_buffer_size / item_size
        if nitems < 2 * m.output_multiple ():
            nitems = 2 * m.output_multiple ()
        return buffer (nitems, item_size)


class flow_graph (basic_flow_graph):
    """add physical connection info to simple_flow_graph
    """
    __slots__ = ['blocks', 'scheduler']

    def __init__ (self):
        basic_flow_graph.__init__ (self)
        self.blocks = None
        self.scheduler = None

    def start (self):
        '''start graph, forking thread(s), return immediately'''
        if self.scheduler:
            raise RuntimeError, "Scheduler already running"
        self._setup_connections ()

        # cast down to gr_module_sptr
        # t = [x.block () for x in self.topological_sort (self.blocks)]
        self.scheduler = scheduler (self)
        self.scheduler.start ()

    def stop (self):
        '''tells scheduler to stop and waits for it to happen'''
        if self.scheduler:
            self.scheduler.stop ()
            self.scheduler = None

    def wait (self):
        '''waits for scheduler to stop'''
        if self.scheduler:
            self.scheduler.wait ()
            self.scheduler = None

    def is_running (self):
        return not not self.scheduler

    def run (self):
        '''start graph, wait for completion'''
        self.start ()
        self.wait ()

    def _setup_connections (self):
        """given the basic flow graph, setup all the physical connections"""
        self.validate ()
        self.blocks = self.all_blocks ()
        self._assign_details ()
        self._assign_buffers ()
        self._connect_inputs ()

    def _assign_details (self):
        for m in self.blocks:
            edges = self.in_edges (m)
            used_ports = remove_duplicates ([e.dst.port for e in edges])
            ninputs = len (used_ports)

            edges = self.out_edges (m)
            used_ports = remove_duplicates ([e.src.port for e in edges])
            noutputs = len (used_ports)

            m.set_detail (block_detail (ninputs, noutputs))

    def _assign_buffers (self):
        """determine the buffer sizes to use, allocate them and attach to detail"""
        sizes = buffer_sizes (self)
        for m in self.blocks:
            d = m.detail ()
            for index in range (d.noutputs ()):
                d.set_output (index, sizes.allocate (m, index))

    def _connect_inputs (self):
        """connect all block inputs to appropriate upstream buffers"""
        for m in self.blocks:
            d = m.detail ()
            for e in self.in_edges (m):
                # FYI, sources don't have any in_edges
                our_port = e.dst.port
                upstream_block = e.src.block
                upstream_port = e.src.port
                upstream_buffer = upstream_block.detail().output (upstream_port)
                d.set_input (our_port, buffer_add_reader (upstream_buffer))

    def topological_sort (self, all_v):
        '''
        Return a topologically sorted list of vertices.
        This is basically a depth-first search with checks
        for back edges (the non-DAG condition)
        '''
        # it's correct without this sort, but this
        # should give better ordering for cache utilization
        all_v = self._sort_sources_first (all_v)

        output = []
        for v in all_v:
            v.ts_color = _WHITE
        for v in all_v:
            if v.ts_color == _WHITE:
                self._dfs_visit (v, output)
        output.reverse ()
        return output

    def _dfs_visit (self, u, output):
        # print "dfs_visit (enter): ", u
        u.ts_color = _GRAY
        for v in self.downstream_verticies (u):
            if v.ts_color == _WHITE:    # (u, v) is a tree edge
                self._dfs_visit (v, output)
            elif v.ts_color == _GRAY:   # (u, v) is a back edge
                raise NotDAG, "The graph is not an acyclic graph (It's got a loop)"
            elif v.ts_color == _BLACK:  # (u, v) is a cross or forward edge
                pass
            else:
                raise CantHappen, "Invalid color on vertex"
        u.ts_color = _BLACK
        output.append (u)
        # print "dfs_visit (exit):  ", u, output

    def _sort_sources_first (self, all_v):
        # the sort function is not guaranteed to be stable.
        # We add the unique_id in to the key so we're guaranteed
        # of reproducible results.  This is important for the test
        # code.  There is often more than one valid topological sort.
        # We want to force a reproducible choice.
        x = [(not self.source_p(v), v.unique_id(), v) for v in all_v]
        x.sort ()
        x = [v[2] for v in x]
        # print "sorted: ", x
        return x

    def partition_graph (self, all_v):
        '''Return a list of lists of nodes that are connected.

        The result is a list of disjoint graphs.
        The sublists are topologically sorted.
        '''
        result = []
        working_v = all_v[:]    # make copy
        while working_v:
            rv = self._reachable_verticies (working_v[0], working_v)
            result.append (self.topological_sort (rv))
            for v in rv:
                working_v.remove (v)
        return result

    def _reachable_verticies (self, start, all_v):
        for v in all_v:
            v.ts_color = _WHITE
        self._reachable_dfs_visit (start)
        return [v for v in all_v if v.ts_color == _BLACK]

    def _reachable_dfs_visit (self, u):
        u.ts_color = _BLACK
        for v in self.adjacent_verticies (u):
            if v.ts_color == _WHITE:
                self._reachable_dfs_visit (v)
        return None
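
For context, here is how this class was driven in practice. The sketch below follows the classic dial_tone example that shipped with GNU Radio in this era: gr.flow_graph is (assumed to be) the class above re-exported through the gnuradio.gr package, while gr.sig_source_f and audio.sink are stock blocks from companion modules, not defined in this file. start () runs _setup_connections () and forks the scheduler; stop () tears it down.

#!/usr/bin/env python
from gnuradio import gr
from gnuradio import audio

def build_graph ():
    sampling_freq = 48000
    ampl = 0.1
    fg = gr.flow_graph ()                 # the flow_graph class above
    # two sine sources, one per audio channel: North American dial tone
    src0 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE, 350, ampl)
    src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE, 440, ampl)
    dst = audio.sink (sampling_freq)
    fg.connect ((src0, 0), (dst, 0))      # endpoint = (block, port)
    fg.connect ((src1, 0), (dst, 1))
    return fg

if __name__ == '__main__':
    fg = build_graph ()
    fg.start ()                           # forks scheduler thread(s), returns
    raw_input ('Press Enter to quit: ')
    fg.stop ()                            # tells scheduler to stop, waits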
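The sizing policy in buffer_sizes is simple enough for a worked example: every buffer is a fixed 32 KiB expressed in items, floored at twice the block's output_multiple. The numbers below are hypothetical (a 4-byte float stream, output_multiple of 1), not taken from any real block:

fixed_buffer_size = 32 * 1024            # bytes, as in buffer_sizes.__init__
item_size = 4                            # e.g. a float stream item (assumed)
output_multiple = 1                      # assumed; block-dependent in practice
nitems = fixed_buffer_size / item_size   # 8192 items (Python 2 int division)
if nitems < 2 * output_multiple:         # floor: never below 2 * output_multiple
    nitems = 2 * output_multiple
print nitems                             # -> 8192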
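Finally, topological_sort and _dfs_visit implement the standard three-color depth-first topological sort: a GRAY-to-GRAY edge is a back edge, i.e. a cycle, which is why a loop raises NotDAG. Here is a self-contained sketch of the same algorithm on a plain adjacency dict; the names toposort and adj are illustrative, not GNU Radio API:

_WHITE, _GRAY, _BLACK = 0, 1, 2

def toposort (adj):
    """adj maps each vertex to the list of its downstream vertices."""
    color = dict ([(v, _WHITE) for v in adj])
    output = []
    def visit (u):
        color[u] = _GRAY
        for v in adj[u]:
            if color[v] == _WHITE:        # tree edge: recurse
                visit (v)
            elif color[v] == _GRAY:       # back edge: the graph has a loop
                raise ValueError ("graph is not a DAG")
        color[u] = _BLACK                 # cross/forward edges fall through
        output.append (u)                 # post-order append ...
    for v in adj:
        if color[v] == _WHITE:
            visit (v)
    output.reverse ()                     # ... reversed is the topological order
    return output

print toposort ({'a': ['b', 'c'], 'b': ['c'], 'c': []})   # -> ['a', 'b', 'c']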