⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ra_waterfallsink.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
📖 第 1 页 / 共 2 页
字号:
        if dc is None:            dc = wx.BufferedDC( wx.ClientDC(self), (w,h) )        dc.SetBackground( wx.Brush( self.GetBackgroundColour(), wx.SOLID ) )        dc.Clear()        x, y = self.GetViewStart()        x *= self.ppsh        ih = min( h - self.h_scale, self.im_size[1] - self.im_cur )        r = wx.Rect( x, self.im_cur, w, ih )        bm = wx.BitmapFromImage( self.im.GetSubImage(r) )        dc.DrawBitmap( bm, 0, self.h_scale )        rem = min( self.im_size[1] - ih, h - ih - self.h_scale )        if( rem > 0 ):            r = wx.Rect( x, 0, w, rem )            bm = wx.BitmapFromImage( self.im.GetSubImage(r) )            dc.DrawBitmap( bm, 0, ih + self.h_scale )                # Draw axis        if self.baseband_freq != self.fftsink.baseband_freq:            self.baseband_freq = self.fftsink.baseband_freq            t = self.fftsink.sample_rate*w/float(self.fftsink.fft_size)            self.ax_spec = axis_design( self.baseband_freq - t/2,                                        self.baseband_freq + t/2, 7 )        dc.SetFont( wx.SMALL_FONT )        fo = self.baseband_freq        po = self.fftsink.fft_size/2        pph = self.fftsink.fft_size/float(self.fftsink.sample_rate)        f = math.floor((fo-po/pph)/self.ax_spec[1])*self.ax_spec[1]        while True:            t = po + ( f - fo )*pph            s = str( f*self.ax_spec[3] )            e = dc.GetTextExtent( s )            if t - e[1]/2 >= x + w:                break            dc.DrawText( s, t - x - e[0]/2, 0 )            dc.DrawLine( t - x, e[1] - 1, t - x, self.h_scale )            dt = self.ax_spec[1]/self.ax_spec[2]*pph            for i in range(self.ax_spec[2]-1):                t += dt                if t >= x + w:                    break                dc.DrawLine( t - x, e[1] + 1, t - x, self.h_scale )            f += self.ax_spec[1]    def const_list(self,const,len):        a = [const]        for i in range(1,len):            a.append(const)        return a    def make_colormap(self):        r = []        r.extend(self.const_list(0,96))        r.extend(range(0,255,4))        r.extend(self.const_list(255,64))        r.extend(range(255,128,-4))                g = []        g.extend(self.const_list(0,32))        g.extend(range(0,255,4))        g.extend(self.const_list(255,64))        g.extend(range(255,0,-4))        g.extend(self.const_list(0,32))                b = range(128,255,4)        b.extend(self.const_list(255,64))        b.extend(range(255,0,-4))        b.extend(self.const_list(0,96))        return (r,g,b)    def set_data (self, evt):        dB = evt.data        L = len (dB)        if self.ofunc != None:            self.ofunc(evt.data, L)        #dc1 = wx.MemoryDC()        #dc1.SelectObject(self.bm)        # Scroll existing bitmap        if 1:            #dc1.Blit(0,1,self.bm_size[0],self.bm_size[1]-1,dc1,0,0,            #         wx.COPY,False,-1,-1)            pass        else:            for i in range( self.bm_size[1]-1, 0, -1 ):                dc1.Blit( 0, i, self.bm_size[0], 1, dc1, 0, i-1 )        x = max(abs(self.fftsink.sample_rate), abs(self.fftsink.baseband_freq))        if x >= 1e9:            sf = 1e-9            units = "GHz"        elif x >= 1e6:            sf = 1e-6            units = "MHz"        else:            sf = 1e-3            units = "kHz"        if self.fftsink.input_is_real:     # only plot 1/2 the points            d_max = L/2            p_width = 2        else:            d_max = L/2            p_width = 1        scale_factor = self.scale_factor        dB -= self.ref_level        dB *= scale_factor        dB = dB.astype(numpy.int_).clip( min=0, max=255 )        if self.fftsink.input_is_real:     # real fft            dB = numpy.array( ( dB[0:d_max][::-1], dB[0:d_max] ) )        else:                               # complex fft            dB = numpy.concatenate( ( dB[d_max:L], dB[0:d_max] ) )        dB = self.rgb[dB]        img = wx.ImageFromData( L, 1, dB.ravel().tostring() )        #bm = wx.BitmapFromImage( img )        #dc1.DrawBitmap( bm, 0, 0 )        ibuf = self.im.GetDataBuffer()        self.im_cur -= 1        if self.im_cur < 0:            self.im_cur = self.im_size[1] - 1        start = 3*self.im_cur*self.im_size[0]        ibuf[start:start+3*self.im_size[0]] = img.GetData()        #del dc1        self.DoDrawing(None)    def on_average(self, evt):        # print "on_average"        self.fftsink.set_average(evt.IsChecked())    def on_right_click(self, event):        menu = self.popup_menu        self.PopupMenu(menu, event.GetPosition())    def build_popup_menu(self):        id_ref_gain = wx.NewId()        self.Bind( wx.EVT_MENU, self.on_ref_gain, id=id_ref_gain )        # make a menu        menu = wx.Menu()        self.popup_menu = menu        menu.Append( id_ref_gain, "Ref Level and Gain" )        self.rg_dialog = None        self.checkmarks = {            #self.id_average : lambda : self.fftsink.average            }    def on_ref_gain( self, evt ):        if self.rg_dialog == None:            self.rg_dialog = rg_dialog( self.parent, self.set_ref_gain,                                        ref=self.ref_level,                                        span=256./self.scale_factor )        self.rg_dialog.Show( True )    def set_ref_gain( self, ref, span ):        self.ref_level = ref        self.scale_factor = 256/spanclass rg_dialog( wx.Dialog ):    def __init__( self, parent, set_function, ref=0, span=256./5. ):        wx.Dialog.__init__( self, parent, -1, "Waterfall Settings" )        self.set_function = set_function        #status_bar = wx.StatusBar( self, -1 )        d_sizer = wx.BoxSizer( wx.VERTICAL )  # dialog sizer        f_sizer = wx.BoxSizer( wx.VERTICAL )  # form sizer        vs = 10        #f_sizer.Add( fn_sizer, 0, flag=wx.TOP, border=10 )        h_sizer = wx.BoxSizer( wx.HORIZONTAL )        self.ref = tab_item( self, "Ref Level:", 4, "dB" )        self.ref.ctrl.SetValue( "%d" % ref )        h_sizer.Add((0,0),1)        h_sizer.Add( self.ref, 0 )        h_sizer.Add((0,0),1)        self.span = tab_item( self, "Range:", 4, "dB" )        self.span.ctrl.SetValue( "%d" % span )        h_sizer.Add( self.span, 0 )        h_sizer.Add((0,0),1)        f_sizer.Add( h_sizer, 0, flag=wx.TOP|wx.EXPAND, border=vs )        d_sizer.Add((0,0),1)        d_sizer.Add( f_sizer, 0, flag=wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND )        d_sizer.Add((0,0),1)        d_sizer.Add((0,0),1)        button_sizer = wx.BoxSizer( wx.HORIZONTAL )        apply_button = wx.Button( self, -1, "Apply" )        apply_button.Bind( wx.EVT_BUTTON, self.apply_evt )        cancel_button = wx.Button( self, -1, "Cancel" )        cancel_button.Bind( wx.EVT_BUTTON, self.cancel_evt )        ok_button = wx.Button( self, -1, "OK" )        ok_button.Bind( wx.EVT_BUTTON, self.ok_evt )        button_sizer.Add((0,0),1)        button_sizer.Add( apply_button, 0,                          flag=wx.ALIGN_CENTER_HORIZONTAL )        button_sizer.Add((0,0),1)        button_sizer.Add( cancel_button, 0,                          flag=wx.ALIGN_CENTER_HORIZONTAL )        button_sizer.Add((0,0),1)        button_sizer.Add( ok_button, 0,                          flag=wx.ALIGN_CENTER_HORIZONTAL )        button_sizer.Add((0,0),1)        d_sizer.Add( button_sizer, 0,                     flag=wx.EXPAND|wx.ALIGN_CENTER|wx.BOTTOM, border=30 )        self.SetSizer( d_sizer )    def apply_evt( self, evt ):        self.do_apply()    def cancel_evt( self, evt ):        self.Show( False )    def ok_evt( self, evt ):        self.do_apply()        self.Show( False )    def do_apply( self ):        r = float( self.ref.ctrl.GetValue() )        g = float( self.span.ctrl.GetValue() )        self.set_function( r, g )def next_up(v, seq):    """    Return the first item in seq that is > v.    """    for s in seq:        if s > v:            return s    return vdef next_down(v, seq):    """    Return the last item in seq that is < v.    """    rseq = list(seq[:])    rseq.reverse()    for s in rseq:        if s < v:            return s    return v# One of many copies that should be consolidated . . .def tab_item( parent, label, chars, units, style=wx.TE_RIGHT, value="" ):    s = wx.BoxSizer( wx.HORIZONTAL )    s.Add( wx.StaticText( parent, -1, label ), 0,           flag=wx.ALIGN_CENTER_VERTICAL )    s.ctrl = wx.TextCtrl( parent, -1, style=style, value=value )    s.ctrl.SetMinSize( ( (1.00+chars)*s.ctrl.GetCharWidth(),                                 1.25*s.ctrl.GetCharHeight() ) )    s.Add( s.ctrl, -1, flag=wx.LEFT, border=3 )    s.Add( wx.StaticText( parent, -1, units ), 0,           flag=wx.ALIGN_CENTER_VERTICAL|wx.LEFT, border=1 )    return s# ----------------------------------------------------------------#          	      Deprecated interfaces# ----------------------------------------------------------------# returns (block, win).#   block requires a single input stream of float#   win is a subclass of wxWindowdef make_waterfall_sink_f(fg, parent, title, fft_size, input_rate):        block = waterfall_sink_f(fg, parent, title=title, fft_size=fft_size,                             sample_rate=input_rate)    return (block, block.win)# returns (block, win).#   block requires a single input stream of gr_complex#   win is a subclass of wxWindowdef make_waterfall_sink_c(fg, parent, title, fft_size, input_rate):    block = waterfall_sink_c(fg, parent, title=title, fft_size=fft_size,                             sample_rate=input_rate)    return (block, block.win)# ----------------------------------------------------------------# Standalone test app# ----------------------------------------------------------------class test_app_flow_graph (stdgui.gui_flow_graph):    def __init__(self, frame, panel, vbox, argv):        stdgui.gui_flow_graph.__init__ (self, frame, panel, vbox, argv)        fft_size = 512        # build our flow graph        input_rate = 20.000e3        # Generate a complex sinusoid        src1 = gr.sig_source_c (input_rate, gr.GR_SIN_WAVE, 5.75e3, 1000)        #src1 = gr.sig_source_c (input_rate, gr.GR_CONST_WAVE, 5.75e3, 1000)        # We add these throttle blocks so that this demo doesn't        # suck down all the CPU available.  Normally you wouldn't use these.        thr1 = gr.throttle(gr.sizeof_gr_complex, input_rate)        sink1 = waterfall_sink_c (self, panel, title="Complex Data",                                  fft_size=fft_size,                                  sample_rate=input_rate, baseband_freq=0,                                  size=(600,144) )        vbox.Add (sink1.win, 1, wx.EXPAND)        self.connect (src1, thr1, sink1)        # generate a real sinusoid        src2 = gr.sig_source_f (input_rate, gr.GR_SIN_WAVE, 5.75e3, 1000)        #src2 = gr.sig_source_f (input_rate, gr.GR_CONST_WAVE, 5.75e3, 1000)        thr2 = gr.throttle(gr.sizeof_float, input_rate)        sink2 = waterfall_sink_f (self, panel, title="Real Data", fft_size=fft_size,                                  sample_rate=input_rate, baseband_freq=0)        vbox.Add (sink2.win, 1, wx.EXPAND)        self.connect (src2, thr2, sink2)def main ():    app = stdgui.stdapp (test_app_flow_graph,                         "Waterfall Sink Test App")    app.MainLoop ()if __name__ == '__main__':    main ()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -