📄 tokenizer.py
# Strip and specially tokenize embedded URLish thingies.

url_fancy_re = re.compile(r"""
    \b                      # the preceding character must not be alphanumeric
    (?:
        (?:
            (https? | ftp)  # capture the protocol
            ://             # skip the boilerplate
        )|
        (?= ftp\.[^\.\s<>"'\x7f-\xff] )|  # allow the protocol to be missing, but only if
        (?= www\.[^\.\s<>"'\x7f-\xff] )   # the rest of the url starts "www.x" or "ftp.x"
    )
    # Do a reasonable attempt at detecting the end.  It may or may not
    # be in HTML, may or may not be in quotes, etc.  If it's full of %
    # escapes, cool -- that's a clue too.
    ([^\s<>"'\x7f-\xff]+)  # capture the guts
""", re.VERBOSE)                        # '

url_re = re.compile(r"""
    (https? | ftp)  # capture the protocol
    ://             # skip the boilerplate
    # Do a reasonable attempt at detecting the end.  It may or may not
    # be in HTML, may or may not be in quotes, etc.  If it's full of %
    # escapes, cool -- that's a clue too.
    ([^\s<>"'\x7f-\xff]+)  # capture the guts
""", re.VERBOSE)                        # '

urlsep_re = re.compile(r"[;?:@&=+,$.]")

class URLStripper(Stripper):
    def __init__(self):
        # The empty regexp matches anything at once.
        if options["Tokenizer", "x-fancy_url_recognition"]:
            search = url_fancy_re.search
        else:
            search = url_re.search
        Stripper.__init__(self, search, re.compile("").search)

    def tokenize(self, m):
        proto, guts = m.groups()
        assert guts
        if proto is None:
            if guts.lower().startswith("www"):
                proto = "http"
            elif guts.lower().startswith("ftp"):
                proto = "ftp"
            else:
                proto = "unknown"
        tokens = ["proto:" + proto]
        pushclue = tokens.append

        if options["Tokenizer", "x-pick_apart_urls"]:
            url = proto + "://" + guts

            escapes = re.findall(r'%..', guts)
            # roughly how many %nn escapes are there?
            if escapes:
                pushclue("url:%%%d" % int(log2(len(escapes))))
            # %nn escapes are usually intentional obfuscation.  Generate a
            # lot of correlated tokens if the URL contains a lot of them.
            # The classifier will learn which specific ones are and aren't
            # spammy.
            tokens.extend(["url:" + escape for escape in escapes])

            # now remove any obfuscation and probe around a bit
            url = urllib.unquote(url)
            scheme, netloc, path, params, query, frag = urlparse.urlparse(url)

            if cache is not None and options["Tokenizer", "x-lookup_ip"]:
                ips = cache.lookup(netloc)
                if len(ips) == 0:
                    pushclue("url-ip:timeout")
                else:
                    for ip in ips:
                        # Should we limit to one A record?
                        pushclue("url-ip:%s/32" % ip)
                        dottedQuadList = ip.split(".")
                        pushclue("url-ip:%s/8" % dottedQuadList[0])
                        pushclue("url-ip:%s.%s/16" % (dottedQuadList[0],
                                                      dottedQuadList[1]))
                        pushclue("url-ip:%s.%s.%s/24" % (dottedQuadList[0],
                                                         dottedQuadList[1],
                                                         dottedQuadList[2]))

            # One common technique in bogus "please (re-)authorize yourself"
            # scams is to make it appear as if you're visiting a valid
            # payment-oriented site like PayPal, CitiBank or eBay, when you
            # actually aren't.  The company's web server appears as the
            # beginning of an often long username element in the URL such as
            # http://www.paypal.com%65%43%99%35@10.0.1.1/iwantyourccinfo
            # generally with an innocuous-looking fragment of text or a
            # valid URL as the highlighted link.  Usernames should rarely
            # appear in URLs (perhaps in a local bookmark you established),
            # and never in a URL you receive from an unsolicited email or
            # another website.
            user_pwd, host_port = urllib.splituser(netloc)
            if user_pwd is not None:
                pushclue("url:has user")

            host, port = urllib.splitport(host_port)
            # web servers listening on non-standard ports are suspicious ...
            if port is not None:
                if (scheme == "http" and port != '80' or
                    scheme == "https" and port != '443'):
                    pushclue("url:non-standard %s port" % scheme)

            # ... as are web servers associated with raw ip addresses
            if re.match("(\d+\.?){4,4}$", host) is not None:
                pushclue("url:ip addr")

            # make sure we later tokenize the unobfuscated url bits
            proto, guts = url.split("://", 1)

        # Lose the trailing punctuation for casual embedding, like:
        # The code is at http://mystuff.org/here?  Didn't resolve.
        # or
        # I found it at http://mystuff.org/there/.  Thanks!
        while guts and guts[-1] in '.:?!/':
            guts = guts[:-1]
        for piece in guts.split('/'):
            for chunk in urlsep_re.split(piece):
                pushclue("url:" + chunk)
        return tokens
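# Illustrative sketch (not part of the original SpamBayes code): shows the shape
# of the match object that URLStripper.tokenize receives and the tokens it is
# expected to emit.  The sample text and the _example_url_match name are
# assumptions made for demonstration only; nothing here runs at import time.
def _example_url_match():
    """Show what url_fancy_re captures for a protocol-less URL."""
    m = url_fancy_re.search("details at www.example.com/cgi?id=1 today")
    # m.groups() -> (None, 'www.example.com/cgi?id=1').  tokenize() would guess
    # proto = "http" because the guts start with "www", emit "proto:http", and
    # (after splitting on '/' and urlsep_re) tokens such as "url:www",
    # "url:example", "url:com", "url:cgi", "url:id" and "url:1".
    return m.groups()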
received_complaints_re = re.compile(r'\([a-z]+(?:\s+[a-z]+)+\)')

class SlurpingURLStripper(URLStripper):
    def __init__(self):
        URLStripper.__init__(self)

    def analyze(self, text):
        # If there are no URLs, then we need to clear the
        # wordstream, or whatever was there from the last message
        # will be used.
        classifier.slurp_wordstream = None
        # Continue as normal.
        return URLStripper.analyze(self, text)

    def tokenize(self, m):
        # XXX Note that the 'slurped' tokens are *always* trained
        # XXX on; it would be simple to change/parameterize this.
        tokens = URLStripper.tokenize(self, m)
        if not options["URLRetriever", "x-slurp_urls"]:
            return tokens

        proto, guts = m.groups()
        if proto != "http":
            return tokens

        assert guts
        while guts and guts[-1] in '.:;?!/)':
            guts = guts[:-1]

        classifier.slurp_wordstream = (proto, guts)
        return tokens

if options["URLRetriever", "x-slurp_urls"]:
    crack_urls = SlurpingURLStripper().analyze
else:
    crack_urls = URLStripper().analyze

# Nuke HTML <style gimmicks.
html_style_start_re = re.compile(r"""
    < \s* style\b [^>]* >
""", re.VERBOSE)

class StyleStripper(Stripper):
    def __init__(self):
        Stripper.__init__(self, html_style_start_re.search,
                          re.compile(r"</style>").search)

crack_html_style = StyleStripper().analyze

# Nuke HTML comments.
class CommentStripper(Stripper):
    def __init__(self):
        Stripper.__init__(self,
                          re.compile(r"<!--|<\s*comment\s*[^>]*>").search,
                          re.compile(r"-->|</comment>").search)

crack_html_comment = CommentStripper().analyze

# Nuke stuff between <noframes> </noframes> tags.
class NoframesStripper(Stripper):
    def __init__(self):
        Stripper.__init__(self,
                          re.compile(r"<\s*noframes\s*>").search,
                          re.compile(r"</noframes\s*>").search)

crack_noframes = NoframesStripper().analyze

# Scan HTML for constructs often seen in viruses and worms.
# <script  </script
# <iframe  </iframe
# src=cid:
# height=0 width=0
virus_re = re.compile(r"""
    < /? \s* (?: script | iframe) \b
|   \b src= ['"]? cid:
|   \b (?: height | width) = ['"]? 0
""", re.VERBOSE)                        # '

def find_html_virus_clues(text):
    for bingo in virus_re.findall(text):
        yield bingo

numeric_entity_re = re.compile(r'&#(\d+);')

def numeric_entity_replacer(m):
    try:
        return chr(int(m.group(1)))
    except:
        return '?'

breaking_entity_re = re.compile(r"""
    &nbsp;
|   < (?: p | br ) >
""", re.VERBOSE)
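# Illustrative sketch (not part of the original SpamBayes code): a hypothetical
# helper showing what the virus_re scan and the numeric-entity decoding above
# produce for a small made-up HTML fragment.  Nothing here is called at import
# time.
def _example_virus_and_entity_clues():
    """Exercise find_html_virus_clues and numeric_entity_replacer on sample text."""
    sample = '<iframe src=cid:12345 height=0 width=0></iframe> say &#72;&#105;'
    # Expected clues: '<iframe', 'src=cid:', 'height=0', 'width=0', '</iframe'
    clues = list(find_html_virus_clues(sample))
    # numeric_entity_re.sub decodes "&#72;&#105;" to "Hi"; undecodable entities
    # are replaced with '?'.
    decoded = numeric_entity_re.sub(numeric_entity_replacer, sample)
    return clues, decoded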
if options["Tokenizer", "basic_header_tokenize"]: self.basic_skip = [re.compile(s) for s in options["Tokenizer", "basic_header_skip"]] def get_message(self, obj): return get_message(obj) def tokenize(self, obj): msg = self.get_message(obj) for tok in self.tokenize_headers(msg): yield tok for tok in self.tokenize_body(msg): yield tok def tokenize_headers(self, msg): # Special tagging of header lines and MIME metadata. # Content-{Type, Disposition} and their params, and charsets. # This is done for all MIME sections. for x in msg.walk(): for w in crack_content_xyz(x): yield w # The rest is solely tokenization of header lines. # XXX The headers in my (Tim's) spam and ham corpora are so different # XXX (they came from different sources) that including several kinds # XXX of header analysis renders the classifier's job trivial. So # XXX lots of this is crippled now, controlled by an ever-growing # XXX collection of funky options. # Basic header tokenization # Tokenize the contents of each header field in the way Subject lines # are tokenized later. # XXX Different kinds of tokenization have gotten better results on # XXX different header lines. No experiments have been run on # XXX whether the best choice is being made for each of the header # XXX lines tokenized by this section. # The name of the header is used as a tag. Tokens look like # "header:word". The basic approach is simple and effective, but # also very sensitive to biases in the ham and spam collections. # For example, if the ham and spam were collected at different # times, several headers with date/time information will become # the best discriminators. # (Not just Date, but Received and X-From_.) if options["Tokenizer", "basic_header_tokenize"]: for k, v in msg.items(): k = k.lower() for rx in self.basic_skip: if rx.match(k): break # do nothing -- we're supposed to skip this else: # Never found a match -- don't skip this. for w in subject_word_re.findall(v): for t in tokenize_word(w): yield "%s:%s" % (k, t) if options["Tokenizer", "basic_header_tokenize_only"]: return # Habeas Headers - see http://www.habeas.com if options["Tokenizer", "x-search_for_habeas_headers"]: habeas_headers = [("X-Habeas-SWE-1", "winter into spring"),("X-Habeas-SWE-2", "brightly anticipated"),("X-Habeas-SWE-3", "like Habeas SWE (tm)"),("X-Habeas-SWE-4", "Copyright 2002 Habeas (tm)"),("X-Habeas-SWE-5", "Sender Warranted Email (SWE) (tm). The sender of this"),("X-Habeas-SWE-6", "email in exchange for a license for this Habeas"),("X-Habeas-SWE-7", "warrant mark warrants that this is a Habeas Compliant"),("X-Habeas-SWE-8", "Message (HCM) and not spam. Please report use of this"),("X-Habeas-SWE-9", "mark in spam to <http://www.habeas.com/report/>.") ] valid_habeas = 0 invalid_habeas = False for opt, val in habeas_headers: habeas = msg.get(opt) if habeas is not None: if options["Tokenizer", "x-reduce_habeas_headers"]: if habeas == val: valid_habeas += 1 else: invalid_habeas = True else: if habeas == val: yield opt.lower() + ":valid" else: yield opt.lower() + ":invalid" if options["Tokenizer", "x-reduce_habeas_headers"]: