mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	 180510d29b
			
		
	
	
		180510d29b
		
	
	
	
	
		
			
			svn+ssh://pythondev@svn.python.org/python/trunk ........ r61189 | brett.cannon | 2008-03-03 01:38:58 +0100 (Mon, 03 Mar 2008) | 5 lines Refactor test_logging to use unittest. This should finally solve the flakiness issues. Thanks to Antoine Pitrou for the patch. ........ r61190 | jeffrey.yasskin | 2008-03-03 02:27:03 +0100 (Mon, 03 Mar 2008) | 3 lines compile.c always emits END_FINALLY after WITH_CLEANUP, so predict that in ceval.c. This is worth about a .03-.04us speedup on a simple with block. ........ r61192 | brett.cannon | 2008-03-03 03:41:40 +0100 (Mon, 03 Mar 2008) | 4 lines Move test_largefile over to using 'with' statements for open files. Also rename the driver function to test_main() instead of main_test(). ........ r61194 | brett.cannon | 2008-03-03 04:24:48 +0100 (Mon, 03 Mar 2008) | 3 lines Add a note in the main test class' docstring that the order of execution of the tests is important. ........ r61195 | brett.cannon | 2008-03-03 04:26:43 +0100 (Mon, 03 Mar 2008) | 3 lines Add a note in the main test class' docstring that the order of execution of the tests is important. ........ r61198 | brett.cannon | 2008-03-03 05:19:29 +0100 (Mon, 03 Mar 2008) | 4 lines Add test_main() functions to various tests where it was simple to do. Done so that regrtest can execute the test_main() directly instead of relying on import side-effects. ........ r61199 | neal.norwitz | 2008-03-03 05:37:45 +0100 (Mon, 03 Mar 2008) | 1 line Only DECREF if ret != NULL ........
		
			
				
	
	
		
			822 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			822 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
 | |
| #
 | |
| # Permission to use, copy, modify, and distribute this software and its
 | |
| # documentation for any purpose and without fee is hereby granted,
 | |
| # provided that the above copyright notice appear in all copies and that
 | |
| # both that copyright notice and this permission notice appear in
 | |
| # supporting documentation, and that the name of Vinay Sajip
 | |
| # not be used in advertising or publicity pertaining to distribution
 | |
| # of the software without specific, written prior permission.
 | |
| # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
 | |
| # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
 | |
| # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
 | |
| # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
 | |
| # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
 | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | |
| 
 | |
| """Test harness for the logging module. Run all tests.
 | |
| 
 | |
| Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
 | |
| """
 | |
| 
 | |
| import logging
 | |
| import logging.handlers
 | |
| import logging.config
 | |
| 
 | |
| import copy
 | |
| import pickle
 | |
| import io
 | |
| import gc
 | |
| import os
 | |
| import re
 | |
| import select
 | |
| import socket
 | |
| from SocketServer import ThreadingTCPServer, StreamRequestHandler
 | |
| import string
 | |
| import struct
 | |
| import sys
 | |
| import tempfile
 | |
| from test.test_support import captured_stdout, run_with_locale, run_unittest
 | |
| import textwrap
 | |
| import threading
 | |
| import time
 | |
| import types
 | |
| import unittest
 | |
| import weakref
 | |
| 
 | |
| 
 | |
class BaseTest(unittest.TestCase):

    """Base class for logging tests.

    setUp() saves the logging module's global registries and attaches a
    StreamHandler backed by an in-memory stream to the root logger;
    tearDown() removes that handler and restores the saved state.
    """

    # Format applied to the root handler, and the pattern assert_log_lines()
    # uses to split each formatted line back into its fields.
    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
    # Counter behind next_message().
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        logger_dict = logging.getLogger().manager.loggerDict
        # Snapshot the module-level registries under the logging lock so
        # that tearDown() can put them back.
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = logger_dict.copy()
            self.saved_level_names = logging._levelNames.copy()
        finally:
            logging._releaseLock()

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        self.root_logger.addHandler(self.root_hdlr)

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        self.root_logger.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            # Restore the registries in place: other references to these
            # containers stay valid.
            logging._levelNames.clear()
            logging._levelNames.update(self.saved_level_names)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            loggerDict = logging.getLogger().manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples.

        stream defaults to self.stream when it is None.
        """
        if stream is None:
            stream = self.stream
        pat = re.compile(self.expected_log_pat)
        try:
            stream.reset()
            actual_lines = stream.readlines()
        except AttributeError:
            # Stream classes without a reset() method (StringIO.StringIO,
            # io.StringIO) end up here.
            actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
 | |
| 
 | |
| 
 | |
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        # The grandchild is requested before its direct parent on purpose.
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])
 | |
| 
 | |
| 
 | |
class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # A Filter named "spam.eggs" installed on the root handler lets
        # through only records from that logger and its descendants.
        name_filter = logging.Filter("spam.eggs")
        root_handler = self.root_logger.handlers[0]
        try:
            root_handler.addFilter(name_filter)
            loggers = [
                logging.getLogger("spam"),
                logging.getLogger("spam.eggs"),        # Good.
                logging.getLogger("spam.eggs.fish"),   # Good.
                logging.getLogger("spam.bakedbeans"),
            ]

            for logger in loggers:
                logger.info(self.next_message())

            expected = [
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ]
            self.assert_log_lines(expected)
        finally:
            root_handler.removeFilter(name_filter)
 | |
| 
 | |
| 
 | |
#
#   First, we define our levels. There can be as many as you want - the only
#   limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
SILENT = 120
TACITURN = 119
TERSE = 118
EFFUSIVE = 117
SOCIABLE = 116
VERBOSE = 115
TALKATIVE = 114
GARRULOUS = 113
CHATTERBOX = 112
BORING = 111

# Every custom level, from the most verbose (BORING) to the quietest (SILENT).
LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Next, we define names for our levels. You don't need to do this - in which
#   case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
 | |
| 
 | |
class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        # Reject exactly the records logged at the GARRULOUS level.
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
 | |
| 
 | |
class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        # Anything not logged at one of the two blocked levels passes.
        blocked_levels = [SOCIABLE, TACITURN]
        return record.levelno not in blocked_levels
 | |
| 
 | |
| 
 | |
class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        # Register the custom level names on top of the base fixture.
        BaseTest.setUp(self)
        for level, level_name in my_logging_levels.items():
            logging.addLevelName(level, level_name)

    def log_at_all_levels(self, logger):
        """Log one numbered message at each level of LEVEL_RANGE, lowest
        level first."""
        for level in LEVEL_RANGE:
            message = self.next_message()
            logger.log(level, message)

    def test_logger_filter(self):
        # Filter at logger level.
        self.root_logger.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(self.root_logger)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        #  filter object on the logger itself.
        root_handler = self.root_logger.handlers[0]
        handler_filter = GarrulousFilter()
        root_handler.addFilter(handler_filter)
        try:
            self.log_at_all_levels(self.root_logger)
            # Notice how 'Garrulous' is missing
            first_pass = [
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_pass)

            logger_filter = VerySpecificFilter()
            self.root_logger.addFilter(logger_filter)
            try:
                self.log_at_all_levels(self.root_logger)
                # Not only 'Garrulous' is still missing, but also 'Sociable'
                # and 'Taciturn'
                second_pass = [
                    ('Boring', '11'),
                    ('Chatterbox', '12'),
                    ('Talkative', '14'),
                    ('Verbose', '15'),
                    ('Effusive', '17'),
                    ('Terse', '18'),
                    ('Silent', '20'),
                ]
                self.assert_log_lines(first_pass + second_pass)
            finally:
                self.root_logger.removeFilter(logger_filter)
        finally:
            root_handler.removeFilter(handler_filter)
 | |
| 
 | |
| 
 | |
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Attach a MemoryHandler (capacity 10, flush level WARNING) that
        targets the root stream handler to a non-propagating 'mem' logger."""
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        """Close the memory handler, then run the base-class cleanup."""
        try:
            self.mem_hdlr.close()
        finally:
            # Without this the root handler added by BaseTest.setUp() and
            # the saved logging state would never be restored.
            BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        #  criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        # n is the number of the first message in each batch of ten.
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            #  flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        # A single buffered message must not reach the target.
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)
 | |
| 
 | |
| 
 | |
class ExceptionFormatter(logging.Formatter):
    """A special exception formatter."""

    def formatException(self, ei):
        # Only the exception class, the first item of ei, is reported.
        exc_type = ei[0]
        return "Got a [%s]" % exc_type.__name__
 | |
| 
 | |
| 
 | |
class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(message)s
    datefmt=
    """.replace("%(levelname)s:%(message)s",
                "%(levelname)s:%(name)s:%(message)s")

    def apply_config(self, conf):
        """Write the dedented text of conf to a temporary .ini file and
        load it with logging.config.fileConfig().

        The temporary file is removed afterwards, whether or not loading
        succeeded; any exception from fileConfig() propagates.
        """
        # mkstemp() creates the file atomically, unlike mktemp(), and runs
        # before the try block so that fn is always bound in the finally.
        fd, fn = tempfile.mkstemp(".ini")
        try:
            f = os.fdopen(fd, "w")
            try:
                f.write(textwrap.dedent(conf))
            finally:
                f.close()
            logging.config.fileConfig(fn)
        finally:
            os.remove(fn)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self):
        # A config file defining a sub-parser as well.
        with captured_stdout() as output:
            self.apply_config(self.config1)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # A config file whose handler args name a nonexistent stream
        # (sys.stbout) must be rejected.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # A config file whose handler refers to an undefined formatter
        # must be rejected.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])
 | |
| 
 | |
| 
 | |
class LogRecordStreamHandler(StreamRequestHandler):

    """Handler for a streaming logging request. It saves the log message in the
    TCP server's 'log_output' attribute."""

    # A record whose message contains this marker tells the server to stop.
    TCP_LOG_END = "!!!END!!!"

    def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the peer closes the connection, including when it
        does so in the middle of a record.
        """
        while True:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack(">L", header)[0]
            payload = self._recv_exactly(slen)
            if payload is None:
                # Truncated record: the peer went away before sending all of
                # it. Looping on recv() here would never terminate.
                break
            obj = self.unpickle(payload)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)

    def _recv_exactly(self, size):
        """Read exactly size bytes from the connection.

        Returns the bytes read, or None if the connection was closed
        before size bytes arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            data = self.connection.recv(remaining)
            if not data:
                return None
            chunks.append(data)
            remaining -= len(data)
        return b"".join(chunks)

    def unpickle(self, data):
        """Return the object pickled in data."""
        # NOTE: unpickling executes arbitrary code from the peer. This is
        # acceptable only because this handler serves test connections; do
        # not reuse it on sockets reachable by untrusted clients.
        return pickle.loads(data)

    def handle_log_record(self, record):
        """Append record.msg to the server's log_output, or set the
        server's abort flag when the end-of-messages sentinel is seen."""
        # If the end-of-messages sentinel is seen, tell the server to
        #  terminate.
        if self.TCP_LOG_END in record.msg:
            self.server.abort = 1
            return
        self.server.log_output += record.msg + "\n"
 | |
| 
 | |
| 
 | |
class LogRecordSocketReceiver(ThreadingTCPServer):

    """A simple-minded TCP socket-based logging receiver suitable for test
    purposes."""

    allow_reuse_address = 1
    # Text accumulated by the request handlers, one message per line.
    log_output = ""

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        address = (host, port)
        ThreadingTCPServer.__init__(self, address, handler)
        # Set to a true value to make serve_until_stopped() return.
        self.abort = False
        # Seconds each select() call waits before re-checking self.abort.
        self.timeout = 0.1
        # Set by serve_until_stopped() just before it closes the socket.
        self.finished = threading.Event()

    def serve_until_stopped(self):
        """Serve requests until self.abort becomes true, then set
        self.finished and close the listening socket."""
        while True:
            if self.abort:
                break
            readable = select.select([self.socket.fileno()], [], [],
                                     self.timeout)[0]
            if readable:
                self.handle_request()
        # Notify the main thread that we're about to exit
        self.finished.set()
        # close the listen socket
        self.server_close()
 | |
| 
 | |
| 
 | |
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Port 0 asks for any free port; the port actually bound is read
        # back from the listening socket.
        self.tcpserver = LogRecordSocketReceiver(port=0)
        self.port = self.tcpserver.socket.getsockname()[1]
        self.threads = [
                threading.Thread(target=self.tcpserver.serve_until_stopped)]
        for thread in self.threads:
            thread.start()

        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
        self.sock_hdlr.setFormatter(self.root_formatter)
        # Replace the stream handler installed by BaseTest.setUp() so that
        # records only go out over the socket.
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            self.tcpserver.abort = True
            del self.tcpserver
            self.root_logger.removeHandler(self.sock_hdlr)
            self.sock_hdlr.close()
            for thread in self.threads:
                thread.join(2.0)
        finally:
            BaseTest.tearDown(self)

    def get_output(self):
        """Get the log output as received by the TCP server."""
        # Signal the TCP receiver and wait for it to terminate.
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
        self.tcpserver.finished.wait(2.0)
        return self.tcpserver.log_output

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        logger = logging.getLogger("tcp")
        logger.error("spam")
        logger.debug("eggs")
        self.assertEqual(self.get_output(), "spam\neggs\n")
 | |
| 
 | |
| 
 | |
class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Create a dict to remember potentially destroyed objects."""
        BaseTest.setUp(self)
        # Maps (id, repr) of each watched object to a weak reference to it.
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Watch the given objects for survival, by creating weakrefs to
        them."""
        for watched in args:
            key = (id(watched), repr(watched))
            self._survivors[key] = weakref.ref(watched)

    def _assert_survival(self):
        """Assert that all objects watched for survival have survived."""
        # Trigger cycle breaking.
        gc.collect()
        # A weak reference that returns None has lost its referent.
        dead = [repr_
                for (id_, repr_), ref in self._survivors.items()
                if ref() is None]
        if dead:
            self.fail("%d objects should have survived "
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        #  if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        first_line = ('foo', 'DEBUG', '2')
        self.assert_log_lines([first_line])
        del foo
        # foo has survived.
        self._assert_survival()
        # foo has retained its settings.
        bar = logging.getLogger("foo")
        bar.debug(self.next_message())
        self.assert_log_lines([first_line, ('foo', 'DEBUG', '3')])
 | |
| 
 | |
| 
 | |
| # Set the locale to the platform-dependent default.  I have no idea
 | |
| # why the test does this, but in any case we save the current locale
 | |
| # first and restore it at the end.
 | |
@run_with_locale('LC_ALL', '')
def test_main():
    """Run every test case class of this module through run_unittest()."""
    test_classes = (
        BuiltinLevelsTest,
        BasicFilterTest,
        CustomLevelsAndFiltersTest,
        MemoryHandlerTest,
        ConfigFileTest,
        SocketHandlerTest,
        MemoryTest,
    )
    run_unittest(*test_classes)


# Allow the module to be run directly as a script.
if __name__ == "__main__":
    test_main()
 |