⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 userinterface.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 4 页
字号:
            else:                nham = nspam = "-"            if wordProb is None:                wordProb = "-"            else:                wordProb = round(float(wordProb), accuracy)            cluesTable += cluesRow % (cgi.escape(word), wordProb,                                      nham, nspam)        return cluesTable    def _buildCluesTable(self, message, subject=None, show_tokens=False):        tokens = list(tokenizer.tokenize(message))        if show_tokens:            clues = [(tok, None) for tok in tokens]            probability = self.classifier.spamprob(tokens)            cluesTable = self._fillCluesTable(clues)            head_name = _("Tokens")        else:            (probability, clues) = self.classifier.spamprob(tokens, evidence=True)            cluesTable = self._fillCluesTable(clues)            head_name = _("Clues")        results = self.html.classifyResults.clone()        results.probability = "%.2f%% (%s)" % (probability*100, probability)        if subject is None:            heading = "%s: (%s)" % (head_name, len(clues))        else:            heading = "%s for: %s (%s)" % (head_name, subject, len(clues))        results.cluesBox = self._buildBox(heading, 'status.gif', cluesTable)        if not show_tokens:            mo = self.sc_re.search(message)            if mo:                # Also display the score the message received when it was                # classified.                prob = float(mo.group(1).strip())                results.orig_prob_num = "%.2f%% (%s)" % (prob*100, prob)            else:                del results.orig_prob            mo = self.ev_re.search(message)            if mo:                # Also display the clues as they were when the message was                # classified.                clues = []                evidence = re.findall(r"'(.+?)': ([^;]+)(?:;|$)", mo.group(1))                for word, prob in evidence:                    clues.append((word, prob))                cluesTable = self._fillCluesTable(clues)                if subject is None:                    heading = _("Original clues: (%s)") % (len(evidence),)                else:                    heading = _("Original clues for: %s (%s)") % \                              (subject, len(evidence),)                orig_results = self._buildBox(heading, 'status.gif',                                              cluesTable)                results.cluesBox += orig_results        else:            del results.orig_prob        return results    def onWordquery(self, word, query_type=_("basic"), max_results='10',                    ignore_case=False):        # It would be nice if the default value for max_results here        # always matched the value in ui.html.        try:            max_results = int(max_results)        except ValueError:            # Ignore any invalid number, like "foo"            max_results = 10        original_word = word        query = self.html.wordQuery.clone()        query.word.value = "%s" % (word,)        for q_type in [query.advanced.basic,                       query.advanced.wildcard,                       query.advanced.regex]:            if query_type == q_type.id:                q_type.checked = 'checked'                if query_type != _("basic"):                    del query.advanced.max_results.disabled        if ignore_case:            query.advanced.ignore_case.checked = 'checked'        query.advanced.max_results.value = str(max_results)        queryBox = self._buildBox(_("Word query"), 'query.gif', query)        if not options["html_ui", "display_adv_find"]:            del queryBox.advanced        stats = []        if word == "":            stats.append(_("You must enter a word."))        elif query_type == _("basic") and not ignore_case:            wordinfo = self.classifier._wordinfoget(word)            if wordinfo:                stat = (word, wordinfo.spamcount, wordinfo.hamcount,                        self.classifier.probability(wordinfo))            else:                stat = _("%r does not exist in the database.") % \                       cgi.escape(word)            stats.append(stat)        else:            if query_type != _("regex"):                word = re.escape(word)            if query_type == _("wildcard"):                word = word.replace("\\?", ".")                word = word.replace("\\*", ".*")            flags = 0            if ignore_case:                flags = re.IGNORECASE            r = re.compile(word, flags)            reached_limit = False            for w in self.classifier._wordinfokeys():                if not reached_limit and len(stats) >= max_results:                    reached_limit = True                    over_limit = 0                if r.match(w):                    if reached_limit:                        over_limit += 1                    else:                        wordinfo = self.classifier._wordinfoget(w)                        stat = (w, wordinfo.spamcount, wordinfo.hamcount,                                self.classifier.probability(wordinfo))                        stats.append(stat)            if len(stats) == 0 and max_results > 0:                stat = _("There are no words that begin with '%s' " \                         "in the database.") % (word,)                stats.append(stat)            elif reached_limit:                stat = _("Additional tokens not shown: %d") % (over_limit,)                stats.append(stat)        self._writePreamble(_("Word query"))        if len(stats) == 1:            if isinstance(stat, types.TupleType):                stat = self.html.wordStats.clone()                word = stats[0][0]                stat.spamcount = stats[0][1]                stat.hamcount = stats[0][2]                stat.spamprob = "%.6f" % stats[0][3]            else:                stat = stats[0]                word = original_word            row = self._buildBox(_("Statistics for '%s'") % \                                 cgi.escape(word), 'status.gif', stat)            self.write(row)        else:            page = self.html.multiStats.clone()            page.multiTable = "" # make way for the real rows            page.multiTable += self.html.multiHeader.clone()            stripe = 0            for stat in stats:                if isinstance(stat, types.TupleType):                    row = self.html.statsRow.clone()                    row.word, row.spamcount, row.hamcount = stat[:3]                    row.spamprob = "%.6f" % stat[3]                    setattr(row, 'class', ['stripe_on', 'stripe_off'][stripe])                    stripe = stripe ^ 1                    page.multiTable += row                else:                    self.write(self._buildBox(_("Statistics for '%s'") % \                                              cgi.escape(original_word),                                              'status.gif', stat))            self.write(self._buildBox(_("Statistics for '%s'") % \                                      cgi.escape(original_word), 'status.gif',                                      page))        self.write(queryBox)        self._writePostamble()    def onTrain(self, file, text, which):        """Train on an uploaded or pasted message."""        self._writePreamble(_("Train"))        # Upload or paste?  Spam or ham?        content = file or text        isSpam = (which == _('Train as Spam'))        # Attempt to convert the content from a DBX file to a standard mbox        if file:            content = self._convertToMbox(content)        # Convert platform-specific line endings into unix-style.        content = content.replace('\r\n', '\n').replace('\r', '\n')        # The upload might be a single message or a dbx/mbox file.        messages = self._convertUploadToMessageList(content)        # Add the messages(s) to the appropriate corpus.  This means        # that we can rebuild the database later, if desired (as long as        # they haven't expired), and can search for the messages later        # (and even correct training).  This also takes care of training        # the messages.        # This replaces the 1.0.x practice of opening a        # "_pop3proxyham.mbox" or "_pop3proxyspam.mbox" in the CWD and        # placing them there.        if isSpam:            desired_corpus = "spamCorpus"        else:            desired_corpus = "hamCorpus"        if hasattr(self, desired_corpus):            corpus = getattr(self, desired_corpus)        else:            if hasattr(self, "state"):                # sb_server (exists in state)                corpus = getattr(self.state, desired_corpus)                setattr(self, desired_corpus, corpus)                self.msg_name_func = self.state.getNewMessageName            else:                # sb_imapfilter (need to create)                if isSpam:                    fn = storage.get_pathname_option("Storage",                                                     "spam_cache")                else:                    fn = storage.get_pathname_option("Storage",                                                     "ham_cache")                storage.ensureDir(fn)                if options["Storage", "cache_use_gzip"]:                    factory = FileCorpus.GzipFileMessageFactory()                else:                    factory = FileCorpus.FileMessageFactory()                age = options["Storage", "cache_expiry_days"]*24*60*60                corpus = FileCorpus.ExpiryFileCorpus(age, factory, fn,                                          '[0123456789\-]*', cacheSize=20)                setattr(self, desired_corpus, corpus)                # We need a function to create a new name for the message                # as sb_imapfilter doesn't have one.                class UniqueNamer(object):                    count = -1                    def generate_name(self):                        self.count += 1                        return "%10.10d-%d" % (long(time.time()), self.count)                Namer = UniqueNamer()                self.msg_name_func = Namer.generate_name        # Train on the uploaded message(s).        self.write("<b>" + _("Training") + "...</b>\n")        self.flush()        for message in messages:            key = self.msg_name_func()            msg = corpus.makeMessage(key, message)            msg.setId(key)            corpus.addMessage(msg)            msg.RememberTrained(isSpam)            self.stats.RecordTraining(not isSpam)        # Save the database and return a link Home and another training        # form.        self._doSave()        self.write(_("%sOK. Return %sHome%s or train again:%s") %                   ("<p>", "<a href='home'>", "</a", "</p>"))        self.write(self._buildTrainBox())        self._writePostamble()    def _convertToMbox(self, content):        """Check if the given buffer is in a non-mbox format, and convert it        into mbox format if so.  If it's already an mbox, return it unchanged.        Currently, the only supported non-mbox format is Outlook Express DBX.        In such a case we use the module oe_mailbox to convert the DBX        content into a standard mbox file.  Testing if the file is a        DBX one is very quick (just a matter of checking the first few        bytes), and should not alter the overall performance."""        content = oe_mailbox.convertToMbox(content)        return content    def _convertUploadToMessageList(self, content):        """Returns a list of raw messages extracted from uploaded content.        You can upload either a single message or an mbox file."""        if content.startswith('From '):            # Get a list of raw messages from the mbox content.            class SimpleMessage:                def __init__(self, fp):                    self.guts = fp.read()            contentFile = StringIO.StringIO(content)            mbox = mailbox.PortableUnixMailbox(contentFile, SimpleMessage)            return map(lambda m: m.guts, mbox)        else:            # Just the one message.            return [content]    def _doSave(self):        """Saves the database."""        self.write("<b>" + _("Saving..."))        self.flush()        self.classifier.store()        self.write(_("Done.") + "</b>\n")    def onSave(self, how):        """Command handler for "Save" and "Save & shutdown"."""        isShutdown = how.lower().find('shutdown') >= 0        self._writePreamble(_("Save"), showImage=(not isShutdown))        self._doSave()        if isShutdown:            self.write("<p>%s</p>" % self.html.shutdownMessage)            self.write("</div></body></html>")            self.flush()            ## Is this still required?: self.shutdown(2)            self.close()            raise SystemExit        self._writePostamble()    def _buildClassifyBox(self):        """Returns a "Classify a message" box.  This is used on both the Home        page and the classify results page.  The Classify form is based on the        Upload form."""        form = self.html.upload.clone()        del form.or_mbox        del form.submit_spam        del form.submit_ham        form.action = "classify"        return self._buildBox(_("Classify a message"), 'classify.gif', form)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -