📄 ghmm.py
字号:
ghmmwrapper.free(tmp) else: raise UnsupportedFeature("asci sequence files are deprecated. Please convert your files" + " to the new xml-format or rebuild the GHMM with" + " the conditional \"GHMM_OBSOLETE\".") #create a ghmm_dseq with state_labels, if the appropiate parameters are set elif isinstance(sequenceInput, list): internalInput = self.emissionDomain.internalSequence(sequenceInput) seq = self.sequence_carray(internalInput) self.cseq = self.sequenceAllocationFunction(seq, len(sequenceInput)) if labelInput is not None and labelDomain is not None: assert len(sequenceInput)==len(labelInput), "Length of the sequence and labels don't match." assert isinstance(labelInput, list), "expected a list of labels." assert isinstance(labelDomain, LabelDomain), "labelDomain is not a LabelDomain class." self.labelDomain = labelDomain #translate the external labels in internal internalLabel = self.labelDomain.internalSequence(labelInput) label = ghmmhelper.list2int_array(internalLabel) self.cseq.init_labels(label, len(internalInput)) # internal use elif isinstance(sequenceInput, ghmmwrapper.ghmm_dseq) or isinstance(sequenceInput, ghmmwrapper.ghmm_cseq): if sequenceInput.seq_number > 1: raise badCPointer, "Use SequenceSet for multiple sequences." self.cseq = sequenceInput if labelDomain != None: self.labelDomain = labelDomain else: raise UnknownInputType, "inputType " + str(type(sequenceInput)) + " not recognized." def __del__(self): "Deallocation of C sequence struct." log.debug( "__del__ EmissionSequence " + str(self.cseq)) # if a parent SequenceSet exits, we use cseq.subseq_free() to free memory if self.ParentSequenceSet is not None: self.cseq.subseq_free() def __len__(self): "Returns the length of the sequence." return self.cseq.getLength(0) def __setitem__(self, index, value): internalValue = self.emissionDomain.internal(value) self.cseq.setSymbol(0, index, internalValue) def __getitem__(self, index): """ Return the symbol at position 'index'. """ if index < len(self): return self.cseq.getSymbol(0, index) else: raise IndexError def getSeqLabel(self): if not ghmmwrapper.SEQ_LABEL_FIELD: raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".") return ghmmwrapper.long_array_getitem(self.cseq.seq_label,0) def setSeqLabel(self,value): if not ghmmwrapper.SEQ_LABEL_FIELD: raise UnsupportedFeature ("the seq_label field is obsolete. If you need it rebuild the GHMM with the conditional \"GHMM_OBSOLETE\".") ghmmwrapper.long_array_setitem(self.cseq.seq_label,0,value) def getStateLabel(self): """ Returns the labeling of the sequence in external representation""" if self.cseq.state_labels != None: iLabel = ghmmhelper.int_array2list(self.cseq.getLabels(0), self.cseq.getLabelsLength(0)) return self.labelDomain.externalSequence(iLabel) else: raise IndexError(str(0) + " is out of bounds, only " + str(self.cseq.seq_number) + "labels") def getGeneratingStates(self): """ Returns the state path from which the sequence was generated as a Python list. """ l_state = [] for j in range(ghmmwrapper.int_array_getitem(self.cseq.states_len,0) ): l_state.append(ghmmwrapper.int_matrix_getitem(self.cseq.states,0,j)) return l_state def __str__(self): "Defines string representation." seq = self.cseq strout = [] l = seq.getLength(0) if l <= 80: for j in range(l): strout.append(str( self.emissionDomain.external(self[j]) ) ) if self.emissionDomain.CDataType == "double": strout.append(" ") else: for j in range(0,5): strout.append(str( self.emissionDomain.external(self[j]) ) ) if self.emissionDomain.CDataType == "double": strout.append(" ") strout.append('...') for j in range(l-5,l): strout.append(str( self.emissionDomain.external(self[j]) ) ) if self.emissionDomain.CDataType == "double": strout.append(" ") return join(strout,'') def verboseStr(self): "Defines string representation." seq = self.cseq strout = [] strout.append("\nEmissionSequence Instance:\nlength " + str(seq.getLength(0))) strout.append(", weight " + str(seq.getWeight(0)) + ":\n") for j in range(seq.getLength(0)): strout.append(str( self.emissionDomain.external(self[j]) ) ) if self.emissionDomain.CDataType == "double": strout.append(" ") # checking for labels if self.emissionDomain.CDataType == "int" and self.cseq.state_labels != None: strout.append("\nState labels:\n") for j in range(seq.getLabelsLength(0)): strout.append(str( self.labelDomain.external(ghmmwrapper.int_matrix_getitem(seq.state_labels,0,j)))+ ", ") return join(strout,'') def sequenceSet(self): """ Return a one-element SequenceSet with this sequence.""" # in order to copy the sequence in 'self', we first create an empty SequenceSet and then # add 'self' seqSet = SequenceSet(self.emissionDomain, []) seqSet.cseq.add(self.cseq) return seqSet def write(self,fileName): "Writes the EmissionSequence into file 'fileName'." self.cseq.write(fileName) def setWeight(self, value): self.cseq.setWeight(0, value) self.cseq.total_w = value def getWeight(self): return self.cseq.getWeight(0) def asSequenceSet(self): """returns a one element SequenceSet""" log.debug("EmissionSequence.asSequenceSet() -- begin " + repr(self.cseq)) seq = self.sequenceAllocationFunction(1) # checking for state labels in the source C sequence struct if self.emissionDomain.CDataType == "int" and self.cseq.state_labels is not None: log.debug("EmissionSequence.asSequenceSet() -- found labels !") seq.calloc_state_labels() self.cseq.copyStateLabel(0, seq, 0) seq.setLength(0, self.cseq.getLength(0)) seq.setSequence(0, self.cseq.getSequence(0)) seq.setWeight(0, self.cseq.getWeight(0)) log.debug("EmissionSequence.asSequenceSet() -- end " + repr(seq)) return SequenceSetSubset(self.emissionDomain, seq, self) class SequenceSet(object): def __init__(self, emissionDomain, sequenceSetInput, labelDomain = None, labelInput = None): self.emissionDomain = emissionDomain self.cseq = None if self.emissionDomain.CDataType == "int": # necessary C functions for accessing the ghmm_dseq struct self.sequenceAllocationFunction = ghmmwrapper.ghmm_dseq self.allocSingleSeq = ghmmwrapper.int_array_alloc self.seq_read = ghmmwrapper.ghmm_dseq_read self.seq_ptr_array_getitem = ghmmwrapper.dseq_ptr_array_getitem self.sequence_cmatrix = ghmmhelper.list2int_matrix elif self.emissionDomain.CDataType == "double": # necessary C functions for accessing the ghmm_cseq struct self.sequenceAllocationFunction = ghmmwrapper.ghmm_cseq self.allocSingleSeq = ghmmwrapper.double_array_alloc self.seq_read = ghmmwrapper.ghmm_cseq_read self.seq_ptr_array_getitem = ghmmwrapper.cseq_ptr_array_getitem self.sequence_cmatrix = ghmmhelper.list2double_matrix else: raise NoValidCDataType, "C data type " + str(self.emissionDomain.CDataType) + " invalid." # reads in the first sequence struct in the input file if isinstance(sequenceSetInput, str) or isinstance(sequenceSetInput, unicode): if sequenceSetInput[-3:] == ".fa": # assuming FastA file: alfa = emissionDomain.toCstruct() cseq = ghmmwrapper.ghmm_dseq(sequenceSetInput, alfa) if cseq is None: raise ParseFileError("invalid FastA file: " + sequenceSetInput) self.cseq = cseq # check if ghmm is build with asci sequence file support elif not ghmmwrapper.ASCI_SEQ_FILE: raise UnsupportedFeature ("asci sequence files are deprecated. \ Please convert your files to the new xml-format or rebuild the GHMM \ with the conditional \"GHMM_OBSOLETE\".") else: if not os.path.exists(sequenceSetInput): raise IOError, 'File ' + str(sequenceSetInput) + ' not found.' else: i = ghmmwrapper.int_array_alloc(1) tmp = self.seq_read(sequenceSetInput, i) seq_number = ghmmwrapper.int_array_getitem(i, 0) if seq_number > 0: self.cseq = self.seq_ptr_array_getitem(tmp, 0) for n in range(1, seq_number): seq = self.seq_ptr_array_getitem(tmp, n) del seq else: raise ParseFileError, 'File ' + str(sequenceSetInput) + ' not valid.' ghmmwrapper.free(tmp) ghmmwrapper.free(i) elif isinstance(sequenceSetInput, list): internalInput = [self.emissionDomain.internalSequence(seq) for seq in sequenceSetInput] (seq, lengths) = self.sequence_cmatrix(internalInput) lens = ghmmhelper.list2int_array(lengths) self.cseq = self.sequenceAllocationFunction(seq, lens, len(sequenceSetInput)) if isinstance(labelInput, list) and isinstance(labelDomain, LabelDomain): assert len(sequenceSetInput)==len(labelInput), "no. of sequences and labels do not match." self.labelDomain = labelDomain internalLabels = [self.labelDomain.internalSequence(oneLabel) for oneLabel in labelInput] (label,labellen) = ghmmhelper.list2int_matrix(internalLabels) lens = ghmmhelper.list2int_array(labellen) self.cseq.init_labels(label, lens) #internal use elif isinstance(sequenceSetInput, ghmmwrapper.ghmm_dseq) or isinstance(sequenceSetInput, ghmmwrapper.ghmm_cseq): log.debug("SequenceSet.__init__()", str(sequenceSetInput)) self.cseq = sequenceSetInput if labelDomain is not None: self.labelDomain = labelDomain else: raise UnknownInputType, "inputType " + str(type(sequenceSetInput)) + " not recognized." def __del__(self): "Deallocation of C sequence struct." log.debug( "__del__ SequenceSet " + str(self.cseq)) def __str__(self): "Defines string representation." seq = self.cseq strout = ["SequenceSet (N=" + str(seq.seq_number)+")"] if seq.seq_number <= 6: iter_list = range(seq.seq_number) else: iter_list = [0,1,'X',seq.seq_number-2,seq.seq_number-1] for i in iter_list: if i == 'X': strout.append('\n\n ...\n') else: strout.append("\n seq " + str(i)+ "(len=" + str(seq.getLength(i)) + ")\n") strout.append(' '+str(self[i])) return join(strout,'') def verboseStr(self): "Defines string representation." seq = self.cseq strout = ["\nNumber of sequences: " + str(seq.seq_number)] for i in range(seq.seq_number): strout.append("\nSeq " + str(i)+ ", length " + str(seq.getLength(i))) strout.append(", weight " + str(seq.getWeight(i)) + ":\n") for j in range(seq.getLength(i)): if self.emissionDomain.CDataType == "int": strout.append(str( self.emissionDomain.external(( ghmmwrapper.int_matrix_getitem(self.cseq.seq, i, j) )) )) elif self.emissionDomain.CDataType == "double": strout.append(str( self.emissionDomain.external(( ghmmwrapper.double_matrix_getitem(self.cseq.seq, i, j) )) ) + " ") # checking for labels if self.emissionDomain.CDataType == "int" and self.cseq.state_labels != None: strout.append("\nState labels:\n") for j in range(seq.getLabelsLength(i)): strout.append(str( self.labelDomain.external(ghmmwrapper.int_matrix_getitem(seq.state_labels,i,j))) +", ") return join(strout,'') def __len__(self): """ Return the number of sequences in the SequenceSet. """ return self.cseq.seq_number def sequenceLength(self, i): """ Return the lenght of sequence 'i' in the SequenceSet """ return self.cseq.getLength(i) def getWeight(self, i): """ Return the weight of sequence i. Weights are used in Baum-Welch""" return self.cseq.getWeight(i) def setWeight(self, i, w): """ Set the weight of sequence i. Weights are used in Baum-Welch""" ghmmwrapper.double_array_setitem(self.cseq.seq_w, i, w) def __getitem__(self, index): """ Return an EmissionSequence object initialized with a reference to sequence 'index'. """ # check the index for correct range if index >= self.cseq.seq_number: raise IndexError seq = self.cseq.get_singlesequence(index) return EmissionSequence(self.emissionDomain, seq, ParentSequenceSet=self) def getSeqLabel(self,index): if not ghmmwrapper.SEQ_LABEL_FIELD:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -