📄 ts_smooth.py
字号:
for x in range(1, l): p = min(x, span) if p <= span: sum = 0.0 for y in range(p): sum += d[x-y] else: sum = sum - d[x-p] + d[x] sm[x][1] = sum/p return sm ############################################################################## def smooth_ema(self, d, args): # # Single exponential moving average # a = args[0] l = len(d) sm = deepcopy(d) d = [e[1] for e in d] if not 0 < a <= 1: raise SmoothError('ema smooth: invalid alpha %f (0<alpha<=1)' % (a)) print 'performing ema smooth, alpha %f' % (a) se = a le = 1.0 - a # get something sensible for first value n = min(4, l) acc = 0.0 for x in range(n): acc += d[x] sm[0][1] = ((acc/n)*le) + (sm[0][1]*se) for x in range(1, l): sm[x][1] = d[x]*se + sm[x-1][1]*le return sm ############################################################################## def han(self, d, args): # # Hanning smooth # sm = deepcopy(d) l = len(d) if l < 3: raise SmoothError('han: need > 3 data points') print 'hanning' d = [e[1] for e in d] sm[0][1] = 0.75*d[0] + 0.25*d[1] for x in range(1, l-1): sm[x][1] = 0.25*d[x-1] + 0.5*d[x] + 0.25*d[x+1] sm[l-1][1] = 0.25*d[l-2] + 0.75*d[l-1] return sm ############################################################################## def smooth_sft(self, d, args): # # FFT - based smooth (low-pass filter) # def four1(d, nn, isign): n = nn << 1 j = 1 for i in range(1, n, 2): if j > i: tmp = d[j] d[j] = d[i] d[i] = tmp tmp = d[j+1] d[j+1] = d[i+1] d[i+1] = tmp m = n/2 while j > m >= 2: j -= m; m >> 1 j += m mmax = 2 while n > mmax: istep = mmax << 1 theta = (pi*2)/(isign*mmax) wtmp = sin(0.5*theta) wpr = -2.0*wtmp*wtmp wpi = sin(theta) wr = 1.0 wi = 0.0 for m in range(1, mmax, 2): for i in range(m, n+1, istep): j = i + mmax print j tmpr = wr*d[j] - wi*d[j+1] tmpi = wr*d[j+1] + wi*d[j] d[j] = d[i] - tmpr d[j+1] = d[i+1] - tmpi d[i] += tmpr d[i+1] += tmpi wtmp = wr wr = wr*wpr - wi*wpi + wr wi = wi*wpr + wtmp*wpi + wi mmax = istep def realfft(d, n, isign): theta = pi/n c1 = 0.5 if isign == 1: c2 = -0.5 four1(d, n, 1) else: c2 = 0.5 theta = -theta wpr = -2.0*pow(sin(0.5*theta), 2) wpi = sin(theta) wr = 1.0 + wpr wi = wpi np3 = 2*n+3 for i in range(2, n/2+1): i1 = 2*i-1 i2 = i1 + 1 i3 = np3 - i2 #i3 = n-2*i i4 = i3 + 1 #print i1, i2, i3, i4 h1r = c1*(d[i1] + d[i3]) h1i = c1*(d[i2] - d[i4]) h2r = -c2*(d[i2] + d[i4]) h2i = c2*(d[i1] - d[i3]) d[i1] = h1r + wr*h2r - wi*h2i d[i2] = h1i + wr*h2i + wi*h2r d[i3] = h1r - wr*h2r + wi*h2i d[i4] = -h1i + wr*h2i + wi*h2r wtmp = wr wr = wr*wpr - wi*wpi + wr wi = wi*wpr + wtmp*wpi + wi if isign == 1: h1r = d[1] d[1] = h1r + d[2] d[2] = h1r - d[2] else: h1r = d[1] d[1] = c1*(h1r + d[2]) d[2] = c1*(h1r - d[2]) four1(d, n, -1) span = args[0] sm = deepcopy(d) d = [e[1] for e in d] n = len(d) d.insert(0, 0.0) # calculate o/a size of working array (s >= 2**x >= n + 2*span) l = n + 1 + int(floor((2*span)+0.5)) m = 2 while m < l: m *= 2 for i in range(m-n): d.append(0.0) print len(d) # # here's the smooth # cnst = pow(span/m, 2) d1 = d[1] dn = d[n] rn1 = 1.0/(n-1.0) for i in range(1, n+1): d[i] -= rn1*(d1*(n-i)+dn*(i-1)) mo2 = m/2 realfft(d, mo2, 1) d[1] /= mo2 fac = 1.0 #print cnst for i in range(1, mo2): k = (2*i+1) print k, fac if fac != 0.0: fac = (1.0 - cnst*i*i)/mo2 if fac < 0.0: fac = 0.0 d[k] *= fac d[k+1] *= fac else: d[k] = 0.0 d[k+1] = 0.0 fac = (1.0 - (0.25*span*span))/mo2 if fac < 0.0: fac = 0.0 d[2] *= fac realfft(d, mo2, -1) for i in range(1, n+1): d[i] += rn1*(d1*(n-i) + dn*(i-1)) #print d[i-1] for i in range(n): sm[i][1] = d[i-1] return sm ############################################################################## def build_smooths(self, smoothopts, path): need_reg = 1 first = 1 smooths = [] opath = '' for s in smoothopts: #opath += '.%s' % (s) sp = s.split(':') if sp[0] == 'mean': if not len(sp) == 2: raise SmoothError('mean - no averaging period given') try: p = float(sp[1]) opath += '.%s' % (s) except: if sp[1] == '-': # inhibit initial binning p = None else: raise SmoothError('mean - invalid period') smooths.append((self.smooth_mean, (p,), opath)) elif sp[0] == 'sma': if not len(sp) >= 2: raise SmoothError('sma - no span(s) given') opath += sp[0] for span in sp[1:]: try: p = int(span) except: raise SmoothError('sma - invalid span %d' % (span)) opath += ':%s' % (span) smooths.append((self.smooth_sma, (p,), opath)) elif sp[0] == 'ema': if not len(sp) == 2: raise SmoothError('ema - no alpha given') try: p = float(sp[1]) except: raise SmoothError('ema - invalid alpha') opath += '.%s' % (s) smooths.append((self.smooth_ema, (p,), opath)) elif sp[0] == 'sft': if not len(sp) == 2: raise SmoothError('sft - no alpha given') try: p = float(sp[1]) except: raise SmoothError('sft - invalid alpha') opath += '.%s' % (s) smooths.append((self.smooth_sft, (p,), opath)) elif sp[0] == 'han': opath += '.%s' % (s) smooths.append((self.han, (0,), opath)) else: raise SmoothError('smooth \'%s\' not recognised' % (sp[0])) # if not starting with mean smooth normalise time periods if first and sp[0] != 'mean': smooths.insert(0, (self.smooth_mean, (1,), 'mean:1')) first = 0 if path[-1] == '.': opath = opath[1:] self.smooths = smooths self.opath = opath self.filepath = path + opath ############################################################################################################################################################def usage(s): print 'usage %s: <opts> <data file>' % (s) print 'opts:' print '\t-d draw results (default if not saving)' print '\t-s:spec1 -s:spec2 ... smooth specifiers in order applied' print '\t available smooths:' print '\t -s<mean:p> - average over periods p' print '\t -s<sma:r1:r2:r3... - simple moving average successively\n\t over distances r1, r2, r3 ...' print '\t -s<ema:a> - exponential moving average over alpha a' print '\t if no initial mean smooth is given prior to sma or ema a\n period 1 mean will be applied' print '\t-p:x shorthand for -smean:x' print '\t-S save smoothed data' print '\t-i show original data, intermediate results and final data' print '\t-e save s.d. and s.e.m. for averaging periods (output cols. 3/4)' print '\t-h(elp)' sys.exit(1) ##############################################################################def get_file(dir): root = Tk() root.winfo_toplevel().title('Get file:') fnm = tkFileDialog.askopenfilename( \ title='File', initialdir=dir, filetypes=[('All files', '.*')]) root.destroy() del root return fnm ##############################################################################def main(): scriptname = sys.argv[0] p = None pstr = '' savedata = 0 smoothopts = [] smoothed = [] show_stages = 0 savesde = 0 draw = 0 try: optlist, args = getopt.getopt(sys.argv[1:], 'hSs:ip:ed') except getopt.GetoptError, s: print 'ts_smooth: %s' % (s) usage() sys.exit(1) for opt in optlist: if opt[0] == "-h": usage(scriptname) if opt[0] == "-S": savedata = 1 if opt[0] == "-s": smoothopts.append(opt[1]) if opt[0] == "-p": smoothopts.append('mean:' + opt[1]) if opt[0] == "-i": show_stages = 1 if opt[0] == "-e": savesde = 1 if opt[0] == "-d": draw = 1 if not savedata: draw = 1 print args if len(args) == 0: fnm = get_file(os.getcwd()) elif len(args) == 1 and os.path.isdir(args[0]): fnm = get_file(os.path.dirname(args[0])) elif len(args) == 1 and os.path.isfile(args[0]): fnm = args[0] if not fnm: sys.exit(0) basepath = fnm if not smoothopts: print 'No smooth specified' usage() else: try: smoother = TsSmooth(infile=fnm, spec=smoothopts, draw=draw, show_stages=show_stages, savesde=savesde) except NoDataError, s: print s.val sys.exit(1) except SmoothError, s: print s sys.exit(1) if savedata: smoother.save_data() ############################################################################### Call main when run as scriptif __name__ == '__main__': main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -