imaplib.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,209 行 · 第 1/3 页
PY
1,209 行
"""Get the ACLs for a mailbox. (typ, [data]) = <instance>.getacl(mailbox) """ typ, dat = self._simple_command('GETACL', mailbox) return self._untagged_response(typ, dat, 'ACL') def list(self, directory='""', pattern='*'): """List mailbox names in directory matching pattern. (typ, [data]) = <instance>.list(directory='""', pattern='*') 'data' is list of LIST responses. """ name = 'LIST' typ, dat = self._simple_command(name, directory, pattern) return self._untagged_response(typ, dat, name) def login(self, user, password): """Identify client using plaintext password. (typ, [data]) = <instance>.login(user, password) NB: 'password' will be quoted. """ #if not 'AUTH=LOGIN' in self.capabilities: # raise self.error("Server doesn't allow LOGIN authentication." % mech) typ, dat = self._simple_command('LOGIN', user, self._quote(password)) if typ != 'OK': raise self.error(dat[-1]) self.state = 'AUTH' return typ, dat def logout(self): """Shutdown connection to server. (typ, [data]) = <instance>.logout() Returns server 'BYE' response. """ self.state = 'LOGOUT' try: typ, dat = self._simple_command('LOGOUT') except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]] self.shutdown() if self.untagged_responses.has_key('BYE'): return 'BYE', self.untagged_responses['BYE'] return typ, dat def lsub(self, directory='""', pattern='*'): """List 'subscribed' mailbox names in directory matching pattern. (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*') 'data' are tuples of message part envelope and data. """ name = 'LSUB' typ, dat = self._simple_command(name, directory, pattern) return self._untagged_response(typ, dat, name) def namespace(self): """ Returns IMAP namespaces ala rfc2342 (typ, [data, ...]) = <instance>.namespace() """ name = 'NAMESPACE' typ, dat = self._simple_command(name) return self._untagged_response(typ, dat, name) def noop(self): """Send NOOP command. (typ, data) = <instance>.noop() """ if __debug__: if self.debug >= 3: _dump_ur(self.untagged_responses) return self._simple_command('NOOP') def partial(self, message_num, message_part, start, length): """Fetch truncated part of a message. (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length) 'data' is tuple of message part envelope and data. """ name = 'PARTIAL' typ, dat = self._simple_command(name, message_num, message_part, start, length) return self._untagged_response(typ, dat, 'FETCH') def rename(self, oldmailbox, newmailbox): """Rename old mailbox name to new. (typ, data) = <instance>.rename(oldmailbox, newmailbox) """ return self._simple_command('RENAME', oldmailbox, newmailbox) def search(self, charset, *criteria): """Search mailbox for matching messages. (typ, [data]) = <instance>.search(charset, criterium, ...) 'data' is space separated list of matching message numbers. """ name = 'SEARCH' if charset: typ, dat = apply(self._simple_command, (name, 'CHARSET', charset) + criteria) else: typ, dat = apply(self._simple_command, (name,) + criteria) return self._untagged_response(typ, dat, name) def select(self, mailbox='INBOX', readonly=None): """Select a mailbox. Flush all untagged responses. (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None) 'data' is count of messages in mailbox ('EXISTS' response). """ # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY') self.untagged_responses = {} # Flush old responses. self.is_readonly = readonly if readonly: name = 'EXAMINE' else: name = 'SELECT' typ, dat = self._simple_command(name, mailbox) if typ != 'OK': self.state = 'AUTH' # Might have been 'SELECTED' return typ, dat self.state = 'SELECTED' if self.untagged_responses.has_key('READ-ONLY') \ and not readonly: if __debug__: if self.debug >= 1: _dump_ur(self.untagged_responses) raise self.readonly('%s is not writable' % mailbox) return typ, self.untagged_responses.get('EXISTS', [None]) def setacl(self, mailbox, who, what): """Set a mailbox acl. (typ, [data]) = <instance>.create(mailbox, who, what) """ return self._simple_command('SETACL', mailbox, who, what) def sort(self, sort_criteria, charset, *search_criteria): """IMAP4rev1 extension SORT command. (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...) """ name = 'SORT' #if not name in self.capabilities: # Let the server decide! # raise self.error('unimplemented extension command: %s' % name) if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): sort_criteria = '(%s)' % sort_criteria typ, dat = apply(self._simple_command, (name, sort_criteria, charset) + search_criteria) return self._untagged_response(typ, dat, name) def status(self, mailbox, names): """Request named status conditions for mailbox. (typ, [data]) = <instance>.status(mailbox, names) """ name = 'STATUS' #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) typ, dat = self._simple_command(name, mailbox, names) return self._untagged_response(typ, dat, name) def store(self, message_set, command, flags): """Alters flag dispositions for messages in mailbox. (typ, [data]) = <instance>.store(message_set, command, flags) """ if (flags[0],flags[-1]) != ('(',')'): flags = '(%s)' % flags # Avoid quoting the flags typ, dat = self._simple_command('STORE', message_set, command, flags) return self._untagged_response(typ, dat, 'FETCH') def subscribe(self, mailbox): """Subscribe to new mailbox. (typ, [data]) = <instance>.subscribe(mailbox) """ return self._simple_command('SUBSCRIBE', mailbox) def uid(self, command, *args): """Execute "command arg ..." with messages identified by UID, rather than message number. (typ, [data]) = <instance>.uid(command, arg1, arg2, ...) Returns response appropriate to 'command'. """ command = command.upper() if not Commands.has_key(command): raise self.error("Unknown IMAP4 UID command: %s" % command) if self.state not in Commands[command]: raise self.error('command %s illegal in state %s' % (command, self.state)) name = 'UID' typ, dat = apply(self._simple_command, (name, command) + args) if command in ('SEARCH', 'SORT'): name = command else: name = 'FETCH' return self._untagged_response(typ, dat, name) def unsubscribe(self, mailbox): """Unsubscribe from old mailbox. (typ, [data]) = <instance>.unsubscribe(mailbox) """ return self._simple_command('UNSUBSCRIBE', mailbox) def xatom(self, name, *args): """Allow simple extension commands notified by server in CAPABILITY response. Assumes command is legal in current state. (typ, [data]) = <instance>.xatom(name, arg, ...) Returns response appropriate to extension command `name'. """ name = name.upper() #if not name in self.capabilities: # Let the server decide! # raise self.error('unknown extension command: %s' % name) if not Commands.has_key(name): Commands[name] = (self.state,) return apply(self._simple_command, (name,) + args) # Private methods def _append_untagged(self, typ, dat): if dat is None: dat = '' ur = self.untagged_responses if __debug__: if self.debug >= 5: _mesg('untagged_responses[%s] %s += ["%s"]' % (typ, len(ur.get(typ,'')), dat)) if ur.has_key(typ): ur[typ].append(dat) else: ur[typ] = [dat] def _check_bye(self): bye = self.untagged_responses.get('BYE') if bye: raise self.abort(bye[-1]) def _command(self, name, *args): if self.state not in Commands[name]: self.literal = None raise self.error( 'command %s illegal in state %s' % (name, self.state)) for typ in ('OK', 'NO', 'BAD'): if self.untagged_responses.has_key(typ): del self.untagged_responses[typ] if self.untagged_responses.has_key('READ-ONLY') \ and not self.is_readonly: raise self.readonly('mailbox status changed to READ-ONLY') tag = self._new_tag() data = '%s %s' % (tag, name) for arg in args: if arg is None: continue data = '%s %s' % (data, self._checkquote(arg)) literal = self.literal if literal is not None: self.literal = None if type(literal) is type(self._command): literator = literal else: literator = None data = '%s {%s}' % (data, len(literal)) if __debug__: if self.debug >= 4: _mesg('> %s' % data) else: _log('> %s' % data) try: self.send('%s%s' % (data, CRLF)) except (socket.error, OSError), val: raise self.abort('socket error: %s' % val) if literal is None: return tag while 1: # Wait for continuation response while self._get_response(): if self.tagged_commands[tag]: # BAD/NO? return tag # Send literal if literator: literal = literator(self.continuation_response) if __debug__: if self.debug >= 4: _mesg('write literal size %s' % len(literal)) try: self.send(literal) self.send(CRLF) except (socket.error, OSError), val: raise self.abort('socket error: %s' % val) if not literator: break return tag def _command_complete(self, name, tag): self._check_bye() try: typ, data = self._get_tagged_response(tag) except self.abort, val: raise self.abort('command: %s => %s' % (name, val)) except self.error, val: raise self.error('command: %s => %s' % (name, val)) self._check_bye() if typ == 'BAD': raise self.error('%s command error: %s %s' % (name, typ, data)) return typ, data def _get_response(self): # Read response and store. # # Returns None for continuation responses, # otherwise first response line received. resp = self._get_line() # Command completion response? if self._match(self.tagre, resp): tag = self.mo.group('tag') if not self.tagged_commands.has_key(tag): raise self.abort('unexpected tagged response: %s' % resp) typ = self.mo.group('type') dat = self.mo.group('data') self.tagged_commands[tag] = (typ, [dat]) else: dat2 = None # '*' (untagged) responses? if not self._match(Untagged_response, resp): if self._match(Untagged_status, resp): dat2 = self.mo.group('data2') if self.mo is None: # Only other possibility is '+' (continuation) response... if self._match(Continuation, resp): self.continuation_response = self.mo.group('data') return None # NB: indicates continuation raise self.abort("unexpected response: '%s'" % resp) typ = self.mo.group('type') dat = self.mo.group('data') if dat is None: dat = '' # Null untagged response if dat2: dat = dat + ' ' + dat2
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?