📄 classifier.py
            wordstream = self._add_slurped(wordstream)
        self._add_msg(wordstream, is_spam)

    def unlearn(self, wordstream, is_spam):
        """In case of pilot error, call unlearn ASAP after screwing up.

        Pass the same arguments you passed to learn().
        """
        if options["Classifier", "use_bigrams"]:
            wordstream = self._enhance_wordstream(wordstream)
        if options["URLRetriever", "x-slurp_urls"]:
            wordstream = self._add_slurped(wordstream)
        self._remove_msg(wordstream, is_spam)

    def probability(self, record):
        """Compute, store, and return prob(msg is spam | msg contains word).

        This is the Graham calculation, but stripped of biases, and
        stripped of clamping into 0.01 thru 0.99.  The Bayesian
        adjustment following keeps them in a sane range, and one that
        naturally grows the more evidence there is to back up a
        probability.
        """
        spamcount = record.spamcount
        hamcount = record.hamcount

        # Try the cache first
        try:
            return self.probcache[spamcount][hamcount]
        except KeyError:
            pass

        nham = float(self.nham or 1)
        nspam = float(self.nspam or 1)

        assert hamcount <= nham, "Token seen in more ham than ham trained."
        hamratio = hamcount / nham

        assert spamcount <= nspam, "Token seen in more spam than spam trained."
        spamratio = spamcount / nspam

        prob = spamratio / (hamratio + spamratio)

        S = options["Classifier", "unknown_word_strength"]
        StimesX = S * options["Classifier", "unknown_word_prob"]

        # Now do Robinson's Bayesian adjustment.
        #
        #         s*x + n*p(w)
        # f(w) = --------------
        #           s + n
        #
        # I find this easier to reason about like so (equivalent when
        # s != 0):
        #
        #        x - p
        #  p +  -------
        #       1 + n/s
        #
        # IOW, it moves p a fraction of the distance from p to x, and
        # less so the larger n is, or the smaller s is.

        n = hamcount + spamcount
        prob = (StimesX + n * prob) / (S + n)

        # Update the cache
        try:
            self.probcache[spamcount][hamcount] = prob
        except KeyError:
            self.probcache[spamcount] = {hamcount: prob}

        return prob

    # NOTE:  Graham's scheme had a strange asymmetry:  when a word appeared
    # n>1 times in a single message, training added n to the word's hamcount
    # or spamcount, but predicting scored words only once.  Tests showed
    # that adding only 1 in training, or scoring more than once when
    # predicting, hurt under the Graham scheme.
    # This isn't so under Robinson's scheme, though:  results improve
    # if training also counts a word only once.  The mean ham score decreases
    # significantly and consistently, ham score variance decreases likewise,
    # mean spam score decreases (but less than mean ham score, so the spread
    # increases), and spam score variance increases.
    # I (Tim) speculate that adding n times under the Graham scheme helped
    # because it acted against the various ham biases, giving frequently
    # repeated spam words (like "Viagra") a quick ramp-up in spamprob; else,
    # adding only once in training, a word like that was simply ignored until
    # it appeared in 5 distinct training spams.  Without the ham-favoring
    # biases, though, and never ignoring words, counting n times introduces
    # a subtle and unhelpful bias.
    # There does appear to be some useful info in how many times a word
    # appears in a msg, but distorting spamprob doesn't appear a correct way
    # to exploit it.
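    # A worked example of the Robinson adjustment in probability() above
    # (illustrative numbers, not from the source):  assume x = 0.5 and
    # s = 0.45, and a token seen in 3 of 100 training spams and 0 of 100
    # training hams.  Then p = (3/100) / (0/100 + 3/100) = 1.0, n = 3, and
    #
    #     f(w) = (0.45*0.5 + 3*1.0) / (0.45 + 3) ~= 0.935
    #
    # The raw 1.0 is pulled toward x = 0.5, but only slightly, because
    # three observations back it up.  With n = 1 the same calculation
    # gives ~= 0.845:  less evidence, a stronger pull toward x.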
    def _add_msg(self, wordstream, is_spam):
        self.probcache = {}    # nuke the prob cache
        if is_spam:
            self.nspam += 1
        else:
            self.nham += 1

        for word in Set(wordstream):
            record = self._wordinfoget(word)
            if record is None:
                record = self.WordInfoClass()

            if is_spam:
                record.spamcount += 1
            else:
                record.hamcount += 1

            self._wordinfoset(word, record)

        self._post_training()

    def _remove_msg(self, wordstream, is_spam):
        self.probcache = {}    # nuke the prob cache
        if is_spam:
            if self.nspam <= 0:
                raise ValueError("spam count would go negative!")
            self.nspam -= 1
        else:
            if self.nham <= 0:
                raise ValueError("non-spam count would go negative!")
            self.nham -= 1

        for word in Set(wordstream):
            record = self._wordinfoget(word)
            if record is not None:
                if is_spam:
                    if record.spamcount > 0:
                        record.spamcount -= 1
                else:
                    if record.hamcount > 0:
                        record.hamcount -= 1
                if record.hamcount == 0 == record.spamcount:
                    self._wordinfodel(word)
                else:
                    self._wordinfoset(word, record)

        self._post_training()

    def _post_training(self):
        """This is called after training on a wordstream.  Subclasses might
        want to ensure that their databases are in a consistent state at
        this point.  Introduced to fix bug #797890."""
        pass

    # Return list of (prob, word, record) triples, sorted by increasing
    # prob.  "word" is a token from wordstream; "prob" is its spamprob (a
    # float in 0.0 through 1.0); and "record" is word's associated
    # WordInfo record if word is in the training database, or None if it's
    # not.  No more than max_discriminators items are returned, and have
    # the strongest (farthest from 0.5) spamprobs of all tokens in wordstream.
    # Tokens with spamprobs less than minimum_prob_strength away from 0.5
    # aren't returned.
    def _getclues(self, wordstream):
        mindist = options["Classifier", "minimum_prob_strength"]

        if options["Classifier", "use_bigrams"]:
            # This scheme mixes single tokens with pairs of adjacent tokens.
            # wordstream is "tiled" into non-overlapping unigrams and
            # bigrams.  Non-overlap is important to prevent a single original
            # token from contributing to more than one spamprob returned
            # (systematic correlation probably isn't a good thing).

            # First fill list raw with
            #     (distance, prob, word, record), indices
            # pairs, one for each unigram and bigram in wordstream.
            # indices is a tuple containing the indices (0-based relative to
            # the start of wordstream) of the tokens that went into word.
            # indices is a 1-tuple for an original token, and a 2-tuple for
            # a synthesized bigram token.  The indices are needed to detect
            # overlap later.
            raw = []
            push = raw.append
            pair = None
            # Keep track of which tokens we've already seen.
            # Don't use a Set here!  This is an innermost loop, so speed is
            # important here (direct dict fiddling is much quicker than
            # invoking Python-level Set methods; in Python 2.4 that will
            # change).
            seen = {pair: 1}  # so the bigram token is skipped on 1st loop trip
            for i, token in enumerate(wordstream):
                if i:   # not the 1st loop trip, so there is a preceding token
                    # This string interpolation must match the one in
                    # _enhance_wordstream().
                    pair = "bi:%s %s" % (last_token, token)
                last_token = token
                for clue, indices in (token, (i,)), (pair, (i-1, i)):
                    if clue not in seen:    # as always, skip duplicates
                        seen[clue] = 1
                        tup = self._worddistanceget(clue)
                        if tup[0] >= mindist:
                            push((tup, indices))

            # Sort raw, strongest to weakest spamprob.
            raw.sort()
            raw.reverse()

            # Fill clues with the strongest non-overlapping clues.
            clues = []
            push = clues.append
            # Keep track of which indices have already contributed to a
            # clue in clues.
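            # Illustrative example of the greedy tiling below (not in the
            # original source):  suppose wordstream is ["free", "viagra",
            # "now"], so raw can hold clues for the unigrams at indices
            # (0,), (1,), (2,) and the bigrams at (0, 1) and (1, 2).  If
            # "bi:free viagra" is the strongest clue, it claims indices 0
            # and 1; the unigrams "free" and "viagra" and the overlapping
            # bigram "bi:viagra now" are then skipped, while "now" at (2,)
            # may still be kept.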
            seen = {}
            for tup, indices in raw:
                overlap = [i for i in indices if i in seen]
                if not overlap:  # no overlap with anything already in clues
                    for i in indices:
                        seen[i] = 1
                    push(tup)
            # Leave sorted from smallest to largest spamprob.
            clues.reverse()

        else:
            # The all-unigram scheme just scores the tokens as-is.  A Set()
            # is used to weed out duplicates at high speed.
            clues = []
            push = clues.append
            for word in Set(wordstream):
                tup = self._worddistanceget(word)
                if tup[0] >= mindist:
                    push(tup)
            clues.sort()

        if len(clues) > options["Classifier", "max_discriminators"]:
            del clues[0 : -options["Classifier", "max_discriminators"]]
        # Return (prob, word, record).
        return [t[1:] for t in clues]

    def _worddistanceget(self, word):
        record = self._wordinfoget(word)
        if record is None:
            prob = options["Classifier", "unknown_word_prob"]
        else:
            prob = self.probability(record)
        distance = abs(prob - 0.5)
        return distance, prob, word, record

    def _wordinfoget(self, word):
        return self.wordinfo.get(word)

    def _wordinfoset(self, word, record):
        self.wordinfo[word] = record

    def _wordinfodel(self, word):
        del self.wordinfo[word]

    def _enhance_wordstream(self, wordstream):
        """Add bigrams to the wordstream.

        For example, a b c -> a b "a b" c "b c"

        Note that these are *token* bigrams, and not *word* bigrams - i.e.
        'synthetic' tokens get bigram'ed, too.

        The bigram token is simply "bi:unigram1 unigram2" - a space should
        be sufficient as a separator, since spaces aren't in any other
        tokens, apart from 'synthetic' ones.  The "bi:" prefix is added
        to avoid conflict with tokens we generate (like "subject: word",
        which could be "word" in a subject, or a bigram of "subject:" and
        "word").

        If the "Classifier":"use_bigrams" option is removed, this function
        can be removed, too.
        """
        last = None
        for token in wordstream:
            yield token
            if last:
                # This string interpolation must match the one in
                # _getclues().
                yield "bi:%s %s" % (last, token)
            last = token
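
# A minimal standalone sketch of the bigram enhancement described in
# _enhance_wordstream's docstring (illustration only; enhance_wordstream
# below is a hypothetical free-standing copy, not part of this module):
def enhance_wordstream(wordstream):
    """Yield each token, plus a "bi:" token for each adjacent pair."""
    last = None
    for token in wordstream:
        yield token
        if last:
            yield "bi:%s %s" % (last, token)
        last = token

print(list(enhance_wordstream(["a", "b", "c"])))
# -> ['a', 'b', 'bi:a b', 'c', 'bi:b c']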