This commit is contained in:
Hugo van Kemenade 2025-11-19 11:42:45 +02:00
commit 5d1f8f2d03
42 changed files with 4299 additions and 3504 deletions

View file

@ -2,6 +2,10 @@ repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.13.2
hooks:
- id: ruff-check
name: Run Ruff (lint) on Apple/
args: [--exit-non-zero-on-fix, --config=Apple/.ruff.toml]
files: ^Apple/
- id: ruff-check
name: Run Ruff (lint) on Doc/
args: [--exit-non-zero-on-fix]
@ -30,6 +34,10 @@ repos:
name: Run Ruff (lint) on Tools/wasm/
args: [--exit-non-zero-on-fix, --config=Tools/wasm/.ruff.toml]
files: ^Tools/wasm/
- id: ruff-format
name: Run Ruff (format) on Apple/
args: [--exit-non-zero-on-fix, --config=Apple/.ruff.toml]
files: ^Apple
- id: ruff-format
name: Run Ruff (format) on Doc/
args: [--check]

22
Apple/.ruff.toml Normal file
View file

@ -0,0 +1,22 @@
extend = "../.ruff.toml" # Inherit the project-wide settings
[format]
preview = true
docstring-code-format = true
[lint]
select = [
"C4", # flake8-comprehensions
"E", # pycodestyle
"F", # pyflakes
"I", # isort
"ISC", # flake8-implicit-str-concat
"LOG", # flake8-logging
"PGH", # pygrep-hooks
"PT", # flake8-pytest-style
"PYI", # flake8-pyi
"RUF100", # Ban unused `# noqa` comments
"UP", # pyupgrade
"W", # pycodestyle
"YTT", # flake8-2020
]

View file

@ -46,13 +46,12 @@
import sys
import sysconfig
import time
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from contextlib import contextmanager
from datetime import datetime, timezone
from os.path import basename, relpath
from pathlib import Path
from subprocess import CalledProcessError
from typing import Callable
EnvironmentT = dict[str, str]
ArgsT = Sequence[str | Path]
@ -140,17 +139,15 @@ def print_env(env: EnvironmentT) -> None:
def apple_env(host: str) -> EnvironmentT:
"""Construct an Apple development environment for the given host."""
env = {
"PATH": ":".join(
[
str(PYTHON_DIR / "Apple/iOS/Resources/bin"),
str(subdir(host) / "prefix"),
"/usr/bin",
"/bin",
"/usr/sbin",
"/sbin",
"/Library/Apple/usr/bin",
]
),
"PATH": ":".join([
str(PYTHON_DIR / "Apple/iOS/Resources/bin"),
str(subdir(host) / "prefix"),
"/usr/bin",
"/bin",
"/usr/sbin",
"/sbin",
"/Library/Apple/usr/bin",
]),
}
return env
@ -196,14 +193,10 @@ def clean(context: argparse.Namespace, target: str = "all") -> None:
paths.append(target)
if target in {"all", "hosts", "test"}:
paths.extend(
[
path.name
for path in CROSS_BUILD_DIR.glob(
f"{context.platform}-testbed.*"
)
]
)
paths.extend([
path.name
for path in CROSS_BUILD_DIR.glob(f"{context.platform}-testbed.*")
])
for path in paths:
delete_path(path)
@ -352,18 +345,16 @@ def download(url: str, target_dir: Path) -> Path:
out_path = target_path / basename(url)
if not Path(out_path).is_file():
run(
[
"curl",
"-Lf",
"--retry",
"5",
"--retry-all-errors",
"-o",
out_path,
url,
]
)
run([
"curl",
"-Lf",
"--retry",
"5",
"--retry-all-errors",
"-o",
out_path,
url,
])
else:
print(f"Using cached version of {basename(url)}")
return out_path
@ -468,8 +459,7 @@ def package_version(prefix_path: Path) -> str:
def lib_platform_files(dirname, names):
"""A file filter that ignores platform-specific files in the lib directory.
"""
"""A file filter that ignores platform-specific files in lib."""
path = Path(dirname)
if (
path.parts[-3] == "lib"
@ -478,7 +468,7 @@ def lib_platform_files(dirname, names):
):
return names
elif path.parts[-2] == "lib" and path.parts[-1].startswith("python"):
ignored_names = set(
ignored_names = {
name
for name in names
if (
@ -486,7 +476,13 @@ def lib_platform_files(dirname, names):
or name.startswith("_sysconfig_vars_")
or name == "build-details.json"
)
)
}
elif path.parts[-1] == "lib":
ignored_names = {
name
for name in names
if name.startswith("libpython") and name.endswith(".dylib")
}
else:
ignored_names = set()
@ -499,7 +495,9 @@ def lib_non_platform_files(dirname, names):
"""
path = Path(dirname)
if path.parts[-2] == "lib" and path.parts[-1].startswith("python"):
return set(names) - lib_platform_files(dirname, names) - {"lib-dynload"}
return (
set(names) - lib_platform_files(dirname, names) - {"lib-dynload"}
)
else:
return set()
@ -514,7 +512,8 @@ def create_xcframework(platform: str) -> str:
package_path.mkdir()
except FileExistsError:
raise RuntimeError(
f"{platform} XCframework already exists; do you need to run with --clean?"
f"{platform} XCframework already exists; do you need to run "
"with --clean?"
) from None
frameworks = []
@ -607,7 +606,7 @@ def create_xcframework(platform: str) -> str:
print(f" - {slice_name} binaries")
shutil.copytree(first_path / "bin", slice_path / "bin")
# Copy the include path (this will be a symlink to the framework headers)
# Copy the include path (a symlink to the framework headers)
print(f" - {slice_name} include files")
shutil.copytree(
first_path / "include",
@ -621,6 +620,12 @@ def create_xcframework(platform: str) -> str:
slice_framework / "Headers/pyconfig.h",
)
print(f" - {slice_name} shared library")
# Create a symlink for the fat library
shared_lib = slice_path / f"lib/libpython{version_tag}.dylib"
shared_lib.parent.mkdir()
shared_lib.symlink_to("../Python.framework/Python")
print(f" - {slice_name} architecture-specific files")
for host_triple, multiarch in slice_parts.items():
print(f" - {multiarch} standard library")
@ -632,6 +637,7 @@ def create_xcframework(platform: str) -> str:
framework_path(host_triple, multiarch) / "lib",
package_path / "Python.xcframework/lib",
ignore=lib_platform_files,
symlinks=True,
)
has_common_stdlib = True
@ -639,6 +645,7 @@ def create_xcframework(platform: str) -> str:
framework_path(host_triple, multiarch) / "lib",
slice_path / f"lib-{arch}",
ignore=lib_non_platform_files,
symlinks=True,
)
# Copy the host's pyconfig.h to an architecture-specific name.
@ -659,7 +666,8 @@ def create_xcframework(platform: str) -> str:
# statically link those libraries into a Framework, you become
# responsible for providing a privacy manifest for that framework.
xcprivacy_file = {
"OpenSSL": subdir(host_triple) / "prefix/share/OpenSSL.xcprivacy"
"OpenSSL": subdir(host_triple)
/ "prefix/share/OpenSSL.xcprivacy"
}
print(f" - {multiarch} xcprivacy files")
for module, lib in [
@ -669,7 +677,8 @@ def create_xcframework(platform: str) -> str:
shutil.copy(
xcprivacy_file[lib],
slice_path
/ f"lib-{arch}/python{version_tag}/lib-dynload/{module}.xcprivacy",
/ f"lib-{arch}/python{version_tag}"
/ f"lib-dynload/{module}.xcprivacy",
)
print(" - build tools")
@ -692,18 +701,16 @@ def package(context: argparse.Namespace) -> None:
# Clone testbed
print()
run(
[
sys.executable,
"Apple/testbed",
"clone",
"--platform",
context.platform,
"--framework",
CROSS_BUILD_DIR / context.platform / "Python.xcframework",
CROSS_BUILD_DIR / context.platform / "testbed",
]
)
run([
sys.executable,
"Apple/testbed",
"clone",
"--platform",
context.platform,
"--framework",
CROSS_BUILD_DIR / context.platform / "Python.xcframework",
CROSS_BUILD_DIR / context.platform / "testbed",
])
# Build the final archive
archive_name = (
@ -757,7 +764,7 @@ def build(context: argparse.Namespace, host: str | None = None) -> None:
package(context)
def test(context: argparse.Namespace, host: str | None = None) -> None:
def test(context: argparse.Namespace, host: str | None = None) -> None: # noqa: PT028
"""The implementation of the "test" command."""
if host is None:
host = context.host
@ -795,18 +802,16 @@ def test(context: argparse.Namespace, host: str | None = None) -> None:
/ f"Frameworks/{apple_multiarch(host)}"
)
run(
[
sys.executable,
"Apple/testbed",
"clone",
"--platform",
context.platform,
"--framework",
framework_path,
testbed_dir,
]
)
run([
sys.executable,
"Apple/testbed",
"clone",
"--platform",
context.platform,
"--framework",
framework_path,
testbed_dir,
])
run(
[
@ -840,7 +845,7 @@ def apple_sim_host(platform_name: str) -> str:
"""Determine the native simulator target for this platform."""
for _, slice_parts in HOSTS[platform_name].items():
for host_triple in slice_parts:
parts = host_triple.split('-')
parts = host_triple.split("-")
if parts[0] == platform.machine() and parts[-1] == "simulator":
return host_triple
@ -968,20 +973,29 @@ def parse_args() -> argparse.Namespace:
cmd.add_argument(
"--simulator",
help=(
"The name of the simulator to use (eg: 'iPhone 16e'). Defaults to "
"the most recently released 'entry level' iPhone device. Device "
"architecture and OS version can also be specified; e.g., "
"`--simulator 'iPhone 16 Pro,arch=arm64,OS=26.0'` would run on "
"an ARM64 iPhone 16 Pro simulator running iOS 26.0."
"The name of the simulator to use (eg: 'iPhone 16e'). "
"Defaults to the most recently released 'entry level' "
"iPhone device. Device architecture and OS version can also "
"be specified; e.g., "
"`--simulator 'iPhone 16 Pro,arch=arm64,OS=26.0'` would "
"run on an ARM64 iPhone 16 Pro simulator running iOS 26.0."
),
)
group = cmd.add_mutually_exclusive_group()
group.add_argument(
"--fast-ci", action="store_const", dest="ci_mode", const="fast",
help="Add test arguments for GitHub Actions")
"--fast-ci",
action="store_const",
dest="ci_mode",
const="fast",
help="Add test arguments for GitHub Actions",
)
group.add_argument(
"--slow-ci", action="store_const", dest="ci_mode", const="slow",
help="Add test arguments for buildbots")
"--slow-ci",
action="store_const",
dest="ci_mode",
const="slow",
help="Add test arguments for buildbots",
)
for subcommand in [configure_build, configure_host, build, ci]:
subcommand.add_argument(

View file

@ -46,7 +46,8 @@ install_stdlib() {
rsync -au --delete "$PROJECT_DIR/$PYTHON_XCFRAMEWORK_PATH/lib/" "$CODESIGNING_FOLDER_PATH/python/lib/"
rsync -au "$PROJECT_DIR/$PYTHON_XCFRAMEWORK_PATH/$SLICE_FOLDER/lib-$ARCHS/" "$CODESIGNING_FOLDER_PATH/python/lib/"
else
rsync -au --delete "$PROJECT_DIR/$PYTHON_XCFRAMEWORK_PATH/$SLICE_FOLDER/lib/" "$CODESIGNING_FOLDER_PATH/python/lib/"
# A single-arch framework will have a libpython symlink; that can't be included at runtime
rsync -au --delete "$PROJECT_DIR/$PYTHON_XCFRAMEWORK_PATH/$SLICE_FOLDER/lib/" "$CODESIGNING_FOLDER_PATH/python/lib/" --exclude 'libpython*.dylib'
fi
}

View file

@ -32,15 +32,15 @@ def select_simulator_device(platform):
json_data = json.loads(raw_json)
if platform == "iOS":
# Any iOS device will do; we'll look for "SE" devices - but the name isn't
# consistent over time. Older Xcode versions will use "iPhone SE (Nth
# generation)"; As of 2025, they've started using "iPhone 16e".
# Any iOS device will do; we'll look for "SE" devices - but the name
# isn't consistent over time. Older Xcode versions will use "iPhone SE
# (Nth generation)"; As of 2025, they've started using "iPhone 16e".
#
# When Xcode is updated after a new release, new devices will be available
# and old ones will be dropped from the set available on the latest iOS
# version. Select the one with the highest minimum runtime version - this
# is an indicator of the "newest" released device, which should always be
# supported on the "most recent" iOS version.
# When Xcode is updated after a new release, new devices will be
# available and old ones will be dropped from the set available on the
# latest iOS version. Select the one with the highest minimum runtime
# version - this is an indicator of the "newest" released device, which
# should always be supported on the "most recent" iOS version.
se_simulators = sorted(
(devicetype["minRuntimeVersion"], devicetype["name"])
for devicetype in json_data["devicetypes"]
@ -295,7 +295,8 @@ def main():
parser = argparse.ArgumentParser(
description=(
"Manages the process of testing an Apple Python project through Xcode."
"Manages the process of testing an Apple Python project "
"through Xcode."
),
)
@ -336,7 +337,10 @@ def main():
run = subcommands.add_parser(
"run",
usage="%(prog)s [-h] [--simulator SIMULATOR] -- <test arg> [<test arg> ...]",
usage=(
"%(prog)s [-h] [--simulator SIMULATOR] -- "
"<test arg> [<test arg> ...]"
),
description=(
"Run a testbed project. The arguments provided after `--` will be "
"passed to the running iOS process as if they were arguments to "
@ -397,9 +401,9 @@ def main():
/ "bin"
).is_dir():
print(
f"Testbed does not contain a compiled Python framework. Use "
f"`python {sys.argv[0]} clone ...` to create a runnable "
f"clone of this testbed."
"Testbed does not contain a compiled Python framework. "
f"Use `python {sys.argv[0]} clone ...` to create a "
"runnable clone of this testbed."
)
sys.exit(20)
@ -411,7 +415,8 @@ def main():
)
else:
print(
f"Must specify test arguments (e.g., {sys.argv[0]} run -- test)"
"Must specify test arguments "
f"(e.g., {sys.argv[0]} run -- test)"
)
print()
parser.print_help(sys.stderr)

View file

@ -85,6 +85,35 @@ Object Protocol
instead of the :func:`repr`.
.. c:function:: void PyUnstable_Object_Dump(PyObject *op)
Dump an object *op* to ``stderr``. This should only be used for debugging.
The output is intended to try dumping objects even after memory corruption:
* Information is written starting with fields that are the least likely to
crash when accessed.
* This function can be called without an :term:`attached thread state`, but
it's not recommended to do so: it can cause deadlocks.
* An object that does not belong to the current interpreter may be dumped,
but this may also cause crashes or unintended behavior.
* A heuristic detects whether the object memory has been freed. In that
case, only the object's memory address is displayed, not its contents.
* The output format may change at any time.
Example of output:
.. code-block:: output
object address : 0x7f80124702c0
object refcount : 2
object type : 0x9902e0
object type name: str
object repr : 'abcdef'
.. versionadded:: next
.. c:function:: int PyObject_HasAttrWithError(PyObject *o, PyObject *attr_name)
Returns ``1`` if *o* has the attribute *attr_name*, and ``0`` otherwise.

View file

@ -698,14 +698,12 @@ The following flags can be used with :c:member:`PyMemberDef.flags`:
entry indicates an offset from the subclass-specific data, rather than
from ``PyObject``.
Can only be used as part of :c:member:`Py_tp_members <PyTypeObject.tp_members>`
Can only be used as part of the :c:data:`Py_tp_members`
:c:type:`slot <PyType_Slot>` when creating a class using negative
:c:member:`~PyType_Spec.basicsize`.
It is mandatory in that case.
This flag is only used in :c:type:`PyType_Slot`.
When setting :c:member:`~PyTypeObject.tp_members` during
class creation, Python clears it and sets
When setting :c:member:`~PyTypeObject.tp_members` from the slot during
class creation, Python clears the flag and sets
:c:member:`PyMemberDef.offset` to the offset from the ``PyObject`` struct.
.. index::

View file

@ -383,8 +383,8 @@ The following functions and structs are used to create
The *bases* argument can be used to specify base classes; it can either
be only one class or a tuple of classes.
If *bases* is ``NULL``, the *Py_tp_bases* slot is used instead.
If that also is ``NULL``, the *Py_tp_base* slot is used instead.
If *bases* is ``NULL``, the :c:data:`Py_tp_bases` slot is used instead.
If that also is ``NULL``, the :c:data:`Py_tp_base` slot is used instead.
If that also is ``NULL``, the new type derives from :class:`object`.
The *module* argument can be used to record the module in which the new
@ -590,9 +590,9 @@ The following functions and structs are used to create
:c:type:`PyAsyncMethods` with an added ``Py_`` prefix.
For example, use:
* ``Py_tp_dealloc`` to set :c:member:`PyTypeObject.tp_dealloc`
* ``Py_nb_add`` to set :c:member:`PyNumberMethods.nb_add`
* ``Py_sq_length`` to set :c:member:`PySequenceMethods.sq_length`
* :c:data:`Py_tp_dealloc` to set :c:member:`PyTypeObject.tp_dealloc`
* :c:data:`Py_nb_add` to set :c:member:`PyNumberMethods.nb_add`
* :c:data:`Py_sq_length` to set :c:member:`PySequenceMethods.sq_length`
An additional slot is supported that does not correspond to a
:c:type:`!PyTypeObject` struct field:
@ -611,7 +611,7 @@ The following functions and structs are used to create
If it is not possible to switch to a ``MANAGED`` flag (for example,
for vectorcall or to support Python older than 3.12), specify the
offset in :c:member:`Py_tp_members <PyTypeObject.tp_members>`.
offset in :c:data:`Py_tp_members`.
See :ref:`PyMemberDef documentation <pymemberdef-offsets>`
for details.
@ -639,7 +639,7 @@ The following functions and structs are used to create
.. versionchanged:: 3.14
The field :c:member:`~PyTypeObject.tp_vectorcall` can now set
using ``Py_tp_vectorcall``. See the field's documentation
using :c:data:`Py_tp_vectorcall`. See the field's documentation
for details.
.. c:member:: void *pfunc
@ -649,7 +649,7 @@ The following functions and structs are used to create
*pfunc* values may not be ``NULL``, except for the following slots:
* ``Py_tp_doc``
* :c:data:`Py_tp_doc`
* :c:data:`Py_tp_token` (for clarity, prefer :c:data:`Py_TP_USE_SPEC`
rather than ``NULL``)

View file

@ -2273,7 +2273,7 @@ and :c:data:`PyType_Type` effectively act as defaults.)
This field should be set to ``NULL`` and treated as read-only.
Python will fill it in when the type is :c:func:`initialized <PyType_Ready>`.
For dynamically created classes, the ``Py_tp_bases``
For dynamically created classes, the :c:data:`Py_tp_bases`
:c:type:`slot <PyType_Slot>` can be used instead of the *bases* argument
of :c:func:`PyType_FromSpecWithBases`.
The argument form is preferred.

View file

@ -353,7 +353,7 @@ garbage collection protocol.
That is, heap types should:
- Have the :c:macro:`Py_TPFLAGS_HAVE_GC` flag.
- Define a traverse function using ``Py_tp_traverse``, which
- Define a traverse function using :c:data:`Py_tp_traverse`, which
visits the type (e.g. using ``Py_VISIT(Py_TYPE(self))``).
Please refer to the documentation of

View file

@ -1084,19 +1084,23 @@ New features
(Contributed by Victor Stinner in :gh:`129813`.)
* Add a new :c:func:`PyImport_CreateModuleFromInitfunc` C-API for creating
a module from a *spec* and *initfunc*.
(Contributed by Itamar Oren in :gh:`116146`.)
* Add :c:func:`PyTuple_FromArray` to create a :class:`tuple` from an array.
(Contributed by Victor Stinner in :gh:`111489`.)
* Add :c:func:`PyUnstable_Object_Dump` to dump an object to ``stderr``.
It should only be used for debugging.
(Contributed by Victor Stinner in :gh:`141070`.)
* Add :c:func:`PyUnstable_ThreadState_SetStackProtection` and
:c:func:`PyUnstable_ThreadState_ResetStackProtection` functions to set
the stack protection base address and stack protection size of a Python
thread state.
(Contributed by Victor Stinner in :gh:`139653`.)
* Add a new :c:func:`PyImport_CreateModuleFromInitfunc` C-API for creating
a module from a *spec* and *initfunc*.
(Contributed by Itamar Oren in :gh:`116146`.)
Changed C APIs
--------------

View file

@ -295,7 +295,10 @@ PyAPI_FUNC(PyObject *) PyType_GetDict(PyTypeObject *);
PyAPI_FUNC(int) PyObject_Print(PyObject *, FILE *, int);
PyAPI_FUNC(void) _Py_BreakPoint(void);
PyAPI_FUNC(void) _PyObject_Dump(PyObject *);
PyAPI_FUNC(void) PyUnstable_Object_Dump(PyObject *);
// Alias for backward compatibility
#define _PyObject_Dump PyUnstable_Object_Dump
PyAPI_FUNC(PyObject*) _PyObject_GetAttrId(PyObject *, _Py_Identifier *);
@ -387,10 +390,11 @@ PyAPI_FUNC(PyObject *) _PyObject_FunctionStr(PyObject *);
process with a message on stderr if the given condition fails to hold,
but compile away to nothing if NDEBUG is defined.
However, before aborting, Python will also try to call _PyObject_Dump() on
the given object. This may be of use when investigating bugs in which a
particular object is corrupt (e.g. buggy a tp_visit method in an extension
module breaking the garbage collector), to help locate the broken objects.
However, before aborting, Python will also try to call
PyUnstable_Object_Dump() on the given object. This may be of use when
investigating bugs in which a particular object is corrupt (e.g. a buggy
tp_visit method in an extension module breaking the garbage collector), to
help locate the broken objects.
The WITH_MSG variant allows you to supply an additional message that Python
will attempt to print to stderr, after the object dump. */

View file

@ -13,7 +13,7 @@ static inline void
_PyStaticObject_CheckRefcnt(PyObject *obj) {
if (!_Py_IsImmortal(obj)) {
fprintf(stderr, "Immortal Object has less refcnt than expected.\n");
_PyObject_Dump(obj);
PyUnstable_Object_Dump(obj);
}
}
#endif

View file

@ -264,17 +264,6 @@ def floatstr(o, allow_nan=self.allow_nan,
def _make_iterencode(markers, _default, _encoder, _indent, _floatstr,
_key_separator, _item_separator, _sort_keys, _skipkeys, _one_shot,
## HACK: hand-optimized bytecode; turn globals into locals
ValueError=ValueError,
dict=dict,
float=float,
id=id,
int=int,
isinstance=isinstance,
list=list,
str=str,
tuple=tuple,
_intstr=int.__repr__,
):
def _iterencode_list(lst, _current_indent_level):
@ -311,7 +300,7 @@ def _iterencode_list(lst, _current_indent_level):
# Subclasses of int/float may override __repr__, but we still
# want to encode them as integers/floats in JSON. One example
# within the standard library is IntEnum.
yield buf + _intstr(value)
yield buf + int.__repr__(value)
elif isinstance(value, float):
# see comment above for int
yield buf + _floatstr(value)
@ -374,7 +363,7 @@ def _iterencode_dict(dct, _current_indent_level):
key = 'null'
elif isinstance(key, int):
# see comment for int/float in _make_iterencode
key = _intstr(key)
key = int.__repr__(key)
elif _skipkeys:
continue
else:
@ -399,7 +388,7 @@ def _iterencode_dict(dct, _current_indent_level):
yield 'false'
elif isinstance(value, int):
# see comment for int/float in _make_iterencode
yield _intstr(value)
yield int.__repr__(value)
elif isinstance(value, float):
# see comment for int/float in _make_iterencode
yield _floatstr(value)
@ -434,7 +423,7 @@ def _iterencode(o, _current_indent_level):
yield 'false'
elif isinstance(o, int):
# see comment for int/float in _make_iterencode
yield _intstr(o)
yield int.__repr__(o)
elif isinstance(o, float):
# see comment for int/float in _make_iterencode
yield _floatstr(o)
@ -458,4 +447,13 @@ def _iterencode(o, _current_indent_level):
raise
if markers is not None:
del markers[markerid]
return _iterencode
def _iterencode_once(o, _current_indent_level):
nonlocal _iterencode, _iterencode_dict, _iterencode_list
try:
yield from _iterencode(o, _current_indent_level)
finally:
# Break reference cycles due to mutually recursive closures:
del _iterencode, _iterencode_dict, _iterencode_list
return _iterencode_once

View file

@ -1,4 +1,5 @@
import enum
import os
import sys
import textwrap
import unittest
@ -13,6 +14,9 @@
_testcapi = import_helper.import_module('_testcapi')
_testinternalcapi = import_helper.import_module('_testinternalcapi')
NULL = None
STDERR_FD = 2
class Constant(enum.IntEnum):
Py_CONSTANT_NONE = 0
@ -247,5 +251,53 @@ def func(x):
func(object())
    def pyobject_dump(self, obj, release_gil=False):
        """Call _testcapi.pyobject_dump(obj) and return what it wrote to stderr.

        The C function writes directly to the stderr file *descriptor*, so
        fd 2 is temporarily redirected into a temp file; the file's contents
        are returned with trailing whitespace stripped.  *release_gil* is
        forwarded to the C helper.
        """
        pyobject_dump = _testcapi.pyobject_dump
        try:
            old_stderr = os.dup(STDERR_FD)
        except OSError as exc:
            # os.dup(STDERR_FD) is not supported on WASI
            self.skipTest(f"os.dup() failed with {exc!r}")
        filename = os_helper.TESTFN
        try:
            try:
                with open(filename, "wb") as fp:
                    fd = fp.fileno()
                    # Point fd 2 at the temp file while the dump runs.
                    os.dup2(fd, STDERR_FD)
                    pyobject_dump(obj, release_gil)
            finally:
                # Always restore the real stderr, even if the dump raised.
                os.dup2(old_stderr, STDERR_FD)
                os.close(old_stderr)
            with open(filename) as fp:
                return fp.read().rstrip()
        finally:
            os_helper.unlink(filename)
    def test_pyobject_dump(self):
        """Check the output format of PyUnstable_Object_Dump().

        The expected layout (address, refcount, type, type name, repr) is
        documented in Doc/c-api/object.rst for PyUnstable_Object_Dump.
        """
        # test string object
        str_obj = 'test string'
        output = self.pyobject_dump(str_obj)
        hex_regex = r'(0x)?[0-9a-fA-F]+'
        regex = (
            fr"object address : {hex_regex}\n"
            r"object refcount : [0-9]+\n"
            fr"object type : {hex_regex}\n"
            r"object type name: str\n"
            r"object repr : 'test string'"
        )
        self.assertRegex(output, regex)

        # release the GIL
        output = self.pyobject_dump(str_obj, release_gil=True)
        self.assertRegex(output, regex)

        # test NULL object: the dump must survive and report it as freed
        output = self.pyobject_dump(NULL)
        self.assertRegex(output, r'<object at .* is freed>')
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,9 @@
"""Tests for the sampling profiler (profiling.sampling)."""
import os
from test.support import load_package_tests
def load_tests(*args):
    """Discover and return every test module in this subpackage."""
    package_dir = os.path.dirname(__file__)
    return load_package_tests(package_dir, *args)

View file

@ -0,0 +1,101 @@
"""Helper utilities for sampling profiler tests."""
import contextlib
import socket
import subprocess
import sys
import unittest
from collections import namedtuple
from test.support import SHORT_TIMEOUT
from test.support.socket_helper import find_unused_port
from test.support.os_helper import unlink
PROCESS_VM_READV_SUPPORTED = False
try:
from _remote_debugging import PROCESS_VM_READV_SUPPORTED # noqa: F401
import _remote_debugging # noqa: F401
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
else:
import profiling.sampling # noqa: F401
from profiling.sampling.sample import SampleProfiler # noqa: F401
skip_if_not_supported = unittest.skipIf(
(
sys.platform != "darwin"
and sys.platform != "linux"
and sys.platform != "win32"
),
"Test only runs on Linux, Windows and MacOS",
)
SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])
@contextlib.contextmanager
def test_subprocess(script):
    """Context manager to create a test subprocess with socket synchronization.

    Args:
        script: Python code to execute in the subprocess

    Yields:
        SubprocessInfo: Named tuple with process and socket objects

    The subprocess is killed (if still alive) and reaped on exit.
    """
    # Find an unused port for socket communication
    port = find_unused_port()

    # Inject socket connection code at the beginning of the script, so the
    # child signals "ready" before the user script starts running.
    socket_code = f"""
import socket
_test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_test_sock.connect(('localhost', {port}))
_test_sock.sendall(b"ready")
"""

    # Combine socket code with user script
    full_script = socket_code + script

    # Create the server socket *before* spawning the child, so the child's
    # connect() cannot race the bind/listen.
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("localhost", port))
    server_socket.settimeout(SHORT_TIMEOUT)
    server_socket.listen(1)

    proc = subprocess.Popen(
        [sys.executable, "-c", full_script],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    client_socket = None
    try:
        # Wait for process to connect and send ready signal
        client_socket, _ = server_socket.accept()
        server_socket.close()
        response = client_socket.recv(1024)
        if response != b"ready":
            raise RuntimeError(
                f"Unexpected response from subprocess: {response}"
            )
        yield SubprocessInfo(proc, client_socket)
    finally:
        if client_socket is not None:
            client_socket.close()
        # kill (not terminate): the child runs arbitrary test code and must
        # not be left behind; wait() reaps it to avoid a zombie.
        if proc.poll() is None:
            proc.kill()
        proc.wait()
def close_and_unlink(file):
    """Release *file*'s handle, then remove the file from the filesystem."""
    name = file.name
    file.close()
    unlink(name)

View file

@ -0,0 +1,38 @@
"""Mock classes for sampling profiler tests."""
class MockFrameInfo:
    """Stand-in for FrameInfo; the real type isn't constructible in tests."""

    def __init__(self, filename, lineno, funcname):
        # Plain attribute storage mirroring the real FrameInfo fields.
        self.filename = filename
        self.lineno = lineno
        self.funcname = funcname

    def __repr__(self):
        return "MockFrameInfo(filename='{}', lineno={}, funcname='{}')".format(
            self.filename, self.lineno, self.funcname
        )
class MockThreadInfo:
    """Stand-in for ThreadInfo; the real type isn't constructible in tests."""

    def __init__(self, thread_id, frame_info, status=0):
        # status defaults to 0, i.e. THREAD_STATE_RUNNING.
        self.thread_id = thread_id
        self.frame_info = frame_info
        self.status = status

    def __repr__(self):
        return "MockThreadInfo(thread_id={}, frame_info={}, status={})".format(
            self.thread_id, self.frame_info, self.status
        )
class MockInterpreterInfo:
    """Stand-in for InterpreterInfo; the real type isn't constructible in tests."""

    def __init__(self, interpreter_id, threads):
        # `threads` is stored as given (typically a list of MockThreadInfo).
        self.interpreter_id = interpreter_id
        self.threads = threads

    def __repr__(self):
        return "MockInterpreterInfo(interpreter_id={}, threads={})".format(
            self.interpreter_id, self.threads
        )

View file

@ -0,0 +1,264 @@
"""Tests for advanced sampling profiler features (GC tracking, native frames, ProcessPoolExecutor support)."""
import io
import os
import subprocess
import tempfile
import unittest
from unittest import mock
try:
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import (
SHORT_TIMEOUT,
SuppressCrashReport,
os_helper,
requires_subprocess,
script_helper,
)
from .helpers import close_and_unlink, skip_if_not_supported, test_subprocess
@requires_subprocess()
@skip_if_not_supported
class TestGCFrameTracking(unittest.TestCase):
    """Tests for GC frame tracking in the sampling profiler.

    A child process repeatedly creates cyclic garbage with a CPU-expensive
    finalizer, so the sampler is likely to observe the process while a
    garbage collection is in progress.
    """

    @classmethod
    def setUpClass(cls):
        """Create a static test script with GC frames and CPU-intensive work."""
        cls.gc_test_script = '''
import gc

class ExpensiveGarbage:
    """Class that triggers GC with expensive finalizer (callback)."""

    def __init__(self):
        self.cycle = self

    def __del__(self):
        # CPU-intensive work in the finalizer callback
        result = 0
        for i in range(100000):
            result += i * i
            if i % 1000 == 0:
                result = result % 1000000

def main_loop():
    """Main loop that triggers GC with expensive callback."""
    while True:
        ExpensiveGarbage()
        gc.collect()

if __name__ == "__main__":
    main_loop()
'''

    def test_gc_frames_enabled(self):
        """Test that GC frames appear when gc tracking is enabled."""
        with (
            test_subprocess(self.gc_test_script) as subproc,
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    duration_sec=1,
                    sample_interval_usec=5000,
                    show_summary=False,
                    native=False,
                    gc=True,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()

        # Should capture samples
        self.assertIn("Captured", output)
        self.assertIn("samples", output)
        # GC frames should be present
        self.assertIn("<GC>", output)

    def test_gc_frames_disabled(self):
        """Test that GC frames do not appear when gc tracking is disabled."""
        with (
            test_subprocess(self.gc_test_script) as subproc,
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    duration_sec=1,
                    sample_interval_usec=5000,
                    show_summary=False,
                    native=False,
                    gc=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()

        # Should capture samples
        self.assertIn("Captured", output)
        self.assertIn("samples", output)
        # GC frames should NOT be present
        self.assertNotIn("<GC>", output)
@requires_subprocess()
@skip_if_not_supported
class TestNativeFrameTracking(unittest.TestCase):
    """Tests for native frame tracking in the sampling profiler.

    The child script calls Python code through operator.call(), putting a
    native C frame in the middle of the stack with Python frames above it.
    """

    @classmethod
    def setUpClass(cls):
        """Create a static test script with native frames and CPU-intensive work."""
        # NOTE(review): 1_000_0000 below is ten million; the digit grouping
        # looks like a typo for 1_000_000 — confirm the intended loop size.
        cls.native_test_script = """
import operator

def main_loop():
    while True:
        # Native code in the middle of the stack:
        operator.call(inner)

def inner():
    # Python code at the top of the stack:
    for _ in range(1_000_0000):
        pass

if __name__ == "__main__":
    main_loop()
"""

    def test_native_frames_enabled(self):
        """Test that native frames appear when native tracking is enabled."""
        # delete=False so the profiler can reopen the file by name; cleanup
        # is handled by close_and_unlink via addCleanup.
        collapsed_file = tempfile.NamedTemporaryFile(
            suffix=".txt", delete=False
        )
        self.addCleanup(close_and_unlink, collapsed_file)

        with (
            test_subprocess(self.native_test_script) as subproc,
        ):
            # Suppress profiler output when testing file export
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=1,
                        filename=collapsed_file.name,
                        output_format="collapsed",
                        sample_interval_usec=1000,
                        native=True,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

            # Verify file was created and contains valid data
            self.assertTrue(os.path.exists(collapsed_file.name))
            self.assertGreater(os.path.getsize(collapsed_file.name), 0)

            # Check file format: collapsed lines are "frame;frame;... count"
            with open(collapsed_file.name, "r") as f:
                content = f.read()
            lines = content.strip().split("\n")
            self.assertGreater(len(lines), 0)
            stacks = [line.rsplit(" ", 1)[0] for line in lines]

            # Most samples should have native code in the middle of the stack:
            self.assertTrue(any(";<native>;" in stack for stack in stacks))
            # No samples should have native code at the top of the stack:
            self.assertFalse(any(stack.endswith(";<native>") for stack in stacks))

    def test_native_frames_disabled(self):
        """Test that native frames do not appear when native tracking is disabled."""
        with (
            test_subprocess(self.native_test_script) as subproc,
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    duration_sec=1,
                    sample_interval_usec=5000,
                    show_summary=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()

        # Native frames should NOT be present:
        self.assertNotIn("<native>", output)
@requires_subprocess()
@skip_if_not_supported
class TestProcessPoolExecutorSupport(unittest.TestCase):
    """
    Test that ProcessPoolExecutor works correctly with profiling.sampling.
    """

    def test_process_pool_executor_pickle(self):
        # gh-140729: test use ProcessPoolExecutor.map() can sampling
        # The worker function defined in the target script must remain
        # picklable when the script is launched under the profiler.
        test_script = """
import concurrent.futures

def worker(x):
    return x * 2

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(worker, [1, 2, 3]))
        print(f"Results: {results}")
"""
        with os_helper.temp_dir() as temp_dir:
            script = script_helper.make_script(
                temp_dir, "test_process_pool_executor_pickle", test_script
            )
            with SuppressCrashReport():
                # Run the script under the sampling profiler CLI
                # (-d 5: five-second duration, -i 100000: 100ms interval).
                with script_helper.spawn_python(
                    "-m",
                    "profiling.sampling.sample",
                    "-d",
                    "5",
                    "-i",
                    "100000",
                    script,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                ) as proc:
                    try:
                        stdout, stderr = proc.communicate(
                            timeout=SHORT_TIMEOUT
                        )
                    except subprocess.TimeoutExpired:
                        # Don't hang the test suite; collect whatever
                        # output was produced before the kill.
                        proc.kill()
                        stdout, stderr = proc.communicate()
        if "PermissionError" in stderr:
            self.skipTest("Insufficient permissions for remote profiling")
        self.assertIn("Results: [2, 4, 6]", stdout)
        self.assertNotIn("Can't pickle", stderr)

View file

@ -0,0 +1,664 @@
"""Tests for sampling profiler CLI argument parsing and functionality."""
import io
import subprocess
import sys
import unittest
from unittest import mock
try:
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import is_emscripten
class TestSampleProfilerCLI(unittest.TestCase):
    """Unit tests for the sampling profiler's CLI argument parsing.

    Targets (``-p PID``, ``-m MODULE``, script path) and profiler options
    are exercised through ``main()`` with ``sys.argv``, ``sample()``,
    ``subprocess.Popen`` and ``socket.socket`` patched out, so no real
    process is ever spawned or profiled.
    """

    def _setup_sync_mocks(self, mock_socket, mock_popen):
        """Helper to set up socket and process mocks for coordinator tests."""
        # Mock the sync socket with context manager support
        mock_sock_instance = mock.MagicMock()
        mock_sock_instance.getsockname.return_value = ("127.0.0.1", 12345)
        # Mock the connection with context manager support
        mock_conn = mock.MagicMock()
        mock_conn.recv.return_value = b"ready"
        mock_conn.__enter__.return_value = mock_conn
        mock_conn.__exit__.return_value = None
        # Mock accept() to return (connection, address) and support indexing
        mock_accept_result = mock.MagicMock()
        mock_accept_result.__getitem__.return_value = (
            mock_conn  # [0] returns the connection
        )
        mock_sock_instance.accept.return_value = mock_accept_result
        # Mock socket with context manager support
        mock_sock_instance.__enter__.return_value = mock_sock_instance
        mock_sock_instance.__exit__.return_value = None
        mock_socket.return_value = mock_sock_instance
        # Mock the subprocess
        mock_process = mock.MagicMock()
        mock_process.pid = 12345
        mock_process.poll.return_value = None
        mock_popen.return_value = mock_process
        return mock_process

    def _verify_coordinator_command(self, mock_popen, expected_target_args):
        """Helper to verify the coordinator command was called correctly."""
        args, kwargs = mock_popen.call_args
        coordinator_cmd = args[0]
        self.assertEqual(coordinator_cmd[0], sys.executable)
        self.assertEqual(coordinator_cmd[1], "-m")
        self.assertEqual(
            coordinator_cmd[2], "profiling.sampling._sync_coordinator"
        )
        self.assertEqual(coordinator_cmd[3], "12345")  # port
        # cwd is coordinator_cmd[4]
        self.assertEqual(coordinator_cmd[5:], expected_target_args)

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_module_argument_parsing(self):
        """-m MODULE spawns the coordinator and samples with default options."""
        test_args = ["profiling.sampling.sample", "-m", "mymodule"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
            mock_sample.assert_called_once_with(
                12345,
                sort=2,  # default sort (sort_value from args.sort)
                sample_interval_usec=100,
                duration_sec=10,
                filename=None,
                all_threads=False,
                limit=15,
                show_summary=True,
                output_format="pstats",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_module_with_arguments(self):
        """Arguments after -m MODULE are forwarded verbatim to the target."""
        test_args = [
            "profiling.sampling.sample",
            "-m",
            "mymodule",
            "arg1",
            "arg2",
            "--flag",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(
                mock_popen, ("-m", "mymodule", "arg1", "arg2", "--flag")
            )
            mock_sample.assert_called_once_with(
                12345,
                sort=2,
                sample_interval_usec=100,
                duration_sec=10,
                filename=None,
                all_threads=False,
                limit=15,
                show_summary=True,
                output_format="pstats",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_script_argument_parsing(self):
        """A bare script path is handed to the coordinator unchanged."""
        test_args = ["profiling.sampling.sample", "myscript.py"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(mock_popen, ("myscript.py",))
            mock_sample.assert_called_once_with(
                12345,
                sort=2,
                sample_interval_usec=100,
                duration_sec=10,
                filename=None,
                all_threads=False,
                limit=15,
                show_summary=True,
                output_format="pstats",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_script_with_arguments(self):
        """Script arguments (including option-like ones) reach the target."""
        test_args = [
            "profiling.sampling.sample",
            "myscript.py",
            "arg1",
            "arg2",
            "--flag",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            # Use the helper to set up mocks consistently
            mock_process = self._setup_sync_mocks(mock_socket, mock_popen)
            # Override specific behavior for this test
            mock_process.wait.side_effect = [
                subprocess.TimeoutExpired(test_args, 0.1),
                None,
            ]
            profiling.sampling.sample.main()
            # Verify the coordinator command was called
            args, kwargs = mock_popen.call_args
            coordinator_cmd = args[0]
            self.assertEqual(coordinator_cmd[0], sys.executable)
            self.assertEqual(coordinator_cmd[1], "-m")
            self.assertEqual(
                coordinator_cmd[2], "profiling.sampling._sync_coordinator"
            )
            self.assertEqual(coordinator_cmd[3], "12345")  # port
            # cwd is coordinator_cmd[4]
            self.assertEqual(
                coordinator_cmd[5:], ("myscript.py", "arg1", "arg2", "--flag")
            )

    def test_cli_mutually_exclusive_pid_module(self):
        """-p and -m together is an argparse error (exit code 2)."""
        test_args = [
            "profiling.sampling.sample",
            "-p",
            "12345",
            "-m",
            "mymodule",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
            self.assertRaises(SystemExit) as cm,
        ):
            profiling.sampling.sample.main()
        self.assertEqual(cm.exception.code, 2)  # argparse error
        error_msg = mock_stderr.getvalue()
        self.assertIn("not allowed with argument", error_msg)

    def test_cli_mutually_exclusive_pid_script(self):
        """-p combined with a script path is rejected."""
        test_args = ["profiling.sampling.sample", "-p", "12345", "myscript.py"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
            self.assertRaises(SystemExit) as cm,
        ):
            profiling.sampling.sample.main()
        self.assertEqual(cm.exception.code, 2)  # argparse error
        error_msg = mock_stderr.getvalue()
        self.assertIn("only one target type can be specified", error_msg)

    def test_cli_no_target_specified(self):
        """Omitting every target (-p / -m / script) is rejected."""
        test_args = ["profiling.sampling.sample", "-d", "5"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
            self.assertRaises(SystemExit) as cm,
        ):
            profiling.sampling.sample.main()
        self.assertEqual(cm.exception.code, 2)  # argparse error
        error_msg = mock_stderr.getvalue()
        self.assertIn("one of the arguments", error_msg)

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_module_with_profiler_options(self):
        """Profiler options placed before -m are applied to sample()."""
        test_args = [
            "profiling.sampling.sample",
            "-i",
            "1000",
            "-d",
            "30",
            "-a",
            "--sort-tottime",
            "-l",
            "20",
            "-m",
            "mymodule",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
            mock_sample.assert_called_once_with(
                12345,
                sort=1,  # sort-tottime
                sample_interval_usec=1000,
                duration_sec=30,
                filename=None,
                all_threads=True,
                limit=20,
                show_summary=True,
                output_format="pstats",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_script_with_profiler_options(self):
        """Test script with various profiler options."""
        test_args = [
            "profiling.sampling.sample",
            "-i",
            "2000",
            "-d",
            "60",
            "--collapsed",
            "-o",
            "output.txt",
            "myscript.py",
            "scriptarg",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(
                mock_popen, ("myscript.py", "scriptarg")
            )
            # Verify profiler options were passed correctly
            mock_sample.assert_called_once_with(
                12345,
                sort=2,  # default sort
                sample_interval_usec=2000,
                duration_sec=60,
                filename="output.txt",
                all_threads=False,
                limit=15,
                show_summary=True,
                output_format="collapsed",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    def test_cli_empty_module_name(self):
        """-m with no module name is an argparse error."""
        test_args = ["profiling.sampling.sample", "-m"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
            self.assertRaises(SystemExit) as cm,
        ):
            profiling.sampling.sample.main()
        self.assertEqual(cm.exception.code, 2)  # argparse error
        error_msg = mock_stderr.getvalue()
        self.assertIn("argument -m/--module: expected one argument", error_msg)

    @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
    def test_cli_long_module_option(self):
        """--module behaves identically to -m."""
        test_args = [
            "profiling.sampling.sample",
            "--module",
            "mymodule",
            "arg1",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch("subprocess.Popen") as mock_popen,
            mock.patch("socket.socket") as mock_socket,
        ):
            self._setup_sync_mocks(mock_socket, mock_popen)
            profiling.sampling.sample.main()
            self._verify_coordinator_command(
                mock_popen, ("-m", "mymodule", "arg1")
            )

    def test_cli_complex_script_arguments(self):
        """Mixed option-style and positional script args are passed through."""
        test_args = [
            "profiling.sampling.sample",
            "script.py",
            "--input",
            "file.txt",
            "-v",
            "--output=/tmp/out",
            "positional",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
            mock.patch(
                "profiling.sampling.sample._run_with_sync"
            ) as mock_run_with_sync,
        ):
            mock_process = mock.MagicMock()
            mock_process.pid = 12345
            mock_process.wait.side_effect = [
                subprocess.TimeoutExpired(test_args, 0.1),
                None,
            ]
            mock_process.poll.return_value = None
            mock_run_with_sync.return_value = mock_process
            profiling.sampling.sample.main()
            mock_run_with_sync.assert_called_once_with(
                (
                    sys.executable,
                    "script.py",
                    "--input",
                    "file.txt",
                    "-v",
                    "--output=/tmp/out",
                    "positional",
                )
            )

    def test_cli_collapsed_format_validation(self):
        """Test that CLI properly validates incompatible options with collapsed format."""
        test_cases = [
            # Test sort options are invalid with collapsed
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-nsamples",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-tottime",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-cumtime",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-sample-pct",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-cumul-pct",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--sort-name",
                    "-p",
                    "12345",
                ],
                "sort",
            ),
            # Test limit option is invalid with collapsed
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "-l",
                    "20",
                    "-p",
                    "12345",
                ],
                "limit",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--limit",
                    "20",
                    "-p",
                    "12345",
                ],
                "limit",
            ),
            # Test no-summary option is invalid with collapsed
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "--no-summary",
                    "-p",
                    "12345",
                ],
                "summary",
            ),
        ]
        for test_args, expected_error_keyword in test_cases:
            with (
                mock.patch("sys.argv", test_args),
                mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
                self.assertRaises(SystemExit) as cm,
            ):
                profiling.sampling.sample.main()
            self.assertEqual(cm.exception.code, 2)  # argparse error code
            error_msg = mock_stderr.getvalue()
            self.assertIn("error:", error_msg)
            self.assertIn("--pstats format", error_msg)

    def test_cli_default_collapsed_filename(self):
        """Test that collapsed format gets a default filename when not specified."""
        test_args = ["profiling.sampling.sample", "--collapsed", "-p", "12345"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
        ):
            profiling.sampling.sample.main()
            # Check that filename was set to default collapsed format
            mock_sample.assert_called_once()
            call_args = mock_sample.call_args[1]
            self.assertEqual(call_args["output_format"], "collapsed")
            self.assertEqual(call_args["filename"], "collapsed.12345.txt")

    def test_cli_custom_output_filenames(self):
        """Test custom output filenames for both formats."""
        test_cases = [
            (
                [
                    "profiling.sampling.sample",
                    "--pstats",
                    "-o",
                    "custom.pstats",
                    "-p",
                    "12345",
                ],
                "custom.pstats",
                "pstats",
            ),
            (
                [
                    "profiling.sampling.sample",
                    "--collapsed",
                    "-o",
                    "custom.txt",
                    "-p",
                    "12345",
                ],
                "custom.txt",
                "collapsed",
            ),
        ]
        for test_args, expected_filename, expected_format in test_cases:
            with (
                mock.patch("sys.argv", test_args),
                mock.patch("profiling.sampling.sample.sample") as mock_sample,
            ):
                profiling.sampling.sample.main()
                mock_sample.assert_called_once()
                call_args = mock_sample.call_args[1]
                self.assertEqual(call_args["filename"], expected_filename)
                self.assertEqual(call_args["output_format"], expected_format)

    def test_cli_missing_required_arguments(self):
        """Test that CLI requires PID argument."""
        with (
            mock.patch("sys.argv", ["profiling.sampling.sample"]),
            mock.patch("sys.stderr", io.StringIO()),
        ):
            with self.assertRaises(SystemExit):
                profiling.sampling.sample.main()

    def test_cli_mutually_exclusive_format_options(self):
        """Test that pstats and collapsed options are mutually exclusive."""
        with (
            mock.patch(
                "sys.argv",
                [
                    "profiling.sampling.sample",
                    "--pstats",
                    "--collapsed",
                    "-p",
                    "12345",
                ],
            ),
            mock.patch("sys.stderr", io.StringIO()),
        ):
            with self.assertRaises(SystemExit):
                profiling.sampling.sample.main()

    def test_argument_parsing_basic(self):
        """-p PID samples an existing process with all-default options."""
        test_args = ["profiling.sampling.sample", "-p", "12345"]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
        ):
            profiling.sampling.sample.main()
            mock_sample.assert_called_once_with(
                12345,
                sample_interval_usec=100,
                duration_sec=10,
                filename=None,
                all_threads=False,
                limit=15,
                sort=2,
                show_summary=True,
                output_format="pstats",
                realtime_stats=False,
                mode=0,
                native=False,
                gc=True,
            )

    def test_sort_options(self):
        """Each --sort-* flag maps to its documented integer sort value."""
        sort_options = [
            ("--sort-nsamples", 0),
            ("--sort-tottime", 1),
            ("--sort-cumtime", 2),
            ("--sort-sample-pct", 3),
            ("--sort-cumul-pct", 4),
            ("--sort-name", -1),
        ]
        for option, expected_sort_value in sort_options:
            test_args = ["profiling.sampling.sample", option, "-p", "12345"]
            with (
                mock.patch("sys.argv", test_args),
                mock.patch("profiling.sampling.sample.sample") as mock_sample,
            ):
                profiling.sampling.sample.main()
                mock_sample.assert_called_once()
                call_args = mock_sample.call_args[1]
                self.assertEqual(
                    call_args["sort"],
                    expected_sort_value,
                )
                mock_sample.reset_mock()

View file

@ -0,0 +1,896 @@
"""Tests for sampling profiler collector components."""
import json
import marshal
import os
import tempfile
import unittest
try:
import _remote_debugging # noqa: F401
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import (
CollapsedStackCollector,
FlamegraphCollector,
)
from profiling.sampling.gecko_collector import GeckoCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import captured_stdout, captured_stderr
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
from .helpers import close_and_unlink
class TestSampleProfilerComponents(unittest.TestCase):
"""Unit tests for individual profiler components."""
def test_mock_frame_info_with_empty_and_unicode_values(self):
"""Test MockFrameInfo handles empty strings, unicode characters, and very long names correctly."""
# Test with empty strings
frame = MockFrameInfo("", 0, "")
self.assertEqual(frame.filename, "")
self.assertEqual(frame.lineno, 0)
self.assertEqual(frame.funcname, "")
self.assertIn("filename=''", repr(frame))
# Test with unicode characters
frame = MockFrameInfo("文件.py", 42, "函数名")
self.assertEqual(frame.filename, "文件.py")
self.assertEqual(frame.funcname, "函数名")
# Test with very long names
long_filename = "x" * 1000 + ".py"
long_funcname = "func_" + "x" * 1000
frame = MockFrameInfo(long_filename, 999999, long_funcname)
self.assertEqual(frame.filename, long_filename)
self.assertEqual(frame.lineno, 999999)
self.assertEqual(frame.funcname, long_funcname)
    def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
        """Test PstatsCollector handles zero/large intervals, empty frames, None thread IDs, and duplicate frames."""
        # Test with zero interval
        collector = PstatsCollector(sample_interval_usec=0)
        self.assertEqual(collector.sample_interval_usec, 0)
        # Test with very large interval
        collector = PstatsCollector(sample_interval_usec=1000000000)
        self.assertEqual(collector.sample_interval_usec, 1000000000)
        # Test collecting empty frames list
        collector = PstatsCollector(sample_interval_usec=1000)
        collector.collect([])
        self.assertEqual(len(collector.result), 0)
        # Test collecting frames with None thread id
        test_frames = [
            MockInterpreterInfo(
                0,
                [MockThreadInfo(None, [MockFrameInfo("file.py", 10, "func")])],
            )
        ]
        collector.collect(test_frames)
        # Should still process the frames
        self.assertEqual(len(collector.result), 1)
        # Test collecting duplicate frames in same sample
        test_frames = [
            MockInterpreterInfo(
                0,  # interpreter_id
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("file.py", 10, "func1"),
                            MockFrameInfo("file.py", 10, "func1"),  # Duplicate
                        ],
                    )
                ],
            )
        ]
        collector = PstatsCollector(sample_interval_usec=1000)
        collector.collect(test_frames)
        # Should count both occurrences
        self.assertEqual(
            collector.result[("file.py", 10, "func1")]["cumulative_calls"], 2
        )
    def test_pstats_collector_single_frame_stacks(self):
        """Test PstatsCollector with single-frame call stacks to trigger len(frames) <= 1 branch."""
        collector = PstatsCollector(sample_interval_usec=1000)
        # Test with exactly one frame (should trigger the <= 1 condition)
        single_frame = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [MockFrameInfo("single.py", 10, "single_func")]
                    )
                ],
            )
        ]
        collector.collect(single_frame)
        # Should record the single frame with inline call
        self.assertEqual(len(collector.result), 1)
        single_key = ("single.py", 10, "single_func")
        self.assertIn(single_key, collector.result)
        self.assertEqual(collector.result[single_key]["direct_calls"], 1)
        self.assertEqual(collector.result[single_key]["cumulative_calls"], 1)
        # Test with empty frames (should also trigger <= 1 condition)
        empty_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, [])])]
        collector.collect(empty_frames)
        # Should not add any new entries
        self.assertEqual(
            len(collector.result), 1
        )  # Still just the single frame
        # Test mixed single and multi-frame stacks
        mixed_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [MockFrameInfo("single2.py", 20, "single_func2")],
                    ),  # Single frame
                    MockThreadInfo(
                        2,
                        [  # Multi-frame stack
                            MockFrameInfo("multi.py", 30, "multi_func1"),
                            MockFrameInfo("multi.py", 40, "multi_func2"),
                        ],
                    ),
                ],
            ),
        ]
        collector.collect(mixed_frames)
        # Should have recorded all functions
        self.assertEqual(
            len(collector.result), 4
        )  # single + single2 + multi1 + multi2
        # Verify single frame handling
        single2_key = ("single2.py", 20, "single_func2")
        self.assertIn(single2_key, collector.result)
        self.assertEqual(collector.result[single2_key]["direct_calls"], 1)
        self.assertEqual(collector.result[single2_key]["cumulative_calls"], 1)
        # Verify multi-frame handling still works
        multi1_key = ("multi.py", 30, "multi_func1")
        multi2_key = ("multi.py", 40, "multi_func2")
        self.assertIn(multi1_key, collector.result)
        self.assertIn(multi2_key, collector.result)
        self.assertEqual(collector.result[multi1_key]["direct_calls"], 1)
        self.assertEqual(
            collector.result[multi2_key]["cumulative_calls"], 1
        )  # Called from multi1
    def test_collapsed_stack_collector_with_empty_and_deep_stacks(self):
        """Test CollapsedStackCollector handles empty frames, single-frame stacks, and very deep call stacks."""
        collector = CollapsedStackCollector()
        # Test with empty frames
        collector.collect([])
        self.assertEqual(len(collector.stack_counter), 0)
        # Test with single frame stack
        test_frames = [
            MockInterpreterInfo(
                0, [MockThreadInfo(1, [("file.py", 10, "func")])]
            )
        ]
        collector.collect(test_frames)
        self.assertEqual(len(collector.stack_counter), 1)
        # Single-entry unpacking: fails loudly if more than one
        # (path, thread) key was aggregated.
        (((path, thread_id), count),) = collector.stack_counter.items()
        self.assertEqual(path, (("file.py", 10, "func"),))
        self.assertEqual(thread_id, 1)
        self.assertEqual(count, 1)
        # Test with very deep stack
        deep_stack = [(f"file{i}.py", i, f"func{i}") for i in range(100)]
        test_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, deep_stack)])]
        collector = CollapsedStackCollector()
        collector.collect(test_frames)
        # One aggregated path with 100 frames (reversed)
        (((path_tuple, thread_id),),) = (collector.stack_counter.keys(),)
        self.assertEqual(len(path_tuple), 100)
        self.assertEqual(path_tuple[0], ("file99.py", 99, "func99"))
        self.assertEqual(path_tuple[-1], ("file0.py", 0, "func0"))
        self.assertEqual(thread_id, 1)
    def test_pstats_collector_basic(self):
        """Test basic PstatsCollector functionality."""
        collector = PstatsCollector(sample_interval_usec=1000)
        # Test empty state
        self.assertEqual(len(collector.result), 0)
        self.assertEqual(len(collector.stats), 0)
        # Test collecting sample data
        test_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("file.py", 10, "func1"),
                            MockFrameInfo("file.py", 20, "func2"),
                        ],
                    )
                ],
            )
        ]
        collector.collect(test_frames)
        # Should have recorded calls for both functions
        self.assertEqual(len(collector.result), 2)
        self.assertIn(("file.py", 10, "func1"), collector.result)
        self.assertIn(("file.py", 20, "func2"), collector.result)
        # Top-level function should have direct call
        # (frames are ordered innermost-first, so func1 is on top)
        self.assertEqual(
            collector.result[("file.py", 10, "func1")]["direct_calls"], 1
        )
        self.assertEqual(
            collector.result[("file.py", 10, "func1")]["cumulative_calls"], 1
        )
        # Calling function should have cumulative call but no direct calls
        self.assertEqual(
            collector.result[("file.py", 20, "func2")]["cumulative_calls"], 1
        )
        self.assertEqual(
            collector.result[("file.py", 20, "func2")]["direct_calls"], 0
        )
    def test_pstats_collector_create_stats(self):
        """Test PstatsCollector stats creation."""
        collector = PstatsCollector(
            sample_interval_usec=1000000
        )  # 1 second intervals
        test_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("file.py", 10, "func1"),
                            MockFrameInfo("file.py", 20, "func2"),
                        ],
                    )
                ],
            )
        ]
        collector.collect(test_frames)
        collector.collect(test_frames)  # Collect twice
        collector.create_stats()
        # Check stats format: (direct_calls, cumulative_calls, tt, ct, callers)
        func1_stats = collector.stats[("file.py", 10, "func1")]
        self.assertEqual(func1_stats[0], 2)  # direct_calls (top of stack)
        self.assertEqual(func1_stats[1], 2)  # cumulative_calls
        self.assertEqual(
            func1_stats[2], 2.0
        )  # tt (total time - 2 samples * 1 sec)
        self.assertEqual(func1_stats[3], 2.0)  # ct (cumulative time)
        func2_stats = collector.stats[("file.py", 20, "func2")]
        self.assertEqual(
            func2_stats[0], 0
        )  # direct_calls (never top of stack)
        self.assertEqual(
            func2_stats[1], 2
        )  # cumulative_calls (appears in stack)
        self.assertEqual(func2_stats[2], 0.0)  # tt (no direct calls)
        self.assertEqual(func2_stats[3], 2.0)  # ct (cumulative time)
def test_collapsed_stack_collector_basic(self):
collector = CollapsedStackCollector()
# Test empty state
self.assertEqual(len(collector.stack_counter), 0)
# Test collecting sample data
test_frames = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
)
],
)
]
collector.collect(test_frames)
# Should store one reversed path
self.assertEqual(len(collector.stack_counter), 1)
(((path, thread_id), count),) = collector.stack_counter.items()
expected_tree = (("file.py", 20, "func2"), ("file.py", 10, "func1"))
self.assertEqual(path, expected_tree)
self.assertEqual(thread_id, 1)
self.assertEqual(count, 1)
    def test_collapsed_stack_collector_export(self):
        """Exported collapsed file aggregates identical stacks and uses the
        ``tid:X;file:func:line;... count`` line format."""
        collapsed_out = tempfile.NamedTemporaryFile(delete=False)
        self.addCleanup(close_and_unlink, collapsed_out)
        collector = CollapsedStackCollector()
        test_frames1 = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
                    )
                ],
            )
        ]
        test_frames2 = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
                    )
                ],
            )
        ]  # Same stack
        test_frames3 = [
            MockInterpreterInfo(
                0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
            )
        ]
        collector.collect(test_frames1)
        collector.collect(test_frames2)
        collector.collect(test_frames3)
        with captured_stdout(), captured_stderr():
            collector.export(collapsed_out.name)
        # Check file contents
        with open(collapsed_out.name, "r") as f:
            content = f.read()
        lines = content.strip().split("\n")
        self.assertEqual(len(lines), 2)  # Two unique stacks
        # Check collapsed format: tid:X;file:func:line;file:func:line count
        stack1_expected = "tid:1;file.py:func2:20;file.py:func1:10 2"
        stack2_expected = "tid:1;other.py:other_func:5 1"
        self.assertIn(stack1_expected, lines)
        self.assertIn(stack2_expected, lines)
    def test_flamegraph_collector_basic(self):
        """Test basic FlamegraphCollector functionality."""
        collector = FlamegraphCollector()
        # Empty collector should produce 'No Data'
        data = collector._convert_to_flamegraph_format()
        # With string table, name is now an index - resolve it using the strings array
        strings = data.get("strings", [])
        name_index = data.get("name", 0)
        resolved_name = (
            strings[name_index]
            if isinstance(name_index, int) and 0 <= name_index < len(strings)
            else str(name_index)
        )
        self.assertIn(resolved_name, ("No Data", "No significant data"))
        # Test collecting sample data
        test_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
                    )
                ],
            )
        ]
        collector.collect(test_frames)
        # Convert and verify structure: func2 -> func1 with counts = 1
        data = collector._convert_to_flamegraph_format()
        # Expect promotion: root is the single child (func2), with func1 as its only child
        strings = data.get("strings", [])
        name_index = data.get("name", 0)
        name = (
            strings[name_index]
            if isinstance(name_index, int) and 0 <= name_index < len(strings)
            else str(name_index)
        )
        self.assertIsInstance(name, str)
        self.assertTrue(name.startswith("Program Root: "))
        self.assertIn("func2 (file.py:20)", name)  # formatted name
        children = data.get("children", [])
        self.assertEqual(len(children), 1)
        child = children[0]
        child_name_index = child.get("name", 0)
        child_name = (
            strings[child_name_index]
            if isinstance(child_name_index, int)
            and 0 <= child_name_index < len(strings)
            else str(child_name_index)
        )
        self.assertIn("func1 (file.py:10)", child_name)  # formatted name
        self.assertEqual(child["value"], 1)
    def test_flamegraph_collector_export(self):
        """Test flamegraph HTML export functionality."""
        flamegraph_out = tempfile.NamedTemporaryFile(
            suffix=".html", delete=False
        )
        self.addCleanup(close_and_unlink, flamegraph_out)
        collector = FlamegraphCollector()
        # Create some test data (use Interpreter/Thread objects like runtime)
        test_frames1 = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
                    )
                ],
            )
        ]
        test_frames2 = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
                    )
                ],
            )
        ]  # Same stack
        test_frames3 = [
            MockInterpreterInfo(
                0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
            )
        ]
        collector.collect(test_frames1)
        collector.collect(test_frames2)
        collector.collect(test_frames3)
        # Export flamegraph
        with captured_stdout(), captured_stderr():
            collector.export(flamegraph_out.name)
        # Verify file was created and contains valid data
        self.assertTrue(os.path.exists(flamegraph_out.name))
        self.assertGreater(os.path.getsize(flamegraph_out.name), 0)
        # Check file contains HTML content
        with open(flamegraph_out.name, "r", encoding="utf-8") as f:
            content = f.read()
        # Should be valid HTML
        self.assertIn("<!doctype html>", content.lower())
        self.assertIn("<html", content)
        self.assertIn("Python Performance Flamegraph", content)
        self.assertIn("d3-flame-graph", content)
        # Should contain the data
        self.assertIn('"name":', content)
        self.assertIn('"value":', content)
        self.assertIn('"children":', content)
    def test_gecko_collector_basic(self):
        """Test basic GeckoCollector functionality."""
        collector = GeckoCollector()
        # Test empty state
        self.assertEqual(len(collector.threads), 0)
        self.assertEqual(collector.sample_count, 0)
        self.assertEqual(len(collector.global_strings), 1)  # "(root)"
        # Test collecting sample data
        test_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [("file.py", 10, "func1"), ("file.py", 20, "func2")],
                    )
                ],
            )
        ]
        collector.collect(test_frames)
        # Should have recorded one thread and one sample
        self.assertEqual(len(collector.threads), 1)
        self.assertEqual(collector.sample_count, 1)
        self.assertIn(1, collector.threads)
        profile_data = collector._build_profile()
        # Verify profile structure
        self.assertIn("meta", profile_data)
        self.assertIn("threads", profile_data)
        self.assertIn("shared", profile_data)
        # Check shared string table
        shared = profile_data["shared"]
        self.assertIn("stringArray", shared)
        string_array = shared["stringArray"]
        self.assertGreater(len(string_array), 0)
        # Should contain our functions in the string array
        self.assertIn("func1", string_array)
        self.assertIn("func2", string_array)
        # Check thread data structure
        threads = profile_data["threads"]
        self.assertEqual(len(threads), 1)
        thread_data = threads[0]
        # Verify thread structure
        self.assertIn("samples", thread_data)
        self.assertIn("funcTable", thread_data)
        self.assertIn("frameTable", thread_data)
        self.assertIn("stackTable", thread_data)
        # Verify samples
        samples = thread_data["samples"]
        self.assertEqual(len(samples["stack"]), 1)
        self.assertEqual(len(samples["time"]), 1)
        self.assertEqual(samples["length"], 1)
        # Verify function table structure and content
        func_table = thread_data["funcTable"]
        self.assertIn("name", func_table)
        self.assertIn("fileName", func_table)
        self.assertIn("lineNumber", func_table)
        self.assertEqual(func_table["length"], 2)  # Should have 2 functions
        # Verify actual function content through string array indices
        func_names = []
        for idx in func_table["name"]:
            # Fall back to str(idx) so an out-of-range index still
            # produces a readable assertion failure below.
            func_name = (
                string_array[idx]
                if isinstance(idx, int) and 0 <= idx < len(string_array)
                else str(idx)
            )
            func_names.append(func_name)
        self.assertIn("func1", func_names, f"func1 not found in {func_names}")
        self.assertIn("func2", func_names, f"func2 not found in {func_names}")
        # Verify frame table
        frame_table = thread_data["frameTable"]
        self.assertEqual(
            frame_table["length"], 2
        )  # Should have frames for both functions
        self.assertEqual(len(frame_table["func"]), 2)
        # Verify stack structure
        stack_table = thread_data["stackTable"]
        self.assertGreater(stack_table["length"], 0)
        self.assertGreater(len(stack_table["frame"]), 0)
def test_gecko_collector_export(self):
"""Test Gecko profile export functionality."""
gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
self.addCleanup(close_and_unlink, gecko_out)
collector = GeckoCollector()
test_frames1 = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
)
],
)
]
test_frames2 = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
)
],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(
0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
)
]
collector.collect(test_frames1)
collector.collect(test_frames2)
collector.collect(test_frames3)
# Export gecko profile
with captured_stdout(), captured_stderr():
collector.export(gecko_out.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(gecko_out.name))
self.assertGreater(os.path.getsize(gecko_out.name), 0)
# Check file contains valid JSON
with open(gecko_out.name, "r") as f:
profile_data = json.load(f)
# Should be valid Gecko profile format
self.assertIn("meta", profile_data)
self.assertIn("threads", profile_data)
self.assertIn("shared", profile_data)
# Check meta information
self.assertIn("categories", profile_data["meta"])
self.assertIn("interval", profile_data["meta"])
# Check shared string table
self.assertIn("stringArray", profile_data["shared"])
self.assertGreater(len(profile_data["shared"]["stringArray"]), 0)
# Should contain our functions
string_array = profile_data["shared"]["stringArray"]
self.assertIn("func1", string_array)
self.assertIn("func2", string_array)
self.assertIn("other_func", string_array)
def test_gecko_collector_markers(self):
"""Test Gecko profile markers for GIL and CPU state tracking."""
try:
from _remote_debugging import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_GIL_REQUESTED,
)
except ImportError:
THREAD_STATUS_HAS_GIL = 1 << 0
THREAD_STATUS_ON_CPU = 1 << 1
THREAD_STATUS_GIL_REQUESTED = 1 << 3
collector = GeckoCollector()
# Status combinations for different thread states
HAS_GIL_ON_CPU = (
THREAD_STATUS_HAS_GIL | THREAD_STATUS_ON_CPU
) # Running Python code
NO_GIL_ON_CPU = THREAD_STATUS_ON_CPU # Running native code
WAITING_FOR_GIL = THREAD_STATUS_GIL_REQUESTED # Waiting for GIL
# Simulate thread state transitions
collector.collect(
[
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[("test.py", 10, "python_func")],
status=HAS_GIL_ON_CPU,
)
],
)
]
)
collector.collect(
[
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[("test.py", 15, "wait_func")],
status=WAITING_FOR_GIL,
)
],
)
]
)
collector.collect(
[
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[("test.py", 20, "python_func2")],
status=HAS_GIL_ON_CPU,
)
],
)
]
)
collector.collect(
[
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[("native.c", 100, "native_func")],
status=NO_GIL_ON_CPU,
)
],
)
]
)
profile_data = collector._build_profile()
# Verify we have threads with markers
self.assertIn("threads", profile_data)
self.assertEqual(len(profile_data["threads"]), 1)
thread_data = profile_data["threads"][0]
# Check markers exist
self.assertIn("markers", thread_data)
markers = thread_data["markers"]
# Should have marker arrays
self.assertIn("name", markers)
self.assertIn("startTime", markers)
self.assertIn("endTime", markers)
self.assertIn("category", markers)
self.assertGreater(
markers["length"], 0, "Should have generated markers"
)
# Get marker names from string table
string_array = profile_data["shared"]["stringArray"]
marker_names = [string_array[idx] for idx in markers["name"]]
# Verify we have different marker types
marker_name_set = set(marker_names)
# Should have "Has GIL" markers (when thread had GIL)
self.assertIn(
"Has GIL", marker_name_set, "Should have 'Has GIL' markers"
)
# Should have "No GIL" markers (when thread didn't have GIL)
self.assertIn(
"No GIL", marker_name_set, "Should have 'No GIL' markers"
)
# Should have "On CPU" markers (when thread was on CPU)
self.assertIn(
"On CPU", marker_name_set, "Should have 'On CPU' markers"
)
# Should have "Waiting for GIL" markers (when thread was waiting)
self.assertIn(
"Waiting for GIL",
marker_name_set,
"Should have 'Waiting for GIL' markers",
)
# Verify marker structure
for i in range(markers["length"]):
# All markers should be interval markers (phase = 1)
self.assertEqual(
markers["phase"][i], 1, f"Marker {i} should be interval marker"
)
# All markers should have valid time range
start_time = markers["startTime"][i]
end_time = markers["endTime"][i]
self.assertLessEqual(
start_time,
end_time,
f"Marker {i} should have valid time range",
)
# All markers should have valid category
self.assertGreaterEqual(
markers["category"][i],
0,
f"Marker {i} should have valid category",
)
def test_pstats_collector_export(self):
collector = PstatsCollector(
sample_interval_usec=1000000
) # 1 second intervals
test_frames1 = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)
],
)
]
test_frames2 = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)
],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1, [MockFrameInfo("other.py", 5, "other_func")]
)
],
)
]
collector.collect(test_frames1)
collector.collect(test_frames2)
collector.collect(test_frames3)
pstats_out = tempfile.NamedTemporaryFile(
suffix=".pstats", delete=False
)
self.addCleanup(close_and_unlink, pstats_out)
collector.export(pstats_out.name)
# Check file can be loaded with marshal
with open(pstats_out.name, "rb") as f:
stats_data = marshal.load(f)
# Should be a dictionary with the sampled marker
self.assertIsInstance(stats_data, dict)
self.assertIn(("__sampled__",), stats_data)
self.assertTrue(stats_data[("__sampled__",)])
# Should have function data
function_entries = [
k for k in stats_data.keys() if k != ("__sampled__",)
]
self.assertGreater(len(function_entries), 0)
# Check specific function stats format: (cc, nc, tt, ct, callers)
func1_key = ("file.py", 10, "func1")
func2_key = ("file.py", 20, "func2")
other_key = ("other.py", 5, "other_func")
self.assertIn(func1_key, stats_data)
self.assertIn(func2_key, stats_data)
self.assertIn(other_key, stats_data)
# Check func1 stats (should have 2 samples)
func1_stats = stats_data[func1_key]
self.assertEqual(func1_stats[0], 2) # total_calls
self.assertEqual(func1_stats[1], 2) # nc (non-recursive calls)
self.assertEqual(func1_stats[2], 2.0) # tt (total time)
self.assertEqual(func1_stats[3], 2.0) # ct (cumulative time)

View file

@ -0,0 +1,804 @@
"""Tests for sampling profiler integration and error handling."""
import contextlib
import io
import marshal
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
from unittest import mock
try:
import _remote_debugging
import profiling.sampling
import profiling.sampling.sample
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import CollapsedStackCollector
from profiling.sampling.sample import SampleProfiler
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import (
requires_subprocess,
captured_stdout,
captured_stderr,
)
from .helpers import (
test_subprocess,
close_and_unlink,
skip_if_not_supported,
PROCESS_VM_READV_SUPPORTED,
)
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
@skip_if_not_supported
@unittest.skipIf(
    sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
    "Test only runs on Linux with process_vm_readv support",
)
class TestRecursiveFunctionProfiling(unittest.TestCase):
    """Test profiling of recursive functions and complex call patterns."""

    def test_recursive_function_call_counting(self):
        """Test that recursive function calls are counted correctly."""
        collector = PstatsCollector(sample_interval_usec=1000)
        # Simulate a recursive call pattern: fibonacci(5) calling itself
        recursive_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [  # First sample: deep in recursion
                            MockFrameInfo("fib.py", 10, "fibonacci"),
                            MockFrameInfo(
                                "fib.py", 10, "fibonacci"
                            ),  # recursive call
                            MockFrameInfo(
                                "fib.py", 10, "fibonacci"
                            ),  # deeper recursion
                            MockFrameInfo(
                                "fib.py", 10, "fibonacci"
                            ),  # even deeper
                            MockFrameInfo("main.py", 5, "main"),  # main caller
                        ],
                    )
                ],
            ),
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [  # Second sample: different recursion depth
                            MockFrameInfo("fib.py", 10, "fibonacci"),
                            MockFrameInfo(
                                "fib.py", 10, "fibonacci"
                            ),  # recursive call
                            MockFrameInfo("main.py", 5, "main"),  # main caller
                        ],
                    )
                ],
            ),
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [  # Third sample: back to deeper recursion
                            MockFrameInfo("fib.py", 10, "fibonacci"),
                            MockFrameInfo("fib.py", 10, "fibonacci"),
                            MockFrameInfo("fib.py", 10, "fibonacci"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
        ]
        for frames in recursive_frames:
            collector.collect([frames])
        collector.create_stats()
        # Check that recursive calls are counted properly
        fib_key = ("fib.py", 10, "fibonacci")
        main_key = ("main.py", 5, "main")
        self.assertIn(fib_key, collector.stats)
        self.assertIn(main_key, collector.stats)
        # Fibonacci should have many calls due to recursion.
        # Stats tuple layout is pstats-style: (cc, nc, tt, ct, callers).
        fib_stats = collector.stats[fib_key]
        direct_calls, cumulative_calls, tt, ct, callers = fib_stats
        # Should have recorded multiple calls (9 total appearances in samples)
        self.assertEqual(cumulative_calls, 9)
        self.assertGreater(tt, 0)  # Should have some total time
        self.assertGreater(ct, 0)  # Should have some cumulative time
        # Main should have fewer calls
        main_stats = collector.stats[main_key]
        main_direct_calls, main_cumulative_calls = main_stats[0], main_stats[1]
        self.assertEqual(main_direct_calls, 0)  # Never directly executing
        self.assertEqual(main_cumulative_calls, 3)  # Appears in all 3 samples

    def test_nested_function_hierarchy(self):
        """Test profiling of deeply nested function calls."""
        collector = PstatsCollector(sample_interval_usec=1000)
        # Simulate a deep call hierarchy
        deep_call_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("level1.py", 10, "level1_func"),
                            MockFrameInfo("level2.py", 20, "level2_func"),
                            MockFrameInfo("level3.py", 30, "level3_func"),
                            MockFrameInfo("level4.py", 40, "level4_func"),
                            MockFrameInfo("level5.py", 50, "level5_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [  # Same hierarchy sampled again
                            MockFrameInfo("level1.py", 10, "level1_func"),
                            MockFrameInfo("level2.py", 20, "level2_func"),
                            MockFrameInfo("level3.py", 30, "level3_func"),
                            MockFrameInfo("level4.py", 40, "level4_func"),
                            MockFrameInfo("level5.py", 50, "level5_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
        ]
        for frames in deep_call_frames:
            collector.collect([frames])
        collector.create_stats()
        # All levels should be recorded
        for level in range(1, 6):
            key = (f"level{level}.py", level * 10, f"level{level}_func")
            self.assertIn(key, collector.stats)
            stats = collector.stats[key]
            direct_calls, cumulative_calls, tt, ct, callers = stats
            # Each level should appear in stack twice (2 samples)
            self.assertEqual(cumulative_calls, 2)
            # Only level1 (deepest) should have direct calls
            if level == 1:
                self.assertEqual(direct_calls, 2)
            else:
                self.assertEqual(direct_calls, 0)
            # Deeper levels should have lower cumulative time than higher levels
            # (since they don't include time from functions they call)
            if level == 1:  # Deepest level with most time
                self.assertGreater(ct, 0)

    def test_alternating_call_patterns(self):
        """Test profiling with alternating call patterns."""
        collector = PstatsCollector(sample_interval_usec=1000)
        # Simulate alternating execution paths: A and B interleave twice each.
        pattern_frames = [
            # Pattern A: path through func_a
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("module.py", 10, "func_a"),
                            MockFrameInfo("module.py", 30, "shared_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
            # Pattern B: path through func_b
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("module.py", 20, "func_b"),
                            MockFrameInfo("module.py", 30, "shared_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
            # Pattern A again
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("module.py", 10, "func_a"),
                            MockFrameInfo("module.py", 30, "shared_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
            # Pattern B again
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            MockFrameInfo("module.py", 20, "func_b"),
                            MockFrameInfo("module.py", 30, "shared_func"),
                            MockFrameInfo("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
        ]
        for frames in pattern_frames:
            collector.collect([frames])
        collector.create_stats()
        # Check that both paths are recorded equally
        func_a_key = ("module.py", 10, "func_a")
        func_b_key = ("module.py", 20, "func_b")
        shared_key = ("module.py", 30, "shared_func")
        main_key = ("main.py", 5, "main")
        # func_a and func_b should each be directly executing twice
        self.assertEqual(collector.stats[func_a_key][0], 2)  # direct_calls
        self.assertEqual(collector.stats[func_a_key][1], 2)  # cumulative_calls
        self.assertEqual(collector.stats[func_b_key][0], 2)  # direct_calls
        self.assertEqual(collector.stats[func_b_key][1], 2)  # cumulative_calls
        # shared_func should appear in all samples (4 times) but never directly executing
        self.assertEqual(collector.stats[shared_key][0], 0)  # direct_calls
        self.assertEqual(collector.stats[shared_key][1], 4)  # cumulative_calls
        # main should appear in all samples but never directly executing
        self.assertEqual(collector.stats[main_key][0], 0)  # direct_calls
        self.assertEqual(collector.stats[main_key][1], 4)  # cumulative_calls

    def test_collapsed_stack_with_recursion(self):
        """Test collapsed stack collector with recursive patterns."""
        collector = CollapsedStackCollector()
        # Recursive call pattern: two samples at different recursion depths.
        recursive_frames = [
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            ("factorial.py", 10, "factorial"),
                            ("factorial.py", 10, "factorial"),  # recursive
                            ("factorial.py", 10, "factorial"),  # deeper
                            ("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
            MockInterpreterInfo(
                0,
                [
                    MockThreadInfo(
                        1,
                        [
                            ("factorial.py", 10, "factorial"),
                            (
                                "factorial.py",
                                10,
                                "factorial",
                            ),  # different depth
                            ("main.py", 5, "main"),
                        ],
                    )
                ],
            ),
        ]
        for frames in recursive_frames:
            collector.collect([frames])
        # Should capture both call paths
        self.assertEqual(len(collector.stack_counter), 2)
        # First path should be longer (deeper recursion) than the second
        path_tuples = list(collector.stack_counter.keys())
        paths = [p[0] for p in path_tuples]  # Extract just the call paths
        lengths = [len(p) for p in paths]
        self.assertNotEqual(lengths[0], lengths[1])
        # Both should contain factorial calls
        self.assertTrue(
            any(any(f[2] == "factorial" for f in p) for p in paths)
        )
        # Verify total occurrences via aggregation
        factorial_key = ("factorial.py", 10, "factorial")
        main_key = ("main.py", 5, "main")

        def total_occurrences(func):
            # Sum (appearances in path) * (times that path was sampled).
            total = 0
            for (path, thread_id), count in collector.stack_counter.items():
                total += sum(1 for f in path if f == func) * count
            return total

        self.assertEqual(total_occurrences(factorial_key), 5)
        self.assertEqual(total_occurrences(main_key), 2)
@requires_subprocess()
@skip_if_not_supported
class TestSampleProfilerIntegration(unittest.TestCase):
    """End-to-end tests that profile a real subprocess."""

    @classmethod
    def setUpClass(cls):
        # Busy-loop target script: mixes recursion, CPU-bound loops and
        # nested calls so all of them show up in the profile output.
        cls.test_script = '''
import time
import os

def slow_fibonacci(n):
    """Recursive fibonacci - should show up prominently in profiler."""
    if n <= 1:
        return n
    return slow_fibonacci(n-1) + slow_fibonacci(n-2)

def cpu_intensive_work():
    """CPU intensive work that should show in profiler."""
    result = 0
    for i in range(10000):
        result += i * i
        if i % 100 == 0:
            result = result % 1000000
    return result

def medium_computation():
    """Medium complexity function."""
    result = 0
    for i in range(100):
        result += i * i
    return result

def fast_loop():
    """Fast simple loop."""
    total = 0
    for i in range(50):
        total += i
    return total

def nested_calls():
    """Test nested function calls."""
    def level1():
        def level2():
            return medium_computation()
        return level2()
    return level1()

def main_loop():
    """Main test loop with different execution paths."""
    iteration = 0
    while True:
        iteration += 1
        # Different execution paths - focus on CPU intensive work
        if iteration % 3 == 0:
            # Very CPU intensive
            result = cpu_intensive_work()
        elif iteration % 2 == 0:
            # Expensive recursive operation (increased frequency for slower machines)
            result = slow_fibonacci(12)
        else:
            # Medium operation
            result = nested_calls()
        # No sleep - keep CPU busy

if __name__ == "__main__":
    main_loop()
'''

    def test_sampling_basic_functionality(self):
        """Sample a live subprocess and check the printed summary."""
        with (
            test_subprocess(self.test_script) as subproc,
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    duration_sec=2,
                    sample_interval_usec=1000,  # 1ms
                    show_summary=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()
        # Basic checks on output
        self.assertIn("Captured", output)
        self.assertIn("samples", output)
        self.assertIn("Profile Stats", output)
        # Should see some of our test functions
        self.assertIn("slow_fibonacci", output)

    def test_sampling_with_pstats_export(self):
        """Export samples as a pstats file and validate its contents."""
        pstats_out = tempfile.NamedTemporaryFile(
            suffix=".pstats", delete=False
        )
        self.addCleanup(close_and_unlink, pstats_out)
        with test_subprocess(self.test_script) as subproc:
            # Suppress profiler output when testing file export
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=1,
                        filename=pstats_out.name,
                        sample_interval_usec=10000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )
        # Verify file was created and contains valid data
        self.assertTrue(os.path.exists(pstats_out.name))
        self.assertGreater(os.path.getsize(pstats_out.name), 0)
        # Try to load the stats file
        with open(pstats_out.name, "rb") as f:
            stats_data = marshal.load(f)
        # Should be a dictionary with the sampled marker
        self.assertIsInstance(stats_data, dict)
        self.assertIn(("__sampled__",), stats_data)
        self.assertTrue(stats_data[("__sampled__",)])
        # Should have some function data
        function_entries = [
            k for k in stats_data.keys() if k != ("__sampled__",)
        ]
        self.assertGreater(len(function_entries), 0)

    def test_sampling_with_collapsed_export(self):
        """Export samples in collapsed-stack format and validate the layout."""
        collapsed_file = tempfile.NamedTemporaryFile(
            suffix=".txt", delete=False
        )
        self.addCleanup(close_and_unlink, collapsed_file)
        with (
            test_subprocess(self.test_script) as subproc,
        ):
            # Suppress profiler output when testing file export
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=1,
                        filename=collapsed_file.name,
                        output_format="collapsed",
                        sample_interval_usec=10000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )
        # Verify file was created and contains valid data
        self.assertTrue(os.path.exists(collapsed_file.name))
        self.assertGreater(os.path.getsize(collapsed_file.name), 0)
        # Check file format
        with open(collapsed_file.name, "r") as f:
            content = f.read()
        lines = content.strip().split("\n")
        self.assertGreater(len(lines), 0)
        # Each line should have format: stack_trace count
        for line in lines:
            parts = line.rsplit(" ", 1)
            self.assertEqual(len(parts), 2)
            stack_trace, count_str = parts
            self.assertGreater(len(stack_trace), 0)
            self.assertTrue(count_str.isdigit())
            self.assertGreater(int(count_str), 0)
            # Stack trace should contain semicolon-separated entries
            if ";" in stack_trace:
                stack_parts = stack_trace.split(";")
                for part in stack_parts:
                    # Each part should be file:function:line
                    self.assertIn(":", part)

    def test_sampling_all_threads(self):
        """Sampling with all_threads=True must complete without error."""
        with (
            test_subprocess(self.test_script) as subproc,
            # Suppress profiler output
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    duration_sec=1,
                    all_threads=True,
                    sample_interval_usec=10000,
                    show_summary=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
        # Just verify that sampling completed without error
        # We're not testing output format here

    def test_sample_target_script(self):
        """Run the CLI against a script path and check its output."""
        script_file = tempfile.NamedTemporaryFile(delete=False)
        script_file.write(self.test_script.encode("utf-8"))
        script_file.flush()
        self.addCleanup(close_and_unlink, script_file)
        test_args = ["profiling.sampling.sample", "-d", "1", script_file.name]
        with (
            mock.patch("sys.argv", test_args),
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
        ):
            try:
                profiling.sampling.sample.main()
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()
        # Basic checks on output
        self.assertIn("Captured", output)
        self.assertIn("samples", output)
        self.assertIn("Profile Stats", output)
        # Should see some of our test functions
        self.assertIn("slow_fibonacci", output)

    def test_sample_target_module(self):
        """Run the CLI against a ``-m module`` target and check its output."""
        tempdir = tempfile.TemporaryDirectory(delete=False)
        # shutil.rmtree can be registered directly; the lambda wrapper the
        # original used added nothing (see the direct usage elsewhere).
        self.addCleanup(shutil.rmtree, tempdir.name)
        module_path = os.path.join(tempdir.name, "test_module.py")
        with open(module_path, "w") as f:
            f.write(self.test_script)
        test_args = [
            "profiling.sampling.sample",
            "-d",
            "1",
            "-m",
            "test_module",
        ]
        with (
            mock.patch("sys.argv", test_args),
            io.StringIO() as captured_output,
            mock.patch("sys.stdout", captured_output),
            # Change to temp directory so subprocess can find the module
            contextlib.chdir(tempdir.name),
        ):
            try:
                profiling.sampling.sample.main()
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")
            output = captured_output.getvalue()
        # Basic checks on output
        self.assertIn("Captured", output)
        self.assertIn("samples", output)
        self.assertIn("Profile Stats", output)
        # Should see some of our test functions
        self.assertIn("slow_fibonacci", output)
@skip_if_not_supported
@unittest.skipIf(
    sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
    "Test only runs on Linux with process_vm_readv support",
)
class TestSampleProfilerErrorHandling(unittest.TestCase):
    """Error-handling paths of the sampling profiler."""

    def test_invalid_pid(self):
        """Sampling a nonexistent PID must raise rather than hang."""
        with self.assertRaises((OSError, RuntimeError)):
            profiling.sampling.sample.sample(-1, duration_sec=1)

    def test_process_dies_during_sampling(self):
        """The profiler must cope with the target exiting mid-session."""
        with test_subprocess(
            "import time; time.sleep(0.5); exit()"
        ) as subproc:
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=2,  # Longer than process lifetime
                        sample_interval_usec=50000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )
                output = captured_output.getvalue()
        # Failed samples after the target dies show up as an error rate.
        self.assertIn("Error rate", output)

    def test_invalid_output_format(self):
        """An unknown output format must be rejected with ValueError."""
        with self.assertRaises(ValueError):
            profiling.sampling.sample.sample(
                os.getpid(),
                duration_sec=1,
                output_format="invalid_format",
            )

    def test_invalid_output_format_with_mocked_profiler(self):
        """Test invalid output format with proper mocking to avoid permission issues."""
        with mock.patch(
            "profiling.sampling.sample.SampleProfiler"
        ) as mock_profiler_class:
            mock_profiler = mock.MagicMock()
            mock_profiler_class.return_value = mock_profiler
            with self.assertRaises(ValueError) as cm:
                profiling.sampling.sample.sample(
                    12345,
                    duration_sec=1,
                    output_format="unknown_format",
                )
            # Should raise ValueError with the invalid format name
            self.assertIn(
                "Invalid output format: unknown_format", str(cm.exception)
            )

    def test_is_process_running(self):
        """_is_process_running must track the target's actual lifetime."""
        with test_subprocess("import time; time.sleep(1000)") as subproc:
            try:
                profiler = SampleProfiler(
                    pid=subproc.process.pid,
                    sample_interval_usec=1000,
                    all_threads=False,
                )
            except PermissionError:
                self.skipTest(
                    "Insufficient permissions to read the stack trace"
                )
            self.assertTrue(profiler._is_process_running())
            self.assertIsNotNone(profiler.unwinder.get_stack_trace())
            subproc.process.kill()
            subproc.process.wait()
            # After kill+wait the unwinder must report the process as gone.
            self.assertRaises(
                ProcessLookupError, profiler.unwinder.get_stack_trace
            )
        # Exit the context manager to ensure the process is terminated
        self.assertFalse(profiler._is_process_running())
        self.assertRaises(
            ProcessLookupError, profiler.unwinder.get_stack_trace
        )

    @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
    def test_esrch_signal_handling(self):
        """A dead target must surface as ProcessLookupError (ESRCH)."""
        with test_subprocess("import time; time.sleep(1000)") as subproc:
            try:
                unwinder = _remote_debugging.RemoteUnwinder(
                    subproc.process.pid
                )
            except PermissionError:
                self.skipTest(
                    "Insufficient permissions to read the stack trace"
                )
            initial_trace = unwinder.get_stack_trace()
            self.assertIsNotNone(initial_trace)
            subproc.process.kill()
            # Wait for the process to die and try to get another trace
            subproc.process.wait()
            with self.assertRaises(ProcessLookupError):
                unwinder.get_stack_trace()

    def test_valid_output_formats(self):
        """Test that all valid output formats are accepted."""
        valid_formats = ["pstats", "collapsed", "flamegraph", "gecko"]
        tempdir = tempfile.TemporaryDirectory(delete=False)
        self.addCleanup(shutil.rmtree, tempdir.name)
        with (
            contextlib.chdir(tempdir.name),
            captured_stdout(),
            captured_stderr(),
        ):
            for fmt in valid_formats:
                try:
                    # This will likely fail with permissions, but the format should be valid
                    profiling.sampling.sample.sample(
                        os.getpid(),
                        duration_sec=0.1,
                        output_format=fmt,
                        filename=f"test_{fmt}.out",
                    )
                except (OSError, RuntimeError, PermissionError):
                    # Expected errors - we just want to test format validation
                    pass

    def test_script_error_treatment(self):
        """Errors raised by the target script must propagate to the console."""
        script_file = tempfile.NamedTemporaryFile(
            "w", delete=False, suffix=".py"
        )
        script_file.write("open('nonexistent_file.txt')\n")
        script_file.close()
        self.addCleanup(os.unlink, script_file.name)
        result = subprocess.run(
            [
                sys.executable,
                "-m",
                "profiling.sampling.sample",
                "-d",
                "1",
                script_file.name,
            ],
            capture_output=True,
            text=True,
        )
        output = result.stdout + result.stderr
        if "PermissionError" in output:
            self.skipTest("Insufficient permissions for remote profiling")
        # The script itself exists, so only the script's own error may appear.
        self.assertNotIn("Script file not found", output)
        self.assertIn(
            "No such file or directory: 'nonexistent_file.txt'", output
        )

View file

@ -0,0 +1,514 @@
"""Tests for sampling profiler mode filtering (CPU and GIL modes)."""
import io
import unittest
from unittest import mock
try:
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
from profiling.sampling.pstats_collector import PstatsCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import requires_subprocess
from test.support import captured_stdout, captured_stderr
from .helpers import test_subprocess
from .mocks import MockFrameInfo, MockInterpreterInfo
class TestCpuModeFiltering(unittest.TestCase):
"""Test CPU mode filtering functionality (--mode=cpu)."""
def test_mode_validation(self):
"""Test that CLI validates mode choices correctly."""
# Invalid mode choice should raise SystemExit
test_args = [
"profiling.sampling.sample",
"--mode",
"invalid",
"-p",
"12345",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("invalid choice", error_msg)
def test_frames_filtered_with_skip_idle(self):
"""Test that frames are actually filtered when skip_idle=True."""
# Import thread status flags
try:
from _remote_debugging import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
)
except ImportError:
THREAD_STATUS_HAS_GIL = 1 << 0
THREAD_STATUS_ON_CPU = 1 << 1
# Create mock frames with different thread statuses
class MockThreadInfoWithStatus:
def __init__(self, thread_id, frame_info, status):
self.thread_id = thread_id
self.frame_info = frame_info
self.status = status
# Create test data: active thread (HAS_GIL | ON_CPU), idle thread (neither), and another active thread
ACTIVE_STATUS = (
THREAD_STATUS_HAS_GIL | THREAD_STATUS_ON_CPU
) # Has GIL and on CPU
IDLE_STATUS = 0 # Neither has GIL nor on CPU
test_frames = [
MockInterpreterInfo(
0,
[
MockThreadInfoWithStatus(
1,
[MockFrameInfo("active1.py", 10, "active_func1")],
ACTIVE_STATUS,
),
MockThreadInfoWithStatus(
2,
[MockFrameInfo("idle.py", 20, "idle_func")],
IDLE_STATUS,
),
MockThreadInfoWithStatus(
3,
[MockFrameInfo("active2.py", 30, "active_func2")],
ACTIVE_STATUS,
),
],
)
]
# Test with skip_idle=True - should only process running threads
collector_skip = PstatsCollector(
sample_interval_usec=1000, skip_idle=True
)
collector_skip.collect(test_frames)
# Should only have functions from running threads (status 0)
active1_key = ("active1.py", 10, "active_func1")
active2_key = ("active2.py", 30, "active_func2")
idle_key = ("idle.py", 20, "idle_func")
self.assertIn(active1_key, collector_skip.result)
self.assertIn(active2_key, collector_skip.result)
self.assertNotIn(
idle_key, collector_skip.result
) # Idle thread should be filtered out
# Test with skip_idle=False - should process all threads
collector_no_skip = PstatsCollector(
sample_interval_usec=1000, skip_idle=False
)
collector_no_skip.collect(test_frames)
# Should have functions from all threads
self.assertIn(active1_key, collector_no_skip.result)
self.assertIn(active2_key, collector_no_skip.result)
self.assertIn(
idle_key, collector_no_skip.result
) # Idle thread should be included
@requires_subprocess()
def test_cpu_mode_integration_filtering(self):
"""Integration test: CPU mode should only capture active threads, not idle ones."""
# Script with one mostly-idle thread and one CPU-active thread
cpu_vs_idle_script = """
import time
import threading
cpu_ready = threading.Event()
def idle_worker():
time.sleep(999999)
def cpu_active_worker():
cpu_ready.set()
x = 1
while True:
x += 1
def main():
# Start both threads
idle_thread = threading.Thread(target=idle_worker)
cpu_thread = threading.Thread(target=cpu_active_worker)
idle_thread.start()
cpu_thread.start()
# Wait for CPU thread to be running, then signal test
cpu_ready.wait()
_test_sock.sendall(b"threads_ready")
idle_thread.join()
cpu_thread.join()
main()
"""
with test_subprocess(cpu_vs_idle_script) as subproc:
# Wait for signal that threads are running
response = subproc.socket.recv(1024)
self.assertEqual(response, b"threads_ready")
with (
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
profiling.sampling.sample.sample(
subproc.process.pid,
duration_sec=2.0,
sample_interval_usec=5000,
mode=1, # CPU mode
show_summary=False,
all_threads=True,
)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
cpu_mode_output = captured_output.getvalue()
# Test wall-clock mode (mode=0) - should capture both functions
with (
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
profiling.sampling.sample.sample(
subproc.process.pid,
duration_sec=2.0,
sample_interval_usec=5000,
mode=0, # Wall-clock mode
show_summary=False,
all_threads=True,
)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
wall_mode_output = captured_output.getvalue()
# Verify both modes captured samples
self.assertIn("Captured", cpu_mode_output)
self.assertIn("samples", cpu_mode_output)
self.assertIn("Captured", wall_mode_output)
self.assertIn("samples", wall_mode_output)
# CPU mode should strongly favor cpu_active_worker over mostly_idle_worker
self.assertIn("cpu_active_worker", cpu_mode_output)
self.assertNotIn("idle_worker", cpu_mode_output)
# Wall-clock mode should capture both types of work
self.assertIn("cpu_active_worker", wall_mode_output)
self.assertIn("idle_worker", wall_mode_output)
def test_cpu_mode_with_no_samples(self):
    """CPU mode should print a clear message when nothing is sampled."""
    # Collector that ends up holding no stats at all.
    empty_collector = mock.MagicMock()
    empty_collector.stats = {}
    empty_collector.create_stats = mock.MagicMock()

    captured = io.StringIO()
    with (
        mock.patch("sys.stdout", captured),
        mock.patch(
            "profiling.sampling.sample.PstatsCollector",
            return_value=empty_collector,
        ),
        mock.patch(
            "profiling.sampling.sample.SampleProfiler"
        ) as profiler_cls,
    ):
        profiler_cls.return_value = mock.MagicMock()
        profiling.sampling.sample.sample(
            12345,  # dummy PID
            duration_sec=0.5,
            sample_interval_usec=5000,
            mode=1,  # CPU mode
            show_summary=False,
            all_threads=True,
        )
        output = captured.getvalue()

    # The user should be told explicitly that nothing was collected.
    self.assertIn("No samples were collected", output)
    self.assertIn("CPU mode", output)
class TestGilModeFiltering(unittest.TestCase):
    """Test GIL mode filtering functionality (--mode=gil)."""

    def test_gil_mode_validation(self):
        """Test that CLI accepts gil mode choice correctly."""
        test_args = [
            "profiling.sampling.sample",
            "--mode",
            "gil",
            "-p",
            "12345",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
        ):
            try:
                profiling.sampling.sample.main()
            except SystemExit:
                pass  # Expected due to invalid PID

            # Should have attempted to call sample with mode=2 (GIL mode)
            mock_sample.assert_called_once()
            call_args = mock_sample.call_args[1]
            self.assertEqual(call_args["mode"], 2)  # PROFILING_MODE_GIL

    def test_gil_mode_sample_function_call(self):
        """Test that sample() function correctly uses GIL mode."""
        with (
            mock.patch(
                "profiling.sampling.sample.SampleProfiler"
            ) as mock_profiler,
            mock.patch(
                "profiling.sampling.sample.PstatsCollector"
            ) as mock_collector,
        ):
            # Mock the profiler instance
            mock_instance = mock.Mock()
            mock_profiler.return_value = mock_instance

            # Mock the collector instance
            mock_collector_instance = mock.Mock()
            mock_collector.return_value = mock_collector_instance

            # Call sample with GIL mode and a filename to avoid pstats creation
            profiling.sampling.sample.sample(
                12345,
                mode=2,  # PROFILING_MODE_GIL
                duration_sec=1,
                sample_interval_usec=1000,
                filename="test_output.txt",
            )

            # Verify SampleProfiler was created with correct mode
            mock_profiler.assert_called_once()
            call_args = mock_profiler.call_args
            self.assertEqual(call_args[1]["mode"], 2)  # mode parameter

            # Verify profiler.sample was called
            mock_instance.sample.assert_called_once()

            # Verify collector.export was called since we provided a filename
            mock_collector_instance.export.assert_called_once_with(
                "test_output.txt"
            )

    def test_gil_mode_collector_configuration(self):
        """Test that collectors are configured correctly for GIL mode."""
        with (
            mock.patch(
                "profiling.sampling.sample.SampleProfiler"
            ) as mock_profiler,
            mock.patch(
                "profiling.sampling.sample.PstatsCollector"
            ) as mock_collector,
            captured_stdout(),
            captured_stderr(),
        ):
            # Mock the profiler instance
            mock_instance = mock.Mock()
            mock_profiler.return_value = mock_instance

            # Call sample with GIL mode
            profiling.sampling.sample.sample(
                12345,
                mode=2,  # PROFILING_MODE_GIL
                output_format="pstats",
            )

            # Verify collector was created with skip_idle=True (since mode != WALL)
            mock_collector.assert_called_once()
            call_args = mock_collector.call_args[1]
            self.assertTrue(call_args["skip_idle"])

    def test_gil_mode_with_collapsed_format(self):
        """Test GIL mode with collapsed stack format."""
        with (
            mock.patch(
                "profiling.sampling.sample.SampleProfiler"
            ) as mock_profiler,
            mock.patch(
                "profiling.sampling.sample.CollapsedStackCollector"
            ) as mock_collector,
        ):
            # Mock the profiler instance
            mock_instance = mock.Mock()
            mock_profiler.return_value = mock_instance

            # Call sample with GIL mode and collapsed format
            profiling.sampling.sample.sample(
                12345,
                mode=2,  # PROFILING_MODE_GIL
                output_format="collapsed",
                filename="test_output.txt",
            )

            # Verify collector was created with skip_idle=True
            mock_collector.assert_called_once()
            call_args = mock_collector.call_args[1]
            self.assertTrue(call_args["skip_idle"])

    def test_gil_mode_cli_argument_parsing(self):
        """Test CLI argument parsing for GIL mode with various options."""
        test_args = [
            "profiling.sampling.sample",
            "--mode",
            "gil",
            "--interval",
            "500",
            "--duration",
            "5",
            "-p",
            "12345",
        ]
        with (
            mock.patch("sys.argv", test_args),
            mock.patch("profiling.sampling.sample.sample") as mock_sample,
        ):
            try:
                profiling.sampling.sample.main()
            except SystemExit:
                pass  # Expected due to invalid PID

            # Verify all arguments were parsed correctly
            mock_sample.assert_called_once()
            call_args = mock_sample.call_args[1]
            self.assertEqual(call_args["mode"], 2)  # GIL mode
            self.assertEqual(call_args["sample_interval_usec"], 500)
            self.assertEqual(call_args["duration_sec"], 5)

    @requires_subprocess()
    def test_gil_mode_integration_behavior(self):
        """Integration test: GIL mode should capture GIL-holding threads."""
        # Create a test script with GIL-releasing operations
        gil_test_script = """
import time
import threading

gil_ready = threading.Event()

def gil_releasing_work():
    time.sleep(999999)

def gil_holding_work():
    gil_ready.set()
    x = 1
    while True:
        x += 1

def main():
    # Start both threads
    idle_thread = threading.Thread(target=gil_releasing_work)
    cpu_thread = threading.Thread(target=gil_holding_work)
    idle_thread.start()
    cpu_thread.start()

    # Wait for GIL-holding thread to be running, then signal test
    gil_ready.wait()
    _test_sock.sendall(b"threads_ready")

    idle_thread.join()
    cpu_thread.join()

main()
"""

        with test_subprocess(gil_test_script) as subproc:
            # Wait for signal that threads are running
            response = subproc.socket.recv(1024)
            self.assertEqual(response, b"threads_ready")

            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                # NOTE: the caught exception is not needed, so it is not
                # bound to a name (avoids an unused-variable warning).
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=2.0,
                        sample_interval_usec=5000,
                        mode=2,  # GIL mode
                        show_summary=False,
                        all_threads=True,
                    )
                except (PermissionError, RuntimeError):
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

                gil_mode_output = captured_output.getvalue()

            # Test wall-clock mode for comparison
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                try:
                    profiling.sampling.sample.sample(
                        subproc.process.pid,
                        duration_sec=0.5,
                        sample_interval_usec=5000,
                        mode=0,  # Wall-clock mode
                        show_summary=False,
                        all_threads=True,
                    )
                except (PermissionError, RuntimeError):
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

                wall_mode_output = captured_output.getvalue()

        # GIL mode should primarily capture GIL-holding work
        # (Note: actual behavior depends on threading implementation)
        self.assertIn("gil_holding_work", gil_mode_output)

        # Wall-clock mode should capture both types of work
        self.assertIn("gil_holding_work", wall_mode_output)

    def test_mode_constants_are_defined(self):
        """Test that all profiling mode constants are properly defined."""
        self.assertEqual(profiling.sampling.sample.PROFILING_MODE_WALL, 0)
        self.assertEqual(profiling.sampling.sample.PROFILING_MODE_CPU, 1)
        self.assertEqual(profiling.sampling.sample.PROFILING_MODE_GIL, 2)

    def test_parse_mode_function(self):
        """Test the _parse_mode function with all valid modes."""
        self.assertEqual(profiling.sampling.sample._parse_mode("wall"), 0)
        self.assertEqual(profiling.sampling.sample._parse_mode("cpu"), 1)
        self.assertEqual(profiling.sampling.sample._parse_mode("gil"), 2)

        # Test invalid mode raises KeyError
        with self.assertRaises(KeyError):
            profiling.sampling.sample._parse_mode("invalid")

View file

@ -0,0 +1,656 @@
"""Tests for sampling profiler core functionality."""
import io
from unittest import mock
import unittest
try:
import _remote_debugging # noqa: F401
from profiling.sampling.sample import SampleProfiler, print_sampled_stats
from profiling.sampling.pstats_collector import PstatsCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import force_not_colorized_test_class
class TestSampleProfiler(unittest.TestCase):
    """Test the SampleProfiler class."""

    def test_sample_profiler_initialization(self):
        """Test SampleProfiler initialization with various parameters."""
        # Mock RemoteUnwinder to avoid permission issues
        with mock.patch(
            "_remote_debugging.RemoteUnwinder"
        ) as mock_unwinder_class:
            mock_unwinder_class.return_value = mock.MagicMock()

            # Test basic initialization
            profiler = SampleProfiler(
                pid=12345, sample_interval_usec=1000, all_threads=False
            )
            self.assertEqual(profiler.pid, 12345)
            self.assertEqual(profiler.sample_interval_usec, 1000)
            self.assertEqual(profiler.all_threads, False)

            # Test with all_threads=True
            profiler = SampleProfiler(
                pid=54321, sample_interval_usec=5000, all_threads=True
            )
            self.assertEqual(profiler.pid, 54321)
            self.assertEqual(profiler.sample_interval_usec, 5000)
            self.assertEqual(profiler.all_threads, True)

    def test_sample_profiler_sample_method_timing(self):
        """Test that the sample method respects duration and handles timing correctly."""
        # Mock the unwinder to avoid needing a real process; return a single
        # one-frame stack for every sample.
        mock_unwinder = mock.MagicMock()
        mock_unwinder.get_stack_trace.return_value = [
            (
                1,
                [
                    mock.MagicMock(
                        filename="test.py", lineno=10, funcname="test_func"
                    )
                ],
            )
        ]

        with mock.patch(
            "_remote_debugging.RemoteUnwinder"
        ) as mock_unwinder_class:
            mock_unwinder_class.return_value = mock_unwinder

            profiler = SampleProfiler(
                pid=12345, sample_interval_usec=100000, all_threads=False
            )  # 100ms interval

            # Mock collector
            mock_collector = mock.MagicMock()

            # Mock time to control the sampling loop; each perf_counter()
            # call advances by exactly one sample interval.
            start_time = 1000.0
            times = [
                start_time + i * 0.1 for i in range(12)
            ]  # 0, 0.1, 0.2, ..., 1.1 seconds

            with mock.patch("time.perf_counter", side_effect=times):
                with io.StringIO() as output:
                    with mock.patch("sys.stdout", output):
                        profiler.sample(mock_collector, duration_sec=1)
                    result = output.getvalue()

            # Should have captured approximately 10 samples (1 second / 0.1 second interval)
            self.assertIn("Captured", result)
            self.assertIn("samples", result)

            # Verify collector was called multiple times
            self.assertGreaterEqual(mock_collector.collect.call_count, 5)
            self.assertLessEqual(mock_collector.collect.call_count, 11)

    def test_sample_profiler_error_handling(self):
        """Test that the sample method handles errors gracefully."""
        # Mock unwinder that raises errors: alternate failures (process
        # death, decode errors, permission errors) with good stack traces.
        mock_unwinder = mock.MagicMock()
        error_sequence = [
            RuntimeError("Process died"),
            [
                (
                    1,
                    [
                        mock.MagicMock(
                            filename="test.py", lineno=10, funcname="test_func"
                        )
                    ],
                )
            ],
            UnicodeDecodeError("utf-8", b"", 0, 1, "invalid"),
            [
                (
                    1,
                    [
                        mock.MagicMock(
                            filename="test.py",
                            lineno=20,
                            funcname="test_func2",
                        )
                    ],
                )
            ],
            OSError("Permission denied"),
        ]
        mock_unwinder.get_stack_trace.side_effect = error_sequence

        with mock.patch(
            "_remote_debugging.RemoteUnwinder"
        ) as mock_unwinder_class:
            mock_unwinder_class.return_value = mock_unwinder

            profiler = SampleProfiler(
                pid=12345, sample_interval_usec=10000, all_threads=False
            )

            mock_collector = mock.MagicMock()

            # Control timing to run exactly 5 samples
            times = [0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06]

            with mock.patch("time.perf_counter", side_effect=times):
                with io.StringIO() as output:
                    with mock.patch("sys.stdout", output):
                        profiler.sample(mock_collector, duration_sec=0.05)
                    result = output.getvalue()

            # Should report error rate
            self.assertIn("Error rate:", result)
            self.assertIn("%", result)

            # Collector should have been called only for successful samples (should be > 0)
            self.assertGreater(mock_collector.collect.call_count, 0)
            self.assertLessEqual(mock_collector.collect.call_count, 3)

    def test_sample_profiler_missed_samples_warning(self):
        """Test that the profiler warns about missed samples when sampling is too slow."""
        mock_unwinder = mock.MagicMock()
        mock_unwinder.get_stack_trace.return_value = [
            (
                1,
                [
                    mock.MagicMock(
                        filename="test.py", lineno=10, funcname="test_func"
                    )
                ],
            )
        ]

        with mock.patch(
            "_remote_debugging.RemoteUnwinder"
        ) as mock_unwinder_class:
            mock_unwinder_class.return_value = mock_unwinder

            # Use very short interval that we'll miss
            profiler = SampleProfiler(
                pid=12345, sample_interval_usec=1000, all_threads=False
            )  # 1ms interval

            mock_collector = mock.MagicMock()

            # Simulate slow sampling where we miss many samples: each tick
            # advances 100ms although the requested interval is 1ms.
            times = [
                0.0,
                0.1,
                0.2,
                0.3,
                0.4,
                0.5,
                0.6,
                0.7,
            ]  # Extra time points to avoid StopIteration

            with mock.patch("time.perf_counter", side_effect=times):
                with io.StringIO() as output:
                    with mock.patch("sys.stdout", output):
                        profiler.sample(mock_collector, duration_sec=0.5)
                    result = output.getvalue()

            # Should warn about missed samples
            self.assertIn("Warning: missed", result)
            self.assertIn("samples from the expected total", result)
@force_not_colorized_test_class
class TestPrintSampledStats(unittest.TestCase):
    """Test the print_sampled_stats function."""

    def setUp(self):
        """Set up test data."""
        # Mock stats data
        self.mock_stats = mock.MagicMock()
        self.mock_stats.stats = {
            ("file1.py", 10, "func1"): (
                100,
                100,
                0.5,
                0.5,
                {},
            ),  # cc, nc, tt, ct, callers
            ("file2.py", 20, "func2"): (50, 50, 0.25, 0.3, {}),
            ("file3.py", 30, "func3"): (200, 200, 1.5, 2.0, {}),
            ("file4.py", 40, "func4"): (
                10,
                10,
                0.001,
                0.001,
                {},
            ),  # millisecond range
            ("file5.py", 50, "func5"): (
                5,
                5,
                0.000001,
                0.000002,
                {},
            ),  # microsecond range
        }

    def test_print_sampled_stats_basic(self):
        """Test basic print_sampled_stats functionality."""
        # Capture output
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(self.mock_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Check header is present
        self.assertIn("Profile Stats:", result)
        self.assertIn("nsamples", result)
        self.assertIn("tottime", result)
        self.assertIn("cumtime", result)

        # Check functions are present
        self.assertIn("func1", result)
        self.assertIn("func2", result)
        self.assertIn("func3", result)

    def test_print_sampled_stats_sorting(self):
        """Test different sorting options."""
        # Test sort by calls
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats, sort=0, sample_interval_usec=100
                )
            result = output.getvalue()

        lines = result.strip().split("\n")
        # Find the data lines (skip header)
        data_lines = [
            line for line in lines if "file" in line and ".py" in line
        ]
        # func3 should be first (200 calls)
        self.assertIn("func3", data_lines[0])

        # Test sort by time
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats, sort=1, sample_interval_usec=100
                )
            result = output.getvalue()

        lines = result.strip().split("\n")
        data_lines = [
            line for line in lines if "file" in line and ".py" in line
        ]
        # func3 should be first (1.5s time)
        self.assertIn("func3", data_lines[0])

    def test_print_sampled_stats_limit(self):
        """Test limiting output rows."""
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats, limit=2, sample_interval_usec=100
                )
            result = output.getvalue()

        # Count function entries in the main stats section (not in summary)
        lines = result.split("\n")
        # Find where the main stats section ends (before summary)
        main_section_lines = []
        for line in lines:
            if "Summary of Interesting Functions:" in line:
                break
            main_section_lines.append(line)

        # Count function entries only in main section
        func_count = sum(
            1
            for line in main_section_lines
            if "func" in line and ".py" in line
        )
        self.assertEqual(func_count, 2)

    def test_print_sampled_stats_time_units(self):
        """Test proper time unit selection."""
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(self.mock_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Should use seconds for the header since max time is > 1s
        self.assertIn("tottime (s)", result)
        self.assertIn("cumtime (s)", result)

        # Test with only microsecond-range times
        micro_stats = mock.MagicMock()
        micro_stats.stats = {
            ("file1.py", 10, "func1"): (100, 100, 0.000005, 0.000010, {}),
        }

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(micro_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Should use microseconds
        self.assertIn("tottime (μs)", result)
        self.assertIn("cumtime (μs)", result)

    def test_print_sampled_stats_summary(self):
        """Test summary section generation."""
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats,
                    show_summary=True,
                    sample_interval_usec=100,
                )
            result = output.getvalue()

        # Check summary sections are present
        self.assertIn("Summary of Interesting Functions:", result)
        self.assertIn(
            "Functions with Highest Direct/Cumulative Ratio (Hot Spots):",
            result,
        )
        self.assertIn(
            "Functions with Highest Call Frequency (Indirect Calls):", result
        )
        self.assertIn(
            "Functions with Highest Call Magnification (Cumulative/Direct):",
            result,
        )

    def test_print_sampled_stats_no_summary(self):
        """Test disabling summary output."""
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats,
                    show_summary=False,
                    sample_interval_usec=100,
                )
            result = output.getvalue()

        # Summary should not be present
        self.assertNotIn("Summary of Interesting Functions:", result)

    def test_print_sampled_stats_empty_stats(self):
        """Test with empty stats."""
        empty_stats = mock.MagicMock()
        empty_stats.stats = {}

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(empty_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Should still print header
        self.assertIn("Profile Stats:", result)

    def test_print_sampled_stats_sample_percentage_sorting(self):
        """Test sample percentage sorting options."""
        # Add a function with high sample percentage (more direct calls than func3's 200)
        self.mock_stats.stats[("expensive.py", 60, "expensive_func")] = (
            300,  # direct calls (higher than func3's 200)
            300,  # cumulative calls
            1.0,  # total time
            1.0,  # cumulative time
            {},
        )

        # Test sort by sample percentage
        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats, sort=3, sample_interval_usec=100
                )  # sample percentage
            result = output.getvalue()

        lines = result.strip().split("\n")
        data_lines = [
            line for line in lines if ".py" in line and "func" in line
        ]

        # expensive_func should be first (highest sample percentage)
        self.assertIn("expensive_func", data_lines[0])

    def test_print_sampled_stats_with_recursive_calls(self):
        """Test print_sampled_stats with recursive calls where nc != cc."""
        # Create stats with recursive calls (nc != cc)
        recursive_stats = mock.MagicMock()
        recursive_stats.stats = {
            # (direct_calls, cumulative_calls, tt, ct, callers) - recursive function
            ("recursive.py", 10, "factorial"): (
                5,  # direct_calls
                10,  # cumulative_calls (appears more times in stack due to recursion)
                0.5,
                0.6,
                {},
            ),
            ("normal.py", 20, "normal_func"): (
                3,  # direct_calls
                3,  # cumulative_calls (same as direct for non-recursive)
                0.2,
                0.2,
                {},
            ),
        }

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(recursive_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Should display recursive calls as "5/10" format
        self.assertIn("5/10", result)  # nc/cc format for recursive calls
        self.assertIn("3", result)  # just nc for non-recursive calls
        self.assertIn("factorial", result)
        self.assertIn("normal_func", result)

    def test_print_sampled_stats_with_zero_call_counts(self):
        """Test print_sampled_stats with zero call counts to trigger division protection."""
        # Create stats with zero call counts
        zero_stats = mock.MagicMock()
        zero_stats.stats = {
            ("file.py", 10, "zero_calls"): (0, 0, 0.0, 0.0, {}),  # Zero calls
            ("file.py", 20, "normal_func"): (
                5,
                5,
                0.1,
                0.1,
                {},
            ),  # Normal function
        }

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(zero_stats, sample_interval_usec=100)
            result = output.getvalue()

        # Should handle zero call counts gracefully
        self.assertIn("zero_calls", result)
        self.assertIn("normal_func", result)

    def test_print_sampled_stats_sort_by_name(self):
        """Test sort by function name option."""
        import re

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    self.mock_stats, sort=-1, sample_interval_usec=100
                )  # sort by name
            result = output.getvalue()

        lines = result.strip().split("\n")

        # Find the data lines (skip header and summary)
        # Data lines start with whitespace and numbers, and contain filename:lineno(function)
        data_lines = []
        for line in lines:
            # Skip header lines and summary sections
            if (
                line.startswith("  ")
                and "(" in line
                and ")" in line
                and not line.startswith(
                    "  1."
                )  # Skip summary lines that start with times
                and not line.startswith(
                    "  0."
                )  # Skip summary lines that start with times
                and "per call" not in line  # Skip summary lines
                and "calls" not in line  # Skip summary lines
                and "total time" not in line  # Skip summary lines
                and "cumulative time" not in line
            ):  # Skip summary lines
                data_lines.append(line)

        # Extract just the function names for comparison
        func_names = []
        for line in data_lines:
            # Function name is between the last ( and ), accounting for ANSI color codes
            match = re.search(r"\(([^)]+)\)$", line)
            if match:
                func_name = match.group(1)
                # Remove ANSI color codes
                func_name = re.sub(r"\x1b\[[0-9;]*m", "", func_name)
                func_names.append(func_name)

        # Verify we extracted function names and they are sorted
        self.assertGreater(
            len(func_names), 0, "Should have extracted some function names"
        )
        self.assertEqual(
            func_names,
            sorted(func_names),
            f"Function names {func_names} should be sorted alphabetically",
        )

    def test_print_sampled_stats_with_zero_time_functions(self):
        """Test summary sections with functions that have zero time."""
        # Create stats with zero-time functions
        zero_time_stats = mock.MagicMock()
        zero_time_stats.stats = {
            ("file1.py", 10, "zero_time_func"): (
                5,
                5,
                0.0,
                0.0,
                {},
            ),  # Zero time
            ("file2.py", 20, "normal_func"): (
                3,
                3,
                0.1,
                0.1,
                {},
            ),  # Normal time
        }

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    zero_time_stats,
                    show_summary=True,
                    sample_interval_usec=100,
                )
            result = output.getvalue()

        # Should handle zero-time functions gracefully in summary
        self.assertIn("Summary of Interesting Functions:", result)
        self.assertIn("zero_time_func", result)
        self.assertIn("normal_func", result)

    def test_print_sampled_stats_with_malformed_qualified_names(self):
        """Test summary generation with function names that don't contain colons."""
        # Create stats with function names that would create malformed qualified names
        malformed_stats = mock.MagicMock()
        malformed_stats.stats = {
            # Function name without clear module separation
            ("no_colon_func", 10, "func"): (3, 3, 0.1, 0.1, {}),
            ("", 20, "empty_filename_func"): (2, 2, 0.05, 0.05, {}),
            ("normal.py", 30, "normal_func"): (5, 5, 0.2, 0.2, {}),
        }

        with io.StringIO() as output:
            with mock.patch("sys.stdout", output):
                print_sampled_stats(
                    malformed_stats,
                    show_summary=True,
                    sample_interval_usec=100,
                )
            result = output.getvalue()

        # Should handle malformed names gracefully in summary aggregation
        self.assertIn("Summary of Interesting Functions:", result)

        # All function names should appear somewhere in the output
        self.assertIn("func", result)
        self.assertIn("empty_filename_func", result)
        self.assertIn("normal_func", result)

    def test_print_sampled_stats_with_recursive_call_stats_creation(self):
        """Test create_stats with recursive call data to trigger total_rec_calls branch."""
        collector = PstatsCollector(sample_interval_usec=1000000)  # 1 second

        # Simulate recursive function data where total_rec_calls would be set.
        # We need to manually manipulate the collector result to test this branch.
        collector.result = {
            ("recursive.py", 10, "factorial"): {
                "total_rec_calls": 3,  # Non-zero recursive calls
                "direct_calls": 5,
                "cumulative_calls": 10,
            },
            ("normal.py", 20, "normal_func"): {
                "total_rec_calls": 0,  # Zero recursive calls
                "direct_calls": 2,
                "cumulative_calls": 5,
            },
        }

        collector.create_stats()

        # Check that recursive calls are handled differently from non-recursive
        factorial_stats = collector.stats[("recursive.py", 10, "factorial")]
        normal_stats = collector.stats[("normal.py", 20, "normal_func")]

        # factorial should use cumulative_calls (10) as nc
        self.assertEqual(
            factorial_stats[1], 10
        )  # nc should be cumulative_calls
        self.assertEqual(factorial_stats[0], 5)  # cc should be direct_calls

        # normal_func should use cumulative_calls as nc
        self.assertEqual(normal_stats[1], 5)  # nc should be cumulative_calls
        self.assertEqual(normal_stats[0], 2)  # cc should be direct_calls

View file

@ -2692,6 +2692,7 @@ TESTSUBDIRS= idlelib/idle_test \
test/test_pathlib/support \
test/test_peg_generator \
test/test_profiling \
test/test_profiling/test_sampling_profiler \
test/test_pydoc \
test/test_pyrepl \
test/test_string \
@ -3049,6 +3050,9 @@ frameworkinstallunversionedstructure: $(LDLIBRARY)
$(INSTALL) -d -m $(DIRMODE) $(DESTDIR)$(PYTHONFRAMEWORKINSTALLDIR)
sed 's/%VERSION%/'"`$(RUNSHARED) $(PYTHON_FOR_BUILD) -c 'import platform; print(platform.python_version())'`"'/g' < $(RESSRCDIR)/Info.plist > $(DESTDIR)$(PYTHONFRAMEWORKINSTALLDIR)/Info.plist
$(INSTALL_SHARED) $(LDLIBRARY) $(DESTDIR)$(PYTHONFRAMEWORKPREFIX)/$(LDLIBRARY)
$(INSTALL) -d -m $(DIRMODE) $(DESTDIR)$(LIBDIR)
$(LN) -fs "../$(LDLIBRARY)" "$(DESTDIR)$(prefix)/lib/libpython$(LDVERSION).dylib"
$(LN) -fs "../$(LDLIBRARY)" "$(DESTDIR)$(prefix)/lib/libpython$(VERSION).dylib"
$(INSTALL) -d -m $(DIRMODE) $(DESTDIR)$(BINDIR)
for file in $(srcdir)/$(RESSRCDIR)/bin/* ; do \
$(INSTALL) -m $(EXEMODE) $$file $(DESTDIR)$(BINDIR); \

View file

@ -0,0 +1,2 @@
Add :c:func:`PyUnstable_Object_Dump` to dump an object to ``stderr``. It should
only be used for debugging. Patch by Victor Stinner.

View file

@ -0,0 +1 @@
Improve multithreaded scaling of dataclasses on the free-threaded build.

View file

@ -0,0 +1 @@
Fix bad file descriptor errors from ``_posixsubprocess`` on AIX.

View file

@ -0,0 +1,2 @@
Break reference cycles created by each call to :func:`json.dump` or
:meth:`json.JSONEncoder.iterencode`.

View file

@ -0,0 +1,3 @@
Fix buffer overflow in ``_Py_wrealpath()`` for paths exceeding ``MAXPATHLEN`` bytes
by using dynamic memory allocation instead of fixed-size buffer.
Patch by Shamil Abdulaev.

View file

@ -0,0 +1 @@
Fix flaky test_profiling tests on i686 and s390x architectures by increasing slow_fibonacci call frequency from every 5th iteration to every 2nd iteration.

View file

@ -0,0 +1,3 @@
Each slice of an iOS XCframework now contains a ``lib`` folder that contains
a symlink to the libpython dylib. This allows binary modules to be compiled
for iOS using dynamic library linking, rather than Framework linking.

View file

@ -514,7 +514,13 @@ _close_open_fds_maybe_unsafe(int start_fd, int *fds_to_keep,
proc_fd_dir = NULL;
else
#endif
#if defined(_AIX)
char fd_path[PATH_MAX];
snprintf(fd_path, sizeof(fd_path), "/proc/%ld/fd", (long)getpid());
proc_fd_dir = opendir(fd_path);
#else
proc_fd_dir = opendir(FD_DIR);
#endif
if (!proc_fd_dir) {
/* No way to get a list of open fds. */
_close_range_except(start_fd, -1, fds_to_keep, fds_to_keep_len,

View file

@ -485,6 +485,30 @@ is_uniquely_referenced(PyObject *self, PyObject *op)
}
/* Test wrapper around PyUnstable_Object_Dump().
 *
 * Arguments: an object and an optional int flag.  When the flag is
 * non-zero, the dump is performed with the GIL released (between
 * Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS) so the test can
 * exercise dumping without an attached thread state.
 * NULLABLE() presumably converts the test suite's NULL-marker object
 * to a real NULL pointer -- confirm against the NULLABLE macro. */
static PyObject *
pyobject_dump(PyObject *self, PyObject *args)
{
    PyObject *op;
    int release_gil = 0;
    if (!PyArg_ParseTuple(args, "O|i", &op, &release_gil)) {
        return NULL;
    }
    NULLABLE(op);
    if (release_gil) {
        /* Dump while the GIL is released. */
        Py_BEGIN_ALLOW_THREADS
        PyUnstable_Object_Dump(op);
        Py_END_ALLOW_THREADS
    }
    else {
        PyUnstable_Object_Dump(op);
    }
    Py_RETURN_NONE;
}
static PyMethodDef test_methods[] = {
{"call_pyobject_print", call_pyobject_print, METH_VARARGS},
{"pyobject_print_null", pyobject_print_null, METH_VARARGS},
@ -511,6 +535,7 @@ static PyMethodDef test_methods[] = {
{"test_py_is_funcs", test_py_is_funcs, METH_NOARGS},
{"clear_managed_dict", clear_managed_dict, METH_O, NULL},
{"is_uniquely_referenced", is_uniquely_referenced, METH_O},
{"pyobject_dump", pyobject_dump, METH_VARARGS},
{NULL},
};

View file

@ -713,7 +713,7 @@ _PyObject_IsFreed(PyObject *op)
/* For debugging convenience. See Misc/gdbinit for some useful gdb hooks */
void
_PyObject_Dump(PyObject* op)
PyUnstable_Object_Dump(PyObject* op)
{
if (_PyObject_IsFreed(op)) {
/* It seems like the object memory has been freed:
@ -3150,7 +3150,7 @@ _PyObject_AssertFailed(PyObject *obj, const char *expr, const char *msg,
/* This might succeed or fail, but we're about to abort, so at least
try to provide any extra info we can: */
_PyObject_Dump(obj);
PyUnstable_Object_Dump(obj);
fprintf(stderr, "\n");
fflush(stderr);

View file

@ -6546,6 +6546,18 @@ type_setattro(PyObject *self, PyObject *name, PyObject *value)
assert(!_PyType_HasFeature(metatype, Py_TPFLAGS_INLINE_VALUES));
assert(!_PyType_HasFeature(metatype, Py_TPFLAGS_MANAGED_DICT));
#ifdef Py_GIL_DISABLED
// gh-139103: Enable deferred refcounting for functions assigned
// to type objects. This is important for `dataclass.__init__`,
// which is generated dynamically.
if (value != NULL &&
PyFunction_Check(value) &&
!_PyObject_HasDeferredRefcount(value))
{
PyUnstable_Object_EnableDeferredRefcount(value);
}
#endif
PyObject *old_value = NULL;
PyObject *descr = _PyType_LookupRef(metatype, name);
if (descr != NULL) {

View file

@ -547,7 +547,8 @@ unicode_check_encoding_errors(const char *encoding, const char *errors)
}
/* Disable checks during Python finalization. For example, it allows to
call _PyObject_Dump() during finalization for debugging purpose. */
* call PyUnstable_Object_Dump() during finalization for debugging purpose.
*/
if (_PyInterpreterState_GetFinalizing(interp) != NULL) {
return 0;
}

View file

@ -2118,7 +2118,6 @@ _Py_wrealpath(const wchar_t *path,
wchar_t *resolved_path, size_t resolved_path_len)
{
char *cpath;
char cresolved_path[MAXPATHLEN];
wchar_t *wresolved_path;
char *res;
size_t r;
@ -2127,12 +2126,14 @@ _Py_wrealpath(const wchar_t *path,
errno = EINVAL;
return NULL;
}
res = realpath(cpath, cresolved_path);
res = realpath(cpath, NULL);
PyMem_RawFree(cpath);
if (res == NULL)
return NULL;
wresolved_path = Py_DecodeLocale(cresolved_path, &r);
wresolved_path = Py_DecodeLocale(res, &r);
free(res);
if (wresolved_path == NULL) {
errno = EINVAL;
return NULL;

View file

@ -2237,7 +2237,7 @@ _PyGC_Fini(PyInterpreterState *interp)
/* Debug helper: dump the object wrapped by the GC header *g*.
   FROM_GC() recovers the PyObject* from its PyGC_Head. */
void
_PyGC_Dump(PyGC_Head *g)
{
    PyUnstable_Object_Dump(FROM_GC(g));
}
}

View file

@ -1181,7 +1181,7 @@ _PyErr_Display(PyObject *file, PyObject *unused, PyObject *value, PyObject *tb)
}
if (print_exception_recursive(&ctx, value) < 0) {
PyErr_Clear();
_PyObject_Dump(value);
PyUnstable_Object_Dump(value);
fprintf(stderr, "lost sys.stderr\n");
}
Py_XDECREF(ctx.seen);
@ -1199,14 +1199,14 @@ PyErr_Display(PyObject *unused, PyObject *value, PyObject *tb)
PyObject *file;
if (PySys_GetOptionalAttr(&_Py_ID(stderr), &file) < 0) {
PyObject *exc = PyErr_GetRaisedException();
_PyObject_Dump(value);
PyUnstable_Object_Dump(value);
fprintf(stderr, "lost sys.stderr\n");
_PyObject_Dump(exc);
PyUnstable_Object_Dump(exc);
Py_DECREF(exc);
return;
}
if (file == NULL) {
_PyObject_Dump(value);
PyUnstable_Object_Dump(value);
fprintf(stderr, "lost sys.stderr\n");
return;
}

View file

@ -27,6 +27,7 @@
import sys
import threading
import time
from dataclasses import dataclass
from operator import methodcaller
# The iterations in individual benchmarks are scaled by this factor.
@ -202,6 +203,17 @@ def method_caller():
for i in range(1000 * WORK_SCALE):
mc(obj)
# Simple three-field dataclass used by the dataclass-instantiation
# benchmark in this module.
@dataclass
class MyDataClass:
    x: int
    y: int
    z: int
@register_benchmark
def instantiate_dataclass():
    """Benchmark repeated dataclass instantiation.

    Each iteration constructs a fresh MyDataClass; the instance is not
    kept -- binding it to an unused local (the previous `obj = ...`)
    only added an unused-variable lint warning without affecting what
    is measured.
    """
    for _ in range(1000 * WORK_SCALE):
        MyDataClass(x=1, y=2, z=3)
def bench_one_thread(func):
t0 = time.perf_counter_ns()
func()