mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			278 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
 | 
						|
# A test file for the `multiprocessing` package
 | 
						|
#
 | 
						|
# Copyright (c) 2006-2008, R Oudkerk
 | 
						|
# All rights reserved.
 | 
						|
#
 | 
						|
 | 
						|
import time
 | 
						|
import sys
 | 
						|
import random
 | 
						|
from queue import Empty
 | 
						|
 | 
						|
import multiprocessing               # may get overwritten
 | 
						|
 | 
						|
 | 
						|
#### TEST_VALUE
 | 
						|
 | 
						|
def value_func(running, mutex):
    """Sleep for a random interval, then decrement the shared counter.

    running -- object whose ``value`` attribute is decremented by one.
    mutex   -- lock held while printing and while updating ``running.value``.
    """
    # seed() is called with no arguments before drawing the delay;
    # presumably so that children do not all reuse generator state
    # inherited from the parent -- confirm.
    random.seed()
    time.sleep(random.random()*4)

    # The ``with`` block releases the lock even if printing or the
    # decrement raises, which bare acquire()/release() calls would not.
    with mutex:
        print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
        running.value -= 1
 | 
						|
 | 
						|
def test_value():
    """Run TASKS children that each decrement a shared counter.

    The parent polls the counter, printing its current value under the
    same lock the children use, until it reaches zero.  The children are
    then joined so none of them is left un-reaped when this function
    returns.
    """
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    children = []
    for _ in range(TASKS):
        child = multiprocessing.Process(target=value_func,
                                        args=(running, mutex))
        child.start()
        children.append(child)

    while running.value > 0:
        time.sleep(0.08)
        # Hold the lock while printing; ``with`` guarantees the release
        # even if writing to stdout fails.
        with mutex:
            print(running.value, end=' ')
            sys.stdout.flush()

    # The counter reaching zero only means every child has decremented
    # it; wait for the children to actually exit.
    for child in children:
        child.join()

    print()
    print('No more running processes')
 | 
						|
 | 
						|
 | 
						|
#### TEST_QUEUE
 | 
						|
 | 
						|
def queue_func(queue):
    """Put the squares of 0..29 on *queue*, then the sentinel 'STOP'.

    A random pause of up to half a second precedes every square.
    """
    item_count = 30
    max_pause = 0.5

    for n in range(item_count):
        time.sleep(max_pause * random.random())
        queue.put(n ** 2)

    # Sentinel telling the consumer that no more items will follow.
    queue.put('STOP')
 | 
						|
 | 
						|
def test_queue():
    """Consume items produced by queue_func in a child until 'STOP' arrives.

    Each ``get`` waits at most 0.3 seconds; when nothing arrives in that
    time 'TIMEOUT' is printed and the loop tries again.
    """
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        # Only the get() can raise Empty, so keep the try body to that
        # single call and do the printing on the success path.
        try:
            o = q.get(timeout=0.3)
        except Empty:
            print('TIMEOUT')
        else:
            print(o, end=' ')
            sys.stdout.flush()

    print()

    # 'STOP' is the last thing the producer puts, so the queue has been
    # drained by now and the child can be waited for.
    p.join()
 | 
						|
 | 
						|
 | 
						|
#### TEST_CONDITION
 | 
						|
 | 
						|
def condition_func(cond):
    """Acquire *cond*, sleep two seconds, notify one waiter and release.

    The string form of *cond* is printed right after acquiring and again
    just before notifying.
    """
    # ``with`` releases the condition even if something in the body
    # raises, unlike a bare acquire()/release() pair.
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()
 | 
						|
 | 
						|
def test_condition():
    """Wait on a condition that a child process notifies.

    The condition is printed before and after every acquire/release so
    its state can be followed in the output.  It is acquired twice in a
    row before the child is started (presumably the underlying lock is
    re-entrant -- confirm), and released twice after waking up.
    """
    cond = multiprocessing.Condition()
    child = multiprocessing.Process(target=condition_func, args=(cond,))

    def show_state():
        print(cond)

    show_state()
    for _ in range(2):
        cond.acquire()
        show_state()

    # The child is started while this process still holds the condition.
    child.start()

    print('main is waiting')
    cond.wait()
    print('main has woken up')

    for _ in range(2):
        show_state()
        cond.release()

    child.join()
    show_state()
 | 
						|
 | 
						|
 | 
						|
#### TEST_SEMAPHORE
 | 
						|
 | 
						|
def semaphore_func(sema, mutex, running):
    """Hold *sema* for a random time while tracking how many tasks run.

    sema    -- semaphore held for the whole duration of the task.
    mutex   -- lock held around each update of ``running.value`` and the
               accompanying print.
    running -- object whose ``value`` attribute is incremented on entry
               and decremented on exit.
    """
    # Nested ``with`` blocks make sure the semaphore slot and the lock
    # are given back even if the body raises.
    with sema:
        with mutex:
            running.value += 1
            print(running.value, 'tasks are running')

        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % multiprocessing.current_process())
 | 
						|
 | 
						|
def test_semaphore():
    """Run ten semaphore_func workers sharing a Semaphore created with 3."""
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)
    shared = (sema, mutex, running)

    # Create every worker first, then start them all, then wait for all.
    workers = []
    for _ in range(10):
        workers.append(
            multiprocessing.Process(target=semaphore_func, args=shared))

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()
 | 
						|
 | 
						|
 | 
						|
#### TEST_JOIN_TIMEOUT
 | 
						|
 | 
						|
def join_timeout_func():
    """Announce, sleep for 5.5 seconds, then announce termination."""
    nap_seconds = 5.5

    print('\tchild sleeping')
    time.sleep(nap_seconds)
    print('\n\tchild terminating')
 | 
						|
 | 
						|
def test_join_timeout():
    """Join a sleeping child one second at a time, printing a dot per retry."""
    child = multiprocessing.Process(target=join_timeout_func)
    child.start()

    print('waiting for process to finish')

    # Try a one-second join; every time the child is still alive
    # afterwards, print a progress dot and try again.
    child.join(timeout=1)
    while child.is_alive():
        print('.', end=' ')
        sys.stdout.flush()
        child.join(timeout=1)
 | 
						|
 | 
						|
 | 
						|
#### TEST_EVENT
 | 
						|
 | 
						|
def event_func(event):
    """Print a message, block until *event* is set, then print another.

    Both messages include the repr of the current process.
    """
    print('\t{!r} is waiting'.format(multiprocessing.current_process()))
    event.wait()
    print('\t{!r} has woken up'.format(multiprocessing.current_process()))
 | 
						|
 | 
						|
def test_event():
    """Start five event_func waiters, set the event after 2 s, join them."""
    event = multiprocessing.Event()

    waiters = []
    for _ in range(5):
        waiters.append(
            multiprocessing.Process(target=event_func, args=(event,)))

    for waiter in waiters:
        waiter.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for waiter in waiters:
        waiter.join()
 | 
						|
 | 
						|
 | 
						|
#### TEST_SHAREDVALUES
 | 
						|
 | 
						|
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that shared objects hold the data they were created from.

    values        -- sequence of entries whose item [1] is the expected
                     scalar.
    arrays        -- sequence of entries whose item [1] is the expected
                     list.
    shared_values -- objects whose ``value`` attribute is compared with
                     the matching entry of *values*.
    shared_arrays -- sliceable objects whose full contents are compared
                     with the matching entry of *arrays*.

    Prints 'Tests passed' when everything matches.  Raises AssertionError
    on the first mismatch; the exception is raised explicitly rather than
    through an ``assert`` statement so the check is still performed when
    Python runs with -O.
    """
    for i, entry in enumerate(values):
        expected = entry[1]
        actual = shared_values[i].value
        if expected != actual:
            raise AssertionError(
                'shared value %d: expected %r, got %r' % (i, expected, actual))

    # Iterate over *arrays* itself (not over the length of *values*) so
    # that every array is checked whatever the two lengths are.
    for i, entry in enumerate(arrays):
        expected = entry[1]
        actual = list(shared_arrays[i][:])
        if expected != actual:
            raise AssertionError(
                'shared array %d: expected %r, got %r' % (i, expected, actual))

    print('Tests passed')
 | 
						|
 | 
						|
def test_sharedvalues():
    """Build shared values/arrays and have a child verify their contents.

    The child runs sharedvalues_func; this function asserts that the
    child's exit code is 0.
    """
    # (typecode, initial value) pairs for the scalars.
    values = [('i', 10), ('h', -2), ('d', 1.25)]

    # (typecode, initial contents) pairs for the arrays.
    ints = list(range(100))
    quarters = [0.25 * k for k in range(100)]
    shorts = list(range(1000))
    arrays = [('i', ints), ('d', quarters), ('H', shorts)]

    shared_values = [
        multiprocessing.Value(typecode, initial)
        for typecode, initial in values
    ]
    shared_arrays = [
        multiprocessing.Array(typecode, contents)
        for typecode, contents in arrays
    ]

    checker = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays),
    )
    checker.start()
    checker.join()

    assert checker.exitcode == 0
 | 
						|
 | 
						|
 | 
						|
####
 | 
						|
 | 
						|
def test(namespace=multiprocessing):
    """Run every test function using *namespace* as ``multiprocessing``.

    The module-level name ``multiprocessing`` is rebound to *namespace*
    before the tests run.  Afterwards, if the namespace provides
    ``_debug_info`` and it returns something truthy, that is printed and
    ValueError is raised.
    """
    global multiprocessing

    multiprocessing = namespace

    test_funcs = (
        test_value, test_queue, test_condition,
        test_semaphore, test_join_timeout, test_event,
        test_sharedvalues,
    )
    for func in test_funcs:
        print('\n\t######## %s\n' % func.__name__)
        func()

    multiprocessing.active_children()      # cleanup any old processes

    debug_info = getattr(multiprocessing, '_debug_info', None)
    if debug_info is None:
        return

    info = debug_info()
    if not info:
        return

    print(info)
    raise ValueError('there should be no positive refcounts left')
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
    multiprocessing.freeze_support()

    usage = 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]

    # Too many arguments is a user error: report it through the same
    # usage/exit path as an unknown mode instead of an ``assert``, which
    # produces a bare traceback and is skipped entirely under -O.
    if len(sys.argv) > 2:
        print(usage)
        raise SystemExit(2)

    # With no argument the tests run against real processes.
    mode = sys.argv[1] if len(sys.argv) == 2 else 'processes'

    if mode == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = multiprocessing
    elif mode == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = multiprocessing.Manager()
        # The tests look these names up on the namespace, so attach the
        # module-level versions to the manager object.
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        import multiprocessing.dummy as namespace
    else:
        print(usage)
        raise SystemExit(2)

    test(namespace)
 |