mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	Add more syslog tests (GH-97953)
This commit is contained in:
		
							parent
							
								
									80b3e32d62
								
							
						
					
					
						commit
						cae7d1d7a7
					
				
					 4 changed files with 95 additions and 8 deletions
				
			
		| 
						 | 
					@ -429,6 +429,26 @@ def hook(event, args):
 | 
				
			||||||
    sys.addaudithook(hook)
 | 
					    sys.addaudithook(hook)
 | 
				
			||||||
    _wmi.exec_query("SELECT * FROM Win32_OperatingSystem")
 | 
					    _wmi.exec_query("SELECT * FROM Win32_OperatingSystem")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_syslog():
 | 
				
			||||||
 | 
					    import syslog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def hook(event, args):
 | 
				
			||||||
 | 
					        if event.startswith("syslog."):
 | 
				
			||||||
 | 
					            print(event, *args)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    sys.addaudithook(hook)
 | 
				
			||||||
 | 
					    syslog.openlog('python')
 | 
				
			||||||
 | 
					    syslog.syslog('test')
 | 
				
			||||||
 | 
					    syslog.setlogmask(syslog.LOG_DEBUG)
 | 
				
			||||||
 | 
					    syslog.closelog()
 | 
				
			||||||
 | 
					    # implicit open
 | 
				
			||||||
 | 
					    syslog.syslog('test2')
 | 
				
			||||||
 | 
					    # open with default ident
 | 
				
			||||||
 | 
					    syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0)
 | 
				
			||||||
 | 
					    sys.argv = None
 | 
				
			||||||
 | 
					    syslog.openlog()
 | 
				
			||||||
 | 
					    syslog.closelog()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    from test.support import suppress_msvcrt_asserts
 | 
					    from test.support import suppress_msvcrt_asserts
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -16,6 +16,7 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class AuditTest(unittest.TestCase):
 | 
					class AuditTest(unittest.TestCase):
 | 
				
			||||||
 | 
					    maxDiff = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @support.requires_subprocess()
 | 
					    @support.requires_subprocess()
 | 
				
			||||||
    def do_test(self, *args):
 | 
					    def do_test(self, *args):
 | 
				
			||||||
| 
						 | 
					@ -185,7 +186,6 @@ def test_sys_getframe(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.assertEqual(actual, expected)
 | 
					        self.assertEqual(actual, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
    def test_wmi_exec_query(self):
 | 
					    def test_wmi_exec_query(self):
 | 
				
			||||||
        import_helper.import_module("_wmi")
 | 
					        import_helper.import_module("_wmi")
 | 
				
			||||||
        returncode, events, stderr = self.run_python("test_wmi_exec_query")
 | 
					        returncode, events, stderr = self.run_python("test_wmi_exec_query")
 | 
				
			||||||
| 
						 | 
					@ -199,6 +199,29 @@ def test_wmi_exec_query(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.assertEqual(actual, expected)
 | 
					        self.assertEqual(actual, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_syslog(self):
 | 
				
			||||||
 | 
					        syslog = import_helper.import_module("syslog")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        returncode, events, stderr = self.run_python("test_syslog")
 | 
				
			||||||
 | 
					        if returncode:
 | 
				
			||||||
 | 
					            self.fail(stderr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if support.verbose:
 | 
				
			||||||
 | 
					            print('Events:', *events, sep='\n  ')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertSequenceEqual(
 | 
				
			||||||
 | 
					            events,
 | 
				
			||||||
 | 
					            [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
 | 
				
			||||||
 | 
					            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
 | 
				
			||||||
 | 
					            ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
 | 
				
			||||||
 | 
					            ('syslog.closelog', '', ''),
 | 
				
			||||||
 | 
					            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
 | 
				
			||||||
 | 
					            ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
 | 
				
			||||||
 | 
					            ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
 | 
				
			||||||
 | 
					            ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
 | 
				
			||||||
 | 
					            ('syslog.closelog', '', '')]
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    unittest.main()
 | 
					    unittest.main()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,5 +1,9 @@
 | 
				
			||||||
from test.support import import_helper
 | 
					from test.support import import_helper, threading_helper
 | 
				
			||||||
syslog = import_helper.import_module("syslog") #skip if not supported
 | 
					syslog = import_helper.import_module("syslog") #skip if not supported
 | 
				
			||||||
 | 
					from test import support
 | 
				
			||||||
 | 
					import sys
 | 
				
			||||||
 | 
					import threading
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
import unittest
 | 
					import unittest
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# XXX(nnorwitz): This test sucks.  I don't know of a platform independent way
 | 
					# XXX(nnorwitz): This test sucks.  I don't know of a platform independent way
 | 
				
			||||||
| 
						 | 
					@ -8,6 +12,9 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Test(unittest.TestCase):
 | 
					class Test(unittest.TestCase):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def tearDown(self):
 | 
				
			||||||
 | 
					        syslog.closelog()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_openlog(self):
 | 
					    def test_openlog(self):
 | 
				
			||||||
        syslog.openlog('python')
 | 
					        syslog.openlog('python')
 | 
				
			||||||
        # Issue #6697.
 | 
					        # Issue #6697.
 | 
				
			||||||
| 
						 | 
					@ -18,22 +25,59 @@ def test_syslog(self):
 | 
				
			||||||
        syslog.syslog('test message from python test_syslog')
 | 
					        syslog.syslog('test message from python test_syslog')
 | 
				
			||||||
        syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog')
 | 
					        syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_syslog_implicit_open(self):
 | 
				
			||||||
 | 
					        syslog.closelog() # Make sure log is closed
 | 
				
			||||||
 | 
					        syslog.syslog('test message from python test_syslog')
 | 
				
			||||||
 | 
					        syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_closelog(self):
 | 
					    def test_closelog(self):
 | 
				
			||||||
        syslog.openlog('python')
 | 
					        syslog.openlog('python')
 | 
				
			||||||
        syslog.closelog()
 | 
					        syslog.closelog()
 | 
				
			||||||
 | 
					        syslog.closelog()  # idempotent operation
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_setlogmask(self):
 | 
					    def test_setlogmask(self):
 | 
				
			||||||
        syslog.setlogmask(syslog.LOG_DEBUG)
 | 
					        mask = syslog.LOG_UPTO(syslog.LOG_WARNING)
 | 
				
			||||||
 | 
					        oldmask = syslog.setlogmask(mask)
 | 
				
			||||||
 | 
					        self.assertEqual(syslog.setlogmask(0), mask)
 | 
				
			||||||
 | 
					        self.assertEqual(syslog.setlogmask(oldmask), mask)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_log_mask(self):
 | 
					    def test_log_mask(self):
 | 
				
			||||||
        syslog.LOG_MASK(syslog.LOG_INFO)
 | 
					        mask = syslog.LOG_UPTO(syslog.LOG_WARNING)
 | 
				
			||||||
 | 
					        self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_WARNING))
 | 
				
			||||||
    def test_log_upto(self):
 | 
					        self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_ERR))
 | 
				
			||||||
        syslog.LOG_UPTO(syslog.LOG_INFO)
 | 
					        self.assertFalse(mask & syslog.LOG_MASK(syslog.LOG_INFO))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_openlog_noargs(self):
 | 
					    def test_openlog_noargs(self):
 | 
				
			||||||
        syslog.openlog()
 | 
					        syslog.openlog()
 | 
				
			||||||
        syslog.syslog('test message from python test_syslog')
 | 
					        syslog.syslog('test message from python test_syslog')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @threading_helper.requires_working_threading()
 | 
				
			||||||
 | 
					    def test_syslog_threaded(self):
 | 
				
			||||||
 | 
					        start = threading.Event()
 | 
				
			||||||
 | 
					        stop = False
 | 
				
			||||||
 | 
					        def opener():
 | 
				
			||||||
 | 
					            start.wait(10)
 | 
				
			||||||
 | 
					            i = 1
 | 
				
			||||||
 | 
					            while not stop:
 | 
				
			||||||
 | 
					                syslog.openlog(f'python-test-{i}')  # new string object
 | 
				
			||||||
 | 
					                i += 1
 | 
				
			||||||
 | 
					        def logger():
 | 
				
			||||||
 | 
					            start.wait(10)
 | 
				
			||||||
 | 
					            while not stop:
 | 
				
			||||||
 | 
					                syslog.syslog('test message from python test_syslog')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        orig_si = sys.getswitchinterval()
 | 
				
			||||||
 | 
					        support.setswitchinterval(1e-9)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            threads = [threading.Thread(target=opener)]
 | 
				
			||||||
 | 
					            threads += [threading.Thread(target=logger) for k in range(10)]
 | 
				
			||||||
 | 
					            with threading_helper.start_threads(threads):
 | 
				
			||||||
 | 
					                start.set()
 | 
				
			||||||
 | 
					                time.sleep(0.1)
 | 
				
			||||||
 | 
					                stop = True
 | 
				
			||||||
 | 
					        finally:
 | 
				
			||||||
 | 
					            sys.setswitchinterval(orig_si)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    unittest.main()
 | 
					    unittest.main()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -235,7 +235,7 @@ syslog_setlogmask(PyObject *self, PyObject *args)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (!PyArg_ParseTuple(args, "l;mask for priority", &maskpri))
 | 
					    if (!PyArg_ParseTuple(args, "l;mask for priority", &maskpri))
 | 
				
			||||||
        return NULL;
 | 
					        return NULL;
 | 
				
			||||||
    if (PySys_Audit("syslog.setlogmask", "(O)", args ? args : Py_None) < 0) {
 | 
					    if (PySys_Audit("syslog.setlogmask", "l", maskpri) < 0) {
 | 
				
			||||||
        return NULL;
 | 
					        return NULL;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    omaskpri = setlogmask(maskpri);
 | 
					    omaskpri = setlogmask(maskpri);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue