mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-135307: Fix email error when policy max_line_length is set to 0 or None (#135367)
RDM: Like the change made in a earlier PR to the folder, we can/must use 'maxlen' as a stand in for 'unlimited' when computing line lengths when max_line_length is 0 or None; otherwise the computation results in a traceback.
This commit is contained in:
parent
173cc53d9f
commit
6d45cd8dbb
3 changed files with 35 additions and 5 deletions
|
|
@ -2,6 +2,7 @@
|
||||||
import email.charset
|
import email.charset
|
||||||
import email.message
|
import email.message
|
||||||
import email.errors
|
import email.errors
|
||||||
|
import sys
|
||||||
from email import quoprimime
|
from email import quoprimime
|
||||||
|
|
||||||
class ContentManager:
|
class ContentManager:
|
||||||
|
|
@ -142,13 +143,15 @@ def _encode_base64(data, max_line_length):
|
||||||
|
|
||||||
|
|
||||||
def _encode_text(string, charset, cte, policy):
|
def _encode_text(string, charset, cte, policy):
|
||||||
|
# If max_line_length is 0 or None, there is no limit.
|
||||||
|
maxlen = policy.max_line_length or sys.maxsize
|
||||||
lines = string.encode(charset).splitlines()
|
lines = string.encode(charset).splitlines()
|
||||||
linesep = policy.linesep.encode('ascii')
|
linesep = policy.linesep.encode('ascii')
|
||||||
def embedded_body(lines): return linesep.join(lines) + linesep
|
def embedded_body(lines): return linesep.join(lines) + linesep
|
||||||
def normal_body(lines): return b'\n'.join(lines) + b'\n'
|
def normal_body(lines): return b'\n'.join(lines) + b'\n'
|
||||||
if cte is None:
|
if cte is None:
|
||||||
# Use heuristics to decide on the "best" encoding.
|
# Use heuristics to decide on the "best" encoding.
|
||||||
if max((len(x) for x in lines), default=0) <= policy.max_line_length:
|
if max(map(len, lines), default=0) <= maxlen:
|
||||||
try:
|
try:
|
||||||
return '7bit', normal_body(lines).decode('ascii')
|
return '7bit', normal_body(lines).decode('ascii')
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
|
|
@ -156,8 +159,7 @@ def normal_body(lines): return b'\n'.join(lines) + b'\n'
|
||||||
if policy.cte_type == '8bit':
|
if policy.cte_type == '8bit':
|
||||||
return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
|
return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
|
||||||
sniff = embedded_body(lines[:10])
|
sniff = embedded_body(lines[:10])
|
||||||
sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
|
sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'), maxlen)
|
||||||
policy.max_line_length)
|
|
||||||
sniff_base64 = binascii.b2a_base64(sniff)
|
sniff_base64 = binascii.b2a_base64(sniff)
|
||||||
# This is a little unfair to qp; it includes lineseps, base64 doesn't.
|
# This is a little unfair to qp; it includes lineseps, base64 doesn't.
|
||||||
if len(sniff_qp) > len(sniff_base64):
|
if len(sniff_qp) > len(sniff_base64):
|
||||||
|
|
@ -172,9 +174,9 @@ def normal_body(lines): return b'\n'.join(lines) + b'\n'
|
||||||
data = normal_body(lines).decode('ascii', 'surrogateescape')
|
data = normal_body(lines).decode('ascii', 'surrogateescape')
|
||||||
elif cte == 'quoted-printable':
|
elif cte == 'quoted-printable':
|
||||||
data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
|
data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
|
||||||
policy.max_line_length)
|
maxlen)
|
||||||
elif cte == 'base64':
|
elif cte == 'base64':
|
||||||
data = _encode_base64(embedded_body(lines), policy.max_line_length)
|
data = _encode_base64(embedded_body(lines), maxlen)
|
||||||
else:
|
else:
|
||||||
raise ValueError("Unknown content transfer encoding {}".format(cte))
|
raise ValueError("Unknown content transfer encoding {}".format(cte))
|
||||||
return cte, data
|
return cte, data
|
||||||
|
|
|
||||||
|
|
@ -1004,6 +1004,32 @@ def test_folding_with_long_nospace_http_policy_1(self):
|
||||||
parsed_msg = message_from_bytes(m.as_bytes(), policy=policy.default)
|
parsed_msg = message_from_bytes(m.as_bytes(), policy=policy.default)
|
||||||
self.assertEqual(parsed_msg['Message-ID'], m['Message-ID'])
|
self.assertEqual(parsed_msg['Message-ID'], m['Message-ID'])
|
||||||
|
|
||||||
|
def test_no_wrapping_max_line_length(self):
|
||||||
|
# Test that falsey 'max_line_length' are converted to sys.maxsize.
|
||||||
|
for n in [0, None]:
|
||||||
|
with self.subTest(max_line_length=n):
|
||||||
|
self.do_test_no_wrapping_max_line_length(n)
|
||||||
|
|
||||||
|
def do_test_no_wrapping_max_line_length(self, falsey):
|
||||||
|
self.assertFalse(falsey)
|
||||||
|
pol = policy.default.clone(max_line_length=falsey)
|
||||||
|
subj = "S" * 100
|
||||||
|
body = "B" * 100
|
||||||
|
msg = EmailMessage(policy=pol)
|
||||||
|
msg["From"] = "a@ex.com"
|
||||||
|
msg["To"] = "b@ex.com"
|
||||||
|
msg["Subject"] = subj
|
||||||
|
msg.set_content(body)
|
||||||
|
|
||||||
|
raw = msg.as_bytes()
|
||||||
|
self.assertNotIn(b"=\n", raw,
|
||||||
|
"Found fold indicator; wrapping not disabled")
|
||||||
|
|
||||||
|
parsed = message_from_bytes(raw, policy=policy.default)
|
||||||
|
self.assertEqual(parsed["Subject"], subj)
|
||||||
|
parsed_body = parsed.get_body().get_content().rstrip('\n')
|
||||||
|
self.assertEqual(parsed_body, body)
|
||||||
|
|
||||||
def test_invalid_header_names(self):
|
def test_invalid_header_names(self):
|
||||||
invalid_headers = [
|
invalid_headers = [
|
||||||
('Invalid Header', 'contains space'),
|
('Invalid Header', 'contains space'),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
:mod:`email`: Fix exception in ``set_content()`` when encoding text
|
||||||
|
and max_line_length is set to ``0`` or ``None`` (unlimited).
|
||||||
Loading…
Add table
Add a link
Reference in a new issue