mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
	
	
		
			95 lines
		
	
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			95 lines
		
	
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								# Test case for the select.devpoll() function
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Initial tests are copied as is from "test_poll.py"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import os, select, random, unittest, sys
							 | 
						||
| 
								 | 
							
								from test.support import TESTFN, run_unittest
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								try:
							 | 
						||
| 
								 | 
							
								    select.devpoll
							 | 
						||
| 
								 | 
							
								except AttributeError:
							 | 
						||
| 
								 | 
							
								    raise unittest.SkipTest("select.devpoll not defined -- skipping test_devpoll")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def find_ready_matching(ready, flag):
							 | 
						||
| 
								 | 
							
								    match = []
							 | 
						||
| 
								 | 
							
								    for fd, mode in ready:
							 | 
						||
| 
								 | 
							
								        if mode & flag:
							 | 
						||
| 
								 | 
							
								            match.append(fd)
							 | 
						||
| 
								 | 
							
								    return match
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class DevPollTests(unittest.TestCase):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def test_devpoll1(self):
							 | 
						||
| 
								 | 
							
								        # Basic functional test of poll object
							 | 
						||
| 
								 | 
							
								        # Create a bunch of pipe and test that poll works with them.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        p = select.devpoll()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        NUM_PIPES = 12
							 | 
						||
| 
								 | 
							
								        MSG = b" This is a test."
							 | 
						||
| 
								 | 
							
								        MSG_LEN = len(MSG)
							 | 
						||
| 
								 | 
							
								        readers = []
							 | 
						||
| 
								 | 
							
								        writers = []
							 | 
						||
| 
								 | 
							
								        r2w = {}
							 | 
						||
| 
								 | 
							
								        w2r = {}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        for i in range(NUM_PIPES):
							 | 
						||
| 
								 | 
							
								            rd, wr = os.pipe()
							 | 
						||
| 
								 | 
							
								            p.register(rd)
							 | 
						||
| 
								 | 
							
								            p.modify(rd, select.POLLIN)
							 | 
						||
| 
								 | 
							
								            p.register(wr, select.POLLOUT)
							 | 
						||
| 
								 | 
							
								            readers.append(rd)
							 | 
						||
| 
								 | 
							
								            writers.append(wr)
							 | 
						||
| 
								 | 
							
								            r2w[rd] = wr
							 | 
						||
| 
								 | 
							
								            w2r[wr] = rd
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        bufs = []
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        while writers:
							 | 
						||
| 
								 | 
							
								            ready = p.poll()
							 | 
						||
| 
								 | 
							
								            ready_writers = find_ready_matching(ready, select.POLLOUT)
							 | 
						||
| 
								 | 
							
								            if not ready_writers:
							 | 
						||
| 
								 | 
							
								                self.fail("no pipes ready for writing")
							 | 
						||
| 
								 | 
							
								            wr = random.choice(ready_writers)
							 | 
						||
| 
								 | 
							
								            os.write(wr, MSG)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            ready = p.poll()
							 | 
						||
| 
								 | 
							
								            ready_readers = find_ready_matching(ready, select.POLLIN)
							 | 
						||
| 
								 | 
							
								            if not ready_readers:
							 | 
						||
| 
								 | 
							
								                self.fail("no pipes ready for reading")
							 | 
						||
| 
								 | 
							
								            self.assertEqual([w2r[wr]], ready_readers)
							 | 
						||
| 
								 | 
							
								            rd = ready_readers[0]
							 | 
						||
| 
								 | 
							
								            buf = os.read(rd, MSG_LEN)
							 | 
						||
| 
								 | 
							
								            self.assertEqual(len(buf), MSG_LEN)
							 | 
						||
| 
								 | 
							
								            bufs.append(buf)
							 | 
						||
| 
								 | 
							
								            os.close(r2w[rd]) ; os.close(rd)
							 | 
						||
| 
								 | 
							
								            p.unregister(r2w[rd])
							 | 
						||
| 
								 | 
							
								            p.unregister(rd)
							 | 
						||
| 
								 | 
							
								            writers.remove(r2w[rd])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self.assertEqual(bufs, [MSG] * NUM_PIPES)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def test_timeout_overflow(self):
							 | 
						||
| 
								 | 
							
								        pollster = select.devpoll()
							 | 
						||
| 
								 | 
							
								        w, r = os.pipe()
							 | 
						||
| 
								 | 
							
								        pollster.register(w)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        pollster.poll(-1)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, -2)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, -1 << 64)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        pollster.poll(0)
							 | 
						||
| 
								 | 
							
								        pollster.poll(1)
							 | 
						||
| 
								 | 
							
								        pollster.poll(1 << 30)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
							 | 
						||
| 
								 | 
							
								        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def test_main():
							 | 
						||
| 
								 | 
							
								    run_unittest(DevPollTests)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == '__main__':
							 | 
						||
| 
								 | 
							
								    test_main()
							 |