⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ra_waterfallsink.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
📖 第 1 页 / 共 2 页
字号:
#!/usr/bin/env python
#
# Copyright 2003,2004,2005 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

"""Waterfall display sink blocks and their wxPython window."""

from gnuradio import gr, gru, window
from gnuradio.wxgui import stdgui
import wx
import gnuradio.wxgui.plot as plot
import numpy
import os
import threading
import math

default_fftsink_size = (640, 240)
default_fft_rate = gr.prefs().get_long('wxgui', 'fft_rate', 15)


def axis_design(x1, x2, nx):
    """Choose label positions for an axis running from x1 to x2.

    Given start, end, and number of labels, return a tuple
    ``(first, inc, dt, scale)``: the value of the first label, the increment
    between labels, the number of unlabeled divisions between labels, and a
    scale factor.  The increment is 1, 2 or 5 times a power of ten.  The
    scale factor starts at 1 and is multiplied by 1e-3 until both scaled
    end points have magnitude below 1e5.

    Raises ValueError when x1 == x2, because no increment can be derived
    from a zero-width range.
    """
    dx = abs(x2 - x1) / float(nx + 1)  # allow for space at each end
    if dx == 0:
        # math.log10(0) would raise a bare "math domain error"; same
        # exception type, clearer message.
        raise ValueError("axis_design: x1 and x2 must differ")
    ldx = math.log10(dx)
    l2 = math.log10(2.)
    l5 = math.log10(5.)
    le = math.floor(ldx)
    lf = ldx - le  # fractional part of log10(dx), in [0, 1)
    # Pick the multiplier by comparing against midpoints (in log space)
    # between the candidate values 1, 2, 5 and 10.
    if lf < l2 / 2:
        c = 1
        dt = 10
    elif lf < (l2 + l5) / 2:
        c = 2
        dt = 4
    elif lf < (l5 + 1) / 2:
        c = 5
        dt = 5
    else:
        c = 1
        dt = 10
        le += 1
    inc = c * pow(10., le)
    first = math.ceil(x1 / inc) * inc
    scale = 1.
    while (abs(x1 * scale) >= 1e5) or (abs(x2 * scale) >= 1e5):
        scale *= 1e-3
    return (first, inc, dt, scale)


class waterfall_sink_base(object):
    """Attributes and setters shared by the float and complex sinks.

    Subclasses are expected to provide ``self.avg``, ``self.one_in_n`` and
    ``self.block_list``, which the methods here operate on.
    """

    def __init__(self, input_is_real=False, baseband_freq=0,
                 sample_rate=1, fft_size=512,
                 fft_rate=default_fft_rate,
                 average=False, avg_alpha=None, title='', ofunc=None, xydfunc=None):
        # initialize common attributes
        self.baseband_freq = baseband_freq
        self.sample_rate = sample_rate
        self.fft_size = fft_size
        self.fft_rate = fft_rate
        self.average = average
        self.ofunc = ofunc
        self.xydfunc = xydfunc
        if avg_alpha is None:
            self.avg_alpha = 2.0 / fft_rate
        else:
            self.avg_alpha = avg_alpha
        self.title = title
        self.input_is_real = input_is_real
        self.msgq = gr.msg_queue(2)         # queue up to 2 messages

    def reconnect(self, fg):
        """Connect the blocks in ``self.block_list``, in order, in ``fg``."""
        fg.connect(*self.block_list)

    def set_average(self, average):
        """Turn averaging on or off by changing the taps of ``self.avg``.

        The filter is given ``self.avg_alpha`` when averaging and 1.0
        otherwise.
        """
        self.average = average
        if average:
            self.avg.set_taps(self.avg_alpha)
        else:
            self.avg.set_taps(1.0)

    def set_avg_alpha(self, avg_alpha):
        """Store a new averaging coefficient (used by set_average)."""
        self.avg_alpha = avg_alpha

    def set_baseband_freq(self, baseband_freq):
        """Store a new baseband frequency."""
        self.baseband_freq = baseband_freq

    def set_sample_rate(self, sample_rate):
        """Store a new sample rate and recompute the decimation count."""
        self.sample_rate = sample_rate
        self._set_n()

    def _set_n(self):
        """Set ``one_in_n`` to sample_rate/fft_size/fft_rate, at least 1."""
        self.one_in_n.set_n(max(1, int(self.sample_rate/self.fft_size/self.fft_rate)))


class waterfall_sink_f(gr.hier_block, waterfall_sink_base):
    """Waterfall sink fed with float samples.

    Builds the chain serial_to_parallel -> keep_one_in_n -> fft_vfc ->
    complex_to_mag -> single_pole_iir_filter_ff -> nlog10_ff -> message_sink
    in ``fg`` and creates a ``waterfall_window`` (``self.win``) under
    ``parent`` that reads from the message queue.
    """

    def __init__(self, fg, parent, baseband_freq=0,
                 ref_level=0, sample_rate=1, fft_size=512,
                 fft_rate=default_fft_rate, average=False, avg_alpha=None,
                 title='', size=default_fftsink_size, report=None, span=40, ofunc=None, xydfunc=None):
        # ofunc/xydfunc are forwarded so the attributes stored by the base
        # class match what the caller passed instead of always being None.
        waterfall_sink_base.__init__(self, input_is_real=True,
                                     baseband_freq=baseband_freq,
                                     sample_rate=sample_rate,
                                     fft_size=fft_size, fft_rate=fft_rate,
                                     average=average, avg_alpha=avg_alpha,
                                     title=title,
                                     ofunc=ofunc, xydfunc=xydfunc)

        s2p = gr.serial_to_parallel(gr.sizeof_float, self.fft_size)
        self.one_in_n = gr.keep_one_in_n(gr.sizeof_float * self.fft_size,
                                         max(1, int(self.sample_rate/self.fft_size/self.fft_rate)))
        mywindow = window.blackmanharris(self.fft_size)
        fft = gr.fft_vfc(self.fft_size, True, mywindow)
        c2mag = gr.complex_to_mag(self.fft_size)
        # Taps start at 1.0; set_average() below installs the real value.
        self.avg = gr.single_pole_iir_filter_ff(1.0, self.fft_size)
        log = gr.nlog10_ff(20, self.fft_size, -20*math.log10(self.fft_size))
        sink = gr.message_sink(gr.sizeof_float * self.fft_size, self.msgq, True)

        self.block_list = (s2p, self.one_in_n, fft, c2mag, self.avg, log, sink)
        self.reconnect(fg)
        gr.hier_block.__init__(self, fg, s2p, sink)

        self.win = waterfall_window(self, parent, size=size, report=report,
                                    ref_level=ref_level, span=span, ofunc=ofunc, xydfunc=xydfunc)
        self.set_average(self.average)


class waterfall_sink_c(gr.hier_block, waterfall_sink_base):
    """Waterfall sink fed with complex samples.

    Same chain as ``waterfall_sink_f`` but with complex-sized input items
    and ``fft_vcc`` as the FFT block.
    """

    def __init__(self, fg, parent, baseband_freq=0,
                 ref_level=0, sample_rate=1, fft_size=512,
                 fft_rate=default_fft_rate, average=False, avg_alpha=None,
                 title='', size=default_fftsink_size, report=None, span=40, ofunc=None, xydfunc=None):
        # ofunc/xydfunc are forwarded so the attributes stored by the base
        # class match what the caller passed instead of always being None.
        waterfall_sink_base.__init__(self, input_is_real=False,
                                     baseband_freq=baseband_freq,
                                     sample_rate=sample_rate,
                                     fft_size=fft_size,
                                     fft_rate=fft_rate,
                                     average=average, avg_alpha=avg_alpha,
                                     title=title,
                                     ofunc=ofunc, xydfunc=xydfunc)

        s2p = gr.serial_to_parallel(gr.sizeof_gr_complex, self.fft_size)
        self.one_in_n = gr.keep_one_in_n(gr.sizeof_gr_complex * self.fft_size,
                                         max(1, int(self.sample_rate/self.fft_size/self.fft_rate)))
        mywindow = window.blackmanharris(self.fft_size)
        fft = gr.fft_vcc(self.fft_size, True, mywindow)
        c2mag = gr.complex_to_mag(self.fft_size)
        # Taps start at 1.0; set_average() below installs the real value.
        self.avg = gr.single_pole_iir_filter_ff(1.0, self.fft_size)
        log = gr.nlog10_ff(20, self.fft_size, -20*math.log10(self.fft_size))
        sink = gr.message_sink(gr.sizeof_float * self.fft_size, self.msgq, True)

        self.block_list = (s2p, self.one_in_n, fft, c2mag, self.avg, log, sink)
        self.reconnect(fg)
        gr.hier_block.__init__(self, fg, s2p, sink)

        self.win = waterfall_window(self, parent, size=size, report=report,
                                    ref_level=ref_level, span=span, ofunc=ofunc, xydfunc=xydfunc)
        self.set_average(self.average)


# ------------------------------------------------------------------------

myDATA_EVENT = wx.NewEventType()
EVT_DATA_EVENT = wx.PyEventBinder(myDATA_EVENT, 0)


class DataEvent(wx.PyEvent):
    """wx event that carries one frame of data in ``self.data``."""

    def __init__(self, data):
        wx.PyEvent.__init__(self)
        self.SetEventType(myDATA_EVENT)
        self.data = data

    def Clone(self):
        """Return a new DataEvent carrying the same data.

        The previous version built the copy from ``GetId()`` and discarded
        it, so callers always received None.
        """
        return self.__class__(self.data)


class input_watcher(threading.Thread):
    """Daemon thread that turns queue messages into DataEvents.

    The thread starts itself from the constructor.  It loops while
    ``self.keep_running`` is true; the flag is only tested between
    messages because the queue read blocks.
    """

    def __init__(self, msgq, fft_size, event_receiver, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.msgq = msgq
        self.fft_size = fft_size
        self.event_receiver = event_receiver
        self.keep_running = True
        self.start()

    def run(self):
        """Read messages and post the last frame of each to the receiver."""
        while self.keep_running:
            msg = self.msgq.delete_head()  # blocking read of message queue
            itemsize = int(msg.arg1())
            nitems = int(msg.arg2())

            s = msg.to_string()            # get the body of the msg as a string

            # There may be more than one FFT frame in the message.
            # If so, we take only the last one
            if nitems > 1:
                start = itemsize * (nitems - 1)
                s = s[start:start+itemsize]

            frame = numpy.fromstring(s, numpy.float32)
            de = DataEvent(frame)
            wx.PostEvent(self.event_receiver, de)
            del de


class waterfall_window(wx.ScrolledWindow):
    """Scrolled window that displays the data produced by a waterfall sink.

    NOTE(review): only the first part of this class is present in this
    file; methods referenced below but not defined here (set_data,
    build_popup_menu, on_right_click) are expected to follow DoDrawing.
    """

    def __init__(self, fftsink, parent, id=-1,
                 pos=wx.DefaultPosition, size=wx.DefaultSize,
                 style=wx.DEFAULT_FRAME_STYLE, name="", report=None,
                 ref_level=0, span=50, ofunc=None, xydfunc=None):
        wx.ScrolledWindow.__init__(self, parent, id, pos, size,
                                   style|wx.HSCROLL, name)
        self.parent = parent
        self.SetCursor(wx.StockCursor(wx.CURSOR_IBEAM))
        self.ref_level = ref_level
        self.scale_factor = 256./span
        self.ppsh = 128  # pixels per scroll, horizontal
        # Floor division keeps the unit count an integer on any Python.
        self.SetScrollbars(self.ppsh, 0, fftsink.fft_size // self.ppsh, 0)
        self.fftsink = fftsink
        self.size = size
        self.report = report
        self.ofunc = ofunc
        self.xydfunc = xydfunc

        dc1 = wx.MemoryDC()
        dc1.SetFont(wx.SMALL_FONT)
        self.h_scale = dc1.GetCharHeight() + 3

        # Image is fft_size wide and as tall as the window minus h_scale.
        self.im_size = (self.fftsink.fft_size, self.size[1] - self.h_scale)
        self.im = wx.EmptyImage(self.im_size[0], self.im_size[1], True)
        self.im_cur = 0

        self.baseband_freq = None

        self.make_pens()

        wx.EVT_PAINT(self, self.OnPaint)
        wx.EVT_CLOSE(self, self.on_close_window)
        # NOTE: on_left_up / on_left_down are defined below but are not
        # bound to wx.EVT_LEFT_UP / wx.EVT_LEFT_DOWN.
        EVT_DATA_EVENT(self, self.set_data)

        self.build_popup_menu()

        self.Bind(wx.EVT_RIGHT_UP, self.on_right_click)
        self.Bind(wx.EVT_MOTION, self.on_motion)

        self.down_pos = None

        self.input_watcher = input_watcher(fftsink.msgq, fftsink.fft_size, self)

    def on_close_window(self, event):
        """Ask the input watcher thread to stop.

        The loop flag lives on the watcher thread, so it must be cleared
        there; the thread notices it after its current blocking read
        returns.  The window's own attribute is still set, as before.
        """
        self.keep_running = False
        self.input_watcher.keep_running = False

    def on_left_down(self, evt):
        """Remember where and when the left button went down."""
        self.down_pos = evt.GetPosition()
        self.down_time = evt.GetTimestamp()

    def on_left_up(self, evt):
        """Print (and optionally report) the time and x-distance of a drag."""
        if self.down_pos:
            dt = (evt.GetTimestamp() - self.down_time)/1000.
            pph = self.fftsink.fft_size/float(self.fftsink.sample_rate)
            dx = evt.GetPosition()[0] - self.down_pos[0]
            if dx != 0:
                rt = pph/dx
            else:
                rt = 0
            t = 'Down time: %f  Delta f: %f  Period: %f' % (dt, dx/pph, rt)
            print(t)
            if self.report:
                self.report(t)

    def on_motion(self, event):
        """Pass the pointer position to ``xydfunc`` when one was given."""
        if self.xydfunc:
            pos = event.GetPosition()
            self.xydfunc(pos)

    def const_list(self, const, len):
        """Return a list holding ``const`` repeated ``len`` times."""
        return [const] * len

    def make_colormap(self):
        """Return three 256-entry lists ``(r, g, b)`` of colour levels."""
        r = []
        r.extend(self.const_list(0, 96))
        r.extend(range(0, 255, 4))
        r.extend(self.const_list(255, 64))
        r.extend(range(255, 128, -4))

        g = []
        g.extend(self.const_list(0, 32))
        g.extend(range(0, 255, 4))
        g.extend(self.const_list(255, 64))
        g.extend(range(255, 0, -4))
        g.extend(self.const_list(0, 32))

        # list(...) so that .extend works whether range returns a list or not
        b = list(range(128, 255, 4))
        b.extend(self.const_list(255, 64))
        b.extend(range(255, 0, -4))
        b.extend(self.const_list(0, 96))
        return (r, g, b)

    def make_pens(self):
        """Store the colour map in ``self.rgb``, one (r, g, b) row per level."""
        (r, g, b) = self.make_colormap()
        # NOTE(review): the map holds values up to 255 but is cast to signed
        # int8; left unchanged because the code consuming self.rgb is not
        # visible here -- confirm whether uint8 was intended.
        self.rgb = numpy.transpose(numpy.array((r, g, b)).astype(numpy.int8))

    def OnPaint(self, event):
        """Paint handler: draw through a buffered paint DC."""
        dc = wx.BufferedPaintDC(self)
        self.DoDrawing(dc)

    def DoDrawing(self, dc):
        w, h = self.GetClientSizeTuple()
        w = min(w, self.fftsink.fft_size)
        if w <= 0:
            return
        # NOTE(review): this listing ends here; the remainder of DoDrawing
        # and of the class is not present in this file.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -