⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sb_imapfilter.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 4 页
字号:
                else:                    msg.MoveTo(unsurefolder)                    count["unsure"] += 1                msg.Save()        return countclass IMAPFilter(object):    def __init__(self, classifier, stats):        self.spam_folder = None        self.unsure_folder = None        self.ham_folder = None        self.classifier = classifier        self.imap_server = None        self.stats = stats    def Train(self):        assert self.imap_server, "Cannot do anything without IMAP server."                if options["globals", "verbose"]:            t = time.time()        total_trained = 0        for is_spam, option_name in [(False, "ham_train_folders"),                                     (True, "spam_train_folders")]:            training_folders = options["imap", option_name]            for fol in training_folders:                # Select the folder to make sure it exists                try:                    self.imap_server.SelectFolder(fol)                except BadIMAPResponseError:                    print "Skipping %s, as it cannot be selected." % (fol,)                    continue                if options['globals', 'verbose']:                    print "   Training %s folder %s" % \                          (["ham", "spam"][is_spam], fol)                folder = IMAPFolder(fol, self.imap_server, self.stats)                num_trained = folder.Train(self.classifier, is_spam)                total_trained += num_trained                if options['globals', 'verbose']:                    print "\n       %s trained." % (num_trained,)        if total_trained:            self.classifier.store()        if options["globals", "verbose"]:            print "Training took %.4f seconds, %s messages were trained." \                  % (time.time() - t, total_trained)    def Filter(self):        assert self.imap_server, "Cannot do anything without IMAP server."        if not self.spam_folder:            self.spam_folder = IMAPFolder(options["imap", "spam_folder"],                                          self.imap_server, self.stats)        if not self.unsure_folder:            self.unsure_folder = IMAPFolder(options["imap",                                                    "unsure_folder"],                                            self.imap_server, self.stats)        ham_folder_name = options["imap", "ham_folder"]        if ham_folder_name and not self.ham_folder:            self.ham_folder = IMAPFolder(ham_folder_name, self.imap_server,                                         self.stats)        if options["globals", "verbose"]:            t = time.time()        count = {}        count["ham"] = 0        count["spam"] = 0        count["unsure"] = 0        # Select the ham, spam and unsure folders to make sure they exist.        try:            self.imap_server.SelectFolder(self.spam_folder.name)        except BadIMAPResponseError:            print "Cannot select spam folder.  Please check configuration."            sys.exit(-1)        try:            self.imap_server.SelectFolder(self.unsure_folder.name)        except BadIMAPResponseError:            print "Cannot select spam folder.  Please check configuration."            sys.exit(-1)        if self.ham_folder:            try:                self.imap_server.SelectFolder(self.ham_folder.name)            except BadIMAPResponseError:                print "Cannot select ham folder.  Please check configuration."                sys.exit(-1)                        for filter_folder in options["imap", "filter_folders"]:            # Select the folder to make sure it exists.            try:                self.imap_server.SelectFolder(filter_folder)            except BadIMAPResponseError:                print "Cannot select %s, skipping." % (filter_folder,)                continue            folder = IMAPFolder(filter_folder, self.imap_server, self.stats)            subcount = folder.Filter(self.classifier, self.spam_folder,                                     self.unsure_folder, self.ham_folder)            for key in count.keys():                count[key] += subcount.get(key, 0)        if options["globals", "verbose"]:            if count is not None:                print "\nClassified %s ham, %s spam, and %s unsure." % \                      (count["ham"], count["spam"], count["unsure"])            print "Classifying took %.4f seconds." % (time.time() - t,)def run(force_UI=False):    try:        opts, args = getopt.getopt(sys.argv[1:], 'hbPtcvl:e:i:d:p:o:')    except getopt.error, msg:        print >>sys.stderr, str(msg) + '\n\n' + __doc__        sys.exit()    doTrain = False    doClassify = False    doExpunge = options["imap", "expunge"]    imapDebug = 0    sleepTime = 0    promptForPass = False    launchUI = False    servers = ""    usernames = ""    for opt, arg in opts:        if opt == '-h':            print >>sys.stderr, __doc__            sys.exit()        elif opt == "-b":            launchUI = True        elif opt == '-t':            doTrain = True        elif opt == '-P':            promptForPass = True        elif opt == '-c':            doClassify = True        elif opt == '-v':            options["globals", "verbose"] = True        elif opt == '-e':            if arg == 'y':                doExpunge = True            else:                doExpunge = False        elif opt == '-i':            imapDebug = int(arg)        elif opt == '-l':            sleepTime = int(arg) * 60        elif opt == '-o':            options.set_from_cmdline(arg, sys.stderr)    bdbname, useDBM = storage.database_type(opts)    # Let the user know what they are using...    v = get_current_version();    print "%s.\n" % (v.get_long_version("SpamBayes IMAP Filter"),)    if options["globals", "verbose"]:        print "Loading database %s..." % (bdbname),    classifier = storage.open_storage(bdbname, useDBM)    message_db = message.Message().message_info_db    if options["globals", "verbose"]:        print "Done."    if options["imap", "server"]:        servers = options["imap", "server"]        usernames = options["imap", "username"]        if not promptForPass:            pwds = options["imap", "password"]    else:        pwds = None        if not launchUI and not force_UI:            print "You need to specify both a server and a username."            sys.exit()    if promptForPass:        pwds = []        for i in xrange(len(usernames)):            pwds.append(getpass("Enter password for %s:" % (usernames[i],)))    servers_data = []    for server, username, password in zip(servers, usernames, pwds or []):        if server.find(':') > -1:            server, port = server.split(':', 1)            port = int(port)        else:            if options["imap", "use_ssl"]:                port = 993            else:                port = 143        servers_data.append((server, port, username, password))    # Load stats manager.    stats = Stats.Stats(options, message_db)        imap_filter = IMAPFilter(classifier, stats)    # Web interface.  We have changed the rules about this many times.    # With 1.0.x, the rule is that the interface is served if we are    # not classifying or training.  However, this runs into the problem    # that if we run with -l, we might still want to edit the options,    # and we don't want to start a separate instance, because then the    # database is accessed from two processes.    # With 1.1.x, the rule is that the interface is also served if the    # -l option is used, which means it is only not served if we are    # doing a one-off classification/train.  In that case, there would    # probably not be enough time to get to the interface and interact    # with it (and we don't want it to die halfway through!), and we    # don't want to slow classification/training down, either.    if sleepTime or not (doClassify or doTrain):        imaps = []        for server, port, username, password in servers_data:            if server == "":                imaps.append(None)            else:                imaps.append(IMAPSession(server, port, imapDebug, doExpunge))        def close_db():            message_db.store()            message_db.close()            message.Message().message_info_db.store()            message.Message().message_info_db.close()            message.Message.message_info_db = None            classifier.store()            classifier.close()        def change_db():            classifier = storage.open_storage(*storage.database_type(opts))            message.Message.message_info_db = message_db            imap_filter = IMAPFilter(classifier, message_db)        httpServer = UserInterfaceServer(options["html_ui", "port"])        httpServer.register(IMAPUserInterface(classifier, imaps, pwds,                                              IMAPSession, stats=stats,                                              close_db=close_db,                                              change_db=change_db))        launchBrowser=launchUI or options["html_ui", "launch_browser"]        if sleepTime:            # Run in a separate thread, as we have more work to do.            thread.start_new_thread(Dibbler.run, (),                                    {"launchBrowser":launchBrowser})        else:            Dibbler.run(launchBrowser=launchBrowser)    if doClassify or doTrain:        imaps = []        for server, port, username, password in servers_data:            imaps.append(((server, port, imapDebug, doExpunge),                          username, password))        # In order to make working with multiple servers easier, we        # allow the user to have separate configuration files for each        # server.  These may specify different folders to watch, different        # spam/unsure folders, or any other options (e.g. thresholds).        # For each server we use the default (global) options, and load        # the specific options on top.  To facilitate this, we use a        # restore point for the options with just the default (global)        # options.        # XXX What about when we are running with -l and change options        # XXX via the web interface?  We need to handle that, really.        options.set_restore_point()        while True:            for (server, port, imapDebug, doExpunge), username, password in imaps:                imap = IMAPSession(server, port, imapDebug, doExpunge)                if options["globals", "verbose"]:                    print "Account: %s:%s" % (imap.server, imap.port)                if imap.connected:                    # As above, we load a separate configuration file                    # for each server, if it exists.  We look for a                    # file in the optionsPathname directory, with the                    # name server.name.ini or .spambayes_server_name_rc                    # XXX While 1.1 is in alpha these names can be                    # XXX changed if desired.  Please let Tony know!                    basedir = os.path.dirname(optionsPathname)                    fn1 = os.path.join(basedir, imap.server + ".ini")                    fn2 = os.path.join(basedir,                                       imap.server.replace(".", "_") + \                                       "_rc")                    for fn in (fn1, fn2):                        if os.path.exists(fn):                            options.merge_file(fn)                    try:                                            imap.login(username, password)                    except LoginFailure, e:                        print str(e)                        continue                    imap_filter.imap_server = imap                    if doTrain:                        if options["globals", "verbose"]:                            print "Training"                        imap_filter.Train()                    if doClassify:                        if options["globals", "verbose"]:                            print "Classifying"                        imap_filter.Filter()                    imap.logout()                    options.revert_to_restore_point()                else:                    # Failed to connect.  This may be a temporary problem,                    # so just continue on and try again.  If we are only                    # running once we will end, otherwise we'll try again                    # in sleepTime seconds.                    # XXX Maybe we should log this error message?                    pass            if sleepTime:                time.sleep(sleepTime)            else:                breakif __name__ == '__main__':    run()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -