| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | import io | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | import platform | 
					
						
							|  |  |  | import queue | 
					
						
							|  |  |  | import re | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | from _android_support import TextLogStream | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | from array import array | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | from contextlib import ExitStack, contextmanager | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | from threading import Thread | 
					
						
							|  |  |  | from test.support import LOOPBACK_TIMEOUT | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  | from time import time | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | from unittest.mock import patch | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if sys.platform != "android": | 
					
						
							|  |  |  |     raise unittest.SkipTest("Android-specific") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | api_level = platform.android_ver().api_level | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  | # (name, level, fileno) | 
					
						
							|  |  |  | STREAM_INFO = [("stdout", "I", 1), ("stderr", "W", 2)] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | # Test redirection of stdout and stderr to the Android log. | 
					
						
							|  |  |  | @unittest.skipIf( | 
					
						
							|  |  |  |     api_level < 23 and platform.machine() == "aarch64", | 
					
						
							|  |  |  |     "SELinux blocks reading logs on older ARM64 emulators" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | class TestAndroidOutput(unittest.TestCase): | 
					
						
							|  |  |  |     maxDiff = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.logcat_process = subprocess.Popen( | 
					
						
							|  |  |  |             ["logcat", "-v", "tag"], stdout=subprocess.PIPE, | 
					
						
							|  |  |  |             errors="backslashreplace" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.logcat_queue = queue.Queue() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def logcat_thread(): | 
					
						
							|  |  |  |             for line in self.logcat_process.stdout: | 
					
						
							|  |  |  |                 self.logcat_queue.put(line.rstrip("\n")) | 
					
						
							|  |  |  |             self.logcat_process.stdout.close() | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |         self.logcat_thread = Thread(target=logcat_thread) | 
					
						
							|  |  |  |         self.logcat_thread.start() | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         from ctypes import CDLL, c_char_p, c_int | 
					
						
							|  |  |  |         android_log_write = getattr(CDLL("liblog.so"), "__android_log_write") | 
					
						
							|  |  |  |         android_log_write.argtypes = (c_int, c_char_p, c_char_p) | 
					
						
							|  |  |  |         ANDROID_LOG_INFO = 4 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Separate tests using a marker line with a different tag. | 
					
						
							|  |  |  |         tag, message = "python.test", f"{self.id()} {time()}" | 
					
						
							|  |  |  |         android_log_write( | 
					
						
							|  |  |  |             ANDROID_LOG_INFO, tag.encode("UTF-8"), message.encode("UTF-8")) | 
					
						
							|  |  |  |         self.assert_log("I", tag, message, skip=True, timeout=5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_logs(self, level, tag, expected, **kwargs): | 
					
						
							|  |  |  |         for line in expected: | 
					
						
							|  |  |  |             self.assert_log(level, tag, line, **kwargs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assert_log(self, level, tag, expected, *, skip=False, timeout=0.5): | 
					
						
							|  |  |  |         deadline = time() + timeout | 
					
						
							|  |  |  |         while True: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 line = self.logcat_queue.get(timeout=(deadline - time())) | 
					
						
							|  |  |  |             except queue.Empty: | 
					
						
							|  |  |  |                 self.fail(f"line not found: {expected!r}") | 
					
						
							|  |  |  |             if match := re.fullmatch(fr"(.)/{tag}: (.*)", line): | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     self.assertEqual(level, match[1]) | 
					
						
							|  |  |  |                     self.assertEqual(expected, match[2]) | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 except AssertionError: | 
					
						
							|  |  |  |                     if not skip: | 
					
						
							|  |  |  |                         raise | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.logcat_process.terminate() | 
					
						
							|  |  |  |         self.logcat_process.wait(LOOPBACK_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |         self.logcat_thread.join(LOOPBACK_TIMEOUT) | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @contextmanager | 
					
						
							|  |  |  |     def unbuffered(self, stream): | 
					
						
							|  |  |  |         stream.reconfigure(write_through=True) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             stream.reconfigure(write_through=False) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |     # In --verbose3 mode, sys.stdout and sys.stderr are captured, so we can't | 
					
						
							|  |  |  |     # test them directly. Detect this mode and use some temporary streams with | 
					
						
							|  |  |  |     # the same properties. | 
					
						
							|  |  |  |     def stream_context(self, stream_name, level): | 
					
						
							|  |  |  |         # https://developer.android.com/ndk/reference/group/logging | 
					
						
							|  |  |  |         prio = {"I": 4, "W": 5}[level] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         stack = ExitStack() | 
					
						
							|  |  |  |         stack.enter_context(self.subTest(stream_name)) | 
					
						
							|  |  |  |         stream = getattr(sys, stream_name) | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |         native_stream = getattr(sys, f"__{stream_name}__") | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |         if isinstance(stream, io.StringIO): | 
					
						
							|  |  |  |             stack.enter_context( | 
					
						
							|  |  |  |                 patch( | 
					
						
							|  |  |  |                     f"sys.{stream_name}", | 
					
						
							|  |  |  |                     TextLogStream( | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |                         prio, f"python.{stream_name}", native_stream.fileno(), | 
					
						
							|  |  |  |                         errors="backslashreplace" | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                     ), | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         return stack | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  |     def test_str(self): | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |         for stream_name, level, fileno in STREAM_INFO: | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |             with self.stream_context(stream_name, level): | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  |                 stream = getattr(sys, stream_name) | 
					
						
							|  |  |  |                 tag = f"python.{stream_name}" | 
					
						
							|  |  |  |                 self.assertEqual(f"<TextLogStream '{tag}'>", repr(stream)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 self.assertIs(stream.writable(), True) | 
					
						
							|  |  |  |                 self.assertIs(stream.readable(), False) | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |                 self.assertEqual(stream.fileno(), fileno) | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  |                 self.assertEqual("UTF-8", stream.encoding) | 
					
						
							| 
									
										
										
										
											2024-10-25 01:35:41 +01:00
										 |  |  |                 self.assertEqual("backslashreplace", stream.errors) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 self.assertIs(stream.line_buffering, True) | 
					
						
							|  |  |  |                 self.assertIs(stream.write_through, False) | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |                 def write(s, lines=None, *, write_len=None): | 
					
						
							|  |  |  |                     if write_len is None: | 
					
						
							|  |  |  |                         write_len = len(s) | 
					
						
							|  |  |  |                     self.assertEqual(write_len, stream.write(s)) | 
					
						
							|  |  |  |                     if lines is None: | 
					
						
							|  |  |  |                         lines = [s] | 
					
						
							|  |  |  |                     self.assert_logs(level, tag, lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Single-line messages, | 
					
						
							|  |  |  |                 with self.unbuffered(stream): | 
					
						
							|  |  |  |                     write("", []) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     write("a") | 
					
						
							|  |  |  |                     write("Hello") | 
					
						
							|  |  |  |                     write("Hello world") | 
					
						
							|  |  |  |                     write(" ") | 
					
						
							|  |  |  |                     write("  ") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # Non-ASCII text | 
					
						
							|  |  |  |                     write("ol\u00e9")  # Spanish | 
					
						
							|  |  |  |                     write("\u4e2d\u6587")  # Chinese | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # Non-BMP emoji | 
					
						
							|  |  |  |                     write("\U0001f600") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # Non-encodable surrogates | 
					
						
							|  |  |  |                     write("\ud800\udc00", [r"\ud800\udc00"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # Code used by surrogateescape (which isn't enabled here) | 
					
						
							|  |  |  |                     write("\udc80", [r"\udc80"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # Null characters are logged using "modified UTF-8". | 
					
						
							|  |  |  |                     write("\u0000", [r"\xc0\x80"]) | 
					
						
							|  |  |  |                     write("a\u0000", [r"a\xc0\x80"]) | 
					
						
							|  |  |  |                     write("\u0000b", [r"\xc0\x80b"]) | 
					
						
							|  |  |  |                     write("a\u0000b", [r"a\xc0\x80b"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Multi-line messages. Avoid identical consecutive lines, as | 
					
						
							|  |  |  |                 # they may activate "chatty" filtering and break the tests. | 
					
						
							|  |  |  |                 write("\nx", [""]) | 
					
						
							|  |  |  |                 write("\na\n", ["x", "a"]) | 
					
						
							|  |  |  |                 write("\n", [""]) | 
					
						
							|  |  |  |                 write("b\n", ["b"]) | 
					
						
							|  |  |  |                 write("c\n\n", ["c", ""]) | 
					
						
							|  |  |  |                 write("d\ne", ["d"]) | 
					
						
							|  |  |  |                 write("xx", []) | 
					
						
							|  |  |  |                 write("f\n\ng", ["exxf", ""]) | 
					
						
							|  |  |  |                 write("\n", ["g"]) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 # Since this is a line-based logging system, line buffering | 
					
						
							|  |  |  |                 # cannot be turned off, i.e. a newline always causes a flush. | 
					
						
							|  |  |  |                 stream.reconfigure(line_buffering=False) | 
					
						
							|  |  |  |                 self.assertIs(stream.line_buffering, True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # However, buffering can be turned off completely if you want a | 
					
						
							|  |  |  |                 # flush after every write. | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  |                 with self.unbuffered(stream): | 
					
						
							|  |  |  |                     write("\nx", ["", "x"]) | 
					
						
							|  |  |  |                     write("\na\n", ["", "a"]) | 
					
						
							|  |  |  |                     write("\n", [""]) | 
					
						
							|  |  |  |                     write("b\n", ["b"]) | 
					
						
							|  |  |  |                     write("c\n\n", ["c", ""]) | 
					
						
							|  |  |  |                     write("d\ne", ["d", "e"]) | 
					
						
							|  |  |  |                     write("xx", ["xx"]) | 
					
						
							|  |  |  |                     write("f\n\ng", ["f", "", "g"]) | 
					
						
							|  |  |  |                     write("\n", [""]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # "\r\n" should be translated into "\n". | 
					
						
							|  |  |  |                 write("hello\r\n", ["hello"]) | 
					
						
							|  |  |  |                 write("hello\r\nworld\r\n", ["hello", "world"]) | 
					
						
							|  |  |  |                 write("\r\n", [""]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Non-standard line separators should be preserved. | 
					
						
							|  |  |  |                 write("before form feed\x0cafter form feed\n", | 
					
						
							|  |  |  |                       ["before form feed\x0cafter form feed"]) | 
					
						
							|  |  |  |                 write("before line separator\u2028after line separator\n", | 
					
						
							|  |  |  |                       ["before line separator\u2028after line separator"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # String subclasses are accepted, but they should be converted | 
					
						
							|  |  |  |                 # to a standard str without calling any of their methods. | 
					
						
							|  |  |  |                 class CustomStr(str): | 
					
						
							|  |  |  |                     def splitlines(self, *args, **kwargs): | 
					
						
							|  |  |  |                         raise AssertionError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     def __len__(self): | 
					
						
							|  |  |  |                         raise AssertionError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     def __str__(self): | 
					
						
							|  |  |  |                         raise AssertionError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 write(CustomStr("custom\n"), ["custom"], write_len=7) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Non-string classes are not accepted. | 
					
						
							|  |  |  |                 for obj in [b"", b"hello", None, 42]: | 
					
						
							|  |  |  |                     with self.subTest(obj=obj): | 
					
						
							|  |  |  |                         with self.assertRaisesRegex( | 
					
						
							|  |  |  |                             TypeError, | 
					
						
							|  |  |  |                             fr"write\(\) argument must be str, not " | 
					
						
							|  |  |  |                             fr"{type(obj).__name__}" | 
					
						
							|  |  |  |                         ): | 
					
						
							|  |  |  |                             stream.write(obj) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Manual flushing is supported. | 
					
						
							|  |  |  |                 write("hello", []) | 
					
						
							|  |  |  |                 stream.flush() | 
					
						
							|  |  |  |                 self.assert_log(level, tag, "hello") | 
					
						
							|  |  |  |                 write("hello", []) | 
					
						
							|  |  |  |                 write("world", []) | 
					
						
							|  |  |  |                 stream.flush() | 
					
						
							|  |  |  |                 self.assert_log(level, tag, "helloworld") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Long lines are split into blocks of 1000 characters | 
					
						
							|  |  |  |                 # (MAX_CHARS_PER_WRITE in _android_support.py), but | 
					
						
							|  |  |  |                 # TextIOWrapper should then join them back together as much as | 
					
						
							|  |  |  |                 # possible without exceeding 4000 UTF-8 bytes | 
					
						
							|  |  |  |                 # (MAX_BYTES_PER_WRITE). | 
					
						
							|  |  |  |                 # | 
					
						
							|  |  |  |                 # ASCII (1 byte per character) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 write(("foobar" * 700) + "\n",  # 4200 bytes in | 
					
						
							|  |  |  |                       [("foobar" * 666) + "foob",  # 4000 bytes out | 
					
						
							|  |  |  |                        "ar" + ("foobar" * 33)])  # 200 bytes out | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |                 # "Full-width" digits 0-9 (3 bytes per character) | 
					
						
							|  |  |  |                 s = "\uff10\uff11\uff12\uff13\uff14\uff15\uff16\uff17\uff18\uff19" | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 write((s * 150) + "\n",  # 4500 bytes in | 
					
						
							|  |  |  |                       [s * 100,  # 3000 bytes out | 
					
						
							|  |  |  |                        s * 50])  # 1500 bytes out | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |                 s = "0123456789" | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 write(s * 200, [])  # 2000 bytes in | 
					
						
							|  |  |  |                 write(s * 150, [])  # 1500 bytes in | 
					
						
							|  |  |  |                 write(s * 51, [s * 350])  # 510 bytes in, 3500 bytes out | 
					
						
							|  |  |  |                 write("\n", [s * 51])  # 0 bytes in, 510 bytes out | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_bytes(self): | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |         for stream_name, level, fileno in STREAM_INFO: | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |             with self.stream_context(stream_name, level): | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  |                 stream = getattr(sys, stream_name).buffer | 
					
						
							|  |  |  |                 tag = f"python.{stream_name}" | 
					
						
							|  |  |  |                 self.assertEqual(f"<BinaryLogStream '{tag}'>", repr(stream)) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 self.assertIs(stream.writable(), True) | 
					
						
							|  |  |  |                 self.assertIs(stream.readable(), False) | 
					
						
							| 
									
										
										
										
											2024-08-16 06:00:29 +01:00
										 |  |  |                 self.assertEqual(stream.fileno(), fileno) | 
					
						
							| 
									
										
										
										
											2024-04-30 15:00:31 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |                 def write(b, lines=None, *, write_len=None): | 
					
						
							|  |  |  |                     if write_len is None: | 
					
						
							|  |  |  |                         write_len = len(b) | 
					
						
							|  |  |  |                     self.assertEqual(write_len, stream.write(b)) | 
					
						
							|  |  |  |                     if lines is None: | 
					
						
							|  |  |  |                         lines = [b.decode()] | 
					
						
							|  |  |  |                     self.assert_logs(level, tag, lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Single-line messages, | 
					
						
							|  |  |  |                 write(b"", []) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 write(b"a") | 
					
						
							|  |  |  |                 write(b"Hello") | 
					
						
							|  |  |  |                 write(b"Hello world") | 
					
						
							|  |  |  |                 write(b" ") | 
					
						
							|  |  |  |                 write(b"  ") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Non-ASCII text | 
					
						
							|  |  |  |                 write(b"ol\xc3\xa9")  # Spanish | 
					
						
							|  |  |  |                 write(b"\xe4\xb8\xad\xe6\x96\x87")  # Chinese | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Non-BMP emoji | 
					
						
							|  |  |  |                 write(b"\xf0\x9f\x98\x80") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Null bytes are logged using "modified UTF-8". | 
					
						
							|  |  |  |                 write(b"\x00", [r"\xc0\x80"]) | 
					
						
							|  |  |  |                 write(b"a\x00", [r"a\xc0\x80"]) | 
					
						
							|  |  |  |                 write(b"\x00b", [r"\xc0\x80b"]) | 
					
						
							|  |  |  |                 write(b"a\x00b", [r"a\xc0\x80b"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Invalid UTF-8 | 
					
						
							|  |  |  |                 write(b"\xff", [r"\xff"]) | 
					
						
							|  |  |  |                 write(b"a\xff", [r"a\xff"]) | 
					
						
							|  |  |  |                 write(b"\xffb", [r"\xffb"]) | 
					
						
							|  |  |  |                 write(b"a\xffb", [r"a\xffb"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Log entries containing newlines are shown differently by | 
					
						
							|  |  |  |                 # `logcat -v tag`, `logcat -v long`, and Android Studio. We | 
					
						
							|  |  |  |                 # currently use `logcat -v tag`, which shows each line as if it | 
					
						
							|  |  |  |                 # was a separate log entry, but strips a single trailing | 
					
						
							|  |  |  |                 # newline. | 
					
						
							|  |  |  |                 # | 
					
						
							|  |  |  |                 # On newer versions of Android, all three of the above tools (or | 
					
						
							|  |  |  |                 # maybe Logcat itself) will also strip any number of leading | 
					
						
							|  |  |  |                 # newlines. | 
					
						
							|  |  |  |                 write(b"\nx", ["", "x"] if api_level < 30 else ["x"]) | 
					
						
							|  |  |  |                 write(b"\na\n", ["", "a"] if api_level < 30 else ["a"]) | 
					
						
							|  |  |  |                 write(b"\n", [""]) | 
					
						
							|  |  |  |                 write(b"b\n", ["b"]) | 
					
						
							|  |  |  |                 write(b"c\n\n", ["c", ""]) | 
					
						
							|  |  |  |                 write(b"d\ne", ["d", "e"]) | 
					
						
							|  |  |  |                 write(b"xx", ["xx"]) | 
					
						
							|  |  |  |                 write(b"f\n\ng", ["f", "", "g"]) | 
					
						
							|  |  |  |                 write(b"\n", [""]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # "\r\n" should be translated into "\n". | 
					
						
							|  |  |  |                 write(b"hello\r\n", ["hello"]) | 
					
						
							|  |  |  |                 write(b"hello\r\nworld\r\n", ["hello", "world"]) | 
					
						
							|  |  |  |                 write(b"\r\n", [""]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Other bytes-like objects are accepted. | 
					
						
							|  |  |  |                 write(bytearray(b"bytearray")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 mv = memoryview(b"memoryview") | 
					
						
							|  |  |  |                 write(mv, ["memoryview"])  # Continuous | 
					
						
							|  |  |  |                 write(mv[::2], ["mmrve"])  # Discontinuous | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 write( | 
					
						
							|  |  |  |                     # Android only supports little-endian architectures, so the | 
					
						
							|  |  |  |                     # bytes representation is as follows: | 
					
						
							|  |  |  |                     array("H", [ | 
					
						
							|  |  |  |                         0,      # 00 00 | 
					
						
							|  |  |  |                         1,      # 01 00 | 
					
						
							|  |  |  |                         65534,  # FE FF | 
					
						
							|  |  |  |                         65535,  # FF FF | 
					
						
							|  |  |  |                     ]), | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     # After encoding null bytes with modified UTF-8, the only | 
					
						
							|  |  |  |                     # valid UTF-8 sequence is \x01. All other bytes are handled | 
					
						
							|  |  |  |                     # by backslashreplace. | 
					
						
							|  |  |  |                     ["\\xc0\\x80\\xc0\\x80" | 
					
						
							|  |  |  |                      "\x01\\xc0\\x80" | 
					
						
							|  |  |  |                      "\\xfe\\xff" | 
					
						
							|  |  |  |                      "\\xff\\xff"], | 
					
						
							|  |  |  |                     write_len=8, | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Non-bytes-like classes are not accepted. | 
					
						
							|  |  |  |                 for obj in ["", "hello", None, 42]: | 
					
						
							|  |  |  |                     with self.subTest(obj=obj): | 
					
						
							|  |  |  |                         with self.assertRaisesRegex( | 
					
						
							|  |  |  |                             TypeError, | 
					
						
							|  |  |  |                             fr"write\(\) argument must be bytes-like, not " | 
					
						
							|  |  |  |                             fr"{type(obj).__name__}" | 
					
						
							|  |  |  |                         ): | 
					
						
							|  |  |  |                             stream.write(obj) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TestAndroidRateLimit(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |     def test_rate_limit(self): | 
					
						
							|  |  |  |         # https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39 | 
					
						
							|  |  |  |         PER_MESSAGE_OVERHEAD = 28 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # https://developer.android.com/ndk/reference/group/logging | 
					
						
							|  |  |  |         ANDROID_LOG_DEBUG = 3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # To avoid flooding the test script output, use a different tag rather | 
					
						
							|  |  |  |         # than stdout or stderr. | 
					
						
							|  |  |  |         tag = "python.rate_limit" | 
					
						
							|  |  |  |         stream = TextLogStream(ANDROID_LOG_DEBUG, tag) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Make a test message which consumes 1 KB of the logcat buffer. | 
					
						
							|  |  |  |         message = "Line {:03d} " | 
					
						
							|  |  |  |         message += "." * ( | 
					
						
							|  |  |  |             1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(message.format(0)) | 
					
						
							|  |  |  |         ) + "\n" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |         # To avoid depending on the performance of the test device, we mock the | 
					
						
							|  |  |  |         # passage of time. | 
					
						
							|  |  |  |         mock_now = time() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def mock_time(): | 
					
						
							|  |  |  |             # Avoid division by zero by simulating a small delay. | 
					
						
							|  |  |  |             mock_sleep(0.0001) | 
					
						
							|  |  |  |             return mock_now | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def mock_sleep(duration): | 
					
						
							|  |  |  |             nonlocal mock_now | 
					
						
							|  |  |  |             mock_now += duration | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |         # See _android_support.py. The default values of these parameters work | 
					
						
							|  |  |  |         # well across a wide range of devices, but we'll use smaller values to | 
					
						
							|  |  |  |         # ensure a quick and reliable test that doesn't flood the log too much. | 
					
						
							|  |  |  |         MAX_KB_PER_SECOND = 100 | 
					
						
							|  |  |  |         BUCKET_KB = 10 | 
					
						
							|  |  |  |         with ( | 
					
						
							|  |  |  |             patch("_android_support.MAX_BYTES_PER_SECOND", MAX_KB_PER_SECOND * 1024), | 
					
						
							|  |  |  |             patch("_android_support.BUCKET_SIZE", BUCKET_KB * 1024), | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |             patch("_android_support.sleep", mock_sleep), | 
					
						
							|  |  |  |             patch("_android_support.time", mock_time), | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |         ): | 
					
						
							|  |  |  |             # Make sure the token bucket is full. | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |             stream.write("Initial message to reset _prev_write_time") | 
					
						
							|  |  |  |             mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |             line_num = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Write BUCKET_KB messages, and return the rate at which they were | 
					
						
							|  |  |  |             # accepted in KB per second. | 
					
						
							|  |  |  |             def write_bucketful(): | 
					
						
							|  |  |  |                 nonlocal line_num | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |                 start = mock_time() | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |                 max_line_num = line_num + BUCKET_KB | 
					
						
							|  |  |  |                 while line_num < max_line_num: | 
					
						
							|  |  |  |                     stream.write(message.format(line_num)) | 
					
						
							|  |  |  |                     line_num += 1 | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |                 return BUCKET_KB / (mock_time() - start) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |             # The first bucketful should be written with minimal delay. The | 
					
						
							|  |  |  |             # factor of 2 here is not arbitrary: it verifies that the system can | 
					
						
							|  |  |  |             # write fast enough to empty the bucket within two bucketfuls, which | 
					
						
							|  |  |  |             # the next part of the test depends on. | 
					
						
							|  |  |  |             self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Write another bucketful to empty the token bucket completely. | 
					
						
							|  |  |  |             write_bucketful() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # The next bucketful should be written at the rate limit. | 
					
						
							|  |  |  |             self.assertAlmostEqual( | 
					
						
							|  |  |  |                 write_bucketful(), MAX_KB_PER_SECOND, | 
					
						
							|  |  |  |                 delta=MAX_KB_PER_SECOND * 0.1 | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Once the token bucket refills, we should go back to full speed. | 
					
						
							| 
									
										
										
										
											2024-09-13 05:58:11 +01:00
										 |  |  |             mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND) | 
					
						
							| 
									
										
										
										
											2024-08-06 05:28:58 +01:00
										 |  |  |             self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) |