mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	bpo-27397: Make email module properly handle invalid-length base64 strings (#7583)
When attempting to base64-decode a payload of invalid length (1 mod 4), properly recognize and handle it. The given data will be returned as-is, i.e. not decoded, along with a new defect, InvalidBase64LengthDefect.
This commit is contained in:
		
							parent
							
								
									5a98209180
								
							
						
					
					
						commit
						c3f55be7dd
					
				
					 7 changed files with 70 additions and 18 deletions
				
			
		| 
						 | 
					@ -108,3 +108,7 @@ All defect classes are subclassed from :class:`email.errors.MessageDefect`.
 | 
				
			||||||
* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64
 | 
					* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64
 | 
				
			||||||
  encoded bytes, characters outside the base64 alphabet were encountered.
 | 
					  encoded bytes, characters outside the base64 alphabet were encountered.
 | 
				
			||||||
  The characters are ignored, but the resulting decoded bytes may be invalid.
 | 
					  The characters are ignored, but the resulting decoded bytes may be invalid.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					* :class:`InvalidBase64LengthDefect` -- When decoding a block of base64 encoded
 | 
				
			||||||
 | 
					  bytes, the number of non-padding base64 characters was invalid (1 more than
 | 
				
			||||||
 | 
					  a multiple of 4).  The encoded block was kept as-is.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -98,30 +98,42 @@ def len_q(bstring):
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def decode_b(encoded):
 | 
					def decode_b(encoded):
 | 
				
			||||||
    defects = []
 | 
					    # First try decoding with validate=True, fixing the padding if needed.
 | 
				
			||||||
 | 
					    # This will succeed only if encoded includes no invalid characters.
 | 
				
			||||||
    pad_err = len(encoded) % 4
 | 
					    pad_err = len(encoded) % 4
 | 
				
			||||||
    if pad_err:
 | 
					    missing_padding = b'==='[:4-pad_err] if pad_err else b''
 | 
				
			||||||
        defects.append(errors.InvalidBase64PaddingDefect())
 | 
					 | 
				
			||||||
        padded_encoded = encoded + b'==='[:4-pad_err]
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        padded_encoded = encoded
 | 
					 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        return base64.b64decode(padded_encoded, validate=True), defects
 | 
					        return (
 | 
				
			||||||
 | 
					            base64.b64decode(encoded + missing_padding, validate=True),
 | 
				
			||||||
 | 
					            [errors.InvalidBase64PaddingDefect()] if pad_err else [],
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
    except binascii.Error:
 | 
					    except binascii.Error:
 | 
				
			||||||
        # Since we had correct padding, this must an invalid char error.
 | 
					        # Since we had correct padding, this is likely an invalid char error.
 | 
				
			||||||
        defects = [errors.InvalidBase64CharactersDefect()]
 | 
					        #
 | 
				
			||||||
        # The non-alphabet characters are ignored as far as padding
 | 
					        # The non-alphabet characters are ignored as far as padding
 | 
				
			||||||
        # goes, but we don't know how many there are.  So we'll just
 | 
					        # goes, but we don't know how many there are.  So try without adding
 | 
				
			||||||
        # try various padding lengths until something works.
 | 
					        # padding to see if it works.
 | 
				
			||||||
        for i in 0, 1, 2, 3:
 | 
					 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
                return base64.b64decode(encoded+b'='*i, validate=False), defects
 | 
					            return (
 | 
				
			||||||
 | 
					                base64.b64decode(encoded, validate=False),
 | 
				
			||||||
 | 
					                [errors.InvalidBase64CharactersDefect()],
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
        except binascii.Error:
 | 
					        except binascii.Error:
 | 
				
			||||||
                if i==0:
 | 
					            # Add as much padding as could possibly be necessary (extra padding
 | 
				
			||||||
                    defects.append(errors.InvalidBase64PaddingDefect())
 | 
					            # is ignored).
 | 
				
			||||||
        else:
 | 
					            try:
 | 
				
			||||||
            # This should never happen.
 | 
					                return (
 | 
				
			||||||
            raise AssertionError("unexpected binascii.Error")
 | 
					                    base64.b64decode(encoded + b'==', validate=False),
 | 
				
			||||||
 | 
					                    [errors.InvalidBase64CharactersDefect(),
 | 
				
			||||||
 | 
					                     errors.InvalidBase64PaddingDefect()],
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					            except binascii.Error:
 | 
				
			||||||
 | 
					                # This only happens when the encoded string's length is 1 more
 | 
				
			||||||
 | 
					                # than a multiple of 4, which is invalid.
 | 
				
			||||||
 | 
					                #
 | 
				
			||||||
 | 
					                # bpo-27397: Just return the encoded string since there's no
 | 
				
			||||||
 | 
					                # way to decode.
 | 
				
			||||||
 | 
					                return encoded, [errors.InvalidBase64LengthDefect()]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def encode_b(bstring):
 | 
					def encode_b(bstring):
 | 
				
			||||||
    return base64.b64encode(bstring).decode('ascii')
 | 
					    return base64.b64encode(bstring).decode('ascii')
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -73,6 +73,9 @@ class InvalidBase64PaddingDefect(MessageDefect):
 | 
				
			||||||
class InvalidBase64CharactersDefect(MessageDefect):
 | 
					class InvalidBase64CharactersDefect(MessageDefect):
 | 
				
			||||||
    """base64 encoded sequence had characters not in base64 alphabet"""
 | 
					    """base64 encoded sequence had characters not in base64 alphabet"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class InvalidBase64LengthDefect(MessageDefect):
 | 
				
			||||||
 | 
					    """base64 encoded sequence had invalid length (1 mod 4)"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# These errors are specific to header parsing.
 | 
					# These errors are specific to header parsing.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class HeaderDefect(MessageDefect):
 | 
					class HeaderDefect(MessageDefect):
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -33,7 +33,10 @@ def test_simple(self):
 | 
				
			||||||
        self._test(b'Zm9v', b'foo')
 | 
					        self._test(b'Zm9v', b'foo')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_missing_padding(self):
 | 
					    def test_missing_padding(self):
 | 
				
			||||||
 | 
					        # 1 missing padding character
 | 
				
			||||||
        self._test(b'dmk', b'vi', [errors.InvalidBase64PaddingDefect])
 | 
					        self._test(b'dmk', b'vi', [errors.InvalidBase64PaddingDefect])
 | 
				
			||||||
 | 
					        # 2 missing padding characters
 | 
				
			||||||
 | 
					        self._test(b'dg', b'v', [errors.InvalidBase64PaddingDefect])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_invalid_character(self):
 | 
					    def test_invalid_character(self):
 | 
				
			||||||
        self._test(b'dm\x01k===', b'vi', [errors.InvalidBase64CharactersDefect])
 | 
					        self._test(b'dm\x01k===', b'vi', [errors.InvalidBase64CharactersDefect])
 | 
				
			||||||
| 
						 | 
					@ -42,6 +45,9 @@ def test_invalid_character_and_bad_padding(self):
 | 
				
			||||||
        self._test(b'dm\x01k', b'vi', [errors.InvalidBase64CharactersDefect,
 | 
					        self._test(b'dm\x01k', b'vi', [errors.InvalidBase64CharactersDefect,
 | 
				
			||||||
                                       errors.InvalidBase64PaddingDefect])
 | 
					                                       errors.InvalidBase64PaddingDefect])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_invalid_length(self):
 | 
				
			||||||
 | 
					        self._test(b'abcde', b'abcde', [errors.InvalidBase64LengthDefect])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TestDecode(TestEmailBase):
 | 
					class TestDecode(TestEmailBase):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -347,6 +347,15 @@ def test_get_unstructured_invalid_base64_character_and_bad_padding(self):
 | 
				
			||||||
             errors.InvalidBase64PaddingDefect],
 | 
					             errors.InvalidBase64PaddingDefect],
 | 
				
			||||||
            '')
 | 
					            '')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_get_unstructured_invalid_base64_length(self):
 | 
				
			||||||
 | 
					        # bpo-27397: Return the encoded string since there's no way to decode.
 | 
				
			||||||
 | 
					        self._test_get_x(self._get_unst,
 | 
				
			||||||
 | 
					            '=?utf-8?b?abcde?=',
 | 
				
			||||||
 | 
					            'abcde',
 | 
				
			||||||
 | 
					            'abcde',
 | 
				
			||||||
 | 
					            [errors.InvalidBase64LengthDefect],
 | 
				
			||||||
 | 
					            '')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_get_unstructured_no_whitespace_between_ews(self):
 | 
					    def test_get_unstructured_no_whitespace_between_ews(self):
 | 
				
			||||||
        self._test_get_x(self._get_unst,
 | 
					        self._test_get_x(self._get_unst,
 | 
				
			||||||
            '=?utf-8?q?foo?==?utf-8?q?bar?=',
 | 
					            '=?utf-8?q?foo?==?utf-8?q?bar?=',
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -254,6 +254,23 @@ def test_invalid_chars_in_base64_payload(self):
 | 
				
			||||||
        self.assertDefectsEqual(self.get_defects(msg),
 | 
					        self.assertDefectsEqual(self.get_defects(msg),
 | 
				
			||||||
                                [errors.InvalidBase64CharactersDefect])
 | 
					                                [errors.InvalidBase64CharactersDefect])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_invalid_length_of_base64_payload(self):
 | 
				
			||||||
 | 
					        source = textwrap.dedent("""\
 | 
				
			||||||
 | 
					            Subject: test
 | 
				
			||||||
 | 
					            MIME-Version: 1.0
 | 
				
			||||||
 | 
					            Content-Type: text/plain; charset="utf-8"
 | 
				
			||||||
 | 
					            Content-Transfer-Encoding: base64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            abcde
 | 
				
			||||||
 | 
					            """)
 | 
				
			||||||
 | 
					        msg = self._str_msg(source)
 | 
				
			||||||
 | 
					        with self._raise_point(errors.InvalidBase64LengthDefect):
 | 
				
			||||||
 | 
					            payload = msg.get_payload(decode=True)
 | 
				
			||||||
 | 
					        if self.raise_expected: return
 | 
				
			||||||
 | 
					        self.assertEqual(payload, b'abcde')
 | 
				
			||||||
 | 
					        self.assertDefectsEqual(self.get_defects(msg),
 | 
				
			||||||
 | 
					                                [errors.InvalidBase64LengthDefect])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_missing_ending_boundary(self):
 | 
					    def test_missing_ending_boundary(self):
 | 
				
			||||||
        source = textwrap.dedent("""\
 | 
					        source = textwrap.dedent("""\
 | 
				
			||||||
            To: 1@harrydomain4.com
 | 
					            To: 1@harrydomain4.com
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1 @@
 | 
				
			||||||
 | 
					Make email module properly handle invalid-length base64 strings.
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue