mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	gh-136547: fix hashlib_helper for blocking and requesting digests (#136762)
				
					
				
			- Fix `hashlib_helper.block_algorithm` where the dummy functions were incorrectly defined.
- Rename `hashlib_helper.HashAPI` to `hashlib_helper.HashInfo` and add more helper methods.
- Simplify `hashlib_helper.requires_*()` functions.
- Rewrite some private helpers in `hashlib_helper`.
- Remove `find_{builtin,openssl}_hashdigest_constructor()` as they are no more needed and were
  not meant to be public in the first place.
- Fix some tests in `test_hashlib` when FIPS mode is on.
			
			
This commit is contained in:
		
							parent
							
								
									cc81b4e501
								
							
						
					
					
						commit
						c504f62fe2
					
				
					 4 changed files with 579 additions and 310 deletions
				
			
		|  | @ -136,12 +136,22 @@ def __get_openssl_constructor(name): | |||
|         # Prefer our builtin blake2 implementation. | ||||
|         return __get_builtin_constructor(name) | ||||
|     try: | ||||
|         # MD5, SHA1, and SHA2 are in all supported OpenSSL versions | ||||
|         # SHA3/shake are available in OpenSSL 1.1.1+ | ||||
|         # Fetch the OpenSSL hash function if it exists, | ||||
|         # independently of the context security policy. | ||||
|         f = getattr(_hashlib, 'openssl_' + name) | ||||
|         # Allow the C module to raise ValueError.  The function will be | ||||
|         # defined but the hash not actually available.  Don't fall back to | ||||
|         # builtin if the current security policy blocks a digest, bpo#40695. | ||||
|         # Check if the context security policy blocks the digest or not | ||||
|         # by allowing the C module to raise a ValueError. The function | ||||
|         # will be defined but the hash will not be available at runtime. | ||||
|         # | ||||
|         # We use "usedforsecurity=False" to prevent falling back to the | ||||
|         # built-in function in case the security policy does not allow it. | ||||
|         # | ||||
|         # Note that this only affects the explicit named constructors, | ||||
|         # and not the algorithms exposed through hashlib.new() which | ||||
|         # can still be resolved to a built-in function even if the | ||||
|         # current security policy does not allow it. | ||||
|         # | ||||
|         # See https://github.com/python/cpython/issues/84872. | ||||
|         f(usedforsecurity=False) | ||||
|         # Use the C function directly (very fast) | ||||
|         return f | ||||
|  |  | |||
|  | @ -1,91 +1,214 @@ | |||
| import contextlib | ||||
| import enum | ||||
| import functools | ||||
| import hashlib | ||||
| import importlib | ||||
| import inspect | ||||
| import unittest | ||||
| import unittest.mock | ||||
| from collections import namedtuple | ||||
| from test.support.import_helper import import_module | ||||
| from test.support import import_helper | ||||
| from types import MappingProxyType | ||||
| 
 | ||||
| try: | ||||
|     import _hashlib | ||||
| except ImportError: | ||||
|     _hashlib = None | ||||
| 
 | ||||
| try: | ||||
|     import _hmac | ||||
| except ImportError: | ||||
|     _hmac = None | ||||
| def try_import_module(module_name): | ||||
|     """Try to import a module and return None on failure.""" | ||||
|     try: | ||||
|         return importlib.import_module(module_name) | ||||
|     except ImportError: | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| CANONICAL_DIGEST_NAMES = frozenset(( | ||||
|     'md5', 'sha1', | ||||
|     'sha224', 'sha256', 'sha384', 'sha512', | ||||
|     'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', | ||||
|     'shake_128', 'shake_256', | ||||
|     'blake2s', 'blake2b', | ||||
| class HID(enum.StrEnum): | ||||
|     """Enumeration containing the canonical digest names. | ||||
| 
 | ||||
|     Those names should only be used by hashlib.new() or hmac.new(). | ||||
|     Their support by _hashlib.new() is not necessarily guaranteed. | ||||
|     """ | ||||
| 
 | ||||
|     md5 = enum.auto() | ||||
|     sha1 = enum.auto() | ||||
| 
 | ||||
|     sha224 = enum.auto() | ||||
|     sha256 = enum.auto() | ||||
|     sha384 = enum.auto() | ||||
|     sha512 = enum.auto() | ||||
| 
 | ||||
|     sha3_224 = enum.auto() | ||||
|     sha3_256 = enum.auto() | ||||
|     sha3_384 = enum.auto() | ||||
|     sha3_512 = enum.auto() | ||||
| 
 | ||||
|     shake_128 = enum.auto() | ||||
|     shake_256 = enum.auto() | ||||
| 
 | ||||
|     blake2s = enum.auto() | ||||
|     blake2b = enum.auto() | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return str(self) | ||||
| 
 | ||||
|     @property | ||||
|     def is_xof(self): | ||||
|         """Indicate whether the hash is an extendable-output hash function.""" | ||||
|         return self.startswith("shake_") | ||||
| 
 | ||||
|     @property | ||||
|     def is_keyed(self): | ||||
|         """Indicate whether the hash is a keyed hash function.""" | ||||
|         return self.startswith("blake2") | ||||
| 
 | ||||
| 
 | ||||
| CANONICAL_DIGEST_NAMES = frozenset(map(str, HID.__members__)) | ||||
| NON_HMAC_DIGEST_NAMES = frozenset(( | ||||
|     HID.shake_128, HID.shake_256, | ||||
|     HID.blake2s, HID.blake2b, | ||||
| )) | ||||
| 
 | ||||
| NON_HMAC_DIGEST_NAMES = frozenset({ | ||||
|     'shake_128', 'shake_256', | ||||
|     'blake2s', 'blake2b', | ||||
| }) | ||||
| 
 | ||||
| class HashInfo: | ||||
|     """Dataclass storing explicit hash constructor names. | ||||
| 
 | ||||
| class HashAPI(namedtuple("HashAPI", "builtin openssl hashlib")): | ||||
|     - *builtin* is the fully-qualified name for the explicit HACL* | ||||
|       hash constructor function, e.g., "_md5.md5". | ||||
| 
 | ||||
|     def fullname(self, typ): | ||||
|         match typ: | ||||
|     - *openssl* is the name of the "_hashlib" module method for the explicit | ||||
|       OpenSSL hash constructor function, e.g., "openssl_md5". | ||||
| 
 | ||||
|     - *hashlib* is the name of the "hashlib" module method for the explicit | ||||
|       hash constructor function, e.g., "md5". | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, builtin, openssl=None, hashlib=None): | ||||
|         assert isinstance(builtin, str), builtin | ||||
|         assert len(builtin.split(".")) == 2, builtin | ||||
| 
 | ||||
|         self.builtin = builtin | ||||
|         self.builtin_module_name, self.builtin_method_name = ( | ||||
|             self.builtin.split(".", maxsplit=1) | ||||
|         ) | ||||
| 
 | ||||
|         assert openssl is None or openssl.startswith("openssl_") | ||||
|         self.openssl = self.openssl_method_name = openssl | ||||
|         self.openssl_module_name = "_hashlib" if openssl else None | ||||
| 
 | ||||
|         assert hashlib is None or isinstance(hashlib, str) | ||||
|         self.hashlib = self.hashlib_method_name = hashlib | ||||
|         self.hashlib_module_name = "hashlib" if hashlib else None | ||||
| 
 | ||||
|     def module_name(self, implementation): | ||||
|         match implementation: | ||||
|             case "builtin": | ||||
|                 return self.builtin | ||||
|                 return self.builtin_module_name | ||||
|             case "openssl": | ||||
|                 return f"_hashlib.{self.openssl}" if self.openssl else None | ||||
|                 return self.openssl_module_name | ||||
|             case "hashlib": | ||||
|                 return f"hashlib.{self.hashlib}" if self.hashlib else None | ||||
|             case _: | ||||
|                 raise AssertionError(f"unknown type: {typ}") | ||||
|                 return self.hashlib_module_name | ||||
|         raise AssertionError(f"invalid implementation {implementation}") | ||||
| 
 | ||||
|     def method_name(self, implementation): | ||||
|         match implementation: | ||||
|             case "builtin": | ||||
|                 return self.builtin_method_name | ||||
|             case "openssl": | ||||
|                 return self.openssl_method_name | ||||
|             case "hashlib": | ||||
|                 return self.hashlib_method_name | ||||
|         raise AssertionError(f"invalid implementation {implementation}") | ||||
| 
 | ||||
|     def fullname(self, implementation): | ||||
|         """Get the fully qualified name of a given implementation. | ||||
| 
 | ||||
|         This returns a string of the form "MODULE_NAME.METHOD_NAME" or None | ||||
|         if the hash function does not have a corresponding implementation. | ||||
| 
 | ||||
|         *implementation* must be "builtin", "openssl" or "hashlib". | ||||
|         """ | ||||
|         module_name = self.module_name(implementation) | ||||
|         method_name = self.method_name(implementation) | ||||
|         if module_name is None or method_name is None: | ||||
|             return None | ||||
|         return f"{module_name}.{method_name}" | ||||
| 
 | ||||
| 
 | ||||
| # Mapping from a "canonical" name to a pair (HACL*, _hashlib.*, hashlib.*) | ||||
| # constructors. If the constructor name is None, then this means that the | ||||
| # algorithm can only be used by the "agile" new() interfaces. | ||||
| _EXPLICIT_CONSTRUCTORS = MappingProxyType({ | ||||
|     "md5": HashAPI("_md5.md5", "openssl_md5", "md5"), | ||||
|     "sha1": HashAPI("_sha1.sha1", "openssl_sha1", "sha1"), | ||||
|     "sha224": HashAPI("_sha2.sha224", "openssl_sha224", "sha224"), | ||||
|     "sha256": HashAPI("_sha2.sha256", "openssl_sha256", "sha256"), | ||||
|     "sha384": HashAPI("_sha2.sha384", "openssl_sha384", "sha384"), | ||||
|     "sha512": HashAPI("_sha2.sha512", "openssl_sha512", "sha512"), | ||||
|     "sha3_224": HashAPI("_sha3.sha3_224", "openssl_sha3_224", "sha3_224"), | ||||
|     "sha3_256": HashAPI("_sha3.sha3_256", "openssl_sha3_256", "sha3_256"), | ||||
|     "sha3_384": HashAPI("_sha3.sha3_384", "openssl_sha3_384", "sha3_384"), | ||||
|     "sha3_512": HashAPI("_sha3.sha3_512", "openssl_sha3_512", "sha3_512"), | ||||
|     "shake_128": HashAPI("_sha3.shake_128", "openssl_shake_128", "shake_128"), | ||||
|     "shake_256": HashAPI("_sha3.shake_256", "openssl_shake_256", "shake_256"), | ||||
|     "blake2s": HashAPI("_blake2.blake2s", None, "blake2s"), | ||||
|     "blake2b": HashAPI("_blake2.blake2b", None, "blake2b"), | ||||
| _EXPLICIT_CONSTRUCTORS = MappingProxyType({  # fmt: skip | ||||
|     HID.md5: HashInfo("_md5.md5", "openssl_md5", "md5"), | ||||
|     HID.sha1: HashInfo("_sha1.sha1", "openssl_sha1", "sha1"), | ||||
|     HID.sha224: HashInfo("_sha2.sha224", "openssl_sha224", "sha224"), | ||||
|     HID.sha256: HashInfo("_sha2.sha256", "openssl_sha256", "sha256"), | ||||
|     HID.sha384: HashInfo("_sha2.sha384", "openssl_sha384", "sha384"), | ||||
|     HID.sha512: HashInfo("_sha2.sha512", "openssl_sha512", "sha512"), | ||||
|     HID.sha3_224: HashInfo( | ||||
|         "_sha3.sha3_224", "openssl_sha3_224", "sha3_224" | ||||
|     ), | ||||
|     HID.sha3_256: HashInfo( | ||||
|         "_sha3.sha3_256", "openssl_sha3_256", "sha3_256" | ||||
|     ), | ||||
|     HID.sha3_384: HashInfo( | ||||
|         "_sha3.sha3_384", "openssl_sha3_384", "sha3_384" | ||||
|     ), | ||||
|     HID.sha3_512: HashInfo( | ||||
|         "_sha3.sha3_512", "openssl_sha3_512", "sha3_512" | ||||
|     ), | ||||
|     HID.shake_128: HashInfo( | ||||
|         "_sha3.shake_128", "openssl_shake_128", "shake_128" | ||||
|     ), | ||||
|     HID.shake_256: HashInfo( | ||||
|         "_sha3.shake_256", "openssl_shake_256", "shake_256" | ||||
|     ), | ||||
|     HID.blake2s: HashInfo("_blake2.blake2s", None, "blake2s"), | ||||
|     HID.blake2b: HashInfo("_blake2.blake2b", None, "blake2b"), | ||||
| }) | ||||
| assert _EXPLICIT_CONSTRUCTORS.keys() == CANONICAL_DIGEST_NAMES | ||||
| get_hash_info = _EXPLICIT_CONSTRUCTORS.__getitem__ | ||||
| 
 | ||||
| # Mapping from canonical hash names to their explicit HACL* HMAC constructor. | ||||
| # There is currently no OpenSSL one-shot named function and there will likely | ||||
| # be none in the future. | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS = { | ||||
|     name: f'_hmac.compute_{name}' for name in ( | ||||
|         'md5', 'sha1', | ||||
|         'sha224', 'sha256', 'sha384', 'sha512', | ||||
|         'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', | ||||
|     ) | ||||
|     HID(name): f"_hmac.compute_{name}" | ||||
|     for name in CANONICAL_DIGEST_NAMES | ||||
| } | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS['shake_128'] = None | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS['shake_256'] = None | ||||
| # Neither HACL* nor OpenSSL supports HMAC over XOFs. | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS[HID.shake_128] = None | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS[HID.shake_256] = None | ||||
| # Strictly speaking, HMAC-BLAKE is meaningless as BLAKE2 is already a | ||||
| # keyed hash function. However, as it's exposed by HACL*, we test it. | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS['blake2s'] = '_hmac.compute_blake2s_32' | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS['blake2b'] = '_hmac.compute_blake2b_32' | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS[HID.blake2s] = '_hmac.compute_blake2s_32' | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS[HID.blake2b] = '_hmac.compute_blake2b_32' | ||||
| _EXPLICIT_HMAC_CONSTRUCTORS = MappingProxyType(_EXPLICIT_HMAC_CONSTRUCTORS) | ||||
| assert _EXPLICIT_HMAC_CONSTRUCTORS.keys() == CANONICAL_DIGEST_NAMES | ||||
| 
 | ||||
| 
 | ||||
| def _decorate_func_or_class(decorator_func, func_or_class): | ||||
|     if not isinstance(func_or_class, type): | ||||
|         return decorator_func(func_or_class) | ||||
| 
 | ||||
|     decorated_class = func_or_class | ||||
|     setUpClass = decorated_class.__dict__.get('setUpClass') | ||||
|     if setUpClass is None: | ||||
|         def setUpClass(cls): | ||||
|             super(decorated_class, cls).setUpClass() | ||||
|         setUpClass.__qualname__ = decorated_class.__qualname__ + '.setUpClass' | ||||
|         setUpClass.__module__ = decorated_class.__module__ | ||||
|     else: | ||||
|         setUpClass = setUpClass.__func__ | ||||
|     setUpClass = classmethod(decorator_func(setUpClass)) | ||||
|     decorated_class.setUpClass = setUpClass | ||||
|     return decorated_class | ||||
| 
 | ||||
| 
 | ||||
| def _chain_decorators(decorators): | ||||
|     """Obtain a decorator by chaining multiple decorators. | ||||
| 
 | ||||
|     The decorators are applied in the order they are given. | ||||
|     """ | ||||
|     def decorator_func(func): | ||||
|         return functools.reduce(lambda w, deco: deco(w), decorators, func) | ||||
|     return functools.partial(_decorate_func_or_class, decorator_func) | ||||
| 
 | ||||
| 
 | ||||
| def _ensure_wrapper_signature(wrapper, wrapped): | ||||
|     """Ensure that a wrapper has the same signature as the wrapped function. | ||||
| 
 | ||||
|  | @ -108,49 +231,129 @@ def _ensure_wrapper_signature(wrapper, wrapped): | |||
| 
 | ||||
| 
 | ||||
| def requires_hashlib(): | ||||
|     _hashlib = try_import_module("_hashlib") | ||||
|     return unittest.skipIf(_hashlib is None, "requires _hashlib") | ||||
| 
 | ||||
| 
 | ||||
| def requires_builtin_hmac(): | ||||
|     _hmac = try_import_module("_hmac") | ||||
|     return unittest.skipIf(_hmac is None, "requires _hmac") | ||||
| 
 | ||||
| 
 | ||||
| def _missing_hash(digestname, implementation=None, *, exc=None): | ||||
|     parts = ["missing", implementation, f"hash algorithm: {digestname!r}"] | ||||
|     msg = " ".join(filter(None, parts)) | ||||
|     raise unittest.SkipTest(msg) from exc | ||||
| class SkipNoHash(unittest.SkipTest): | ||||
|     """A SkipTest exception raised when a hash is not available.""" | ||||
| 
 | ||||
|     def __init__(self, digestname, implementation=None, interface=None): | ||||
|         parts = ["missing", implementation, f"hash algorithm {digestname!r}"] | ||||
|         if interface is not None: | ||||
|             parts.append(f"for {interface}") | ||||
|         super().__init__(" ".join(filter(None, parts))) | ||||
| 
 | ||||
| 
 | ||||
| def _openssl_availabillity(digestname, *, usedforsecurity): | ||||
| def _hashlib_new(digestname, openssl, /, **kwargs): | ||||
|     """Check availability of [hashlib|_hashlib].new(digestname, **kwargs). | ||||
| 
 | ||||
|     If *openssl* is True, module is "_hashlib" (C extension module), | ||||
|     otherwise it is "hashlib" (pure Python interface). | ||||
| 
 | ||||
|     The constructor function is returned (without binding **kwargs), | ||||
|     or SkipTest is raised if none exists. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     # Re-import 'hashlib' in case it was mocked, but propagate | ||||
|     # exceptions as it should be unconditionally available. | ||||
|     hashlib = importlib.import_module("hashlib") | ||||
|     # re-import '_hashlib' in case it was mocked | ||||
|     _hashlib = try_import_module("_hashlib") | ||||
|     module = _hashlib if openssl and _hashlib is not None else hashlib | ||||
|     try: | ||||
|         module.new(digestname, **kwargs) | ||||
|     except ValueError as exc: | ||||
|         interface = f"{module.__name__}.new" | ||||
|         raise SkipNoHash(digestname, interface=interface) from exc | ||||
|     return functools.partial(module.new, digestname) | ||||
| 
 | ||||
| 
 | ||||
| def _builtin_hash(module_name, digestname, /, **kwargs): | ||||
|     """Check availability of <module_name>.<digestname>(**kwargs). | ||||
| 
 | ||||
|     - The *module_name* is the C extension module name based on HACL*. | ||||
|     - The *digestname* is one of its member, e.g., 'md5'. | ||||
| 
 | ||||
|     The constructor function is returned, or SkipTest is raised if none exists. | ||||
|     """ | ||||
|     assert isinstance(module_name, str), module_name | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     fullname = f'{module_name}.{digestname}' | ||||
|     try: | ||||
|         builtin_module = importlib.import_module(module_name) | ||||
|     except ImportError as exc: | ||||
|         raise SkipNoHash(fullname, "builtin") from exc | ||||
|     try: | ||||
|         constructor = getattr(builtin_module, digestname) | ||||
|     except AttributeError as exc: | ||||
|         raise SkipNoHash(fullname, "builtin") from exc | ||||
|     try: | ||||
|         constructor(**kwargs) | ||||
|     except ValueError as exc: | ||||
|         raise SkipNoHash(fullname, "builtin") from exc | ||||
|     return constructor | ||||
| 
 | ||||
| 
 | ||||
| def _openssl_new(digestname, /, **kwargs): | ||||
|     """Check availability of _hashlib.new(digestname, **kwargs). | ||||
| 
 | ||||
|     The constructor function is returned (without binding **kwargs), | ||||
|     or SkipTest is raised if none exists. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     try: | ||||
|         _hashlib.new(digestname, usedforsecurity=usedforsecurity) | ||||
|     except AttributeError: | ||||
|         assert _hashlib is None | ||||
|         _missing_hash(digestname, "OpenSSL") | ||||
|         # re-import '_hashlib' in case it was mocked | ||||
|         _hashlib = importlib.import_module("_hashlib") | ||||
|     except ImportError as exc: | ||||
|         raise SkipNoHash(digestname, "openssl") from exc | ||||
|     try: | ||||
|         _hashlib.new(digestname, **kwargs) | ||||
|     except ValueError as exc: | ||||
|         _missing_hash(digestname, "OpenSSL", exc=exc) | ||||
|         raise SkipNoHash(digestname, interface="_hashlib.new") from exc | ||||
|     return functools.partial(_hashlib.new, digestname) | ||||
| 
 | ||||
| 
 | ||||
| def _decorate_func_or_class(func_or_class, decorator_func): | ||||
|     if not isinstance(func_or_class, type): | ||||
|         return decorator_func(func_or_class) | ||||
| def _openssl_hash(digestname, /, **kwargs): | ||||
|     """Check availability of _hashlib.openssl_<digestname>(**kwargs). | ||||
| 
 | ||||
|     decorated_class = func_or_class | ||||
|     setUpClass = decorated_class.__dict__.get('setUpClass') | ||||
|     if setUpClass is None: | ||||
|         def setUpClass(cls): | ||||
|             super(decorated_class, cls).setUpClass() | ||||
|         setUpClass.__qualname__ = decorated_class.__qualname__ + '.setUpClass' | ||||
|         setUpClass.__module__ = decorated_class.__module__ | ||||
|     else: | ||||
|         setUpClass = setUpClass.__func__ | ||||
|     setUpClass = classmethod(decorator_func(setUpClass)) | ||||
|     decorated_class.setUpClass = setUpClass | ||||
|     return decorated_class | ||||
|     The constructor function is returned (without binding **kwargs), | ||||
|     or SkipTest is raised if none exists. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     fullname = f"_hashlib.openssl_{digestname}" | ||||
|     try: | ||||
|         # re-import '_hashlib' in case it was mocked | ||||
|         _hashlib = importlib.import_module("_hashlib") | ||||
|     except ImportError as exc: | ||||
|         raise SkipNoHash(fullname, "openssl") from exc | ||||
|     try: | ||||
|         constructor = getattr(_hashlib, f"openssl_{digestname}", None) | ||||
|     except AttributeError as exc: | ||||
|         raise SkipNoHash(fullname, "openssl") from exc | ||||
|     try: | ||||
|         constructor(**kwargs) | ||||
|     except ValueError as exc: | ||||
|         raise SkipNoHash(fullname, "openssl") from exc | ||||
|     return constructor | ||||
| 
 | ||||
| 
 | ||||
| def requires_hashdigest(digestname, openssl=None, usedforsecurity=True): | ||||
| def _make_requires_hashdigest_decorator(test, /, *test_args, **test_kwargs): | ||||
|     def decorator_func(func): | ||||
|         @functools.wraps(func) | ||||
|         def wrapper(*args, **kwargs): | ||||
|             test(*test_args, **test_kwargs) | ||||
|             return func(*args, **kwargs) | ||||
|         return wrapper | ||||
|     return functools.partial(_decorate_func_or_class, decorator_func) | ||||
| 
 | ||||
| 
 | ||||
| def requires_hashdigest(digestname, openssl=None, *, usedforsecurity=True): | ||||
|     """Decorator raising SkipTest if a hashing algorithm is not available. | ||||
| 
 | ||||
|     The hashing algorithm may be missing, blocked by a strict crypto policy, | ||||
|  | @ -167,27 +370,9 @@ def requires_hashdigest(digestname, openssl=None, usedforsecurity=True): | |||
|     ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS | ||||
|     ValueError: unsupported hash type md4 | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     if openssl and _hashlib is not None: | ||||
|         def test_availability(): | ||||
|             _hashlib.new(digestname, usedforsecurity=usedforsecurity) | ||||
|     else: | ||||
|         def test_availability(): | ||||
|             hashlib.new(digestname, usedforsecurity=usedforsecurity) | ||||
| 
 | ||||
|     def decorator_func(func): | ||||
|         @functools.wraps(func) | ||||
|         def wrapper(*args, **kwargs): | ||||
|             try: | ||||
|                 test_availability() | ||||
|             except ValueError as exc: | ||||
|                 _missing_hash(digestname, exc=exc) | ||||
|             return func(*args, **kwargs) | ||||
|         return wrapper | ||||
| 
 | ||||
|     def decorator(func_or_class): | ||||
|         return _decorate_func_or_class(func_or_class, decorator_func) | ||||
|     return decorator | ||||
|     return _make_requires_hashdigest_decorator( | ||||
|         _hashlib_new, digestname, openssl, usedforsecurity=usedforsecurity | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def requires_openssl_hashdigest(digestname, *, usedforsecurity=True): | ||||
|  | @ -195,27 +380,9 @@ def requires_openssl_hashdigest(digestname, *, usedforsecurity=True): | |||
| 
 | ||||
|     The hashing algorithm may be missing or blocked by a strict crypto policy. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     def decorator_func(func): | ||||
|         @requires_hashlib()  # avoid checking at each call | ||||
|         @functools.wraps(func) | ||||
|         def wrapper(*args, **kwargs): | ||||
|             _openssl_availabillity(digestname, usedforsecurity=usedforsecurity) | ||||
|             return func(*args, **kwargs) | ||||
|         return wrapper | ||||
| 
 | ||||
|     def decorator(func_or_class): | ||||
|         return _decorate_func_or_class(func_or_class, decorator_func) | ||||
|     return decorator | ||||
| 
 | ||||
| 
 | ||||
| def find_openssl_hashdigest_constructor(digestname, *, usedforsecurity=True): | ||||
|     """Find the OpenSSL hash function constructor by its name.""" | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     _openssl_availabillity(digestname, usedforsecurity=usedforsecurity) | ||||
|     # This returns a function of the form _hashlib.openssl_<name> and | ||||
|     # not a lambda function as it is rejected by _hashlib.hmac_new(). | ||||
|     return getattr(_hashlib, f"openssl_{digestname}") | ||||
|     return _make_requires_hashdigest_decorator( | ||||
|         _openssl_new, digestname, usedforsecurity=usedforsecurity | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def requires_builtin_hashdigest( | ||||
|  | @ -226,40 +393,22 @@ def requires_builtin_hashdigest( | |||
|     - The *module_name* is the C extension module name based on HACL*. | ||||
|     - The *digestname* is one of its member, e.g., 'md5'. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     def decorator_func(func): | ||||
|         @functools.wraps(func) | ||||
|         def wrapper(*args, **kwargs): | ||||
|             module = import_module(module_name) | ||||
|             try: | ||||
|                 getattr(module, digestname) | ||||
|             except AttributeError: | ||||
|                 fullname = f'{module_name}.{digestname}' | ||||
|                 _missing_hash(fullname, implementation="HACL") | ||||
|             return func(*args, **kwargs) | ||||
|         return wrapper | ||||
| 
 | ||||
|     def decorator(func_or_class): | ||||
|         return _decorate_func_or_class(func_or_class, decorator_func) | ||||
|     return decorator | ||||
|     return _make_requires_hashdigest_decorator( | ||||
|         _builtin_hash, module_name, digestname, usedforsecurity=usedforsecurity | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def find_builtin_hashdigest_constructor( | ||||
|     module_name, digestname, *, usedforsecurity=True | ||||
| ): | ||||
|     """Find the HACL* hash function constructor. | ||||
| 
 | ||||
|     - The *module_name* is the C extension module name based on HACL*. | ||||
|     - The *digestname* is one of its member, e.g., 'md5'. | ||||
|     """ | ||||
|     assert isinstance(digestname, str), digestname | ||||
|     module = import_module(module_name) | ||||
|     try: | ||||
|         constructor = getattr(module, digestname) | ||||
|         constructor(b'', usedforsecurity=usedforsecurity) | ||||
|     except (AttributeError, TypeError, ValueError): | ||||
|         _missing_hash(f'{module_name}.{digestname}', implementation="HACL") | ||||
|     return constructor | ||||
| def requires_builtin_hashes(*ignored, usedforsecurity=True): | ||||
|     """Decorator raising SkipTest if one HACL* hashing algorithm is missing.""" | ||||
|     return _chain_decorators(( | ||||
|         requires_builtin_hashdigest( | ||||
|             api.builtin_module_name, | ||||
|             api.builtin_method_name, | ||||
|             usedforsecurity=usedforsecurity, | ||||
|         ) | ||||
|         for name, api in _EXPLICIT_CONSTRUCTORS.items() | ||||
|         if name not in ignored | ||||
|     )) | ||||
| 
 | ||||
| 
 | ||||
| class HashFunctionsTrait: | ||||
|  | @ -281,7 +430,9 @@ class HashFunctionsTrait: | |||
|         'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', | ||||
|     ] | ||||
| 
 | ||||
|     # Default 'usedforsecurity' to use when looking up a hash function. | ||||
|     # Default 'usedforsecurity' to use when checking a hash function. | ||||
|     # When the trait properties are callables (e.g., _md5.md5) and | ||||
|     # not strings, they must be called with the same 'usedforsecurity'. | ||||
|     usedforsecurity = True | ||||
| 
 | ||||
|     @classmethod | ||||
|  | @ -357,9 +508,9 @@ class OpenSSLHashFunctionsTrait(HashFunctionsTrait): | |||
| 
 | ||||
|     def _find_constructor(self, digestname): | ||||
|         self.is_valid_digest_name(digestname) | ||||
|         return find_openssl_hashdigest_constructor( | ||||
|             digestname, usedforsecurity=self.usedforsecurity | ||||
|         ) | ||||
|         # This returns a function of the form _hashlib.openssl_<name> and | ||||
|         # not a lambda function as it is rejected by _hashlib.hmac_new(). | ||||
|         return _openssl_hash(digestname, usedforsecurity=self.usedforsecurity) | ||||
| 
 | ||||
| 
 | ||||
| class BuiltinHashFunctionsTrait(HashFunctionsTrait): | ||||
|  | @ -370,49 +521,14 @@ class BuiltinHashFunctionsTrait(HashFunctionsTrait): | |||
|     is not since the former is unconditionally built. | ||||
|     """ | ||||
| 
 | ||||
|     def _find_constructor_in(self, module, digestname): | ||||
|     def _find_constructor(self, digestname): | ||||
|         self.is_valid_digest_name(digestname) | ||||
|         return find_builtin_hashdigest_constructor(module, digestname) | ||||
| 
 | ||||
|     @property | ||||
|     def md5(self): | ||||
|         return self._find_constructor_in("_md5", "md5") | ||||
| 
 | ||||
|     @property | ||||
|     def sha1(self): | ||||
|         return self._find_constructor_in("_sha1", "sha1") | ||||
| 
 | ||||
|     @property | ||||
|     def sha224(self): | ||||
|         return self._find_constructor_in("_sha2", "sha224") | ||||
| 
 | ||||
|     @property | ||||
|     def sha256(self): | ||||
|         return self._find_constructor_in("_sha2", "sha256") | ||||
| 
 | ||||
|     @property | ||||
|     def sha384(self): | ||||
|         return self._find_constructor_in("_sha2", "sha384") | ||||
| 
 | ||||
|     @property | ||||
|     def sha512(self): | ||||
|         return self._find_constructor_in("_sha2", "sha512") | ||||
| 
 | ||||
|     @property | ||||
|     def sha3_224(self): | ||||
|         return self._find_constructor_in("_sha3", "sha3_224") | ||||
| 
 | ||||
|     @property | ||||
|     def sha3_256(self): | ||||
|         return self._find_constructor_in("_sha3","sha3_256") | ||||
| 
 | ||||
|     @property | ||||
|     def sha3_384(self): | ||||
|         return self._find_constructor_in("_sha3","sha3_384") | ||||
| 
 | ||||
|     @property | ||||
|     def sha3_512(self): | ||||
|         return self._find_constructor_in("_sha3","sha3_512") | ||||
|         info = _EXPLICIT_CONSTRUCTORS[digestname] | ||||
|         return _builtin_hash( | ||||
|             info.builtin_module_name, | ||||
|             info.builtin_method_name, | ||||
|             usedforsecurity=self.usedforsecurity, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def find_gil_minsize(modules_names, default=2048): | ||||
|  | @ -426,38 +542,45 @@ def find_gil_minsize(modules_names, default=2048): | |||
|     """ | ||||
|     sizes = [] | ||||
|     for module_name in modules_names: | ||||
|         try: | ||||
|             module = importlib.import_module(module_name) | ||||
|         except ImportError: | ||||
|             continue | ||||
|         sizes.append(getattr(module, '_GIL_MINSIZE', default)) | ||||
|         module = try_import_module(module_name) | ||||
|         if module is not None: | ||||
|             sizes.append(getattr(module, '_GIL_MINSIZE', default)) | ||||
|     return max(sizes, default=default) | ||||
| 
 | ||||
| 
 | ||||
| def _block_openssl_hash_new(blocked_name): | ||||
|     """Block OpenSSL implementation of _hashlib.new().""" | ||||
|     assert isinstance(blocked_name, str), blocked_name | ||||
|     if _hashlib is None: | ||||
| 
 | ||||
|     # re-import '_hashlib' in case it was mocked | ||||
|     if (_hashlib := try_import_module("_hashlib")) is None: | ||||
|         return contextlib.nullcontext() | ||||
| 
 | ||||
|     @functools.wraps(wrapped := _hashlib.new) | ||||
|     def wrapper(name, data=b'', *, usedforsecurity=True, string=None): | ||||
|     def _hashlib_new(name, data=b'', *, usedforsecurity=True, string=None): | ||||
|         if name == blocked_name: | ||||
|             raise _hashlib.UnsupportedDigestmodError(blocked_name) | ||||
|         return wrapped(*args, **kwargs) | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch('_hashlib.new', wrapper) | ||||
|         return wrapped(name, data, | ||||
|                        usedforsecurity=usedforsecurity, string=string) | ||||
| 
 | ||||
|     _ensure_wrapper_signature(_hashlib_new, wrapped) | ||||
|     return unittest.mock.patch('_hashlib.new', _hashlib_new) | ||||
| 
 | ||||
| 
 | ||||
| def _block_openssl_hmac_new(blocked_name): | ||||
|     """Block OpenSSL HMAC-HASH implementation.""" | ||||
|     assert isinstance(blocked_name, str), blocked_name | ||||
|     if _hashlib is None: | ||||
| 
 | ||||
|     # re-import '_hashlib' in case it was mocked | ||||
|     if (_hashlib := try_import_module("_hashlib")) is None: | ||||
|         return contextlib.nullcontext() | ||||
| 
 | ||||
|     @functools.wraps(wrapped := _hashlib.hmac_new) | ||||
|     def wrapper(key, msg=b'', digestmod=None): | ||||
|         if digestmod == blocked_name: | ||||
|             raise _hashlib.UnsupportedDigestmodError(blocked_name) | ||||
|         return wrapped(key, msg, digestmod) | ||||
| 
 | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch('_hashlib.hmac_new', wrapper) | ||||
| 
 | ||||
|  | @ -465,112 +588,132 @@ def wrapper(key, msg=b'', digestmod=None): | |||
| def _block_openssl_hmac_digest(blocked_name): | ||||
|     """Block OpenSSL HMAC-HASH one-shot digest implementation.""" | ||||
|     assert isinstance(blocked_name, str), blocked_name | ||||
|     if _hashlib is None: | ||||
| 
 | ||||
|     # re-import '_hashlib' in case it was mocked | ||||
|     if (_hashlib := try_import_module("_hashlib")) is None: | ||||
|         return contextlib.nullcontext() | ||||
| 
 | ||||
|     @functools.wraps(wrapped := _hashlib.hmac_digest) | ||||
|     def wrapper(key, msg, digest): | ||||
|     def _hashlib_hmac_digest(key, msg, digest): | ||||
|         if digest == blocked_name: | ||||
|             raise _hashlib.UnsupportedDigestmodError(blocked_name) | ||||
|         return wrapped(key, msg, digestmod) | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch('_hashlib.hmac_digest', wrapper) | ||||
|         return wrapped(key, msg, digest) | ||||
| 
 | ||||
|     _ensure_wrapper_signature(_hashlib_hmac_digest, wrapped) | ||||
|     return unittest.mock.patch('_hashlib.hmac_digest', _hashlib_hmac_digest) | ||||
| 
 | ||||
| 
 | ||||
| @contextlib.contextmanager | ||||
| def _block_builtin_hash_new(name): | ||||
|     """Block a buitin-in hash name from the hashlib.new() interface.""" | ||||
|     assert isinstance(name, str), name | ||||
|     assert name.lower() == name, f"invalid name: {name}" | ||||
|     assert name in HID, f"invalid hash: {name}" | ||||
| 
 | ||||
|     builtin_cache = getattr(hashlib, '__builtin_constructor_cache') | ||||
|     if name in builtin_cache: | ||||
|         f = builtin_cache.pop(name) | ||||
|         F = builtin_cache.pop(name.upper(), None) | ||||
|     else: | ||||
|         f = F = None | ||||
|     try: | ||||
|         yield | ||||
|     finally: | ||||
|         if f is not None: | ||||
|             builtin_cache[name] = f | ||||
|         if F is not None: | ||||
|             builtin_cache[name.upper()] = F | ||||
|     # Re-import 'hashlib' in case it was mocked | ||||
|     hashlib = importlib.import_module('hashlib') | ||||
|     builtin_constructor_cache = getattr(hashlib, '__builtin_constructor_cache') | ||||
|     builtin_constructor_cache_mock = builtin_constructor_cache.copy() | ||||
|     builtin_constructor_cache_mock.pop(name, None) | ||||
|     builtin_constructor_cache_mock.pop(name.upper(), None) | ||||
| 
 | ||||
|     # __get_builtin_constructor() imports the HACL* modules on demand, | ||||
|     # so we need to block the possibility of importing it, but only | ||||
|     # during the call to __get_builtin_constructor(). | ||||
|     get_builtin_constructor = getattr(hashlib, '__get_builtin_constructor') | ||||
|     builtin_module_name = _EXPLICIT_CONSTRUCTORS[name].builtin_module_name | ||||
| 
 | ||||
|     @functools.wraps(get_builtin_constructor) | ||||
|     def get_builtin_constructor_mock(name): | ||||
|         with import_helper.isolated_modules(): | ||||
|             sys = importlib.import_module("sys") | ||||
|             sys.modules[builtin_module_name] = None  # block module's import | ||||
|             return get_builtin_constructor(name) | ||||
| 
 | ||||
|     return unittest.mock.patch.multiple( | ||||
|         hashlib, | ||||
|         __get_builtin_constructor=get_builtin_constructor_mock, | ||||
|         __builtin_constructor_cache=builtin_constructor_cache_mock | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def _block_builtin_hmac_new(blocked_name): | ||||
|     assert isinstance(blocked_name, str), blocked_name | ||||
|     if _hmac is None: | ||||
| 
 | ||||
|     # re-import '_hmac' in case it was mocked | ||||
|     if (_hmac := try_import_module("_hmac")) is None: | ||||
|         return contextlib.nullcontext() | ||||
| 
 | ||||
|     @functools.wraps(wrapped := _hmac.new) | ||||
|     def wrapper(key, msg=None, digestmod=None): | ||||
|     def _hmac_new(key, msg=None, digestmod=None): | ||||
|         if digestmod == blocked_name: | ||||
|             raise _hmac.UnknownHashError(blocked_name) | ||||
|         return wrapped(key, msg, digestmod) | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch('_hmac.new', wrapper) | ||||
| 
 | ||||
|     _ensure_wrapper_signature(_hmac_new, wrapped) | ||||
|     return unittest.mock.patch('_hmac.new', _hmac_new) | ||||
| 
 | ||||
| 
 | ||||
| def _block_builtin_hmac_digest(blocked_name): | ||||
|     assert isinstance(blocked_name, str), blocked_name | ||||
|     if _hmac is None: | ||||
| 
 | ||||
|     # re-import '_hmac' in case it was mocked | ||||
|     if (_hmac := try_import_module("_hmac")) is None: | ||||
|         return contextlib.nullcontext() | ||||
| 
 | ||||
|     @functools.wraps(wrapped := _hmac.compute_digest) | ||||
|     def wrapper(key, msg, digest): | ||||
|     def _hmac_compute_digest(key, msg, digest): | ||||
|         if digest == blocked_name: | ||||
|             raise _hmac.UnknownHashError(blocked_name) | ||||
|         return wrapped(key, msg, digest) | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch('_hmac.compute_digest', wrapper) | ||||
| 
 | ||||
|     _ensure_wrapper_signature(_hmac_compute_digest, wrapped) | ||||
|     return unittest.mock.patch('_hmac.compute_digest', _hmac_compute_digest) | ||||
| 
 | ||||
| 
 | ||||
| def _make_hash_constructor_blocker(name, dummy, *, interface): | ||||
|     assert isinstance(name, str), name | ||||
|     assert interface in ('builtin', 'openssl', 'hashlib') | ||||
|     assert name in _EXPLICIT_CONSTRUCTORS, f"invalid hash: {name}" | ||||
|     fullname = _EXPLICIT_CONSTRUCTORS[name].fullname(interface) | ||||
|     if fullname is None: | ||||
| def _make_hash_constructor_blocker(name, dummy, implementation): | ||||
|     info = _EXPLICIT_CONSTRUCTORS[name] | ||||
|     module_name = info.module_name(implementation) | ||||
|     method_name = info.method_name(implementation) | ||||
|     if module_name is None or method_name is None: | ||||
|         # function shouldn't exist for this implementation | ||||
|         return contextlib.nullcontext() | ||||
|     assert fullname.count('.') == 1, fullname | ||||
|     module_name, method = fullname.split('.', maxsplit=1) | ||||
| 
 | ||||
|     try: | ||||
|         module = importlib.import_module(module_name) | ||||
|     except ImportError: | ||||
|         # module is already disabled | ||||
|         return contextlib.nullcontext() | ||||
|     wrapped = getattr(module, method) | ||||
| 
 | ||||
|     wrapped = getattr(module, method_name) | ||||
|     wrapper = functools.wraps(wrapped)(dummy) | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch(fullname, wrapper) | ||||
|     return unittest.mock.patch(info.fullname(implementation), wrapper) | ||||
| 
 | ||||
| 
 | ||||
| def _block_hashlib_hash_constructor(name): | ||||
|     """Block explicit public constructors.""" | ||||
|     assert isinstance(name, str), name | ||||
|     def dummy(data=b'', *, usedforsecurity=True, string=None): | ||||
|         raise ValueError(f"unsupported hash name: {name}") | ||||
|     return _make_hash_constructor_blocker(name, dummy, interface='hashlib') | ||||
|         raise ValueError(f"blocked explicit public hash name: {name}") | ||||
| 
 | ||||
|     return _make_hash_constructor_blocker(name, dummy, 'hashlib') | ||||
| 
 | ||||
| 
 | ||||
| def _block_openssl_hash_constructor(name): | ||||
|     """Block explicit OpenSSL constructors.""" | ||||
|     assert isinstance(name, str), name | ||||
|     def dummy(data=b'', *, usedforsecurity=True, string=None): | ||||
|         raise ValueError(f"unsupported hash name: {name}") | ||||
|     return _make_hash_constructor_blocker(name, dummy, interface='openssl') | ||||
|         raise ValueError(f"blocked explicit OpenSSL hash name: {name}") | ||||
|     return _make_hash_constructor_blocker(name, dummy, 'openssl') | ||||
| 
 | ||||
| 
 | ||||
| def _block_builtin_hash_constructor(name): | ||||
|     """Block explicit HACL* constructors.""" | ||||
|     assert isinstance(name, str), name | ||||
|     def dummy(data=b'', *, usedforsecurity=True, string=b''): | ||||
|         raise ValueError(f"unsupported hash name: {name}") | ||||
|     return _make_hash_constructor_blocker(name, dummy, interface='builtin') | ||||
|         raise ValueError(f"blocked explicit builtin hash name: {name}") | ||||
|     return _make_hash_constructor_blocker(name, dummy, 'builtin') | ||||
| 
 | ||||
| 
 | ||||
| def _block_builtin_hmac_constructor(name): | ||||
|     """Block explicit HACL* HMAC constructors.""" | ||||
|     assert isinstance(name, str), name | ||||
|     assert name in _EXPLICIT_HMAC_CONSTRUCTORS, f"invalid hash: {name}" | ||||
|     fullname = _EXPLICIT_HMAC_CONSTRUCTORS[name] | ||||
|     if fullname is None: | ||||
|         # function shouldn't exist for this implementation | ||||
|  | @ -585,7 +728,7 @@ def _block_builtin_hmac_constructor(name): | |||
|         return contextlib.nullcontext() | ||||
|     @functools.wraps(wrapped := getattr(module, method)) | ||||
|     def wrapper(key, obj): | ||||
|         raise ValueError(f"unsupported hash name: {name}") | ||||
|         raise ValueError(f"blocked hash name: {name}") | ||||
|     _ensure_wrapper_signature(wrapper, wrapped) | ||||
|     return unittest.mock.patch(fullname, wrapper) | ||||
| 
 | ||||
|  | @ -600,22 +743,54 @@ def block_algorithm(name, *, allow_openssl=False, allow_builtin=False): | |||
|     """ | ||||
|     with contextlib.ExitStack() as stack: | ||||
|         if not (allow_openssl or allow_builtin): | ||||
|             # If one of the private interface is allowed, then the | ||||
|             # public interface will fallback to it even though the | ||||
|             # comment in hashlib.py says otherwise. | ||||
|             # Named constructors have a different behavior in the sense | ||||
|             # that they are either built-ins or OpenSSL ones, but not | ||||
|             # "agile" ones (namely once "hashlib" has been imported, | ||||
|             # they are fixed). | ||||
|             # | ||||
|             # So we should only block it if the private interfaces | ||||
|             # are blocked as well. | ||||
|             # If OpenSSL is not available, hashes fall back to built-in ones, | ||||
|             # in which case we don't need to block the explicit public hashes | ||||
|             # as they will call a mocked one. | ||||
|             # | ||||
|             # If OpenSSL is available, hashes fall back to "openssl_*" ones, | ||||
|             # except for BLAKE2b and BLAKE2s. | ||||
|             stack.enter_context(_block_hashlib_hash_constructor(name)) | ||||
|         elif ( | ||||
|             # In FIPS mode, hashlib.<name>() functions may raise if they use | ||||
|             # the OpenSSL implementation, except with usedforsecurity=False. | ||||
|             # However, blocking such functions also means blocking them | ||||
|             # so we again need to block them if we want to. | ||||
|             (_hashlib := try_import_module("_hashlib")) | ||||
|             and _hashlib.get_fips_mode() | ||||
|             and not allow_openssl | ||||
|         ) or ( | ||||
|             # Without OpenSSL, hashlib.<name>() functions are aliases | ||||
|             # to built-in functions, so both of them must be blocked | ||||
|             # as the module may have been imported before the HACL ones. | ||||
|             not (_hashlib := try_import_module("_hashlib")) | ||||
|             and not allow_builtin | ||||
|         ): | ||||
|             stack.enter_context(_block_hashlib_hash_constructor(name)) | ||||
| 
 | ||||
|         if not allow_openssl: | ||||
|             # _hashlib.new() | ||||
|             stack.enter_context(_block_openssl_hash_new(name)) | ||||
|             stack.enter_context(_block_openssl_hmac_new(name)) | ||||
|             stack.enter_context(_block_openssl_hmac_digest(name)) | ||||
|             # _hashlib.openssl_*() | ||||
|             stack.enter_context(_block_openssl_hash_constructor(name)) | ||||
|             # _hashlib.hmac_new() | ||||
|             stack.enter_context(_block_openssl_hmac_new(name)) | ||||
|             # _hashlib.hmac_digest() | ||||
|             stack.enter_context(_block_openssl_hmac_digest(name)) | ||||
| 
 | ||||
|         if not allow_builtin: | ||||
|             # __get_builtin_constructor(name) | ||||
|             stack.enter_context(_block_builtin_hash_new(name)) | ||||
|             stack.enter_context(_block_builtin_hmac_new(name)) | ||||
|             stack.enter_context(_block_builtin_hmac_digest(name)) | ||||
|             # <built-in module>.<built-in name>() | ||||
|             stack.enter_context(_block_builtin_hash_constructor(name)) | ||||
|             # _hmac.new(..., name) | ||||
|             stack.enter_context(_block_builtin_hmac_new(name)) | ||||
|             # _hmac.compute_<name>() | ||||
|             stack.enter_context(_block_builtin_hmac_constructor(name)) | ||||
|             # _hmac.compute_digest(..., name) | ||||
|             stack.enter_context(_block_builtin_hmac_digest(name)) | ||||
|         yield | ||||
|  |  | |||
|  | @ -545,13 +545,17 @@ def check(self, name, data, hexdigest, shake=False, **kwargs): | |||
| 
 | ||||
|     def check_file_digest(self, name, data, hexdigest): | ||||
|         hexdigest = hexdigest.lower() | ||||
|         try: | ||||
|             hashlib.new(name) | ||||
|         except ValueError: | ||||
|             # skip, algorithm is blocked by security policy. | ||||
|             return | ||||
|         digests = [name] | ||||
|         digests.extend(self.constructors_to_test[name]) | ||||
|         digests = [] | ||||
|         for digest in [name, *self.constructors_to_test[name]]: | ||||
|             try: | ||||
|                 if callable(digest): | ||||
|                     digest(b"") | ||||
|                 else: | ||||
|                     hashlib.new(digest) | ||||
|             except ValueError: | ||||
|                 # skip, algorithm is blocked by security policy. | ||||
|                 continue | ||||
|             digests.append(digest) | ||||
| 
 | ||||
|         with tempfile.TemporaryFile() as f: | ||||
|             f.write(data) | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ | |||
| import errno | ||||
| import importlib | ||||
| import itertools | ||||
| import inspect | ||||
| import io | ||||
| import logging | ||||
| import os | ||||
|  | @ -820,6 +821,7 @@ def test_linked_to_musl(self): | |||
|     # SuppressCrashReport | ||||
| 
 | ||||
| 
 | ||||
| @hashlib_helper.requires_builtin_hashes() | ||||
| class TestHashlibSupport(unittest.TestCase): | ||||
| 
 | ||||
|     @classmethod | ||||
|  | @ -828,11 +830,20 @@ def setUpClass(cls): | |||
|         cls.hashlib = import_helper.import_module("hashlib") | ||||
|         cls.hmac = import_helper.import_module("hmac") | ||||
| 
 | ||||
|         # We required the extension modules to be present since blocking | ||||
|         # HACL* implementations while allowing OpenSSL ones would still | ||||
|         # result in failures. | ||||
|         # All C extension modules must be present since blocking | ||||
|         # the built-in implementation while allowing OpenSSL or vice-versa | ||||
|         # may result in failures depending on the exposed built-in hashes. | ||||
|         cls._hashlib = import_helper.import_module("_hashlib") | ||||
|         cls._hmac = import_helper.import_module("_hmac") | ||||
|         cls._md5 = import_helper.import_module("_md5") | ||||
| 
 | ||||
|     def skip_if_fips_mode(self): | ||||
|         if self._hashlib.get_fips_mode(): | ||||
|             self.skipTest("disabled in FIPS mode") | ||||
| 
 | ||||
|     def skip_if_not_fips_mode(self): | ||||
|         if not self._hashlib.get_fips_mode(): | ||||
|             self.skipTest("requires FIPS mode") | ||||
| 
 | ||||
|     def check_context(self, disabled=True): | ||||
|         if disabled: | ||||
|  | @ -853,25 +864,19 @@ def try_import_attribute(self, fullname, default=None): | |||
|         except TypeError: | ||||
|             return default | ||||
| 
 | ||||
|     def validate_modules(self): | ||||
|         if hasattr(hashlib_helper, 'hashlib'): | ||||
|             self.assertIs(hashlib_helper.hashlib, self.hashlib) | ||||
|         if hasattr(hashlib_helper, 'hmac'): | ||||
|             self.assertIs(hashlib_helper.hmac, self.hmac) | ||||
| 
 | ||||
|     def fetch_hash_function(self, name, typ): | ||||
|         entry = hashlib_helper._EXPLICIT_CONSTRUCTORS[name] | ||||
|         match typ: | ||||
|     def fetch_hash_function(self, name, implementation): | ||||
|         info = hashlib_helper.get_hash_info(name) | ||||
|         match implementation: | ||||
|             case "hashlib": | ||||
|                 assert entry.hashlib is not None, entry | ||||
|                 return getattr(self.hashlib, entry.hashlib) | ||||
|                 assert info.hashlib is not None, info | ||||
|                 return getattr(self.hashlib, info.hashlib) | ||||
|             case "openssl": | ||||
|                 try: | ||||
|                     return getattr(self._hashlib, entry.openssl, None) | ||||
|                     return getattr(self._hashlib, info.openssl, None) | ||||
|                 except TypeError: | ||||
|                     return None | ||||
|             case "builtin": | ||||
|                 return self.try_import_attribute(entry.fullname(typ)) | ||||
|         fullname = info.fullname(implementation) | ||||
|         return self.try_import_attribute(fullname) | ||||
| 
 | ||||
|     def fetch_hmac_function(self, name): | ||||
|         fullname = hashlib_helper._EXPLICIT_HMAC_CONSTRUCTORS[name] | ||||
|  | @ -936,16 +941,12 @@ def check_builtin_hmac(self, name, *, disabled=True): | |||
|     ) | ||||
|     def test_disable_hash(self, name, allow_openssl, allow_builtin): | ||||
|         # In FIPS mode, the function may be available but would still need | ||||
|         # to raise a ValueError. For simplicity, we don't test the helper | ||||
|         # when we're in FIPS mode. | ||||
|         if self._hashlib.get_fips_mode(): | ||||
|             self.skipTest("hash functions may still be blocked in FIPS mode") | ||||
|         # to raise a ValueError, so we will test the helper separately. | ||||
|         self.skip_if_fips_mode() | ||||
|         flags = dict(allow_openssl=allow_openssl, allow_builtin=allow_builtin) | ||||
|         is_simple_disabled = not allow_builtin and not allow_openssl | ||||
|         is_fully_disabled = not allow_builtin and not allow_openssl | ||||
| 
 | ||||
|         with hashlib_helper.block_algorithm(name, **flags): | ||||
|             self.validate_modules() | ||||
| 
 | ||||
|             # OpenSSL's blake2s and blake2b are unknown names | ||||
|             # when only the OpenSSL interface is available. | ||||
|             if allow_openssl and not allow_builtin: | ||||
|  | @ -954,25 +955,104 @@ def test_disable_hash(self, name, allow_openssl, allow_builtin): | |||
|             else: | ||||
|                 name_for_hashlib_new = name | ||||
| 
 | ||||
|             with self.check_context(is_simple_disabled): | ||||
|             with self.check_context(is_fully_disabled): | ||||
|                 _ = self.hashlib.new(name_for_hashlib_new) | ||||
|             with self.check_context(is_simple_disabled): | ||||
|                 _ = getattr(self.hashlib, name)(b"") | ||||
| 
 | ||||
|             # Since _hashlib is present, explicit blake2b/blake2s constructors | ||||
|             # use the built-in implementation, while others (since we are not | ||||
|             # in FIPS mode and since _hashlib exists) use the OpenSSL function. | ||||
|             with self.check_context(is_fully_disabled): | ||||
|                 _ = getattr(self.hashlib, name)() | ||||
| 
 | ||||
|             self.check_openssl_hash(name, disabled=not allow_openssl) | ||||
|             self.check_builtin_hash(name, disabled=not allow_builtin) | ||||
| 
 | ||||
|             if name not in hashlib_helper.NON_HMAC_DIGEST_NAMES: | ||||
|                 with self.check_context(is_simple_disabled): | ||||
|                 with self.check_context(is_fully_disabled): | ||||
|                     _ = self.hmac.new(b"", b"", name) | ||||
|                 with self.check_context(is_simple_disabled): | ||||
|                 with self.check_context(is_fully_disabled): | ||||
|                     _ = self.hmac.HMAC(b"", b"", name) | ||||
|                 with self.check_context(is_simple_disabled): | ||||
|                 with self.check_context(is_fully_disabled): | ||||
|                     _ = self.hmac.digest(b"", b"", name) | ||||
| 
 | ||||
|                 self.check_openssl_hmac(name, disabled=not allow_openssl) | ||||
|                 self.check_builtin_hmac(name, disabled=not allow_builtin) | ||||
| 
 | ||||
|     @hashlib_helper.block_algorithm("md5") | ||||
|     def test_disable_hash_md5_in_fips_mode(self): | ||||
|         self.skip_if_not_fips_mode() | ||||
| 
 | ||||
|         self.assertRaises(ValueError, self.hashlib.new, "md5") | ||||
|         self.assertRaises(ValueError, self._hashlib.new, "md5") | ||||
|         self.assertRaises(ValueError, self.hashlib.md5) | ||||
|         self.assertRaises(ValueError, self._hashlib.openssl_md5) | ||||
| 
 | ||||
|         kwargs = dict(usedforsecurity=True) | ||||
|         self.assertRaises(ValueError, self.hashlib.new, "md5", **kwargs) | ||||
|         self.assertRaises(ValueError, self._hashlib.new, "md5", **kwargs) | ||||
|         self.assertRaises(ValueError, self.hashlib.md5, **kwargs) | ||||
|         self.assertRaises(ValueError, self._hashlib.openssl_md5, **kwargs) | ||||
| 
 | ||||
|     @hashlib_helper.block_algorithm("md5", allow_openssl=True) | ||||
|     def test_disable_hash_md5_in_fips_mode_allow_openssl(self): | ||||
|         self.skip_if_not_fips_mode() | ||||
|         # Allow the OpenSSL interface to be used but not the HACL* one. | ||||
|         # hashlib.new("md5") is dispatched to hashlib.openssl_md5() | ||||
|         self.assertRaises(ValueError, self.hashlib.new, "md5") | ||||
|         # dispatched to hashlib.openssl_md5() in FIPS mode | ||||
|         h2 = self.hashlib.new("md5", usedforsecurity=False) | ||||
|         self.assertIsInstance(h2, self._hashlib.HASH) | ||||
| 
 | ||||
|         # block_algorithm() does not mock hashlib.md5 and _hashlib.openssl_md5 | ||||
|         self.assertNotHasAttr(self.hashlib.md5, "__wrapped__") | ||||
|         self.assertNotHasAttr(self._hashlib.openssl_md5, "__wrapped__") | ||||
| 
 | ||||
|         hashlib_md5 = inspect.unwrap(self.hashlib.md5) | ||||
|         self.assertIs(hashlib_md5, self._hashlib.openssl_md5) | ||||
|         self.assertRaises(ValueError, self.hashlib.md5) | ||||
|         # allow MD5 to be used in FIPS mode if usedforsecurity=False | ||||
|         h3 = self.hashlib.md5(usedforsecurity=False) | ||||
|         self.assertIsInstance(h3, self._hashlib.HASH) | ||||
| 
 | ||||
|     @hashlib_helper.block_algorithm("md5", allow_builtin=True) | ||||
|     def test_disable_hash_md5_in_fips_mode_allow_builtin(self): | ||||
|         self.skip_if_not_fips_mode() | ||||
|         # Allow the HACL* interface to be used but not the OpenSSL one. | ||||
|         h1 = self.hashlib.new("md5")  # dispatched to _md5.md5() | ||||
|         self.assertNotIsInstance(h1, self._hashlib.HASH) | ||||
|         h2 = self.hashlib.new("md5", usedforsecurity=False) | ||||
|         self.assertIsInstance(h2, type(h1)) | ||||
| 
 | ||||
|         # block_algorithm() mocks hashlib.md5 and _hashlib.openssl_md5 | ||||
|         self.assertHasAttr(self.hashlib.md5, "__wrapped__") | ||||
|         self.assertHasAttr(self._hashlib.openssl_md5, "__wrapped__") | ||||
| 
 | ||||
|         hashlib_md5 = inspect.unwrap(self.hashlib.md5) | ||||
|         openssl_md5 = inspect.unwrap(self._hashlib.openssl_md5) | ||||
|         self.assertIs(hashlib_md5, openssl_md5) | ||||
|         self.assertRaises(ValueError, self.hashlib.md5) | ||||
|         self.assertRaises(ValueError, self.hashlib.md5, | ||||
|                           usedforsecurity=False) | ||||
| 
 | ||||
|     @hashlib_helper.block_algorithm("md5", | ||||
|                                     allow_openssl=True, | ||||
|                                     allow_builtin=True) | ||||
|     def test_disable_hash_md5_in_fips_mode_allow_all(self): | ||||
|         self.skip_if_not_fips_mode() | ||||
|         # hashlib.new() isn't blocked as it falls back to _md5.md5 | ||||
|         self.assertIsInstance(self.hashlib.new("md5"), self._md5.MD5Type) | ||||
|         self.assertRaises(ValueError, self._hashlib.new, "md5") | ||||
|         h = self._hashlib.new("md5", usedforsecurity=False) | ||||
|         self.assertIsInstance(h, self._hashlib.HASH) | ||||
| 
 | ||||
|         self.assertNotHasAttr(self.hashlib.md5, "__wrapped__") | ||||
|         self.assertNotHasAttr(self._hashlib.openssl_md5, "__wrapped__") | ||||
| 
 | ||||
|         self.assertIs(self.hashlib.md5, self._hashlib.openssl_md5) | ||||
|         self.assertRaises(ValueError, self.hashlib.md5) | ||||
|         h = self.hashlib.md5(usedforsecurity=False) | ||||
|         self.assertIsInstance(h, self._hashlib.HASH) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Bénédikt Tran
						Bénédikt Tran