imaplib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,209 行 · 第 1/3 页

PY
1,209
字号
        """Get the ACLs for a mailbox.        (typ, [data]) = <instance>.getacl(mailbox)        """        typ, dat = self._simple_command('GETACL', mailbox)        return self._untagged_response(typ, dat, 'ACL')    def list(self, directory='""', pattern='*'):        """List mailbox names in directory matching pattern.        (typ, [data]) = <instance>.list(directory='""', pattern='*')        'data' is list of LIST responses.        """        name = 'LIST'        typ, dat = self._simple_command(name, directory, pattern)        return self._untagged_response(typ, dat, name)    def login(self, user, password):        """Identify client using plaintext password.        (typ, [data]) = <instance>.login(user, password)        NB: 'password' will be quoted.        """        #if not 'AUTH=LOGIN' in self.capabilities:        #       raise self.error("Server doesn't allow LOGIN authentication." % mech)        typ, dat = self._simple_command('LOGIN', user, self._quote(password))        if typ != 'OK':            raise self.error(dat[-1])        self.state = 'AUTH'        return typ, dat    def logout(self):        """Shutdown connection to server.        (typ, [data]) = <instance>.logout()        Returns server 'BYE' response.        """        self.state = 'LOGOUT'        try: typ, dat = self._simple_command('LOGOUT')        except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]]        self.shutdown()        if self.untagged_responses.has_key('BYE'):            return 'BYE', self.untagged_responses['BYE']        return typ, dat    def lsub(self, directory='""', pattern='*'):        """List 'subscribed' mailbox names in directory matching pattern.        (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')        'data' are tuples of message part envelope and data.        """        name = 'LSUB'        typ, dat = self._simple_command(name, directory, pattern)        return self._untagged_response(typ, dat, name)    def namespace(self):        """ Returns IMAP namespaces ala rfc2342        (typ, [data, ...]) = <instance>.namespace()        """        name = 'NAMESPACE'        typ, dat = self._simple_command(name)        return self._untagged_response(typ, dat, name)    def noop(self):        """Send NOOP command.        (typ, data) = <instance>.noop()        """        if __debug__:            if self.debug >= 3:                _dump_ur(self.untagged_responses)        return self._simple_command('NOOP')    def partial(self, message_num, message_part, start, length):        """Fetch truncated part of a message.        (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)        'data' is tuple of message part envelope and data.        """        name = 'PARTIAL'        typ, dat = self._simple_command(name, message_num, message_part, start, length)        return self._untagged_response(typ, dat, 'FETCH')    def rename(self, oldmailbox, newmailbox):        """Rename old mailbox name to new.        (typ, data) = <instance>.rename(oldmailbox, newmailbox)        """        return self._simple_command('RENAME', oldmailbox, newmailbox)    def search(self, charset, *criteria):        """Search mailbox for matching messages.        (typ, [data]) = <instance>.search(charset, criterium, ...)        'data' is space separated list of matching message numbers.        """        name = 'SEARCH'        if charset:            typ, dat = apply(self._simple_command, (name, 'CHARSET', charset) + criteria)        else:            typ, dat = apply(self._simple_command, (name,) + criteria)        return self._untagged_response(typ, dat, name)    def select(self, mailbox='INBOX', readonly=None):        """Select a mailbox.        Flush all untagged responses.        (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None)        'data' is count of messages in mailbox ('EXISTS' response).        """        # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY')        self.untagged_responses = {}    # Flush old responses.        self.is_readonly = readonly        if readonly:            name = 'EXAMINE'        else:            name = 'SELECT'        typ, dat = self._simple_command(name, mailbox)        if typ != 'OK':            self.state = 'AUTH'     # Might have been 'SELECTED'            return typ, dat        self.state = 'SELECTED'        if self.untagged_responses.has_key('READ-ONLY') \                and not readonly:            if __debug__:                if self.debug >= 1:                    _dump_ur(self.untagged_responses)            raise self.readonly('%s is not writable' % mailbox)        return typ, self.untagged_responses.get('EXISTS', [None])    def setacl(self, mailbox, who, what):        """Set a mailbox acl.        (typ, [data]) = <instance>.create(mailbox, who, what)        """        return self._simple_command('SETACL', mailbox, who, what)    def sort(self, sort_criteria, charset, *search_criteria):        """IMAP4rev1 extension SORT command.        (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)        """        name = 'SORT'        #if not name in self.capabilities:      # Let the server decide!        #       raise self.error('unimplemented extension command: %s' % name)        if (sort_criteria[0],sort_criteria[-1]) != ('(',')'):            sort_criteria = '(%s)' % sort_criteria        typ, dat = apply(self._simple_command, (name, sort_criteria, charset) + search_criteria)        return self._untagged_response(typ, dat, name)    def status(self, mailbox, names):        """Request named status conditions for mailbox.        (typ, [data]) = <instance>.status(mailbox, names)        """        name = 'STATUS'        #if self.PROTOCOL_VERSION == 'IMAP4':   # Let the server decide!        #    raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)        typ, dat = self._simple_command(name, mailbox, names)        return self._untagged_response(typ, dat, name)    def store(self, message_set, command, flags):        """Alters flag dispositions for messages in mailbox.        (typ, [data]) = <instance>.store(message_set, command, flags)        """        if (flags[0],flags[-1]) != ('(',')'):            flags = '(%s)' % flags  # Avoid quoting the flags        typ, dat = self._simple_command('STORE', message_set, command, flags)        return self._untagged_response(typ, dat, 'FETCH')    def subscribe(self, mailbox):        """Subscribe to new mailbox.        (typ, [data]) = <instance>.subscribe(mailbox)        """        return self._simple_command('SUBSCRIBE', mailbox)    def uid(self, command, *args):        """Execute "command arg ..." with messages identified by UID,                rather than message number.        (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)        Returns response appropriate to 'command'.        """        command = command.upper()        if not Commands.has_key(command):            raise self.error("Unknown IMAP4 UID command: %s" % command)        if self.state not in Commands[command]:            raise self.error('command %s illegal in state %s'                                    % (command, self.state))        name = 'UID'        typ, dat = apply(self._simple_command, (name, command) + args)        if command in ('SEARCH', 'SORT'):            name = command        else:            name = 'FETCH'        return self._untagged_response(typ, dat, name)    def unsubscribe(self, mailbox):        """Unsubscribe from old mailbox.        (typ, [data]) = <instance>.unsubscribe(mailbox)        """        return self._simple_command('UNSUBSCRIBE', mailbox)    def xatom(self, name, *args):        """Allow simple extension commands                notified by server in CAPABILITY response.        Assumes command is legal in current state.        (typ, [data]) = <instance>.xatom(name, arg, ...)        Returns response appropriate to extension command `name'.        """        name = name.upper()        #if not name in self.capabilities:      # Let the server decide!        #    raise self.error('unknown extension command: %s' % name)        if not Commands.has_key(name):            Commands[name] = (self.state,)        return apply(self._simple_command, (name,) + args)    #       Private methods    def _append_untagged(self, typ, dat):        if dat is None: dat = ''        ur = self.untagged_responses        if __debug__:            if self.debug >= 5:                _mesg('untagged_responses[%s] %s += ["%s"]' %                        (typ, len(ur.get(typ,'')), dat))        if ur.has_key(typ):            ur[typ].append(dat)        else:            ur[typ] = [dat]    def _check_bye(self):        bye = self.untagged_responses.get('BYE')        if bye:            raise self.abort(bye[-1])    def _command(self, name, *args):        if self.state not in Commands[name]:            self.literal = None            raise self.error(            'command %s illegal in state %s' % (name, self.state))        for typ in ('OK', 'NO', 'BAD'):            if self.untagged_responses.has_key(typ):                del self.untagged_responses[typ]        if self.untagged_responses.has_key('READ-ONLY') \        and not self.is_readonly:            raise self.readonly('mailbox status changed to READ-ONLY')        tag = self._new_tag()        data = '%s %s' % (tag, name)        for arg in args:            if arg is None: continue            data = '%s %s' % (data, self._checkquote(arg))        literal = self.literal        if literal is not None:            self.literal = None            if type(literal) is type(self._command):                literator = literal            else:                literator = None                data = '%s {%s}' % (data, len(literal))        if __debug__:            if self.debug >= 4:                _mesg('> %s' % data)            else:                _log('> %s' % data)        try:            self.send('%s%s' % (data, CRLF))        except (socket.error, OSError), val:            raise self.abort('socket error: %s' % val)        if literal is None:            return tag        while 1:            # Wait for continuation response            while self._get_response():                if self.tagged_commands[tag]:   # BAD/NO?                    return tag            # Send literal            if literator:                literal = literator(self.continuation_response)            if __debug__:                if self.debug >= 4:                    _mesg('write literal size %s' % len(literal))            try:                self.send(literal)                self.send(CRLF)            except (socket.error, OSError), val:                raise self.abort('socket error: %s' % val)            if not literator:                break        return tag    def _command_complete(self, name, tag):        self._check_bye()        try:            typ, data = self._get_tagged_response(tag)        except self.abort, val:            raise self.abort('command: %s => %s' % (name, val))        except self.error, val:            raise self.error('command: %s => %s' % (name, val))        self._check_bye()        if typ == 'BAD':            raise self.error('%s command error: %s %s' % (name, typ, data))        return typ, data    def _get_response(self):        # Read response and store.        #        # Returns None for continuation responses,        # otherwise first response line received.        resp = self._get_line()        # Command completion response?        if self._match(self.tagre, resp):            tag = self.mo.group('tag')            if not self.tagged_commands.has_key(tag):                raise self.abort('unexpected tagged response: %s' % resp)            typ = self.mo.group('type')            dat = self.mo.group('data')            self.tagged_commands[tag] = (typ, [dat])        else:            dat2 = None            # '*' (untagged) responses?            if not self._match(Untagged_response, resp):                if self._match(Untagged_status, resp):                    dat2 = self.mo.group('data2')            if self.mo is None:                # Only other possibility is '+' (continuation) response...                if self._match(Continuation, resp):                    self.continuation_response = self.mo.group('data')                    return None     # NB: indicates continuation                raise self.abort("unexpected response: '%s'" % resp)            typ = self.mo.group('type')            dat = self.mo.group('data')            if dat is None: dat = ''        # Null untagged response            if dat2: dat = dat + ' ' + dat2

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?