mirror of
https://github.com/python/cpython.git
synced 2025-10-24 10:23:58 +00:00

The topological sort functionality that was introduced initially in the functools module has been moved to a new graphlib module to better accommodate the new tools and keep the original scope of the functools module.
244 lines
8 KiB
Python
244 lines
8 KiB
Python
from itertools import chain
|
|
import graphlib
|
|
import os
|
|
import unittest
|
|
|
|
from test.support.script_helper import assert_python_ok
|
|
|
|
class TestTopologicalSort(unittest.TestCase):
|
|
def _test_graph(self, graph, expected):
|
|
def static_order_with_groups(ts):
|
|
ts.prepare()
|
|
while ts.is_active():
|
|
nodes = ts.get_ready()
|
|
for node in nodes:
|
|
ts.done(node)
|
|
yield nodes
|
|
|
|
ts = graphlib.TopologicalSorter(graph)
|
|
self.assertEqual(list(static_order_with_groups(ts)), list(expected))
|
|
|
|
ts = graphlib.TopologicalSorter(graph)
|
|
self.assertEqual(list(ts.static_order()), list(chain(*expected)))
|
|
|
|
def _assert_cycle(self, graph, cycle):
|
|
ts = graphlib.TopologicalSorter()
|
|
for node, dependson in graph.items():
|
|
ts.add(node, *dependson)
|
|
try:
|
|
ts.prepare()
|
|
except graphlib.CycleError as e:
|
|
msg, seq = e.args
|
|
self.assertIn(" ".join(map(str, cycle)), " ".join(map(str, seq * 2)))
|
|
else:
|
|
raise
|
|
|
|
def test_simple_cases(self):
|
|
self._test_graph(
|
|
{2: {11}, 9: {11, 8}, 10: {11, 3}, 11: {7, 5}, 8: {7, 3}},
|
|
[(3, 5, 7), (11, 8), (2, 10, 9)],
|
|
)
|
|
|
|
self._test_graph({1: {}}, [(1,)])
|
|
|
|
self._test_graph(
|
|
{x: {x + 1} for x in range(10)}, [(x,) for x in range(10, -1, -1)]
|
|
)
|
|
|
|
self._test_graph(
|
|
{2: {3}, 3: {4}, 4: {5}, 5: {1}, 11: {12}, 12: {13}, 13: {14}, 14: {15}},
|
|
[(1, 15), (5, 14), (4, 13), (3, 12), (2, 11)],
|
|
)
|
|
|
|
self._test_graph(
|
|
{
|
|
0: [1, 2],
|
|
1: [3],
|
|
2: [5, 6],
|
|
3: [4],
|
|
4: [9],
|
|
5: [3],
|
|
6: [7],
|
|
7: [8],
|
|
8: [4],
|
|
9: [],
|
|
},
|
|
[(9,), (4,), (3, 8), (1, 5, 7), (6,), (2,), (0,)],
|
|
)
|
|
|
|
self._test_graph({0: [1, 2], 1: [], 2: [3], 3: []}, [(1, 3), (2,), (0,)])
|
|
|
|
self._test_graph(
|
|
{0: [1, 2], 1: [], 2: [3], 3: [], 4: [5], 5: [6], 6: []},
|
|
[(1, 3, 6), (2, 5), (0, 4)],
|
|
)
|
|
|
|
def test_no_dependencies(self):
|
|
self._test_graph({1: {2}, 3: {4}, 5: {6}}, [(2, 4, 6), (1, 3, 5)])
|
|
|
|
self._test_graph({1: set(), 3: set(), 5: set()}, [(1, 3, 5)])
|
|
|
|
def test_the_node_multiple_times(self):
|
|
# Test same node multiple times in dependencies
|
|
self._test_graph({1: {2}, 3: {4}, 0: [2, 4, 4, 4, 4, 4]}, [(2, 4), (1, 3, 0)])
|
|
|
|
# Test adding the same dependency multiple times
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(1, 2)
|
|
ts.add(1, 2)
|
|
ts.add(1, 2)
|
|
self.assertEqual([*ts.static_order()], [2, 1])
|
|
|
|
def test_graph_with_iterables(self):
|
|
dependson = (2 * x + 1 for x in range(5))
|
|
ts = graphlib.TopologicalSorter({0: dependson})
|
|
self.assertEqual(list(ts.static_order()), [1, 3, 5, 7, 9, 0])
|
|
|
|
def test_add_dependencies_for_same_node_incrementally(self):
|
|
# Test same node multiple times
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(1, 2)
|
|
ts.add(1, 3)
|
|
ts.add(1, 4)
|
|
ts.add(1, 5)
|
|
|
|
ts2 = graphlib.TopologicalSorter({1: {2, 3, 4, 5}})
|
|
self.assertEqual([*ts.static_order()], [*ts2.static_order()])
|
|
|
|
def test_empty(self):
|
|
self._test_graph({}, [])
|
|
|
|
def test_cycle(self):
|
|
# Self cycle
|
|
self._assert_cycle({1: {1}}, [1, 1])
|
|
# Simple cycle
|
|
self._assert_cycle({1: {2}, 2: {1}}, [1, 2, 1])
|
|
# Indirect cycle
|
|
self._assert_cycle({1: {2}, 2: {3}, 3: {1}}, [1, 3, 2, 1])
|
|
# not all elements involved in a cycle
|
|
self._assert_cycle({1: {2}, 2: {3}, 3: {1}, 5: {4}, 4: {6}}, [1, 3, 2, 1])
|
|
# Multiple cycles
|
|
self._assert_cycle({1: {2}, 2: {1}, 3: {4}, 4: {5}, 6: {7}, 7: {6}}, [1, 2, 1])
|
|
# Cycle in the middle of the graph
|
|
self._assert_cycle({1: {2}, 2: {3}, 3: {2, 4}, 4: {5}}, [3, 2])
|
|
|
|
def test_calls_before_prepare(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
|
|
with self.assertRaisesRegex(ValueError, r"prepare\(\) must be called first"):
|
|
ts.get_ready()
|
|
with self.assertRaisesRegex(ValueError, r"prepare\(\) must be called first"):
|
|
ts.done(3)
|
|
with self.assertRaisesRegex(ValueError, r"prepare\(\) must be called first"):
|
|
ts.is_active()
|
|
|
|
def test_prepare_multiple_times(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.prepare()
|
|
with self.assertRaisesRegex(ValueError, r"cannot prepare\(\) more than once"):
|
|
ts.prepare()
|
|
|
|
def test_invalid_nodes_in_done(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(1, 2, 3, 4)
|
|
ts.add(2, 3, 4)
|
|
ts.prepare()
|
|
ts.get_ready()
|
|
|
|
with self.assertRaisesRegex(ValueError, "node 2 was not passed out"):
|
|
ts.done(2)
|
|
with self.assertRaisesRegex(ValueError, r"node 24 was not added using add\(\)"):
|
|
ts.done(24)
|
|
|
|
def test_done(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(1, 2, 3, 4)
|
|
ts.add(2, 3)
|
|
ts.prepare()
|
|
|
|
self.assertEqual(ts.get_ready(), (3, 4))
|
|
# If we don't mark anything as done, get_ready() returns nothing
|
|
self.assertEqual(ts.get_ready(), ())
|
|
ts.done(3)
|
|
# Now 2 becomes available as 3 is done
|
|
self.assertEqual(ts.get_ready(), (2,))
|
|
self.assertEqual(ts.get_ready(), ())
|
|
ts.done(4)
|
|
ts.done(2)
|
|
# Only 1 is missing
|
|
self.assertEqual(ts.get_ready(), (1,))
|
|
self.assertEqual(ts.get_ready(), ())
|
|
ts.done(1)
|
|
self.assertEqual(ts.get_ready(), ())
|
|
self.assertFalse(ts.is_active())
|
|
|
|
def test_is_active(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(1, 2)
|
|
ts.prepare()
|
|
|
|
self.assertTrue(ts.is_active())
|
|
self.assertEqual(ts.get_ready(), (2,))
|
|
self.assertTrue(ts.is_active())
|
|
ts.done(2)
|
|
self.assertTrue(ts.is_active())
|
|
self.assertEqual(ts.get_ready(), (1,))
|
|
self.assertTrue(ts.is_active())
|
|
ts.done(1)
|
|
self.assertFalse(ts.is_active())
|
|
|
|
def test_not_hashable_nodes(self):
|
|
ts = graphlib.TopologicalSorter()
|
|
self.assertRaises(TypeError, ts.add, dict(), 1)
|
|
self.assertRaises(TypeError, ts.add, 1, dict())
|
|
self.assertRaises(TypeError, ts.add, dict(), dict())
|
|
|
|
def test_order_of_insertion_does_not_matter_between_groups(self):
|
|
def get_groups(ts):
|
|
ts.prepare()
|
|
while ts.is_active():
|
|
nodes = ts.get_ready()
|
|
ts.done(*nodes)
|
|
yield set(nodes)
|
|
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add(3, 2, 1)
|
|
ts.add(1, 0)
|
|
ts.add(4, 5)
|
|
ts.add(6, 7)
|
|
ts.add(4, 7)
|
|
|
|
ts2 = graphlib.TopologicalSorter()
|
|
ts2.add(1, 0)
|
|
ts2.add(3, 2, 1)
|
|
ts2.add(4, 7)
|
|
ts2.add(6, 7)
|
|
ts2.add(4, 5)
|
|
|
|
self.assertEqual(list(get_groups(ts)), list(get_groups(ts2)))
|
|
|
|
def test_static_order_does_not_change_with_the_hash_seed(self):
|
|
def check_order_with_hash_seed(seed):
|
|
code = """if 1:
|
|
import graphlib
|
|
ts = graphlib.TopologicalSorter()
|
|
ts.add('blech', 'bluch', 'hola')
|
|
ts.add('abcd', 'blech', 'bluch', 'a', 'b')
|
|
ts.add('a', 'a string', 'something', 'b')
|
|
ts.add('bluch', 'hola', 'abcde', 'a', 'b')
|
|
print(list(ts.static_order()))
|
|
"""
|
|
env = os.environ.copy()
|
|
# signal to assert_python not to do a copy
|
|
# of os.environ on its own
|
|
env["__cleanenv"] = True
|
|
env["PYTHONHASHSEED"] = str(seed)
|
|
out = assert_python_ok("-c", code, **env)
|
|
return out
|
|
|
|
run1 = check_order_with_hash_seed(1234)
|
|
run2 = check_order_with_hash_seed(31415)
|
|
|
|
self.assertNotEqual(run1, "")
|
|
self.assertNotEqual(run2, "")
|
|
self.assertEqual(run1, run2)
|