
📄 mixture.py

📁 A general-purpose hidden Markov model (HMM) code library. Development environment: C. Brief description: a general HMM library written in C (the file shown here, mixture.py, is a Python example script that uses the library's ghmm bindings).
💻 PY
#!/usr/bin/env python2.3
#
# ghmm example: mixture.py
#
from ghmm import *
import numarray
import math
import getopt, sys, string

def Entropy(prob_dist):
    """ Returns the entropy of the discrete prob. distribution,
        computed with the natural logarithm.
    """
    result = 0.0
    for i in xrange(len(prob_dist)):
        # we have to deal with zero probs
        p = prob_dist[i]
        if p > 0.0:
            result -= p * math.log(p)
        # p == 0.0  Note: lim_(x->0) x log(x) = 0
    return result

def sumlogs(a):
    """ Given a numarray.array a of log p_i, return log(sum p_i).

        XXX should be coded in C; check whether part of numarray
    """
    m = max(a)
    result = 0.0
    for x in a:
        if x >= m:
            result += 1.0
        else:
            x = x - m
            if x < -1.0e-16:
                result += math.exp(x)  # XXX use approximation
    # Take the log once, after the loop, then shift back by the maximum
    result = math.log(result)
    result += m
    return result

def estimate_mixture(models, seqs, max_iter, eps, alpha=None):
    """ Given a Python list of models and a SequenceSet seqs,
        perform a nested EM to estimate maximum-likelihood
        parameters for the models and the mixture coefficients.
        The iteration stops after max_iter steps or if the
        improvement in log-likelihood is less than eps.

        alpha is a numarray of dimension len(models) containing
        the mixture coefficients. If alpha is not given, uniform
        values will be chosen.

        Result: The models are changed in place. The return value
        is (l, alpha, P) where l is the final log-likelihood of
        seqs under the mixture, alpha is a numarray of
        dimension len(models) containing the mixture coefficients,
        and P is a (#sequences x #models)-matrix containing
        P[model j | sequence i].
    """
    iter = 1
    last_mixture_likelihood = -99999999.99
    # The (nr of seqs x nr of models)-matrix holding the likelihoods
    l = numarray.zeros((len(seqs), len(models)), numarray.Float)
    if alpha is None:  # Uniform alpha
        logalpha = numarray.ones(len(models), numarray.Float) * \
                   math.log(1.0 / len(models))
    else:
        logalpha = numarray.log(alpha)
    print logalpha, numarray.exp(logalpha)
    log_nrseqs = math.log(len(seqs))

    while 1:
        # Score all sequences with all models
        for i, m in enumerate(models):
            loglikelihood = m.loglikelihoods(seqs)
            # numarray slices: l[:,i] is the i-th column of l
            l[:,i] = numarray.array(loglikelihood)

        for i in xrange(len(seqs)):
            l[i] += logalpha  # l[i] = ( log( a_k * P[seq i| model k]) )

        mixture_likelihood = numarray.sum(numarray.sum(l))
        print "# iter %s joint likelihood = %f" % (iter, mixture_likelihood)

        improvement = mixture_likelihood - last_mixture_likelihood
        if iter > max_iter or improvement < eps:
            break

        # E-step: compute P[model j | seq i]
        for i in xrange(len(seqs)):
            seq_logprob = sumlogs(l[i])  # log \sum_{k} a_k P[seq i| model k]
            l[i] -= seq_logprob  # l[i] = ( log P[model j | seq i] )

        l_exp = numarray.exp(l)  # XXX Use approx with table lookup

        # M-step, part 1: re-estimate the priors alpha
        for i in xrange(len(models)):
            logalpha[i] = sumlogs(l[:,i]) - log_nrseqs

        # M-step, part 2: re-estimate each model by Baum-Welch
        for j, m in enumerate(models):
            # Set the weight of sequence i under model m to P[m | i]
            for i in xrange(len(seqs)):
                seqs.setWeight(i, l_exp[i,j])
            m.baumWelch(seqs, 10, 0.0001)

        iter += 1
        last_mixture_likelihood = mixture_likelihood

    return (mixture_likelihood, numarray.exp(logalpha), l_exp)

def decode_mixture(P, entropy_cutoff):
    """ Given P, a (#sequences x #models)-matrix where P_{ij} =
        P[model j | sequence i], return a list of size (#sequences)
        which contains j, the model maximizing P[model j | sequence i]
        for a sequence, if the entropy of the discrete distribution
        { P[ . | sequence i] } is less than entropy_cutoff,
        and None otherwise.
    """
    nr_seqs = numarray.shape(P)[0]
    result = [None] * nr_seqs
    for i in xrange(nr_seqs):
        e = Entropy(P[i])
        if e < entropy_cutoff:
            result[i] = int(numarray.argmax(P[i]))
    return result

usage_info = """mixture.py [options] hmms.smo seqs.sqd hmms-reestimated.smo

Estimate a mixture of HMMs from a file hmms.smo containing continuous-
emission HMMs and a set of sequences of reals given in the file
seqs.sqd.

Options:
-m iter Maximal number of iterations (default is 100)
-e eps  If the improvement in likelihood is below eps, the training
        is terminated (default is 0.001)

Post-analysis (the options are mutually exclusive):
-p      Output the matrix p_{ij} = P[model j| sequence i] to the console
-c      Cluster sequences. Assign each sequence i to the model maximizing
        P[model j| sequence i]. Outputs seq_id\tcluster_nr to the console
-d ent  Decode mixture. If the entropy of { P[model j| sequence i] } is
        less than 'ent', sequence i is assigned to the model maximizing
        P[model j| sequence i]. Outputs seq_id\tcluster_nr to the console;
        cluster_nr is None if no assignment was possible

Example:
mixture.py -m 10 -e 0.1 -d 0.15 test2.smo test100.sqd reestimated.smo
"""

def usage():
    print usage_info

if __name__ == '__main__':
    # Default values
    max_iter = 100
    eps = 0.001
    output = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], "m:e:pcd:", [])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for o, a in opts:
        if o == '-m':
            max_iter = int(a)
        if o == '-e':
            eps = float(a)
        if o == '-p':
            output = 'p_matrix'
        if o == '-c':
            output = 'cluster'
        if o == '-d':
            output = 'decode'
            entropy_cutoff = float(a)

    if len(args) != 3:
        usage()
        sys.exit(2)

    hmmsFileName = args[0]
    seqsFileName = args[1]
    outFileName = args[2]  # XXX currently unused; models are changed in place

    models = HMMOpen.all(hmmsFileName)
    print "# Read %d models from '%s'" % (len(models), hmmsFileName)
    seqs = SequenceSet(Float(), seqsFileName)
    print "# Read %d sequences from '%s'" % (len(seqs), seqsFileName)

    (ml, alpha, P) = estimate_mixture(models, seqs, max_iter, eps)

    if output is not None:
        if output == 'p_matrix':
            for i in xrange(len(seqs)):
                print string.join(map(lambda x: "%1.3f" % x, P[i]), '\t')
        else:
            if output == 'cluster':
                # Any cutoff >= log(len(models)), the maximal entropy,
                # forces an assignment; len(models) always exceeds it
                assignment = decode_mixture(P, len(models))
            else:
                assignment = decode_mixture(P, entropy_cutoff)
            for i, c in enumerate(assignment):
                print "%s\t%s" % (str(i), str(c))
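For readers tracing estimate_mixture: the nested EM above is the standard mixture EM. The summary below is my own restatement in LaTeX, not text from the original file; s_1..s_N are the sequences, theta_1..theta_M the component HMMs, and alpha the mixture weights.

    % E-step: posterior responsibility of model j for sequence i
    % (computed in log space via sumlogs in the code)
    P[j \mid s_i] = \frac{\alpha_j \, P(s_i \mid \theta_j)}
                         {\sum_{k=1}^{M} \alpha_k \, P(s_i \mid \theta_k)}

    % M-step, part 1: new mixture weights
    % (logalpha[j] = sumlogs(l[:,j]) - log N in the code)
    \alpha_j \leftarrow \frac{1}{N} \sum_{i=1}^{N} P[j \mid s_i]

M-step, part 2 is the inner EM: each theta_j is re-fit by Baum-Welch with sequence i carrying weight P[j | s_i] (the seqs.setWeight / m.baumWelch loop), which is why the procedure is called a nested EM.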
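sumlogs is the usual log-sum-exp trick: with m = max_k x_k, log sum_k e^{x_k} = m + log sum_k e^{x_k - m}, so every exponent is <= 0 and the dominant terms cannot underflow. A minimal standalone sketch (plain Python with only the math module; not part of the ghmm API):

    # Log-sum-exp as used by sumlogs() above, without ghmm/numarray.
    import math

    def sumlogs_sketch(logs):
        """Return log(sum_k exp(logs[k])) without underflow.

        Shifting by m = max(logs) keeps every exp() argument <= 0,
        so at least one term is exactly 1.0 and none overflows.
        """
        m = max(logs)
        return m + math.log(sum(math.exp(x - m) for x in logs))

    # Three very small log-probabilities: the naive sum underflows
    # to 0.0, while the shifted version recovers the correct value.
    logs = [-1000.0, -1001.0, -1002.0]
    naive = sum(math.exp(x) for x in logs)  # each exp() underflows
    print(naive)                            # 0.0
    print(sumlogs_sketch(logs))             # approx -999.59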
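The entropy cutoff in decode_mixture is easiest to see on toy posteriors: a near-certain posterior has entropy close to 0, while a uniform posterior over M models has the maximal entropy log M (about 1.099 for M = 3). That is also why the -c branch can pass any value >= log(len(models)) to force an assignment for every sequence. A hypothetical check (plain Python; the probability values are invented for illustration):

    # Entropy-gated assignment, mirroring Entropy()/decode_mixture() above.
    import math

    def entropy(p):
        # natural-log entropy, with 0 * log 0 treated as 0
        return -sum(x * math.log(x) for x in p if x > 0.0)

    confident = [0.98, 0.01, 0.01]        # entropy approx 0.112 -> assigned
    uniform   = [1/3.0, 1/3.0, 1/3.0]     # entropy = log 3 approx 1.099 -> None

    for p in (confident, uniform):
        e = entropy(p)
        label = p.index(max(p)) if e < 0.15 else None
        print(e, label)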
