rfc822.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,011 行 · 第 1/3 页
PY
1,011 行
Retrieves a list of addresses from a header, where each address is a tuple as returned by getaddr(). Scans all named headers, so it works properly with multiple To: or Cc: headers for example. """ raw = [] for h in self.getallmatchingheaders(name): if h[0] in ' \t': raw.append(h) else: if raw: raw.append(', ') i = h.find(':') if i > 0: addr = h[i+1:] raw.append(addr) alladdrs = ''.join(raw) a = AddrlistClass(alladdrs) return a.getaddrlist() def getdate(self, name): """Retrieve a date field from a header. Retrieves a date field from the named header, returning a tuple compatible with time.mktime(). """ try: data = self[name] except KeyError: return None return parsedate(data) def getdate_tz(self, name): """Retrieve a date field from a header as a 10-tuple. The first 9 elements make up a tuple compatible with time.mktime(), and the 10th is the offset of the poster's time zone from GMT/UTC. """ try: data = self[name] except KeyError: return None return parsedate_tz(data) # Access as a dictionary (only finds *last* header of each type): def __len__(self): """Get the number of headers in a message.""" return len(self.dict) def __getitem__(self, name): """Get a specific header, as from a dictionary.""" return self.dict[name.lower()] def __setitem__(self, name, value): """Set the value of a header. Note: This is not a perfect inversion of __getitem__, because any changed headers get stuck at the end of the raw-headers list rather than where the altered header was. """ del self[name] # Won't fail if it doesn't exist self.dict[name.lower()] = value text = name + ": " + value lines = text.split("\n") for line in lines: self.headers.append(line + "\n") def __delitem__(self, name): """Delete all occurrences of a specific header, if it is present.""" name = name.lower() if not self.dict.has_key(name): return del self.dict[name] name = name + ':' n = len(name) list = [] hit = 0 for i in range(len(self.headers)): line = self.headers[i] if line[:n].lower() == name: hit = 1 elif not line[:1].isspace(): hit = 0 if hit: list.append(i) list.reverse() for i in list: del self.headers[i] def setdefault(self, name, default=""): lowername = name.lower() if self.dict.has_key(lowername): return self.dict[lowername] else: text = name + ": " + default lines = text.split("\n") for line in lines: self.headers.append(line + "\n") self.dict[lowername] = default return default def has_key(self, name): """Determine whether a message contains the named header.""" return self.dict.has_key(name.lower()) def keys(self): """Get all of a message's header field names.""" return self.dict.keys() def values(self): """Get all of a message's header field values.""" return self.dict.values() def items(self): """Get all of a message's headers. Returns a list of name, value tuples. """ return self.dict.items() def __str__(self): str = '' for hdr in self.headers: str = str + hdr return str# Utility functions# -----------------# XXX Should fix unquote() and quote() to be really conformant.# XXX The inverses of the parse functions may also be useful.def unquote(str): """Remove quotes from a string.""" if len(str) > 1: if str[0] == '"' and str[-1:] == '"': return str[1:-1] if str[0] == '<' and str[-1:] == '>': return str[1:-1] return strdef quote(str): """Add quotes around a string.""" return str.replace('\\', '\\\\').replace('"', '\\"')def parseaddr(address): """Parse an address into a (realname, mailaddr) tuple.""" a = AddressList(address) list = a.addresslist if not list: return (None, None) else: return list[0]class AddrlistClass: """Address parser class by Ben Escoto. To understand what this class does, it helps to have a copy of RFC 2822 in front of you. http://www.faqs.org/rfcs/rfc2822.html Note: this class interface is deprecated and may be removed in the future. Use rfc822.AddressList instead. """ def __init__(self, field): """Initialize a new instance. `field' is an unparsed address header field, containing one or more addresses. """ self.specials = '()<>@,:;.\"[]' self.pos = 0 self.LWS = ' \t' self.CR = '\r\n' self.atomends = self.specials + self.LWS + self.CR # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it # is obsolete syntax. RFC 2822 requires that we recognize obsolete # syntax, so allow dots in phrases. self.phraseends = self.atomends.replace('.', '') self.field = field self.commentlist = [] def gotonext(self): """Parse up to the start of the next address.""" while self.pos < len(self.field): if self.field[self.pos] in self.LWS + '\n\r': self.pos = self.pos + 1 elif self.field[self.pos] == '(': self.commentlist.append(self.getcomment()) else: break def getaddrlist(self): """Parse all addresses. Returns a list containing all of the addresses. """ result = [] while 1: ad = self.getaddress() if ad: result += ad else: break return result def getaddress(self): """Parse the next address.""" self.commentlist = [] self.gotonext() oldpos = self.pos oldcl = self.commentlist plist = self.getphraselist() self.gotonext() returnlist = [] if self.pos >= len(self.field): # Bad email address technically, no domain. if plist: returnlist = [(' '.join(self.commentlist), plist[0])] elif self.field[self.pos] in '.@': # email address is just an addrspec # this isn't very efficient since we start over self.pos = oldpos self.commentlist = oldcl addrspec = self.getaddrspec() returnlist = [(' '.join(self.commentlist), addrspec)] elif self.field[self.pos] == ':': # address is a group returnlist = [] fieldlen = len(self.field) self.pos = self.pos + 1 while self.pos < len(self.field): self.gotonext() if self.pos < fieldlen and self.field[self.pos] == ';': self.pos = self.pos + 1 break returnlist = returnlist + self.getaddress() elif self.field[self.pos] == '<': # Address is a phrase then a route addr routeaddr = self.getrouteaddr() if self.commentlist: returnlist = [(' '.join(plist) + ' (' + \ ' '.join(self.commentlist) + ')', routeaddr)] else: returnlist = [(' '.join(plist), routeaddr)] else: if plist: returnlist = [(' '.join(self.commentlist), plist[0])] elif self.field[self.pos] in self.specials: self.pos = self.pos + 1 self.gotonext() if self.pos < len(self.field) and self.field[self.pos] == ',': self.pos = self.pos + 1 return returnlist def getrouteaddr(self): """Parse a route address (Return-path value). This method just skips all the route stuff and returns the addrspec. """ if self.field[self.pos] != '<': return expectroute = 0 self.pos = self.pos + 1 self.gotonext() adlist = "" while self.pos < len(self.field): if expectroute: self.getdomain() expectroute = 0 elif self.field[self.pos] == '>': self.pos = self.pos + 1 break elif self.field[self.pos] == '@': self.pos = self.pos + 1 expectroute = 1 elif self.field[self.pos] == ':': self.pos = self.pos + 1 else: adlist = self.getaddrspec() self.pos = self.pos + 1 break self.gotonext() return adlist def getaddrspec(self): """Parse an RFC 2822 addr-spec.""" aslist = [] self.gotonext() while self.pos < len(self.field): if self.field[self.pos] == '.': aslist.append('.') self.pos = self.pos + 1 elif self.field[self.pos] == '"': aslist.append('"%s"' % self.getquote()) elif self.field[self.pos] in self.atomends: break else: aslist.append(self.getatom()) self.gotonext() if self.pos >= len(self.field) or self.field[self.pos] != '@': return ''.join(aslist) aslist.append('@') self.pos = self.pos + 1 self.gotonext() return ''.join(aslist) + self.getdomain() def getdomain(self): """Get the complete domain name from an address.""" sdlist = [] while self.pos < len(self.field): if self.field[self.pos] in self.LWS: self.pos = self.pos + 1 elif self.field[self.pos] == '(': self.commentlist.append(self.getcomment()) elif self.field[self.pos] == '[': sdlist.append(self.getdomainliteral())
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?