mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 05:01:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			71 lines
		
	
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			71 lines
		
	
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import signal
 | |
| import weakref
 | |
| 
 | |
| from functools import wraps
 | |
| 
 | |
| __unittest = True
 | |
| 
 | |
| 
 | |
| class _InterruptHandler(object):
 | |
|     def __init__(self, default_handler):
 | |
|         self.called = False
 | |
|         self.original_handler = default_handler
 | |
|         if isinstance(default_handler, int):
 | |
|             if default_handler == signal.SIG_DFL:
 | |
|                 # Pretend it's signal.default_int_handler instead.
 | |
|                 default_handler = signal.default_int_handler
 | |
|             elif default_handler == signal.SIG_IGN:
 | |
|                 # Not quite the same thing as SIG_IGN, but the closest we
 | |
|                 # can make it: do nothing.
 | |
|                 def default_handler(unused_signum, unused_frame):
 | |
|                     pass
 | |
|             else:
 | |
|                 raise TypeError("expected SIGINT signal handler to be "
 | |
|                                 "signal.SIG_IGN, signal.SIG_DFL, or a "
 | |
|                                 "callable object")
 | |
|         self.default_handler = default_handler
 | |
| 
 | |
|     def __call__(self, signum, frame):
 | |
|         installed_handler = signal.getsignal(signal.SIGINT)
 | |
|         if installed_handler is not self:
 | |
|             # if we aren't the installed handler, then delegate immediately
 | |
|             # to the default handler
 | |
|             self.default_handler(signum, frame)
 | |
| 
 | |
|         if self.called:
 | |
|             self.default_handler(signum, frame)
 | |
|         self.called = True
 | |
|         for result in _results.keys():
 | |
|             result.stop()
 | |
| 
 | |
| _results = weakref.WeakKeyDictionary()
 | |
| def registerResult(result):
 | |
|     _results[result] = 1
 | |
| 
 | |
| def removeResult(result):
 | |
|     return bool(_results.pop(result, None))
 | |
| 
 | |
| _interrupt_handler = None
 | |
| def installHandler():
 | |
|     global _interrupt_handler
 | |
|     if _interrupt_handler is None:
 | |
|         default_handler = signal.getsignal(signal.SIGINT)
 | |
|         _interrupt_handler = _InterruptHandler(default_handler)
 | |
|         signal.signal(signal.SIGINT, _interrupt_handler)
 | |
| 
 | |
| 
 | |
| def removeHandler(method=None):
 | |
|     if method is not None:
 | |
|         @wraps(method)
 | |
|         def inner(*args, **kwargs):
 | |
|             initial = signal.getsignal(signal.SIGINT)
 | |
|             removeHandler()
 | |
|             try:
 | |
|                 return method(*args, **kwargs)
 | |
|             finally:
 | |
|                 signal.signal(signal.SIGINT, initial)
 | |
|         return inner
 | |
| 
 | |
|     global _interrupt_handler
 | |
|     if _interrupt_handler is not None:
 | |
|         signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
 | 
