GH-128520: Divide pathlib ABCs into three classes (#128523)

In the private pathlib ABCs, rename `PurePathBase` to `JoinablePath`, and
split `PathBase` into `ReadablePath` and `WritablePath`. This improves the
API fit for read-only virtual filesystems.

The split of `PathBase` entails a similar split of `CopyWorker` (implements
copying) and the test cases in `test_pathlib_abc`.

In a later patch, we'll make `WritablePath` inherit directly from
`JoinablePath` rather than `ReadablePath`. For a couple of reasons,
this isn't quite possible yet.
This commit is contained in:
Barney Gale 2025-01-11 19:27:47 +00:00 committed by GitHub
parent 0946ed25b5
commit 22a442181d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 317 additions and 307 deletions

View file

@ -4,7 +4,7 @@
import errno
import unittest
from pathlib._abc import PurePathBase, PathBase
from pathlib._abc import JoinablePath, ReadablePath, WritablePath
from pathlib._types import Parser
import posixpath
@ -31,8 +31,8 @@ def needs_windows(fn):
#
class PurePathBaseTest(unittest.TestCase):
cls = PurePathBase
class JoinablePathTest(unittest.TestCase):
cls = JoinablePath
def test_magic_methods(self):
P = self.cls
@ -51,7 +51,7 @@ def test_parser(self):
self.assertIs(self.cls.parser, posixpath)
class DummyPurePath(PurePathBase):
class DummyJoinablePath(JoinablePath):
__slots__ = ('_segments',)
def __init__(self, *segments):
@ -63,7 +63,7 @@ def __str__(self):
return ''
def __eq__(self, other):
if not isinstance(other, DummyPurePath):
if not isinstance(other, DummyJoinablePath):
return NotImplemented
return str(self) == str(other)
@ -77,8 +77,8 @@ def with_segments(self, *pathsegments):
return type(self)(*pathsegments)
class DummyPurePathTest(unittest.TestCase):
cls = DummyPurePath
class DummyJoinablePathTest(unittest.TestCase):
cls = DummyJoinablePath
# Use a base path that's unrelated to any real filesystem path.
base = f'/this/path/kills/fascists/{TESTFN}'
@ -916,9 +916,9 @@ def test_with_suffix_invalid(self):
#
class DummyPathIO(io.BytesIO):
class DummyWritablePathIO(io.BytesIO):
"""
Used by DummyPath to implement `open('w')`
Used by DummyWritablePath to implement `open('w')`
"""
def __init__(self, files, path):
@ -931,10 +931,10 @@ def close(self):
super().close()
class DummyPath(PathBase):
class DummyReadablePath(ReadablePath):
"""
Simple implementation of PathBase that keeps files and directories in
memory.
Simple implementation of DummyReadablePath that keeps files and
directories in memory.
"""
__slots__ = ('_segments')
@ -950,7 +950,7 @@ def __str__(self):
return ''
def __eq__(self, other):
if not isinstance(other, DummyPath):
if not isinstance(other, DummyReadablePath):
return NotImplemented
return str(self) == str(other)
@ -990,10 +990,11 @@ def open(self, mode='r', buffering=-1, encoding=None,
raise FileNotFoundError(errno.ENOENT, "File not found", path)
stream = io.BytesIO(self._files[path])
elif mode == 'w':
# FIXME: move to DummyWritablePath
parent, name = posixpath.split(path)
if parent not in self._directories:
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
stream = DummyPathIO(self._files, path)
stream = DummyWritablePathIO(self._files, path)
self._files[path] = b''
self._directories[parent].add(name)
else:
@ -1011,6 +1012,10 @@ def iterdir(self):
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
class DummyWritablePath(DummyReadablePath, WritablePath):
__slots__ = ()
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
path = str(self)
parent = str(self.parent)
@ -1029,24 +1034,11 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
self.parent.mkdir(parents=True, exist_ok=True)
self.mkdir(mode, parents=False, exist_ok=exist_ok)
def _delete(self):
path = str(self)
if path in self._files:
del self._files[path]
elif path in self._directories:
for name in list(self._directories[path]):
self.joinpath(name)._delete()
del self._directories[path]
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
parent = str(self.parent)
self._directories[parent].remove(self.name)
class DummyReadablePathTest(DummyJoinablePathTest):
"""Tests for ReadablePathTest methods that use stat(), open() and iterdir()."""
class DummyPathTest(DummyPurePathTest):
"""Tests for PathBase methods that use stat(), open() and iterdir()."""
cls = DummyPath
cls = DummyReadablePath
can_symlink = False
# (self.base)
@ -1138,213 +1130,6 @@ def test_open_common(self):
self.assertIsInstance(f, io.BufferedIOBase)
self.assertEqual(f.read().strip(), b"this is file A")
def test_read_write_bytes(self):
p = self.cls(self.base)
(p / 'fileA').write_bytes(b'abcdefg')
self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
# Check that trying to write str does not truncate the file.
self.assertRaises(TypeError, (p / 'fileA').write_bytes, 'somestr')
self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
def test_read_write_text(self):
p = self.cls(self.base)
(p / 'fileA').write_text('äbcdefg', encoding='latin-1')
self.assertEqual((p / 'fileA').read_text(
encoding='utf-8', errors='ignore'), 'bcdefg')
# Check that trying to write bytes does not truncate the file.
self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
def test_read_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\n'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r\n` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
'abcde\r\nfghlk\n\rmnopq')
def test_write_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\n')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\rfghlk\r\rmnopq')
# Check that `\r\n` character replaces `\n`
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r\n')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\r\nfghlk\r\n\rmnopq')
# Check that no argument passed will change `\n` to `os.linesep`
os_linesep_byte = bytes(os.linesep, encoding='ascii')
(p / 'fileA').write_text('abcde\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')
def test_copy_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'copyA'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_file_to_existing_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirB' / 'fileB'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_file_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirA'
self.assertRaises(OSError, source.copy, target)
def test_copy_file_empty(self):
base = self.cls(self.base)
source = base / 'empty'
target = base / 'copyA'
source.write_bytes(b'')
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(target.read_bytes(), b'')
def test_copy_file_to_itself(self):
base = self.cls(self.base)
source = base / 'empty'
source.write_bytes(b'')
self.assertRaises(OSError, source.copy, source)
self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
def test_copy_dir_simple(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.is_dir())
self.assertTrue(target.joinpath('dirD').is_dir())
self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
"this is file D\n")
self.assertTrue(target.joinpath('fileC').is_file())
self.assertTrue(target.joinpath('fileC').read_text(),
"this is file C\n")
def test_copy_dir_complex(self, follow_symlinks=True):
def ordered_walk(path):
for dirpath, dirnames, filenames in path.walk(follow_symlinks=follow_symlinks):
dirnames.sort()
filenames.sort()
yield dirpath, dirnames, filenames
base = self.cls(self.base)
source = base / 'dirC'
if self.can_symlink:
# Add some symlinks
source.joinpath('linkC').symlink_to('fileC')
source.joinpath('linkD').symlink_to('dirD', target_is_directory=True)
# Perform the copy
target = base / 'copyC'
result = source.copy(target, follow_symlinks=follow_symlinks)
self.assertEqual(result, target)
# Compare the source and target trees
source_walk = ordered_walk(source)
target_walk = ordered_walk(target)
for source_item, target_item in zip(source_walk, target_walk, strict=True):
self.assertEqual(source_item[0].parts[len(source.parts):],
target_item[0].parts[len(target.parts):]) # dirpath
self.assertEqual(source_item[1], target_item[1]) # dirnames
self.assertEqual(source_item[2], target_item[2]) # filenames
# Compare files and symlinks
for filename in source_item[2]:
source_file = source_item[0].joinpath(filename)
target_file = target_item[0].joinpath(filename)
if follow_symlinks or not source_file.is_symlink():
# Regular file.
self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
elif source_file.is_dir():
# Symlink to directory.
self.assertTrue(target_file.is_dir())
self.assertEqual(source_file.readlink(), target_file.readlink())
else:
# Symlink to file.
self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
self.assertEqual(source_file.readlink(), target_file.readlink())
def test_copy_dir_complex_follow_symlinks_false(self):
self.test_copy_dir_complex(follow_symlinks=False)
def test_copy_dir_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
target.mkdir()
target.joinpath('dirD').mkdir()
self.assertRaises(FileExistsError, source.copy, target)
def test_copy_dir_to_existing_directory_dirs_exist_ok(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
target.mkdir()
target.joinpath('dirD').mkdir()
result = source.copy(target, dirs_exist_ok=True)
self.assertEqual(result, target)
self.assertTrue(target.is_dir())
self.assertTrue(target.joinpath('dirD').is_dir())
self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
"this is file D\n")
self.assertTrue(target.joinpath('fileC').is_file())
self.assertTrue(target.joinpath('fileC').read_text(),
"this is file C\n")
def test_copy_dir_to_itself(self):
base = self.cls(self.base)
source = base / 'dirC'
self.assertRaises(OSError, source.copy, source)
self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
def test_copy_dir_into_itself(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'dirC' / 'dirD' / 'copyC'
self.assertRaises(OSError, source.copy, target)
self.assertRaises(OSError, source.copy, target, follow_symlinks=False)
self.assertFalse(target.exists())
def test_copy_into(self):
base = self.cls(self.base)
source = base / 'fileA'
target_dir = base / 'dirA'
result = source.copy_into(target_dir)
self.assertEqual(result, target_dir / 'fileA')
self.assertTrue(result.exists())
self.assertEqual(source.read_text(), result.read_text())
def test_copy_into_empty_name(self):
source = self.cls('')
target_dir = self.base
self.assertRaises(ValueError, source.copy_into, target_dir)
def test_iterdir(self):
P = self.cls
p = P(self.base)
@ -1574,9 +1359,220 @@ def test_is_symlink(self):
self.assertIs((P / 'linkA\x00').is_file(), False)
class DummyPathWalkTest(unittest.TestCase):
cls = DummyPath
base = DummyPathTest.base
class DummyWritablePathTest(DummyReadablePathTest):
cls = DummyWritablePath
def test_read_write_bytes(self):
p = self.cls(self.base)
(p / 'fileA').write_bytes(b'abcdefg')
self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
# Check that trying to write str does not truncate the file.
self.assertRaises(TypeError, (p / 'fileA').write_bytes, 'somestr')
self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
def test_read_write_text(self):
p = self.cls(self.base)
(p / 'fileA').write_text('äbcdefg', encoding='latin-1')
self.assertEqual((p / 'fileA').read_text(
encoding='utf-8', errors='ignore'), 'bcdefg')
# Check that trying to write bytes does not truncate the file.
self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
def test_read_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\n'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r\n` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
'abcde\r\nfghlk\n\rmnopq')
def test_write_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\n')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\rfghlk\r\rmnopq')
# Check that `\r\n` character replaces `\n`
(p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r\n')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde\r\r\nfghlk\r\n\rmnopq')
# Check that no argument passed will change `\n` to `os.linesep`
os_linesep_byte = bytes(os.linesep, encoding='ascii')
(p / 'fileA').write_text('abcde\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')
def test_copy_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'copyA'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_file_to_existing_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirB' / 'fileB'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_file_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirA'
self.assertRaises(OSError, source.copy, target)
def test_copy_file_empty(self):
base = self.cls(self.base)
source = base / 'empty'
target = base / 'copyA'
source.write_bytes(b'')
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.exists())
self.assertEqual(target.read_bytes(), b'')
def test_copy_file_to_itself(self):
base = self.cls(self.base)
source = base / 'empty'
source.write_bytes(b'')
self.assertRaises(OSError, source.copy, source)
self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
def test_copy_dir_simple(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
result = source.copy(target)
self.assertEqual(result, target)
self.assertTrue(target.is_dir())
self.assertTrue(target.joinpath('dirD').is_dir())
self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
"this is file D\n")
self.assertTrue(target.joinpath('fileC').is_file())
self.assertTrue(target.joinpath('fileC').read_text(),
"this is file C\n")
def test_copy_dir_complex(self, follow_symlinks=True):
def ordered_walk(path):
for dirpath, dirnames, filenames in path.walk(follow_symlinks=follow_symlinks):
dirnames.sort()
filenames.sort()
yield dirpath, dirnames, filenames
base = self.cls(self.base)
source = base / 'dirC'
if self.can_symlink:
# Add some symlinks
source.joinpath('linkC').symlink_to('fileC')
source.joinpath('linkD').symlink_to('dirD', target_is_directory=True)
# Perform the copy
target = base / 'copyC'
result = source.copy(target, follow_symlinks=follow_symlinks)
self.assertEqual(result, target)
# Compare the source and target trees
source_walk = ordered_walk(source)
target_walk = ordered_walk(target)
for source_item, target_item in zip(source_walk, target_walk, strict=True):
self.assertEqual(source_item[0].parts[len(source.parts):],
target_item[0].parts[len(target.parts):]) # dirpath
self.assertEqual(source_item[1], target_item[1]) # dirnames
self.assertEqual(source_item[2], target_item[2]) # filenames
# Compare files and symlinks
for filename in source_item[2]:
source_file = source_item[0].joinpath(filename)
target_file = target_item[0].joinpath(filename)
if follow_symlinks or not source_file.is_symlink():
# Regular file.
self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
elif source_file.is_dir():
# Symlink to directory.
self.assertTrue(target_file.is_dir())
self.assertEqual(source_file.readlink(), target_file.readlink())
else:
# Symlink to file.
self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
self.assertEqual(source_file.readlink(), target_file.readlink())
def test_copy_dir_complex_follow_symlinks_false(self):
self.test_copy_dir_complex(follow_symlinks=False)
def test_copy_dir_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
target.mkdir()
target.joinpath('dirD').mkdir()
self.assertRaises(FileExistsError, source.copy, target)
def test_copy_dir_to_existing_directory_dirs_exist_ok(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'copyC'
target.mkdir()
target.joinpath('dirD').mkdir()
result = source.copy(target, dirs_exist_ok=True)
self.assertEqual(result, target)
self.assertTrue(target.is_dir())
self.assertTrue(target.joinpath('dirD').is_dir())
self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
"this is file D\n")
self.assertTrue(target.joinpath('fileC').is_file())
self.assertTrue(target.joinpath('fileC').read_text(),
"this is file C\n")
def test_copy_dir_to_itself(self):
base = self.cls(self.base)
source = base / 'dirC'
self.assertRaises(OSError, source.copy, source)
self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
def test_copy_dir_into_itself(self):
base = self.cls(self.base)
source = base / 'dirC'
target = base / 'dirC' / 'dirD' / 'copyC'
self.assertRaises(OSError, source.copy, target)
self.assertRaises(OSError, source.copy, target, follow_symlinks=False)
self.assertFalse(target.exists())
def test_copy_into(self):
base = self.cls(self.base)
source = base / 'fileA'
target_dir = base / 'dirA'
result = source.copy_into(target_dir)
self.assertEqual(result, target_dir / 'fileA')
self.assertTrue(result.exists())
self.assertEqual(source.read_text(), result.read_text())
def test_copy_into_empty_name(self):
source = self.cls('')
target_dir = self.base
self.assertRaises(ValueError, source.copy_into, target_dir)
class DummyReadablePathWalkTest(unittest.TestCase):
cls = DummyReadablePath
base = DummyReadablePathTest.base
can_symlink = False
def setUp(self):