⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sb_mboxtrain.py

📁 用python实现的邮件过滤器
💻 PY
字号:
#! /usr/bin/env python

### Train spambayes on all previously-untrained messages in a mailbox.
###
### This keeps track of messages it's already trained by adding an
### X-Spambayes-Trained: header to each one.  Then, if you move one to
### another folder, it will retrain that message.  You would want to run
### this from a cron job on your server.

"""Usage: %(program)s [OPTIONS] ...

Where OPTIONS is one or more of:
    -h
        show usage and exit
    -d DBNAME
        use the DBM store.  A DBM file is larger than the pickle and
        creating it is slower, but loading it is much faster,
        especially for large word databases.  Recommended for use with
        sb_filter or any procmail-based filter.
    -p DBNAME
        use the pickle store.  A pickle is smaller and faster to create,
        but much slower to load.  Recommended for use with sb_server and
        sb_xmlrpcserver.
    -g PATH
        mbox or directory of known good messages (non-spam) to train on.
        Can be specified more than once.
    -s PATH
        mbox or directory of known spam messages to train on.
        Can be specified more than once.
    -f
        force training, ignoring the trained header.  Use this if you
        need to rebuild your database from scratch.
    -q
        quiet mode; no output

    -n  train mail residing in "new" directory, in addition to "cur"
        directory, which is always trained (Maildir only)

    -r  remove mail which was trained on (Maildir only)

    -o section:option:value
        set [section, option] in the options database to value
"""

try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2.  The names are bound through
    # globals() rather than by plain assignment so that this module still
    # parses on interpreters where True and False are keywords.
    globals()["True"], globals()["False"] = 1, 0

import sys, os, getopt, email
import shutil

from spambayes import hammie, storage, mboxutils
from spambayes.Options import options, get_pathname_option

program = sys.argv[0]
loud = True


def get_message(obj):
    """Return an email Message object, or None if it cannot be parsed.

    This works like mboxutils.get_message, except it doesn't junk the
    headers if there's an error.  Doing so would cause a headerless
    message to be written back out!

    obj may already be a Message (returned unchanged), a file-like
    object with a read() method, or a string holding the message text.
    """
    if isinstance(obj, email.Message.Message):
        return obj
    # Create an email Message object.
    if hasattr(obj, "read"):
        obj = obj.read()
    try:
        return email.message_from_string(obj)
    except email.Errors.MessageParseError:
        return None


def msg_train(h, msg, is_spam, force):
    """Train bayes with a single message.

    h is the hammie object to train, msg the parsed message, is_spam
    says which class to train it as, and force retrains even when the
    trained header already matches.

    Returns True if the message was trained (the trained header on msg
    is then set to the matching ham/spam string), or False if it was
    skipped: either it cannot be flattened to text, or it already
    carries the header for this classification and force is off.
    """
    # XXX: big hack -- why is email.Message unable to represent
    # multipart/alternative?
    try:
        mboxutils.as_string(msg)
    except TypeError:
        # We'll be unable to represent this as text :(
        return False

    header_name = options["Headers", "trained_header_name"]
    if is_spam:
        spamtxt = options["Headers", "header_spam_string"]
    else:
        spamtxt = options["Headers", "header_ham_string"]
    oldtxt = msg.get(header_name)

    if force:
        # Train no matter what.
        if oldtxt is not None:
            del msg[header_name]
    elif oldtxt == spamtxt:
        # Skip this one, we've already trained with it.
        return False
    elif oldtxt is not None:
        # It's been trained, but as something else.  Untrain.
        del msg[header_name]
        h.untrain(msg, not is_spam)

    h.train(msg, is_spam)
    msg.add_header(header_name, spamtxt)
    return True


def _show_progress(counter):
    """Overwrite the running message count on the current output line."""
    sys.stdout.write("\r%6d" % counter)
    sys.stdout.flush()


def _show_summary(trained, counter):
    """Print the final "Trained X out of Y messages" line."""
    sys.stdout.write("\r%6d" % counter)
    sys.stdout.write("\r  Trained %d out of %d messages\n" %
                     (trained, counter))


