imaplib.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,209 行 · 第 1/3 页
PY
1,209 行
# Is there a literal to come? while self._match(Literal, dat): # Read literal direct from connection. size = int(self.mo.group('size')) if __debug__: if self.debug >= 4: _mesg('read literal size %s' % size) data = self.read(size) # Store response with literal as tuple self._append_untagged(typ, (dat, data)) # Read trailer - possibly containing another literal dat = self._get_line() self._append_untagged(typ, dat) # Bracketed response information? if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): self._append_untagged(self.mo.group('type'), self.mo.group('data')) if __debug__: if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'): _mesg('%s response: %s' % (typ, dat)) return resp def _get_tagged_response(self, tag): while 1: result = self.tagged_commands[tag] if result is not None: del self.tagged_commands[tag] return result # Some have reported "unexpected response" exceptions. # Note that ignoring them here causes loops. # Instead, send me details of the unexpected response and # I'll update the code in `_get_response()'. try: self._get_response() except self.abort, val: if __debug__: if self.debug >= 1: print_log() raise def _get_line(self): line = self.readline() if not line: raise self.abort('socket error: EOF') # Protocol mandates all lines terminated by CRLF line = line[:-2] if __debug__: if self.debug >= 4: _mesg('< %s' % line) else: _log('< %s' % line) return line def _match(self, cre, s): # Run compiled regular expression match method on 's'. # Save result, return success. self.mo = cre.match(s) if __debug__: if self.mo is not None and self.debug >= 5: _mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`)) return self.mo is not None def _new_tag(self): tag = '%s%s' % (self.tagpre, self.tagnum) self.tagnum = self.tagnum + 1 self.tagged_commands[tag] = None return tag def _checkquote(self, arg): # Must quote command args if non-alphanumeric chars present, # and not already quoted. if type(arg) is not type(''): return arg if (arg[0],arg[-1]) in (('(',')'),('"','"')): return arg if self.mustquote.search(arg) is None: return arg return self._quote(arg) def _quote(self, arg): arg = arg.replace('\\', '\\\\') arg = arg.replace('"', '\\"') return '"%s"' % arg def _simple_command(self, name, *args): return self._command_complete(name, apply(self._command, (name,) + args)) def _untagged_response(self, typ, dat, name): if typ == 'NO': return typ, dat if not self.untagged_responses.has_key(name): return typ, [None] data = self.untagged_responses[name] if __debug__: if self.debug >= 5: _mesg('untagged_responses[%s] => %s' % (name, data)) del self.untagged_responses[name] return typ, dataclass _Authenticator: """Private class to provide en/decoding for base64-based authentication conversation. """ def __init__(self, mechinst): self.mech = mechinst # Callable object to provide/process data def process(self, data): ret = self.mech(self.decode(data)) if ret is None: return '*' # Abort conversation return self.encode(ret) def encode(self, inp): # # Invoke binascii.b2a_base64 iteratively with # short even length buffers, strip the trailing # line feed from the result and append. "Even" # means a number that factors to both 6 and 8, # so when it gets to the end of the 8-bit input # there's no partial 6-bit output. # oup = '' while inp: if len(inp) > 48: t = inp[:48] inp = inp[48:] else: t = inp inp = '' e = binascii.b2a_base64(t) if e: oup = oup + e[:-1] return oup def decode(self, inp): if not inp: return '' return binascii.a2b_base64(inp)Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}def Internaldate2tuple(resp): """Convert IMAP4 INTERNALDATE to UT. Returns Python time module tuple. """ mo = InternalDate.match(resp) if not mo: return None mon = Mon2num[mo.group('mon')] zonen = mo.group('zonen') day = int(mo.group('day')) year = int(mo.group('year')) hour = int(mo.group('hour')) min = int(mo.group('min')) sec = int(mo.group('sec')) zoneh = int(mo.group('zoneh')) zonem = int(mo.group('zonem')) # INTERNALDATE timezone must be subtracted to get UT zone = (zoneh*60 + zonem)*60 if zonen == '-': zone = -zone tt = (year, mon, day, hour, min, sec, -1, -1, -1) utc = time.mktime(tt) # Following is necessary because the time module has no 'mkgmtime'. # 'mktime' assumes arg in local timezone, so adds timezone/altzone. lt = time.localtime(utc) if time.daylight and lt[-1]: zone = zone + time.altzone else: zone = zone + time.timezone return time.localtime(utc - zone)def Int2AP(num): """Convert integer to A-P string representation.""" val = ''; AP = 'ABCDEFGHIJKLMNOP' num = int(abs(num)) while num: num, mod = divmod(num, 16) val = AP[mod] + val return valdef ParseFlags(resp): """Convert IMAP4 flags response to python tuple.""" mo = Flags.match(resp) if not mo: return () return tuple(mo.group('flags').split())def Time2Internaldate(date_time): """Convert 'date_time' to IMAP4 INTERNALDATE representation. Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"' """ if isinstance(date_time, (int, float)): tt = time.localtime(date_time) elif isinstance(date_time, (tuple, time.struct_time)): tt = date_time elif isinstance(date_time, str): return date_time # Assume in correct format else: raise ValueError("date_time not of a known type") dt = time.strftime("%d-%b-%Y %H:%M:%S", tt) if dt[0] == '0': dt = ' ' + dt[1:] if time.daylight and tt[-1]: zone = -time.altzone else: zone = -time.timezone return '"' + dt + " %+03d%02d" % divmod(zone/60, 60) + '"'if __debug__: def _mesg(s, secs=None): if secs is None: secs = time.time() tm = time.strftime('%M:%S', time.localtime(secs)) sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) sys.stderr.flush() def _dump_ur(dict): # Dump untagged responses (in `dict'). l = dict.items() if not l: return t = '\n\t\t' l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l) _mesg('untagged responses dump:%s%s' % (t, t.join(l))) _cmd_log = [] # Last `_cmd_log_len' interactions _cmd_log_len = 10 def _log(line): # Keep log of last `_cmd_log_len' interactions for debugging. if len(_cmd_log) == _cmd_log_len: del _cmd_log[0] _cmd_log.append((time.time(), line)) def print_log(): _mesg('last %d IMAP4 interactions:' % len(_cmd_log)) for secs,line in _cmd_log: _mesg(line, secs)if __name__ == '__main__': import getopt, getpass try: optlist, args = getopt.getopt(sys.argv[1:], 'd:') except getopt.error, val: pass for opt,val in optlist: if opt == '-d': Debug = int(val) if not args: args = ('',) host = args[0] USER = getpass.getuser() PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':CRLF} test_seq1 = ( ('login', (USER, PASSWD)), ('create', ('/tmp/xxx 1',)), ('rename', ('/tmp/xxx 1', '/tmp/yyy')), ('CREATE', ('/tmp/yyz 2',)), ('append', ('/tmp/yyz 2', None, None, test_mesg)), ('list', ('/tmp', 'yy*')), ('select', ('/tmp/yyz 2',)), ('search', (None, 'SUBJECT', 'test')), ('partial', ('1', 'RFC822', 1, 1024)), ('store', ('1', 'FLAGS', '(\Deleted)')), ('namespace', ()), ('expunge', ()), ('recent', ()), ('close', ()), ) test_seq2 = ( ('select', ()), ('response',('UIDVALIDITY',)), ('uid', ('SEARCH', 'ALL')), ('response', ('EXISTS',)), ('append', (None, None, None, test_mesg)), ('recent', ()), ('logout', ()), ) def run(cmd, args): _mesg('%s %s' % (cmd, args)) typ, dat = apply(getattr(M, cmd), args) _mesg('%s => %s %s' % (cmd, typ, dat)) return dat try: M = IMAP4(host) _mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) _mesg('CAPABILITIES = %s' % `M.capabilities`) for cmd,args in test_seq1: run(cmd, args) for ml in run('list', ('/tmp/', 'yy%')): mo = re.match(r'.*"([^"]+)"$', ml) if mo: path = mo.group(1) else: path = ml.split()[-1] run('delete', (path,)) for cmd,args in test_seq2: dat = run(cmd, args) if (cmd,args) != ('uid', ('SEARCH', 'ALL')): continue uid = dat[-1].split() if not uid: continue run('uid', ('FETCH', '%s' % uid[-1], '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) print '\nAll tests OK.' except: print '\nTests failed.' if not Debug: print '''If you would like to see debugging output,try: %s -d5''' % sys.argv[0] raise
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?