📄 sb_imapfilter.py
字号:
else: msg.MoveTo(unsurefolder) count["unsure"] += 1 msg.Save() return countclass IMAPFilter(object): def __init__(self, classifier, stats): self.spam_folder = None self.unsure_folder = None self.ham_folder = None self.classifier = classifier self.imap_server = None self.stats = stats def Train(self): assert self.imap_server, "Cannot do anything without IMAP server." if options["globals", "verbose"]: t = time.time() total_trained = 0 for is_spam, option_name in [(False, "ham_train_folders"), (True, "spam_train_folders")]: training_folders = options["imap", option_name] for fol in training_folders: # Select the folder to make sure it exists try: self.imap_server.SelectFolder(fol) except BadIMAPResponseError: print "Skipping %s, as it cannot be selected." % (fol,) continue if options['globals', 'verbose']: print " Training %s folder %s" % \ (["ham", "spam"][is_spam], fol) folder = IMAPFolder(fol, self.imap_server, self.stats) num_trained = folder.Train(self.classifier, is_spam) total_trained += num_trained if options['globals', 'verbose']: print "\n %s trained." % (num_trained,) if total_trained: self.classifier.store() if options["globals", "verbose"]: print "Training took %.4f seconds, %s messages were trained." \ % (time.time() - t, total_trained) def Filter(self): assert self.imap_server, "Cannot do anything without IMAP server." if not self.spam_folder: self.spam_folder = IMAPFolder(options["imap", "spam_folder"], self.imap_server, self.stats) if not self.unsure_folder: self.unsure_folder = IMAPFolder(options["imap", "unsure_folder"], self.imap_server, self.stats) ham_folder_name = options["imap", "ham_folder"] if ham_folder_name and not self.ham_folder: self.ham_folder = IMAPFolder(ham_folder_name, self.imap_server, self.stats) if options["globals", "verbose"]: t = time.time() count = {} count["ham"] = 0 count["spam"] = 0 count["unsure"] = 0 # Select the ham, spam and unsure folders to make sure they exist. try: self.imap_server.SelectFolder(self.spam_folder.name) except BadIMAPResponseError: print "Cannot select spam folder. Please check configuration." sys.exit(-1) try: self.imap_server.SelectFolder(self.unsure_folder.name) except BadIMAPResponseError: print "Cannot select spam folder. Please check configuration." sys.exit(-1) if self.ham_folder: try: self.imap_server.SelectFolder(self.ham_folder.name) except BadIMAPResponseError: print "Cannot select ham folder. Please check configuration." sys.exit(-1) for filter_folder in options["imap", "filter_folders"]: # Select the folder to make sure it exists. try: self.imap_server.SelectFolder(filter_folder) except BadIMAPResponseError: print "Cannot select %s, skipping." % (filter_folder,) continue folder = IMAPFolder(filter_folder, self.imap_server, self.stats) subcount = folder.Filter(self.classifier, self.spam_folder, self.unsure_folder, self.ham_folder) for key in count.keys(): count[key] += subcount.get(key, 0) if options["globals", "verbose"]: if count is not None: print "\nClassified %s ham, %s spam, and %s unsure." % \ (count["ham"], count["spam"], count["unsure"]) print "Classifying took %.4f seconds." % (time.time() - t,)def run(force_UI=False): try: opts, args = getopt.getopt(sys.argv[1:], 'hbPtcvl:e:i:d:p:o:') except getopt.error, msg: print >>sys.stderr, str(msg) + '\n\n' + __doc__ sys.exit() doTrain = False doClassify = False doExpunge = options["imap", "expunge"] imapDebug = 0 sleepTime = 0 promptForPass = False launchUI = False servers = "" usernames = "" for opt, arg in opts: if opt == '-h': print >>sys.stderr, __doc__ sys.exit() elif opt == "-b": launchUI = True elif opt == '-t': doTrain = True elif opt == '-P': promptForPass = True elif opt == '-c': doClassify = True elif opt == '-v': options["globals", "verbose"] = True elif opt == '-e': if arg == 'y': doExpunge = True else: doExpunge = False elif opt == '-i': imapDebug = int(arg) elif opt == '-l': sleepTime = int(arg) * 60 elif opt == '-o': options.set_from_cmdline(arg, sys.stderr) bdbname, useDBM = storage.database_type(opts) # Let the user know what they are using... v = get_current_version(); print "%s.\n" % (v.get_long_version("SpamBayes IMAP Filter"),) if options["globals", "verbose"]: print "Loading database %s..." % (bdbname), classifier = storage.open_storage(bdbname, useDBM) message_db = message.Message().message_info_db if options["globals", "verbose"]: print "Done." if options["imap", "server"]: servers = options["imap", "server"] usernames = options["imap", "username"] if not promptForPass: pwds = options["imap", "password"] else: pwds = None if not launchUI and not force_UI: print "You need to specify both a server and a username." sys.exit() if promptForPass: pwds = [] for i in xrange(len(usernames)): pwds.append(getpass("Enter password for %s:" % (usernames[i],))) servers_data = [] for server, username, password in zip(servers, usernames, pwds or []): if server.find(':') > -1: server, port = server.split(':', 1) port = int(port) else: if options["imap", "use_ssl"]: port = 993 else: port = 143 servers_data.append((server, port, username, password)) # Load stats manager. stats = Stats.Stats(options, message_db) imap_filter = IMAPFilter(classifier, stats) # Web interface. We have changed the rules about this many times. # With 1.0.x, the rule is that the interface is served if we are # not classifying or training. However, this runs into the problem # that if we run with -l, we might still want to edit the options, # and we don't want to start a separate instance, because then the # database is accessed from two processes. # With 1.1.x, the rule is that the interface is also served if the # -l option is used, which means it is only not served if we are # doing a one-off classification/train. In that case, there would # probably not be enough time to get to the interface and interact # with it (and we don't want it to die halfway through!), and we # don't want to slow classification/training down, either. if sleepTime or not (doClassify or doTrain): imaps = [] for server, port, username, password in servers_data: if server == "": imaps.append(None) else: imaps.append(IMAPSession(server, port, imapDebug, doExpunge)) def close_db(): message_db.store() message_db.close() message.Message().message_info_db.store() message.Message().message_info_db.close() message.Message.message_info_db = None classifier.store() classifier.close() def change_db(): classifier = storage.open_storage(*storage.database_type(opts)) message.Message.message_info_db = message_db imap_filter = IMAPFilter(classifier, message_db) httpServer = UserInterfaceServer(options["html_ui", "port"]) httpServer.register(IMAPUserInterface(classifier, imaps, pwds, IMAPSession, stats=stats, close_db=close_db, change_db=change_db)) launchBrowser=launchUI or options["html_ui", "launch_browser"] if sleepTime: # Run in a separate thread, as we have more work to do. thread.start_new_thread(Dibbler.run, (), {"launchBrowser":launchBrowser}) else: Dibbler.run(launchBrowser=launchBrowser) if doClassify or doTrain: imaps = [] for server, port, username, password in servers_data: imaps.append(((server, port, imapDebug, doExpunge), username, password)) # In order to make working with multiple servers easier, we # allow the user to have separate configuration files for each # server. These may specify different folders to watch, different # spam/unsure folders, or any other options (e.g. thresholds). # For each server we use the default (global) options, and load # the specific options on top. To facilitate this, we use a # restore point for the options with just the default (global) # options. # XXX What about when we are running with -l and change options # XXX via the web interface? We need to handle that, really. options.set_restore_point() while True: for (server, port, imapDebug, doExpunge), username, password in imaps: imap = IMAPSession(server, port, imapDebug, doExpunge) if options["globals", "verbose"]: print "Account: %s:%s" % (imap.server, imap.port) if imap.connected: # As above, we load a separate configuration file # for each server, if it exists. We look for a # file in the optionsPathname directory, with the # name server.name.ini or .spambayes_server_name_rc # XXX While 1.1 is in alpha these names can be # XXX changed if desired. Please let Tony know! basedir = os.path.dirname(optionsPathname) fn1 = os.path.join(basedir, imap.server + ".ini") fn2 = os.path.join(basedir, imap.server.replace(".", "_") + \ "_rc") for fn in (fn1, fn2): if os.path.exists(fn): options.merge_file(fn) try: imap.login(username, password) except LoginFailure, e: print str(e) continue imap_filter.imap_server = imap if doTrain: if options["globals", "verbose"]: print "Training" imap_filter.Train() if doClassify: if options["globals", "verbose"]: print "Classifying" imap_filter.Filter() imap.logout() options.revert_to_restore_point() else: # Failed to connect. This may be a temporary problem, # so just continue on and try again. If we are only # running once we will end, otherwise we'll try again # in sleepTime seconds. # XXX Maybe we should log this error message? pass if sleepTime: time.sleep(sleepTime) else: breakif __name__ == '__main__': run()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -