📄 rfc822.py
字号:
# Portions Copyright (c) 2005 Nokia Corporation
"""RFC 2822 message manipulation."""
# Cleanup and extensions by Eric S. Raymond <esr@thyrsus.com>
import time
# Names exported by "from <this module> import *".
__all__ = ["Message","AddressList","parsedate","parsedate_tz","mktime_tz"]
# The exact line contents that Message.islast() accepts as the blank line
# terminating the header block (CRLF or bare LF).
_blanklines = ('\r\n', '\n') # Optimization for islast()
class Message:
"""Represents a single RFC 2822-compliant message."""
def __init__(self, fp, seekable = 1):
if seekable == 1:
# Exercise tell() to make sure it works
# (and then assume seek() works, too)
try:
fp.tell()
except (AttributeError, IOError):
seekable = 0
else:
seekable = 1
self.fp = fp
self.seekable = seekable
self.startofheaders = None
self.startofbody = None
#
if self.seekable:
try:
self.startofheaders = self.fp.tell()
except IOError:
self.seekable = 0
#
self.readheaders()
#
if self.seekable:
try:
self.startofbody = self.fp.tell()
except IOError:
self.seekable = 0
def rewindbody(self):
if not self.seekable:
raise IOError, "unseekable file"
self.fp.seek(self.startofbody)
def readheaders(self):
self.dict = {}
self.unixfrom = ''
self.headers = list = []
self.status = ''
headerseen = ""
firstline = 1
startofline = unread = tell = None
if hasattr(self.fp, 'unread'):
unread = self.fp.unread
elif self.seekable:
tell = self.fp.tell
while 1:
if tell:
try:
startofline = tell()
except IOError:
startofline = tell = None
self.seekable = 0
line = self.fp.readline()
if not line:
self.status = 'EOF in headers'
break
if firstline and line.startswith('From '):
self.unixfrom = self.unixfrom + line
continue
firstline = 0
if headerseen and line[0] in ' \t':
list.append(line)
x = (self.dict[headerseen] + "\n " + line.strip())
self.dict[headerseen] = x.strip()
continue
elif self.iscomment(line):
continue
elif self.islast(line):
break
headerseen = self.isheader(line)
if headerseen:
list.append(line)
self.dict[headerseen] = line[len(headerseen)+1:].strip()
continue
else:
if not self.dict:
self.status = 'No headers'
else:
self.status = 'Non-header line where header expected'
if unread:
unread(line)
elif tell:
self.fp.seek(startofline)
else:
self.status = self.status + '; bad seek'
break
def isheader(self, line):
i = line.find(':')
if i > 0:
return line[:i].lower()
else:
return None
def islast(self, line):
return line in _blanklines
def iscomment(self, line):
return None
def getallmatchingheaders(self, name):
name = name.lower() + ':'
n = len(name)
list = []
hit = 0
for line in self.headers:
if line[:n].lower() == name:
hit = 1
elif not line[:1].isspace():
hit = 0
if hit:
list.append(line)
return list
def getfirstmatchingheader(self, name):
name = name.lower() + ':'
n = len(name)
list = []
hit = 0
for line in self.headers:
if hit:
if not line[:1].isspace():
break
elif line[:n].lower() == name:
hit = 1
if hit:
list.append(line)
return list
def getrawheader(self, name):
list = self.getfirstmatchingheader(name)
if not list:
return None
list[0] = list[0][len(name) + 1:]
return ''.join(list)
def getheader(self, name, default=None):
try:
return self.dict[name.lower()]
except KeyError:
return default
get = getheader
def getheaders(self, name):
result = []
current = ''
have_header = 0
for s in self.getallmatchingheaders(name):
if s[0].isspace():
if current:
current = "%s\n %s" % (current, s.strip())
else:
current = s.strip()
else:
if have_header:
result.append(current)
current = s[s.find(":") + 1:].strip()
have_header = 1
if have_header:
result.append(current)
return result
def getaddr(self, name):
# New, by Ben Escoto
alist = self.getaddrlist(name)
if alist:
return alist[0]
else:
return (None, None)
def getaddrlist(self, name):
raw = []
for h in self.getallmatchingheaders(name):
if h[0] in ' \t':
raw.append(h)
else:
if raw:
raw.append(', ')
i = h.find(':')
if i > 0:
addr = h[i+1:]
raw.append(addr)
alladdrs = ''.join(raw)
a = AddrlistClass(alladdrs)
return a.getaddrlist()
def getdate(self, name):
try:
data = self[name]
except KeyError:
return None
return parsedate(data)
def getdate_tz(self, name):
try:
data = self[name]
except KeyError:
return None
return parsedate_tz(data)
# Access as a dictionary (only finds *last* header of each type):
def __len__(self):
return len(self.dict)
def __getitem__(self, name):
return self.dict[name.lower()]
def __setitem__(self, name, value):
del self[name] # Won't fail if it doesn't exist
self.dict[name.lower()] = value
text = name + ": " + value
lines = text.split("\n")
for line in lines:
self.headers.append(line + "\n")
def __delitem__(self, name):
name = name.lower()
if not self.dict.has_key(name):
return
del self.dict[name]
name = name + ':'
n = len(name)
list = []
hit = 0
for i in range(len(self.headers)):
line = self.headers[i]
if line[:n].lower() == name:
hit = 1
elif not line[:1].isspace():
hit = 0
if hit:
list.append(i)
list.reverse()
for i in list:
del self.headers[i]
def setdefault(self, name, default=""):
lowername = name.lower()
if self.dict.has_key(lowername):
return self.dict[lowername]
else:
text = name + ": " + default
lines = text.split("\n")
for line in lines:
self.headers.append(line + "\n")
self.dict[lowername] = default
return default
def has_key(self, name):
return self.dict.has_key(name.lower())
def keys(self):
return self.dict.keys()
def values(self):
return self.dict.values()
def items(self):
return self.dict.items()
def __str__(self):
str = ''
for hdr in self.headers:
str = str + hdr
return str
# Utility functions
# -----------------
def unquote(str):
    """Remove one pair of enclosing quotes or angle brackets from str.

    If str starts and ends with a double quote, the quotes are removed and
    the backslash escapes produced by quote() are undone.  If str starts
    with '<' and ends with '>', only the brackets are removed.  Any other
    string, including one shorter than two characters, is returned as is.
    """
    if len(str) > 1:
        if str[0] == '"' and str[-1:] == '"':
            # Reverse quote(): collapse doubled backslashes first, then
            # turn the remaining \" pairs back into plain quotes.
            return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
        if str[0] == '<' and str[-1:] == '>':
            return str[1:-1]
    return str
def quote(str):
    """Return str with every backslash and double quote backslash-escaped."""
    # Backslashes must be doubled first so the ones added for the quotes
    # are not doubled again.
    escaped = str.replace('\\', '\\\\')
    return escaped.replace('"', '\\"')
def parseaddr(address):
    """Return the first entry AddressList finds in address.

    Returns (None, None) when the parsed address list is empty.
    """
    parsed = AddressList(address).addresslist
    if parsed:
        return parsed[0]
    return (None, None)
class AddrlistClass:
def __init__(self, field):
self.specials = '()<>@,:;.\"[]'
self.pos = 0
self.LWS = ' \t'
self.CR = '\r\n'
self.atomends = self.specials + self.LWS + self.CR
self.phraseends = self.atomends.replace('.', '')
self.field = field
self.commentlist = []
def gotonext(self):
    """Advance pos past whitespace and comments.

    Each comment met on the way is read with getcomment() and appended to
    commentlist.  Stops at the first other character or at end of field.
    """
    blanks = self.LWS + '\n\r'
    while self.pos < len(self.field):
        ch = self.field[self.pos]
        if ch in blanks:
            self.pos += 1
        elif ch == '(':
            self.commentlist.append(self.getcomment())
        else:
            break
def getaddrlist(self):
    """Collect the results of repeated getaddress() calls into one list.

    Stops at the first getaddress() call that returns an empty result.
    """
    collected = []
    found = self.getaddress()
    while found:
        collected += found
        found = self.getaddress()
    return collected
def getaddress(self):
self.commentlist = []
self.gotonext()
oldpos = self.pos
oldcl = self.commentlist
plist = self.getphraselist()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -