📄 ghmm.py
字号:
if emission_domain != None and emission_domain != 'int': print "HMMOpenFactory:determineHMMClass: both HMM and SHMM?", emission_domain else: emission_domain = 'int' if shmm != None: if emission_domain != None and emission_domain != 'double': print "HMMOpenFactory:determineHMMClass: both HMM and SHMM?", emission_domain else: emission_domain = 'double' if mvalue != None: M = int(mvalue.group(1)) if densityvalue != None: density = int(densityvalue.group(1)) if cosvalue != None: cos = int(cosvalue.group(1)) file.close() if emission_domain == 'int': # only integer alphabet emission_domain = IntegerRange(0,M) distribution = DiscreteDistribution hmm_class = DiscreteEmissionHMM return (hmm_class, emission_domain, distribution) elif emission_domain == 'double': # M number of mixture components # density component type # cos number of state transition classes if cos == 1 and M == 1 and density == 0: emission_domain = Float() distribution = GaussianDistribution hmm_class = GaussianEmissionHMM return (hmm_class, emission_domain, distribution) elif cos == 1 and M > 1 and density == 0: emission_domain = Float() distribution = GaussianMixtureDistribution hmm_class = GaussianMixtureHMM return (hmm_class, emission_domain, distribution) return (None, None, None) HMMOpenHMMER = HMMOpenFactory(GHMM_FILETYPE_HMMER) # read single HMMER model from fileHMMOpen = HMMOpenFactory(GHMM_FILETYPE_SMO)HMMOpenXML = HMMOpenFactory(GHMM_FILETYPE_XML)def readMultipleHMMERModels(fileName): """ Reads a file containing multiple HMMs in HMMER format, returns list of HMM objects. """ if not path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' modelList = [] string = "" f = open(fileName,"r") res = re.compile("^//") stat = re.compile("^ACC\s+(\w+)") for line in f.readlines(): string = string + line m = stat.match(line) if m: name = m.group(1) print "reading model " + name + " " match = res.match(line) if match: fileLike = StringIO.StringIO(string) modelList.append(HMMOpenHMMER(fileLike)) string = "" match = None return modelList class HMMFromMatricesFactory(HMMFactory): def __call__(self, emissionDomain, distribution, A, B, pi,hmmName = None, labelList=None): if isinstance(emissionDomain,Alphabet): if isinstance(distribution,DiscreteDistribution): # checking matrix dimensions and argument validation assert len(A) == len(A[0]), "ERROR: A is not quadratic." assert len(pi) == len(A), "ERROR: Length of pi does not match length of A." assert len(A) == len(B), "ERROR: Different number of entries in A and B." assert emissionDomain.size() == len(B[0]) ,"ERROR: EmissionDomain and B do not match." # HMM has discrete emissions over finite alphabet: DiscreteEmissionHMM cmodel = ghmmwrapper.new_model() cmodel.N = len(A) cmodel.M = emissionDomain.size() cmodel.prior = -1 # No prior by default # tie groups are deactivated by default cmodel.tied_to = None # assign model identifier (if specified) if hmmName != None: cmodel.name = hmmName else: cmodel.name = 'Unused' states = ghmmwrapper.arraystate(cmodel.N) silent_flag = 0 silent_states = [] #initialize states for i in range(cmodel.N): state = ghmmwrapper.get_stateptr(states,i) state.b = ghmmhelper.list2arrayd(B[i]) state.pi = pi[i] if (sum(B[i]) == 0 ): silent_states.append(1) silent_flag = 4 else: silent_states.append(0) #set out probabilities state.out_states, state.out_id, state.out_a = ghmmhelper.extract_out(A[i]) #set "in" probabilities # XXX Check whether A is numarray or Python A_col_i = map( lambda x: x[i], A) # Numarray use A[,:i] state.in_states, state.in_id, state.in_a = ghmmhelper.extract_out(A_col_i) #fix probabilities by reestimation, else 0 state.fix = 0 cmodel.s = states cmodel.model_type = silent_flag cmodel.silent = ghmmhelper.list2arrayint(silent_states) # check for state labels if labelList is not None: m = StateLabelHMM(emissionDomain, distribution, cmodel) m.setLabels(labelList) return m else: return DiscreteEmissionHMM(emissionDomain, distribution, cmodel) else: raise GHMMError(type(distribution), "Not a valid distribution for Alphabet") else: if isinstance(distribution,GaussianDistribution): cmodel = ghmmwrapper.new_smodel() cmodel.N = len(A) cmodel.M = 1 # Number of mixture componenent for emission distribution cmodel.prior = -1 # Unused cmodel.cos = 1 # number of transition classes in GHMM states = ghmmwrapper.arraysstate(cmodel.N) # XXX ? switching function ? XXX # Switching functions and transition classes are handled # elswhere #initialize states for i in range(cmodel.N): #print " state " + str(i) + ":" state = ghmmwrapper.get_sstate_ptr(states,i) state.pi = pi[i] # allocate arrays of emmission parameters state.c = ghmmhelper.list2arrayd([1.0]) # Mixture weights. Unused (mu, sigma) = B[i] state.mue = ghmmhelper.list2arrayd([mu]) #mu = mue in GHMM C-lib. state.u = ghmmhelper.list2arrayd([sigma]) #set out probabilities trans = ghmmhelper.extract_out_probs([A[i]], cmodel.cos) # cos = 1 state.out_states = trans[0] state.out_id = trans[1] state.out_a = trans[2].array #set "in" probabilities A_col_i = map( lambda x: x[i], A) trans = ghmmhelper.extract_out_probs([A_col_i],cmodel.cos) # cos = 1 state.in_states = trans[0] state.in_id = trans[1] state.in_a = trans[2].array state.fix = 0 # if fix = 1, exclude state's probabilities from reestimation #append states to model cmodel.s = states return GaussianEmissionHMM(emissionDomain, distribution, cmodel) if isinstance(distribution, GaussianMixtureDistribution): print " ** mixture model" # Interpretation of B matrix for the mixture case (Example with three states and two components each): # B = [ # [ ["mu11","mu12"],["sig11","sig12"],["w11","w12"] ], # [ ["mu21","mu22"],["sig21","sig22"],["w21","w22"] ], # [ ["mu31","mu32"],["sig31","sig32"],["w31","w32"] ], # ] cmodel = ghmmwrapper.new_smodel() cmodel.N = len(A) cmodel.M = len(B[0][0]) # Number of mixture componenent for emission distribution cmodel.prior = -1 # Unused cmodel.cos = 1 # number of transition classes in GHMM states = ghmmwrapper.arraysstate(cmodel.N) # XXX ? switching function ? XXX # Switching functions and transition classes are handled # elswhere #initialize states for i in range(cmodel.N): print " state " + str(i) + ":" state = ghmmwrapper.get_sstate_ptr(states,i) state.pi = pi[i] # allocate arrays of emmission parameters state.c = ghmmhelper.list2arrayd([1.0]) # Mixture weights. Unused mu_list = B[i][0] sigma_list = B[i][1] weight_list = B[i][2] state.mue = ghmmhelper.list2arrayd(mu_list) #mu = mue in GHMM C-lib. state.u = ghmmhelper.list2arrayd(sigma_list) state.c = ghmmhelper.list2arrayd(weight_list) #set out probabilities trans = ghmmhelper.extract_out_probs([A[i]], cmodel.cos) # cos = 1 state.out_states = trans[0] state.out_id = trans[1] state.out_a = trans[2].array #set "in" probabilities A_col_i = map( lambda x: x[i], A) trans = ghmmhelper.extract_out_probs([A_col_i],cmodel.cos) # cos = 1 state.in_states = trans[0] state.in_id = trans[1] state.in_a = trans[2].array state.fix = 0 # if fix = 1, exclude state's probabilities from reestimation #append states to model cmodel.s = states return GaussianMixtureHMM(emissionDomain, distribution, cmodel) else: raise GHMMError(type(distribution), "Cannot construct model for this domain/distribution combination") HMMFromMatrices = HMMFromMatricesFactory()#-------------------------------------------------------------------------------#- HMM and derived class HMM: def __init__(self, emissionDomain, distribution, cmodel): self.emissionDomain = emissionDomain self.distribution = distribution self.cmodel = cmodel self.N = self.cmodel.N # number of states self.M = self.cmodel.M # number of symbols / mixture components self.silent = 0 # flag for silent states # Function pointers to the C level (assigned in derived classes) self.freeFunction = "" # C function for deallocation self.samplingFunction = "" # C function for sequence generation self.viterbiFunction = "" # C function viterbi algorithm self.forwardFunction = "" # C function forward algortihm (likelihood only) self.forwardAlphaFunction = "" # C function forward algorithm (alpha matrix,scale) self.backwardBetaFunction = "" # C function backkward algorithm (beta matrix) self.getStatePtr = "" # C function to get a pointer to a state struct self.getModelPtr = "" # C function to get a pointer to the model struct self.castModelPtr = "" # C function to cast a *model to **model self.distanceFunction = "" # C function to compute a probabilistic distance between models def __del__(self): """ Deallocation routine for the underlying C data structures. """ # print "HMM.__del__", self.cmodel self.freeFunction(self.cmodel) self.cmodel = None def loglikelihood(self, emissionSequences): """ Compute log( P[emissionSequences| model]) using the forward algorithm assuming independence of the sequences in emissionSequences emissionSequences can either be a SequenceSet or a Sequence Result: log( P[emissionSequences| model]) of type float which is computed as \sum_{s} log( P[s| model]) when emissionSequences is a SequenceSet Note: The implementation will not compute the full forward matrix (XXX ToDo) """ if not isinstance(emissionSequences,EmissionSequence) and not isinstance(emissionSequences,SequenceSet): raise TypeError, "EmissionSequence or SequenceSet required, got " + str(emissionSequences.__class__.__name__) logPsum = sum(self.loglikelihoods(emissionSequences) ) return logPsum def loglikelihoods(self, emissionSequences): """ Compute a vector ( log( P[s| model]) )_{s} of log-likelihoods of the individual emission_sequences using the forward algorithm emission_sequences is of type SequenceSet Result: log( P[emissionSequences| model]) of type float (numarray) vector of floats """ if isinstance(emissionSequences,EmissionSequence): seqNumber = 1
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -