[3.14] gh-75572: Forward-port test_xpickle from Python 2 to 3 (GH-22452) (GH-143485)

Move data classes used in tests to separate file test_picklecommon.py,
so it can be imported in old Python versions.
(cherry picked from commit 8735daf3e8)

Co-authored-by: Ken Jin <kenjin@python.org>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
Miss Islington (bot) 2026-01-06 18:28:56 +01:00 committed by GitHub
parent 22a99ca8e1
commit 98128451ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 1031 additions and 514 deletions

View file

@ -130,6 +130,10 @@
tzdata - Run tests that require timezone data.
xpickle - Test pickle and _pickle against Python 3.6, 3.7, 3.8
and 3.9 to test backwards compatibility. These tests
may take very long to complete.
To enable all resources except one, use '-uall,-<resource>'. For
example, to run all the tests except for the gui tests, give the
option '-uall,-gui'.

View file

@ -41,7 +41,7 @@
# - tzdata: while needed to validate fully test_datetime, it makes
# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
# default (see bpo-30822).
RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata', 'xpickle')
# Types for types hints

353
Lib/test/picklecommon.py Normal file
View file

@ -0,0 +1,353 @@
# Classes used for pickle testing.
# They are moved to separate file, so they can be loaded
# in other Python version for test_xpickle.
import sys
class C:
def __eq__(self, other):
return self.__dict__ == other.__dict__
# For test_load_classic_instance
class D(C):
def __init__(self, arg):
pass
class E(C):
def __getinitargs__(self):
return ()
import __main__
__main__.C = C
C.__module__ = "__main__"
__main__.D = D
D.__module__ = "__main__"
__main__.E = E
E.__module__ = "__main__"
# Simple mutable object.
class Object:
pass
# Hashable immutable key object containing unheshable mutable data.
class K:
def __init__(self, value):
self.value = value
def __reduce__(self):
# Shouldn't support the recursion itself
return K, (self.value,)
# For test_misc
class myint(int):
def __init__(self, x):
self.str = str(x)
# For test_misc and test_getinitargs
class initarg(C):
def __init__(self, a, b):
self.a = a
self.b = b
def __getinitargs__(self):
return self.a, self.b
# For test_metaclass
class metaclass(type):
pass
if sys.version_info >= (3,):
# Syntax not compatible with Python 2
exec('''
class use_metaclass(object, metaclass=metaclass):
pass
''')
else:
class use_metaclass(object):
__metaclass__ = metaclass
# Test classes for reduce_ex
class R:
def __init__(self, reduce=None):
self.reduce = reduce
def __reduce__(self, proto):
return self.reduce
class REX:
def __init__(self, reduce_ex=None):
self.reduce_ex = reduce_ex
def __reduce_ex__(self, proto):
return self.reduce_ex
class REX_one(object):
"""No __reduce_ex__ here, but inheriting it from object"""
_reduce_called = 0
def __reduce__(self):
self._reduce_called = 1
return REX_one, ()
class REX_two(object):
"""No __reduce__ here, but inheriting it from object"""
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return REX_two, ()
class REX_three(object):
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return REX_two, ()
def __reduce__(self):
raise AssertionError("This __reduce__ shouldn't be called")
class REX_four(object):
"""Calling base class method should succeed"""
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return object.__reduce_ex__(self, proto)
class REX_five(object):
"""This one used to fail with infinite recursion"""
_reduce_called = 0
def __reduce__(self):
self._reduce_called = 1
return object.__reduce__(self)
class REX_six(object):
"""This class is used to check the 4th argument (list iterator) of
the reduce protocol.
"""
def __init__(self, items=None):
self.items = items if items is not None else []
def __eq__(self, other):
return type(self) is type(other) and self.items == other.items
def append(self, item):
self.items.append(item)
def __reduce__(self):
return type(self), (), None, iter(self.items), None
class REX_seven(object):
"""This class is used to check the 5th argument (dict iterator) of
the reduce protocol.
"""
def __init__(self, table=None):
self.table = table if table is not None else {}
def __eq__(self, other):
return type(self) is type(other) and self.table == other.table
def __setitem__(self, key, value):
self.table[key] = value
def __reduce__(self):
return type(self), (), None, None, iter(self.table.items())
class REX_state(object):
"""This class is used to check the 3th argument (state) of
the reduce protocol.
"""
def __init__(self, state=None):
self.state = state
def __eq__(self, other):
return type(self) is type(other) and self.state == other.state
def __setstate__(self, state):
self.state = state
def __reduce__(self):
return type(self), (), self.state
# For test_reduce_ex_None
class REX_None:
""" Setting __reduce_ex__ to None should fail """
__reduce_ex__ = None
# For test_reduce_None
class R_None:
""" Setting __reduce__ to None should fail """
__reduce__ = None
# For test_pickle_setstate_None
class C_None_setstate:
""" Setting __setstate__ to None should fail """
def __getstate__(self):
return 1
__setstate__ = None
# Test classes for newobj
# For test_newobj_generic and test_newobj_proxies
class MyInt(int):
sample = 1
if sys.version_info >= (3,):
class MyLong(int):
sample = 1
else:
class MyLong(long):
sample = long(1)
class MyFloat(float):
sample = 1.0
class MyComplex(complex):
sample = 1.0 + 0.0j
class MyStr(str):
sample = "hello"
if sys.version_info >= (3,):
class MyUnicode(str):
sample = "hello \u1234"
else:
class MyUnicode(unicode):
sample = unicode(r"hello \u1234", "raw-unicode-escape")
class MyTuple(tuple):
sample = (1, 2, 3)
class MyList(list):
sample = [1, 2, 3]
class MyDict(dict):
sample = {"a": 1, "b": 2}
class MySet(set):
sample = {"a", "b"}
class MyFrozenSet(frozenset):
sample = frozenset({"a", "b"})
myclasses = [MyInt, MyLong, MyFloat,
MyComplex,
MyStr, MyUnicode,
MyTuple, MyList, MyDict, MySet, MyFrozenSet]
# For test_newobj_overridden_new
class MyIntWithNew(int):
def __new__(cls, value):
raise AssertionError
class MyIntWithNew2(MyIntWithNew):
__new__ = int.__new__
# For test_newobj_list_slots
class SlotList(MyList):
__slots__ = ["foo"]
# Ruff "redefined while unused" false positive here due to `global` variables
# being assigned (and then restored) from within test methods earlier in the file
class SimpleNewObj(int): # noqa: F811
def __init__(self, *args, **kwargs):
# raise an error, to make sure this isn't called
raise TypeError("SimpleNewObj.__init__() didn't expect to get called")
def __eq__(self, other):
return int(self) == int(other) and self.__dict__ == other.__dict__
class ComplexNewObj(SimpleNewObj):
def __getnewargs__(self):
return ('%X' % self, 16)
class ComplexNewObjEx(SimpleNewObj):
def __getnewargs_ex__(self):
return ('%X' % self,), {'base': 16}
class ZeroCopyBytes(bytes):
readonly = True
c_contiguous = True
f_contiguous = True
zero_copy_reconstruct = True
def __reduce_ex__(self, protocol):
if protocol >= 5:
import pickle
return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
else:
return type(self)._reconstruct, (bytes(self),)
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, bytes(self))
__str__ = __repr__
@classmethod
def _reconstruct(cls, obj):
with memoryview(obj) as m:
obj = m.obj
if type(obj) is cls:
# Zero-copy
return obj
else:
return cls(obj)
class ZeroCopyBytearray(bytearray):
readonly = False
c_contiguous = True
f_contiguous = True
zero_copy_reconstruct = True
def __reduce_ex__(self, protocol):
if protocol >= 5:
import pickle
return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
else:
return type(self)._reconstruct, (bytes(self),)
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, bytes(self))
__str__ = __repr__
@classmethod
def _reconstruct(cls, obj):
with memoryview(obj) as m:
obj = m.obj
if type(obj) is cls:
# Zero-copy
return obj
else:
return cls(obj)
# For test_nested_names
class Nested:
class A:
class B:
class C:
pass
# For test_py_methods
class PyMethodsTest:
@staticmethod
def cheese():
return "cheese"
@classmethod
def wine(cls):
assert cls is PyMethodsTest
return "wine"
def biscuits(self):
assert isinstance(self, PyMethodsTest)
return "biscuits"
class Nested:
"Nested class"
@staticmethod
def ketchup():
return "ketchup"
@classmethod
def maple(cls):
assert cls is PyMethodsTest.Nested
return "maple"
def pie(self):
assert isinstance(self, PyMethodsTest.Nested)
return "pie"
# For test_c_methods
class Subclass(tuple):
class Nested(str):
pass

File diff suppressed because it is too large Load diff

240
Lib/test/test_xpickle.py Normal file
View file

@ -0,0 +1,240 @@
# This test covers backwards compatibility with previous versions of Python
# by bouncing pickled objects through Python versions by running xpickle_worker.py.
import io
import os
import pickle
import subprocess
import sys
import unittest
from test import support
from test import pickletester
try:
import _pickle
has_c_implementation = True
except ModuleNotFoundError:
has_c_implementation = False
is_windows = sys.platform.startswith('win')
# Map python version to a tuple containing the name of a corresponding valid
# Python binary to execute and its arguments.
py_executable_map = {}
protocols_map = {
3: (3, 0),
4: (3, 4),
5: (3, 8),
}
def highest_proto_for_py_version(py_version):
"""Finds the highest supported pickle protocol for a given Python version.
Args:
py_version: a 2-tuple of the major, minor version. Eg. Python 3.7 would
be (3, 7)
Returns:
int for the highest supported pickle protocol
"""
proto = 2
for p, v in protocols_map.items():
if py_version < v:
break
proto = p
return proto
def have_python_version(py_version):
"""Check whether a Python binary exists for the given py_version and has
support. This respects your PATH.
For Windows, it will first try to use the py launcher specified in PEP 397.
Otherwise (and for all other platforms), it will attempt to check for
python<py_version[0]>.<py_version[1]>.
Eg. given a *py_version* of (3, 7), the function will attempt to try
'py -3.7' (for Windows) first, then 'python3.7', and return
['py', '-3.7'] (on Windows) or ['python3.7'] on other platforms.
Args:
py_version: a 2-tuple of the major, minor version. Eg. python 3.7 would
be (3, 7)
Returns:
List/Tuple containing the Python binary name and its required arguments,
or None if no valid binary names found.
"""
python_str = ".".join(map(str, py_version))
targets = [('py', f'-{python_str}'), (f'python{python_str}',)]
if py_version not in py_executable_map:
for target in targets[0 if is_windows else 1:]:
try:
worker = subprocess.Popen([*target, '-c', 'pass'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=is_windows)
worker.communicate()
if worker.returncode == 0:
py_executable_map[py_version] = target
break
except FileNotFoundError:
pass
return py_executable_map.get(py_version, None)
@support.requires_resource('cpu')
class AbstractCompatTests(pickletester.AbstractPickleTests):
py_version = None
@classmethod
def setUpClass(cls):
assert cls.py_version is not None, 'Needs a python version tuple'
if not have_python_version(cls.py_version):
py_version_str = ".".join(map(str, cls.py_version))
raise unittest.SkipTest(f'Python {py_version_str} not available')
# Override the default pickle protocol to match what xpickle worker
# will be running.
highest_protocol = highest_proto_for_py_version(cls.py_version)
cls.enterClassContext(support.swap_attr(pickletester, 'protocols',
range(highest_protocol + 1)))
cls.enterClassContext(support.swap_attr(pickle, 'HIGHEST_PROTOCOL',
highest_protocol))
@staticmethod
def send_to_worker(python, data):
"""Bounce a pickled object through another version of Python.
This will send data to a child process where it will
be unpickled, then repickled and sent back to the parent process.
Args:
python: list containing the python binary to start and its arguments
data: bytes object to send to the child process
Returns:
The pickled data received from the child process.
"""
target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
worker = subprocess.Popen([*python, target],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# For windows bpo-17023.
shell=is_windows)
stdout, stderr = worker.communicate(data)
if worker.returncode == 0:
return stdout
# if the worker fails, it will write the exception to stdout
try:
exception = pickle.loads(stdout)
except (pickle.UnpicklingError, EOFError):
raise RuntimeError(stderr)
else:
if support.verbose > 1:
print()
print(f'{data = }')
print(f'{stdout = }')
print(f'{stderr = }')
if isinstance(exception, Exception):
# To allow for tests which test for errors.
raise exception
else:
raise RuntimeError(stderr)
def dumps(self, arg, proto=0, **kwargs):
# Skip tests that require buffer_callback arguments since
# there isn't a reliable way to marshal/pickle the callback and ensure
# it works in a different Python version.
if 'buffer_callback' in kwargs:
self.skipTest('Test does not support "buffer_callback" argument.')
f = io.BytesIO()
p = self.pickler(f, proto, **kwargs)
p.dump((proto, arg))
f.seek(0)
data = bytes(f.read())
python = py_executable_map[self.py_version]
return self.send_to_worker(python, data)
def loads(self, buf, **kwds):
f = io.BytesIO(buf)
u = self.unpickler(f, **kwds)
return u.load()
# A scaled-down version of test_bytes from pickletester, to reduce
# the number of calls to self.dumps() and hence reduce the number of
# child python processes forked. This allows the test to complete
# much faster (the one from pickletester takes 3-4 minutes when running
# under text_xpickle).
def test_bytes(self):
if self.py_version < (3, 0):
self.skipTest('not supported in Python < 3.0')
for proto in pickletester.protocols:
for s in b'', b'xyz', b'xyz'*100:
p = self.dumps(s, proto)
self.assert_is_copy(s, self.loads(p))
s = bytes(range(256))
p = self.dumps(s, proto)
self.assert_is_copy(s, self.loads(p))
s = bytes([i for i in range(256) for _ in range(2)])
p = self.dumps(s, proto)
self.assert_is_copy(s, self.loads(p))
# These tests are disabled because they require some special setup
# on the worker that's hard to keep in sync.
test_global_ext1 = None
test_global_ext2 = None
test_global_ext4 = None
# These tests fail because they require classes from pickletester
# which cannot be properly imported by the xpickle worker.
test_recursive_nested_names = None
test_recursive_nested_names2 = None
# Attribute lookup problems are expected, disable the test
test_dynamic_class = None
test_evil_class_mutating_dict = None
# Expected exception is raised during unpickling in a subprocess.
test_pickle_setstate_None = None
# Other Python version may not have NumPy.
test_buffers_numpy = None
# Skip tests that require buffer_callback arguments since
# there isn't a reliable way to marshal/pickle the callback and ensure
# it works in a different Python version.
test_in_band_buffers = None
test_buffers_error = None
test_oob_buffers = None
test_oob_buffers_writable_to_readonly = None
class PyPicklePythonCompat(AbstractCompatTests):
pickler = pickle._Pickler
unpickler = pickle._Unpickler
if has_c_implementation:
class CPicklePythonCompat(AbstractCompatTests):
pickler = _pickle.Pickler
unpickler = _pickle.Unpickler
def make_test(py_version, base):
class_dict = {'py_version': py_version}
name = base.__name__.replace('Python', 'Python%d%d' % py_version)
return type(name, (base, unittest.TestCase), class_dict)
def load_tests(loader, tests, pattern):
def add_tests(py_version):
test_class = make_test(py_version, PyPicklePythonCompat)
tests.addTest(loader.loadTestsFromTestCase(test_class))
if has_c_implementation:
test_class = make_test(py_version, CPicklePythonCompat)
tests.addTest(loader.loadTestsFromTestCase(test_class))
major = sys.version_info.major
assert major == 3
add_tests((2, 7))
for minor in range(2, sys.version_info.minor):
add_tests((major, minor))
return tests
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,38 @@
# This script is called by test_xpickle as a subprocess to load and dump
# pickles in a different Python version.
import os
import pickle
import sys
# This allows the xpickle worker to import picklecommon.py, which it needs
# since some of the pickle objects hold references to picklecommon.py.
test_mod_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
'picklecommon.py'))
if sys.version_info >= (3, 5):
import importlib.util
spec = importlib.util.spec_from_file_location('test.picklecommon', test_mod_path)
test_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(test_module)
sys.modules['test.picklecommon'] = test_module
else:
test_module = type(sys)('test.picklecommon')
sys.modules['test.picklecommon'] = test_module
sys.modules['test'] = type(sys)('test')
with open(test_mod_path, 'rb') as f:
sources = f.read()
exec(sources, vars(test_module))
in_stream = getattr(sys.stdin, 'buffer', sys.stdin)
out_stream = getattr(sys.stdout, 'buffer', sys.stdout)
try:
message = pickle.load(in_stream)
protocol, obj = message
pickle.dump(obj, out_stream, protocol)
except Exception as e:
# dump the exception to stdout and write to stderr, then exit
pickle.dump(e, out_stream)
sys.stderr.write(repr(e))
sys.exit(1)

View file

@ -0,0 +1,2 @@
Forward-port test_xpickle from Python 2 to Python 3 and add the resource
back to test's command line.