From 455004af075ca33874dfab09755eb801c8a03783 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 30 Jun 2026 18:11:10 +0300 Subject: [PATCH] gh-40038: Quote imaplib command arguments when necessary Argument quoting was inadvertently disabled when imaplib was ported to Python 3 (bpo-1210 commented out the ``_checkquote()`` call, bpo-9638 then removed it), so since Python 3.0 commands failed for arguments containing protocol-sensitive characters, such as a space in a mailbox name. Quoting is restored and reimplemented per the RFC 3501 grammar, so that arguments that need quoting are escaped and quoted, while flags, sequence sets and list wildcards are left intact. For backward compatibility, an argument already enclosed in double quotes is left unchanged, so code that quotes arguments itself keeps working. Co-Authored-By: Claude Opus 4.8 (1M context) --- Doc/library/imaplib.rst | 7 +- Lib/imaplib.py | 163 +++- Lib/test/test_imaplib.py | 771 +++++++++++++++++- ...6-06-30-12-00-00.gh-issue-40038.qK7mGv.rst | 6 + 4 files changed, 878 insertions(+), 69 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst diff --git a/Doc/library/imaplib.rst b/Doc/library/imaplib.rst index df2468f7124e6d6..aca3e0a4dc9559d 100644 --- a/Doc/library/imaplib.rst +++ b/Doc/library/imaplib.rst @@ -179,6 +179,9 @@ enclosed with either parentheses or double quotes) each string is quoted. However, the *password* argument to the ``LOGIN`` command is always quoted. If you want to avoid having an argument string quoted (eg: the *flags* argument to ``STORE``) then enclose the string in parentheses (eg: ``r'(\Deleted)'``). +In general, pass arguments unquoted and let the module quote them as needed. +An argument that is already enclosed in double quotes is left unchanged, +so that code which quotes arguments itself keeps working. Most commands return a tuple: ``(type, [data, ...])`` where *type* is usually ``'OK'`` or ``'NO'``, and *data* is either the text from the command response, @@ -392,7 +395,7 @@ An :class:`IMAP4` instance has the following methods: .. versionadded:: 3.14 -.. method:: IMAP4.list([directory[, pattern]]) +.. method:: IMAP4.list(directory='', pattern='*') List mailbox names in *directory* matching *pattern*. *directory* defaults to the top-level mail folder, and *pattern* defaults to match anything. Returned @@ -422,7 +425,7 @@ An :class:`IMAP4` instance has the following methods: The method no longer ignores silently arbitrary exceptions. -.. method:: IMAP4.lsub(directory='""', pattern='*') +.. method:: IMAP4.lsub(directory='', pattern='*') List subscribed mailbox names in directory matching pattern. *directory* defaults to the top level directory and *pattern* defaults to match any mailbox. diff --git a/Lib/imaplib.py b/Lib/imaplib.py index 497b5a60cecb083..03a82362a5ba7f3 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -130,6 +130,9 @@ _Literal = br'.*{(?P\d+)}$' _Untagged_status = br'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?' _control_chars = re.compile(b'[\x00-\x1F\x7F]') +_non_astring_char = re.compile(br'[(){ \x00-\x1f\x7f%*\\"]') +_non_list_char = re.compile(br'[(){ \x00-\x1f\x7f\\"]') +_quoted = re.compile(br'"(?:[^"\\]|\\.)*+"') class IMAP4: @@ -494,8 +497,7 @@ def append(self, mailbox, flags, date_time, message): if not mailbox: mailbox = 'INBOX' if flags: - if (flags[0],flags[-1]) != ('(',')'): - flags = '(%s)' % flags + flags = self._set_quote(flags) else: flags = None if date_time: @@ -504,7 +506,7 @@ def append(self, mailbox, flags, date_time, message): date_time = None literal = MapCRLF.sub(CRLF, message) self.literal = literal - return self._simple_command(name, mailbox, flags, date_time) + return self._simple_command(name, self._astring(mailbox), flags, date_time) def authenticate(self, mechanism, authobject): @@ -529,7 +531,7 @@ def authenticate(self, mechanism, authobject): #if not cap in self.capabilities: # Let the server decide! # raise self.error("Server doesn't allow %s authentication." % mech) self.literal = _Authenticator(authobject).process - typ, dat = self._simple_command('AUTHENTICATE', mech) + typ, dat = self._simple_command('AUTHENTICATE', self._atom(mech)) if typ != 'OK': raise self.error(dat[-1].decode('utf-8', 'replace')) self.state = 'AUTH' @@ -573,7 +575,8 @@ def copy(self, message_set, new_mailbox): (typ, [data]) = .copy(message_set, new_mailbox) """ - return self._simple_command('COPY', message_set, new_mailbox) + return self._simple_command('COPY', self._sequence_set(message_set), + self._astring(new_mailbox)) def create(self, mailbox): @@ -581,7 +584,7 @@ def create(self, mailbox): (typ, [data]) = .create(mailbox) """ - return self._simple_command('CREATE', mailbox) + return self._simple_command('CREATE', self._astring(mailbox)) def delete(self, mailbox): @@ -589,14 +592,15 @@ def delete(self, mailbox): (typ, [data]) = .delete(mailbox) """ - return self._simple_command('DELETE', mailbox) + return self._simple_command('DELETE', self._astring(mailbox)) def deleteacl(self, mailbox, who): """Delete the ACLs (remove any rights) set for who on mailbox. (typ, [data]) = .deleteacl(mailbox, who) """ - return self._simple_command('DELETEACL', mailbox, who) + return self._simple_command('DELETEACL', self._astring(mailbox), + self._astring(who)) def enable(self, capability): """Send an RFC5161 enable string to the server. @@ -635,7 +639,8 @@ def fetch(self, message_set, message_parts): 'data' are tuples of message part envelope and data. """ name = 'FETCH' - typ, dat = self._simple_command(name, message_set, message_parts) + typ, dat = self._simple_command(name, self._sequence_set(message_set), + self._set_quote(message_parts)) return self._untagged_response(typ, dat, name) @@ -644,7 +649,7 @@ def getacl(self, mailbox): (typ, [data]) = .getacl(mailbox) """ - typ, dat = self._simple_command('GETACL', mailbox) + typ, dat = self._simple_command('GETACL', self._astring(mailbox)) return self._untagged_response(typ, dat, 'ACL') @@ -652,7 +657,8 @@ def getannotation(self, mailbox, entry, attribute): """(typ, [data]) = .getannotation(mailbox, entry, attribute) Retrieve ANNOTATIONs.""" - typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) + typ, dat = self._simple_command('GETANNOTATION', self._astring(mailbox), + entry, attribute) return self._untagged_response(typ, dat, 'ANNOTATION') @@ -663,7 +669,7 @@ def getquota(self, root): (typ, [data]) = .getquota(root) """ - typ, dat = self._simple_command('GETQUOTA', root) + typ, dat = self._simple_command('GETQUOTA', self._astring(root)) return self._untagged_response(typ, dat, 'QUOTA') @@ -672,7 +678,7 @@ def getquotaroot(self, mailbox): (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = .getquotaroot(mailbox) """ - typ, dat = self._simple_command('GETQUOTAROOT', mailbox) + typ, dat = self._simple_command('GETQUOTAROOT', self._astring(mailbox)) typ, quota = self._untagged_response(typ, dat, 'QUOTA') typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT') return typ, [quotaroot, quota] @@ -691,15 +697,16 @@ def idle(self, duration=None): return Idler(self, duration) - def list(self, directory='""', pattern='*'): + def list(self, directory='', pattern='*'): """List mailbox names in directory matching pattern. - (typ, [data]) = .list(directory='""', pattern='*') + (typ, [data]) = .list(directory='', pattern='*') 'data' is list of LIST responses. """ name = 'LIST' - typ, dat = self._simple_command(name, directory, pattern) + typ, dat = self._simple_command(name, self._astring(directory), + self._list_mailbox(pattern)) return self._untagged_response(typ, dat, name) @@ -710,7 +717,8 @@ def login(self, user, password): NB: 'password' will be quoted. """ - typ, dat = self._simple_command('LOGIN', user, self._quote(password)) + typ, dat = self._simple_command('LOGIN', self._astring(user), + self._quote(password)) if typ != 'OK': raise self.error(dat[-1].decode('UTF-8', 'replace')) self.state = 'AUTH' @@ -755,15 +763,16 @@ def logout(self): return typ, dat - def lsub(self, directory='""', pattern='*'): + def lsub(self, directory='', pattern='*'): """List 'subscribed' mailbox names in directory matching pattern. - (typ, [data, ...]) = .lsub(directory='""', pattern='*') + (typ, [data, ...]) = .lsub(directory='', pattern='*') 'data' are tuples of message part envelope and data. """ name = 'LSUB' - typ, dat = self._simple_command(name, directory, pattern) + typ, dat = self._simple_command(name, self._astring(directory), + self._list_mailbox(pattern)) return self._untagged_response(typ, dat, name) def myrights(self, mailbox): @@ -771,7 +780,7 @@ def myrights(self, mailbox): (typ, [data]) = .myrights(mailbox) """ - typ,dat = self._simple_command('MYRIGHTS', mailbox) + typ,dat = self._simple_command('MYRIGHTS', self._astring(mailbox)) return self._untagged_response(typ, dat, 'MYRIGHTS') def namespace(self): @@ -817,7 +826,7 @@ def proxyauth(self, user): """ name = 'PROXYAUTH' - return self._simple_command(name, user) + return self._simple_command(name, self._astring(user)) def rename(self, oldmailbox, newmailbox): @@ -825,7 +834,8 @@ def rename(self, oldmailbox, newmailbox): (typ, [data]) = .rename(oldmailbox, newmailbox) """ - return self._simple_command('RENAME', oldmailbox, newmailbox) + return self._simple_command('RENAME', self._astring(oldmailbox), + self._astring(newmailbox)) def search(self, charset, *criteria): @@ -837,10 +847,11 @@ def search(self, charset, *criteria): If UTF8 is enabled, charset MUST be None. """ name = 'SEARCH' - if charset: + if charset is not None: if self.utf8_enabled: raise IMAP4.error("Non-None charset not valid in UTF8 mode") - typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria) + typ, dat = self._simple_command(name, + 'CHARSET', self._astring(charset), *criteria) else: typ, dat = self._simple_command(name, *criteria) return self._untagged_response(typ, dat, name) @@ -864,7 +875,7 @@ def select(self, mailbox='INBOX', readonly=False): name = 'EXAMINE' else: name = 'SELECT' - typ, dat = self._simple_command(name, mailbox) + typ, dat = self._simple_command(name, self._astring(mailbox)) if typ != 'OK': self.state = 'AUTH' # Might have been 'SELECTED' return typ, dat @@ -883,14 +894,15 @@ def setacl(self, mailbox, who, what): (typ, [data]) = .setacl(mailbox, who, what) """ - return self._simple_command('SETACL', mailbox, who, what) + return self._simple_command('SETACL', self._astring(mailbox), + self._astring(who), self._astring(what)) - def setannotation(self, *args): + def setannotation(self, mailbox, *args): """(typ, [data]) = .setannotation(mailbox[, entry, attribute]+) Set ANNOTATIONs.""" - typ, dat = self._simple_command('SETANNOTATION', *args) + typ, dat = self._simple_command('SETANNOTATION', self._astring(mailbox), *args) return self._untagged_response(typ, dat, 'ANNOTATION') @@ -899,7 +911,8 @@ def setquota(self, root, limits): (typ, [data]) = .setquota(root, limits) """ - typ, dat = self._simple_command('SETQUOTA', root, limits) + typ, dat = self._simple_command('SETQUOTA', self._astring(root), + self._set_quote(limits)) return self._untagged_response(typ, dat, 'QUOTA') @@ -911,8 +924,9 @@ def sort(self, sort_criteria, charset, *search_criteria): name = 'SORT' #if not name in self.capabilities: # Let the server decide! # raise self.error('unimplemented extension command: %s' % name) - if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): - sort_criteria = '(%s)' % sort_criteria + sort_criteria = self._set_quote(sort_criteria) + if charset is not None: + charset = self._astring(charset) typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria) return self._untagged_response(typ, dat, name) @@ -949,7 +963,8 @@ def status(self, mailbox, names): name = 'STATUS' #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) - typ, dat = self._simple_command(name, mailbox, names) + typ, dat = self._simple_command(name, self._astring(mailbox), + self._set_quote(names)) return self._untagged_response(typ, dat, name) @@ -958,9 +973,9 @@ def store(self, message_set, command, flags): (typ, [data]) = .store(message_set, command, flags) """ - if (flags[0],flags[-1]) != ('(',')'): - flags = '(%s)' % flags # Avoid quoting the flags - typ, dat = self._simple_command('STORE', message_set, command, flags) + flags = self._set_quote(flags) + typ, dat = self._simple_command('STORE', self._sequence_set(message_set), + command, flags) return self._untagged_response(typ, dat, 'FETCH') @@ -969,7 +984,7 @@ def subscribe(self, mailbox): (typ, [data]) = .subscribe(mailbox) """ - return self._simple_command('SUBSCRIBE', mailbox) + return self._simple_command('SUBSCRIBE', self._astring(mailbox)) def thread(self, threading_algorithm, charset, *search_criteria): @@ -978,7 +993,10 @@ def thread(self, threading_algorithm, charset, *search_criteria): (type, [data]) = .thread(threading_algorithm, charset, search_criteria, ...) """ name = 'THREAD' - typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria) + if charset is not None: + charset = self._astring(charset) + typ, dat = self._simple_command(name, self._atom(threading_algorithm), + charset, *search_criteria) return self._untagged_response(typ, dat, name) @@ -999,7 +1017,31 @@ def uid(self, command, *args): (command, self.state, ', '.join(Commands[command]))) name = 'UID' - typ, dat = self._simple_command(name, command, *args) + if command == 'COPY': + message_set, new_mailbox = args + args = (self._sequence_set(message_set), + self._astring(new_mailbox)) + elif command == 'FETCH': + message_set, message_parts = args + args = (self._sequence_set(message_set), + self._set_quote(message_parts)) + elif command == 'STORE': + message_set, op, flags = args + args = (self._sequence_set(message_set), op, + self._set_quote(flags)) + elif command == 'SORT': + sort_criteria, charset, *search_criteria = args + if charset is not None: + charset = self._astring(charset) + args = (self._set_quote(sort_criteria), charset, + *search_criteria) + elif command == 'THREAD': + threading_algorithm, charset, *search_criteria = args + if charset is not None: + charset = self._astring(charset) + args = (self._atom(threading_algorithm), charset, + *search_criteria) + typ, dat = self._simple_command(name, self._atom(command), *args) if command in ('SEARCH', 'SORT', 'THREAD'): name = command else: @@ -1012,7 +1054,7 @@ def unsubscribe(self, mailbox): (typ, [data]) = .unsubscribe(mailbox) """ - return self._simple_command('UNSUBSCRIBE', mailbox) + return self._simple_command('UNSUBSCRIBE', self._astring(mailbox)) def unselect(self): @@ -1368,13 +1410,46 @@ def _new_tag(self): return tag - def _quote(self, arg): + def _atom(self, arg): + return arg - arg = arg.replace('\\', '\\\\') - arg = arg.replace('"', '\\"') + def _sequence_set(self, arg): + return arg - return '"' + arg + '"' + def _set_quote(self, arg): + if arg and arg[0] == '(' and arg[-1] == ')': + return arg + return '(' + arg + ')' + def _quote(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + arg = arg.replace(b'\\', br'\\') + arg = arg.replace(b'"', br'\"') + return b'"' + arg + b'"' + + # For backward compatibility, an argument already enclosed in double + # quotes is left unquoted, so that code which quotes arguments itself + # keeps working. New code should pass arguments unquoted and let the + # module quote them as needed. + + def _astring(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + if _quoted.fullmatch(arg): + return arg + if arg and _non_astring_char.search(arg) is None: + return arg + return self._quote(arg) + + def _list_mailbox(self, arg): + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + if _quoted.fullmatch(arg): + return arg + if arg and _non_list_char.search(arg) is None: + return arg + return self._quote(arg) def _simple_command(self, name, *args): diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index fb256fb7cbcd344..b001af850f458cd 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -26,6 +26,47 @@ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem") CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem") +_quoted_string = re.compile(r'"(?:[^"\\]|\\.)*"') + +def splitargs(line): + # Split a command line into IMAP arguments: quoted strings, balanced + # (possibly nested) parenthesized lists, and bare atoms. + args = [] + i, n = 0, len(line) + while i < n: + if line[i] == ' ': + i += 1 + continue + start = i + depth = 0 + while i < n and (depth or line[i] != ' '): + c = line[i] + if c == '"': + i = _quoted_string.match(line, i).end() + continue + elif c == '(': + depth += 1 + elif c == ')': + depth -= 1 + i += 1 + args.append(line[start:i]) + return args + +def parse_seq_number(s, maxmsg): + if s == '*': + return maxmsg + return int(s) + +def parse_sequence_set(arg, maxmsg): + for s in arg.split(','): + if ':' in s: + lo, hi = s.split(':') + lo = parse_seq_number(lo, maxmsg) + hi = parse_seq_number(hi, maxmsg) + yield from range(min(lo, hi), max(lo, hi) + 1) + else: + yield parse_seq_number(s, maxmsg) + class TestImaplib(unittest.TestCase): @@ -91,6 +132,55 @@ def test_imap4_host_default_value(self): imaplib.IMAP4() self.assertIn(cm.exception.errno, expected_errnos) + def test_astring(self): + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + # Plain atoms are left unquoted. + self.assertEqual(m._astring('INBOX'), b'INBOX') + self.assertEqual(m._astring(b'INBOX'), b'INBOX') + # Names with protocol-sensitive characters are quoted. + self.assertEqual(m._astring('New folder'), b'"New folder"') + self.assertEqual(m._astring('a"b'), b'"a\\"b"') + self.assertEqual(m._astring('a\\b'), b'"a\\\\b"') + self.assertEqual(m._astring(''), b'""') + self.assertEqual(m._astring('*'), b'"*"') + # A well-formed quoted string is passed through unchanged. + self.assertEqual(m._astring('"New folder"'), b'"New folder"') + self.assertEqual(m._astring('""'), b'""') + # Including a lenient (non-RFC) backslash escape, which the server + # may accept. + self.assertEqual(m._astring('"a\\b"'), b'"a\\b"') + # A string that only looks quoted but is not a single token is + # quoted as data, closing the argument injection vector. + self.assertEqual(m._astring('"a" SELECT evil "'), + b'"\\"a\\" SELECT evil \\""') + self.assertEqual(m._astring('"'), b'"\\""') + + def test_astring_idempotent(self): + # Quoting an already quoted argument should not change it, so that + # quoting twice gives the same result as quoting once. + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + for arg in ['INBOX', 'New folder', 'a"b', 'a\\b', '', '*', '%', + '"New folder"', '""', '"a\\b"', '"a" SELECT evil "', + '"', 'a\tb', 'a\rb', '\x7f', '(a)']: + with self.subTest(arg=arg): + once = m._astring(arg) + self.assertEqual(m._astring(once), once) + twice = m._list_mailbox(arg) + self.assertEqual(m._list_mailbox(twice), twice) + + def test_list_mailbox(self): + m = imaplib.IMAP4.__new__(imaplib.IMAP4) + m._encoding = 'ascii' + # Wildcards are not quoted in a list pattern. + self.assertEqual(m._list_mailbox('*'), b'*') + self.assertEqual(m._list_mailbox('%'), b'%') + self.assertEqual(m._list_mailbox('foo/%'), b'foo/%') + # But spaces still require quoting. + self.assertEqual(m._list_mailbox('New folder'), b'"New folder"') + self.assertEqual(m._list_mailbox('"New folder"'), b'"New folder"') + if ssl: class SecureTCPServer(socketserver.TCPServer): @@ -119,11 +209,11 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): def setup(self): super().setup() - self.server.is_selected = False + self.server.is_selected = None self.server.logged = None def _send(self, message): - if verbose: + if verbose > 1: print("SENT: %r" % message.strip()) self.wfile.write(message) @@ -157,7 +247,7 @@ def handle(self): if line.endswith(b'\r\n'): break - if verbose: + if verbose > 1: print('GOT: %r' % line.strip()) if self.continuation: try: @@ -165,10 +255,11 @@ def handle(self): except StopIteration: self.continuation = None continue - splitline = line.decode('ASCII').split() + splitline = splitargs(line.decode().removesuffix('\r\n')) tag = splitline[0] cmd = splitline[1] args = splitline[2:] + self.server.args = args if hasattr(self, 'cmd_' + cmd): continuation = getattr(self, 'cmd_' + cmd)(tag, args) @@ -191,17 +282,17 @@ def cmd_LOGOUT(self, tag, args): self._send_tagged(tag, 'OK', 'LOGOUT completed') def cmd_LOGIN(self, tag, args): - self.server.logged = args[0] + self.server.logged = args self._send_tagged(tag, 'OK', 'LOGIN completed') def cmd_SELECT(self, tag, args): - self.server.is_selected = True + self.server.is_selected = args self._send_line(b'* 2 EXISTS') self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') def cmd_UNSELECT(self, tag, args): - if self.server.is_selected: - self.server.is_selected = False + if self.server.is_selected is not None: + self.server.is_selected = None self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') else: self._send_tagged(tag, 'BAD', 'No mailbox selected') @@ -268,6 +359,24 @@ def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'No access') +def make_simple_handler(command, untagged_response=(), + completed=None): + if completed is None: + completed = f'{command} completed' + def cmd(self, tag, args): + for msg in untagged_response: + self._send_textline(msg) + self._send_tagged(tag, 'OK', completed) + cmd.__name__ = 'cmd_' + command + class Handler(SimpleIMAPHandler): + pass + Handler.__name__ = command.title() + 'Handler' + setattr(Handler, cmd.__name__, cmd) + Handler.__qualname__ = Handler.__name__ + cmd.__qualname__ = Handler.__qualname__ + '.' + cmd.__name__ + return Handler + + class NewIMAPTestsMixin: client = None @@ -353,6 +462,23 @@ def test_enable_raises_error_if_no_capability(self): 'does not support ENABLE'): client.enable('foo') + def test_enable(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'IMAP4rev1 ID LITERAL+ ENABLE X-GOOD-IDEA' + def cmd_ENABLE(self, tag, args): + capabilities = self.capabilities.split() + for arg in args: + if arg in capabilities: + self._send_textline('* ENABLED ' + arg) + self._send_tagged(tag, 'OK', 'foo') + + client, server = self._setup(EnableHandler) + client.login('user', 'pass') + code, data = client.enable('CONDSTORE X-GOOD-IDEA') + self.assertEqual(code, 'OK') + self.assertEqual(data, [b'foo']) + self.assertEqual(server.args, ['CONDSTORE', 'X-GOOD-IDEA']) + def test_enable_UTF8_raises_error_if_not_supported(self): client, _ = self._setup(SimpleIMAPHandler) typ, data = client.login('user', 'pass') @@ -386,6 +512,7 @@ def cmd_APPEND(self, tag, args): code, _ = client.enable('UTF8=ACCEPT') self.assertEqual(code, 'OK') self.assertEqual(client._encoding, 'utf-8') + self.assertEqual(server.args, ['UTF8=ACCEPT']) msg_string = 'Subject: üñí©öðé' typ, data = client.append( None, None, None, (msg_string + '\n').encode('utf-8')) @@ -545,7 +672,7 @@ def test_with_statement(self): _, server = self._setup(SimpleIMAPHandler, connect=False) with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) self.assertIsNone(server.logged) def test_with_statement_logout(self): @@ -553,7 +680,7 @@ def test_with_statement_logout(self): _, server = self._setup(SimpleIMAPHandler, connect=False) with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) imap.logout() self.assertIsNone(server.logged) self.assertIsNone(server.logged) @@ -628,11 +755,33 @@ def test_idle_delayed_packet(self): self.fail('multi-packet response was corrupted by idle timeout') def test_login(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) typ, data = client.login('user', 'pass') self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'LOGIN completed') self.assertEqual(client.state, 'AUTH') + # The user name is quoted only when necessary, but the password + # is always quoted. + self.assertEqual(server.logged, ['user', '"pass"']) + self.assertRaises(imaplib.IMAP4.error, client.login, 'user', 'pass') + + def test_login_quoted(self): + client, server = self._setup(SimpleIMAPHandler) + typ, data = client.login('us*r', 'p%ss') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + self.assertEqual(server.logged, ['"us*r"', '"p%ss"']) + + def test_login_quoted2(self): + # An already quoted user name is passed through unchanged, rather + # than being quoted a second time; the password is always quoted. + client, server = self._setup(SimpleIMAPHandler) + typ, data = client.login('"user"', '"pass"') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + self.assertEqual(server.logged, ['"user"', r'"\"pass\""']) def test_logout(self): client, _ = self._setup(SimpleIMAPHandler) @@ -655,8 +804,24 @@ def cmd_LSUB(self, tag, args): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'() "." directoryA') + def test_select(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + typ, data = client.select() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + + typ, data = client.select('Archive') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['Archive']) + + typ, data = client.select('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['"New folder"']) + def test_unselect(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) client.login('user', 'pass') typ, data = client.select() self.assertEqual(typ, 'OK') @@ -666,6 +831,566 @@ def test_unselect(self): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'Returned to authenticated state. (Success)') self.assertEqual(client.state, 'AUTH') + self.assertIsNone(server.is_selected) + + def test_create(self): + client, server = self._setup(make_simple_handler('CREATE')) + client.login('user', 'pass') + typ, data = client.create('owatagusiam/') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/']) + + typ, data = client.create('owatagusiam/blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/blurdybloop']) + + typ, data = client.create('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['"New folder"']) + + def test_copy(self): + client, server = self._setup(make_simple_handler('COPY')) + client.login('user', 'pass') + client.select() + typ, data = client.copy('2:4', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', 'MEETING']) + + typ, data = client.copy('2:4', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', '"New folder"']) + + def test_uid_copy(self): + client, server = self._setup(make_simple_handler('UID', + completed='UID COPY completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('copy', '4827313:4828442', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', 'MEETING']) + + typ, data = client.uid('copy', '4827313:4828442', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', '"New folder"']) + + def test_store(self): + client, server = self._setup(make_simple_handler('STORE', [ + r'* 2 FETCH (FLAGS (\Deleted \Seen))', + r'* 3 FETCH (FLAGS (\Deleted))', + r'* 4 FETCH (FLAGS (\Deleted \Flagged \Seen))', + ])) + client.login('user', 'pass') + client.select() + typ, data = client.store('2:4', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Deleted \Seen))', + br'3 (FLAGS (\Deleted))', + br'4 (FLAGS (\Deleted \Flagged \Seen))', + ]) + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + typ, data = client.store('2:4', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + def test_uid_store(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Deleted \Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Deleted) UID 4827943)', + r'* 25 FETCH (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ], 'UID STORE completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Deleted \Seen) UID 4827313)', + br'24 (FLAGS (\Deleted) UID 4827943)', + br'25 (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + def test_fetch(self): + # The handler expands the requested sequence set and answers for + # exactly those messages, so the test exercises the round trip of + # the message set, not just a canned reply. + class FetchHandler(SimpleIMAPHandler): + messages = 4 + def cmd_SELECT(self, tag, args): + self.server.is_selected = args + self._send_line(b'* %d EXISTS' % self.messages) + self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_FETCH(self, tag, args): + for n in parse_sequence_set(args[0], self.messages): + self._send_textline(r'* %d FETCH (FLAGS (\Seen))' % n) + self._send_tagged(tag, 'OK', 'FETCH completed') + + client, server = self._setup(FetchHandler) + client.login('user', 'pass') + client.select() + typ, data = client.fetch('2:4', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # message_parts is wrapped in parentheses if it is not already. + typ, data = client.fetch('2:4', 'FLAGS') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # A comma-separated set with an open range up to '*'. + typ, data = client.fetch('1,3:*', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'1 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['1,3:*', '(FLAGS)']) + + # An item with nested parentheses is sent (and parsed) as a + # single argument. + typ, data = client.fetch('1', '(BODY[HEADER.FIELDS (DATE FROM)])') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'1 (FLAGS (\Seen))']) + self.assertEqual(server.args, ['1', '(BODY[HEADER.FIELDS (DATE FROM)])']) + + def test_uid_fetch(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Seen) UID 4827943)', + r'* 25 FETCH (FLAGS (\Seen) UID 4828442)', + ], 'UID FETCH completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('fetch', '4827313:4828442', 'FLAGS') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Seen) UID 4827313)', + br'24 (FLAGS (\Seen) UID 4827943)', + br'25 (FLAGS (\Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['FETCH', '4827313:4828442', '(FLAGS)']) + + def test_search(self): + response = [] + client, server = self._setup(make_simple_handler('SEARCH', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.search(None, 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.search(None, 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.search('UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.search('NF_Z_62-010_(1973)', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_uid_search(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SEARCH completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.uid('SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.uid('SEARCH', 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['SEARCH', 'TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.uid('SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.uid('SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_sort(self): + response = [] + client, server = self._setup(make_simple_handler('SORT', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.sort('(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.sort('(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.sort('(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + typ, data = client.sort('SUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['(SUBJECT)', '"NF_Z_62-010_(1973)"', 'TEXT', '"not in mailbox"']) + + def test_uid_sort(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SORT completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.uid('sort', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.uid('sort', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['SORT', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.uid('sort', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + typ, data = client.uid('sort', 'SUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SORT', '(SUBJECT)', '"NF_Z_62-010_(1973)"', 'TEXT', '"not in mailbox"']) + + def test_thread(self): + response = [] + client, server = self._setup(make_simple_handler('THREAD', response)) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + typ, data = client.thread('ORDEREDSUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['ORDEREDSUBJECT', '"NF_Z_62-010_(1973)"', 'TEXT', '"gewp"']) + + def test_uid_thread(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID THREAD completed')) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'NF_Z_62-010_(1973)', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', '"NF_Z_62-010_(1973)"', 'TEXT', '"gewp"']) + + def test_delete(self): + client, server = self._setup(make_simple_handler('DELETE')) + client.login('user', 'pass') + typ, data = client.delete('blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETE completed']) + self.assertEqual(server.args, ['blurdybloop']) + + typ, data = client.delete('foo/bar') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['foo/bar']) + + typ, data = client.delete('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_rename(self): + client, server = self._setup(make_simple_handler('RENAME')) + client.login('user', 'pass') + typ, data = client.rename('blurdybloop', 'sarasoop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'RENAME completed']) + self.assertEqual(server.args, ['blurdybloop', 'sarasoop']) + + typ, data = client.rename('Old folder', 'New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"Old folder"', '"New folder"']) + + def test_subscribe(self): + client, server = self._setup(make_simple_handler('SUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.subscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + typ, data = client.subscribe('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_unsubscribe(self): + client, server = self._setup(make_simple_handler('UNSUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.unsubscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'UNSUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + typ, data = client.unsubscribe('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_list(self): + client, server = self._setup(make_simple_handler('LIST', + [r'* LIST (\Noselect) "/" ""'])) + client.login('user', 'pass') + typ, data = client.list() + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.list('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) + + typ, data = client.list('New folder', '*') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '*']) + + def test_status(self): + client, server = self._setup(make_simple_handler('STATUS', + ['* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)'])) + client.login('user', 'pass') + typ, data = client.status('blurdybloop', '(UIDNEXT MESSAGES)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'blurdybloop (MESSAGES 231 UIDNEXT 44292)']) + self.assertEqual(server.args, ['blurdybloop', '(UIDNEXT MESSAGES)']) + + # The names argument is wrapped in parentheses if it is not already. + typ, data = client.status('New folder', 'UIDNEXT MESSAGES') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '(UIDNEXT MESSAGES)']) + + def test_getacl(self): + client, server = self._setup(make_simple_handler('GETACL', + ['* ACL INBOX Fred rwipslxetad'])) + client.login('user', 'pass') + typ, data = client.getacl('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX Fred rwipslxetad']) + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.getacl('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_setacl(self): + client, server = self._setup(make_simple_handler('SETACL')) + client.login('user', 'pass') + typ, data = client.setacl('INBOX', 'Fred', 'rwipslxetad') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SETACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred', 'rwipslxetad']) + + typ, data = client.setacl('New folder', 'Fred', '+lr') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', 'Fred', '+lr']) + + def test_deleteacl(self): + client, server = self._setup(make_simple_handler('DELETEACL')) + client.login('user', 'pass') + typ, data = client.deleteacl('INBOX', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETEACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred']) + + typ, data = client.deleteacl('New folder', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', 'Fred']) + + def test_myrights(self): + client, server = self._setup(make_simple_handler('MYRIGHTS', + ['* MYRIGHTS INBOX rwiptsldaex'])) + client.login('user', 'pass') + typ, data = client.myrights('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX rwiptsldaex']) + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.myrights('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_getquota(self): + client, server = self._setup(make_simple_handler('GETQUOTA', + ['* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquota('') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 10 512)']) + self.assertEqual(server.args, ['""']) + + def test_getquotaroot(self): + client, server = self._setup(make_simple_handler('GETQUOTAROOT', + ['* QUOTAROOT INBOX ""', '* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquotaroot('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['INBOX']) + + typ, data = client.getquotaroot('New folder') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"']) + + def test_setquota(self): + client, server = self._setup(make_simple_handler('SETQUOTA', + ['* QUOTA "" (STORAGE 512)'])) + client.login('user', 'pass') + typ, data = client.setquota('', '(STORAGE 512)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 512)']) + self.assertEqual(server.args, ['""', '(STORAGE 512)']) + + # The limits argument is wrapped in parentheses if it is not already. + typ, data = client.setquota('', 'STORAGE 512') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['""', '(STORAGE 512)']) + + def test_getannotation(self): + client, server = self._setup(make_simple_handler('GETANNOTATION', + ['* ANNOTATION INBOX "/comment" ("value.shared" "Hello")'])) + client.login('user', 'pass') + typ, data = client.getannotation('INBOX', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['INBOX', '/comment', 'value.shared']) + + typ, data = client.getannotation('New folder', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"New folder"', '/comment', 'value.shared']) + + def test_setannotation(self): + client, server = self._setup(make_simple_handler('SETANNOTATION')) + client.login('user', 'pass') + typ, data = client.setannotation('INBOX', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['INBOX', '/comment', '("value.shared" "My comment")']) + + typ, data = client.setannotation('New folder', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['"New folder"', '/comment', '("value.shared" "My comment")']) + + def test_proxyauth(self): + client, server = self._setup(make_simple_handler('PROXYAUTH')) + client.login('user', 'pass') + typ, data = client.proxyauth('user') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'PROXYAUTH completed']) + self.assertEqual(server.args, ['user']) + + typ, data = client.proxyauth('us er') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['"us er"']) def test_control_characters(self): client, _ = self._setup(SimpleIMAPHandler) @@ -753,12 +1478,12 @@ def handle_error(self, request, client_address): self.server_close() raise - if verbose: + if verbose > 1: print("creating server") server = MyServer(addr, hdlr) self.assertEqual(server.server_address, server.socket.getsockname()) - if verbose: + if verbose > 1: print("server created") print("ADDR =", addr) print("CLASS =", self.server_class) @@ -773,17 +1498,17 @@ def handle_error(self, request, client_address): kwargs={'poll_interval': 0.01}) t.daemon = True # In case this function raises. t.start() - if verbose: + if verbose > 1: print("server running") return server, t def reap_server(self, server, thread): - if verbose: + if verbose > 1: print("waiting for server") server.shutdown() server.server_close() thread.join() - if verbose: + if verbose > 1: print("done") @contextmanager @@ -821,7 +1546,7 @@ def test_bracket_flags(self): class BracketFlagHandler(SimpleIMAPHandler): def handle(self): - self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] + self.flags = [r'\Answered', r'\Flagged', r'\Deleted', r'\Seen', r'\Draft'] super().handle() def cmd_AUTHENTICATE(self, tag, args): @@ -830,11 +1555,11 @@ def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'OK', 'FAKEAUTH successful') def cmd_SELECT(self, tag, args): - flag_msg = ' \\'.join(self.flags) + flag_msg = ' '.join(self.flags) self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) self._send_line(b'* 2 EXISTS') self._send_line(b'* 0 RECENT') - msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' + msg = ('* OK [PERMANENTFLAGS (%s \\*)] Flags permitted.' % flag_msg) self._send_line(msg.encode('ascii')) self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') @@ -842,7 +1567,7 @@ def cmd_SELECT(self, tag, args): def cmd_STORE(self, tag, args): new_flags = args[2].strip('(').strip(')').split() self.flags.extend(new_flags) - flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) + flags_msg = '(FLAGS (%s))' % ' '.join(self.flags) msg = '* %s FETCH %s' % (args[0], flags_msg) self._send_line(msg.encode('ascii')) self._send_tagged(tag, 'OK', 'STORE completed.') @@ -1097,7 +1822,7 @@ def test_with_statement(self): with self.reaped_server(SimpleIMAPHandler) as server: with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) self.assertIsNone(server.logged) @threading_helper.reap_threads @@ -1106,7 +1831,7 @@ def test_with_statement_logout(self): with self.reaped_server(SimpleIMAPHandler) as server: with self.imap_class(*server.server_address) as imap: imap.login('user', 'pass') - self.assertEqual(server.logged, 'user') + self.assertEqual(server.logged, ['user', '"pass"']) imap.logout() self.assertIsNone(server.logged) self.assertIsNone(server.logged) diff --git a/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst b/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst new file mode 100644 index 000000000000000..1f393d23266bcef --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-06-30-12-00-00.gh-issue-40038.qK7mGv.rst @@ -0,0 +1,6 @@ +:mod:`imaplib` now again quotes command arguments when necessary, for +example mailbox names containing a space. Such quoting was inadvertently +disabled when the module was ported to Python 3, and the arguments are now +quoted according to the :rfc:`3501` grammar. For backward compatibility, +an argument already enclosed in double quotes is left unchanged, so code +that quotes arguments itself keeps working.