mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			147 lines
		
	
	
	
		
			6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			147 lines
		
	
	
	
		
			6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from _weakrefset import WeakSet
 | |
| 
 | |
| 
 | |
def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque, equality-comparable object that identifies
    the current version of the virtual-subclass cache.  It is read from
    ``ABCMeta._abc_invalidation_counter``, which ``ABCMeta.register()``
    bumps each time it adds a new virtual subclass.
    """
    token = ABCMeta._abc_invalidation_counter
    return token
 | |
| 
 | |
| 
 | |
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private. Use `abc.get_cache_token()` for
    #       external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, /, **kwargs):
        """Create the class, compute its abstract methods, set up caches.

        ``name``, ``bases``, ``namespace`` and ``kwargs`` are forwarded
        unchanged to ``type.__new__``.  The new class receives:

        * ``__abstractmethods__`` -- frozenset of attribute names whose
          value has a true ``__isabstractmethod__``;
        * ``_abc_registry`` / ``_abc_cache`` / ``_abc_negative_cache`` --
          fresh, empty ``WeakSet`` instances;
        * ``_abc_negative_cache_version`` -- the invalidation counter at
          creation time.
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Abstract names defined directly in this class body.
        abstracts = {attr_name
                     for attr_name, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        # Abstract names inherited from the bases that are still abstract
        # when looked up on the new class (i.e. were not overridden by a
        # concrete implementation).
        for base in bases:
            for attr_name in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr_name, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr_name)
        cls.__abstractmethods__ = frozenset(abstracts)
        # Set up inheritance registry
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if ``subclass`` is not a class, and RuntimeError
        if ``cls`` is already a subclass of ``subclass`` (registering
        would create an inheritance cycle).
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry.

        Prints the class name, the current cache token, and every
        ``_abc_*`` attribute found in ``cls.__dict__`` to ``file``
        (``print``'s default destination when ``file`` is None).
        """
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                if isinstance(value, WeakSet):
                    # Show the members rather than the WeakSet object repr.
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache is only trustworthy while its version
            # matches the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # ``__class__`` and ``type()`` disagree: accept if either passes.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if ``subclass`` is not a class, and
        AssertionError if ``cls.__subclasshook__`` returns anything other
        than True, False or NotImplemented.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            # Raise explicitly instead of using ``assert`` so the check is
            # not stripped under ``python -O``; otherwise a non-bool hook
            # result would be cached and returned from issubclass().
            if not isinstance(ok, bool):
                raise AssertionError(
                    "__subclasshook__ must return either False, True, "
                    "or NotImplemented")
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False
 | 
