mirror of
https://github.com/python/cpython.git
synced 2025-10-24 02:13:49 +00:00

After failures on the AMD64 FreeBSD CURRENT Debug 3.x buildbots in test_multiprocessing_spawn, and after examining similar failures in test_socket, errors were found in how multiprocessing.reduction calculates the size of ancillary data buffers. CMSG_LEN() can often be used as the buffer size for recvmsg() to receive a single item of ancillary data, but RFC 3542 requires portable applications to use CMSG_SPACE(), which includes space for padding, even when the item will be the last in the buffer. The failures were caused by using CMSG_LEN() instead of CMSG_SPACE().
281 lines
9.3 KiB
Python
281 lines
9.3 KiB
Python
#
|
|
# Module which deals with pickling of objects.
|
|
#
|
|
# multiprocessing/reduction.py
|
|
#
|
|
# Copyright (c) 2006-2008, R Oudkerk
|
|
# Licensed to PSF under a Contributor Agreement.
|
|
#
|
|
|
|
from abc import ABCMeta
|
|
import copyreg
|
|
import functools
|
|
import io
|
|
import os
|
|
import pickle
|
|
import socket
|
|
import sys
|
|
|
|
from . import context
|
|
|
|
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
|
|
|
|
|
|
# True when handles/fds can be passed between processes: always on Windows,
# otherwise only if the socket module provides everything sendfds() and
# recvfds() rely on.  recvfds() sizes its ancillary buffer with
# socket.CMSG_SPACE(), so that attribute is required too; CMSG_LEN is still
# checked so the flag is never enabled where it was previously disabled.
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                    (hasattr(socket, 'CMSG_LEN') and
                     hasattr(socket, 'CMSG_SPACE') and
                     hasattr(socket, 'SCM_RIGHTS') and
                     hasattr(socket.socket, 'sendmsg')))
|
|
|
|
#
|
|
# Pickler subclass
|
|
#
|
|
|
|
class ForkingPickler(pickle.Pickler):
    '''Pickler used by multiprocessing, with its own registry of reducers.'''

    # Reducers added through register(); stored on the class.
    _extra_reducers = {}
    # The copyreg table that every instance starts its own table from.
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args):
        super().__init__(*args)
        # Each pickler gets a private table: a copy of the copyreg table
        # overlaid with the reducers registered on the class.
        table = self._copyreg_dispatch_table.copy()
        table.update(self._extra_reducers)
        self.dispatch_table = table

    @classmethod
    def register(cls, type, reduce):
        '''Use the function reduce to pickle objects of the given type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj in memory and return a view of the resulting buffer.'''
        stream = io.BytesIO()
        pickler = cls(stream, protocol)
        pickler.dump(obj)
        return stream.getbuffer()

    # Unpickling is not customised; expose pickle.loads unchanged.
    loads = pickle.loads
|
|
|
|
# Module-level shortcut for registering a reduce function on ForkingPickler.
register = ForkingPickler.register
|
|
|
|
def dump(obj, file, protocol=None):
    '''Pickle obj to file with a ForkingPickler, in place of pickle.dump().'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
|
|
|
|
#
|
|
# Platform specific definitions
|
|
#
|
|
|
|
if sys.platform == 'win32':
|
|
# Windows
|
|
__all__ += ['DupHandle', 'duplicate', 'steal_handle']
|
|
import _winapi
|
|
|
|
def duplicate(handle, target_process=None, inheritable=False,
              *, source_process=None):
    '''Return a duplicate of handle.

    source_process and target_process are process handles (not pids);
    each one defaults to the current process when left as None.
    '''
    this_process = _winapi.GetCurrentProcess()
    src = this_process if source_process is None else source_process
    dst = this_process if target_process is None else target_process
    return _winapi.DuplicateHandle(
        src, handle, dst, 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
|
|
|
|
def steal_handle(source_pid, handle):
    '''Take handle from the process with pid source_pid into this process.'''
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    # DUPLICATE_CLOSE_SOURCE makes the duplication also close the original.
    options = _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE
    try:
        return _winapi.DuplicateHandle(
            owner, handle, _winapi.GetCurrentProcess(), 0, False, options)
    finally:
        # The process handle is only needed for the duplication itself.
        _winapi.CloseHandle(owner)
|
|
|
|
def send_handle(conn, handle, destination_pid):
    '''Wrap handle in a DupHandle for destination_pid and send it on conn.'''
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
|
|
|
|
def recv_handle(conn):
    '''Read a handle wrapper from conn and return the handle it detaches.'''
    wrapper = conn.recv()
    return wrapper.detach()
|
|
|
|
class DupHandle(object):
    '''Picklable wrapper around a handle duplicated for another process.'''

    def __init__(self, handle, access, pid=None):
        # With no pid given, the handle is duplicated within the current
        # process and the receiver is expected to steal it in detach().
        target_pid = os.getpid() if pid is None else pid
        target = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, target_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(), handle, target, access, False, 0)
        finally:
            _winapi.CloseHandle(target)
        self._access = access
        self._pid = target_pid

    def detach(self):
        '''Return the handle for use in this process; call only once.'''
        owner_pid = self._pid
        if owner_pid == os.getpid():
            # The stored handle already belongs to this process.
            return self._handle
        # Otherwise take the handle from the process that owns it,
        # closing the source copy as part of the duplication.
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
        try:
            return _winapi.DuplicateHandle(
                owner, self._handle, _winapi.GetCurrentProcess(),
                self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            _winapi.CloseHandle(owner)
|
|
|
|
else:
|
|
# Unix
|
|
__all__ += ['DupFd', 'sendfds', 'recvfds']
|
|
import array
|
|
|
|
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
|
|
ACKNOWLEDGE = sys.platform == 'darwin'
|
|
|
|
def sendfds(sock, fds):
    '''Pass the file descriptors in fds across the AF_UNIX socket sock.'''
    fd_array = array.array('i', fds)
    # The single byte of ordinary data carries the fd count modulo 256.
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
    if ACKNOWLEDGE:
        # Wait for the receiver to confirm that it got the fds.
        if sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')
|
|
|
|
def _close_received_fds(ancdata):
    '''Close every fd delivered in SCM_RIGHTS items of ancdata (best effort).'''
    for level, kind, data in ancdata:
        if level != socket.SOL_SOCKET or kind != socket.SCM_RIGHTS:
            continue
        fds = array.array('i')
        # Ignore a truncated trailing integer, if there is one.
        fds.frombytes(data[:len(data) - (len(data) % fds.itemsize)])
        for fd in fds:
            try:
                os.close(fd)
            except OSError:
                pass


def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    Reads one byte of data from sock with recvmsg(), allowing ancillary
    space for size fds, and returns the received fds as a list.

    Raises EOFError if neither data nor ancillary data is received,
    AssertionError if the number of fds does not match the count byte, and
    RuntimeError for any other malformed message.  Whenever an exception is
    raised after the message was read, the fds that came with it are closed
    first, so they do not leak into this process.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    # The message flags and the peer address are not used.
    msg, ancdata, _flags, _addr = sock.recvmsg(
        1, socket.CMSG_SPACE(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                    cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    raise ValueError
                a.frombytes(cmsg_data)
                # msg[0] is the sender's fd count modulo 256; an empty msg
                # raises IndexError, which is reported as invalid data.
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        "Len is {0:n} but msg[0] is {1!r}".format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            pass
        raise RuntimeError('Invalid data received')
    except BaseException:
        # The caller never sees the fds on failure, so release them here.
        _close_received_fds(ancdata)
        raise
|
|
|
|
def send_handle(conn, handle, destination_pid):
    '''Send the fd handle over the local connection conn.

    destination_pid is accepted but not used by this implementation.
    '''
    conn_fd = conn.fileno()
    # The temporary socket object is closed again when the block exits.
    with socket.fromfd(conn_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
        sendfds(s, [handle])
|
|
|
|
def recv_handle(conn):
    '''Receive a single fd over the local connection conn and return it.'''
    conn_fd = conn.fileno()
    # The temporary socket object is closed again when the block exits.
    with socket.fromfd(conn_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
        received = recvfds(s, 1)
        return received[0]
|
|
|
|
def DupFd(fd):
    '''Return a wrapper object for the file descriptor fd.'''
    spawning = context.get_spawning_popen()
    if spawning is not None:
        # A child process is being spawned: let its Popen object wrap the fd.
        child_fd = spawning.duplicate_for_child(fd)
        return spawning.DupFd(child_fd)
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    # Otherwise fall back to the resource sharer.
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
|
|
|
|
#
|
|
# Try making some callable types picklable
|
|
#
|
|
|
|
def _reduce_method(m):
|
|
if m.__self__ is None:
|
|
return getattr, (m.__class__, m.__func__.__name__)
|
|
else:
|
|
return getattr, (m.__self__, m.__func__.__name__)
|
|
class _C:
|
|
def f(self):
|
|
pass
|
|
# Objects of the same type as the bound method _C().f use _reduce_method.
register(type(_C().f), _reduce_method)
|
|
|
|
|
|
def _reduce_method_descriptor(m):
|
|
return getattr, (m.__objclass__, m.__name__)
|
|
# Objects of the same type as list.append use _reduce_method_descriptor.
register(type(list.append), _reduce_method_descriptor)
|
|
# Objects of the same type as int.__add__ use _reduce_method_descriptor.
register(type(int.__add__), _reduce_method_descriptor)
|
|
|
|
|
|
def _reduce_partial(p):
|
|
return _rebuild_partial, (p.func, p.args, p.keywords or {})
|
|
def _rebuild_partial(func, args, keywords):
|
|
return functools.partial(func, *args, **keywords)
|
|
# functools.partial objects are reduced with _reduce_partial.
register(functools.partial, _reduce_partial)
|
|
|
|
#
|
|
# Make sockets picklable
|
|
#
|
|
|
|
if sys.platform == 'win32':
|
|
def _reduce_socket(s):
    '''Reduce socket s to a DupSocket wrapper rebuilt by _rebuild_socket.'''
    from .resource_sharer import DupSocket
    wrapper = DupSocket(s)
    return _rebuild_socket, (wrapper,)
|
|
def _rebuild_socket(ds):
|
|
return ds.detach()
|
|
# On win32, sockets are reduced with the DupSocket-based _reduce_socket.
register(socket.socket, _reduce_socket)
|
|
|
|
else:
|
|
def _reduce_socket(s):
    '''Reduce socket s to a DupFd wrapper plus its family, type and proto.'''
    wrapper = DupFd(s.fileno())
    state = (wrapper, s.family, s.type, s.proto)
    return _rebuild_socket, state
|
|
def _rebuild_socket(df, family, type, proto):
|
|
fd = df.detach()
|
|
return socket.socket(family, type, proto, fileno=fd)
|
|
# On other platforms, sockets are reduced with the DupFd-based _reduce_socket.
register(socket.socket, _reduce_socket)
|
|
|
|
|
|
class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.

    The helpers of this module are exposed as class attributes.  Plain
    functions are wrapped in staticmethod so that looking them up on an
    instance returns the function itself rather than a bound method that
    would receive the reducer as an unexpected first argument; access
    through the class is unchanged.
    '''
    ForkingPickler = ForkingPickler
    # register is already a bound classmethod, so it needs no wrapping.
    register = register
    dump = staticmethod(dump)
    send_handle = staticmethod(send_handle)
    recv_handle = staticmethod(recv_handle)

    if sys.platform == 'win32':
        steal_handle = staticmethod(steal_handle)
        duplicate = staticmethod(duplicate)
        DupHandle = DupHandle
    else:
        sendfds = staticmethod(sendfds)
        recvfds = staticmethod(recvfds)
        DupFd = staticmethod(DupFd)

    _reduce_method = staticmethod(_reduce_method)
    _reduce_method_descriptor = staticmethod(_reduce_method_descriptor)
    # Exposed alongside _rebuild_partial; __init__ registers it as well.
    _reduce_partial = staticmethod(_reduce_partial)
    _rebuild_partial = staticmethod(_rebuild_partial)
    _reduce_socket = staticmethod(_reduce_socket)
    _rebuild_socket = staticmethod(_rebuild_socket)

    def __init__(self, *args):
        '''Register the module's reducers on ForkingPickler; args are ignored.'''
        register(type(_C().f), _reduce_method)
        register(type(list.append), _reduce_method_descriptor)
        register(type(int.__add__), _reduce_method_descriptor)
        register(functools.partial, _reduce_partial)
        register(socket.socket, _reduce_socket)
|