mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	 da5b701aee
			
		
	
	
		da5b701aee
		
	
	
	
	
		
			
			discussion. There are two places of documentation that still mention __context__: Doc/lib/libstdtypes.tex -- I wasn't quite sure how to rewrite that without spending a whole lot of time thinking about it; and whatsnew, which Andrew usually likes to change himself.
		
			
				
	
	
		
			366 lines
		
	
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			366 lines
		
	
	
	
		
			9.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Unit tests for contextlib.py, and other context managers."""
 | |
| 
 | |
| from __future__ import with_statement
 | |
| 
 | |
| import sys
 | |
| import os
 | |
| import decimal
 | |
| import tempfile
 | |
| import unittest
 | |
| import threading
 | |
| from contextlib import *  # Tests __all__
 | |
| from test.test_support import run_suite
 | |
| 
 | |
| class ContextManagerTestCase(unittest.TestCase):
 | |
| 
 | |
|     def test_contextfactory_plain(self):
 | |
|         state = []
 | |
|         @contextfactory
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             yield 42
 | |
|             state.append(999)
 | |
|         with woohoo() as x:
 | |
|             self.assertEqual(state, [1])
 | |
|             self.assertEqual(x, 42)
 | |
|             state.append(x)
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextfactory_finally(self):
 | |
|         state = []
 | |
|         @contextfactory
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             try:
 | |
|                 yield 42
 | |
|             finally:
 | |
|                 state.append(999)
 | |
|         try:
 | |
|             with woohoo() as x:
 | |
|                 self.assertEqual(state, [1])
 | |
|                 self.assertEqual(x, 42)
 | |
|                 state.append(x)
 | |
|                 raise ZeroDivisionError()
 | |
|         except ZeroDivisionError:
 | |
|             pass
 | |
|         else:
 | |
|             self.fail("Expected ZeroDivisionError")
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextfactory_no_reraise(self):
 | |
|         @contextfactory
 | |
|         def whee():
 | |
|             yield
 | |
|         ctx = whee()
 | |
|         ctx.__enter__()
 | |
|         # Calling __exit__ should not result in an exception
 | |
|         self.failIf(ctx.__exit__(TypeError, TypeError("foo"), None))
 | |
| 
 | |
|     def test_contextfactory_trap_yield_after_throw(self):
 | |
|         @contextfactory
 | |
|         def whoo():
 | |
|             try:
 | |
|                 yield
 | |
|             except:
 | |
|                 yield
 | |
|         ctx = whoo()
 | |
|         ctx.__enter__()
 | |
|         self.assertRaises(
 | |
|             RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
 | |
|         )
 | |
| 
 | |
|     def test_contextfactory_except(self):
 | |
|         state = []
 | |
|         @contextfactory
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             try:
 | |
|                 yield 42
 | |
|             except ZeroDivisionError, e:
 | |
|                 state.append(e.args[0])
 | |
|                 self.assertEqual(state, [1, 42, 999])
 | |
|         with woohoo() as x:
 | |
|             self.assertEqual(state, [1])
 | |
|             self.assertEqual(x, 42)
 | |
|             state.append(x)
 | |
|             raise ZeroDivisionError(999)
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextfactory_attribs(self):
 | |
|         def attribs(**kw):
 | |
|             def decorate(func):
 | |
|                 for k,v in kw.items():
 | |
|                     setattr(func,k,v)
 | |
|                 return func
 | |
|             return decorate
 | |
|         @contextfactory
 | |
|         @attribs(foo='bar')
 | |
|         def baz(spam):
 | |
|             """Whee!"""
 | |
|         self.assertEqual(baz.__name__,'baz')
 | |
|         self.assertEqual(baz.foo, 'bar')
 | |
|         self.assertEqual(baz.__doc__, "Whee!")
 | |
| 
 | |
| class NestedTestCase(unittest.TestCase):
 | |
| 
 | |
|     # XXX This needs more work
 | |
| 
 | |
|     def test_nested(self):
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             yield 1
 | |
|         @contextfactory
 | |
|         def b():
 | |
|             yield 2
 | |
|         @contextfactory
 | |
|         def c():
 | |
|             yield 3
 | |
|         with nested(a(), b(), c()) as (x, y, z):
 | |
|             self.assertEqual(x, 1)
 | |
|             self.assertEqual(y, 2)
 | |
|             self.assertEqual(z, 3)
 | |
| 
 | |
|     def test_nested_cleanup(self):
 | |
|         state = []
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             state.append(1)
 | |
|             try:
 | |
|                 yield 2
 | |
|             finally:
 | |
|                 state.append(3)
 | |
|         @contextfactory
 | |
|         def b():
 | |
|             state.append(4)
 | |
|             try:
 | |
|                 yield 5
 | |
|             finally:
 | |
|                 state.append(6)
 | |
|         try:
 | |
|             with nested(a(), b()) as (x, y):
 | |
|                 state.append(x)
 | |
|                 state.append(y)
 | |
|                 1/0
 | |
|         except ZeroDivisionError:
 | |
|             self.assertEqual(state, [1, 4, 2, 5, 6, 3])
 | |
|         else:
 | |
|             self.fail("Didn't raise ZeroDivisionError")
 | |
| 
 | |
|     def test_nested_right_exception(self):
 | |
|         state = []
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             yield 1
 | |
|         class b(object):
 | |
|             def __enter__(self):
 | |
|                 return 2
 | |
|             def __exit__(self, *exc_info):
 | |
|                 try:
 | |
|                     raise Exception()
 | |
|                 except:
 | |
|                     pass
 | |
|         try:
 | |
|             with nested(a(), b()) as (x, y):
 | |
|                 1/0
 | |
|         except ZeroDivisionError:
 | |
|             self.assertEqual((x, y), (1, 2))
 | |
|         except Exception:
 | |
|             self.fail("Reraised wrong exception")
 | |
|         else:
 | |
|             self.fail("Didn't raise ZeroDivisionError")
 | |
| 
 | |
|     def test_nested_b_swallows(self):
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             yield
 | |
|         @contextfactory
 | |
|         def b():
 | |
|             try:
 | |
|                 yield
 | |
|             except:
 | |
|                 # Swallow the exception
 | |
|                 pass
 | |
|         try:
 | |
|             with nested(a(), b()):
 | |
|                 1/0
 | |
|         except ZeroDivisionError:
 | |
|             self.fail("Didn't swallow ZeroDivisionError")
 | |
| 
 | |
|     def test_nested_break(self):
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             yield
 | |
|         state = 0
 | |
|         while True:
 | |
|             state += 1
 | |
|             with nested(a(), a()):
 | |
|                 break
 | |
|             state += 10
 | |
|         self.assertEqual(state, 1)
 | |
| 
 | |
|     def test_nested_continue(self):
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             yield
 | |
|         state = 0
 | |
|         while state < 3:
 | |
|             state += 1
 | |
|             with nested(a(), a()):
 | |
|                 continue
 | |
|             state += 10
 | |
|         self.assertEqual(state, 3)
 | |
| 
 | |
|     def test_nested_return(self):
 | |
|         @contextfactory
 | |
|         def a():
 | |
|             try:
 | |
|                 yield
 | |
|             except:
 | |
|                 pass
 | |
|         def foo():
 | |
|             with nested(a(), a()):
 | |
|                 return 1
 | |
|             return 10
 | |
|         self.assertEqual(foo(), 1)
 | |
| 
 | |
| class ClosingTestCase(unittest.TestCase):
 | |
| 
 | |
|     # XXX This needs more work
 | |
| 
 | |
|     def test_closing(self):
 | |
|         state = []
 | |
|         class C:
 | |
|             def close(self):
 | |
|                 state.append(1)
 | |
|         x = C()
 | |
|         self.assertEqual(state, [])
 | |
|         with closing(x) as y:
 | |
|             self.assertEqual(x, y)
 | |
|         self.assertEqual(state, [1])
 | |
| 
 | |
|     def test_closing_error(self):
 | |
|         state = []
 | |
|         class C:
 | |
|             def close(self):
 | |
|                 state.append(1)
 | |
|         x = C()
 | |
|         self.assertEqual(state, [])
 | |
|         try:
 | |
|             with closing(x) as y:
 | |
|                 self.assertEqual(x, y)
 | |
|                 1/0
 | |
|         except ZeroDivisionError:
 | |
|             self.assertEqual(state, [1])
 | |
|         else:
 | |
|             self.fail("Didn't raise ZeroDivisionError")
 | |
| 
 | |
| class FileContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     def testWithOpen(self):
 | |
|         tfn = tempfile.mktemp()
 | |
|         try:
 | |
|             f = None
 | |
|             with open(tfn, "w") as f:
 | |
|                 self.failIf(f.closed)
 | |
|                 f.write("Booh\n")
 | |
|             self.failUnless(f.closed)
 | |
|             f = None
 | |
|             try:
 | |
|                 with open(tfn, "r") as f:
 | |
|                     self.failIf(f.closed)
 | |
|                     self.assertEqual(f.read(), "Booh\n")
 | |
|                     1/0
 | |
|             except ZeroDivisionError:
 | |
|                 self.failUnless(f.closed)
 | |
|             else:
 | |
|                 self.fail("Didn't raise ZeroDivisionError")
 | |
|         finally:
 | |
|             try:
 | |
|                 os.remove(tfn)
 | |
|             except os.error:
 | |
|                 pass
 | |
| 
 | |
| class LockContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     def boilerPlate(self, lock, locked):
 | |
|         self.failIf(locked())
 | |
|         with lock:
 | |
|             self.failUnless(locked())
 | |
|         self.failIf(locked())
 | |
|         try:
 | |
|             with lock:
 | |
|                 self.failUnless(locked())
 | |
|                 1/0
 | |
|         except ZeroDivisionError:
 | |
|             self.failIf(locked())
 | |
|         else:
 | |
|             self.fail("Didn't raise ZeroDivisionError")
 | |
| 
 | |
|     def testWithLock(self):
 | |
|         lock = threading.Lock()
 | |
|         self.boilerPlate(lock, lock.locked)
 | |
| 
 | |
|     def testWithRLock(self):
 | |
|         lock = threading.RLock()
 | |
|         self.boilerPlate(lock, lock._is_owned)
 | |
| 
 | |
|     def testWithCondition(self):
 | |
|         lock = threading.Condition()
 | |
|         def locked():
 | |
|             return lock._is_owned()
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
|     def testWithSemaphore(self):
 | |
|         lock = threading.Semaphore()
 | |
|         def locked():
 | |
|             if lock.acquire(False):
 | |
|                 lock.release()
 | |
|                 return False
 | |
|             else:
 | |
|                 return True
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
|     def testWithBoundedSemaphore(self):
 | |
|         lock = threading.BoundedSemaphore()
 | |
|         def locked():
 | |
|             if lock.acquire(False):
 | |
|                 lock.release()
 | |
|                 return False
 | |
|             else:
 | |
|                 return True
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
| class DecimalContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     # XXX Somebody should write more thorough tests for this
 | |
| 
 | |
|     def testBasic(self):
 | |
|         ctx = decimal.getcontext()
 | |
|         orig_context = ctx.copy()
 | |
|         try:
 | |
|             ctx.prec = save_prec = decimal.ExtendedContext.prec + 5
 | |
|             with decimal.ExtendedContext.context_manager():
 | |
|                 self.assertEqual(decimal.getcontext().prec,
 | |
|                                  decimal.ExtendedContext.prec)
 | |
|             self.assertEqual(decimal.getcontext().prec, save_prec)
 | |
|             try:
 | |
|                 with decimal.ExtendedContext.context_manager():
 | |
|                     self.assertEqual(decimal.getcontext().prec,
 | |
|                                      decimal.ExtendedContext.prec)
 | |
|                     1/0
 | |
|             except ZeroDivisionError:
 | |
|                 self.assertEqual(decimal.getcontext().prec, save_prec)
 | |
|             else:
 | |
|                 self.fail("Didn't raise ZeroDivisionError")
 | |
|         finally:
 | |
|             decimal.setcontext(orig_context)
 | |
| 
 | |
| 
 | |
| # This is needed to make the test actually run under regrtest.py!
 | |
| def test_main():
 | |
|     run_suite(
 | |
|         unittest.defaultTestLoader.loadTestsFromModule(sys.modules[__name__])
 | |
|     )
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     test_main()
 |