⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 usrp_psr_receiver.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
📖 第 1 页 / 共 3 页
字号:
        sidtime = self.locality.sidereal_time()        # Pick up localtime, for generating filenames        foo = time.localtime()        # Generate filenames for both data and header file        filename = "%04d%02d%02d%02d%02d.padat" % (foo.tm_year, foo.tm_mon,           foo.tm_mday, foo.tm_hour, foo.tm_min)        hdrfilename = "%04d%02d%02d%02d%02d.pahdr" % (foo.tm_year, foo.tm_mon,           foo.tm_mday, foo.tm_hour, foo.tm_min)        # Current recording?  Flip state        if (self.pulse_recording_state == True):          self.pulse_recording_state = False          self.record_pulse_control.SetLabel("Recording pulses: Off                           ")          self.pulse_recording.close()        # Not recording?        else:          self.pulse_recording_state = True          self.record_pulse_control.SetLabel("Recording pulses to: "+filename)          # Cause gr_file_sink object to accept new filename          #   note use of self.prefix--filename prefix from          #   command line (defaults to ./)          #          self.pulse_recording.open (self.prefix+filename)          #          # We open the header file as a regular file, write header data,          #   then close          hdrf = open(self.prefix+hdrfilename, "w")          hdrf.write("receiver center frequency: "+str(self.frequency)+"\n")          hdrf.write("observing frequency: "+str(self.observing_freq)+"\n")          hdrf.write("DM: "+str(self.dm)+"\n")          hdrf.write("doppler: "+str(self.doppler)+"\n")          hdrf.write("pulse rate: "+str(self.pulse_freq)+"\n")          hdrf.write("pulse sps: "+str(self.pulse_freq*self.folding)+"\n")          hdrf.write("file sps: "+str(self.folder_input_rate)+"\n")          hdrf.write("sidereal: "+str(ephem.hours(sidtime))+"\n")          hdrf.write("bandwidth: "+str(self.u.adc_freq() / self.u.decim_rate())+"\n")          hdrf.write("sample type: short\n")          hdrf.write("sample size: 1\n")          hdrf.close()    # We get called at startup, and whenever the GUI "Set Folding params"    #   button is pressed    #    def set_folding_params(self):        if (self.pulse_freq <= 0):            return        # Compute required sample rate        self.sample_rate = int(self.pulse_freq*self.folding)        # And the implied decimation rate        required_decimation = int(self.folder_input_rate / self.sample_rate)        # We also compute a new FFT comb filter, based on the expected        #  spectral profile of our pulse parameters        #        # FFT-based comb filter        #        N_COMB_TAPS=int(self.sample_rate*4)        if N_COMB_TAPS > 2000:            N_COMB_TAPS = 2000        self.folder_comb_taps = Numeric.zeros(N_COMB_TAPS,Numeric.Complex64)        fincr = (self.sample_rate)/float(N_COMB_TAPS)        for i in range(0,len(self.folder_comb_taps)):            self.folder_comb_taps[i] = complex(0.0, 0.0)        freq = 0.0        harmonics = [1.0,2.0,3.0,4.0,5.0,6.0,7.0]        for i in range(0,len(self.folder_comb_taps)/2):            for j in range(0,len(harmonics)):                 if abs(freq - harmonics[j]*self.pulse_freq) <= fincr:                     self.folder_comb_taps[i] = complex(4.0, 0.0)                     if harmonics[j] == 1.0:                         self.folder_comb_taps[i] = complex(8.0, 0.0)            freq += fincr        if self.enable_comb_filter == True:            # Set the just-computed FFT comb filter taps            self.folder_comb.set_taps(self.folder_comb_taps)        # And compute a new decimated bandpass filter, to go in front        #   of the comb.  Primary function is to decimate and filter down        #   to an exact-ish multiple of the target pulse rate        #        self.folding_taps = gr.firdes_band_pass (1.0, self.folder_input_rate,            0.10, self.sample_rate/2, 10,             gr.firdes.WIN_HAMMING)        # Set the computed taps for the bandpass/decimate filter        self.folder_bandpass.set_taps (self.folding_taps)    #    # Record a spectral "hit" of a possible pulsar spectral profile    #    def record_hit(self,hits, hcavg, hcmax):        # Pick up current LMST        self.locality.date = ephem.now()        sidtime = self.locality.sidereal_time()        # Pick up localtime, for generating filenames        foo = time.localtime()        # Generate filenames for both data and header file        hitfilename = "%04d%02d%02d%02d.phit" % (foo.tm_year, foo.tm_mon,           foo.tm_mday, foo.tm_hour)        hitf = open(self.prefix+hitfilename, "a")        hitf.write("receiver center frequency: "+str(self.frequency)+"\n")        hitf.write("observing frequency: "+str(self.observing_freq)+"\n")        hitf.write("DM: "+str(self.dm)+"\n")        hitf.write("doppler: "+str(self.doppler)+"\n")        hitf.write("sidereal: "+str(ephem.hours(sidtime))+"\n")        hitf.write("bandwidth: "+str(self.u.adc_freq() / self.u.decim_rate())+"\n")        hitf.write("spectral peaks: "+str(hits)+"\n")        hitf.write("HCM: "+str(hcavg)+" "+str(hcmax)+"\n")        hitf.close()    # This is a callback used by ra_fftsink.py (passed on creation of    #   ra_fftsink)    # Whenever the user moves the cursor within the FFT display, this    #   shows the coordinate data    #    def xydfunc(self,xyv):        s = "%.6fHz\n%.3fdB" % (xyv[0], xyv[1])        if self.lowpass >= 500:            s = "%.6fHz\n%.3fdB" % (xyv[0]*1000, xyv[1])                self.myform['spec_data'].set_value(s)    # This is another callback used by ra_fftsink.py (passed on creation    #   of ra_fftsink).  We pass this as our "calibrator" function, but    #   we create interesting side-effects in the GUI.    #    # This function finds peaks in the FFT output data, and reports    #  on them through the "Best" text object in the GUI    #  It also computes the Harmonic Compliance Measure (HCM), and displays    #  that also.    #    def pulsarfunc(self,d,l):       x = range(0,l)       incr = float(self.lowpass)/float(l)       incr = incr * 2.0       bestdb = -50.0       bestfreq = 0.0       avg = 0       dcnt = 0       #       # First, we need to find the average signal level       #       for i in x:           if (i * incr) > self.lowest_freq and (i*incr) < (self.lowpass-2):               avg += d[i]               dcnt += 1       # Set average signal level       avg /= dcnt       s2=" "       findcnt = 0       #       # Then we find candidates that are greater than the user-supplied       #   threshold.       #       # We try to cluster "hits" whose whole-number frequency is the       #   same, and compute an average "hit" frequency.       #       lastint = 0       hits=[]       intcnt = 0       freqavg = 0       for i in x:           freq = i*incr           # If frequency within bounds, and the (dB-avg) value is above our           #   threshold           if freq > self.lowest_freq and freq < self.lowpass-2 and (d[i] - avg) > self.threshold:               # If we're finding a new whole-number frequency               if lastint != int(freq):                   # Record "center" of this hit, if this is a new hit                   if lastint != 0:                       s2 += "%5.3fHz " % (freqavg/intcnt)                       hits.append(freqavg/intcnt)                       findcnt += 1                   lastint = int(freq)                   intcnt = 1                   freqavg = freq               else:                   intcnt += 1                   freqavg += freq           if (findcnt >= 14):               break       if intcnt > 1:           s2 += "%5.3fHz " % (freqavg/intcnt)           hits.append(freqavg/intcnt)       #       # Compute the HCM, by dividing each of the "hits" by each of the       #   other hits, and comparing the difference between a "perfect"       #   harmonic, and the observed frequency ratio.       #       measure = 0       max_measure=0       mcnt = 0       avg_dist = 0       acnt = 0       for i in range(1,len(hits)):           meas = hits[i]/hits[0] - int(hits[i]/hits[0])           if abs((hits[i]-hits[i-1])-hits[0]) < 0.1:               avg_dist += hits[i]-hits[i-1]               acnt += 1           if meas > 0.98 and meas < 1.0:               meas = 1.0 - meas           meas *= hits[0]           if meas >= max_measure:               max_measure = meas           measure += meas           mcnt += 1       if mcnt > 0:           measure /= mcnt           if acnt > 0:               avg_dist /= acnt       if len(hits) > 1:           measure /= mcnt           s3="\nHCM: Avg %5.3fHz(%d) Max %5.3fHz Dist %5.3fHz(%d)" % (measure,mcnt,max_measure, avg_dist, acnt)           if max_measure < 0.5 and len(hits) >= 2:               self.record_hit(hits, measure, max_measure)               self.avg_dist = avg_dist       else:           s3="\nHCM: --"       s4="\nAvg dB: %4.2f" % avg       self.myform['best_pulse'].set_value("("+s2+")"+s3+s4)       # Since we are nominally a calibrator function for ra_fftsink, we       #  simply return what they sent us, untouched.  A "real" calibrator       #  function could monkey with the data before returning it to the       #  FFT display function.       return(d)    #    # Callback for the "DM" gui object    #    # We call compute_dispfilter() as appropriate to compute a new filter,    #   and then set that new filter into self.dispfilt.    #    def set_dm(self,dm):       self.dm = dm       ntaps = self.compute_disp_ntaps (self.dm, self.bw, self.observing_freq)       self.disp_taps = Numeric.zeros(ntaps, Numeric.Complex64)       self.compute_dispfilter(self.dm,self.doppler,self.bw,self.observing_freq)       self.dispfilt.set_taps(self.disp_taps)       self.myform['DM'].set_value(dm)       return(dm)    #    # Callback for the "Doppler" gui object    #    # We call compute_dispfilter() as appropriate to compute a new filter,    #   and then set that new filter into self.dispfilt.    #    def set_doppler(self,doppler):       self.doppler = doppler       ntaps = self.compute_disp_ntaps (self.dm, self.bw, self.observing_freq)       self.disp_taps = Numeric.zeros(ntaps, Numeric.Complex64)       self.compute_dispfilter(self.dm,self.doppler,self.bw,self.observing_freq)       self.dispfilt.set_taps(self.disp_taps)       self.myform['Doppler'].set_value(doppler)       return(doppler)    #    # Compute a de-dispersion filter    #  From Hankins, et al, 1975    #    # This code translated from dedisp_filter.c from Swinburne    #   pulsar software repository    #    def compute_dispfilter(self,dm,doppler,bw,centerfreq):        npts = len(self.disp_taps)        tmp = Numeric.zeros(npts, Numeric.Complex64)        M_PI = 3.14159265358        DM = dm/2.41e-10        #        # Because astronomers are a crazy bunch, the "standard" calcultion        #   is in Mhz, rather than Hz        #        centerfreq = centerfreq / 1.0e6        bw = bw / 1.0e6                isign = int(bw / abs (bw))                # Center frequency may be doppler shifted        cfreq     = centerfreq / doppler        # As well as the bandwidth..        bandwidth = bw / doppler        # Bandwidth divided among bins        binwidth  = bandwidth / npts        # Delay is an "extra" parameter, in usecs, and largely        #  untested in the Swinburne code.        delay = 0.0                # This determines the coefficient of the frequency response curve        # Linear in DM, but quadratic in center frequency        coeff = isign * 2.0*M_PI * DM / (cfreq*cfreq)                # DC to nyquist/2        n = 0        for i in range(0,int(npts/2)):            freq = (n + 0.5) * binwidth            phi = coeff*freq*freq/(cfreq+freq) + (2.0*M_PI*freq*delay)            tmp[i] = complex(math.cos(phi), math.sin(phi))            n += 1        # -nyquist/2 to DC        n = int(npts/2)        n *= -1        for i in range(int(npts/2),npts):            freq = (n + 0.5) * binwidth            phi = coeff*freq*freq/(cfreq+freq) + (2.0*M_PI*freq*delay)            tmp[i] = complex(math.cos(phi), math.sin(phi))            n += 1                self.disp_taps = FFT.inverse_fft(tmp)        return(self.disp_taps)    #    # Compute minimum number of taps required in de-dispersion FFT filter    #    def compute_disp_ntaps(self,dm,bw,freq):        #        # Dt calculations are in Mhz, rather than Hz        #    crazy astronomers....        mbw = bw/1.0e6        mfreq = freq/1.0e6        f_lower = mfreq-(mbw/2)        f_upper = mfreq+(mbw/2)        # Compute smear time        Dt = dm/2.41e-4 * (1.0/(f_lower*f_lower)-1.0/(f_upper*f_upper))        # ntaps is now bandwidth*smeartime        # Should be bandwidth*smeartime*2, but the Gnu Radio FFT filter        #   already expands it by a factor of 2        ntaps = bw*Dt        if ntaps < 64:            ntaps = 64        return(int(ntaps))def main ():    app = stdgui.stdapp(app_flow_graph, "RADIO ASTRONOMY PULSAR RECEIVER: $Revision: 6044 $", nstatus=1)    app.MainLoop()if __name__ == '__main__':    main ()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -