📄 ghmm.py
字号:
elif isinstance(emissionSequences,SequenceSet): seqNumber = len(emissionSequences) else: raise TypeError, "EmissionSequence or SequenceSet required, got " + \ str(emissionSequences.__class__.__name__) likelihood = ghmmwrapper.double_array(1) likelihoodList = [] for i in range(seqNumber): seq = emissionSequences.getPtr(emissionSequences.cseq.seq,i) tmp = ghmmwrapper.get_arrayint(emissionSequences.cseq.seq_len,i) ret_val = self.forwardFunction(self.cmodel, seq, tmp, likelihood) if ret_val == -1: print "Warning: forward returned -1: Sequence", i,"cannot be build." # XXX Eventually this should trickle down to C-level # Returning -DBL_MIN instead of infinity is stupid, since the latter allows # to continue further computations with that inf, which causes # things to blow up later. # forwardFunction could do without a return value if -Inf is returned # What should be the semantics in case of computing the likelihood of # a set of sequences likelihoodList.append(-float('Inf')) else: likelihoodList.append(ghmmwrapper.get_arrayd(likelihood,0)) ghmmwrapper.free_arrayd(likelihood) likelihood = None return likelihoodList ## Further Marginals ... def logprob(self, emissionSequence, stateSequence): """log P[ emissionSequence, stateSequence| m] Defined in derived classes. """ pass # The functions for model training are defined in the derived classes. def baumWelch(self, trainingSequences, nrSteps, loglikelihoodCutoff): pass def baumWelchSetup(self, trainingSequences, nrSteps): pass def baumWelchStep(self, nrSteps, loglikelihoodCutoff): pass def baumWelchDelete(self): pass # extern double smodel_prob_distance(smodel *cm0, smodel *cm, int maxT, int symmetric, int verbose); def distance(self,model, seqLength): """ Returns the distance between 'self.cmodel' and 'model'. """ return self.distanceFunction(self.cmodel, model.cmodel, seqLength,0,0) def forward(self, emissionSequence): """ Result: the (N x T)-matrix containing the forward-variables and the scaling vector """ if not isinstance(emissionSequence,EmissionSequence): raise TypeError, "EmissionSequence required, got " + str(emissionSequence.__class__.__name__) unused = ghmmwrapper.double_array(1) # Dummy return value for forwardAlphaFunction t = len(emissionSequence) calpha = ghmmwrapper.matrix_d_alloc(t,self.N) cscale = ghmmwrapper.double_array(t) seq = emissionSequence.getPtr(emissionSequence.cseq.seq,0) error = self.forwardAlphaFunction(self.cmodel, seq,t, calpha, cscale, unused) if error == -1: print "ERROR: forward finished with -1: EmissionSequence cannot be build." exit # translate alpha / scale to python lists pyscale = ghmmhelper.arrayd2list(cscale, t) pyalpha = ghmmhelper.matrixd2list(calpha,t,self.N) # deallocation ghmmwrapper.freearray(unused) ghmmwrapper.freearray(cscale) ghmmwrapper.free_2darrayd(calpha,t) unused = None csale = None calpha = None return (pyalpha,pyscale) def backward(self, emissionSequence, scalingVector): """ Result: the (N x T)-matrix containing the backward-variables """ if not isinstance(emissionSequence,EmissionSequence): raise TypeError, "EmissionSequence required, got " + str(emissionSequence.__class__.__name__) seq = emissionSequence.getPtr(emissionSequence.cseq.seq,0) # parsing 'scalingVector' to C double array. cscale = ghmmhelper.list2arrayd(scalingVector) # alllocating beta matrix t = len(emissionSequence) cbeta = ghmmwrapper.double_2d_array(t, self.N) error = self.backwardBetaFunction(self.cmodel,seq,t,cbeta,cscale) if error == -1: print "ERROR: backward finished with -1: EmissionSequence cannot be build." exit pybeta = ghmmhelper.matrixd2list(cbeta,t,self.N) # deallocation ghmmwrapper.freearray(cscale) ghmmwrapper.free_2darrayd(cbeta,t) cscale = None cbeta = None return pybeta def viterbi(self, emissionSequences): """ Compute the Viterbi-path for each sequence in emissionSequences emission_sequences can either be a SequenceSet or an EmissionSequence Result: [q_0, ..., q_T] the viterbi-path of emission_sequences is an emmissionSequence object, [[q_0^0, ..., q_T^0], ..., [q_0^k, ..., q_T^k]} for a k-sequence SequenceSet """ if isinstance(emissionSequences,EmissionSequence): seqNumber = 1 elif isinstance(emissionSequences,SequenceSet): seqNumber = len(emissionSequences) else: raise TypeError, "EmissionSequence or SequenceSet required, got " + str(emissionSequences.__class__.__name__) log_p = ghmmwrapper.double_array(1) allPaths = [] for i in range(seqNumber): seq = emissionSequences.getPtr(emissionSequences.cseq.seq,i) seq_len = ghmmwrapper.get_arrayint(emissionSequences.cseq.seq_len,i) viterbiPath = self.viterbiFunction(self.cmodel,seq,seq_len,log_p) onePath = [] # for model types without possible silent states the length of the viterbi path is known if self.silent == 0: for i in range(seq_len): onePath.append(ghmmwrapper.get_arrayint(viterbiPath,i)) # in the silent case we have to reversely append as long as the path is positive because unused positions # are initialised with -1 on the C level. elif self.silent == 1: for i in range( ( seq_len * self.N )-1,-1,-1): # maximum length of a viterbi path for a silent model d = ghmmwrapper.get_arrayint(viterbiPath,i) if d >= 0: onePath.insert(0,d) else: break allPaths.append(onePath) ghmmwrapper.free_arrayd(log_p) ghmmwrapper.free_arrayi(viterbiPath) viterbiPath = None log_p = None if emissionSequences.cseq.seq_number > 1: return allPaths else: return allPaths[0] def sample(self, seqNr ,T, seed = 0): """ Sample emission sequences """ seqPtr = self.samplingFunction(self.cmodel,seed,T,seqNr,self.N) seqPtr.state_labels = None seqPtr.state_labels_len = None return SequenceSet(self.emissionDomain,seqPtr) def sampleSingle(self, T, seed = 0): """ Sample a single emission sequence of length at most T. Returns a Sequence object. """ seqPtr = self.samplingFunction(self.cmodel,seed,T,1,self.N) seqPtr.state_labels = None seqPtr.state_labels_len = None return EmissionSequence(self.emissionDomain,seqPtr) def state(self, stateLabel): """ Given a stateLabel return the integer index to the state (state labels not yet implemented) """ pass def getInitial(self, i): """ Accessor function for the initial probability \pi_i """ state = self.getStatePtr(self.cmodel.s,i) return state.pi def setInitial(self, i, prob, fixProb=0): """ Accessor function for the initial probability \pi_i For 'fixProb' = 1 \pi will be rescaled to 1 with 'pi[i]' fixed to the arguement value of 'prob'. """ state = self.getStatePtr(self.cmodel.s,i) old_pi = state.pi state.pi = prob # renormalizing pi, pi(i) is fixed on value 'prob' if fixProb == 1: coeff = (1.0 - old_pi) / prob for j in range(self.N): if i != j: state = self.getStatePtr(self.cmodel.s,j) p = state.pi state.pi = p / coeff def getTransition(self, i, j): """ Accessor function for the transition a_ij """ state = self.getStatePtr(self.cmodel.s,i) # ensure proper indices assert 0 <= i < self.N, "Index " + str(i) + " out of bounds." assert 0 <= j < self.N, "Index " + str(j) + " out of bounds." transition = None for i in range(state.out_states): stateId = ghmmwrapper.get_arrayint(state.out_id,i) if stateId == j: transition = ghmmwrapper.get_arrayd(state.out_a,i) break if transition: return transition else: return 0 def setTransition(self, i, j, prob): """ Accessor function for the transition a_ij. """ # ensure proper indices assert 0 <= i < self.N, "Index " + str(i) + " out of bounds." assert 0 <= j < self.N, "Index " + str(j) + " out of bounds." ghmmwrapper.model_set_transition(self.cmodel, i, j, prob) def getEmission(self, i): """ Accessor function for the emission distribution parameters of state 'i'. For discrete models the distribution over the symbols is returned, for continous models a matrix of the form [ [mu_1, sigma_1, weight_1] ... [mu_M, sigma_M, weight_M] ] is returned. """ if self.emissionDomain.CDataType == "int": # discrete emissions. state = self.getStatePtr(self.cmodel.s,i) emissions = ghmmhelper.arrayd2list(state.b,self.M) return emissions elif self.emissionDomain.CDataType == "double": # continous emissions state = self.getStatePtr(self.cmodel.s,i) emParam = [] for i in range(self.M): mixComp = [] mixComp.append(ghmmwrapper.get_arrayd(state.mue,i) ) mixComp.append(ghmmwrapper.get_arrayd(state.u,i) ) mixComp.append(ghmmwrapper.get_arrayd(state.c,i) ) emParam.append(mixComp) return emParam def setEmission(self, i, distributionParemters): """ Set the emission distribution parameters Defined in derived classes. """ pass def toMatrices(self): "To be defined in derived classes." print "Root class function." pass def normalize(self): """ Normalize transition probs, emission probs (if applicable) Defined in derived classes. """ pass def randomize(self, noiseLevel): """ """ pass def write(self,fileName): """ Writes HMM to file 'fileName'. """ self.fileWriteFunction(fileName,self.cmodel)def HMMwriteList(fileName,hmmList): if path.exists(fileName):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -