⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ghmm.py

📁 General Hidden Markov Model Library 一个通用的隐马尔科夫模型的C代码库
💻 PY
📖 第 1 页 / 共 5 页
字号:
            raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".")        return ghmmwrapper.long_array_getitem(self.cseq.seq_label,index)    def setSeqLabel(self,index,value):        if not ghmmwrapper.SEQ_LABEL_FIELD:            raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".")        ghmmwrapper.long_array_setitem(self.cseq.seq_label,index,value)    def getGeneratingStates(self):        """        Returns the state paths from which the sequences were generated as a Python list of lists.        """        l_state = []        for i in range(len(self)):            ls_i = []            for j in range(ghmmwrapper.int_array_getitem(self.cseq.states_len,i) ):                ls_i.append(ghmmwrapper.int_matrix_getitem(self.cseq.states,i,j))            l_state.append(ls_i)           return l_state    def getSequence(self, index):        """ Returns the index-th sequence in internal representation"""        seq = []        if self.cseq.seq_number > index:            for j in range(self.cseq.getLength(index)):                seq.append(self.cseq.getSymbol(index, j))            return seq        else:            raise IndexError(str(index) + " is out of bounds, only " + str(self.cseq.seq_number) + "sequences")    def getStateLabel(self,index):        """ Returns the labeling of the index-th sequence in internal representation"""        label = []        if self.cseq.seq_number > index and self.cseq.state_labels != None:            for j in range(self.cseq.getLabelsLength(index)):                    label.append(self.labelDomain.external(ghmmwrapper.int_matrix_getitem(self.cseq.state_labels, index, j)))            return label        else:            raise IndexError(str(0) + " is out of bounds, only " + str(self.cseq.seq_number) + "labels")            def merge(self, emissionSequences): # Only allow EmissionSequence?        """              Merge 'emisisonSequences' with 'self'.             'emisisonSequences' can either be an EmissionSequence or SequenceSet object.        """        if not isinstance(emissionSequences,EmissionSequence) and not isinstance(emissionSequences,SequenceSet):            raise TypeError, "EmissionSequence or SequenceSet required, got " + str(emissionSequences.__class__.__name__)                self.cseq.add(emissionSequences.cseq)        del(emissionSequences) # removing merged sequences    def getSubset(self, seqIndixes):        """ Returns a SequenceSet containing (references to) the sequences with the indixes in            'seqIndixes'.               """        seqNumber = len(seqIndixes)        seq = self.sequenceAllocationFunction(seqNumber)                # checking for state labels in the source C sequence struct        if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None:                        log.debug( "SequenceSet: found labels !")            seq.calloc_state_labels()        for i,seq_nr in enumerate(seqIndixes):            len_i = self.cseq.getLength(seq_nr)            seq.setSequence(i, self.cseq.getSequence(seq_nr))            seq.setLength(i, len_i)            seq.setWeight(i, self.cseq.getWeight(i))                         # setting labels if appropriate            if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None:                self.cseq.copyStateLabel(seqIndixes[i], seq, seqIndixes[i])        seq.seq_number = seqNumber                return SequenceSetSubset(self.emissionDomain, seq, self)            def write(self,fileName):        "Writes (appends) the SequenceSet into file 'fileName'."        self.cseq.write(fileName)    def asSequenceSet(self):        """conveinence function, returns only self"""        return selfclass SequenceSetSubset(SequenceSet):    """     SequenceSetSubset contains a subset of the sequences from a SequenceSet object.    On the C side only the references are used.    """    def __init__(self, emissionDomain, sequenceSetInput, ParentSequenceSet , labelDomain = None, labelInput = None):        # reference on the parent SequenceSet object        log.debug("SequenceSetSubset.__init__ -- begin -", str(ParentSequenceSet))        self.ParentSequenceSet = ParentSequenceSet        SequenceSet.__init__(self, emissionDomain, sequenceSetInput, labelDomain, labelInput)    def __del__(self):        """ Since we do not want to deallocate the sequence memory, the destructor has to be            overloaded.        """        log.debug( "__del__ SequenceSubSet " + str(self.cseq))                if self.cseq is not None:            self.cseq.subseq_free()        # remove reference on parent SequenceSet object        self.ParentSequenceSet = Nonedef SequenceSetOpen(emissionDomain, fileName):    # XXX Name doof    """ Reads a sequence file with multiple sequence sets.     Returns a list of SequenceSet objects.        """    if not os.path.exists(fileName):        raise IOError, 'File ' + str(fileName) + ' not found.'    if emissionDomain.CDataType == "int":        readFile = ghmmwrapper.ghmm_dseq_read        seqPtr   = ghmmwrapper.dseq_ptr_array_getitem    elif emissionDomain.CDataType == "double":        readFile = ghmmwrapper.ghmm_cseq_read        seqPtr   = ghmmwrapper.cseq_ptr_array_getitem    else:        raise TypeError, "Invalid c data type " + str(emissionDomain.CDataType)    structArray, setNr = readFile(fileName)    # XXX Add Unittest    sequenceSets = [SequenceSet(emissionDomain, seqPtr(structArray, i)) for i in range(setNr)]##    sequenceSets = []##    for i in range(setNr):##        seq = seqPtr(structArray,i)##        sequenceSets.append(SequenceSet(emissionDomain, seq) )##        # setting labels to NULL (XXX only for integer?. DONE by c-constructor) ##        sequenceSets[i].cseq.state_labels = None##        sequenceSets[i].cseq.state_labels_len = None           return  sequenceSetsdef writeToFasta(seqSet,fn):    """    Writes a SequenceSet into a fasta file.    """    if not isinstance(seqSet, SequenceSet):        raise TypeError, "SequenceSet expected."    f = open(fn,'w')        for i in range(len(seqSet)):        rseq = []        for j in range(seqSet.sequenceLength(i)):           rseq.append(str(seqSet.emissionDomain.external(               ghmmwrapper.int_matrix_getitem(seqSet.cseq.seq, i, j)               )))        f.write('>seq'+str(i)+'\n')        f.write(fill(join(rseq,'') ))        f.write('\n')    f.close()    #-------------------------------------------------------------------------------# HMMFactory and derived  -----------------------------------------------------class HMMFactory(object):    """ A HMMFactory is the base class of HMM factories.        A HMMFactory has just a constructor and a () method    """GHMM_FILETYPE_SMO = 'smo'GHMM_FILETYPE_XML = 'xml'GHMM_FILETYPE_HMMER = 'hmm'class HMMOpenFactory(HMMFactory):    """ Opens a HMM from a file. Currently four formats are supported:        HMMer, our smo file format, and two xml formats.        The support for smo files and the old xml format will phase out soon    """    def __init__(self, defaultFileType=None):        self.defaultFileType = defaultFileType    def guessFileType(self, filename):        """ guesses the file format from the filename """        if filename.endswith('.'+GHMM_FILETYPE_XML):            return GHMM_FILETYPE_XML        elif filename.endswith('.'+GHMM_FILETYPE_SMO):            return GHMM_FILETYPE_SMO        elif filename.endswith('.'+GHMM_FILETYPE_HMMER):            return GHMM_FILETYPE_HMMER        else:            return None    def __call__(self, fileName, modelIndex=None, filetype=None):                if not isinstance(fileName,StringIO.StringIO):            if not os.path.exists(fileName):                raise IOError, 'File ' + str(fileName) + ' not found.'        if not filetype:            if self.defaultFileType:                log.warning("HMMOpenHMMER, HMMOpenSMO and HMMOpenXML are deprecated. "                            + "Use HMMOpen and the filetype parameter if needed.")                filetype = self.defaultFileType            else:                filetype = self.guessFileType(fileName)            if not filetype:                raise WrongFileType("Could not guess the type of file " + str(fileName)                                    + " and no filetype specified")                # XML file: both new and old format        if filetype == GHMM_FILETYPE_XML:            # try to validate against ghmm.dtd            if ghmmwrapper.ghmm_xmlfile_validate(fileName):                return self.openNewXML(fileName, modelIndex)            else:                return self.openOldXML(fileName)        elif filetype == GHMM_FILETYPE_SMO:            return self.openSMO(fileName, modelIndex)        elif filetype == GHMM_FILETYPE_HMMER:            return self.openHMMER(fileName)        else:            raise TypeError, "Invalid file type " + str(filetype)    def openNewXML(self, fileName, modelIndex):        """ Open one ore more HMM in the new xml format """        # opens and parses the file        file = ghmmwrapper.ghmm_xmlfile_parse(fileName)        if file == None:            log.debug( "XML has file format problems!")            raise WrongFileType("file is not in GHMM xml format")        nrModels = file.noModels        modelType = file.modelType        # we have a continuous HMM, prepare for hmm creation        if (modelType & ghmmwrapper.kContinuousHMM):            emission_domain = Float()            distribution = ContinuousMixtureDistribution            hmmClass = ContinuousMixtureHMM            getPtr = ghmmwrapper.cmodel_ptr_array_getitem            models = file.model.c        # we have a discrete HMM, prepare for hmm creation        elif ((modelType & ghmmwrapper.kDiscreteHMM)              and not (modelType & ghmmwrapper.kTransitionClasses)              and not (modelType & ghmmwrapper.kPairHMM)):            emission_domain = 'd'            distribution = DiscreteDistribution            getPtr = ghmmwrapper.dmodel_ptr_array_getitem            models = file.model.d            if (modelType & ghmmwrapper.kLabeledStates):                hmmClass = StateLabelHMM            else:                hmmClass = DiscreteEmissionHMM        # currently not supported        else:            raise UnsupportedFeature, "Non-supported model type"        # read all models to list at first        result = []        for i in range(nrModels):            cmodel = getPtr(models,i)            if emission_domain is 'd':                emission_domain = Alphabet([], cmodel.alphabet)            if modelType & ghmmwrapper.kLabeledStates:                labelDomain = LabelDomain([], cmodel.label_alphabet)                m = hmmClass(emission_domain, distribution(emission_domain), labelDomain, cmodel)            else:                m = hmmClass(emission_domain, distribution(emission_domain), cmodel)            result.append(m)        # for a single         if modelIndex != None:            if modelIndex < nrModels:                result = result[modelIndex]            else:                raise IndexError("the file %s has only %s models"% fileName, str(nrModels))        elif nrModels == 1:            result = result[0]        return result    def openOldXML(self, fileName):        from ghmm_gato import xmlutil        hmm_dom = xmlutil.HMM(fileName)        emission_domain = hmm_dom.AlphabetType()        if emission_domain == int:            [alphabets, A, B, pi, state_orders] = hmm_dom.buildMatrices()            emission_domain = Alphabet(alphabets)            distribution = DiscreteDistribution(emission_domain)            # build adjacency list            # check for background distributions            (background_dist, orders, code2name) = hmm_dom.getBackgroundDist()            # (background_dist, orders) = hmm_dom.getBackgroundDist()            bg_list = []            # if background distribution exists, set background distribution here            if background_dist != {}:                # transformation to list for input into BackgroundDistribution,                # ensure the rigth order                for i in range(len(code2name.keys())-1):                    bg_list.append(background_dist[code2name[i]])                bg = BackgroundDistribution(emission_domain, bg_list)            # check for state labels            (label_list, labels) = hmm_dom.getLabels()            if labels == ['None']:                labeldom   = None                label_list = None            else:                labeldom = LabelDomain(labels)            m = HMMFromMatrices(emission_domain, distribution, A, B, pi, None, labeldom, label_list)            # old xml is discrete, set appropiate flag            m.cmodel.setModelTypeFlag(ghmmwrapper.kDiscreteHMM)            if background_dist != {}:                 ids = [-1]*m.N                 for s in hmm_dom.state.values():

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -