📄 ts_smooth.py
字号:
#! /usr/bin/env python################################################################################ ## Copyright 2005 University of Cambridge Computer Laboratory. ## ## This file is part of Nprobe. ## ## Nprobe is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## Nprobe is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Nprobe; if not, write to the Free Software ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## ################################################################################################################################################################ ## Smooth time series data by averaging over specified period (in s)## - various specified smooths can be applied, successively if required## #### ############################################################################ import stringimport globimport osfrom os.path import basenameimport sysfrom copy import deepcopyfrom math import floor, sin, pi#import nprobefrom sys import argvimport getoptfrom signal import *from np_plot import *from Tkinter import *import tkSimpleDialog, tkFileDialogfrom np_printplot import OverPrint ############################################################################################################################################################## Can't get data exception#class NoDataError: def __init__(self, val): self.val = val def __str__(self): return self.val ############################################################################################################################################################## Smooth error exception#class SmoothError: def __init__(self, val): self.val = val def __str__(self): return self.val############################################################################## #############################################################################class TsSmooth: def __init__(self, data=None, infile='', spec=None, draw=0, show_stages=0, savefile='', savesde=0): self.data = data self.infile=infile self.spec = spec self.do_draw = draw self.show_stages = show_stages self.savesde = savesde if savefile: self.savefile = savefile elif infile: self.savefile = infile else: self.savefile = 'saved_smooth' self.smooths = [] self.opath = '' if (not data) and infile: self.load_data(infile) if spec: print savefile self.build_smooths(spec, self.savefile) if self.data and self.smooths: self.smooth() if draw: self.draw() ############################################################################## def smooth_data(self, data): self.data = data self.smooth() return self.smoothed_data ############################################################################## def draw(self): if self.savesde: fields = ['X', 'Y', 'SD', 'SE'] else: fields = ['X', 'Y'] self.smoothed[-1].fields = fields if not self.show_stages: smoothed = [self.smoothed[-1]] else: smoothed = self.smoothed try: np_Plot(smoothed, standalone='yes', path=self.savefile, title='%s (smoothed %s)' % (basename(self.infile), self.opath), xlab='time s', ylab='ms') except EmptyDataSetError: print '%s - empty data set' % (ds.path) sys.exit(1) ############################################################################## def smooth(self): if not self.data: raise NoDataError('No data to smooth') if not self.smooths: raise SmoothError('No smooth specification') data = self.data data.sort() i = 0 if self.show_stages: self.smoothed = [DataSet(data, DATA_TS, 'Raw data', i, None)] i += 1 else: self.smoothed = [] regd = 0 #if self.savesde: #fields = ['X', 'Y', ] for smooth in self.smooths: data = smooth[0](data, smooth[1])#[:] self.smoothed.append(DataSet(data, DATA_TS, smooth[2], i, None)) i += 1 self.smoothed_data = data ############################################################################## def get_moothed_data(self): return self.smoothed_data ############################################################################## def load_data(self, file): self.infile = file data = [] try: f = open(file, 'r') except IOError, s: raise NoDataError(s) line = 0 while 1: s = f.readline() line = line +1 if not len(s): #print 'empty' break if s[0] == '#' or s[0] == ' ' or s[0] =='\n': #print 'comment' continue vs = string.split(s) try: data.append([string.atof(vs[0]), string.atof(vs[1])]) except IndexError: raise NoDataError('Malformed data line %d file %s' % (line, file)) if not data: raise NoDataError('No data in file %s' % (file)) self.data = data ############################################################################## def save_data(self): fnm = self.filepath fnm = fnm.replace(':', '-') root = Tk() root.geometry("0x0") results = [] #fnm = '%s.p%s.smoothed.e' % (args[0], pstr) OverPrint(root, title='Save smoothed data', msg='save data as:', entries = [('FILE:', fnm)],results=results) if len(results): fnm = results[0] else: fnm = '' #print results #print fnm while len(fnm): results = [] try: f = open(fnm, 'r') except: # doesn't already exist - carry on break else: # check OverPrint(root, title='Overprint', msg='file already exists - overwrite?', entries = [('FILE:', fnm)],results=results) print results if len(results): if fnm == results[0]: break fnm = results[0] else: fnm = '' break root.destroy() if len(fnm): if self.savesde: fieldstr = '#fields=X:Y:SD:SE' else: fieldstr = '#fields=X:Y' f = open(fnm, 'w') f.write('%s\n' % (fieldstr)) for e in self.smoothed_data: if self.savesde: f.write('%f\t%f\t%f\t%f\n' % (e[0], e[1], e[2], e[3])) else: f.write('%f\t%f\n' % (e[0], e[1])) f.close() print 'File %s written' % (fnm) ############################################################################## def smooth_mean(self, d, args): # # Replace data occuring in each period P with mean for period at period # mid point. Generate SD and SE for each period # p = args[0] if p == None: print 'WARNING not performing initial mean binning' return d ss = self.savesde print 'performing mean smooth, period %d' % (p) start = d[0][0] tm = 0 while tm < start: print tm, p tm += p #tm = start + p n = 0 smoothed = [] agg = 0.0 aggsq = 0.0 for e in d: t = e[0] - start v = e[1] if t > tm: #print '(%.3f) t=%.3f tm=%.3f n=%d' % (e[0], t, tm, n) if n: #print 'ap' M = agg/n #print 'n=%d M=%f agg=%f aggsq=%f ' % (n,M,agg,aggsq), V= (aggsq - (pow(agg, 2)/n))/n #(n-1)? sd = sqrt(V) if ss: smoothed.append([tm, M, sd, sd/sqrt(n)]) else: smoothed.append([tm, M]) #print 'sd=%f stderr=%f' % (sd, sd/sqrt(n)) n = 0 agg = 0.0 aggsq = 0.0 while tm < t: tm += p #print'tm now %.3f' % (tm) n += 1 agg += v aggsq += pow(v, 2) if n: M = agg/n #print 'n=%d M=%f agg=%f aggsq=%f ' % (n,M,agg,aggsq), V= (aggsq - (pow(agg, 2)/n))/n #(n-1)? sd = sqrt(V) if ss: smoothed.append([tm, M, sd, sd/sqrt(n)]) else: smoothed.append([tm, M]) #print 'sd=%f stderr=%f' % (sd, sd/sqrt(n)) #print 'mean smoothed', smoothed return smoothed ############################################################################## ############################################################################## def smooth_meanmin(self, d, args): # # Replace data occuring in each period P with mean for period at period # mid point. Generate SD and SE for each period # p = args[0] if p == None: print 'WARNING not performing initial mean binning' return d ss = self.savesde print 'performing mean smooth, period %d' % (p) start = d[0][0] tm = 0 while tm < start: print tm, p tm += p #tm = start + p n = 0 smoothed = [] Min = 1000000 for e in d: t = e[0] - start v = e[1] if t > tm: #print '(%.3f) t=%.3f tm=%.3f n=%d' % (e[0], t, tm, n) if n: smoothed.append([tm, Min]) Min = 1000000 n = 0 while tm < t: tm += p #print'tm now %.3f' % (tm) Min = min(Min, v) n += 1 if n: smoothed.append([tm, Min]) return smoothed ############################################################################## def smooth_sma(self, d, args): # # Simple moving average # span = args[0] l = len(d) sm = deepcopy(d) d = [e[1] for e in d] print 'performing sma smooth, span %d' % (span) if span == 0: raise SmoothError('sma smooth: span = 0') if span >= l: raise SmoothError('sma smooth: span %d >= data length %d' % (span, l))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -