📄 usrp_ra_receiver.py
字号:
#!/usr/bin/env python## Copyright 2004,2005 Free Software Foundation, Inc.# # This file is part of GNU Radio# # GNU Radio is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 3, or (at your option)# any later version.# # GNU Radio is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU General Public License for more details.# # You should have received a copy of the GNU General Public License# along with GNU Radio; see the file COPYING. If not, write to# the Free Software Foundation, Inc., 51 Franklin Street,# Boston, MA 02110-1301, USA.# from gnuradio import gr, grufrom gnuradio import usrpfrom usrpm import usrp_dbidfrom gnuradio import eng_notationfrom gnuradio.eng_option import eng_optionfrom gnuradio.wxgui import stdgui, ra_fftsink, ra_stripchartsink, ra_waterfallsink, form, slider, waterfallsinkfrom optparse import OptionParserimport wximport sysimport Numeric import timeimport FFTimport ephemclass continuum_calibration(gr.feval_dd): def eval(self, x): str = globals()["calibration_codelet"] exec(str) return(x)class app_flow_graph(stdgui.gui_flow_graph): def __init__(self, frame, panel, vbox, argv): stdgui.gui_flow_graph.__init__(self) self.frame = frame self.panel = panel parser = OptionParser(option_class=eng_option) parser.add_option("-R", "--rx-subdev-spec", type="subdev", default=(0, 0), help="select USRP Rx side A or B (default=A)") parser.add_option("-d", "--decim", type="int", default=16, help="set fgpa decimation rate to DECIM [default=%default]") parser.add_option("-f", "--freq", type="eng_float", default=None, help="set frequency to FREQ", metavar="FREQ") parser.add_option("-a", "--avg", type="eng_float", default=1.0, help="set spectral averaging alpha") parser.add_option("-i", "--integ", type="eng_float", default=1.0, help="set integration time") parser.add_option("-g", "--gain", type="eng_float", default=None, help="set gain in dB (default is midpoint)") parser.add_option("-l", "--reflevel", type="eng_float", default=30.0, help="Set Total power reference level") parser.add_option("-y", "--division", type="eng_float", default=0.5, help="Set Total power Y division size") parser.add_option("-e", "--longitude", type="eng_float", default=-76.02, help="Set Observer Longitude") parser.add_option("-c", "--latitude", type="eng_float", default=44.85, help="Set Observer Latitude") parser.add_option("-o", "--observing", type="eng_float", default=0.0, help="Set observing frequency") parser.add_option("-x", "--ylabel", default="dB", help="Y axis label") parser.add_option("-z", "--divbase", type="eng_float", default=0.025, help="Y Division increment base") parser.add_option("-v", "--stripsize", type="eng_float", default=2400, help="Size of stripchart, in 2Hz samples") parser.add_option("-F", "--fft_size", type="eng_float", default=1024, help="Size of FFT") parser.add_option("-N", "--decln", type="eng_float", default=999.99, help="Observing declination") parser.add_option("-X", "--prefix", default="./") parser.add_option("-M", "--fft_rate", type="eng_float", default=8.0, help="FFT Rate") parser.add_option("-A", "--calib_coeff", type="eng_float", default=1.0, help="Calibration coefficient") parser.add_option("-B", "--calib_offset", type="eng_float", default=0.0, help="Calibration coefficient") parser.add_option("-W", "--waterfall", action="store_true", default=False, help="Use Waterfall FFT display") parser.add_option("-S", "--setimode", action="store_true", default=False, help="Enable SETI processing of spectral data") parser.add_option("-K", "--setik", type="eng_float", default=1.5, help="K value for SETI analysis") parser.add_option("-T", "--setibandwidth", type="eng_float", default=12500, help="Instantaneous SETI observing bandwidth--must be divisor of 250Khz") parser.add_option("-n", "--notches", action="store_true", default=False, help="Notches appear after all other arguments") parser.add_option("-Q", "--seti_range", type="eng_float", default=1.0e6, help="Total scan width, in Hz for SETI scans") (options, args) = parser.parse_args() self.notches = Numeric.zeros(64,Numeric.Float64) if len(args) != 0 and options.notches == False: parser.print_help() sys.exit(1) if len(args) == 0 and options.notches != False: parser.print_help() sys.exit() self.use_notches = options.notches # Get notch locations j = 0 for i in args: self.notches[j] = float(i) j = j+1 self.notch_count = j self.show_debug_info = True # Pick up waterfall option self.waterfall = options.waterfall # SETI mode stuff self.setimode = options.setimode self.seticounter = 0 self.setik = options.setik self.seti_fft_bandwidth = int(options.setibandwidth) # Calculate binwidth binwidth = self.seti_fft_bandwidth / options.fft_size # Use binwidth, and knowledge of likely chirp rates to set reasonable # values for SETI analysis code. We assume that SETI signals will # chirp at somewhere between 0.10Hz/sec and 0.25Hz/sec. # # upper_limit is the "worst case"--that is, the case for which we have # to wait the longest to actually see any drift, due to the quantizing # on FFT bins. upper_limit = binwidth / 0.10 self.setitimer = int(upper_limit * 2.00) self.scanning = True # Calculate the CHIRP values based on Hz/sec self.CHIRP_LOWER = 0.10 * self.setitimer self.CHIRP_UPPER = 0.25 * self.setitimer # Reset hit counters to 0 self.hitcounter = 0 self.s1hitcounter = 0 self.s2hitcounter = 0 self.avgdelta = 0 # We scan through 2Mhz of bandwidth around the chosen center freq self.seti_freq_range = options.seti_range # Calculate lower edge self.setifreq_lower = options.freq - (self.seti_freq_range/2) self.setifreq_current = options.freq # Calculate upper edge self.setifreq_upper = options.freq + (self.seti_freq_range/2) # Maximum "hits" in a line self.nhits = 20 # Number of lines for analysis self.nhitlines = 4 # We change center frequencies based on nhitlines and setitimer self.setifreq_timer = self.setitimer * (self.nhitlines * 5) # Create actual timer self.seti_then = time.time() # The hits recording array self.hits_array = Numeric.zeros((self.nhits,self.nhitlines), Numeric.Float64) self.hit_intensities = Numeric.zeros((self.nhits,self.nhitlines), Numeric.Float64) # Calibration coefficient and offset self.calib_coeff = options.calib_coeff self.calib_offset = options.calib_offset if self.calib_offset < -750: self.calib_offset = -750 if self.calib_offset > 750: self.calib_offset = 750 if self.calib_coeff < 1: self.calib_coeff = 1 if self.calib_coeff > 100: self.calib_coeff = 100 self.integ = options.integ self.avg_alpha = options.avg self.gain = options.gain self.decln = options.decln # Set initial values for datalogging timed-output self.continuum_then = time.time() self.spectral_then = time.time() # build the graph # # If SETI mode, we always run at maximum USRP decimation # if (self.setimode): options.decim = 256 self.u = usrp.source_c(decim_rate=options.decim) self.u.set_mux(usrp.determine_rx_mux_value(self.u, options.rx_subdev_spec)) # Set initial declination self.decln = options.decln # determine the daughterboard subdevice we're using self.subdev = usrp.selected_subdev(self.u, options.rx_subdev_spec) self.cardtype = self.subdev.dbid() input_rate = self.u.adc_freq() / self.u.decim_rate() # # Set prefix for data files # self.prefix = options.prefix # # The lower this number, the fewer sample frames are dropped # in computing the FFT. A sampled approach is taken to # computing the FFT of the incoming data, which reduces # sensitivity. Increasing sensitivity inreases CPU loading. # self.fft_rate = options.fft_rate self.fft_size = int(options.fft_size) # This buffer is used to remember the most-recent FFT display # values. Used later by self.write_spectral_data() to write # spectral data to datalogging files, and by the SETI analysis # function. # self.fft_outbuf = Numeric.zeros(self.fft_size, Numeric.Float64) # # If SETI mode, only look at seti_fft_bandwidth # at a time. # if (self.setimode): self.fft_input_rate = self.seti_fft_bandwidth # # Build a decimating bandpass filter # self.fft_input_taps = gr.firdes.complex_band_pass (1.0, input_rate, -(int(self.fft_input_rate/2)), int(self.fft_input_rate/2), 200, gr.firdes.WIN_HAMMING, 0) # # Compute required decimation factor # decimation = int(input_rate/self.fft_input_rate) self.fft_bandpass = gr.fir_filter_ccc (decimation, self.fft_input_taps) else: self.fft_input_rate = input_rate # Set up FFT display if self.waterfall == False: self.scope = ra_fftsink.ra_fft_sink_c (self, panel, fft_size=int(self.fft_size), sample_rate=self.fft_input_rate, fft_rate=int(self.fft_rate), title="Spectral", ofunc=self.fft_outfunc, xydfunc=self.xydfunc) else: self.scope = ra_waterfallsink.waterfall_sink_c (self, panel, fft_size=int(self.fft_size), sample_rate=self.fft_input_rate, fft_rate=int(self.fft_rate), title="Spectral", ofunc=self.fft_outfunc, size=(1100, 600), xydfunc=self.xydfunc, ref_level=0, span=10) # Set up ephemeris data self.locality = ephem.Observer() self.locality.long = str(options.longitude) self.locality.lat = str(options.latitude) # We make notes about Sunset/Sunrise in Continuum log files self.sun = ephem.Sun() self.sunstate = "??" # Set up stripchart display self.stripsize = int(options.stripsize) if self.setimode == False: self.chart = ra_stripchartsink.stripchart_sink_f (self, panel, stripsize=self.stripsize, title="Continuum", xlabel="LMST Offset (Seconds)", scaling=1.0, ylabel=options.ylabel, divbase=options.divbase) # Set center frequency self.centerfreq = options.freq # Set observing frequency (might be different from actual programmed # RF frequency) if options.observing == 0.0: self.observing = options.freq else: self.observing = options.observing self.bw = input_rate # We setup the first two integrators to produce a fixed integration # Down to 1Hz, with output at 1 samples/sec N = input_rate/5000 # Second stage runs on decimated output of first M = (input_rate/N) # Create taps for first integrator t = range(0,N-1) tapsN = [] for i in t: tapsN.append(1.0/N) # Create taps for second integrator t = range(0,M-1) tapsM = [] for i in t: tapsM.append(1.0/M) # # The 3rd integrator is variable, and user selectable at runtime # This integrator doesn't decimate, but is used to set the # final integration time based on the constant 1Hz input samples # The strip chart is fed at a constant 1Hz rate as a result # # # Call constructors for receive chains # if self.setimode == False: # The three integrators--two FIR filters, and an IIR final filter self.integrator1 = gr.fir_filter_fff (N, tapsN) self.integrator2 = gr.fir_filter_fff (M, tapsM) self.integrator3 = gr.single_pole_iir_filter_ff(1.0) # The detector self.detector = gr.complex_to_mag_squared() # Signal probe self.probe = gr.probe_signal_f(); # # Continuum calibration stuff # x = self.calib_coeff/100.0 self.cal_mult = gr.multiply_const_ff(self.calib_coeff/100.0); self.cal_offs = gr.add_const_ff(self.calib_offset*(x*8000)); if self.use_notches == True: self.compute_notch_taps(self.notches) self.notch_filt = gr.fft_filter_ccc(1, self.notch_taps) # # Start connecting configured modules in the receive chain # # The scope--handle SETI mode if (self.setimode == False): if (self.use_notches == True): self.connect(self.u, self.notch_filt, self.scope) else: self.connect(self.u, self.scope) else: if (self.use_notches == True): self.connect(self.u, self.notch_filt, self.fft_bandpass, self.scope) else: self.connect(self.u, self.fft_bandpass, self.scope)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -