This commit is contained in:
Charlie Lin 2025-12-08 15:11:50 +01:00 committed by GitHub
commit 7a06eb2060
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 71 additions and 0 deletions

View file

@ -14,6 +14,7 @@ jobs:
exclude:
- os: windows-11-arm
py: "3.10"
runs-on: ${{ matrix.os }}
name: Run test with Python ${{ matrix.py }} on ${{ matrix.os }}
@ -33,6 +34,11 @@ jobs:
run: |
python -m pip install -r requirements.txt pytest
- name: Install pytest-run-parallel under free-threading
if: contains(matrix.py, 't')
run: |
pip install pytest-run-parallel
- name: Build
shell: bash
run: |
@ -40,15 +46,29 @@ jobs:
pip install .
- name: Test (C extension)
if: ${{ ! contains(matrix.py, 't') }}
shell: bash
run: |
pytest -v test
- name: Test (pure Python fallback)
if: ${{ ! contains(matrix.py, 't') }}
shell: bash
run: |
MSGPACK_PUREPYTHON=1 pytest -v test
- name: Test (C extension) in parallel under free-threading
if: contains(matrix.py, 't')
shell: bash
run: |
pytest -v --parallel-threads=auto --iterations=20 test
- name: Test (pure Python fallback) in parallel under free-threading
if: contains(matrix.py, 't')
shell: bash
run: |
MSGPACK_PUREPYTHON=1 pytest -v --parallel-threads=auto --iterations=20 test
- name: build packages
shell: bash
run: |

View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
import threading
from concurrent.futures import ThreadPoolExecutor
from msgpack import Packer
def run_threaded(
func,
num_threads=8,
pass_count=False,
pass_barrier=False,
outer_iterations=1,
prepare_args=None,
):
"""Runs a function many times in parallel"""
for _ in range(outer_iterations):
with ThreadPoolExecutor(max_workers=num_threads) as tpe:
if prepare_args is None:
args = []
else:
args = prepare_args()
if pass_barrier:
barrier = threading.Barrier(num_threads)
args.append(barrier)
if pass_count:
all_args = [(func, i, *args) for i in range(num_threads)]
else:
all_args = [(func, *args) for i in range(num_threads)]
try:
futures = []
for arg in all_args:
futures.append(tpe.submit(*arg))
finally:
if len(futures) < num_threads and pass_barrier:
barrier.abort()
for f in futures:
f.result()
def test_multithread_packing():
output = []
test_data = "abcd" * 10_000_000
packer = Packer()
def closure(b):
data = packer.pack(test_data)
output.append(data)
b.wait()
run_threaded(closure, num_threads=10, pass_barrier=True, pass_count=False)