def _read_message(filename):
    """Parse the file at filename with get_message(), closing it afterwards."""
    f = open(filename, "rb")
    try:
        return get_message(f)
    finally:
        f.close()


def _write_back(msg, cfn, tfn):
    """Replace the file cfn with the text of msg, staging it in tfn first."""
    f = open(tfn, "wb")
    try:
        f.write(mboxutils.as_string(msg))
    finally:
        f.close()
    shutil.copystat(cfn, tfn)
    # XXX: This will raise an exception on Windows.  Do any Windows
    # people actually use Maildirs or MH directories?
    os.rename(tfn, cfn)


def maildir_train(h, path, is_spam, force, removetrained):
    """Train bayes with all messages from a maildir.

    path is one message directory of the Maildir (the caller passes its
    "cur" or "new" subdirectory); temporary files are staged in the
    sibling "tmp" directory.  When the include_trained option is set,
    each newly trained message is written back with its trained header,
    and is then deleted if removetrained is true.
    """
    if loud:
        sys.stdout.write("  Reading %s as Maildir\n" % (path,))

    import time
    import socket

    pid = os.getpid()
    host = socket.gethostname()
    include_trained = options["Headers", "include_trained"]
    counter = 0
    trained = 0

    for fn in os.listdir(path):
        cfn = os.path.join(path, fn)
        if os.path.isdir(cfn):
            continue
        counter += 1
        if loud and counter % 10 == 0:
            _show_progress(counter)

        msg = _read_message(cfn)
        if not msg:
            sys.stdout.write("Malformed message: %s.  Skipping...\n" % cfn)
            continue
        if not msg_train(h, msg, is_spam, force):
            continue
        trained += 1
        if not include_trained:
            # NOTE(review): this also skips the removetrained handling
            # below, so -r only deletes mail when include_trained is on.
            # Kept as-is because changing it would delete more mail;
            # confirm the intended behaviour.
            continue

        # Only the uniqueness of the temporary name matters here.
        tfn = os.path.normpath(os.path.join(path, "..", "tmp",
                                            "%d.%d_%d.%s" % (time.time(), pid,
                                                             counter, host)))
        _write_back(msg, cfn, tfn)
        if removetrained:
            os.unlink(cfn)

    if loud:
        _show_summary(trained, counter)


def mbox_train(h, path, is_spam, force):
    """Train bayes with a Unix mbox.

    The mbox is held under an exclusive flock for the whole run.  When
    the include_trained option is set, every message is copied to a
    temporary file and the mbox is rewritten from it afterwards.  A
    malformed message aborts the run without touching the mbox.  The
    lock and both files are released on every exit path.
    """
    if loud:
        sys.stdout.write("  Reading as Unix mbox\n")

    import mailbox
    import fcntl

    include_trained = options["Headers", "include_trained"]
    counter = 0
    trained = 0

    # Open and lock the mailbox.  Some systems require it be opened for
    # writes in order to assert an exclusive lock.
    f = open(path, "r+b")
    try:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            mbox = mailbox.PortableUnixMailbox(f, get_message)
            outf = os.tmpfile()
            try:
                for msg in mbox:
                    if not msg:
                        sys.stdout.write(
                            "Malformed message number %d.  I can't train "
                            "on this mbox, sorry.\n" % counter)
                        return
                    counter += 1
                    if loud and counter % 10 == 0:
                        _show_progress(counter)
                    if msg_train(h, msg, is_spam, force):
                        trained += 1
                    if include_trained:
                        # Write it out with the Unix "From " line
                        outf.write(mboxutils.as_string(msg, True))

                if include_trained:
                    outf.seek(0)
                    try:
                        os.ftruncate(f.fileno(), 0)
                        f.seek(0)
                    except:
                        # If anything goes wrong, don't try to write
                        sys.stdout.write(
                            "Problem truncating mbox--nothing written\n")
                        raise
                    try:
                        for line in outf:
                            f.write(line)
                        # Push the data out while the lock is still held.
                        f.flush()
                    except:
                        sys.stderr.write("Problem writing mbox!  Sorry, "
                                         "I tried my best, but your mail "
                                         "may be corrupted.\n")
                        raise
            finally:
                outf.close()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
    finally:
        f.close()

    if loud:
        _show_summary(trained, counter)


def mhdir_train(h, path, is_spam, force):
    """Train bayes with an mh directory.

    Every entry of path whose name starts with a digit is treated as a
    message.  When the include_trained option is set, each newly trained
    message is rewritten in place, staged through "spambayes.tmp" in the
    same directory.
    """
    if loud:
        sys.stdout.write("  Reading as MH mailbox\n")

    import glob

    include_trained = options["Headers", "include_trained"]
    tfn = os.path.join(path, "spambayes.tmp")
    counter = 0
    trained = 0

    for cfn in glob.glob(os.path.join(path, "[0-9]*")):
        counter += 1
        if loud and counter % 10 == 0:
            _show_progress(counter)

        msg = _read_message(cfn)
        if not msg:
            sys.stdout.write("Malformed message: %s.  Skipping...\n" % cfn)
            continue
        # Only count (and rewrite) messages that were actually trained,
        # the same way maildir_train does.
        if not msg_train(h, msg, is_spam, force):
            continue
        trained += 1
        if not include_trained:
            continue
        _write_back(msg, cfn, tfn)

    if loud:
        _show_summary(trained, counter)


def train(h, path, is_spam, force, trainnew, removetrained):
    """Train on the mailbox at path, picking the trainer by its layout.

    A regular file is read as a Unix mbox, a directory containing "cur"
    as a Maildir ("new" is trained as well when trainnew is true), and
    any other directory as an MH folder.  Raises ValueError when path
    does not exist or matches none of these.
    """
    if not os.path.exists(path):
        raise ValueError("Nonexistent path: %s" % path)
    elif os.path.isfile(path):
        mbox_train(h, path, is_spam, force)
    elif os.path.isdir(os.path.join(path, "cur")):
        maildir_train(h, os.path.join(path, "cur"), is_spam, force,
                      removetrained)
        if trainnew:
            maildir_train(h, os.path.join(path, "new"), is_spam, force,
                          removetrained)
    elif os.path.isdir(path):
        mhdir_train(h, path, is_spam, force)
    else:
        raise ValueError("Unable to determine mailbox type: " + path)


def usage(code, msg=''):
    """Print usage message and sys.exit(code)."""
    if msg:
        sys.stderr.write("%s\n" % (msg,))
        sys.stderr.write("\n")
    sys.stderr.write(__doc__ % globals() + "\n")
    sys.exit(code)


def main():
    """Main program; parse options and go."""
    global loud

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hfqnrd:p:g:s:o:')
    except getopt.error:
        # sys.exc_info() instead of "except X, msg" keeps this parseable
        # on both old and new interpreters.
        usage(2, sys.exc_info()[1])

    if not opts:
        usage(2, "No options given")

    force = False
    trainnew = False
    removetrained = False
    good = []
    spam = []
    for opt, arg in opts:
        if opt == '-h':
            usage(0)
        elif opt == "-f":
            force = True
        elif opt == "-n":
            trainnew = True
        elif opt == "-q":
            loud = False
        elif opt == '-g':
            good.append(arg)
        elif opt == '-s':
            spam.append(arg)
        elif opt == "-r":
            removetrained = True
        elif opt == '-o':
            options.set_from_cmdline(arg, sys.stderr)
    pck, usedb = storage.database_type(opts)
    if args:
        usage(2, "Positional arguments not allowed")

    if usedb is None:
        # Use settings in configuration file.
        usedb = options["Storage", "persistent_use_database"]
        pck = get_pathname_option("Storage",
                                  "persistent_storage_file")

    h = hammie.open(pck, usedb, "c")

    # Must start out False: with no -g/-s arguments neither loop runs.
    save = False

    for g in good:
        if loud:
            sys.stdout.write("Training ham (%s):\n" % g)
        train(h, g, False, force, trainnew, removetrained)
        sys.stdout.flush()
        save = True

    for s in spam:
        if loud:
            sys.stdout.write("Training spam (%s):\n" % s)
        train(h, s, True, force, trainnew, removetrained)
        sys.stdout.flush()
        save = True

    if save:
        h.store()


if __name__ == "__main__":
    main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -