📄 scopesink.py
字号:
#!/usr/bin/env python
#
# Copyright 2003,2004,2006,2007 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

from gnuradio import gr, gru, eng_notation
from gnuradio.wxgui import stdgui
import wx
import gnuradio.wxgui.plot as plot
import numpy
import threading
import struct

default_scopesink_size = (640, 240)
default_v_scale = 1000
default_frame_decim = gr.prefs().get_long('wxgui', 'frame_decim', 1)


class scope_sink_f(gr.hier_block):
    """Hierarchical block pairing a gr.oscope_sink_f with a scope_window.

    The created window is available as ``self.win``.  ``size`` is accepted
    for interface compatibility but is not used by this constructor.
    """

    def __init__(self, fg, parent, title='', sample_rate=1,
                 size=default_scopesink_size, frame_decim=default_frame_decim,
                 v_scale=default_v_scale, t_scale=None):
        msgq = gr.msg_queue(2)  # message queue that holds at most 2 messages
        self.guts = gr.oscope_sink_f(sample_rate, msgq)
        gr.hier_block.__init__(self, fg, self.guts, self.guts)
        self.win = scope_window(
            win_info(msgq, sample_rate, frame_decim,
                     v_scale, t_scale, self.guts, title),
            parent)

    def set_sample_rate(self, sample_rate):
        """Propagate a new sample rate to the sink and to the window info."""
        self.guts.set_sample_rate(sample_rate)
        self.win.info.set_sample_rate(sample_rate)


class scope_sink_c(gr.hier_block):
    """Complex-input variant of the scope sink.

    A gr.complex_to_float block feeds its two outputs into inputs 0 and 1
    of a gr.oscope_sink_f.  The created window is available as ``self.win``.
    ``size`` is accepted for interface compatibility but is not used by this
    constructor.
    """

    def __init__(self, fg, parent, title='', sample_rate=1,
                 size=default_scopesink_size, frame_decim=default_frame_decim,
                 v_scale=default_v_scale, t_scale=None):
        msgq = gr.msg_queue(2)  # message queue that holds at most 2 messages
        c2f = gr.complex_to_float()
        self.guts = gr.oscope_sink_f(sample_rate, msgq)
        fg.connect((c2f, 0), (self.guts, 0))
        fg.connect((c2f, 1), (self.guts, 1))
        gr.hier_block.__init__(self, fg, c2f, self.guts)
        self.win = scope_window(
            win_info(msgq, sample_rate, frame_decim,
                     v_scale, t_scale, self.guts, title),
            parent)

    def set_sample_rate(self, sample_rate):
        """Propagate a new sample rate to the sink and to the window info."""
        self.guts.set_sample_rate(sample_rate)
        self.win.info.set_sample_rate(sample_rate)


# ========================================================================
# This is the deprecated interface, retained for compatibility...
#
# returns (block, win).
#   block requires a N input stream of float
#   win is a subclass of wxWindow

def make_scope_sink_f(fg, parent, label, input_rate):
    """Build a scope_sink_f and return it together with its window."""
    block = scope_sink_f(fg, parent, title=label, sample_rate=input_rate)
    return (block, block.win)

# ========================================================================


time_base_list = [  # time / division
    1.0e-7,   # 100ns / div
    2.5e-7,
    5.0e-7,
    1.0e-6,   # 1us / div
    2.5e-6,
    5.0e-6,
    1.0e-5,   # 10us / div
    2.5e-5,
    5.0e-5,
    1.0e-4,   # 100us / div
    2.5e-4,
    5.0e-4,
    1.0e-3,   # 1ms / div
    2.5e-3,
    5.0e-3,
    1.0e-2,   # 10ms / div
    2.5e-2,
    5.0e-2
    ]

v_scale_list = [  # counts / div, LARGER gains are SMALLER /div, appear EARLIER
    2.0e-3,   # 2m / div, don't call it V/div it's actually counts/div
    5.0e-3,
    1.0e-2,
    2.0e-2,
    5.0e-2,
    1.0e-1,
    2.0e-1,
    5.0e-1,
    1.0e+0,
    2.0e+0,
    5.0e+0,
    1.0e+1,
    2.0e+1,
    5.0e+1,
    1.0e+2,
    2.0e+2,
    5.0e+2,
    1.0e+3,
    2.0e+3,
    5.0e+3,
    1.0e+4    # 10000 /div, USRP full scale is -/+ 32767
    ]


wxDATA_EVENT = wx.NewEventType()


def EVT_DATA_EVENT(win, func):
    """Connect ``func`` as the handler for wxDATA_EVENT events on ``win``."""
    win.Connect(-1, -1, wxDATA_EVENT, func)


class DataEvent(wx.PyEvent):
    """wx event of type wxDATA_EVENT that carries a payload in ``self.data``."""

    def __init__(self, data):
        wx.PyEvent.__init__(self)
        self.SetEventType(wxDATA_EVENT)
        self.data = data

    def Clone(self):
        """Return a new event of the same class carrying the same data.

        The previous implementation built the copy from ``self.GetId()``
        (not the payload) and discarded it, so callers received None.
        """
        return self.__class__(self.data)


class win_info(object):
    """Display state shared between the scope sink and its window.

    If ``v_scale`` is None the window starts in autorange mode; any other
    value (including 0) disables autorange.
    """

    __slots__ = ['msgq', 'sample_rate', 'frame_decim', 'v_scale',
                 'scopesink', 'title',
                 'time_scale_cursor', 'v_scale_cursor', 'marker', 'xy',
                 'autorange', 'running']

    def __init__(self, msgq, sample_rate, frame_decim, v_scale, t_scale,
                 scopesink, title="Oscilloscope"):
        self.msgq = msgq
        self.sample_rate = sample_rate
        self.frame_decim = frame_decim
        self.scopesink = scopesink
        self.title = title

        self.time_scale_cursor = gru.seq_with_cursor(
            time_base_list, initial_value=t_scale)
        self.v_scale_cursor = gru.seq_with_cursor(
            v_scale_list, initial_value=v_scale)

        self.marker = 'line'
        self.xy = False
        # 0 and None are both False, but 0 != None; 0 is a valid v_scale,
        # so only an explicit None selects autorange.
        self.autorange = v_scale is None
        self.running = True

    def get_time_per_div(self):
        """Return the currently selected entry of the time-base sequence."""
        return self.time_scale_cursor.current()

    def get_volts_per_div(self):
        """Return the currently selected entry of the vertical-scale sequence."""
        return self.v_scale_cursor.current()

    def set_sample_rate(self, sample_rate):
        self.sample_rate = sample_rate

    def get_sample_rate(self):
        return self.sample_rate

    def get_decimation_rate(self):
        """Return the decimation rate; this implementation always returns 1.0."""
        return 1.0

    def set_marker(self, s):
        self.marker = s

    def get_marker(self):
        return self.marker


class input_watcher(threading.Thread):
    """Daemon thread that turns queued scope messages into DataEvents.

    The thread starts itself from the constructor.  Only every
    ``frame_decim``-th message is unpacked and posted to ``event_receiver``;
    the others are read from the queue and dropped.
    """

    def __init__(self, msgq, event_receiver, frame_decim, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.msgq = msgq
        self.event_receiver = event_receiver
        self.frame_decim = frame_decim
        self.iscan = 0
        self.keep_running = True
        self.start()

    def run(self):
        """Read messages until ``keep_running`` is cleared."""
        # print "input_watcher: pid = ", os.getpid ()
        while self.keep_running:
            msg = self.msgq.delete_head()   # blocking read of message queue
            if self.iscan == 0:             # only display at frame_decim
                self.iscan = self.frame_decim

                nchan = int(msg.arg1())     # number of channels of data in msg
                nsamples = int(msg.arg2())  # number of samples in each channel

                s = msg.to_string()         # get the body of the msg as a string

                # Channels are laid out back to back in the message body,
                # each occupying nsamples float32 values.
                bytes_per_chan = nsamples * gr.sizeof_float

                records = []
                for ch in range(nchan):
                    start = ch * bytes_per_chan
                    chan_data = s[start:start + bytes_per_chan]
                    rec = numpy.fromstring(chan_data, numpy.float32)
                    records.append(rec)

                # print "nrecords = %d, reclen = %d" % (len (records),nsamples)

                de = DataEvent(records)
                wx.PostEvent(self.event_receiver, de)
                records = []
                del de

            # end if iscan == 0
            self.iscan -= 1


class scope_window(wx.Panel):
    """Panel holding the graph window and two rows of scope controls.

    ``id``, ``pos``, ``size`` and ``name`` are accepted but not forwarded:
    the underlying wx.Panel is always created with ``(parent, -1)``.

    NOTE(review): this class is only partially visible in the reviewed
    source; the event handlers it binds and the tail of make_control_box
    are outside this view.
    """

    def __init__(self, info, parent, id=-1,
                 pos=wx.DefaultPosition, size=wx.DefaultSize, name=""):
        wx.Panel.__init__(self, parent, -1)
        self.info = info

        vbox = wx.BoxSizer(wx.VERTICAL)

        self.graph = graph_window(info, self, -1)

        vbox.Add(self.graph, 1, wx.EXPAND)
        vbox.Add(self.make_control_box(), 0, wx.EXPAND)
        vbox.Add(self.make_control2_box(), 0, wx.EXPAND)

        self.sizer = vbox
        self.SetSizer(self.sizer)
        self.SetAutoLayout(True)
        self.sizer.Fit(self)
        self.set_autorange(self.info.autorange)

    # second row of control buttons etc. appears BELOW control_box
    def make_control2_box(self):
        """Build the vertical-scale row: range buttons, label, autorange box."""
        ctrlbox = wx.BoxSizer(wx.HORIZONTAL)

        self.inc_v_button = wx.Button(self, 1101, " < ", style=wx.BU_EXACTFIT)
        self.inc_v_button.SetToolTipString("Increase vertical range")
        wx.EVT_BUTTON(self, 1101, self.incr_v_scale)  # ID matches button ID above

        self.dec_v_button = wx.Button(self, 1100, " > ", style=wx.BU_EXACTFIT)
        self.dec_v_button.SetToolTipString("Decrease vertical range")
        wx.EVT_BUTTON(self, 1100, self.decr_v_scale)

        self.v_scale_label = wx.StaticText(self, 1002, "None")  # vertical /div
        self.update_v_scale_label()

        self.autorange_checkbox = wx.CheckBox(self, 1102, "Autorange")
        self.autorange_checkbox.SetToolTipString("Select autorange on/off")
        wx.EVT_CHECKBOX(self, 1102, self.autorange_checkbox_event)

        ctrlbox.Add((5, 0), 0)  # left margin space
        ctrlbox.Add(self.inc_v_button, 0, wx.EXPAND)
        ctrlbox.Add(self.dec_v_button, 0, wx.EXPAND)
        ctrlbox.Add(self.v_scale_label, 0, wx.ALIGN_CENTER)
        ctrlbox.Add((20, 0), 0)  # spacer
        ctrlbox.Add(self.autorange_checkbox, 0, wx.ALIGN_CENTER)

        return ctrlbox

    def make_control_box(self):
        """Build the first control row: time base, trigger and Run/Stop."""
        ctrlbox = wx.BoxSizer(wx.HORIZONTAL)

        tb_left = wx.Button(self, 1001, " < ", style=wx.BU_EXACTFIT)
        tb_left.SetToolTipString("Increase time base")
        wx.EVT_BUTTON(self, 1001, self.incr_timebase)

        tb_right = wx.Button(self, 1000, " > ", style=wx.BU_EXACTFIT)
        tb_right.SetToolTipString("Decrease time base")
        wx.EVT_BUTTON(self, 1000, self.decr_timebase)

        # NOTE(review): id 1002 is also used for v_scale_label in
        # make_control2_box -- confirm the duplicate id is intentional.
        self.time_base_label = wx.StaticText(self, 1002, "")
        self.update_timebase_label()

        ctrlbox.Add((5, 0), 0)
        # ctrlbox.Add (wx.StaticText (self, -1, "Horiz Scale: "), 0, wx.ALIGN_CENTER)
        ctrlbox.Add(tb_left, 0, wx.EXPAND)
        ctrlbox.Add(tb_right, 0, wx.EXPAND)
        ctrlbox.Add(self.time_base_label, 0, wx.ALIGN_CENTER)

        ctrlbox.Add((10, 0), 1)  # stretchy space

        ctrlbox.Add(wx.StaticText(self, -1, "Trig: "), 0, wx.ALIGN_CENTER)
        self.trig_chan_choice = wx.Choice(self, 1004,
                                          choices=['Ch1', 'Ch2', 'Ch3', 'Ch4'])
        self.trig_chan_choice.SetToolTipString("Select channel for trigger")
        wx.EVT_CHOICE(self, 1004, self.trig_chan_choice_event)
        ctrlbox.Add(self.trig_chan_choice, 0, wx.ALIGN_CENTER)

        self.trig_mode_choice = wx.Choice(self, 1005,
                                          choices=['Auto', 'Pos', 'Neg'])
        self.trig_mode_choice.SetToolTipString("Select trigger slope or Auto (untriggered roll)")
        wx.EVT_CHOICE(self, 1005, self.trig_mode_choice_event)
        ctrlbox.Add(self.trig_mode_choice, 0, wx.ALIGN_CENTER)

        trig_level50 = wx.Button(self, 1006, "50%")
        trig_level50.SetToolTipString("Set trigger level to 50%")
        wx.EVT_BUTTON(self, 1006, self.set_trig_level50)
        ctrlbox.Add(trig_level50, 0, wx.EXPAND)

        run_stop = wx.Button(self, 1007, "Run/Stop")
        run_stop.SetToolTipString("Toggle Run/Stop mode")
        wx.EVT_BUTTON(self, 1007, self.run_stop)
        ctrlbox.Add(run_stop, 0, wx.EXPAND)
        # NOTE(review): the reviewed source is truncated at this point; the
        # rest of this method (and of the class) is not visible here.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -