📄 sb_culler.py
字号:
print "Total bytes : ", size except: mailbox.quit() raise return mailboxdef _log_subject(mi, log): encoded_subject = mi.msg.get('subject') try: subject, encoding = Header.decode_header(encoded_subject)[0] except Header.HeaderParseError: log.info("%s Subject cannot be parsed" % (mi.i,)) return if encoding is None or encoding == 'iso-8859-1': s = subject else: s = encoded_subject log.info("%s Subject: %r" % (mi.i, s))class Filters(list): def add(self, test, action): """short-cut to make a Filter given the test and action""" self.append(Filter(test, action)) def process_mailbox(self, mailbox): count, size = mailbox.stat() log = Logger() for i in range(1, count+1): if (i-1) % 10 == 0: print " == %d/%d ==" % (i, count) # Kevin's code used -1, but -1 doesn't work for one of # my POP accounts, while a million does. # Don't use retr because that may mark the message as # read (so says Kevin's code) message_tuple = mailbox.top(i, 1000000) text = "\n".join(message_tuple[1]) msg = mboxutils.get_message(text) mi = MessageInfo(mailbox, i, msg, text) _log_subject(mi, log) for filter in self: result = filter.process(mi, log) if result: log.accept(result) break else: # don't know what to do with this so just # keep it on the server log.pass_test("unknown") log.do_action(KEEP_IN_MAILBOX) log.accept("unknown") return logdef filter_server( (server, user, pwd), filters): if VERBOSE_LEVEL: print "=" * 78 print "Processing %s on %s" % (user, server) mailbox = open_mailbox(server, user, pwd) try: log = filters.process_mailbox(mailbox) finally: mailbox.quit() return log##### User-specificimport time, sys, urllib# A simple text interface.def _unix_stop(): passdef _ms_stop(): # ^C doesn't seem to work correctly in the DOS box # so assume any keypress is a break if msvcrt.kbhit(): raise SystemExit()try: import msvcrt _check_for_stop = _ms_stopexcept ImportError: _check_for_stop = _unix_stop def restart_network(): # This is called after too many connection failures. # That usually means my ISP dropped my DHCP and I need to # bounce my Linksys firewall/DHCP/hub. print "Network appears to be down. Bringing Linksys down then up..." try: # Note this this example uses the default password. YMMV. urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2").read() urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1").read() except KeyboardInterrupt: raise except: traceback.print_exc()def wait(t, delta = 10): """Wait for 't' seconds""" assert delta > 0, delta assert t >= 1 first = True for i in range(t, -1, -delta): if VERBOSE_LEVEL: if not first: print "..", print i, sys.stdout.flush() time.sleep(min(i, delta)) _check_for_stop() first = False printdef main(): filters = Filters() duplicate = Duplicate() filters.add(duplicate, AppendFile("spam2.mbox")) # A list of everyone who has emailed me this year. # Keep their messages on the server. filters.add(WhiteListFrom("good_emails.txt"), KEEP) # My mailing lists. filters.add(WhiteListSubstrings("subject", [ 'ABCD:', '[Python-announce]', '[Python]', '[Bioinfo]', '[EuroPython]', ]), KEEP) filters.add(WhiteListSubstrings("to", [ "president@whitehouse.gov", "ceo@big.com", ]), KEEP) names = ["john", "", "jon", "johnathan"] valid_emails = ([name + "@lectroid.com" for name in names] + [name + "@bigboote.org" for name in names] + ["buckeroo.bonzai@aol.earth"]) filters.add(IllegalDeliveredTo(valid_emails), DELETE) filters.add(SpamAssassin(), AppendFile("spam2.mbox")) # Get rid of anything which smells like an exectuable. filters.add(IsVirus, DELETE) # Use SpamBayes to identify spam. Make a local copy then # delete from the server. h = hammie.open("cull.spambayes", "dbm", "r") filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox")) # These are my POP3 accounts. server_configs = [("mail.example.com", "user@example.com", "password"), ("popserver.big.com", "ceo", "12345"), ] # The main culling loop. error_count = 0 cumulative_log = {SPAM: 0, VIRUS: 0} initial_log = None start_time = None # init'ed only after initial_log is created while 1: error_flag = False duplicate.unique.clear() # Hack! for server, user, pwd in server_configs: try: log = filter_server( (server, user, pwd), filters) except KeyboardInterrupt: raw_input("Press enter to continue. ") except StandardError: raise except: error_flag = True traceback.print_exc() continue if VERBOSE_LEVEL > 1 and log: print " ** Summary **" for x in (log.tests, log.actions): items = x.items() if items: items.sort() for k, v in items: print " %s: %s" % (k, v) print cumulative_log[SPAM] += log.tests.get(SPAM, 0) cumulative_log[VIRUS] += log.tests.get(VIRUS, 0) if initial_log is None: initial_log = cumulative_log.copy() start_time = time.time() if VERBOSE_LEVEL: print "Stats: %d spams, %d virus" % ( initial_log[SPAM], initial_log[VIRUS]) else: if VERBOSE_LEVEL: delta_t = time.time() - start_time delta_t = max(delta_t, 1) # print "Stats: %d spams (%.2f/hr), %d virus (%.2f/hr)" % ( cumulative_log[SPAM], (cumulative_log[SPAM] - initial_log[SPAM]) / delta_t * 3600, cumulative_log[VIRUS], (cumulative_log[VIRUS] - initial_log[VIRUS]) / delta_t * 3600) if error_flag: error_count += 1 if error_count > 0: restart_network() error_count = 0 delay = 10 * 60 while delay: try: wait(delay) break except KeyboardInterrupt: print while 1: cmd = raw_input("enter, delay, or quit? ") if cmd in ("q", "quit"): raise SystemExit(0) elif cmd == "": delay = 0 break elif cmd.isdigit(): delay = int(cmd) break else: print "Unknown command."if __name__ == "__main__": main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -