⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qa_atsc.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
字号:
#!/usr/bin/env python
#
# Copyright 2004,2006 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

from gnuradio import gr, gr_unittest
import atsc                    # qa code needs to run without being installed
#from gnuradio import atsc
from atsc_utils import *
import sys


class memoize(object):
    """
    Wrap a zero-argument callable so that it is evaluated at most once.

    The first call invokes the wrapped thunk and stores its result; every
    later call returns the stored result without calling the thunk again.
    """

    def __init__(self, thunk):
        """
        @param thunk: callable taking no arguments whose result is cached
        """
        self.thunk = thunk
        self.cached = False
        self.value = None

    def __call__(self):
        """
        Return the thunk's result, computing it on the first call only.
        """
        if self.cached:
            return self.value
        self.value = self.thunk()
        # Set the flag only after the thunk succeeded, so a thunk that
        # raises is retried on the next call instead of caching None.
        self.cached = True
        return self.value


# Make a fake transport stream that's big enough for our purposes.
# We generate 8 full fields.  This is relatively expensive.  It
# takes about 2 seconds to execute.  The result is memoized so that
# all tests share a single copy.
make_transport_stream = \
    memoize(lambda: tuple(make_fake_transport_stream_packet(8 * atsc.ATSC_DSEGS_PER_FIELD)))


def pad_transport_stream(src):
    """
    An MPEG transport stream packet is 188 bytes long.  Internally we use a packet
    that is 256 bytes long to help with buffer alignment.  This function adds the
    appropriate trailing padding to convert each packet from 188 to 256 bytes.

    @param src: unpadded transport stream
    @return: result of pad_stream() applied to src
    """
    return pad_stream(src, atsc.sizeof_atsc_mpeg_packet, atsc.sizeof_atsc_mpeg_packet_pad)


def depad_transport_stream(src):
    """
    An MPEG transport stream packet is 188 bytes long.  Internally we use a packet
    that is 256 bytes long to help with buffer alignment.  This function removes the
    trailing padding to convert each packet from 256 back to 188 bytes.

    @param src: padded transport stream
    @return: result of depad_stream() applied to src
    """
    return depad_stream(src, atsc.sizeof_atsc_mpeg_packet, atsc.sizeof_atsc_mpeg_packet_pad)


class vector_source_ts(gr.hier_block):
    """
    MPEG Transport stream source for testing.
    """

    def __init__(self, fg, ts):
        """
        Pad transport stream packets to 256 bytes and reformat appropriately.

        @param fg: flow graph
        @param ts: MPEG transport stream.
        @type  ts: sequence of ints in [0,255]; len(ts) % 188 == 0
        """
        src = gr.vector_source_b(pad_transport_stream(ts))
        s2v = gr.stream_to_vector(gr.sizeof_char, atsc.sizeof_atsc_mpeg_packet)
        fg.connect(src, s2v)
        # A pure source: no head block, the stream-to-vector is the tail.
        gr.hier_block.__init__(self, fg, None, s2v)


class vector_sink_ts(gr.hier_block):
    """
    MPEG Transport stream sink for testing.
    """

    def __init__(self, fg):
        """
        @param fg: flow graph
        """
        v2s = gr.vector_to_stream(gr.sizeof_char, atsc.sizeof_atsc_mpeg_packet)
        self.sink = gr.vector_sink_b()
        fg.connect(v2s, self.sink)
        # A pure sink: the vector-to-stream is the head, there is no tail.
        gr.hier_block.__init__(self, fg, v2s, None)

    def data(self):
        """
        Extracts transport stream from sink and returns it to python.
        Depads transport stream packets from 256 back to 188 bytes.

        @rtype: tuple of ints in [0,255]; len(result) % 188 == 0
        """
        return tuple(depad_transport_stream(self.sink.data()))


class qa_atsc(gr_unittest.TestCase):
    """
    Loopback tests for the ATSC transmit/receive block chain.

    Each test feeds the shared fake transport stream through a progressively
    longer encoder/decoder chain and checks that the output matches the
    input, after discarding the packets consumed by pipeline delay.
    """

    def setUp(self):
        self.fg = gr.flow_graph()

    def tearDown(self):
        self.fg = None

    # The tests are run in alphabetical order

    def test_loopback_000(self):
        """
        Loopback randomizer to derandomizer
        """
        src_data = make_transport_stream()
        expected_result = src_data

        src = vector_source_ts(self.fg, src_data)
        rand = atsc.randomizer()
        derand = atsc.derandomizer()
        dst = vector_sink_ts(self.fg)
        self.fg.connect(src, rand, derand, dst)
        self.fg.run()
        result_data = dst.data()
        self.assertEqual(expected_result, result_data)

    def test_loopback_001(self):
        """
        Loopback randomizer/rs_encoder to rs_decoder/derandomizer
        """
        src_data = make_transport_stream()
        expected_result = src_data

        src = vector_source_ts(self.fg, src_data)
        rand = atsc.randomizer()
        rs_enc = atsc.rs_encoder()
        rs_dec = atsc.rs_decoder()
        derand = atsc.derandomizer()
        dst = vector_sink_ts(self.fg)
        self.fg.connect(src, rand, rs_enc, rs_dec, derand, dst)
        self.fg.run()
        result_data = dst.data()
        self.assertEqual(expected_result, result_data)

    def test_loopback_002(self):
        """
        Loopback randomizer/rs_encoder/interleaver to
        deinterleaver/rs_decoder/derandomizer
        """
        src_data = make_transport_stream()
        interleaver_delay = 52
        # Number of stream elements dropped on each side of the comparison:
        # the tail of the input never emerges, the head of the output is
        # pipeline fill.
        skip = interleaver_delay * atsc.ATSC_MPEG_PKT_LENGTH
        expected_result = src_data[0:len(src_data) - skip]

        src = vector_source_ts(self.fg, src_data)
        rand = atsc.randomizer()
        rs_enc = atsc.rs_encoder()
        inter = atsc.interleaver()
        deinter = atsc.deinterleaver()
        rs_dec = atsc.rs_decoder()
        derand = atsc.derandomizer()
        dst = vector_sink_ts(self.fg)
        self.fg.connect(src, rand, rs_enc, inter, deinter, rs_dec, derand, dst)
        self.fg.run()
        result_data = dst.data()
        result_data = result_data[skip:len(result_data)]
        self.assertEqual(expected_result, result_data)

    def test_loopback_003(self):
        """
        Loopback randomizer/rs_encoder/interleaver/trellis_encoder
        via ds_to_softds to
        viterbi_decoder/deinterleaver/rs_decoder/derandomizer
        """
        src_data = make_transport_stream()
        interleaver_delay = 52
        viterbi_delay = 12
        # Same trimming as in test_loopback_002, with the Viterbi decoder's
        # delay added on top of the interleaver's.
        skip = (interleaver_delay + viterbi_delay) * atsc.ATSC_MPEG_PKT_LENGTH
        expected_result = src_data[0:len(src_data) - skip]

        src = vector_source_ts(self.fg, src_data)
        rand = atsc.randomizer()
        rs_enc = atsc.rs_encoder()
        inter = atsc.interleaver()
        trellis = atsc.trellis_encoder()
        softds = atsc.ds_to_softds()
        viterbi = atsc.viterbi_decoder()
        deinter = atsc.deinterleaver()
        rs_dec = atsc.rs_decoder()
        derand = atsc.derandomizer()
        dst = vector_sink_ts(self.fg)
        self.fg.connect(src, rand, rs_enc, inter, trellis, softds, viterbi,
                        deinter, rs_dec, derand, dst)
        self.fg.run()
        # Fetch the sink contents once: data() depads the whole stream on
        # every call, so calling it a second time just for len() is wasteful.
        result_data = dst.data()
        result_data = result_data[skip:len(result_data)]
        self.assertEqual(expected_result, result_data)


if __name__ == '__main__':
    gr_unittest.main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -