⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sb_culler.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 2 页
字号:
            print "Total bytes  :   ", size            except:        mailbox.quit()        raise    return mailboxdef _log_subject(mi, log):    encoded_subject = mi.msg.get('subject')    try:        subject, encoding = Header.decode_header(encoded_subject)[0]    except Header.HeaderParseError:        log.info("%s Subject cannot be parsed" % (mi.i,))        return    if encoding is None or encoding == 'iso-8859-1':        s = subject    else:        s = encoded_subject    log.info("%s Subject: %r" % (mi.i, s))class Filters(list):    def add(self, test, action):        """short-cut to make a Filter given the test and action"""        self.append(Filter(test, action))    def process_mailbox(self, mailbox):        count, size = mailbox.stat()        log = Logger()        for i in range(1, count+1):            if (i-1) % 10 == 0:                print " == %d/%d ==" % (i, count)            # Kevin's code used -1, but -1 doesn't work for one of            # my POP accounts, while a million does.            # Don't use retr because that may mark the message as            # read (so says Kevin's code)            message_tuple = mailbox.top(i, 1000000)            text = "\n".join(message_tuple[1])            msg = mboxutils.get_message(text)            mi = MessageInfo(mailbox, i, msg, text)                        _log_subject(mi, log)            for filter in self:                result = filter.process(mi, log)                if result:                    log.accept(result)                    break            else:                # don't know what to do with this so just                # keep it on the server                log.pass_test("unknown")                log.do_action(KEEP_IN_MAILBOX)                log.accept("unknown")                    return logdef filter_server( (server, user, pwd), filters):    if VERBOSE_LEVEL:        print "=" * 78        print "Processing %s on %s" % (user, server)    mailbox = open_mailbox(server, user, pwd)    try:        log = filters.process_mailbox(mailbox)    finally:        mailbox.quit()    return log##### User-specificimport time, sys, urllib# A simple text interface.def _unix_stop():    passdef _ms_stop():    # ^C doesn't seem to work correctly in the DOS box    # so assume any keypress is a break    if msvcrt.kbhit():        raise SystemExit()try:    import msvcrt    _check_for_stop = _ms_stopexcept ImportError:    _check_for_stop = _unix_stop        def restart_network():    # This is called after too many connection failures.    # That usually means my ISP dropped my DHCP and I need to    # bounce my Linksys firewall/DHCP/hub.        print "Network appears to be down.  Bringing Linksys down then up..."    try:        # Note this this example uses the default password.  YMMV.        urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2").read()        urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1").read()    except KeyboardInterrupt:        raise    except:        traceback.print_exc()def wait(t, delta = 10):    """Wait for 't' seconds"""    assert delta > 0, delta    assert t >= 1    first = True    for i in range(t, -1, -delta):        if VERBOSE_LEVEL:            if not first:                print "..",            print i,            sys.stdout.flush()        time.sleep(min(i, delta))                _check_for_stop()        first = False    printdef main():    filters = Filters()    duplicate = Duplicate()    filters.add(duplicate, AppendFile("spam2.mbox"))    # A list of everyone who has emailed me this year.    # Keep their messages on the server.    filters.add(WhiteListFrom("good_emails.txt"), KEEP)    # My mailing lists.    filters.add(WhiteListSubstrings("subject", [                   'ABCD:',                   '[Python-announce]',                   '[Python]',                   '[Bioinfo]',                   '[EuroPython]',                   ]),                KEEP)    filters.add(WhiteListSubstrings("to", [        "president@whitehouse.gov",        "ceo@big.com",        ]),                KEEP)    names = ["john", "", "jon", "johnathan"]    valid_emails = ([name + "@lectroid.com" for name in names] +                    [name + "@bigboote.org" for name in names] +                    ["buckeroo.bonzai@aol.earth"])    filters.add(IllegalDeliveredTo(valid_emails), DELETE)    filters.add(SpamAssassin(), AppendFile("spam2.mbox"))        # Get rid of anything which smells like an exectuable.    filters.add(IsVirus, DELETE)    # Use SpamBayes to identify spam.  Make a local copy then    # delete from the server.    h = hammie.open("cull.spambayes", "dbm", "r")    filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))    # These are my POP3 accounts.    server_configs = [("mail.example.com",                          "user@example.com", "password"),                      ("popserver.big.com", "ceo", "12345"), ]    # The main culling loop.    error_count = 0    cumulative_log = {SPAM: 0, VIRUS: 0}    initial_log = None    start_time = None  # init'ed only after initial_log is created    while 1:        error_flag = False        duplicate.unique.clear()  # Hack!        for server, user, pwd in server_configs:            try:                log = filter_server( (server, user, pwd), filters)            except KeyboardInterrupt:                raw_input("Press enter to continue. ")            except StandardError:                raise            except:                error_flag = True                traceback.print_exc()                continue            if VERBOSE_LEVEL > 1 and log:                print "  ** Summary **"                for x in (log.tests, log.actions):                    items = x.items()                    if items:                        items.sort()                        for k, v in items:                            print "  %s: %s" % (k, v)                        print            cumulative_log[SPAM] += log.tests.get(SPAM, 0)            cumulative_log[VIRUS] += log.tests.get(VIRUS, 0)        if initial_log is None:            initial_log = cumulative_log.copy()            start_time = time.time()            if VERBOSE_LEVEL:                print "Stats: %d spams, %d virus" % (                    initial_log[SPAM], initial_log[VIRUS])        else:            if VERBOSE_LEVEL:                delta_t = time.time() - start_time                delta_t = max(delta_t, 1)  #                print "Stats: %d spams (%.2f/hr), %d virus (%.2f/hr)" % (                    cumulative_log[SPAM],                    (cumulative_log[SPAM] - initial_log[SPAM]) /                             delta_t * 3600,                    cumulative_log[VIRUS],                    (cumulative_log[VIRUS] - initial_log[VIRUS]) /                             delta_t * 3600)                if error_flag:            error_count += 1        if error_count > 0:            restart_network()            error_count = 0        delay = 10 * 60        while delay:            try:                wait(delay)                break            except KeyboardInterrupt:                print                while 1:                    cmd = raw_input("enter, delay, or quit? ")                    if cmd in ("q", "quit"):                        raise SystemExit(0)                    elif cmd == "":                        delay = 0                        break                    elif cmd.isdigit():                        delay = int(cmd)                        break                    else:                        print "Unknown command."if __name__ == "__main__":    main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -