mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	 de9b624fb9
			
		
	
	
		de9b624fb9
		
	
	
	
	
		
			
			On the way, add a decorator to test_support to facilitate running single test functions in different locales with automatic cleanup.
		
			
				
	
	
		
			681 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			681 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
 | |
| #
 | |
| # Permission to use, copy, modify, and distribute this software and its
 | |
| # documentation for any purpose and without fee is hereby granted,
 | |
| # provided that the above copyright notice appear in all copies and that
 | |
| # both that copyright notice and this permission notice appear in
 | |
| # supporting documentation, and that the name of Vinay Sajip
 | |
| # not be used in advertising or publicity pertaining to distribution
 | |
| # of the software without specific, written prior permission.
 | |
| # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
 | |
| # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
 | |
| # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
 | |
| # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
 | |
| # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
 | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | |
| #
 | |
| # This file is part of the Python logging distribution. See
 | |
| # http://www.red-dove.com/python_logging.html
 | |
| #
 | |
| """Test harness for the logging module. Run all tests.
 | |
| 
 | |
| Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
 | |
| """
 | |
| 
 | |
| import select
 | |
| import os, sys, string, struct, types, cPickle, cStringIO
 | |
| import socket, tempfile, threading, time
 | |
| import logging, logging.handlers, logging.config
 | |
| from test.test_support import run_with_locale
 | |
| 
 | |
| BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
 | |
| 
 | |
| FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24."
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Log receiver
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| TIMEOUT         = 10
 | |
| 
 | |
| from SocketServer import ThreadingTCPServer, StreamRequestHandler
 | |
| 
 | |
| class LogRecordStreamHandler(StreamRequestHandler):
 | |
|     """
 | |
|     Handler for a streaming logging request. It basically logs the record
 | |
|     using whatever logging policy is configured locally.
 | |
|     """
 | |
| 
 | |
|     def handle(self):
 | |
|         """
 | |
|         Handle multiple requests - each expected to be a 4-byte length,
 | |
|         followed by the LogRecord in pickle format. Logs the record
 | |
|         according to whatever policy is configured locally.
 | |
|         """
 | |
|         while 1:
 | |
|             try:
 | |
|                 chunk = self.connection.recv(4)
 | |
|                 if len(chunk) < 4:
 | |
|                     break
 | |
|                 slen = struct.unpack(">L", chunk)[0]
 | |
|                 chunk = self.connection.recv(slen)
 | |
|                 while len(chunk) < slen:
 | |
|                     chunk = chunk + self.connection.recv(slen - len(chunk))
 | |
|                 obj = self.unPickle(chunk)
 | |
|                 record = logging.makeLogRecord(obj)
 | |
|                 self.handleLogRecord(record)
 | |
|             except:
 | |
|                 raise
 | |
| 
 | |
|     def unPickle(self, data):
 | |
|         return cPickle.loads(data)
 | |
| 
 | |
|     def handleLogRecord(self, record):
 | |
|         logname = "logrecv.tcp." + record.name
 | |
|         #If the end-of-messages sentinel is seen, tell the server to terminate
 | |
|         if record.msg == FINISH_UP:
 | |
|             self.server.abort = 1
 | |
|         record.msg = record.msg + " (via " + logname + ")"
 | |
|         logger = logging.getLogger(logname)
 | |
|         logger.handle(record)
 | |
| 
 | |
| # The server sets socketDataProcessed when it's done.
 | |
| socketDataProcessed = threading.Event()
 | |
| 
 | |
| class LogRecordSocketReceiver(ThreadingTCPServer):
 | |
|     """
 | |
|     A simple-minded TCP socket-based logging receiver suitable for test
 | |
|     purposes.
 | |
|     """
 | |
| 
 | |
|     allow_reuse_address = 1
 | |
| 
 | |
|     def __init__(self, host='localhost',
 | |
|                              port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
 | |
|                      handler=LogRecordStreamHandler):
 | |
|         ThreadingTCPServer.__init__(self, (host, port), handler)
 | |
|         self.abort = 0
 | |
|         self.timeout = 1
 | |
| 
 | |
|     def serve_until_stopped(self):
 | |
|         while not self.abort:
 | |
|             rd, wr, ex = select.select([self.socket.fileno()], [], [],
 | |
|                                        self.timeout)
 | |
|             if rd:
 | |
|                 self.handle_request()
 | |
|         #notify the main thread that we're about to exit
 | |
|         socketDataProcessed.set()
 | |
|         # close the listen socket
 | |
|         self.server_close()
 | |
| 
 | |
|     def process_request(self, request, client_address):
 | |
|         #import threading
 | |
|         t = threading.Thread(target = self.finish_request,
 | |
|                              args = (request, client_address))
 | |
|         t.start()
 | |
| 
 | |
| def runTCP(tcpserver):
 | |
|     tcpserver.serve_until_stopped()
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 0
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| msgcount = 0
 | |
| 
 | |
| def nextmessage():
 | |
|     global msgcount
 | |
|     rv = "Message %d" % msgcount
 | |
|     msgcount = msgcount + 1
 | |
|     return rv
 | |
| 
 | |
| def test0():
 | |
|     ERR = logging.getLogger("ERR")
 | |
|     ERR.setLevel(logging.ERROR)
 | |
|     INF = logging.getLogger("INF")
 | |
|     INF.setLevel(logging.INFO)
 | |
|     INF_ERR  = logging.getLogger("INF.ERR")
 | |
|     INF_ERR.setLevel(logging.ERROR)
 | |
|     DEB = logging.getLogger("DEB")
 | |
|     DEB.setLevel(logging.DEBUG)
 | |
| 
 | |
|     INF_UNDEF = logging.getLogger("INF.UNDEF")
 | |
|     INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
 | |
|     UNDEF = logging.getLogger("UNDEF")
 | |
| 
 | |
|     GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
 | |
|     CHILD = logging.getLogger("INF.BADPARENT")
 | |
| 
 | |
|     #These should log
 | |
|     ERR.log(logging.FATAL, nextmessage())
 | |
|     ERR.error(nextmessage())
 | |
| 
 | |
|     INF.log(logging.FATAL, nextmessage())
 | |
|     INF.error(nextmessage())
 | |
|     INF.warn(nextmessage())
 | |
|     INF.info(nextmessage())
 | |
| 
 | |
|     INF_UNDEF.log(logging.FATAL, nextmessage())
 | |
|     INF_UNDEF.error(nextmessage())
 | |
|     INF_UNDEF.warn (nextmessage())
 | |
|     INF_UNDEF.info (nextmessage())
 | |
| 
 | |
|     INF_ERR.log(logging.FATAL, nextmessage())
 | |
|     INF_ERR.error(nextmessage())
 | |
| 
 | |
|     INF_ERR_UNDEF.log(logging.FATAL, nextmessage())
 | |
|     INF_ERR_UNDEF.error(nextmessage())
 | |
| 
 | |
|     DEB.log(logging.FATAL, nextmessage())
 | |
|     DEB.error(nextmessage())
 | |
|     DEB.warn (nextmessage())
 | |
|     DEB.info (nextmessage())
 | |
|     DEB.debug(nextmessage())
 | |
| 
 | |
|     UNDEF.log(logging.FATAL, nextmessage())
 | |
|     UNDEF.error(nextmessage())
 | |
|     UNDEF.warn (nextmessage())
 | |
|     UNDEF.info (nextmessage())
 | |
| 
 | |
|     GRANDCHILD.log(logging.FATAL, nextmessage())
 | |
|     CHILD.log(logging.FATAL, nextmessage())
 | |
| 
 | |
|     #These should not log
 | |
|     ERR.warn(nextmessage())
 | |
|     ERR.info(nextmessage())
 | |
|     ERR.debug(nextmessage())
 | |
| 
 | |
|     INF.debug(nextmessage())
 | |
|     INF_UNDEF.debug(nextmessage())
 | |
| 
 | |
|     INF_ERR.warn(nextmessage())
 | |
|     INF_ERR.info(nextmessage())
 | |
|     INF_ERR.debug(nextmessage())
 | |
|     INF_ERR_UNDEF.warn(nextmessage())
 | |
|     INF_ERR_UNDEF.info(nextmessage())
 | |
|     INF_ERR_UNDEF.debug(nextmessage())
 | |
| 
 | |
|     INF.info(FINISH_UP)
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 1
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| #
 | |
| #   First, we define our levels. There can be as many as you want - the only
 | |
| #     limitations are that they should be integers, the lowest should be > 0 and
 | |
| #   larger values mean less information being logged. If you need specific
 | |
| #   level values which do not fit into these limitations, you can use a
 | |
| #   mapping dictionary to convert between your application levels and the
 | |
| #   logging system.
 | |
| #
 | |
| SILENT      = 10
 | |
| TACITURN    = 9
 | |
| TERSE       = 8
 | |
| EFFUSIVE    = 7
 | |
| SOCIABLE    = 6
 | |
| VERBOSE     = 5
 | |
| TALKATIVE   = 4
 | |
| GARRULOUS   = 3
 | |
| CHATTERBOX  = 2
 | |
| BORING      = 1
 | |
| 
 | |
| LEVEL_RANGE = range(BORING, SILENT + 1)
 | |
| 
 | |
| #
 | |
| #   Next, we define names for our levels. You don't need to do this - in which
 | |
| #   case the system will use "Level n" to denote the text for the level.
 | |
| #
 | |
| my_logging_levels = {
 | |
|     SILENT      : 'Silent',
 | |
|     TACITURN    : 'Taciturn',
 | |
|     TERSE       : 'Terse',
 | |
|     EFFUSIVE    : 'Effusive',
 | |
|     SOCIABLE    : 'Sociable',
 | |
|     VERBOSE     : 'Verbose',
 | |
|     TALKATIVE   : 'Talkative',
 | |
|     GARRULOUS   : 'Garrulous',
 | |
|     CHATTERBOX  : 'Chatterbox',
 | |
|     BORING      : 'Boring',
 | |
| }
 | |
| 
 | |
| #
 | |
| #   Now, to demonstrate filtering: suppose for some perverse reason we only
 | |
| #   want to print out all except GARRULOUS messages. Let's create a filter for
 | |
| #   this purpose...
 | |
| #
 | |
| class SpecificLevelFilter(logging.Filter):
 | |
|     def __init__(self, lvl):
 | |
|         self.level = lvl
 | |
| 
 | |
|     def filter(self, record):
 | |
|         return self.level != record.levelno
 | |
| 
 | |
| class GarrulousFilter(SpecificLevelFilter):
 | |
|     def __init__(self):
 | |
|         SpecificLevelFilter.__init__(self, GARRULOUS)
 | |
| 
 | |
| #
 | |
| #   Now, let's demonstrate filtering at the logger. This time, use a filter
 | |
| #   which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events
 | |
| #   are still excluded.
 | |
| #
 | |
| class VerySpecificFilter(logging.Filter):
 | |
|     def filter(self, record):
 | |
|         return record.levelno not in [SOCIABLE, TACITURN]
 | |
| 
 | |
| def message(s):
 | |
|     sys.stdout.write("%s\n" % s)
 | |
| 
 | |
| SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"
 | |
| 
 | |
| def test1():
 | |
| #
 | |
| #   Now, tell the logging system to associate names with our levels.
 | |
| #
 | |
|     for lvl in my_logging_levels.keys():
 | |
|         logging.addLevelName(lvl, my_logging_levels[lvl])
 | |
| 
 | |
| #
 | |
| #   Now, define a test function which logs an event at each of our levels.
 | |
| #
 | |
| 
 | |
|     def doLog(log):
 | |
|         for lvl in LEVEL_RANGE:
 | |
|             log.log(lvl, SHOULD1, logging.getLevelName(lvl))
 | |
| 
 | |
|     log = logging.getLogger("")
 | |
|     hdlr = log.handlers[0]
 | |
| #
 | |
| #   Set the logging level to each different value and call the utility
 | |
| #   function to log events.
 | |
| #   In the output, you should see that each time round the loop, the number of
 | |
| #   logging events which are actually output decreases.
 | |
| #
 | |
|     for lvl in LEVEL_RANGE:
 | |
|         message("-- setting logging level to '%s' -----" %
 | |
|                         logging.getLevelName(lvl))
 | |
|         log.setLevel(lvl)
 | |
|         doLog(log)
 | |
|   #
 | |
|   #   Now, we demonstrate level filtering at the handler level. Tell the
 | |
|   #   handler defined above to filter at level 'SOCIABLE', and repeat the
 | |
|   #   above loop. Compare the output from the two runs.
 | |
|   #
 | |
|     hdlr.setLevel(SOCIABLE)
 | |
|     message("-- Filtering at handler level to SOCIABLE --")
 | |
|     for lvl in LEVEL_RANGE:
 | |
|         message("-- setting logging level to '%s' -----" %
 | |
|                       logging.getLevelName(lvl))
 | |
|         log.setLevel(lvl)
 | |
|         doLog(log)
 | |
| 
 | |
|     hdlr.setLevel(0)    #turn off level filtering at the handler
 | |
| 
 | |
|     garr = GarrulousFilter()
 | |
|     hdlr.addFilter(garr)
 | |
|     message("-- Filtering using GARRULOUS filter --")
 | |
|     for lvl in LEVEL_RANGE:
 | |
|         message("-- setting logging level to '%s' -----" %
 | |
|                         logging.getLevelName(lvl))
 | |
|         log.setLevel(lvl)
 | |
|         doLog(log)
 | |
|     spec = VerySpecificFilter()
 | |
|     log.addFilter(spec)
 | |
|     message("-- Filtering using specific filter for SOCIABLE, TACITURN --")
 | |
|     for lvl in LEVEL_RANGE:
 | |
|         message("-- setting logging level to '%s' -----" %
 | |
|                       logging.getLevelName(lvl))
 | |
|         log.setLevel(lvl)
 | |
|         doLog(log)
 | |
| 
 | |
|     log.removeFilter(spec)
 | |
|     hdlr.removeFilter(garr)
 | |
|     #Undo the one level which clashes...for regression tests
 | |
|     logging.addLevelName(logging.DEBUG, "DEBUG")
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 2
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| MSG = "-- logging %d at INFO, messages should be seen every 10 events --"
 | |
| def test2():
 | |
|     logger = logging.getLogger("")
 | |
|     sh = logger.handlers[0]
 | |
|     sh.close()
 | |
|     logger.removeHandler(sh)
 | |
|     mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh)
 | |
|     logger.setLevel(logging.DEBUG)
 | |
|     logger.addHandler(mh)
 | |
|     message("-- logging at DEBUG, nothing should be seen yet --")
 | |
|     logger.debug("Debug message")
 | |
|     message("-- logging at INFO, nothing should be seen yet --")
 | |
|     logger.info("Info message")
 | |
|     message("-- logging at WARNING, 3 messages should be seen --")
 | |
|     logger.warn("Warn message")
 | |
|     for i in xrange(102):
 | |
|         message(MSG % i)
 | |
|         logger.info("Info index = %d", i)
 | |
|     mh.close()
 | |
|     logger.removeHandler(mh)
 | |
|     logger.addHandler(sh)
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 3
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| FILTER = "a.b"
 | |
| 
 | |
| def doLog3():
 | |
|     logging.getLogger("a").info("Info 1")
 | |
|     logging.getLogger("a.b").info("Info 2")
 | |
|     logging.getLogger("a.c").info("Info 3")
 | |
|     logging.getLogger("a.b.c").info("Info 4")
 | |
|     logging.getLogger("a.b.c.d").info("Info 5")
 | |
|     logging.getLogger("a.bb.c").info("Info 6")
 | |
|     logging.getLogger("b").info("Info 7")
 | |
|     logging.getLogger("b.a").info("Info 8")
 | |
|     logging.getLogger("c.a.b").info("Info 9")
 | |
|     logging.getLogger("a.bb").info("Info 10")
 | |
| 
 | |
| def test3():
 | |
|     root = logging.getLogger()
 | |
|     root.setLevel(logging.DEBUG)
 | |
|     hand = root.handlers[0]
 | |
|     message("Unfiltered...")
 | |
|     doLog3()
 | |
|     message("Filtered with '%s'..." % FILTER)
 | |
|     filt = logging.Filter(FILTER)
 | |
|     hand.addFilter(filt)
 | |
|     doLog3()
 | |
|     hand.removeFilter(filt)
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 4
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| # config0 is a standard configuration.
 | |
| config0 = """
 | |
| [loggers]
 | |
| keys=root
 | |
| 
 | |
| [handlers]
 | |
| keys=hand1
 | |
| 
 | |
| [formatters]
 | |
| keys=form1
 | |
| 
 | |
| [logger_root]
 | |
| level=NOTSET
 | |
| handlers=hand1
 | |
| 
 | |
| [handler_hand1]
 | |
| class=StreamHandler
 | |
| level=NOTSET
 | |
| formatter=form1
 | |
| args=(sys.stdout,)
 | |
| 
 | |
| [formatter_form1]
 | |
| format=%(levelname)s:%(name)s:%(message)s
 | |
| datefmt=
 | |
| """
 | |
| 
 | |
| # config1 adds a little to the standard configuration.
 | |
| config1 = """
 | |
| [loggers]
 | |
| keys=root,parser
 | |
| 
 | |
| [handlers]
 | |
| keys=hand1
 | |
| 
 | |
| [formatters]
 | |
| keys=form1
 | |
| 
 | |
| [logger_root]
 | |
| level=NOTSET
 | |
| handlers=hand1
 | |
| 
 | |
| [logger_parser]
 | |
| level=DEBUG
 | |
| handlers=hand1
 | |
| propagate=1
 | |
| qualname=compiler.parser
 | |
| 
 | |
| [handler_hand1]
 | |
| class=StreamHandler
 | |
| level=NOTSET
 | |
| formatter=form1
 | |
| args=(sys.stdout,)
 | |
| 
 | |
| [formatter_form1]
 | |
| format=%(levelname)s:%(name)s:%(message)s
 | |
| datefmt=
 | |
| """
 | |
| 
 | |
| # config2 has a subtle configuration error that should be reported
 | |
| config2 = string.replace(config1, "sys.stdout", "sys.stbout")
 | |
| 
 | |
| # config3 has a less subtle configuration error
 | |
| config3 = string.replace(
 | |
|     config1, "formatter=form1", "formatter=misspelled_name")
 | |
| 
 | |
| def test4():
 | |
|     for i in range(4):
 | |
|         conf = globals()['config%d' % i]
 | |
|         sys.stdout.write('config%d: ' % i)
 | |
|         loggerDict = logging.getLogger().manager.loggerDict
 | |
|         logging._acquireLock()
 | |
|         try:
 | |
|             saved_handlers = logging._handlers.copy()
 | |
|             saved_handler_list = logging._handlerList[:]
 | |
|             saved_loggers = loggerDict.copy()
 | |
|         finally:
 | |
|             logging._releaseLock()
 | |
|         try:
 | |
|             fn = tempfile.mktemp(".ini")
 | |
|             f = open(fn, "w")
 | |
|             f.write(conf)
 | |
|             f.close()
 | |
|             try:
 | |
|                 logging.config.fileConfig(fn)
 | |
|             except:
 | |
|                 t = sys.exc_info()[0]
 | |
|                 message(str(t))
 | |
|             else:
 | |
|                 message('ok.')
 | |
|             os.remove(fn)
 | |
|         finally:
 | |
|             logging._acquireLock()
 | |
|             try:
 | |
|                 logging._handlers.clear()
 | |
|                 logging._handlers.update(saved_handlers)
 | |
|                 logging._handlerList = saved_handler_list
 | |
|                 loggerDict = logging.getLogger().manager.loggerDict
 | |
|                 loggerDict.clear()
 | |
|                 loggerDict.update(saved_loggers)
 | |
|             finally:
 | |
|                 logging._releaseLock()
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test 5
 | |
| #----------------------------------------------------------------------------
 | |
| 
 | |
| test5_config = """
 | |
| [loggers]
 | |
| keys=root
 | |
| 
 | |
| [handlers]
 | |
| keys=hand1
 | |
| 
 | |
| [formatters]
 | |
| keys=form1
 | |
| 
 | |
| [logger_root]
 | |
| level=NOTSET
 | |
| handlers=hand1
 | |
| 
 | |
| [handler_hand1]
 | |
| class=StreamHandler
 | |
| level=NOTSET
 | |
| formatter=form1
 | |
| args=(sys.stdout,)
 | |
| 
 | |
| [formatter_form1]
 | |
| class=test.test_logging.FriendlyFormatter
 | |
| format=%(levelname)s:%(name)s:%(message)s
 | |
| datefmt=
 | |
| """
 | |
| 
 | |
| class FriendlyFormatter (logging.Formatter):
 | |
|     def formatException(self, ei):
 | |
|         return "%s... Don't panic!" % str(ei[0])
 | |
| 
 | |
| 
 | |
| def test5():
 | |
|     loggerDict = logging.getLogger().manager.loggerDict
 | |
|     logging._acquireLock()
 | |
|     try:
 | |
|         saved_handlers = logging._handlers.copy()
 | |
|         saved_handler_list = logging._handlerList[:]
 | |
|         saved_loggers = loggerDict.copy()
 | |
|     finally:
 | |
|         logging._releaseLock()
 | |
|     try:
 | |
|         fn = tempfile.mktemp(".ini")
 | |
|         f = open(fn, "w")
 | |
|         f.write(test5_config)
 | |
|         f.close()
 | |
|         logging.config.fileConfig(fn)
 | |
|         try:
 | |
|             raise KeyError
 | |
|         except KeyError:
 | |
|             logging.exception("just testing")
 | |
|         os.remove(fn)
 | |
|     finally:
 | |
|         logging._acquireLock()
 | |
|         try:
 | |
|             logging._handlers.clear()
 | |
|             logging._handlers.update(saved_handlers)
 | |
|             logging._handlerList = saved_handler_list
 | |
|             loggerDict = logging.getLogger().manager.loggerDict
 | |
|             loggerDict.clear()
 | |
|             loggerDict.update(saved_loggers)
 | |
|         finally:
 | |
|             logging._releaseLock()
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------------
 | |
| # Test Harness
 | |
| #----------------------------------------------------------------------------
 | |
| def banner(nm, typ):
 | |
|     sep = BANNER % (nm, typ)
 | |
|     sys.stdout.write(sep)
 | |
|     sys.stdout.flush()
 | |
| 
 | |
| def test_main_inner():
 | |
|     rootLogger = logging.getLogger("")
 | |
|     rootLogger.setLevel(logging.DEBUG)
 | |
|     hdlr = logging.StreamHandler(sys.stdout)
 | |
|     fmt = logging.Formatter(logging.BASIC_FORMAT)
 | |
|     hdlr.setFormatter(fmt)
 | |
|     rootLogger.addHandler(hdlr)
 | |
| 
 | |
|     # Find an unused port number
 | |
|     port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
 | |
|     while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100:
 | |
|         try:
 | |
|             tcpserver = LogRecordSocketReceiver(port=port)
 | |
|         except socket.error:
 | |
|             port += 1
 | |
|         else:
 | |
|             break
 | |
|     else:
 | |
|         raise ImportError, "Could not find unused port"
 | |
| 
 | |
| 
 | |
|     #Set up a handler such that all events are sent via a socket to the log
 | |
|     #receiver (logrecv).
 | |
|     #The handler will only be added to the rootLogger for some of the tests
 | |
|     shdlr = logging.handlers.SocketHandler('localhost', port)
 | |
| 
 | |
|     #Configure the logger for logrecv so events do not propagate beyond it.
 | |
|     #The sockLogger output is buffered in memory until the end of the test,
 | |
|     #and printed at the end.
 | |
|     sockOut = cStringIO.StringIO()
 | |
|     sockLogger = logging.getLogger("logrecv")
 | |
|     sockLogger.setLevel(logging.DEBUG)
 | |
|     sockhdlr = logging.StreamHandler(sockOut)
 | |
|     sockhdlr.setFormatter(logging.Formatter(
 | |
|                                    "%(name)s -> %(levelname)s: %(message)s"))
 | |
|     sockLogger.addHandler(sockhdlr)
 | |
|     sockLogger.propagate = 0
 | |
| 
 | |
|     #Set up servers
 | |
|     threads = []
 | |
|     #sys.stdout.write("About to start TCP server...\n")
 | |
|     threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))
 | |
| 
 | |
|     for thread in threads:
 | |
|         thread.start()
 | |
|     try:
 | |
|         banner("log_test0", "begin")
 | |
| 
 | |
|         rootLogger.addHandler(shdlr)
 | |
|         test0()
 | |
|         # XXX(nnorwitz): Try to fix timing related test failures.
 | |
|         # This sleep gives us some extra time to read messages.
 | |
|         # The test generally only fails on Solaris without this sleep.
 | |
|         time.sleep(2.0)
 | |
|         shdlr.close()
 | |
|         rootLogger.removeHandler(shdlr)
 | |
| 
 | |
|         banner("log_test0", "end")
 | |
| 
 | |
|         for t in range(1,6):
 | |
|             banner("log_test%d" % t, "begin")
 | |
|             globals()['test%d' % t]()
 | |
|             banner("log_test%d" % t, "end")
 | |
| 
 | |
|     finally:
 | |
|         #wait for TCP receiver to terminate
 | |
|         socketDataProcessed.wait()
 | |
|         # ensure the server dies
 | |
|         tcpserver.abort = 1
 | |
|         for thread in threads:
 | |
|             thread.join(2.0)
 | |
|         banner("logrecv output", "begin")
 | |
|         sys.stdout.write(sockOut.getvalue())
 | |
|         sockOut.close()
 | |
|         sockLogger.removeHandler(sockhdlr)
 | |
|         sockhdlr.close()
 | |
|         banner("logrecv output", "end")
 | |
|         sys.stdout.flush()
 | |
|         try:
 | |
|             hdlr.close()
 | |
|         except:
 | |
|             pass
 | |
|         rootLogger.removeHandler(hdlr)
 | |
| 
 | |
| # Set the locale to the platform-dependent default.  I have no idea
 | |
| # why the test does this, but in any case we save the current locale
 | |
| # first and restore it at the end.
 | |
| @run_with_locale('LC_ALL', '')
 | |
| def test_main():
 | |
|     # Save and restore the original root logger level across the tests.
 | |
|     # Otherwise, e.g., if any test using cookielib runs after test_logging,
 | |
|     # cookielib's debug-level logger tries to log messages, leading to
 | |
|     # confusing:
 | |
|     #    No handlers could be found for logger "cookielib"
 | |
|     # output while the tests are running.
 | |
|     root_logger = logging.getLogger("")
 | |
|     original_logging_level = root_logger.getEffectiveLevel()
 | |
|     try:
 | |
|         test_main_inner()
 | |
|     finally:
 | |
|         root_logger.setLevel(original_logging_level)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     sys.stdout.write("test_logging\n")
 | |
|     test_main()
 |