📄 mixture.py
#!/usr/bin/env python2.3
#
# ghmm example: mixture.py
#

from ghmm import *
import numarray
import math
import getopt, sys, string

def Entropy(prob_dist):
    """ Returns the entropy of the discrete probability distribution
        prob_dist, computed with the natural logarithm.
    """
    result = 0.0
    for i in xrange(len(prob_dist)):  # we have to deal with zero probs
        p = prob_dist[i]
        if p > 0.0:
            result -= p * math.log(p)
        # p == 0.0 Note: lim_{x->0} x log(x) = 0
    return result

def sumlogs(a):
    """ Given a numarray.array a of log p_i, return log(sum p_i).

        XXX should be coded in C, check whether part of numarray
    """
    m = max(a)
    result = 0.0
    for x in a:
        x = x - m
        if x >= -1.0e-16:
            result += 1.0   # x is numerically equal to the maximum: exp(x) == 1.0
        else:
            result += math.exp(x)  # XXX use approximation
    result = math.log(result)
    result += m
    return result

def estimate_mixture(models, seqs, max_iter, eps, alpha=None):
    """ Given a Python list of models and a SequenceSet seqs, perform
        a nested EM to estimate maximum-likelihood parameters for the
        models and the mixture coefficients.

        The iteration stops after max_iter steps or if the improvement
        in log-likelihood is less than eps.

        alpha is a numarray of dimension len(models) containing the
        mixture coefficients. If alpha is not given, uniform values
        are chosen.

        Result: the models are changed in place. The return value is
        (l, alpha, P), where l is the final log-likelihood of seqs
        under the mixture, alpha is a numarray of dimension len(models)
        containing the mixture coefficients, and P is a
        (#sequences x #models) matrix containing P[model j | sequence i].
    """
    iter = 1
    last_mixture_likelihood = -99999999.99
    # The (nr of seqs x nr of models) matrix holding the log-likelihoods
    l = numarray.zeros((len(seqs), len(models)), numarray.Float)
    if alpha is None:  # uniform alpha
        logalpha = numarray.ones(len(models), numarray.Float) * \
                   math.log(1.0 / len(models))
    else:
        logalpha = numarray.log(alpha)
    print logalpha, numarray.exp(logalpha)
    log_nrseqs = math.log(len(seqs))

    while 1:
        # Score all sequences with all models
        for i, m in enumerate(models):
            loglikelihood = m.loglikelihoods(seqs)
            # numarray slices: l[:,i] is the i-th column of l
            l[:,i] = numarray.array(loglikelihood)

        for i in xrange(len(seqs)):
            l[i] += logalpha  # l[i] = ( log( a_k * P[seq i | model k]) )
        mixture_likelihood = numarray.sum(numarray.sum(l))
        print "# iter %s joint likelihood = %f" % (iter, mixture_likelihood)

        improvement = mixture_likelihood - last_mixture_likelihood
        if iter > max_iter or improvement < eps:
            break

        # Compute P[model j | seq i]
        for i in xrange(len(seqs)):
            seq_logprob = sumlogs(l[i])  # log \sum_k a_k P[seq i | model k]
            l[i] -= seq_logprob          # l[i] = ( log P[model j | seq i] )

        l_exp = numarray.exp(l)  # XXX use approximation with table lookup

        # Compute priors alpha
        for i in xrange(len(models)):
            logalpha[i] = sumlogs(l[:,i]) - log_nrseqs

        for j, m in enumerate(models):
            # Set the weight of sequence i under model m to P[m | i]
            for i in xrange(len(seqs)):
                seqs.setWeight(i, l_exp[i,j])
            m.baumWelch(seqs, 10, 0.0001)

        iter += 1
        last_mixture_likelihood = mixture_likelihood

    return (mixture_likelihood, numarray.exp(logalpha), l_exp)

def decode_mixture(P, entropy_cutoff):
    """ Given P, a (#sequences x #models) matrix where
        P_{ij} = P[model j | sequence i], return a list of size
        #sequences which contains j, the model maximizing
        P[model j | sequence i], if the entropy of the discrete
        distribution { P[ . | sequence i] } is less than
        entropy_cutoff, and None otherwise.
    """
    nr_seqs = numarray.shape(P)[0]
    result = [None] * nr_seqs
    for i in xrange(nr_seqs):
        e = Entropy(P[i])
        if e < entropy_cutoff:
            result[i] = int(numarray.argmax(P[i]))
    return result

usage_info = """
mixture.py [options] hmms.smo seqs.sqd [hmms-reestimated.smo]

Estimate a mixture of HMMs from a file hmms.smo containing
continuous-emission HMMs and a set of sequences of reals given in
the file seqs.sqd.

Running:

-m iter  Maximal number of iterations (default is 100)
-e eps   If the improvement in likelihood is below eps, the training
         is terminated (default is 0.001)

Post-analysis (the options are mutually exclusive):

-p       Output the matrix p_{ij} = P[model j | sequence i] to the console
-c       Cluster sequences: assign each sequence i to the model maximizing
         P[model j | sequence i]. Outputs seq_id\tcluster_nr to the console
-d ent   Decode the mixture: if the entropy of { P[model j | sequence i] }
         is less than 'ent', sequence i is assigned to the model maximizing
         P[model j | sequence i]. Outputs seq_id\tcluster_nr to the console;
         cluster_nr is None if no assignment was possible

Example:

mixture.py -m 10 -e 0.1 -d 0.15 test2.smo test100.sqd reestimated.smo
"""

def usage():
    print usage_info

if __name__ == '__main__':
    # Default values
    max_iter = 100
    eps = 0.001
    output = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "m:e:pcd:", [])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for o, a in opts:
        if o == '-m':
            max_iter = int(a)
        if o == '-e':
            eps = float(a)
        if o == '-p':
            output = 'p_matrix'
        if o == '-c':
            output = 'cluster'
        if o == '-d':
            output = 'decode'
            entropy_cutoff = float(a)

    if len(args) != 3:
        usage()
        sys.exit(2)

    hmmsFileName = args[0]
    seqsFileName = args[1]
    outFileName = args[2]

    models = HMMOpen.all(hmmsFileName)
    print "# Read %d models from '%s'" % (len(models), hmmsFileName)
    seqs = SequenceSet(Float(), seqsFileName)
    print "# Read %d sequences from '%s'" % (len(seqs), seqsFileName)

    (ml, alpha, P) = estimate_mixture(models, seqs, max_iter, eps)

    if output != None:
        if output == 'p_matrix':
            for i in xrange(len(seqs)):
                print string.join(map(lambda x: "%1.3f" % x, P[i]), '\t')
        else:
            if output == 'cluster':
                # max entropy is log(len(models)) < len(models), so every
                # sequence gets assigned
                assignment = decode_mixture(P, len(models))
            else:
                assignment = decode_mixture(P, entropy_cutoff)
            for i, c in enumerate(assignment):
                print "%s\t%s" % (str(i), str(c))
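The sumlogs helper above is the standard log-sum-exp trick: with m = max_i log p_i, log(sum_i p_i) = m + log(sum_i exp(log p_i - m)), so the exponentials stay in range and sequences with very negative log-likelihoods do not underflow to zero. A minimal sketch of the same computation, written against modern NumPy rather than the long-obsolete numarray the script targets (the NumPy port and the name sumlogs_np are illustrative assumptions, not part of GHMM):

import numpy as np

def sumlogs_np(log_p):
    """Given an array of log p_i, return log(sum_i p_i) without underflow."""
    m = np.max(log_p)                              # shift everything by the maximum ...
    return m + np.log(np.sum(np.exp(log_p - m)))   # ... so exp() stays in range

# exp(-1000) underflows to 0.0 in doubles, but the shifted form survives:
print(sumlogs_np(np.array([-1000.0, -1000.0])))    # -999.3068... (= -1000 + log 2)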
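estimate_mixture is a two-level EM: the outer loop computes the posteriors P[model j | sequence i] and the mixture weights alpha (the E-step), and the inner baumWelch call re-fits each HMM on the sequences weighted by those posteriors (the M-step). The sketch below isolates just the E-step in plain NumPy, with the GHMM scoring replaced by a precomputed log-likelihood matrix; the function name responsibilities and the toy numbers are illustrative assumptions, not GHMM API:

import numpy as np

def responsibilities(loglik, logalpha):
    """loglik[i, j] = log P[seq i | model j]; returns r[i, j] = P[model j | seq i]."""
    l = loglik + logalpha                          # log( alpha_j * P[seq i | model j] )
    seq_logprob = np.logaddexp.reduce(l, axis=1)   # per-sequence log sum_j, computed stably
    return np.exp(l - seq_logprob[:, None])        # normalise each row to a posterior

# Two sequences scored under two models, uniform mixture weights:
loglik = np.log(np.array([[0.6, 0.2],
                          [0.1, 0.8]]))
print(responsibilities(loglik, np.log(np.array([0.5, 0.5]))))
# rows sum to 1: [[0.75  0.25 ], [0.111 0.889]]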