mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	 adbdcdbd95
			
		
	
	
		adbdcdbd95
		
	
	
	
	
		
			
			This patch also deprecates the MalformedHeaderDefect. My best guess is that this defect was rendered obsolete by a refactoring of the parser, and the corresponding defect for the new parser (which this patch introduces) was overlooked.
		
			
				
	
	
		
			292 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			292 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import io
 | |
| import email
 | |
| import textwrap
 | |
| import unittest
 | |
| from email._policybase import Compat32
 | |
| from email import errors
 | |
| from email.message import Message
 | |
| from test.test_email import TestEmailBase
 | |
| 
 | |
| 
 | |
| class TestCustomMessage(TestEmailBase):
 | |
| 
 | |
|     class MyMessage(Message):
 | |
|         def __init__(self, policy):
 | |
|             self.check_policy = policy
 | |
|             super().__init__()
 | |
| 
 | |
|     MyPolicy = TestEmailBase.policy.clone(linesep='boo')
 | |
| 
 | |
|     def test_custom_message_gets_policy_if_possible_from_string(self):
 | |
|         msg = email.message_from_string("Subject: bogus\n\nmsg\n",
 | |
|                                         self.MyMessage,
 | |
|                                         policy=self.MyPolicy)
 | |
|         self.assertTrue(isinstance(msg, self.MyMessage))
 | |
|         self.assertIs(msg.check_policy, self.MyPolicy)
 | |
| 
 | |
|     def test_custom_message_gets_policy_if_possible_from_file(self):
 | |
|         source_file = io.StringIO("Subject: bogus\n\nmsg\n")
 | |
|         msg = email.message_from_file(source_file,
 | |
|                                       self.MyMessage,
 | |
|                                       policy=self.MyPolicy)
 | |
|         self.assertTrue(isinstance(msg, self.MyMessage))
 | |
|         self.assertIs(msg.check_policy, self.MyPolicy)
 | |
| 
 | |
|     # XXX add tests for other functions that take Message arg.
 | |
| 
 | |
| 
 | |
| class TestMessageDefectDetectionBase:
 | |
| 
 | |
|     dup_boundary_msg = textwrap.dedent("""\
 | |
|         Subject: XX
 | |
|         From: xx@xx.dk
 | |
|         To: XX
 | |
|         Mime-version: 1.0
 | |
|         Content-type: multipart/mixed;
 | |
|            boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part
 | |
|         Content-type: multipart/alternative;
 | |
|            boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part
 | |
|         Content-type: text/plain; charset="ISO-8859-1"
 | |
|         Content-transfer-encoding: quoted-printable
 | |
| 
 | |
|         text
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part
 | |
|         Content-type: text/html; charset="ISO-8859-1"
 | |
|         Content-transfer-encoding: quoted-printable
 | |
| 
 | |
|         <HTML></HTML>
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part--
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part
 | |
|         Content-type: image/gif; name="xx.gif";
 | |
|         Content-disposition: attachment
 | |
|         Content-transfer-encoding: base64
 | |
| 
 | |
|         Some removed base64 encoded chars.
 | |
| 
 | |
|         --MS_Mac_OE_3071477847_720252_MIME_Part--
 | |
| 
 | |
|         """)
 | |
| 
 | |
|     def test_same_boundary_inner_outer(self):
 | |
|         # XXX better would be to actually detect the duplicate.
 | |
|         msg = self._str_msg(self.dup_boundary_msg)
 | |
|         inner = msg.get_payload(0)
 | |
|         self.assertTrue(hasattr(inner, 'defects'))
 | |
|         self.assertEqual(len(self.get_defects(inner)), 1)
 | |
|         self.assertTrue(isinstance(self.get_defects(inner)[0],
 | |
|                                    errors.StartBoundaryNotFoundDefect))
 | |
| 
 | |
|     def test_same_boundary_inner_outer_raises_on_defect(self):
 | |
|         with self.assertRaises(errors.StartBoundaryNotFoundDefect):
 | |
|             self._str_msg(self.dup_boundary_msg,
 | |
|                 policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     no_boundary_msg = textwrap.dedent("""\
 | |
|         Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
 | |
|         From: foobar
 | |
|         Subject: broken mail
 | |
|         MIME-Version: 1.0
 | |
|         Content-Type: multipart/report; report-type=delivery-status;
 | |
| 
 | |
|         --JAB03225.986577786/zinfandel.lacita.com
 | |
| 
 | |
|         One part
 | |
| 
 | |
|         --JAB03225.986577786/zinfandel.lacita.com
 | |
|         Content-Type: message/delivery-status
 | |
| 
 | |
|         Header: Another part
 | |
| 
 | |
|         --JAB03225.986577786/zinfandel.lacita.com--
 | |
|         """)
 | |
| 
 | |
|     def test_multipart_no_boundary(self):
 | |
|         msg = self._str_msg(self.no_boundary_msg)
 | |
|         self.assertTrue(isinstance(msg.get_payload(), str))
 | |
|         self.assertEqual(len(self.get_defects(msg)), 2)
 | |
|         self.assertTrue(isinstance(self.get_defects(msg)[0],
 | |
|                                    errors.NoBoundaryInMultipartDefect))
 | |
|         self.assertTrue(isinstance(self.get_defects(msg)[1],
 | |
|                                    errors.MultipartInvariantViolationDefect))
 | |
| 
 | |
|     def test_multipart_no_boundary_raise_on_defect(self):
 | |
|         with self.assertRaises(errors.NoBoundaryInMultipartDefect):
 | |
|             self._str_msg(self.no_boundary_msg,
 | |
|                 policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     multipart_msg = textwrap.dedent("""\
 | |
|         Date: Wed, 14 Nov 2007 12:56:23 GMT
 | |
|         From: foo@bar.invalid
 | |
|         To: foo@bar.invalid
 | |
|         Subject: Content-Transfer-Encoding: base64 and multipart
 | |
|         MIME-Version: 1.0
 | |
|         Content-Type: multipart/mixed;
 | |
|             boundary="===============3344438784458119861=="{}
 | |
| 
 | |
|         --===============3344438784458119861==
 | |
|         Content-Type: text/plain
 | |
| 
 | |
|         Test message
 | |
| 
 | |
|         --===============3344438784458119861==
 | |
|         Content-Type: application/octet-stream
 | |
|         Content-Transfer-Encoding: base64
 | |
| 
 | |
|         YWJj
 | |
| 
 | |
|         --===============3344438784458119861==--
 | |
|         """)
 | |
| 
 | |
|     def test_multipart_invalid_cte(self):
 | |
|         msg = self._str_msg(
 | |
|             self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
 | |
|         self.assertEqual(len(self.get_defects(msg)), 1)
 | |
|         self.assertIsInstance(self.get_defects(msg)[0],
 | |
|             errors.InvalidMultipartContentTransferEncodingDefect)
 | |
| 
 | |
|     def test_multipart_invalid_cte_raise_on_defect(self):
 | |
|         with self.assertRaises(
 | |
|                 errors.InvalidMultipartContentTransferEncodingDefect):
 | |
|             self._str_msg(
 | |
|                 self.multipart_msg.format(
 | |
|                     "\nContent-Transfer-Encoding: base64"),
 | |
|                 policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     def test_multipart_no_cte_no_defect(self):
 | |
|         msg = self._str_msg(self.multipart_msg.format(''))
 | |
|         self.assertEqual(len(self.get_defects(msg)), 0)
 | |
| 
 | |
|     def test_multipart_valid_cte_no_defect(self):
 | |
|         for cte in ('7bit', '8bit', 'BINary'):
 | |
|             msg = self._str_msg(
 | |
|                 self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
 | |
|             self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
 | |
| 
 | |
|     lying_multipart_msg = textwrap.dedent("""\
 | |
|         From: "Allison Dunlap" <xxx@example.com>
 | |
|         To: yyy@example.com
 | |
|         Subject: 64423
 | |
|         Date: Sun, 11 Jul 2004 16:09:27 -0300
 | |
|         MIME-Version: 1.0
 | |
|         Content-Type: multipart/alternative;
 | |
| 
 | |
|         Blah blah blah
 | |
|         """)
 | |
| 
 | |
|     def test_lying_multipart(self):
 | |
|         msg = self._str_msg(self.lying_multipart_msg)
 | |
|         self.assertTrue(hasattr(msg, 'defects'))
 | |
|         self.assertEqual(len(self.get_defects(msg)), 2)
 | |
|         self.assertTrue(isinstance(self.get_defects(msg)[0],
 | |
|                                    errors.NoBoundaryInMultipartDefect))
 | |
|         self.assertTrue(isinstance(self.get_defects(msg)[1],
 | |
|                                    errors.MultipartInvariantViolationDefect))
 | |
| 
 | |
|     def test_lying_multipart_raise_on_defect(self):
 | |
|         with self.assertRaises(errors.NoBoundaryInMultipartDefect):
 | |
|             self._str_msg(self.lying_multipart_msg,
 | |
|                 policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     missing_start_boundary_msg = textwrap.dedent("""\
 | |
|         Content-Type: multipart/mixed; boundary="AAA"
 | |
|         From: Mail Delivery Subsystem <xxx@example.com>
 | |
|         To: yyy@example.com
 | |
| 
 | |
|         --AAA
 | |
| 
 | |
|         Stuff
 | |
| 
 | |
|         --AAA
 | |
|         Content-Type: message/rfc822
 | |
| 
 | |
|         From: webmaster@python.org
 | |
|         To: zzz@example.com
 | |
|         Content-Type: multipart/mixed; boundary="BBB"
 | |
| 
 | |
|         --BBB--
 | |
| 
 | |
|         --AAA--
 | |
| 
 | |
|         """)
 | |
| 
 | |
|     def test_missing_start_boundary(self):
 | |
|         # The message structure is:
 | |
|         #
 | |
|         # multipart/mixed
 | |
|         #    text/plain
 | |
|         #    message/rfc822
 | |
|         #        multipart/mixed [*]
 | |
|         #
 | |
|         # [*] This message is missing its start boundary
 | |
|         outer = self._str_msg(self.missing_start_boundary_msg)
 | |
|         bad = outer.get_payload(1).get_payload(0)
 | |
|         self.assertEqual(len(self.get_defects(bad)), 1)
 | |
|         self.assertTrue(isinstance(self.get_defects(bad)[0],
 | |
|                                    errors.StartBoundaryNotFoundDefect))
 | |
| 
 | |
|     def test_missing_start_boundary_raise_on_defect(self):
 | |
|         with self.assertRaises(errors.StartBoundaryNotFoundDefect):
 | |
|             self._str_msg(self.missing_start_boundary_msg,
 | |
|                           policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     def test_first_line_is_continuation_header(self):
 | |
|         msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
 | |
|         self.assertEqual(msg.keys(), ['Subject'])
 | |
|         self.assertEqual(msg.get_payload(), 'body')
 | |
|         self.assertEqual(len(self.get_defects(msg)), 1)
 | |
|         self.assertDefectsEqual(self.get_defects(msg),
 | |
|                                  [errors.FirstHeaderLineIsContinuationDefect])
 | |
|         self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
 | |
| 
 | |
|     def test_first_line_is_continuation_header_raise_on_defect(self):
 | |
|         with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
 | |
|             self._str_msg(' Line 1\nSubject: test\n\nbody\n',
 | |
|                           policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
|     def test_missing_header_body_separator(self):
 | |
|         # Our heuristic if we see a line that doesn't look like a header (no
 | |
|         # leading whitespace but no ':') is to assume that the blank line that
 | |
|         # separates the header from the body is missing, and to stop parsing
 | |
|         # headers and start parsing the body.
 | |
|         msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
 | |
|         self.assertEqual(msg.keys(), ['Subject'])
 | |
|         self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
 | |
|         self.assertDefectsEqual(self.get_defects(msg),
 | |
|                                 [errors.MissingHeaderBodySeparatorDefect])
 | |
| 
 | |
|     def test_missing_header_body_separator_raise_on_defect(self):
 | |
|         with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
 | |
|             self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
 | |
|                           policy=self.policy.clone(raise_on_defect=True))
 | |
| 
 | |
| 
 | |
| class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
 | |
| 
 | |
|     def get_defects(self, obj):
 | |
|         return obj.defects
 | |
| 
 | |
| 
 | |
| class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
 | |
|                                         TestEmailBase):
 | |
| 
 | |
|     class CapturePolicy(Compat32):
 | |
|         captured = None
 | |
|         def register_defect(self, obj, defect):
 | |
|             self.captured.append(defect)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.policy = self.CapturePolicy(captured=list())
 | |
| 
 | |
|     def get_defects(self, obj):
 | |
|         return self.policy.captured
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 |