⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qa_bin_statistics.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
字号:
#!/usr/bin/env python## Copyright 2006 Free Software Foundation, Inc.# # This file is part of GNU Radio# # GNU Radio is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 3, or (at your option)# any later version.# # GNU Radio is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.# # You should have received a copy of the GNU General Public License# along with GNU Radio; see the file COPYING.  If not, write to# the Free Software Foundation, Inc., 51 Franklin Street,# Boston, MA 02110-1301, USA.# from gnuradio import gr, gr_unittestimport randomimport struct#import os#print "pid =", os.getpid()#raw_input("Attach gdb and press return...")class counter(gr.feval_dd):    def __init__(self, step_size=1):        gr.feval_dd.__init__(self)        self.step_size = step_size        self.count = 0    def eval(self, input):        #print "eval: self.count =", self.count        t = self.count        self.count = self.count + self.step_size        return t        class counter3(gr.feval_dd):    def __init__(self, f, step_size):        gr.feval_dd.__init__(self)        self.f = f        self.step_size = step_size        self.count = 0    def eval(self, input):        try:            #print "eval: self.count =", self.count            t = self.count            self.count = self.count + self.step_size            self.f(self.count)        except Exception, e:            print "Exception: ", e        return t        def foobar3(new_t):    #print "foobar3: new_t =", new_t    passclass counter4(gr.feval_dd):    def __init__(self, obj_instance, step_size):        gr.feval_dd.__init__(self)        self.obj_instance = obj_instance        self.step_size = step_size        self.count = 0    def eval(self, input):        try:            #print "eval: self.count =", self.count            t = self.count            self.count = self.count + self.step_size            self.obj_instance.foobar4(self.count)        except Exception, e:            print "Exception: ", e        return t        class parse_msg(object):    def __init__(self, msg):        self.center_freq = msg.arg1()        self.vlen = int(msg.arg2())        assert(msg.length() == self.vlen * gr.sizeof_float)        self.data = struct.unpack('%df' % (self.vlen,), msg.to_string())class test_bin_statistics(gr_unittest.TestCase):    def setUp(self):        self.fg = gr.flow_graph ()    def tearDown(self):        self.fg = None    def test_001(self):        vlen = 4        tune = counter(1)        tune_delay = 0        dwell_delay = 1        msgq = gr.msg_queue()        src_data = tuple([float(x) for x in                          ( 1,  2,  3,  4,                            5,  6,  7,  8,                            9, 10, 11, 12,                            13, 14, 15, 16                            )])        expected_results = tuple([float(x) for x in                                  ( 1,  2,  3,  4,                                    5,  6,  7,  8,                                    9, 10, 11, 12,                                    13, 14, 15, 16                                    )])                                    src = gr.vector_source_f(src_data, False)        s2v = gr.stream_to_vector(gr.sizeof_float, vlen)        stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)        self.fg.connect(src, s2v, stats)        self.fg.run()        self.assertEqual(4, msgq.count())        for i in range(4):            m = parse_msg(msgq.delete_head())            #print "m =", m.center_freq, m.data            self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)    def test_002(self):        vlen = 4        tune = counter(1)        tune_delay = 1        dwell_delay = 2        msgq = gr.msg_queue()        src_data = tuple([float(x) for x in                          ( 1,  2,  3,  4,                            9,  6, 11,  8,                            5, 10,  7, 12,                            13, 14, 15, 16                            )])        expected_results = tuple([float(x) for x in                                  ( 9, 10, 11, 12)])                                    src = gr.vector_source_f(src_data, False)        s2v = gr.stream_to_vector(gr.sizeof_float, vlen)        stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)        self.fg.connect(src, s2v, stats)        self.fg.run()        self.assertEqual(1, msgq.count())        for i in range(1):            m = parse_msg(msgq.delete_head())            #print "m =", m.center_freq, m.data            self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)    def test_003(self):        vlen = 4        tune = counter3(foobar3, 1)        tune_delay = 1        dwell_delay = 2        msgq = gr.msg_queue()        src_data = tuple([float(x) for x in                          ( 1,  2,  3,  4,                            9,  6, 11,  8,                            5, 10,  7, 12,                            13, 14, 15, 16                            )])        expected_results = tuple([float(x) for x in                                  ( 9, 10, 11, 12)])                                    src = gr.vector_source_f(src_data, False)        s2v = gr.stream_to_vector(gr.sizeof_float, vlen)        stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)        self.fg.connect(src, s2v, stats)        self.fg.run()        self.assertEqual(1, msgq.count())        for i in range(1):            m = parse_msg(msgq.delete_head())            #print "m =", m.center_freq, m.data            self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)    def foobar4(self, new_t):        #print "foobar4: new_t =", new_t        pass            def test_004(self):        vlen = 4        tune = counter4(self, 1)        tune_delay = 1        dwell_delay = 2        msgq = gr.msg_queue()        src_data = tuple([float(x) for x in                          ( 1,  2,  3,  4,                            9,  6, 11,  8,                            5, 10,  7, 12,                            13, 14, 15, 16                            )])        expected_results = tuple([float(x) for x in                                  ( 9, 10, 11, 12)])                                    src = gr.vector_source_f(src_data, False)        s2v = gr.stream_to_vector(gr.sizeof_float, vlen)        stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)        self.fg.connect(src, s2v, stats)        self.fg.run()        self.assertEqual(1, msgq.count())        for i in range(1):            m = parse_msg(msgq.delete_head())            #print "m =", m.center_freq, m.data            self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)if __name__ == '__main__':   gr_unittest.main ()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -