📄 ghmm.py
字号:
raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".") return ghmmwrapper.long_array_getitem(self.cseq.seq_label,index) def setSeqLabel(self,index,value): if not ghmmwrapper.SEQ_LABEL_FIELD: raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".") ghmmwrapper.long_array_setitem(self.cseq.seq_label,index,value) def getGeneratingStates(self): """ Returns the state paths from which the sequences were generated as a Python list of lists. """ l_state = [] for i in range(len(self)): ls_i = [] for j in range(ghmmwrapper.int_array_getitem(self.cseq.states_len,i) ): ls_i.append(ghmmwrapper.int_matrix_getitem(self.cseq.states,i,j)) l_state.append(ls_i) return l_state def getSequence(self, index): """ Returns the index-th sequence in internal representation""" seq = [] if self.cseq.seq_number > index: for j in range(self.cseq.getLength(index)): seq.append(self.cseq.getSymbol(index, j)) return seq else: raise IndexError(str(index) + " is out of bounds, only " + str(self.cseq.seq_number) + "sequences") def getStateLabel(self,index): """ Returns the labeling of the index-th sequence in internal representation""" label = [] if self.cseq.seq_number > index and self.cseq.state_labels != None: for j in range(self.cseq.getLabelsLength(index)): label.append(self.labelDomain.external(ghmmwrapper.int_matrix_getitem(self.cseq.state_labels, index, j))) return label else: raise IndexError(str(0) + " is out of bounds, only " + str(self.cseq.seq_number) + "labels") def merge(self, emissionSequences): # Only allow EmissionSequence? """ Merge 'emisisonSequences' with 'self'. 'emisisonSequences' can either be an EmissionSequence or SequenceSet object. """ if not isinstance(emissionSequences,EmissionSequence) and not isinstance(emissionSequences,SequenceSet): raise TypeError, "EmissionSequence or SequenceSet required, got " + str(emissionSequences.__class__.__name__) self.cseq.add(emissionSequences.cseq) del(emissionSequences) # removing merged sequences def getSubset(self, seqIndixes): """ Returns a SequenceSet containing (references to) the sequences with the indixes in 'seqIndixes'. """ seqNumber = len(seqIndixes) seq = self.sequenceAllocationFunction(seqNumber) # checking for state labels in the source C sequence struct if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None: log.debug( "SequenceSet: found labels !") seq.calloc_state_labels() for i,seq_nr in enumerate(seqIndixes): len_i = self.cseq.getLength(seq_nr) seq.setSequence(i, self.cseq.getSequence(seq_nr)) seq.setLength(i, len_i) seq.setWeight(i, self.cseq.getWeight(i)) # setting labels if appropriate if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None: self.cseq.copyStateLabel(seqIndixes[i], seq, seqIndixes[i]) seq.seq_number = seqNumber return SequenceSetSubset(self.emissionDomain, seq, self) def write(self,fileName): "Writes (appends) the SequenceSet into file 'fileName'." self.cseq.write(fileName) def asSequenceSet(self): """conveinence function, returns only self""" return selfclass SequenceSetSubset(SequenceSet): """ SequenceSetSubset contains a subset of the sequences from a SequenceSet object. On the C side only the references are used. """ def __init__(self, emissionDomain, sequenceSetInput, ParentSequenceSet , labelDomain = None, labelInput = None): # reference on the parent SequenceSet object log.debug("SequenceSetSubset.__init__ -- begin -", str(ParentSequenceSet)) self.ParentSequenceSet = ParentSequenceSet SequenceSet.__init__(self, emissionDomain, sequenceSetInput, labelDomain, labelInput) def __del__(self): """ Since we do not want to deallocate the sequence memory, the destructor has to be overloaded. """ log.debug( "__del__ SequenceSubSet " + str(self.cseq)) if self.cseq is not None: self.cseq.subseq_free() # remove reference on parent SequenceSet object self.ParentSequenceSet = Nonedef SequenceSetOpen(emissionDomain, fileName): # XXX Name doof """ Reads a sequence file with multiple sequence sets. Returns a list of SequenceSet objects. """ if not os.path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' if emissionDomain.CDataType == "int": readFile = ghmmwrapper.ghmm_dseq_read seqPtr = ghmmwrapper.dseq_ptr_array_getitem elif emissionDomain.CDataType == "double": readFile = ghmmwrapper.ghmm_cseq_read seqPtr = ghmmwrapper.cseq_ptr_array_getitem else: raise TypeError, "Invalid c data type " + str(emissionDomain.CDataType) structArray, setNr = readFile(fileName) # XXX Add Unittest sequenceSets = [SequenceSet(emissionDomain, seqPtr(structArray, i)) for i in range(setNr)]## sequenceSets = []## for i in range(setNr):## seq = seqPtr(structArray,i)## sequenceSets.append(SequenceSet(emissionDomain, seq) )## # setting labels to NULL (XXX only for integer?. DONE by c-constructor) ## sequenceSets[i].cseq.state_labels = None## sequenceSets[i].cseq.state_labels_len = None return sequenceSetsdef writeToFasta(seqSet,fn): """ Writes a SequenceSet into a fasta file. """ if not isinstance(seqSet, SequenceSet): raise TypeError, "SequenceSet expected." f = open(fn,'w') for i in range(len(seqSet)): rseq = [] for j in range(seqSet.sequenceLength(i)): rseq.append(str(seqSet.emissionDomain.external( ghmmwrapper.int_matrix_getitem(seqSet.cseq.seq, i, j) ))) f.write('>seq'+str(i)+'\n') f.write(fill(join(rseq,'') )) f.write('\n') f.close() #-------------------------------------------------------------------------------# HMMFactory and derived -----------------------------------------------------class HMMFactory(object): """ A HMMFactory is the base class of HMM factories. A HMMFactory has just a constructor and a () method """GHMM_FILETYPE_SMO = 'smo'GHMM_FILETYPE_XML = 'xml'GHMM_FILETYPE_HMMER = 'hmm'class HMMOpenFactory(HMMFactory): """ Opens a HMM from a file. Currently four formats are supported: HMMer, our smo file format, and two xml formats. The support for smo files and the old xml format will phase out soon """ def __init__(self, defaultFileType=None): self.defaultFileType = defaultFileType def guessFileType(self, filename): """ guesses the file format from the filename """ if filename.endswith('.'+GHMM_FILETYPE_XML): return GHMM_FILETYPE_XML elif filename.endswith('.'+GHMM_FILETYPE_SMO): return GHMM_FILETYPE_SMO elif filename.endswith('.'+GHMM_FILETYPE_HMMER): return GHMM_FILETYPE_HMMER else: return None def __call__(self, fileName, modelIndex=None, filetype=None): if not isinstance(fileName,StringIO.StringIO): if not os.path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' if not filetype: if self.defaultFileType: log.warning("HMMOpenHMMER, HMMOpenSMO and HMMOpenXML are deprecated. " + "Use HMMOpen and the filetype parameter if needed.") filetype = self.defaultFileType else: filetype = self.guessFileType(fileName) if not filetype: raise WrongFileType("Could not guess the type of file " + str(fileName) + " and no filetype specified") # XML file: both new and old format if filetype == GHMM_FILETYPE_XML: # try to validate against ghmm.dtd if ghmmwrapper.ghmm_xmlfile_validate(fileName): return self.openNewXML(fileName, modelIndex) else: return self.openOldXML(fileName) elif filetype == GHMM_FILETYPE_SMO: return self.openSMO(fileName, modelIndex) elif filetype == GHMM_FILETYPE_HMMER: return self.openHMMER(fileName) else: raise TypeError, "Invalid file type " + str(filetype) def openNewXML(self, fileName, modelIndex): """ Open one ore more HMM in the new xml format """ # opens and parses the file file = ghmmwrapper.ghmm_xmlfile_parse(fileName) if file == None: log.debug( "XML has file format problems!") raise WrongFileType("file is not in GHMM xml format") nrModels = file.noModels modelType = file.modelType # we have a continuous HMM, prepare for hmm creation if (modelType & ghmmwrapper.kContinuousHMM): emission_domain = Float() distribution = ContinuousMixtureDistribution hmmClass = ContinuousMixtureHMM getPtr = ghmmwrapper.cmodel_ptr_array_getitem models = file.model.c # we have a discrete HMM, prepare for hmm creation elif ((modelType & ghmmwrapper.kDiscreteHMM) and not (modelType & ghmmwrapper.kTransitionClasses) and not (modelType & ghmmwrapper.kPairHMM)): emission_domain = 'd' distribution = DiscreteDistribution getPtr = ghmmwrapper.dmodel_ptr_array_getitem models = file.model.d if (modelType & ghmmwrapper.kLabeledStates): hmmClass = StateLabelHMM else: hmmClass = DiscreteEmissionHMM # currently not supported else: raise UnsupportedFeature, "Non-supported model type" # read all models to list at first result = [] for i in range(nrModels): cmodel = getPtr(models,i) if emission_domain is 'd': emission_domain = Alphabet([], cmodel.alphabet) if modelType & ghmmwrapper.kLabeledStates: labelDomain = LabelDomain([], cmodel.label_alphabet) m = hmmClass(emission_domain, distribution(emission_domain), labelDomain, cmodel) else: m = hmmClass(emission_domain, distribution(emission_domain), cmodel) result.append(m) # for a single if modelIndex != None: if modelIndex < nrModels: result = result[modelIndex] else: raise IndexError("the file %s has only %s models"% fileName, str(nrModels)) elif nrModels == 1: result = result[0] return result def openOldXML(self, fileName): from ghmm_gato import xmlutil hmm_dom = xmlutil.HMM(fileName) emission_domain = hmm_dom.AlphabetType() if emission_domain == int: [alphabets, A, B, pi, state_orders] = hmm_dom.buildMatrices() emission_domain = Alphabet(alphabets) distribution = DiscreteDistribution(emission_domain) # build adjacency list # check for background distributions (background_dist, orders, code2name) = hmm_dom.getBackgroundDist() # (background_dist, orders) = hmm_dom.getBackgroundDist() bg_list = [] # if background distribution exists, set background distribution here if background_dist != {}: # transformation to list for input into BackgroundDistribution, # ensure the rigth order for i in range(len(code2name.keys())-1): bg_list.append(background_dist[code2name[i]]) bg = BackgroundDistribution(emission_domain, bg_list) # check for state labels (label_list, labels) = hmm_dom.getLabels() if labels == ['None']: labeldom = None label_list = None else: labeldom = LabelDomain(labels) m = HMMFromMatrices(emission_domain, distribution, A, B, pi, None, labeldom, label_list) # old xml is discrete, set appropiate flag m.cmodel.setModelTypeFlag(ghmmwrapper.kDiscreteHMM) if background_dist != {}: ids = [-1]*m.N for s in hmm_dom.state.values():
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -