| 
									
										
										
										
											2024-02-03 18:18:46 +02:00
										 |  |  | """Tests common to tarfile and zipfile.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-11 12:38:07 +02:00
										 |  |  | from test.support import swap_attr | 
					
						
							| 
									
										
										
										
											2024-02-03 18:18:46 +02:00
										 |  |  | from test.support import os_helper | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class OverwriteTests: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         os.makedirs(self.testdir) | 
					
						
							|  |  |  |         self.addCleanup(os_helper.rmtree, self.testdir) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def create_file(self, path, content=b''): | 
					
						
							|  |  |  |         with open(path, 'wb') as f: | 
					
						
							|  |  |  |             f.write(content) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def open(self, path): | 
					
						
							|  |  |  |         raise NotImplementedError | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def extractall(self, ar): | 
					
						
							|  |  |  |         raise NotImplementedError | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_file_as_file(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         self.create_file(target, b'content') | 
					
						
							|  |  |  |         with self.open(self.ar_with_file) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(target)) | 
					
						
							|  |  |  |         with open(target, 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'newcontent') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_dir_as_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         os.mkdir(target) | 
					
						
							|  |  |  |         with self.open(self.ar_with_dir) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_dir_as_implicit_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         os.mkdir(target) | 
					
						
							|  |  |  |         with self.open(self.ar_with_implicit_dir) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) | 
					
						
							|  |  |  |         with open(os.path.join(target, 'file'), 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'newcontent') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_dir_as_file(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         os.mkdir(target) | 
					
						
							|  |  |  |         with self.open(self.ar_with_file) as ar: | 
					
						
							|  |  |  |             with self.assertRaises(PermissionError if sys.platform == 'win32' | 
					
						
							|  |  |  |                                    else IsADirectoryError): | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_file_as_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         self.create_file(target, b'content') | 
					
						
							|  |  |  |         with self.open(self.ar_with_dir) as ar: | 
					
						
							|  |  |  |             with self.assertRaises(FileExistsError): | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(target)) | 
					
						
							|  |  |  |         with open(target, 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'content') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_overwrite_file_as_implicit_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         self.create_file(target, b'content') | 
					
						
							|  |  |  |         with self.open(self.ar_with_implicit_dir) as ar: | 
					
						
							|  |  |  |             with self.assertRaises(FileNotFoundError if sys.platform == 'win32' | 
					
						
							|  |  |  |                                    else NotADirectoryError): | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(target)) | 
					
						
							|  |  |  |         with open(target, 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'content') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_file_symlink_as_file(self): | 
					
						
							|  |  |  |         # XXX: It is potential security vulnerability. | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         self.create_file(target2, b'content') | 
					
						
							|  |  |  |         os.symlink('test2', target) | 
					
						
							|  |  |  |         with self.open(self.ar_with_file) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(target2)) | 
					
						
							|  |  |  |         with open(target2, 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'newcontent') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_broken_file_symlink_as_file(self): | 
					
						
							|  |  |  |         # XXX: It is potential security vulnerability. | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         os.symlink('test2', target) | 
					
						
							|  |  |  |         with self.open(self.ar_with_file) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(target2)) | 
					
						
							|  |  |  |         with open(target2, 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'newcontent') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_dir_symlink_as_dir(self): | 
					
						
							|  |  |  |         # XXX: It is potential security vulnerability. | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         os.mkdir(target2) | 
					
						
							|  |  |  |         os.symlink('test2', target, target_is_directory=True) | 
					
						
							|  |  |  |         with self.open(self.ar_with_dir) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target2)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_dir_symlink_as_implicit_dir(self): | 
					
						
							|  |  |  |         # XXX: It is potential security vulnerability. | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         os.mkdir(target2) | 
					
						
							|  |  |  |         os.symlink('test2', target, target_is_directory=True) | 
					
						
							|  |  |  |         with self.open(self.ar_with_implicit_dir) as ar: | 
					
						
							|  |  |  |             self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target2)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(os.path.join(target2, 'file'))) | 
					
						
							|  |  |  |         with open(os.path.join(target2, 'file'), 'rb') as f: | 
					
						
							|  |  |  |             self.assertEqual(f.read(), b'newcontent') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_broken_dir_symlink_as_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         os.symlink('test2', target, target_is_directory=True) | 
					
						
							|  |  |  |         with self.open(self.ar_with_dir) as ar: | 
					
						
							|  |  |  |             with self.assertRaises(FileExistsError): | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertFalse(os.path.exists(target2)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @os_helper.skip_unless_symlink | 
					
						
							|  |  |  |     def test_overwrite_broken_dir_symlink_as_implicit_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         target2 = os.path.join(self.testdir, 'test2') | 
					
						
							|  |  |  |         os.symlink('test2', target, target_is_directory=True) | 
					
						
							|  |  |  |         with self.open(self.ar_with_implicit_dir) as ar: | 
					
						
							|  |  |  |             with self.assertRaises(FileExistsError): | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.islink(target)) | 
					
						
							|  |  |  |         self.assertFalse(os.path.exists(target2)) | 
					
						
							| 
									
										
										
										
											2024-02-11 12:38:07 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_concurrent_extract_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         def concurrent_mkdir(*args, **kwargs): | 
					
						
							|  |  |  |             orig_mkdir(*args, **kwargs) | 
					
						
							|  |  |  |             orig_mkdir(*args, **kwargs) | 
					
						
							|  |  |  |         with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: | 
					
						
							|  |  |  |             with self.open(self.ar_with_dir) as ar: | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_concurrent_extract_implicit_dir(self): | 
					
						
							|  |  |  |         target = os.path.join(self.testdir, 'test') | 
					
						
							|  |  |  |         def concurrent_mkdir(*args, **kwargs): | 
					
						
							|  |  |  |             orig_mkdir(*args, **kwargs) | 
					
						
							|  |  |  |             orig_mkdir(*args, **kwargs) | 
					
						
							|  |  |  |         with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: | 
					
						
							|  |  |  |             with self.open(self.ar_with_implicit_dir) as ar: | 
					
						
							|  |  |  |                 self.extractall(ar) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isdir(target)) | 
					
						
							|  |  |  |         self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) |