📄 ghmm.py
字号:
# print "** SequenceSet.__del__ " + str(self.cseq) self.cleanFunction(self.cseq) self.cseq = None def __str__(self): "Defines string representation." seq = self.cseq strout = "\nNumber of sequences: " + str(seq.seq_number) if self.emissionDomain.CDataType == "int": for i in range(seq.seq_number): strout += "\nSeq " + str(i)+ ", length " + str(ghmmwrapper.get_arrayint(seq.seq_len,i))+ ", weight " + str(ghmmwrapper.get_arrayd(seq.seq_w,i)) + ":\n" for j in range(ghmmwrapper.get_arrayint(seq.seq_len,i) ): strout += str( self.emissionDomain.external(( ghmmwrapper.get_2d_arrayint(self.cseq.seq, i, j) )) ) if self.emissionDomain.CDataType == "double": strout += " " # checking for labels if self.emissionDomain.CDataType == "int" and self.cseq.state_labels != None: strout += "\nState labels:\n" for j in range(ghmmwrapper.get_arrayint(seq.state_labels_len,i) ): strout += str( ghmmwrapper.get_2d_arrayint(seq.state_labels,i,j)) if self.emissionDomain.CDataType == "double": for i in range(seq.seq_number): strout += "\nSeq " + str(i)+ ", length " + str(ghmmwrapper.get_arrayint(seq.seq_len,i))+ ", weight " + str(ghmmwrapper.get_arrayd(seq.seq_w,i)) + ":\n" for j in range(ghmmwrapper.get_arrayint(seq.seq_len,i) ): strout += str( self.emissionDomain.external(( ghmmwrapper.get_2d_arrayd(self.cseq.seq, i, j) )) ) + " " return strout def __len__(self): """ Return the number of sequences in the SequenceSet. """ return self.cseq.seq_number def sequenceLength(self, i): """ Return the lenght of sequence 'i' in the SequenceSet """ return ghmmwrapper.get_arrayint(self.cseq.seq_len,i) def getWeight(self, i): """ Return the weight of sequence i. Weights are used in Baum-Welch""" return ghmmwrapper.get_arrayd(self.cseq.seq_w,i) def setWeight(self, i, w): """ Return the weight of sequence i. Weights are used in Baum-Welch""" return ghmmwrapper.set_arrayd(self.cseq.seq_w,i,w) def __getitem__(self, index): """ Return an EmissionSequence object initialized with a reference to sequence 'index'. """ seq = self.sequenceAllocationFunction(1) seq.seq = self.castPtr(self.__array[index]) # int* -> int** reference ghmmwrapper.set_arrayint(seq.seq_len,0,ghmmwrapper.get_arrayint(self.cseq.seq_len,index)) seq.seq_number = 1 return EmissionSequence(self.emissionDomain, seq) def getLabel(self,index): if (self.emissionDomain.CDataType != "double"): print "WARNING: Discrete sequences do not support sequence labels." else: return ghmmwrapper.get_arrayl(self.cseq.seq_label,index) def setLabel(self,index,value): if (self.emissionDomain.CDataType != "double"): print "WARNING: Discrete sequences do not support sequence labels." else: ghmmwrapper.set_arrayl(self.cseq.seq_label,index,value) def merge(self, emissionSequences): # Only allow EmissionSequence? """ Merge 'emisisonSequences' with 'self'. 'emisisonSequences' can either be an EmissionSequence or SequenceSet object. """ if not isinstance(emissionSequences,EmissionSequence) and not isinstance(emissionSequences,SequenceSet): raise TypeError, "EmissionSequence or SequenceSet required, got " + str(emissionSequences.__class__.__name__) self.addSeqFunction(self.cseq, emissionSequences.cseq) del(emissionSequences) # removing merged sequences def getSubset(self, seqIndixes): """ Returns a SequenceSet containing (references to) the sequences with the indixes in 'seqIndixes'. """ seqNumber = len(seqIndixes) seq = self.sequenceAllocationFunction(seqNumber) seq.seq = self.emptySeq(seqNumber) # checking for state labels in the source C sequence struct if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None: print "found labels !" seq.state_labels = ghmmwrapper.int_2d_array_nocols(seqNumber) seq.state_labels_len = ghmmwrapper.int_array(seqNumber) for i in range(seqNumber): self.setSeq(seq.seq,i,self.__array[ seqIndixes[i] ]) ghmmwrapper.set_arrayint(seq.seq_len,i,ghmmwrapper.get_arrayint(self.cseq.seq_len,seqIndixes[i])) # Above doesnt copy seq_id or seq_label or seq_w seq_id = int(ghmmwrapper.get_arrayd(self.cseq.seq_id, seqIndixes[i])) ghmmwrapper.set_arrayd(seq.seq_id, i, seq_id) seq_label = ghmmwrapper.get_arrayl(self.cseq.seq_label, i) ghmmwrapper.set_arrayl(seq.seq_label, i, int(seq_label)) seq_w = ghmmwrapper.get_arrayd(self.cseq.seq_w, i) ghmmwrapper.set_arrayd(seq.seq_w, i, seq_w) # setting labels if appropriate if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None: self.setSeq(seq.state_labels,i, ghmmwrapper.get_col_pointer_int( self.cseq.state_labels,seqIndixes[i] ) ) ghmmwrapper.set_arrayint(seq.state_labels_len,i,ghmmwrapper.get_arrayint(self.cseq.state_labels_len, seqIndixes[i]) ) seq.seq_number = seqNumber return SequenceSet(self.emissionDomain, seq) def write(self,fileName): "Writes (appends) the SequenceSet into file 'fileName'." # different function signatures require explicit check for C data type if self.emissionDomain.CDataType == "int": ghmmwrapper.call_sequence_print(fileName, self.cseq) if self.emissionDomain.CDataType == "double": ghmmwrapper.call_sequence_d_print(fileName, self.cseq,0) def SequenceSetOpen(emissionDomain, fileName): """ Reads a sequence file with multiple sequence sets. Returns a list of SequenceSet objects. """ if not path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' if emissionDomain.CDataType == "int": readFile = ghmmwrapper.sequence_read seqPtr = ghmmwrapper.get_seq_ptr elif emissionDomain.CDataType == "double": readFile = ghmmwrapper.sequence_d_read seqPtr = ghmmwrapper.get_seq_d_ptr dArr = ghmmwrapper.int_array(1) structArray = readFile(fileName, dArr) setNr = ghmmwrapper.get_arrayint(dArr,0) sequenceSets = [] for i in range(setNr): seq = seqPtr(structArray,i) sequenceSets.append(SequenceSet(emissionDomain, seq) ) # setting labels to NULL sequenceSets[i].cseq.state_labels = None sequenceSets[i].cseq.state_labels_len = None ghmmwrapper.freearray(dArr) return sequenceSets#-------------------------------------------------------------------------------# HMMFactory and derived -----------------------------------------------------class HMMFactory: """ A HMMFactory is the base class of HMM factories. A HMMFactory has just a constructor and a () method """GHMM_FILETYPE_SMO = 'smo'GHMM_FILETYPE_XML = 'xml'GHMM_FILETYPE_HMMER = 'hmm'# XXX modelName support for all file formatsclass HMMOpenFactory(HMMFactory): def __init__(self, defaultFileType=None): if defaultFileType: self.defaultFileType = defaultFileType def __call__(self, fileName, modelIndex = None): if not path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' if self.defaultFileType == GHMM_FILETYPE_XML: # XML file print "File type: XML" hmm_dom = xmlutil.HMM(fileName) emission_domain = hmm_dom.AlphabetType() if emission_domain == int: [alphabets, A, B, pi, state_orders] = hmm_dom.buildMatrices() emission_domain = Alphabet(alphabets) distribution = DiscreteDistribution(emission_domain) # build adjacency list print A print B print pi # check for background distributions (background_dist, orders) = hmm_dom.getBackgroundDist() if len(background_dist.keys()) > 0: # if background distribution exists, set background distribution here cpt_background = ghmmwrapper.background_distributions() cpt_background.n = len(background_dist.keys()) cpt_background.oder = ghmmwrapper.int_array(cpt_background.n) max_len = max( map(lambda x: len(x), background_dist.values )) cpt_background.b = ghmmwrapper.double_2d_array(cpt_background.n, max_len) name2code = {} code = 0 for name in background_dist.keys(): name2code[name] = code code += 1 for name in background_dist.keys(): i = name2code[name] ghmmwrapper.set_arrayint(cpt_background.oder, i,orders[name]) for j in range(len(background_dist[name])): ghmmwrapper.set_2d_arrayd(cpt_background.b,i,j, background_dist[j]) ids = [-1]*10 for s in hmmdom.states.values(): ids[s.index] = s.background print ids # XXX add background distribution to the model, how? m = HMMFromMatrices(emission_domain, distribution, A, B, pi) return m elif self.defaultFileType == GHMM_FILETYPE_SMO: # MO & SMO Files (hmmClass, emission_domain, distribution) = self.determineHMMClass(fileName) #print "determineHMMClass = ", (hmmClass, emission_domain, distribution) nrModelPtr = ghmmwrapper.int_array(1) # XXX broken since silent states are not supported by .smo file format XXX if hmmClass == DiscreteEmissionHMM: print nrModelPtr models = ghmmwrapper.model_read(fileName, nrModelPtr) getPtr = ghmmwrapper.get_model_ptr else: models = ghmmwrapper.smodel_read(fileName, nrModelPtr) getPtr = ghmmwrapper.get_smodel_ptr nrModels = ghmmwrapper.get_arrayint(nrModelPtr, 0) print nrModels if modelIndex == None: cmodel = getPtr(models, 0) # XXX Who owns the pointer? print cmodel else: if modelIndex < nrModels: cmodel = getPtr(models, modelIndex) # XXX Who owns the pointer? else: return None m = hmmClass(emission_domain, distribution(emission_domain), cmodel) return m else: # HMMER format models h = modhmmer.hmmer(fileName) if h.m == 4: # DNA model emission_domain = DNA elif h.m == 20: # Peptide model emission_domain = AminoAcids else: # some other model emission_domain = IntegerRange(0,h.m) distribution = DiscreteDistribution(emission_domain) [A,B,pi,modelName] = h.getGHMMmatrices() return HMMFromMatrices(emission_domain, distribution, A, B, pi,hmmName=modelName) def all(self, fileName): if not path.exists(fileName): raise IOError, 'File ' + str(fileName) + ' not found.' # MO & SMO Files (hmmClass, emission_domain, distribution) = self.determineHMMClass(fileName) nrModelPtr = ghmmwrapper.int_array(1) if hmmClass == DiscreteEmissionHMM: models = hmmwrapper.model_read(fileName, nrModelPtr) getPtr = ghmmwrapper.get_model_ptr else: models = ghmmwrapper.smodel_read(fileName, nrModelPtr) getPtr = ghmmwrapper.get_smodel_ptr nrModels = ghmmwrapper.get_arrayint(nrModelPtr, 0) result = [] for i in range(nrModels): cmodel = getPtr(models, i) m = hmmClass(emission_domain, distribution(emission_domain), cmodel) result.append(m) return result def determineHMMClass(self, fileName): # # smo files # file = open(fileName,'r') hmmRe = re.compile("^HMM\s*=") shmmRe = re.compile("^SHMM\s*=") mvalueRe = re.compile("M\s*=\s*([0-9]+)") densityvalueRe = re.compile("density\s*=\s*([0-9]+)") cosvalueRe = re.compile("cos\s*=\s*([0-9]+)") emission_domain = None while 1: l = file.readline() if not l: break l = l.strip() if len(l) > 0 and l[0] != '#': # Not a comment line hmm = hmmRe.search(l) shmm = shmmRe.search(l) mvalue = mvalueRe.search(l) densityvalue = densityvalueRe.search(l) cosvalue = cosvalueRe.search(l) if hmm != None:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -