📄 ghmm.py
字号:
else: self.listOfCharacters = [calphabet.getSymbol(i) for i in range(calphabet.size)] for i,c in enumerate(self.listOfCharacters): self.index[c] = i lens = {} try: for c in self.listOfCharacters: lens[len(c)] = 1 except TypeError: self._lengthOfCharacters = None else: if len(lens) == 1: self._lengthOfCharacters = lens.keys()[0] else: self._lengthOfCharacters = None self.CDataType = "int" # flag indicating which C data type should be used def __str__(self): strout = ["<Alphabet:"] strout.append( str(self.listOfCharacters) +'>') return join(strout,'') def verboseStr(self): strout = ["GHMM Alphabet:\n"] strout.append("Number of symbols: " + str(len(self)) + "\n") strout.append("External: " + str(self.listOfCharacters) + "\n") strout.append("Internal: " + str(range(len(self))) + "\n") return join(strout,'') def __eq__(self,alph): if not isinstance(alph,Alphabet): return False else: if self.listOfCharacters == alph.listOfCharacters and self.index == alph.index and self.CDataType==alph.CDataType: return True else: return False def __len__(self): return len(self.listOfCharacters) def __hash__(self): #XXX rewrite # defining hash and eq is not recommended for mutable types. # => listOfCharacters should be considered imutable return id(self) def size(self): #XXX """ Deprecated """ log.warning( "Warning: The use of .size() is deprecated. Use len() instead.") return len(self.listOfCharacters) def internal(self, emission): """ Given a emission return the internal representation """ return self.index[emission] def internalSequence(self, emissionSequence): """ Given a emission_sequence return the internal representation Raises KeyError """ result = copy.deepcopy(emissionSequence) try: result = map(lambda i: self.index[i], result) except IndexError: raise KeyError return result def external(self, internal): """ Given an internal representation return the external representation Note: the internal code -1 always represents a gap character '-' Raises KeyError """ if internal == -1: return "-" if internal < -1 or len(self.listOfCharacters) < internal: raise KeyError, "Internal symbol "+str(internal)+" not recognized." return self.listOfCharacters[internal] def externalSequence(self, internalSequence): """ Given a sequence with the internal representation return the external representation Raises KeyError """ result = copy.deepcopy(internalSequence) try: result = map(lambda i: self.listOfCharacters[i], result) except IndexError: raise KeyError return result def isAdmissable(self, emission): """ Check whether emission is admissable (contained in) the domain """ return emission in self.listOfCharacters def getExternalCharacterLength(self): """ If all external characters are of the same length the length is returned. Otherwise None. @return: length of the external characters or None """ return self._lengthOfCharacters def toCstruct(self): calphabet = ghmmwrapper.ghmm_alphabet(len(self), "<unused>") for i,symbol in enumerate(self.listOfCharacters): calphabet.setSymbol(i, str(symbol)) return calphabetDNA = Alphabet(['a','c','g','t'])AminoAcids = Alphabet(['A','C','D','E','F','G','H','I','K','L', 'M','N','P','Q','R','S','T','V','W','Y'])def IntegerRange(a,b): return Alphabet(range(a,b))# To be used for labelled HMMs. We could use an Alphabet directly but this way it is more explicit.class LabelDomain(Alphabet): def __init__(self, listOfLabels, calphabet = None): Alphabet.__init__(self, listOfLabels, calphabet)class Float(EmissionDomain): def __init__(self): self.CDataType = "double" # flag indicating which C data type should be used def __eq__(self, other): return isinstance(other, Float) def __hash__(self): # defining hash and eq is not recommended for mutable types. # for float it is fine because it is kind of state less return id(self) def isAdmissable(self, emission): """ Check whether emission is admissable (contained in) the domain raises GHMMOutOfDomain else """ return isinstance(emission,float)#-------------------------------------------------------------------------------#- Distribution and derived ---------------------------------------------------class Distribution(object): """ Abstract base class for distribution over EmissionDomains """ # add density, mass, cumuliative dist, quantils, sample, fit pars, # momentsclass DiscreteDistribution(Distribution): """ A DiscreteDistribution over an Alphabet: The discrete distribution is parameterized by the vectors of probabilities. """ def __init__(self, alphabet): self.alphabet = alphabet self.prob_vector = None def set(self, prob_vector): self.prob_vector = prob_vector def get(self): return self.prob_vectorclass ContinuousDistribution(Distribution): passclass UniformDistribution(ContinuousDistribution): def __init__(self, domain): self.emissionDomain = domain self.max = None self.min = None def set(self, (max, min)): self.max = max self.min = min def get(self): return (self.max, self.min)class GaussianDistribution(ContinuousDistribution): # XXX attributes unused at this point def __init__(self, domain): self.emissionDomain = domain self.mu = None self.sigma = None def set(self, (mu, sigma)): self.mu = mu self.sigma = sigma def get(self): return (self.mu, self.sigma)class TruncGaussianDistribution(GaussianDistribution): # XXX attributes unused at this point def __init__(self, domain): self.GaussianDistribution(self,domain) self.trunc = None def set(self, (mu, sigma,trunc)): self.mu = mu self.sigma = sigma self.trunc = trunc def get(self): return (self.mu, self.sigma, self.trunc) class GaussianMixtureDistribution(ContinuousDistribution): # XXX attributes unused at this point def __init__(self, domain): self.emissionDomain = domain self.M = None # number of mixture components self.mu = [] self.sigma = [] self.weight = [] def set(self, index, (mu, sigma,w)): pass def get(self): passclass ContinuousMixtureDistribution(ContinuousDistribution): def __init__(self, domain): self.emissionDomain = domain self.M = 0 # number of mixture components self.components = [] self.weight = [] self.fix = [] def add(self,w,fix,distribution): assert isinstance(distribution,ContinuousDistribution) self.M = self.M + 1 self.weight.append(w) self.component.append(distribution) if isinstance(distribution,UniformDistribution): # uniform distributions are fixed by definition self.fix.append(1) else: self.fix.append(fix) def set(self, index, w, fix, distribution): if index >= M: raise IndexError assert isinstance(distribution,ContinuousDistribution) self.weight[i] = w self.components[i] = distribution if isinstance(distribution,UniformDistribution): # uniform distributions are fixed by definition self.fix[i](1) else: self.fix[i](fix) def get(self,i): assert M > i return (self.weigth[i],self.fix[i],self.component[i]) def check(self): assert self.M == len(self.components) assert sum(self.weight) == 1 assert sum(self.weight > 1) == 0 assert sum(self.weight < 0) == 0 #-------------------------------------------------------------------------------#Sequence, SequenceSet and derived ------------------------------------------class EmissionSequence(object): """ An EmissionSequence contains the *internal* representation of a sequence of emissions. It also contains a reference to the domain where the emission orginated from. """ def __init__(self, emissionDomain, sequenceInput, labelDomain = None, labelInput = None, ParentSequenceSet=None): self.emissionDomain = emissionDomain if ParentSequenceSet is not None: # optional reference to a parent SequenceSet. Is needed for reference counting if not isinstance(ParentSequenceSet,SequenceSet): raise TypeError("Invalid reference. Only SequenceSet is valid.") self.ParentSequenceSet = ParentSequenceSet if self.emissionDomain.CDataType == "int": # necessary C functions for accessing the ghmm_dseq struct self.sequenceAllocationFunction = ghmmwrapper.ghmm_dseq self.allocSingleSeq = ghmmwrapper.int_array_alloc self.seq_read = ghmmwrapper.ghmm_dseq_read self.seq_ptr_array_getitem = ghmmwrapper.dseq_ptr_array_getitem self.sequence_carray = ghmmhelper.list2int_array elif self.emissionDomain.CDataType == "double": # necessary C functions for accessing the ghmm_cseq struct self.sequenceAllocationFunction = ghmmwrapper.ghmm_cseq self.allocSingleSeq = ghmmwrapper.double_array_alloc self.seq_read = ghmmwrapper.ghmm_cseq_read self.seq_ptr_array_getitem = ghmmwrapper.cseq_ptr_array_getitem self.sequence_carray = ghmmhelper.list2double_array else: raise NoValidCDataType, "C data type " + str(self.emissionDomain.CDataType) + " invalid." # check if ghmm is build with asci sequence file support if isinstance(sequenceInput, str) or isinstance(sequenceInput, unicode): if ghmmwrapper.ASCI_SEQ_FILE: if not os.path.exists(sequenceInput): raise IOError, 'File ' + str(sequenceInput) + ' not found.' else: tmp, seq_number = self.seq_read(sequenceInput) if seq_number > 0: self.cseq = self.seq_ptr_array_getitem(tmp, 0) for n in range(1, seq_number): seq = self.seq_ptr_array_getitem(tmp, n) del seq else: raise ParseFileError('File ' + str(sequenceInput) + ' not valid.')
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -