# Mirror of https://github.com/python/cpython.git (synced 2025-11-03 23:21:29 +00:00) -- 94 lines, 2.7 KiB, Python
# Test case for the select.devpoll() function

# Initial tests are copied as-is from "test_poll.py"

import os, select, random, unittest, sys
 | 
						|
from test.support import TESTFN, run_unittest
 | 
						|
 | 
						|
# select.devpoll is not provided on every platform.  Probe for the attribute
# and skip the whole module when it is missing, so the tests below are never
# collected where they could not run.
try:
    select.devpoll
except AttributeError:
    raise unittest.SkipTest("select.devpoll not defined -- skipping test_devpoll")
 | 
						|
 | 
						|
 | 
						|
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    ready -- iterable of (fd, eventmask) pairs, in the form the poll()
             calls in this file hand back.
    flag  -- event bit(s) to look for; a descriptor matches when
             ``eventmask & flag`` is non-zero.

    The result is a new list of the matching fds, in the order in which
    they appear in *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
 | 
						|
 | 
						|
class DevPollTests(unittest.TestCase):
    """Functional tests for the polling object returned by select.devpoll()."""

    def test_devpoll1(self):
        # Basic functional test of a devpoll object:
        # create a bunch of pipes and check that polling works with them.

        p = select.devpoll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}
        w2r = {}

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            # Register the read end without a mask and then set the mask
            # through modify(), so that both calls get exercised.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        # Each iteration writes to one randomly chosen writable pipe, checks
        # that exactly its read end is reported readable, drains it and
        # retires the pipe.  The loop ends when every pipe has been used.
        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                self.fail("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                self.fail("no pipes ready for reading")
            # Only the pipe that was just written to may be readable.
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)
            # Unregister while the descriptors are still open, then close
            # them, so that unregister() never sees a stale fd number.
            p.unregister(r2w[rd])
            p.unregister(rd)
            os.close(r2w[rd])
            os.close(rd)
            writers.remove(r2w[rd])

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_timeout_overflow(self):
        # poll() must raise OverflowError for timeout values outside the
        # range it accepts, and must accept the values inside that range.
        pollster = select.devpoll()
        # os.pipe() returns (read_end, write_end).
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        # Register the write end: nothing has been written to the pipe, so
        # it is expected to be reported writable straight away and the
        # accepted timeouts below (including -1 and 1 << 30) should not
        # make the test wait.
        pollster.register(w)

        pollster.poll(-1)
        self.assertRaises(OverflowError, pollster.poll, -2)
        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
        self.assertRaises(OverflowError, pollster.poll, -1 << 64)

        pollster.poll(0)
        pollster.poll(1)
        pollster.poll(1 << 30)
        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
 | 
						|
 | 
						|
def test_main():
    """Run the DevPollTests test case through test.support.run_unittest()."""
    run_unittest(DevPollTests)
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Allow the file to be executed directly as a script.
    test_main()
 |