⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 basic_flow_graph.py

📁 gnuradio软件无线电源程序.现在的手机多基于软件无线电
💻 PY
字号:
#
# Copyright 2004 Free Software Foundation, Inc.
# 
# This file is part of GNU Radio
# 
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# 
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
# 

from gnuradio_swig_python import gr_block_sptr
import types
import hier_block


def remove_duplicates (seq):
    '''Return a new list holding the items of seq in first-seen order, without repeats.

    Membership is tested with the "in" operator on a list, so the items
    only need to support equality comparison.
    '''
    new = []
    for x in seq:
        if x not in new:
            new.append (x)
    return new


class endpoint (object):
    '''A (block, port) pair identifying one port of a block.'''

    __slots__ = ['block', 'port']

    def __init__ (self, block, port):
        self.block = block
        self.port = port

    def __eq__ (self, other):
        '''Two endpoints are equal when both their block and their port compare equal.'''
        if self.block == other.block and self.port == other.port:
            return True
        return False

    def __ne__ (self, other):
        return not self.__eq__ (other)

    def __cmp__ (self, other):
        # Legacy hook kept for code that still calls cmp().  It only
        # distinguishes "equal" (0) from "not equal" (1); it does not
        # define an ordering.
        if self.__eq__ (other):
            return 0
        return 1

    def __str__ (self):
        return '<endpoint (%s, %s)>' % (self.block, self.port)


def expand_src_endpoint (src_endpoint):
    '''Resolve a source endpoint (an output of a block) to a non-hierarchical endpoint.

    While the endpoint's block is a hier_block_base, the endpoint is
    replaced by whatever the block's resolve_output_port() returns for
    that port, and the process repeats.  Returns a single endpoint.
    '''
    src_endpoint = coerce_endpoint (src_endpoint)
    if isinstance (src_endpoint.block, hier_block.hier_block_base):
        return expand_src_endpoint (
            coerce_endpoint (src_endpoint.block.resolve_output_port (src_endpoint.port)))
    else:
        return src_endpoint


def expand_dst_endpoint (dst_endpoint):
    '''Resolve a destination endpoint (an input of a block) to a list of endpoints.

    If the endpoint's block is a hier_block_base, each item returned by
    its resolve_input_port() is coerced and expanded in turn; otherwise
    the result is a one-element list containing the coerced endpoint.
    '''
    dst_endpoint = coerce_endpoint (dst_endpoint)
    if isinstance (dst_endpoint.block, hier_block.hier_block_base):
        exp = [coerce_endpoint (x) for x in
               dst_endpoint.block.resolve_input_port (dst_endpoint.port)]
        return expand_dst_endpoints (exp)
    else:
        return [dst_endpoint]


def expand_dst_endpoints (endpoint_list):
    '''Expand every destination endpoint in endpoint_list and return one flat list.'''
    r = []
    for e in endpoint_list:
        r.extend (expand_dst_endpoint (e))
    return r


def coerce_endpoint (x):
    '''Convert x to an endpoint.

    Accepted forms, tried in this order:
      * an endpoint instance (returned unchanged)
      * a 2-tuple (block, port)
      * an object with a 'block' attribute, taken to be a block; port 0 is used
      * a hier_block_base instance; port 0 is used

    Raises ValueError if x matches none of these.
    '''
    if isinstance (x, endpoint):
        return x
    elif isinstance (x, tuple) and len (x) == 2:
        return endpoint (x[0], x[1])
    elif hasattr (x, 'block'):          # assume it's a block
        return endpoint (x, 0)
    elif isinstance (x, hier_block.hier_block_base):
        return endpoint (x, 0)
    else:
        raise ValueError ("Not coercible to endpoint: %s" % (x,))


class edge (object):
    '''A directed connection from a source endpoint to a destination endpoint.'''

    __slots__ = ['src', 'dst']

    def __init__ (self, src_endpoint, dst_endpoint):
        self.src = src_endpoint
        self.dst = dst_endpoint

    def __eq__ (self, other):
        '''Two edges are equal when both their src and their dst compare equal.'''
        if self.src == other.src and self.dst == other.dst:
            return True
        return False

    def __ne__ (self, other):
        return not self.__eq__ (other)

    def __cmp__ (self, other):
        # Legacy hook kept for code that still calls cmp(); equality only,
        # no ordering (see endpoint.__cmp__).
        if self.__eq__ (other):
            return 0
        return 1

    def __repr__ (self):
        return '<edge (%s, %s)>' % (self.src, self.dst)


class basic_flow_graph (object):
    '''basic_flow_graph -- describe connections between blocks'''

    __slots__ = ['edge_list']

    def __init__ (self):
        self.edge_list = []

    def connect (self, *points):
        '''Connect two or more things that can be coerced to endpoints.

        If more than two arguments are provided, they are connected
        together successively: points[0] -> points[1] -> points[2] ...

        Raises ValueError if fewer than two points are given, or if any
        individual connection is rejected (see _connect_prim).
        '''
        if len (points) < 2:
            raise ValueError ("connect requires at least two endpoints; %d provided."
                              % (len (points),))
        for i in range (1, len (points)):
            self._connect (points[i-1], points[i])

    def _connect (self, src_endpoint, dst_endpoint):
        '''Expand hierarchical endpoints, then connect the source to every resulting destination.'''
        s = expand_src_endpoint (src_endpoint)
        for d in expand_dst_endpoint (dst_endpoint):
            self._connect_prim (s, d)

    def _connect_prim (self, src_endpoint, dst_endpoint):
        '''Validate one source -> destination connection and append it to edge_list.

        Raises ValueError if either port number is out of range, if the
        destination is already in use, or if the stream item sizes differ.
        '''
        src_endpoint = coerce_endpoint (src_endpoint)
        dst_endpoint = coerce_endpoint (dst_endpoint)
        self._check_valid_src_port (src_endpoint)
        self._check_valid_dst_port (dst_endpoint)
        self._check_dst_in_use (dst_endpoint)
        self._check_type_match (src_endpoint, dst_endpoint)
        self.edge_list.append (edge (src_endpoint, dst_endpoint))

    def disconnect (self, src_endpoint, dst_endpoint):
        '''Remove the connection(s) between src_endpoint and dst_endpoint.

        Endpoints are expanded the same way as in _connect.  list.remove
        raises ValueError if a resulting edge is not in the graph.
        '''
        s = expand_src_endpoint (src_endpoint)
        for d in expand_dst_endpoint (dst_endpoint):
            self._disconnect_prim (s, d)

    def _disconnect_prim (self, src_endpoint, dst_endpoint):
        '''Remove the first edge equal to (src_endpoint, dst_endpoint) from edge_list.'''
        src_endpoint = coerce_endpoint (src_endpoint)
        dst_endpoint = coerce_endpoint (dst_endpoint)
        e = edge (src_endpoint, dst_endpoint)
        self.edge_list.remove (e)

    def disconnect_all (self):
        '''Drop every edge from the graph.'''
        self.edge_list = []

    def validate (self):
        '''Check every block in the graph; raise ValueError on the first problem.

        For each block this ensures:
          (1a) its input ports are contiguously assigned
          (1b) the number of input ports is at least the signature minimum
          (2a) its output ports are contiguously assigned
          (2b) the number of output ports is at least the signature minimum
          (3)  check_topology returns true

        The signature maximum is not re-checked here; _check_port already
        rejects out-of-range port numbers when an edge is added.
        '''
        for m in self.all_blocks ():
            edges = self.in_edges (m)
            used_ports = [e.dst.port for e in edges]
            ninputs = self._check_contiguity (m, m.input_signature (), used_ports, "input")

            edges = self.out_edges (m)
            used_ports = [e.src.port for e in edges]
            noutputs = self._check_contiguity (m, m.output_signature (), used_ports, "output")

            if not m.check_topology (ninputs, noutputs):
                raise ValueError ("%s::check_topology (%d, %d) failed" % (m, ninputs, noutputs))

    # --- public utilities ---

    def all_blocks (self):
        '''return list of all blocks in the graph, in order of first appearance in edge_list'''
        all_blocks = []
        for e in self.edge_list:
            m = e.src.block
            if m not in all_blocks:
                all_blocks.append (m)
            m = e.dst.block
            if m not in all_blocks:
                all_blocks.append (m)
        return all_blocks

    def in_edges (self, m):
        '''return list of all edges that have M as a destination'''
        return [e for e in self.edge_list if e.dst.block == m]

    def out_edges (self, m):
        '''return list of all edges that have M as a source'''
        return [e for e in self.edge_list if e.src.block == m]

    def downstream_verticies (self, m):
        '''return list of the destination blocks of all edges leaving M'''
        return [e.dst.block for e in self.out_edges (m)]

    def upstream_verticies (self, m):
        '''return list of the source blocks of all edges entering M'''
        return [e.src.block for e in self.in_edges (m)]

    def adjacent_verticies (self, m):
        '''return list of all verticies adjacent to M (downstream first, then upstream)'''
        return self.downstream_verticies (m) + self.upstream_verticies (m)

    def sink_p (self, m):
        '''return True iff this block is a sink (it has no outgoing edges)'''
        e = self.out_edges (m)
        return len (e) == 0

    def source_p (self, m):
        '''return True iff this block is a source (it has no incoming edges)'''
        e = self.in_edges (m)
        return len (e) == 0

    # --- internal methods ---

    def _check_dst_in_use (self, dst_endpoint):
        '''Ensure that there is not already an edge that terminates at dst_endpoint.

        Raises ValueError if one exists.
        '''
        x = [ep for ep in self.edge_list if ep.dst == dst_endpoint]
        if x:    # already in use
            raise ValueError ("destination endpoint already in use: %s" % (dst_endpoint,))

    def _check_valid_src_port (self, src_endpoint):
        '''Check src_endpoint.port against the block's output signature.'''
        self._check_port (src_endpoint.block.output_signature (), src_endpoint.port)

    def _check_valid_dst_port (self, dst_endpoint):
        '''Check dst_endpoint.port against the block's input signature.'''
        self._check_port (dst_endpoint.block.input_signature (), dst_endpoint.port)

    def _check_port (self, signature, port):
        '''Raise ValueError unless 0 <= port < signature.max_streams().

        A max_streams() of -1 is treated as "no upper limit".
        '''
        if port < 0:
            raise ValueError ('port number out of range.')
        if signature.max_streams () == -1: # infinite
            return                         # OK
        if port >= signature.max_streams ():
            raise ValueError ('port number out of range.')

    def _check_type_match (self, src_endpoint, dst_endpoint):
        '''Raise ValueError if the two ports carry stream items of different sizes.'''
        # for now, we just ensure that the stream item sizes match
        src_sig = src_endpoint.block.output_signature ()
        dst_sig = dst_endpoint.block.input_signature ()
        src_size = src_sig.sizeof_stream_item (src_endpoint.port)
        dst_size = dst_sig.sizeof_stream_item (dst_endpoint.port)
        if src_size != dst_size:
            raise ValueError ('source and destination data sizes are different')

    def _check_contiguity (self, m, sig, used_ports, dir):
        '''Check that the ports used on block m are 0..k-1 with no gaps and return k.

        m          -- the block, used only in error messages
        sig        -- the signature supplying min_streams()
        used_ports -- list of port numbers in use; sorted in place
        dir        -- "input" or "output", used only in error messages

        Raises ValueError if too few ports are connected or a port in
        the range is left unconnected.
        '''
        used_ports.sort ()
        used_ports = remove_duplicates (used_ports)
        min_s = sig.min_streams ()

        l = len (used_ports)
        if l == 0:
            if min_s == 0:
                return l
            raise ValueError ("%s requires %d %s connections.  It has none." %
                              (m, min_s, dir))

        if used_ports[-1] + 1 < min_s:
            raise ValueError ("%s requires %d %s connections.  It has %d." %
                              (m, min_s, dir, used_ports[-1] + 1))

        # The list is sorted and duplicate-free, so the highest port number
        # differs from len-1 exactly when there is a gap; report the first
        # missing port.
        if used_ports[-1] + 1 != l:
            for i in range (l):
                if used_ports[i] != i:
                    raise ValueError ("%s %s port %d is not connected" %
                                      (m, dir, i))

        return l

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -