mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	Replace hardcoded timeout constants in tests with SHORT_TIMEOUT of test.support, so it's easier to adjust this timeout for all tests at once. SHORT_TIMEOUT is 30 seconds by default, but it can be longer depending on the --timeout command line option. The change makes almost all timeouts longer, except test_reap_children() of test_support which is made 2x shorter: SHORT_TIMEOUT should be enough. If this test starts to fail, LONG_TIMEOUT should be used instead. Also make the "from test import support" import uniform in some test files.
		
			
				
	
	
		
			199 lines
		
	
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import queue
 | 
						|
import sched
 | 
						|
import threading
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
from test import support
 | 
						|
 | 
						|
 | 
						|
# Upper bound for each blocking q.get() call in the concurrent tests, so a
# scheduler thread that never delivers fails the test instead of hanging it.
TIMEOUT = support.SHORT_TIMEOUT
 | 
						|
 | 
						|
 | 
						|
class Timer:
    """Manually driven clock that stands in for time.time() and time.sleep().

    time() reports a virtual time.  That time only moves forward inside
    sleep(), and never beyond the limit accumulated through advance().
    Every access to the state goes through a single condition variable, so
    one thread can sleep() while another calls advance().
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0  # current virtual time
        self._stop = 0  # limit that the virtual time may not exceed

    def time(self):
        """Return the current virtual time."""
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        """Move the virtual time forward by *t* (which must be >= 0).

        While the limit is below the target, the virtual time is pinned to
        the limit and the caller blocks until advance() raises the limit.
        """
        assert t >= 0
        with self._cond:
            # From here on *t* is the absolute virtual time to reach.
            t += self._time
            while self._stop < t:
                # Consume everything currently allowed, then wait for more.
                self._time = self._stop
                self._cond.wait()
            self._time = t

    # advance time limit for user code
    def advance(self, t):
        """Raise the limit by *t* (which must be >= 0) and wake all sleepers."""
        assert t >= 0
        with self._cond:
            self._stop += t
            self._cond.notify_all()
 | 
						|
 | 
						|
 | 
						|
class TestCase(unittest.TestCase):
    # Tests for sched.scheduler.  The single-threaded tests use the real
    # clock (time.time / time.sleep); the *_concurrent tests run the
    # scheduler in a worker thread driven by the virtual Timer clock.

    def test_enter(self):
        # Events entered with relative delays run in order of increasing
        # delay, whatever order they were entered in.
        calls = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for delay in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(delay, 1, calls.append, (delay,))
        scheduler.run()
        self.assertEqual(calls, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        # Events entered with absolute times run in order of increasing time.
        calls = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, calls.append, (when,))
        scheduler.run()
        self.assertEqual(calls, [0.01, 0.02, 0.03, 0.04, 0.05])

    def test_enter_concurrent(self):
        # Events may be entered from the main thread while run() is
        # executing in another thread.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        try:
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 1)
            self.assertTrue(q.empty())
            for x in [4, 5, 2]:
                scheduler.enter(x - 1, 1, fun, (x,))
            timer.advance(2)
            self.assertEqual(q.get(timeout=TIMEOUT), 2)
            self.assertEqual(q.get(timeout=TIMEOUT), 3)
            self.assertTrue(q.empty())
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 4)
            self.assertTrue(q.empty())
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 5)
            self.assertTrue(q.empty())
        finally:
            # Always lift the time limit and reap the worker: Timer.sleep()
            # waits without a timeout, so a failure above would otherwise
            # leave a non-daemon thread blocked forever.
            timer.advance(1000)
            support.join_thread(t)
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        # Events due at the same time run in order of increasing priority
        # number.
        calls = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, calls.append, (priority,))
        scheduler.run()
        self.assertEqual(calls, [1, 2, 3, 4, 5])

    def test_cancel(self):
        # Cancelled events (the first and the last one) never run.
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(calls, [0.02, 0.03, 0.04])

    def test_cancel_concurrent(self):
        # Events may be cancelled from the main thread while run() is
        # executing in another thread.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Deliberately entered out of time order.
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        try:
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 1)
            self.assertTrue(q.empty())
            scheduler.cancel(event2)
            scheduler.cancel(event5)
            timer.advance(1)
            self.assertTrue(q.empty())
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 3)
            self.assertTrue(q.empty())
            timer.advance(1)
            self.assertEqual(q.get(timeout=TIMEOUT), 4)
            self.assertTrue(q.empty())
        finally:
            # Always lift the time limit and reap the worker: Timer.sleep()
            # waits without a timeout, so a failure above would otherwise
            # leave a non-daemon thread blocked forever.
            timer.advance(1000)
            support.join_thread(t)
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        # empty() is true before any event is entered and again after run()
        # has processed them all.
        calls = []
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, calls.append, (when,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        calls = []
        fun = calls.append  # never invoked: the scheduler is not run here
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        # Positional arguments (argument=) and keyword arguments (kwargs=)
        # are forwarded to the action; both may be omitted.
        seq = []

        def fun(*a, **b):
            seq.append((a, b))

        now = time.time()
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(now, 1, fun)
        scheduler.enterabs(now, 1, fun, argument=(1, 2))
        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
        scheduler.run()
        self.assertCountEqual(seq, [
            ((), {}),
            ((1, 2), {}),
            (('a', 'b'), {}),
            ((1, 2), {'foo': 3})
        ])

    def test_run_non_blocking(self):
        # run(blocking=False) returns without running events that are not
        # due yet (here: 6 to 10 seconds in the future).
        calls = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for delay in [10, 9, 8, 7, 6]:
            scheduler.enter(delay, 1, calls.append, (delay,))
        scheduler.run(blocking=False)
        self.assertEqual(calls, [])
 | 
						|
 | 
						|
 | 
						|
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
 |