📄 smtpproxy.py
字号:
BayesSMTPProxy objects to serve them.""" def __init__(self, serverName, serverPort, proxyPort, trainer): proxyArgs = (serverName, serverPort, trainer) Dibbler.Listener.__init__(self, proxyPort, BayesSMTPProxy, proxyArgs) print 'SMTP Listener on port %s is proxying %s:%d' % \ (_addressPortStr(proxyPort), serverName, serverPort)class BayesSMTPProxy(SMTPProxyBase): """Proxies between an email client and a SMTP server, inserting judgement headers. It acts on the following SMTP commands: o RCPT TO: o Checks if the recipient address matches the key ham or spam addresses, and if so notes this and does not forward a command to the proxied server. In all other cases simply passes on the verbatim command. o DATA: o Notes that we are in the data section. If (from the RCPT TO information) we are receiving a ham/spam message to train on, then do not forward the command on. Otherwise forward verbatim. Any other commands are merely passed on verbatim to the server. """ def __init__(self, clientSocket, serverName, serverPort, trainer): SMTPProxyBase.__init__(self, clientSocket, serverName, serverPort) self.handlers = {'RCPT TO': self.onRcptTo, 'DATA': self.onData, 'MAIL FROM': self.onMailFrom} self.trainer = trainer self.isClosed = False self.train_as_ham = False self.train_as_spam = False def send(self, data): try: return SMTPProxyBase.send(self, data) except socket.error: # The email client has closed the connection - 40tude Dialog # does this immediately after issuing a QUIT command, # without waiting for the response. self.close() def close(self): # This can be called multiple times by async. if not self.isClosed: self.isClosed = True SMTPProxyBase.close(self) def stripAddress(self, address): """ Strip the leading & trailing <> from an address. Handy for getting FROM: addresses. """ if '<' in address: start = string.index(address, '<') + 1 end = string.index(address, '>') return address[start:end] else: return address def onTransaction(self, command, args): handler = self.handlers.get(command.upper(), self.onUnknown) return handler(command, args) def onProcessData(self, data): if self.train_as_spam: self.trainer.train(data, True) self.train_as_spam = False return "" elif self.train_as_ham: self.trainer.train(data, False) self.train_as_ham = False return "" return data def onRcptTo(self, command, args): toFull = self.stripAddress(args[0]) if toFull == options["smtpproxy", "spam_address"]: self.train_as_spam = True self.train_as_ham = False self.blockData = True self.push("250 OK\r\n") return None elif toFull == options["smtpproxy", "ham_address"]: self.train_as_ham = True self.train_as_spam = False self.blockData = True self.push("250 OK\r\n") return None else: self.blockData = False return "%s:%s" % (command, ' '.join(args)) def onData(self, command, args): self.inData = True if self.train_as_ham == True or self.train_as_spam == True: self.push("354 Enter data ending with a . on a line by itself\r\n") return None return command + ' ' + ' '.join(args) def onMailFrom(self, command, args): """Just like the default handler, but has the necessary colon.""" rv = "%s:%s" % (command, ' '.join(args)) return rv def onUnknown(self, command, args): """Default handler.""" return self.requestclass SMTPTrainer(object): def __init__(self, classifier, state=None, imap=None): self.classifier = classifier self.state = state self.imap = imap def extractSpambayesID(self, data): msg = email.message_from_string(data, _class=message.SBHeaderMessage) # The nicest MUA is one that forwards the header intact. id = msg.get(options["Headers", "mailid_header_name"]) if id is not None: return id # Some MUAs will put it in the body somewhere, while others will # put it in an attached MIME message. id = self._find_id_in_text(msg.as_string()) if id is not None: return id # the message might be encoded for part in textparts(msg): # Decode, or take it as-is if decoding fails. try: text = part.get_payload(decode=True) except: text = part.get_payload(decode=False) if text is not None: text = try_to_repair_damaged_base64(text) if text is not None: id = self._find_id_in_text(text) return id return None header_pattern = re.escape(options["Headers", "mailid_header_name"]) # A MUA might enclose the id in a table, thus the convoluted re pattern # (Mozilla Mail does this with inline html) header_pattern += r":\s*(\</th\>\s*\<td\>\s*)?([\d\-]+)" header_re = re.compile(header_pattern) def _find_id_in_text(self, text): mo = self.header_re.search(text) if mo is None: return None return mo.group(2) def train(self, msg, isSpam): try: use_cached = options["smtpproxy", "use_cached_message"] except KeyError: use_cached = True if use_cached: id = self.extractSpambayesID(msg) if id is None: print "Could not extract id" return self.train_cached_message(id, isSpam) # Otherwise, train on the forwarded/bounced message. msg = email.message_from_string(msg, _class=message.SBHeaderMessage) id = msg.setIdFromPayload() msg.delSBHeaders() if id is None: # No id, so we don't have any reliable method of remembering # information about this message, so we just assume that it # hasn't been trained before. We could generate some sort of # checksum for the message and use that as an id (this would # mean that we didn't need to store the id with the message) # but that might be a little unreliable. self.classifier.learn(msg.tokenize(), isSpam) else: if msg.GetTrained() == (not isSpam): self.classifier.unlearn(msg.tokenize(), not isSpam) msg.RememberTrained(None) if msg.GetTrained() is None: self.classifier.learn(msg.tokenize(), isSpam) msg.RememberTrained(isSpam) def train_cached_message(self, id, isSpam): if not self.train_message_in_pop3proxy_cache(id, isSpam) and \ not self.train_message_on_imap_server(id, isSpam): print "Could not find message (%s); perhaps it was " \ "deleted from the POP3Proxy cache or the IMAP " \ "server. This means that no training was done." % (id, ) def train_message_in_pop3proxy_cache(self, id, isSpam): if self.state is None: return False sourceCorpus = None for corpus in [self.state.unknownCorpus, self.state.hamCorpus, self.state.spamCorpus]: if corpus.get(id) is not None: sourceCorpus = corpus break if corpus is None: return False if isSpam == True: targetCorpus = self.state.spamCorpus else: targetCorpus = self.state.hamCorpus targetCorpus.takeMessage(id, sourceCorpus) self.classifier.store() return True def train_message_on_imap_server(self, id, isSpam): if self.imap is None: return False msg = self.imap.FindMessage(id) if msg is None: return False if msg.GetTrained() == (not isSpam): msg.get_substance() msg.delSBHeaders() self.classifier.unlearn(msg.tokenize(), not isSpam) msg.RememberTrained(None) if msg.GetTrained() is None: msg.get_substance() msg.delSBHeaders() self.classifier.learn(msg.tokenize(), isSpam) msg.RememberTrained(isSpam) self.classifier.store() return Truedef LoadServerInfo(): # Load the proxy settings servers = [] proxyPorts = [] if options["smtpproxy", "remote_servers"]: for server in options["smtpproxy", "remote_servers"]: server = server.strip() if server.find(':') > -1: server, port = server.split(':', 1) else: port = '25' servers.append((server, int(port))) if options["smtpproxy", "listen_ports"]: splitPorts = options["smtpproxy", "listen_ports"] proxyPorts = map(_addressAndPort, splitPorts) if len(servers) != len(proxyPorts): print "smtpproxy:remote_servers & smtpproxy:listen_ports are " + \ "different lengths!" sys.exit() return servers, proxyPortsdef CreateProxies(servers, proxyPorts, trainer): """Create BayesSMTPProxyListeners for all the given servers.""" proxyListeners = [] for (server, serverPort), proxyPort in zip(servers, proxyPorts): listener = BayesSMTPProxyListener(server, serverPort, proxyPort, trainer) proxyListeners.append(listener) return proxyListeners
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -