imaplib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,209 行 · 第 1/3 页

PY
1,209
字号
            # Is there a literal to come?            while self._match(Literal, dat):                # Read literal direct from connection.                size = int(self.mo.group('size'))                if __debug__:                    if self.debug >= 4:                        _mesg('read literal size %s' % size)                data = self.read(size)                # Store response with literal as tuple                self._append_untagged(typ, (dat, data))                # Read trailer - possibly containing another literal                dat = self._get_line()            self._append_untagged(typ, dat)        # Bracketed response information?        if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):            self._append_untagged(self.mo.group('type'), self.mo.group('data'))        if __debug__:            if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):                _mesg('%s response: %s' % (typ, dat))        return resp    def _get_tagged_response(self, tag):        while 1:            result = self.tagged_commands[tag]            if result is not None:                del self.tagged_commands[tag]                return result            # Some have reported "unexpected response" exceptions.            # Note that ignoring them here causes loops.            # Instead, send me details of the unexpected response and            # I'll update the code in `_get_response()'.            try:                self._get_response()            except self.abort, val:                if __debug__:                    if self.debug >= 1:                        print_log()                raise    def _get_line(self):        line = self.readline()        if not line:            raise self.abort('socket error: EOF')        # Protocol mandates all lines terminated by CRLF        line = line[:-2]        if __debug__:            if self.debug >= 4:                _mesg('< %s' % line)            else:                _log('< %s' % line)        return line    def _match(self, cre, s):        # Run compiled regular expression match method on 's'.        # Save result, return success.        self.mo = cre.match(s)        if __debug__:            if self.mo is not None and self.debug >= 5:                _mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`))        return self.mo is not None    def _new_tag(self):        tag = '%s%s' % (self.tagpre, self.tagnum)        self.tagnum = self.tagnum + 1        self.tagged_commands[tag] = None        return tag    def _checkquote(self, arg):        # Must quote command args if non-alphanumeric chars present,        # and not already quoted.        if type(arg) is not type(''):            return arg        if (arg[0],arg[-1]) in (('(',')'),('"','"')):            return arg        if self.mustquote.search(arg) is None:            return arg        return self._quote(arg)    def _quote(self, arg):        arg = arg.replace('\\', '\\\\')        arg = arg.replace('"', '\\"')        return '"%s"' % arg    def _simple_command(self, name, *args):        return self._command_complete(name, apply(self._command, (name,) + args))    def _untagged_response(self, typ, dat, name):        if typ == 'NO':            return typ, dat        if not self.untagged_responses.has_key(name):            return typ, [None]        data = self.untagged_responses[name]        if __debug__:            if self.debug >= 5:                _mesg('untagged_responses[%s] => %s' % (name, data))        del self.untagged_responses[name]        return typ, dataclass _Authenticator:    """Private class to provide en/decoding            for base64-based authentication conversation.    """    def __init__(self, mechinst):        self.mech = mechinst    # Callable object to provide/process data    def process(self, data):        ret = self.mech(self.decode(data))        if ret is None:            return '*'      # Abort conversation        return self.encode(ret)    def encode(self, inp):        #        #  Invoke binascii.b2a_base64 iteratively with        #  short even length buffers, strip the trailing        #  line feed from the result and append.  "Even"        #  means a number that factors to both 6 and 8,        #  so when it gets to the end of the 8-bit input        #  there's no partial 6-bit output.        #        oup = ''        while inp:            if len(inp) > 48:                t = inp[:48]                inp = inp[48:]            else:                t = inp                inp = ''            e = binascii.b2a_base64(t)            if e:                oup = oup + e[:-1]        return oup    def decode(self, inp):        if not inp:            return ''        return binascii.a2b_base64(inp)Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,        'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}def Internaldate2tuple(resp):    """Convert IMAP4 INTERNALDATE to UT.    Returns Python time module tuple.    """    mo = InternalDate.match(resp)    if not mo:        return None    mon = Mon2num[mo.group('mon')]    zonen = mo.group('zonen')    day = int(mo.group('day'))    year = int(mo.group('year'))    hour = int(mo.group('hour'))    min = int(mo.group('min'))    sec = int(mo.group('sec'))    zoneh = int(mo.group('zoneh'))    zonem = int(mo.group('zonem'))    # INTERNALDATE timezone must be subtracted to get UT    zone = (zoneh*60 + zonem)*60    if zonen == '-':        zone = -zone    tt = (year, mon, day, hour, min, sec, -1, -1, -1)    utc = time.mktime(tt)    # Following is necessary because the time module has no 'mkgmtime'.    # 'mktime' assumes arg in local timezone, so adds timezone/altzone.    lt = time.localtime(utc)    if time.daylight and lt[-1]:        zone = zone + time.altzone    else:        zone = zone + time.timezone    return time.localtime(utc - zone)def Int2AP(num):    """Convert integer to A-P string representation."""    val = ''; AP = 'ABCDEFGHIJKLMNOP'    num = int(abs(num))    while num:        num, mod = divmod(num, 16)        val = AP[mod] + val    return valdef ParseFlags(resp):    """Convert IMAP4 flags response to python tuple."""    mo = Flags.match(resp)    if not mo:        return ()    return tuple(mo.group('flags').split())def Time2Internaldate(date_time):    """Convert 'date_time' to IMAP4 INTERNALDATE representation.    Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'    """    if isinstance(date_time, (int, float)):        tt = time.localtime(date_time)    elif isinstance(date_time, (tuple, time.struct_time)):        tt = date_time    elif isinstance(date_time, str):        return date_time        # Assume in correct format    else:        raise ValueError("date_time not of a known type")    dt = time.strftime("%d-%b-%Y %H:%M:%S", tt)    if dt[0] == '0':        dt = ' ' + dt[1:]    if time.daylight and tt[-1]:        zone = -time.altzone    else:        zone = -time.timezone    return '"' + dt + " %+03d%02d" % divmod(zone/60, 60) + '"'if __debug__:    def _mesg(s, secs=None):        if secs is None:            secs = time.time()        tm = time.strftime('%M:%S', time.localtime(secs))        sys.stderr.write('  %s.%02d %s\n' % (tm, (secs*100)%100, s))        sys.stderr.flush()    def _dump_ur(dict):        # Dump untagged responses (in `dict').        l = dict.items()        if not l: return        t = '\n\t\t'        l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l)        _mesg('untagged responses dump:%s%s' % (t, t.join(l)))    _cmd_log = []           # Last `_cmd_log_len' interactions    _cmd_log_len = 10    def _log(line):        # Keep log of last `_cmd_log_len' interactions for debugging.        if len(_cmd_log) == _cmd_log_len:            del _cmd_log[0]        _cmd_log.append((time.time(), line))    def print_log():        _mesg('last %d IMAP4 interactions:' % len(_cmd_log))        for secs,line in _cmd_log:            _mesg(line, secs)if __name__ == '__main__':    import getopt, getpass    try:        optlist, args = getopt.getopt(sys.argv[1:], 'd:')    except getopt.error, val:        pass    for opt,val in optlist:        if opt == '-d':            Debug = int(val)    if not args: args = ('',)    host = args[0]    USER = getpass.getuser()    PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost"))    test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':CRLF}    test_seq1 = (    ('login', (USER, PASSWD)),    ('create', ('/tmp/xxx 1',)),    ('rename', ('/tmp/xxx 1', '/tmp/yyy')),    ('CREATE', ('/tmp/yyz 2',)),    ('append', ('/tmp/yyz 2', None, None, test_mesg)),    ('list', ('/tmp', 'yy*')),    ('select', ('/tmp/yyz 2',)),    ('search', (None, 'SUBJECT', 'test')),    ('partial', ('1', 'RFC822', 1, 1024)),    ('store', ('1', 'FLAGS', '(\Deleted)')),    ('namespace', ()),    ('expunge', ()),    ('recent', ()),    ('close', ()),    )    test_seq2 = (    ('select', ()),    ('response',('UIDVALIDITY',)),    ('uid', ('SEARCH', 'ALL')),    ('response', ('EXISTS',)),    ('append', (None, None, None, test_mesg)),    ('recent', ()),    ('logout', ()),    )    def run(cmd, args):        _mesg('%s %s' % (cmd, args))        typ, dat = apply(getattr(M, cmd), args)        _mesg('%s => %s %s' % (cmd, typ, dat))        return dat    try:        M = IMAP4(host)        _mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)        _mesg('CAPABILITIES = %s' % `M.capabilities`)        for cmd,args in test_seq1:            run(cmd, args)        for ml in run('list', ('/tmp/', 'yy%')):            mo = re.match(r'.*"([^"]+)"$', ml)            if mo: path = mo.group(1)            else: path = ml.split()[-1]            run('delete', (path,))        for cmd,args in test_seq2:            dat = run(cmd, args)            if (cmd,args) != ('uid', ('SEARCH', 'ALL')):                continue            uid = dat[-1].split()            if not uid: continue            run('uid', ('FETCH', '%s' % uid[-1],                    '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))        print '\nAll tests OK.'    except:        print '\nTests failed.'        if not Debug:            print '''If you would like to see debugging output,try: %s -d5''' % sys.argv[0]        raise

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?