📄 usrp_psr_receiver.py
字号:
sidtime = self.locality.sidereal_time() # Pick up localtime, for generating filenames foo = time.localtime() # Generate filenames for both data and header file filename = "%04d%02d%02d%02d%02d.padat" % (foo.tm_year, foo.tm_mon, foo.tm_mday, foo.tm_hour, foo.tm_min) hdrfilename = "%04d%02d%02d%02d%02d.pahdr" % (foo.tm_year, foo.tm_mon, foo.tm_mday, foo.tm_hour, foo.tm_min) # Current recording? Flip state if (self.pulse_recording_state == True): self.pulse_recording_state = False self.record_pulse_control.SetLabel("Recording pulses: Off ") self.pulse_recording.close() # Not recording? else: self.pulse_recording_state = True self.record_pulse_control.SetLabel("Recording pulses to: "+filename) # Cause gr_file_sink object to accept new filename # note use of self.prefix--filename prefix from # command line (defaults to ./) # self.pulse_recording.open (self.prefix+filename) # # We open the header file as a regular file, write header data, # then close hdrf = open(self.prefix+hdrfilename, "w") hdrf.write("receiver center frequency: "+str(self.frequency)+"\n") hdrf.write("observing frequency: "+str(self.observing_freq)+"\n") hdrf.write("DM: "+str(self.dm)+"\n") hdrf.write("doppler: "+str(self.doppler)+"\n") hdrf.write("pulse rate: "+str(self.pulse_freq)+"\n") hdrf.write("pulse sps: "+str(self.pulse_freq*self.folding)+"\n") hdrf.write("file sps: "+str(self.folder_input_rate)+"\n") hdrf.write("sidereal: "+str(ephem.hours(sidtime))+"\n") hdrf.write("bandwidth: "+str(self.u.adc_freq() / self.u.decim_rate())+"\n") hdrf.write("sample type: short\n") hdrf.write("sample size: 1\n") hdrf.close() # We get called at startup, and whenever the GUI "Set Folding params" # button is pressed # def set_folding_params(self): if (self.pulse_freq <= 0): return # Compute required sample rate self.sample_rate = int(self.pulse_freq*self.folding) # And the implied decimation rate required_decimation = int(self.folder_input_rate / self.sample_rate) # We also compute a new FFT comb filter, based on the expected # spectral profile of our pulse parameters # # FFT-based comb filter # N_COMB_TAPS=int(self.sample_rate*4) if N_COMB_TAPS > 2000: N_COMB_TAPS = 2000 self.folder_comb_taps = Numeric.zeros(N_COMB_TAPS,Numeric.Complex64) fincr = (self.sample_rate)/float(N_COMB_TAPS) for i in range(0,len(self.folder_comb_taps)): self.folder_comb_taps[i] = complex(0.0, 0.0) freq = 0.0 harmonics = [1.0,2.0,3.0,4.0,5.0,6.0,7.0] for i in range(0,len(self.folder_comb_taps)/2): for j in range(0,len(harmonics)): if abs(freq - harmonics[j]*self.pulse_freq) <= fincr: self.folder_comb_taps[i] = complex(4.0, 0.0) if harmonics[j] == 1.0: self.folder_comb_taps[i] = complex(8.0, 0.0) freq += fincr if self.enable_comb_filter == True: # Set the just-computed FFT comb filter taps self.folder_comb.set_taps(self.folder_comb_taps) # And compute a new decimated bandpass filter, to go in front # of the comb. Primary function is to decimate and filter down # to an exact-ish multiple of the target pulse rate # self.folding_taps = gr.firdes_band_pass (1.0, self.folder_input_rate, 0.10, self.sample_rate/2, 10, gr.firdes.WIN_HAMMING) # Set the computed taps for the bandpass/decimate filter self.folder_bandpass.set_taps (self.folding_taps) # # Record a spectral "hit" of a possible pulsar spectral profile # def record_hit(self,hits, hcavg, hcmax): # Pick up current LMST self.locality.date = ephem.now() sidtime = self.locality.sidereal_time() # Pick up localtime, for generating filenames foo = time.localtime() # Generate filenames for both data and header file hitfilename = "%04d%02d%02d%02d.phit" % (foo.tm_year, foo.tm_mon, foo.tm_mday, foo.tm_hour) hitf = open(self.prefix+hitfilename, "a") hitf.write("receiver center frequency: "+str(self.frequency)+"\n") hitf.write("observing frequency: "+str(self.observing_freq)+"\n") hitf.write("DM: "+str(self.dm)+"\n") hitf.write("doppler: "+str(self.doppler)+"\n") hitf.write("sidereal: "+str(ephem.hours(sidtime))+"\n") hitf.write("bandwidth: "+str(self.u.adc_freq() / self.u.decim_rate())+"\n") hitf.write("spectral peaks: "+str(hits)+"\n") hitf.write("HCM: "+str(hcavg)+" "+str(hcmax)+"\n") hitf.close() # This is a callback used by ra_fftsink.py (passed on creation of # ra_fftsink) # Whenever the user moves the cursor within the FFT display, this # shows the coordinate data # def xydfunc(self,xyv): s = "%.6fHz\n%.3fdB" % (xyv[0], xyv[1]) if self.lowpass >= 500: s = "%.6fHz\n%.3fdB" % (xyv[0]*1000, xyv[1]) self.myform['spec_data'].set_value(s) # This is another callback used by ra_fftsink.py (passed on creation # of ra_fftsink). We pass this as our "calibrator" function, but # we create interesting side-effects in the GUI. # # This function finds peaks in the FFT output data, and reports # on them through the "Best" text object in the GUI # It also computes the Harmonic Compliance Measure (HCM), and displays # that also. # def pulsarfunc(self,d,l): x = range(0,l) incr = float(self.lowpass)/float(l) incr = incr * 2.0 bestdb = -50.0 bestfreq = 0.0 avg = 0 dcnt = 0 # # First, we need to find the average signal level # for i in x: if (i * incr) > self.lowest_freq and (i*incr) < (self.lowpass-2): avg += d[i] dcnt += 1 # Set average signal level avg /= dcnt s2=" " findcnt = 0 # # Then we find candidates that are greater than the user-supplied # threshold. # # We try to cluster "hits" whose whole-number frequency is the # same, and compute an average "hit" frequency. # lastint = 0 hits=[] intcnt = 0 freqavg = 0 for i in x: freq = i*incr # If frequency within bounds, and the (dB-avg) value is above our # threshold if freq > self.lowest_freq and freq < self.lowpass-2 and (d[i] - avg) > self.threshold: # If we're finding a new whole-number frequency if lastint != int(freq): # Record "center" of this hit, if this is a new hit if lastint != 0: s2 += "%5.3fHz " % (freqavg/intcnt) hits.append(freqavg/intcnt) findcnt += 1 lastint = int(freq) intcnt = 1 freqavg = freq else: intcnt += 1 freqavg += freq if (findcnt >= 14): break if intcnt > 1: s2 += "%5.3fHz " % (freqavg/intcnt) hits.append(freqavg/intcnt) # # Compute the HCM, by dividing each of the "hits" by each of the # other hits, and comparing the difference between a "perfect" # harmonic, and the observed frequency ratio. # measure = 0 max_measure=0 mcnt = 0 avg_dist = 0 acnt = 0 for i in range(1,len(hits)): meas = hits[i]/hits[0] - int(hits[i]/hits[0]) if abs((hits[i]-hits[i-1])-hits[0]) < 0.1: avg_dist += hits[i]-hits[i-1] acnt += 1 if meas > 0.98 and meas < 1.0: meas = 1.0 - meas meas *= hits[0] if meas >= max_measure: max_measure = meas measure += meas mcnt += 1 if mcnt > 0: measure /= mcnt if acnt > 0: avg_dist /= acnt if len(hits) > 1: measure /= mcnt s3="\nHCM: Avg %5.3fHz(%d) Max %5.3fHz Dist %5.3fHz(%d)" % (measure,mcnt,max_measure, avg_dist, acnt) if max_measure < 0.5 and len(hits) >= 2: self.record_hit(hits, measure, max_measure) self.avg_dist = avg_dist else: s3="\nHCM: --" s4="\nAvg dB: %4.2f" % avg self.myform['best_pulse'].set_value("("+s2+")"+s3+s4) # Since we are nominally a calibrator function for ra_fftsink, we # simply return what they sent us, untouched. A "real" calibrator # function could monkey with the data before returning it to the # FFT display function. return(d) # # Callback for the "DM" gui object # # We call compute_dispfilter() as appropriate to compute a new filter, # and then set that new filter into self.dispfilt. # def set_dm(self,dm): self.dm = dm ntaps = self.compute_disp_ntaps (self.dm, self.bw, self.observing_freq) self.disp_taps = Numeric.zeros(ntaps, Numeric.Complex64) self.compute_dispfilter(self.dm,self.doppler,self.bw,self.observing_freq) self.dispfilt.set_taps(self.disp_taps) self.myform['DM'].set_value(dm) return(dm) # # Callback for the "Doppler" gui object # # We call compute_dispfilter() as appropriate to compute a new filter, # and then set that new filter into self.dispfilt. # def set_doppler(self,doppler): self.doppler = doppler ntaps = self.compute_disp_ntaps (self.dm, self.bw, self.observing_freq) self.disp_taps = Numeric.zeros(ntaps, Numeric.Complex64) self.compute_dispfilter(self.dm,self.doppler,self.bw,self.observing_freq) self.dispfilt.set_taps(self.disp_taps) self.myform['Doppler'].set_value(doppler) return(doppler) # # Compute a de-dispersion filter # From Hankins, et al, 1975 # # This code translated from dedisp_filter.c from Swinburne # pulsar software repository # def compute_dispfilter(self,dm,doppler,bw,centerfreq): npts = len(self.disp_taps) tmp = Numeric.zeros(npts, Numeric.Complex64) M_PI = 3.14159265358 DM = dm/2.41e-10 # # Because astronomers are a crazy bunch, the "standard" calcultion # is in Mhz, rather than Hz # centerfreq = centerfreq / 1.0e6 bw = bw / 1.0e6 isign = int(bw / abs (bw)) # Center frequency may be doppler shifted cfreq = centerfreq / doppler # As well as the bandwidth.. bandwidth = bw / doppler # Bandwidth divided among bins binwidth = bandwidth / npts # Delay is an "extra" parameter, in usecs, and largely # untested in the Swinburne code. delay = 0.0 # This determines the coefficient of the frequency response curve # Linear in DM, but quadratic in center frequency coeff = isign * 2.0*M_PI * DM / (cfreq*cfreq) # DC to nyquist/2 n = 0 for i in range(0,int(npts/2)): freq = (n + 0.5) * binwidth phi = coeff*freq*freq/(cfreq+freq) + (2.0*M_PI*freq*delay) tmp[i] = complex(math.cos(phi), math.sin(phi)) n += 1 # -nyquist/2 to DC n = int(npts/2) n *= -1 for i in range(int(npts/2),npts): freq = (n + 0.5) * binwidth phi = coeff*freq*freq/(cfreq+freq) + (2.0*M_PI*freq*delay) tmp[i] = complex(math.cos(phi), math.sin(phi)) n += 1 self.disp_taps = FFT.inverse_fft(tmp) return(self.disp_taps) # # Compute minimum number of taps required in de-dispersion FFT filter # def compute_disp_ntaps(self,dm,bw,freq): # # Dt calculations are in Mhz, rather than Hz # crazy astronomers.... mbw = bw/1.0e6 mfreq = freq/1.0e6 f_lower = mfreq-(mbw/2) f_upper = mfreq+(mbw/2) # Compute smear time Dt = dm/2.41e-4 * (1.0/(f_lower*f_lower)-1.0/(f_upper*f_upper)) # ntaps is now bandwidth*smeartime # Should be bandwidth*smeartime*2, but the Gnu Radio FFT filter # already expands it by a factor of 2 ntaps = bw*Dt if ntaps < 64: ntaps = 64 return(int(ntaps))def main (): app = stdgui.stdapp(app_flow_graph, "RADIO ASTRONOMY PULSAR RECEIVER: $Revision: 6044 $", nstatus=1) app.MainLoop()if __name__ == '__main__': main ()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -