⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 usrp_ra_receiver.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
📖 第 1 页 / 共 3 页
字号:
#!/usr/bin/env python## Copyright 2004,2005 Free Software Foundation, Inc.# # This file is part of GNU Radio# # GNU Radio is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 3, or (at your option)# any later version.# # GNU Radio is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.# # You should have received a copy of the GNU General Public License# along with GNU Radio; see the file COPYING.  If not, write to# the Free Software Foundation, Inc., 51 Franklin Street,# Boston, MA 02110-1301, USA.# from gnuradio import gr, grufrom gnuradio import usrpfrom usrpm import usrp_dbidfrom gnuradio import eng_notationfrom gnuradio.eng_option import eng_optionfrom gnuradio.wxgui import stdgui, ra_fftsink, ra_stripchartsink, ra_waterfallsink, form, slider, waterfallsinkfrom optparse import OptionParserimport wximport sysimport Numeric import timeimport FFTimport ephemclass continuum_calibration(gr.feval_dd):    def eval(self, x):        str = globals()["calibration_codelet"]        exec(str)        return(x)class app_flow_graph(stdgui.gui_flow_graph):    def __init__(self, frame, panel, vbox, argv):        stdgui.gui_flow_graph.__init__(self)        self.frame = frame        self.panel = panel                parser = OptionParser(option_class=eng_option)        parser.add_option("-R", "--rx-subdev-spec", type="subdev", default=(0, 0),                          help="select USRP Rx side A or B (default=A)")        parser.add_option("-d", "--decim", type="int", default=16,                          help="set fgpa decimation rate to DECIM [default=%default]")        parser.add_option("-f", "--freq", type="eng_float", default=None,                          help="set frequency to FREQ", metavar="FREQ")	parser.add_option("-a", "--avg", type="eng_float", default=1.0,		help="set spectral averaging alpha")	parser.add_option("-i", "--integ", type="eng_float", default=1.0,		help="set integration time")        parser.add_option("-g", "--gain", type="eng_float", default=None,                          help="set gain in dB (default is midpoint)")        parser.add_option("-l", "--reflevel", type="eng_float", default=30.0,                          help="Set Total power reference level")        parser.add_option("-y", "--division", type="eng_float", default=0.5,                          help="Set Total power Y division size")        parser.add_option("-e", "--longitude", type="eng_float", default=-76.02,                          help="Set Observer Longitude")        parser.add_option("-c", "--latitude", type="eng_float", default=44.85,                          help="Set Observer Latitude")        parser.add_option("-o", "--observing", type="eng_float", default=0.0,                        help="Set observing frequency")        parser.add_option("-x", "--ylabel", default="dB", help="Y axis label")         parser.add_option("-z", "--divbase", type="eng_float", default=0.025, help="Y Division increment base")         parser.add_option("-v", "--stripsize", type="eng_float", default=2400, help="Size of stripchart, in 2Hz samples")         parser.add_option("-F", "--fft_size", type="eng_float", default=1024, help="Size of FFT")        parser.add_option("-N", "--decln", type="eng_float", default=999.99, help="Observing declination")        parser.add_option("-X", "--prefix", default="./")        parser.add_option("-M", "--fft_rate", type="eng_float", default=8.0, help="FFT Rate")        parser.add_option("-A", "--calib_coeff", type="eng_float", default=1.0, help="Calibration coefficient")        parser.add_option("-B", "--calib_offset", type="eng_float", default=0.0, help="Calibration coefficient")        parser.add_option("-W", "--waterfall", action="store_true", default=False, help="Use Waterfall FFT display")        parser.add_option("-S", "--setimode", action="store_true", default=False, help="Enable SETI processing of spectral data")        parser.add_option("-K", "--setik", type="eng_float", default=1.5, help="K value for SETI analysis")        parser.add_option("-T", "--setibandwidth", type="eng_float", default=12500, help="Instantaneous SETI observing bandwidth--must be divisor of 250Khz")        parser.add_option("-n", "--notches", action="store_true", default=False,            help="Notches appear after all other arguments")        parser.add_option("-Q", "--seti_range", type="eng_float", default=1.0e6, help="Total scan width, in Hz for SETI scans")        (options, args) = parser.parse_args()        self.notches = Numeric.zeros(64,Numeric.Float64)        if len(args) != 0 and options.notches == False:            parser.print_help()            sys.exit(1)        if len(args) == 0 and options.notches != False:            parser.print_help()            sys.exit()        self.use_notches = options.notches        # Get notch locations        j = 0        for i in args:            self.notches[j] = float(i)            j = j+1        self.notch_count = j        self.show_debug_info = True        # Pick up waterfall option        self.waterfall = options.waterfall        # SETI mode stuff        self.setimode = options.setimode        self.seticounter = 0        self.setik = options.setik        self.seti_fft_bandwidth = int(options.setibandwidth)        # Calculate binwidth        binwidth = self.seti_fft_bandwidth / options.fft_size        # Use binwidth, and knowledge of likely chirp rates to set reasonable        #  values for SETI analysis code.   We assume that SETI signals will        #  chirp at somewhere between 0.10Hz/sec and 0.25Hz/sec.        #        # upper_limit is the "worst case"--that is, the case for which we have        #  to wait the longest to actually see any drift, due to the quantizing        #  on FFT bins.        upper_limit = binwidth / 0.10        self.setitimer = int(upper_limit * 2.00)        self.scanning = True        # Calculate the CHIRP values based on Hz/sec        self.CHIRP_LOWER = 0.10 * self.setitimer        self.CHIRP_UPPER = 0.25 * self.setitimer        # Reset hit counters to 0        self.hitcounter = 0        self.s1hitcounter = 0        self.s2hitcounter = 0        self.avgdelta = 0        # We scan through 2Mhz of bandwidth around the chosen center freq        self.seti_freq_range = options.seti_range        # Calculate lower edge        self.setifreq_lower = options.freq - (self.seti_freq_range/2)        self.setifreq_current = options.freq        # Calculate upper edge        self.setifreq_upper = options.freq + (self.seti_freq_range/2)        # Maximum "hits" in a line        self.nhits = 20        # Number of lines for analysis        self.nhitlines = 4        # We change center frequencies based on nhitlines and setitimer        self.setifreq_timer = self.setitimer * (self.nhitlines * 5)        # Create actual timer        self.seti_then = time.time()        # The hits recording array        self.hits_array = Numeric.zeros((self.nhits,self.nhitlines), Numeric.Float64)        self.hit_intensities = Numeric.zeros((self.nhits,self.nhitlines), Numeric.Float64)        # Calibration coefficient and offset        self.calib_coeff = options.calib_coeff        self.calib_offset = options.calib_offset        if self.calib_offset < -750:            self.calib_offset = -750        if self.calib_offset > 750:            self.calib_offset = 750        if self.calib_coeff < 1:            self.calib_coeff = 1        if self.calib_coeff > 100:            self.calib_coeff = 100        self.integ = options.integ        self.avg_alpha = options.avg        self.gain = options.gain        self.decln = options.decln        # Set initial values for datalogging timed-output        self.continuum_then = time.time()        self.spectral_then = time.time()              # build the graph        #        # If SETI mode, we always run at maximum USRP decimation        #        if (self.setimode):            options.decim = 256        self.u = usrp.source_c(decim_rate=options.decim)        self.u.set_mux(usrp.determine_rx_mux_value(self.u, options.rx_subdev_spec))        # Set initial declination        self.decln = options.decln        # determine the daughterboard subdevice we're using        self.subdev = usrp.selected_subdev(self.u, options.rx_subdev_spec)        self.cardtype = self.subdev.dbid()        input_rate = self.u.adc_freq() / self.u.decim_rate()        #        # Set prefix for data files        #        self.prefix = options.prefix        #        # The lower this number, the fewer sample frames are dropped        #  in computing the FFT.  A sampled approach is taken to        #  computing the FFT of the incoming data, which reduces        #  sensitivity.  Increasing sensitivity inreases CPU loading.        #        self.fft_rate = options.fft_rate        self.fft_size = int(options.fft_size)        # This buffer is used to remember the most-recent FFT display        #   values.  Used later by self.write_spectral_data() to write        #   spectral data to datalogging files, and by the SETI analysis        #   function.        #        self.fft_outbuf = Numeric.zeros(self.fft_size, Numeric.Float64)        #        # If SETI mode, only look at seti_fft_bandwidth        #   at a time.        #        if (self.setimode):            self.fft_input_rate = self.seti_fft_bandwidth            #            # Build a decimating bandpass filter            #            self.fft_input_taps = gr.firdes.complex_band_pass (1.0,               input_rate,               -(int(self.fft_input_rate/2)), int(self.fft_input_rate/2), 200,               gr.firdes.WIN_HAMMING, 0)            #            # Compute required decimation factor            #            decimation = int(input_rate/self.fft_input_rate)            self.fft_bandpass = gr.fir_filter_ccc (decimation,                 self.fft_input_taps)        else:            self.fft_input_rate = input_rate        # Set up FFT display        if self.waterfall == False:           self.scope = ra_fftsink.ra_fft_sink_c (self, panel,                fft_size=int(self.fft_size), sample_rate=self.fft_input_rate,               fft_rate=int(self.fft_rate), title="Spectral",                 ofunc=self.fft_outfunc, xydfunc=self.xydfunc)        else:            self.scope = ra_waterfallsink.waterfall_sink_c (self, panel,                fft_size=int(self.fft_size), sample_rate=self.fft_input_rate,                fft_rate=int(self.fft_rate), title="Spectral", ofunc=self.fft_outfunc, size=(1100, 600), xydfunc=self.xydfunc, ref_level=0, span=10)        # Set up ephemeris data        self.locality = ephem.Observer()        self.locality.long = str(options.longitude)        self.locality.lat = str(options.latitude)        # We make notes about Sunset/Sunrise in Continuum log files        self.sun = ephem.Sun()        self.sunstate = "??"        # Set up stripchart display        self.stripsize = int(options.stripsize)        if self.setimode == False:            self.chart = ra_stripchartsink.stripchart_sink_f (self, panel,                stripsize=self.stripsize,                title="Continuum",                xlabel="LMST Offset (Seconds)",                scaling=1.0, ylabel=options.ylabel,                divbase=options.divbase)        # Set center frequency        self.centerfreq = options.freq        # Set observing frequency (might be different from actual programmed        #    RF frequency)        if options.observing == 0.0:            self.observing = options.freq        else:            self.observing = options.observing        self.bw = input_rate        # We setup the first two integrators to produce a fixed integration        # Down to 1Hz, with output at 1 samples/sec        N = input_rate/5000        # Second stage runs on decimated output of first        M = (input_rate/N)        # Create taps for first integrator        t = range(0,N-1)        tapsN = []        for i in t:             tapsN.append(1.0/N)        # Create taps for second integrator        t = range(0,M-1)        tapsM = []        for i in t:            tapsM.append(1.0/M)        #        # The 3rd integrator is variable, and user selectable at runtime        # This integrator doesn't decimate, but is used to set the        #  final integration time based on the constant 1Hz input samples        # The strip chart is fed at a constant 1Hz rate as a result        #        #        # Call constructors for receive chains        #        if self.setimode == False:            # The three integrators--two FIR filters, and an IIR final filter            self.integrator1 = gr.fir_filter_fff (N, tapsN)            self.integrator2 = gr.fir_filter_fff (M, tapsM)            self.integrator3 = gr.single_pole_iir_filter_ff(1.0)                # The detector            self.detector = gr.complex_to_mag_squared()                # Signal probe            self.probe = gr.probe_signal_f();                #            # Continuum calibration stuff            #            x = self.calib_coeff/100.0            self.cal_mult = gr.multiply_const_ff(self.calib_coeff/100.0);            self.cal_offs = gr.add_const_ff(self.calib_offset*(x*8000));            if self.use_notches == True:                self.compute_notch_taps(self.notches)                self.notch_filt = gr.fft_filter_ccc(1, self.notch_taps)        #        # Start connecting configured modules in the receive chain        #        # The scope--handle SETI mode        if (self.setimode == False):            if (self.use_notches == True):                self.connect(self.u, self.notch_filt, self.scope)            else:                self.connect(self.u, self.scope)        else:            if (self.use_notches == True):                self.connect(self.u, self.notch_filt,                     self.fft_bandpass, self.scope)            else:                self.connect(self.u, self.fft_bandpass, self.scope)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -