⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 usrp_ra_receiver.py

📁 这是用python语言写的一个数字广播的信号处理工具包。利用它
💻 PY
📖 第 1 页 / 共 3 页
字号:
        flt = "%6.3f" % data        inter = self.decln        integ = self.integ        fc = self.observing        fc = fc / 1000000        bw = self.bw        bw = bw / 1000000        ga = self.gain            now = time.time()            #        # If time to write full header info (saves storage this way)        #        if (now - self.continuum_then > 20):            self.sun.compute(self.locality)            enow = ephem.now()            sun_insky = "Down"            self.sunstate = "Dn"            if ((self.sun.rise_time < enow) and (enow < self.sun.set_time)):               sun_insky = "Up"               self.sunstate = "Up"            self.continuum_then = now                    continuum_file.write(str(ephem.hours(sidtime))+" "+flt+" Dn="+str(inter)+",")            continuum_file.write("Ti="+str(integ)+",Fc="+str(fc)+",Bw="+str(bw))            continuum_file.write(",Ga="+str(ga)+",Sun="+str(sun_insky)+"\n")        else:            continuum_file.write(str(ephem.hours(sidtime))+" "+flt+"\n")            continuum_file.close()        return(data)    def write_spectral_data(self,data,sidtime):            now = time.time()        delta = 10    		        # If time to write out spectral data        # We don't write this out every time, in order to        #   save disk space.  Since the spectral data are        #   typically heavily averaged, writing this data        #   "once in a while" is OK.        #        if (now - self.spectral_then >= delta):            self.spectral_then = now            # Get localtime structure to make filename from            foo = time.localtime()                    pfx = self.prefix            filenamestr = "%s/%04d%02d%02d%02d" % (pfx, foo.tm_year,                foo.tm_mon, foo.tm_mday, foo.tm_hour)                # Open the file            spectral_file = open (filenamestr+".sdat","a")                  # Setup data fields to be written            r = data            inter = self.decln            fc = self.observing            fc = fc / 1000000            bw = self.bw            bw = bw / 1000000            av = self.avg_alpha            # Write those fields            spectral_file.write("data:"+str(ephem.hours(sidtime))+" Dn="+str(inter)+",Fc="+str(fc)+",Bw="+str(bw)+",Av="+str(av))            spectral_file.write (" [ ")            for r in data:                spectral_file.write(" "+str(r))            spectral_file.write(" ]\n")            spectral_file.close()            return(data)            return(data)    def seti_analysis(self,fftbuf,sidtime):        l = len(fftbuf)        x = 0        hits = []        hit_intensities = []        if self.seticounter < self.setitimer:            self.seticounter = self.seticounter + 1            return        else:            self.seticounter = 0        # Run through FFT output buffer, computing standard deviation (Sigma)        avg = 0        # First compute average        for i in range(0,l):            avg = avg + fftbuf[i]        avg = avg / l        sigma = 0.0        # Then compute standard deviation (Sigma)        for i in range(0,l):            d = fftbuf[i] - avg            sigma = sigma + (d*d)        sigma = Numeric.sqrt(sigma/l)        #        # Snarfle through the FFT output buffer again, looking for        #    outlying data points        start_f = self.observing - (self.fft_input_rate/2)        current_f = start_f        l = len(fftbuf)        f_incr = self.fft_input_rate / l        hit = -1        # -nyquist to DC        for i in range(l/2,l):            #            # If current FFT buffer has an item that exceeds the specified            #  sigma            #            if ((fftbuf[i] - avg) > (self.setik * sigma)):                hits.append(current_f)                hit_intensities.append(fftbuf[i])            current_f = current_f + f_incr        # DC to nyquist        for i in range(0,l/2):            #            # If current FFT buffer has an item that exceeds the specified            #  sigma            #            if ((fftbuf[i] - avg) > (self.setik * sigma)):                hits.append(current_f)                hit_intensities.append(fftbuf[i])            current_f = current_f + f_incr        # No hits        if (len(hits) <= 0):            return        #        # OK, so we have some hits in the FFT buffer        #   They'll have a rather substantial gauntlet to run before        #   being declared a real "hit"        #        # Update stats        self.s1hitcounter = self.s1hitcounter + len(hits)        # Weed out buffers with an excessive number of        #   signals above Sigma        if (len(hits) > self.nhits):            return        # Weed out FFT buffers with apparent multiple narrowband signals        #   separated significantly in frequency.  This means that a        #   single signal spanning multiple bins is OK, but a buffer that        #   has multiple, apparently-separate, signals isn't OK.        #        last = hits[0]        ns2 = 1        for i in range(1,len(hits)):            if ((hits[i] - last) > (f_incr*3.0)):                return            last = hits[i]            ns2 = ns2 + 1        self.s2hitcounter = self.s2hitcounter + ns2        #        # Run through all available hit buffers, computing difference between        #   frequencies found there, if they're all within the chirp limits        #   declare a good hit        #        good_hit = False        f_ds = Numeric.zeros(self.nhitlines, Numeric.Float64)        avg_delta = 0        k = 0        for i in range(0,min(len(hits),len(self.hits_array[:,0]))):            f_ds[0] = abs(self.hits_array[i,0] - hits[i])            for j in range(1,len(f_ds)):               f_ds[j] = abs(self.hits_array[i,j] - self.hits_array[i,0])               avg_delta = avg_delta + f_ds[j]               k = k + 1            if (self.seti_isahit (f_ds)):                good_hit = True                self.hitcounter = self.hitcounter + 1                break        if (avg_delta/k < (self.seti_fft_bandwidth/2)):            self.avgdelta = avg_delta / k        # Save 'n shuffle hits        #  Old hit buffers percolate through the hit buffers        #  (there are self.nhitlines of these buffers)        #  and then drop off the end        #  A consequence is that while the nhitlines buffers are filling,        #  you can get some absurd values for self.avgdelta, because some        #  of the buffers are full of zeros        for i in range(self.nhitlines,1):            self.hits_array[:,i] = self.hits_array[:,i-1]            self.hit_intensities[:,i] = self.hit_intensities[:,i-1]        for i in range(0,len(hits)):            self.hits_array[i,0] = hits[i]            self.hit_intensities[i,0] = hit_intensities[i]        # Finally, write the hits/intensities buffer        if (good_hit):            self.write_hits(sidtime)        return    def seti_isahit(self,fdiffs):        truecount = 0        for i in range(0,len(fdiffs)):            if (fdiffs[i] >= self.CHIRP_LOWER and fdiffs[i] <= self.CHIRP_UPPER):                truecount = truecount + 1        if truecount == len(fdiffs):            return (True)        else:            return (False)    def write_hits(self,sidtime):        # Create localtime structure for producing filename        foo = time.localtime()        pfx = self.prefix        filenamestr = "%s/%04d%02d%02d%02d" % (pfx, foo.tm_year,            foo.tm_mon, foo.tm_mday, foo.tm_hour)            # Open the data file, appending        hits_file = open (filenamestr+".seti","a")        # Write sidtime first        hits_file.write(str(ephem.hours(sidtime))+" "+str(self.decln)+" ")        #        # Then write the hits/hit intensities buffers with enough        #   "syntax" to allow parsing by external (not yet written!)        #   "stuff".        #        for i in range(0,self.nhitlines):            hits_file.write(" ")            for j in range(0,self.nhits):                hits_file.write(str(self.hits_array[j,i])+":")                hits_file.write(str(self.hit_intensities[j,i])+",")        hits_file.write("\n")        hits_file.close()        return    def xydfunc(self,xyv):        if self.setimode == True:            return        magn = int(Numeric.log10(self.observing))        if (magn == 6 or magn == 7 or magn == 8):            magn = 6        dfreq = xyv[0] * pow(10.0,magn)        ratio = self.observing / dfreq        vs = 1.0 - ratio        vs *= 299792.0        if magn >= 9:           xhz = "Ghz"        elif magn >= 6:           xhz = "Mhz"        elif magn <= 5:           xhz =  "Khz"        s = "%.6f%s\n%.3fdB" % (xyv[0], xhz, xyv[1])        s2 = "\n%.3fkm/s" % vs        self.myform['spec_data'].set_value(s+s2)    def xydfunc_waterfall(self,pos):        lower = self.observing - (self.seti_fft_bandwidth / 2)        upper = self.observing + (self.seti_fft_bandwidth / 2)        binwidth = self.seti_fft_bandwidth / 1024        s = "%.6fMHz" % ((lower + (pos.x*binwidth)) / 1.0e6)        self.myform['spec_data'].set_value(s)    def toggle_cal(self):        if (self.calstate == True):          self.calstate = False          self.u.write_io(0,0,(1<<15))          self.calibrator.SetLabel("Calibration Source: Off")        else:          self.calstate = True          self.u.write_io(0,(1<<15),(1<<15))          self.calibrator.SetLabel("Calibration Source: On")    def toggle_annotation(self):        if (self.annotate_state == True):          self.annotate_state = False          self.annotation.SetLabel("Annotation: Off")        else:          self.annotate_state = True          self.annotation.SetLabel("Annotation: On")    #    # Turn scanning on/off    # Called-back by "Recording" button    #    def toggle_scanning(self):        # Current scanning?  Flip state        if (self.scanning == True):          self.scanning = False          self.scan_control.SetLabel("Scan: Off")        # Not scanning        else:          self.scanning = True          self.scan_control.SetLabel("Scan: On ")    def set_pd_offset(self,offs):         self.myform['offset'].set_value(offs)         self.calib_offset=offs         x = self.calib_coeff / 100.0         self.cal_offs.set_k(offs*(x*8000))    def set_pd_gain(self,gain):         self.myform['dcgain'].set_value(gain)         self.cal_mult.set_k(gain*0.01)         self.calib_coeff = gain         x = gain/100.0         self.cal_offs.set_k(self.calib_offset*(x*8000))    def compute_notch_taps(self,notchlist):         NOTCH_TAPS = 256         tmptaps = Numeric.zeros(NOTCH_TAPS,Numeric.Complex64)         binwidth = self.bw / NOTCH_TAPS          for i in range(0,NOTCH_TAPS):             tmptaps[i] = complex(1.0,0.0)          for i in notchlist:             diff = i - self.observing             if i == 0:                 break             if (diff > 0):                 idx = diff / binwidth                 idx = int(idx)                 if (idx < 0 or idx > (NOTCH_TAPS/2)):                     break                 tmptaps[idx] = complex(0.0, 0.0)             if (diff < 0):                 idx = -diff / binwidth                 idx = (NOTCH_TAPS/2) - idx                 idx = int(idx+(NOTCH_TAPS/2))                 if (idx < 0 or idx > (NOTCH_TAPS)):                     break                 tmptaps[idx] = complex(0.0, 0.0)         self.notch_taps = FFT.inverse_fft(tmptaps)def main ():    app = stdgui.stdapp(app_flow_graph, "RADIO ASTRONOMY SPECTRAL/CONTINUUM RECEIVER: $Revision: 6341 $", nstatus=1)    app.MainLoop()if __name__ == '__main__':    main ()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -