diff --git a/Doc/library/imaplib.rst b/Doc/library/imaplib.rst index df2468f7124e6d6..aca3e0a4dc9559d 100644 --- a/Doc/library/imaplib.rst +++ b/Doc/library/imaplib.rst @@ -179,6 +179,9 @@ enclosed with either parentheses or double quotes) each string is quoted. However, the *password* argument to the ``LOGIN`` command is always quoted. If you want to avoid having an argument string quoted (eg: the *flags* argument to ``STORE``) then enclose the string in parentheses (eg: ``r'(\Deleted)'``). +In general, pass arguments unquoted and let the module quote them as needed. +An argument that is already enclosed in double quotes is left unchanged, +so that code which quotes arguments itself keeps working. Most commands return a tuple: ``(type, [data, ...])`` where *type* is usually ``'OK'`` or ``'NO'``, and *data* is either the text from the command response, @@ -392,7 +395,7 @@ An :class:`IMAP4` instance has the following methods: .. versionadded:: 3.14 -.. method:: IMAP4.list([directory[, pattern]]) +.. method:: IMAP4.list(directory='', pattern='*') List mailbox names in *directory* matching *pattern*. *directory* defaults to the top-level mail folder, and *pattern* defaults to match anything. Returned @@ -422,7 +425,7 @@ An :class:`IMAP4` instance has the following methods: The method no longer ignores silently arbitrary exceptions. -.. method:: IMAP4.lsub(directory='""', pattern='*') +.. method:: IMAP4.lsub(directory='', pattern='*') List subscribed mailbox names in directory matching pattern. *directory* defaults to the top level directory and *pattern* defaults to match any mailbox. diff --git a/Lib/imaplib.py b/Lib/imaplib.py index 497b5a60cecb083..03a82362a5ba7f3 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -130,6 +130,9 @@ _Literal = br'.*{(?P\d+)}$' _Untagged_status = br'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?' _control_chars = re.compile(b'[\x00-\x1F\x7F]') +_non_astring_char = re.compile(br'[(){ \x00-\x1f\x7f%*\\"]') +_non_list_char = re.compile(br'[(){ \x00-\x1f\x7f\\"]') +_quoted = re.compile(br'"(?:[^"\\]|\\.)*+"') class IMAP4: @@ -494,8 +497,7 @@ def append(self, mailbox, flags, date_time, message): if not mailbox: mailbox = 'INBOX' if flags: - if (flags[0],flags[-1]) != ('(',')'): - flags = '(%s)' % flags + flags = self._set_quote(flags) else: flags = None if date_time: @@ -504,7 +506,7 @@ def append(self, mailbox, flags, date_time, message): date_time = None literal = MapCRLF.sub(CRLF, message) self.literal = literal - return self._simple_command(name, mailbox, flags, date_time) + return self._simple_command(name, self._astring(mailbox), flags, date_time) def authenticate(self, mechanism, authobject): @@ -529,7 +531,7 @@ def authenticate(self, mechanism, authobject): #if not cap in self.capabilities: # Let the server decide! # raise self.error("Server doesn't allow %s authentication." % mech) self.literal = _Authenticator(authobject).process - typ, dat = self._simple_command('AUTHENTICATE', mech) + typ, dat = self._simple_command('AUTHENTICATE', self._atom(mech)) if typ != 'OK': raise self.error(dat[-1].decode('utf-8', 'replace')) self.state = 'AUTH' @@ -573,7 +575,8 @@ def copy(self, message_set, new_mailbox): (typ, [data]) = .copy(message_set, new_mailbox) """ - return self._simple_command('COPY', message_set, new_mailbox) + return self._simple_command('COPY', self._sequence_set(message_set), + self._astring(new_mailbox)) def create(self, mailbox): @@ -581,7 +584,7 @@ def create(self, mailbox): (typ, [data]) = .create(mailbox) """ - return self._simple_command('CREATE', mailbox) + return self._simple_command('CREATE', self._astring(mailbox)) def delete(self, mailbox): @@ -589,14 +592,15 @@ def delete(self, mailbox): (typ, [data]) = .delete(mailbox) """ - return self._simple_command('DELETE', mailbox) + return self._simple_command('DELETE', self._astring(mailbox)) def deleteacl(self, mailbox, who): """Delete the ACLs (remove any rights) set for who on mailbox. (typ, [data]) = .deleteacl(mailbox, who) """ - return self._simple_command('DELETEACL', mailbox, who) + return self._simple_command('DELETEACL', self._astring(mailbox), + self._astring(who)) def enable(self, capability): """Send an RFC5161 enable string to the server. @@ -635,7 +639,8 @@ def fetch(self, message_set, message_parts): 'data' are tuples of message part envelope and data. """ name = 'FETCH' - typ, dat = self._simple_command(name, message_set, message_parts) + typ, dat = self._simple_command(name, self._sequence_set(message_set), + self._set_quote(message_parts)) return self._untagged_response(typ, dat, name) @@ -644,7 +649,7 @@ def getacl(self, mailbox): (typ, [data]) = .getacl(mailbox) """ - typ, dat = self._simple_command('GETACL', mailbox) + typ, dat = self._simple_command('GETACL', self._astring(mailbox)) return self._untagged_response(typ, dat, 'ACL') @@ -652,7 +657,8 @@ def getannotation(self, mailbox, entry, attribute): """(typ, [data]) = .getannotation(mailbox, entry, attribute) Retrieve ANNOTATIONs.""" - typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) + typ, dat = self._simple_command('GETANNOTATION', self._astring(mailbox), + entry, attribute) return self._untagged_response(typ, dat, 'ANNOTATION') @@ -663,7 +669,7 @@ def getquota(self, root): (typ, [data]) = .getquota(root) """ - typ, dat = self._simple_command('GETQUOTA', root) + typ, dat = self._simple_command('GETQUOTA', self._astring(root)) return self._untagged_response(typ, dat, 'QUOTA') @@ -672,7 +678,7 @@ def getquotaroot(self, mailbox): (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = .getquotaroot(mailbox) """ - typ, dat = self._simple_command('GETQUOTAROOT', mailbox) + typ, dat = self._simple_command('GETQUOTAROOT', self._astring(mailbox)) typ, quota = self._untagged_response(typ, dat, 'QUOTA') typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT') return typ, [quotaroot, quota] @@ -691,15 +697,16 @@ def idle(self, duration=None): return Idler(self, duration) - def list(self, directory='""', pattern='*'): + def list(self, directory='', pattern='*'): """List mailbox names in directory matching pattern. - (typ, [data]) = .list(directory='""', pattern='*') + (typ, [data]) = .list(directory='', pattern='*') 'data' is list of LIST responses. """ name = 'LIST' - typ, dat = self._simple_command(name, directory, pattern) + typ, dat = self._simple_command(name, self._astring(directory), + self._list_mailbox(pattern)) return self._untagged_response(typ, dat, name) @@ -710,7 +717,8 @@ def login(self, user, password): NB: 'password' will be quoted. """ - typ, dat = self._simple_command('LOGIN', user, self._quote(password)) + typ, dat = self._simple_command('LOGIN', self._astring(user), + self._quote(password)) if typ != 'OK': raise self.error(dat[-1].decode('UTF-8', 'replace')) self.state = 'AUTH' @@ -755,15 +763,16 @@ def logout(self): return typ, dat - def lsub(self, directory='""', pattern='*'): + def lsub(self, directory='', pattern='*'): """List 'subscribed' mailbox names in directory matching pattern. - (typ, [data, ...]) = .lsub(directory='""', pattern='*') + (typ, [data, ...]) = .lsub(directory='', pattern='*') 'data' are tuples of message part envelope and data. """ name = 'LSUB' - typ, dat = self._simple_command(name, directory, pattern) + typ, dat = self._simple_command(name, self._astring(directory), + self._list_mailbox(pattern)) return self._untagged_response(typ, dat, name) def myrights(self, mailbox): @@ -771,7 +780,7 @@ def myrights(self, mailbox): (typ, [data]) = .myrights(mailbox) """ - typ,dat = self._simple_command('MYRIGHTS', mailbox) + typ,dat = self._simple_command('MYRIGHTS', self._astring(mailbox)) return self._untagged_response(typ, dat, 'MYRIGHTS') def namespace(self): @@ -817,7 +826,7 @@ def proxyauth(self, user): """ name = 'PROXYAUTH' - return self._simple_command(name, user) + return self._simple_command(name, self._astring(user)) def rename(self, oldmailbox, newmailbox): @@ -825,7 +834,8 @@ def rename(self, oldmailbox, newmailbox): (typ, [data]) = .rename(oldmailbox, newmailbox) """ - return self._simple_command('RENAME', oldmailbox, newmailbox) + return self._simple_command('RENAME', self._astring(oldmailbox), + self._astring(newmailbox)) def search(self, charset, *criteria): @@ -837,10 +847,11 @@ def search(self, charset, *criteria): If UTF8 is enabled, charset MUST be None. """ name = 'SEARCH' - if charset: + if charset is not None: if self.utf8_enabled: raise IMAP4.error("Non-None charset not valid in UTF8 mode") - typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria) + typ, dat = self._simple_command(name, + 'CHARSET', self._astring(charset), *criteria) else: typ, dat = self._simple_command(name, *criteria) return self._untagged_response(typ, dat, name) @@ -864,7 +875,7 @@ def select(self, mailbox='INBOX', readonly=False): name = 'EXAMINE' else: name = 'SELECT' - typ, dat = self._simple_command(name, mailbox) + typ, dat = self._simple_command(name, self._astring(mailbox)) if typ != 'OK': self.state = 'AUTH' # Might have been 'SELECTED' return typ, dat @@ -883,14 +894,15 @@ def setacl(self, mailbox, who, what): (typ, [data]) = .setacl(mailbox, who, what) """ - return self._simple_command('SETACL', mailbox, who, what) + return self._simple_command('SETACL', self._astring(mailbox), + self._astring(who), self._astring(what)) - def setannotation(self, *args): + def setannotation(self, mailbox, *args): """(typ, [data]) = .setannotation(mailbox[, entry, attribute]+) Set ANNOTATIONs.""" - typ, dat = self._simple_command('SETANNOTATION', *args) + typ, dat = self._simple_command('SETANNOTATION', self._astring(mailbox), *args) return self._untagged_response(typ, dat, 'ANNOTATION') @@ -899,7 +911,8 @@ def setquota(self, root, limits): (typ, [data]) = .setquota(root, limits) """ - typ, dat = self._simple_command('SETQUOTA', root, limits) + typ, dat = self._simple_command('SETQUOTA', self._astring(root), + self._set_quote(limits)) return self._untagged_response(typ, dat, 'QUOTA') @@ -911,8 +924,9 @@ def sort(self, sort_criteria, charset, *search_criteria): name = 'SORT' #if not name in self.capabilities: # Let the server decide! # raise self.error('unimplemented extension command: %s' % name) - if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): - sort_criteria = '(%s)' % sort_criteria + sort_criteria = self._set_quote(sort_criteria) + if charset is not None: + charset = self._astring(charset) typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria) return self._untagged_response(typ, dat, name) @@ -949,7 +963,8 @@ def status(self, mailbox, names): name = 'STATUS' #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) - typ, dat = self._simple_command(name, mailbox, names) + typ, dat = self._simple_command(name, self._astring(mailbox), + self._set_quote(names)) return self._untagged_response(typ, dat, name) @@ -958,9 +973,9 @@ def store(self, message_set, command, flags): (typ, [data]) = .store(message_set, command, flags) """ - if (flags[0],flags[-1]) != ('(',')'): - flags = '(%s)' % flags # Avoid quoting the flags - typ, dat = self._simple_command('STORE', message_set, command, flags) + flags = self._set_quote(flags) + typ, dat = self._simple_command('STORE', self._sequence_set(message_set), + command, flags) return self._untagged_response(typ, dat, 'FETCH') @@ -969,7 +984,7 @@ def subscribe(self, mailbox): (typ, [data]) = .subscribe(mailbox) """ - return self._simple_command('SUBSCRIBE', mailbox) + return self._simple_command('SUBSCRIBE', self._astring(mailbox)) def thread(self, threading_algorithm, charset, *search_criteria): @@ -978,7 +993,10 @@ def thread(self, threading_algorithm, charset, *search_criteria): (type, [data]) = .thread(threading_algorithm, charset, search_criteria, ...) """ name = 'THREAD' - typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria) + if charset is not None: + charset = self._astring(charset) + typ, dat = self._simple_command(name, self._atom(threading_algorithm), + charset, *search_criteria) return self._untagged_response(typ, dat, name) @@ -999,7 +1017,31 @@ def uid(self, command, *args): (command, self.state, ', '.join(Commands[command]))) name = 'UID' - typ, dat = self._simple_command(name, command, *args) + if command == 'COPY': + message_set, new_mailbox = args + args = (self._sequence_set(message_set), + self._astring(new_mailbox)) + elif command == 'FETCH': + message_set, message_parts = args + args = (self._sequence_set(message_set), + self._set_quote(message_parts)) + elif command == 'STORE': + message_set, op, flags = args + args = (self._sequence_set(message_set), op, + self._set_quote(flags)) + elif command == 'SORT': + sort_criteria, charset, *search_criteria = args + if charset is not None: + charset = self._astring(charset) + args = (self._set_quote(sort_criteria), charset, + *search_criteria) + elif command == 'THREAD': + threading_algorithm, charset, *search_criteria = args + if charset is not None: + charset = self._astring(charset) + args = (self._atom(threading_algorithm), charset, + *search_criteria) + typ, dat = self._simple_command(name, self._atom(command), *args) if command in ('SEARCH', 'SORT', 'THREAD'): name = command else: @@ -1012,7 +1054,7 @@ def unsubscribe(self, mailbox): (typ, [data]) = .unsubscribe(mailbox) """ - return self._simple_command('UNSUBSCRIBE', mailbox) + return self._simple_command('UNSUBSCRIBE', self._astring(mailbox)) def unselect(self): @@ -1368,13 +1410,46 @@ def _new_tag(self): return tag - def _quote(self, arg): + def _atom(self, arg): + return arg - arg = arg.replace('\\', '\\\\') - arg = arg.replace('"', '\\"') + def _sequence_set(self, arg): + return arg - return '"' + arg + '"' + def _set_quote(self, arg): + if arg and arg[0] == '(' and arg[-1] == ')': + return arg + return '(' + arg + ')' + def _quote(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + arg = arg.replace(b'\\', br'\\') + arg = arg.replace(b'"', br'\"') + return b'"' + arg + b'"' + + # For backward compatibility, an argument already enclosed in double + # quotes is left unquoted, so that code which quotes arguments itself + # keeps working. New code should pass arguments unquoted and let the + # module quote them as needed. + + def _astring(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + if _quoted.fullmatch(arg): + return arg + if arg and _non_astring_char.search(arg) is None: + return arg + return self._quote(arg) + + def _list_mailbox(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + if _quoted.fullmatch(arg): + return arg + if arg and _non_list_char.search(arg) is None: + return arg + return self._quote(arg) def _simple_command(self, name, *args): diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index fb256fb7cbcd344..b001af850f458cd 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -26,6 +26,47 @@ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem") CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem") +_quoted_string = re.compile(r'"(?:[^"\\]|\\.)*"') + +def splitargs(line): + # Split a command line into IMAP arguments: quoted strings, balanced + # (possibly nested) parenthesized lists, and bare atoms. + args = [] + i, n = 0, len(line) + while i < n: + if line[i] == ' ': + i += 1 + continue + start = i + depth = 0 + while i < n and (depth or line[i] != ' '): + c = line[i] + if c == '"': + i = _quoted_string.match(line, i).end() + continue + elif c == '(': + depth += 1 + elif c == ')': + depth -= 1 + i += 1 + args.append(line[start:i]) + return args + +def parse_seq_number(s, maxmsg): + if s == '*': + return maxmsg + return int(s) + +def parse_sequence_set(arg, maxmsg): + for s in arg.split(','): + if ':' in s: + lo, hi = s.split(':') + lo = parse_seq_number(lo, maxmsg) + hi = parse_seq_number(hi, maxmsg) + yield from range(min(lo, hi), max(lo, hi) + 1) + else: + yield parse_seq_number(s, maxmsg) + class TestImaplib(unittest.TestCase): @@ -91,6 +132,55 @@ def test_imap4_host_default_value(self): imaplib.IMAP4() self.assertIn(cm.exception.errno, expected_errnos) + def test_astring(self): + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + # Plain atoms are left unquoted. + self.assertEqual(m._astring('INBOX'), b'INBOX') + self.assertEqual(m._astring(b'INBOX'), b'INBOX') + # Names with protocol-sensitive characters are quoted. + self.assertEqual(m._astring('New folder'), b'"New folder"') + self.assertEqual(m._astring('a"b'), b'"a\\"b"') + self.assertEqual(m._astring('a\\b'), b'"a\\\\b"') + self.assertEqual(m._astring(''), b'""') + self.assertEqual(m._astring('*'), b'"*"') + # A well-formed quoted string is passed through unchanged. + self.assertEqual(m._astring('"New folder"'), b'"New folder"') + self.assertEqual(m._astring('""'), b'""') + # Including a lenient (non-RFC) backslash escape, which the server + # may accept. + self.assertEqual(m._astring('"a\\b"'), b'"a\\b"') + # A string that only looks quoted but is not a single token is + # quoted as data, closing the argument injection vector. + self.assertEqual(m._astring('"a" SELECT evil "'), + b'"\\"a\\" SELECT evil \\""') + self.assertEqual(m._astring('"'), b'"\\""') + + def test_astring_idempotent(self): + # Quoting an already quoted argument should not change it, so that + # quoting twice gives the same result as quoting once. + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + for arg in ['INBOX', 'New folder', 'a"b', 'a\\b', '', '*', '%', + '"New folder"', '""', '"a\\b"', '"a" SELECT evil "', + '"', 'a\tb', 'a\rb', '\x7f', '(a)']: + with self.subTest(arg=arg): + once = m._astring(arg) + self.assertEqual(m._astring(once), once) + twice = m._list_mailbox(arg) + self.assertEqual(m._list_mailbox(twice), twice) + + def test_list_mailbox(self): + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + # Wildcards are not quoted in a list pattern. + self.assertEqual(m._list_mailbox('*'), b'*') + self.assertEqual(m._list_mailbox('%'), b'%') + self.assertEqual(m._list_mailbox('foo/%'), b'foo/%') + # But spaces still require quoting. + self.assertEqual(m._list_mailbox('New folder'), b'"New folder"') + self.assertEqual(m._list_mailbox('"New folder"'), b'"New folder"') + if ssl: class SecureTCPServer(socketserver.TCPServer): @@ -119,11 +209,11 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): def setup(self): super().setup() - self.server.is_selected = False + self.server.is_selected = None self.server.logged = None def _send(self, message): - if verbose: + if verbose > 1: print("SENT: %r" % message.strip()) self.wfile.write(message) @@ -157,7 +247,7 @@ def handle(self): if line.endswith(b'\r\n'): break - if verbose: + if verbose > 1: print('GOT: %r' % line.strip()) if self.continuation: try: @@ -165,10 +255,11 @@ def handle(self): except StopIteration: self.continuation = None continue - splitline = line.decode('ASCII').split() + splitline = splitargs(line.decode().removesuffix('\r\n')) tag = splitline[0] cmd = splitline[1] args = splitline[2:] + self.server.args = args if hasattr(self, 'cmd_' + cmd): continuation = getattr(self, 'cmd_' + cmd)(tag, args) @@ -191,17 +282,17 @@ def cmd_LOGOUT(self, tag, args): self._send_tagged(tag, 'OK', 'LOGOUT completed') def cmd_LOGIN(self, tag, args): - self.server.logged = args[0] + self.server.logged = args self._send_tagged(tag, 'OK', 'LOGIN completed') def cmd_SELECT(self, tag, args): - self.server.is_selected = True + self.server.is_selected = args self._send_line(b'* 2 EXISTS') self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') def cmd_UNSELECT(self, tag, args): - if self.server.is_selected: - self.server.is_selected = False + if self.server.is_selected is not None: + self.server.is_selected = None self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') else: self._send_tagged(tag, 'BAD', 'No mailbox selected') @@ -268,6 +359,24 @@ def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'No access') +def make_simple_handler(command, untagged_response=(), + completed=None): + if completed is None: + completed = f'{command} completed' + def cmd(self, tag, args): + for msg in untagged_response: + self._send_textline(msg) + self._send_tagged(tag, 'OK', completed) + cmd.__name__ = 'cmd_' + command + class Handler(SimpleIMAPHandler): + pass + Handler.__name__ = command.title() + 'Handler' + setattr(Handler, cmd.__name__, cmd) + Handler.__qualname__ = Handler.__name__ + cmd.__qualname__ = Handler.__qualname__ + '.' + cmd.__name__ + return Handler + + class NewIMAPTestsMixin: client = None @@ -353,6 +462,23 @@ def test_enable_raises_error_if_no_capability(self): 'does not support ENABLE'): client.enable('foo') + def test_enable(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'IMAP4rev1 ID LITERAL+ ENABLE X-GOOD-IDEA' + def cmd_ENABLE(self, tag, args): + capabilities = self.capabilities.split() + for arg in args: + if arg in capabilities: + self._send_textline('* ENABLED ' + arg) + self._send_tagged(tag, 'OK', 'foo') + + client, server = self._setup(EnableHandler) + client.login('user', 'pass') + code, data = client.enable('CONDSTORE X-GOOD-IDEA') + self.assertEqual(code, 'OK') + self.assertEqual(data, [b'foo']) + self.assertEqual(server.args, ['CONDSTORE', 'X-GOOD-IDEA']) + def test_enable_UTF8_raises_error_if_not_supported(self): client, _ = self._setup(SimpleIMAPHandler) typ, data = client.login('user', 'pass') @@ -386,6 +512,7 @@ def cmd_APPEND(self, tag, args): code, _ = client.enable('UTF8=ACCEPT') self.assertEqual(code, 'OK') self.assertEqual(client._encoding, 'utf-8') + self.assertEqual(server.args, ['UTF8=ACCEPT']) msg_string = 'Subject: üñí©öðé' typ, data = client.append( None, None, None, (msg_string + '\n').encode('utf-8')) @@ -545,7 +672,7 @@ def test_with_statement(self): _, server = self._setup(SimpleIMAPHandler, connect=False) with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) self.assertIsNone(server.logged) def test_with_statement_logout(self): @@ -553,7 +680,7 @@ def test_with_statement_logout(self): _, server = self._setup(SimpleIMAPHandler, connect=False) with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) imap.logout() self.assertIsNone(server.logged) self.assertIsNone(server.logged) @@ -628,11 +755,33 @@ def test_idle_delayed_packet(self): self.fail('multi-packet response was corrupted by idle timeout') def test_login(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) typ, data = client.login('user', 'pass') self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'LOGIN completed') self.assertEqual(client.state, 'AUTH') + # The user name is quoted only when necessary, but the password + # is always quoted. + self.assertEqual(server.logged, ['user', '"pass"']) + self.assertRaises(imaplib.IMAP4.error, client.login, 'user', 'pass') + + def test_login_quoted(self): + client, server = self._setup(SimpleIMAPHandler) + typ, data = client.login('us*r', 'p%ss') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + self.assertEqual(server.logged, ['"us*r"', '"p%ss"']) + + def test_login_quoted2(self): + # An already quoted user name is passed through unchanged, rather + # than being quoted a second time; the password is always quoted. + client, server = self._setup(SimpleIMAPHandler) + typ, data = client.login('"user"', '"pass"') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + self.assertEqual(server.logged, ['"user"', r'"\"pass\""']) def test_logout(self): client, _ = self._setup(SimpleIMAPHandler) @@ -655,8 +804,24 @@ def cmd_LSUB(self, tag, args): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'() "." directoryA') + def test_select(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + typ, data = client.select() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + + typ, data = client.select('Archive') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['Archive']) + + typ, data = client.select('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['"New folder"']) + def test_unselect(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) client.login('user', 'pass') typ, data = client.select() self.assertEqual(typ, 'OK') @@ -666,6 +831,566 @@ def test_unselect(self): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'Returned to authenticated state. (Success)') self.assertEqual(client.state, 'AUTH') + self.assertIsNone(server.is_selected) + + def test_create(self): + client, server = self._setup(make_simple_handler('CREATE')) + client.login('user', 'pass') + typ, data = client.create('owatagusiam/') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/']) + + typ, data = client.create('owatagusiam/blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/blurdybloop']) + + typ, data = client.create('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['"New folder"']) + + def test_copy(self): + client, server = self._setup(make_simple_handler('COPY')) + client.login('user', 'pass') + client.select() + typ, data = client.copy('2:4', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', 'MEETING']) + + typ, data = client.copy('2:4', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', '"New folder"']) + + def test_uid_copy(self): + client, server = self._setup(make_simple_handler('UID', + completed='UID COPY completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('copy', '4827313:4828442', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', 'MEETING']) + + typ, data = client.uid('copy', '4827313:4828442', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', '"New folder"']) + + def test_store(self): + client, server = self._setup(make_simple_handler('STORE', [ + r'* 2 FETCH (FLAGS (\Deleted \Seen))', + r'* 3 FETCH (FLAGS (\Deleted))', + r'* 4 FETCH (FLAGS (\Deleted \Flagged \Seen))', + ])) + client.login('user', 'pass') + client.select() + typ, data = client.store('2:4', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Deleted \Seen))', + br'3 (FLAGS (\Deleted))', + br'4 (FLAGS (\Deleted \Flagged \Seen))', + ]) + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + typ, data = client.store('2:4', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + def test_uid_store(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Deleted \Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Deleted) UID 4827943)', + r'* 25 FETCH (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ], 'UID STORE completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Deleted \Seen) UID 4827313)', + br'24 (FLAGS (\Deleted) UID 4827943)', + br'25 (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + def test_fetch(self): + # The handler expands the requested sequence set and answers for + # exactly those messages, so the test exercises the round trip of + # the message set, not just a canned reply. + class FetchHandler(SimpleIMAPHandler): + messages = 4 + def cmd_SELECT(self, tag, args): + self.server.is_selected = args + self._send_line(b'* %d EXISTS' % self.messages) + self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_FETCH(self, tag, args): + for n in parse_sequence_set(args[0], self.messages): + self._send_textline(r'* %d FETCH (FLAGS (\Seen))' % n) + self._send_tagged(tag, 'OK', 'FETCH completed') + + client, server = self._setup(FetchHandler) + client.login('user', 'pass') + client.select() + typ, data = client.fetch('2:4', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # message_parts is wrapped in parentheses if it is not already. + typ, data = client.fetch('2:4', 'FLAGS') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # A comma-separated set with an open range up to '*'. + typ, data = client.fetch('1,3:*', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'1 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['1,3:*', '(FLAGS)']) + + # An item with nested parentheses is sent (and parsed) as a + # single argument. + typ, data = client.fetch('1', '(BODY[HEADER.FIELDS (DATE FROM)])') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'1 (FLAGS (\Seen))']) + self.assertEqual(server.args, ['1', '(BODY[HEADER.FIELDS (DATE FROM)])']) + + def test_uid_fetch(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Seen) UID 4827943)', + r'* 25 FETCH (FLAGS (\Seen) UID 4828442)', + ], 'UID FETCH completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('fetch', '4827313:4828442', 'FLAGS') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Seen) UID 4827313)', + br'24 (FLAGS (\Seen) UID 4827943)', + br'25 (FLAGS (\Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['FETCH', '4827313:4828442', '(FLAGS)']) + + def test_search(self): + response = [] + client, server = self._setup(make_simple_handler('SEARCH', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.search(None, 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.search(None, 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.search('UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.search('NF_Z_62-010_(1973)', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_uid_search(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SEARCH completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.uid('SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.uid('SEARCH', 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['SEARCH', 'TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.uid('SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.uid('SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_sort(self): + response = [] + client, server = self._setup(make_simple_handler('SORT', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.sort('(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.sort('(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.sort('(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + typ, data = client.sort('SUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['(SUBJECT)', '"NF_Z_62-010_(1973)"', 'TEXT', '"not in mailbox"']) + + def test_uid_sort(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SORT completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.uid('sort', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.uid('sort', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['SORT', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.uid('sort', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + typ, data = client.uid('sort', 'SUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SORT', '(SUBJECT)', '"NF_Z_62-010_(1973)"', 'TEXT', '"not in mailbox"']) + + def test_thread(self): + response = [] + client, server = self._setup(make_simple_handler('THREAD', response)) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + typ, data = client.thread('ORDEREDSUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['ORDEREDSUBJECT', '"NF_Z_62-010_(1973)"', 'TEXT', '"gewp"']) + + def test_uid_thread(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID THREAD completed')) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', '"NF_Z_62-010_(1973)"', 'TEXT', '"gewp"']) + + def test_delete(self): + client, server = self._setup(make_simple_handler('DELETE')) + client.login('user', 'pass') + typ, data = client.delete('blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETE completed']) + self.assertEqual(server.args, ['blurdybloop']) + + typ, data = client.delete('foo/bar') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['foo/bar']) + + typ, data = client.delete('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_rename(self): + client, server = self._setup(make_simple_handler('RENAME')) + client.login('user', 'pass') + typ, data = client.rename('blurdybloop', 'sarasoop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'RENAME completed']) + self.assertEqual(server.args, ['blurdybloop', 'sarasoop']) + + typ, data = client.rename('Old folder', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"Old folder"', '"New folder"']) + + def test_subscribe(self): + client, server = self._setup(make_simple_handler('SUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.subscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + typ, data = client.subscribe('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_unsubscribe(self): + client, server = self._setup(make_simple_handler('UNSUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.unsubscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'UNSUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + typ, data = client.unsubscribe('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_list(self): + client, server = self._setup(make_simple_handler('LIST', + [r'* LIST (\Noselect) "/" ""'])) + client.login('user', 'pass') + typ, data = client.list() + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.list('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) + + typ, data = client.list('New folder', '*') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '*']) + + def test_status(self): + client, server = self._setup(make_simple_handler('STATUS', + ['* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)'])) + client.login('user', 'pass') + typ, data = client.status('blurdybloop', '(UIDNEXT MESSAGES)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'blurdybloop (MESSAGES 231 UIDNEXT 44292)']) + self.assertEqual(server.args, ['blurdybloop', '(UIDNEXT MESSAGES)']) + + # The names argument is wrapped in parentheses if it is not already. + typ, data = client.status('New folder', 'UIDNEXT MESSAGES') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '(UIDNEXT MESSAGES)']) + + def test_getacl(self): + client, server = self._setup(make_simple_handler('GETACL', + ['* ACL INBOX Fred rwipslxetad'])) + client.login('user', 'pass') + typ, data = client.getacl('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX Fred rwipslxetad']) + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.getacl('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_setacl(self): + client, server = self._setup(make_simple_handler('SETACL')) + client.login('user', 'pass') + typ, data = client.setacl('INBOX', 'Fred', 'rwipslxetad') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SETACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred', 'rwipslxetad']) + + typ, data = client.setacl('New folder', 'Fred', '+lr') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', 'Fred', '+lr']) + + def test_deleteacl(self): + client, server = self._setup(make_simple_handler('DELETEACL')) + client.login('user', 'pass') + typ, data = client.deleteacl('INBOX', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETEACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred']) + + typ, data = client.deleteacl('New folder', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', 'Fred']) + + def test_myrights(self): + client, server = self._setup(make_simple_handler('MYRIGHTS', + ['* MYRIGHTS INBOX rwiptsldaex'])) + client.login('user', 'pass') + typ, data = client.myrights('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX rwiptsldaex']) + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.myrights('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_getquota(self): + client, server = self._setup(make_simple_handler('GETQUOTA', + ['* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquota('') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 10 512)']) + self.assertEqual(server.args, ['""']) + + def test_getquotaroot(self): + client, server = self._setup(make_simple_handler('GETQUOTAROOT', + ['* QUOTAROOT INBOX ""', '* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquotaroot('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.getquotaroot('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_setquota(self): + client, server = self._setup(make_simple_handler('SETQUOTA', + ['* QUOTA "" (STORAGE 512)'])) + client.login('user', 'pass') + typ, data = client.setquota('', '(STORAGE 512)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 512)']) + self.assertEqual(server.args, ['""', '(STORAGE 512)']) + + # The limits argument is wrapped in parentheses if it is not already. + typ, data = client.setquota('', 'STORAGE 512') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['""', '(STORAGE 512)']) + + def test_getannotation(self): + client, server = self._setup(make_simple_handler('GETANNOTATION', + ['* ANNOTATION INBOX "/comment" ("value.shared" "Hello")'])) + client.login('user', 'pass') + typ, data = client.getannotation('INBOX', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['INBOX', '/comment', 'value.shared']) + + typ, data = client.getannotation('New folder', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '/comment', 'value.shared']) + + def test_setannotation(self): + client, server = self._setup(make_simple_handler('SETANNOTATION')) + client.login('user', 'pass') + typ, data = client.setannotation('INBOX', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['INBOX', '/comment', '("value.shared" "My comment")']) + + typ, data = client.setannotation('New folder', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['"New folder"', '/comment', '("value.shared" "My comment")']) + + def test_proxyauth(self): + client, server = self._setup(make_simple_handler('PROXYAUTH')) + client.login('user', 'pass') + typ, data = client.proxyauth('user') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'PROXYAUTH completed']) + self.assertEqual(server.args, ['user']) + + typ, data = client.proxyauth('us er') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"us er"']) def test_control_characters(self): client, _ = self._setup(SimpleIMAPHandler) @@ -753,12 +1478,12 @@ def handle_error(self, request, client_address): self.server_close() raise - if verbose: + if verbose > 1: print("creating server") server = MyServer(addr, hdlr) self.assertEqual(server.server_address, server.socket.getsockname()) - if verbose: + if verbose > 1: print("server created") print("ADDR =", addr) print("CLASS =", self.server_class) @@ -773,17 +1498,17 @@ def handle_error(self, request, client_address): kwargs={'poll_interval': 0.01}) t.daemon = True # In case this function raises. t.start() - if verbose: + if verbose > 1: print("server running") return server, t def reap_server(self, server, thread): - if verbose: + if verbose > 1: print("waiting for server") server.shutdown() server.server_close() thread.join() - if verbose: + if verbose > 1: print("done") @contextmanager @@ -821,7 +1546,7 @@ def test_bracket_flags(self): class BracketFlagHandler(SimpleIMAPHandler): def handle(self): - self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] + self.flags = [r'\Answered', r'\Flagged', r'\Deleted', r'\Seen', r'\Draft'] super().handle() def cmd_AUTHENTICATE(self, tag, args): @@ -830,11 +1555,11 @@ def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'OK', 'FAKEAUTH successful') def cmd_SELECT(self, tag, args): - flag_msg = ' \\'.join(self.flags) + flag_msg = ' '.join(self.flags) self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) self._send_line(b'* 2 EXISTS') self._send_line(b'* 0 RECENT') - msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' + msg = ('* OK [PERMANENTFLAGS (%s \\*)] Flags permitted.' % flag_msg) self._send_line(msg.encode('ascii')) self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') @@ -842,7 +1567,7 @@ def cmd_SELECT(self, tag, args): def cmd_STORE(self, tag, args): new_flags = args[2].strip('(').strip(')').split() self.flags.extend(new_flags) - flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) + flags_msg = '(FLAGS (%s))' % ' '.join(self.flags) msg = '* %s FETCH %s' % (args[0], flags_msg) self._send_line(msg.encode('ascii')) self._send_tagged(tag, 'OK', 'STORE completed.') @@ -1097,7 +1822,7 @@ def test_with_statement(self): with self.reaped_server(SimpleIMAPHandler) as server: with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) self.assertIsNone(server.logged) @threading_helper.reap_threads @@ -1106,7 +1831,7 @@ def test_with_statement_logout(self): with self.reaped_server(SimpleIMAPHandler) as server: with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) imap.logout() self.assertIsNone(server.logged) self.assertIsNone(server.logged) diff --git a/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst b/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst new file mode 100644 index 000000000000000..1f393d23266bcef --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst @@ -0,0 +1,6 @@ +:mod:`imaplib` now again quotes command arguments when necessary, for +example mailbox names containing a space. Such quoting was inadvertently +disabled when the module was ported to Python 3, and the arguments are now +quoted according to the :rfc:`3501` grammar. For backward compatibility, +an argument already enclosed in double quotes is left unchanged, so code +that quotes arguments itself keeps working.