mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			312 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			312 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Miscellaneous utility functions useful for dealing with ESIS streams."""
 | |
| 
 | |
| import re
 | |
| 
 | |
| import xml.dom.pulldom
 | |
| 
 | |
| import xml.sax
 | |
| import xml.sax.handler
 | |
| import xml.sax.xmlreader
 | |
| 
 | |
| 
 | |
# Matches the longest run of characters that contains no backslash.
_data_match = re.compile(r"[^\\]+").match

# ``unichr`` only exists on Python 2; on Python 3 ``chr`` covers all of Unicode.
try:
    _unichr = unichr
except NameError:
    _unichr = chr


def decode(s):
    """Return *s* with ESIS backslash escapes expanded.

    Recognized escapes:

    ``\\\\``   -- a single backslash
    ``\\n``    -- a newline
    ``\\%N;``  -- the character whose code point is the decimal number N

    Raises ValueError for any other escape, for a backslash at the very
    end of the input, and for a malformed ``\\%N;`` sequence.
    """
    parts = []
    while s:
        m = _data_match(s)
        if m:
            # Copy the literal run through unchanged.
            parts.append(m.group())
            s = s[m.end():]
            continue
        # s starts with a backslash here.  Slicing (not indexing) yields ""
        # for a trailing lone backslash, which then falls through to the
        # ValueError below instead of raising IndexError.
        escape = s[1:2]
        if escape == "\\":
            parts.append("\\")
            s = s[2:]
        elif escape == "n":
            parts.append("\n")
            s = s[2:]
        elif escape == "%":
            n, s = s[2:].split(";", 1)
            parts.append(_unichr(int(n)))
        else:
            raise ValueError("can't handle " + repr(s))
    return "".join(parts)
 | |
| 
 | |
| 
 | |
# ``unichr`` only exists on Python 2; on Python 3 ``chr`` covers all of Unicode.
try:
    _unichr = unichr
except NameError:
    _unichr = chr

# Translation table used by encode(): code points 0-255 map to the
# corresponding single character, except newline and backslash, which map
# to their two-character escapes.  Anything else is absent from the table.
_charmap = {}
for c in range(128):
    _charmap[chr(c)] = chr(c)
    _charmap[_unichr(c + 128)] = chr(c + 128)
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del c

_null_join = ''.join


def encode(s):
    """Return *s* with newlines and backslashes escaped for ESIS output.

    Every character of *s* is looked up in ``_charmap``.  A character that
    is not in the table maps to None, which makes the join fail; that
    failure is reported as an Exception whose message shows *s* and the
    list of mapped values (None marks each unencodable character).
    """
    # Materialize the lookups once so the error message can show the real
    # values; on Python 3 a bare map() would print as "<map object ...>".
    mapped = [_charmap.get(ch) for ch in s]
    try:
        return _null_join(mapped)
    except TypeError:
        raise Exception("could not encode %r: %r" % (s, mapped))
 | |
| 
 | |
| 
 | |
class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    Input is supplied incrementally with feed() and finished with
    close().  parse() is disabled and always raises RuntimeError.
    """
    _decl_handler = None
    _lexical_handler = None

    # Identifiers collected from 'p' / 's' tokens; consumed and reset by
    # the next 'N' token.
    _public_id = None
    _system_id = None

    _buffer = ""        # unterminated tail of the data passed to feed()
    _is_empty = 0       # set by an 'e' token, applies to the next '(' token
    _lineno = 0         # number of input lines processed by feed()/close()
    _started = 0        # true once startDocument() has been issued by feed()

    def __init__(self, contentHandler=None, errorHandler=None):
        """Create a reader, optionally installing the given handlers."""
        xml.sax.xmlreader.XMLReader.__init__(self)
        self._attrs = {}
        # The Attributes object wraps the same dict that 'A' tokens fill.
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        """Return a list of element names that were preceded by an 'e' token."""
        return list(self._empties)

    #
    #  XMLReader interface
    #

    def parse(self, source):
        """Disabled entry point; always raises RuntimeError.

        The code following the raise is an unreachable draft that is kept
        for reference.
        """
        raise RuntimeError(
            "ESISReader.parse() is disabled; use feed() and close()")
        # NOTE(review): unreachable draft implementation, kept for whoever
        # re-enables parse().  It now finishes with endDocument() (it used
        # to call startDocument() a second time) and skips blank lines
        # reported by _get_token().
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        lineno = 0
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            lineno = lineno + 1
            self._locator._lineno = lineno
            if token:
                self._handle_token(token, data)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def feed(self, data):
        """Process every complete line in *data*; keep the partial tail.

        The first call issues startDocument() on the content handler.
        Text after the last newline is buffered and prepended to the data
        of the next call (or handled by close()).
        """
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        lines = (self._buffer + data).split("\n")
        # str.split() always yields at least one item; the last one is the
        # unterminated tail.  Store it before dispatching so the reader
        # stays usable even if a handler raises part-way through.
        self._buffer = lines.pop()
        for line in lines:
            self._process_line(line)

    def close(self):
        """Handle any buffered final line, then issue endDocument()."""
        pending, self._buffer = self._buffer, ""
        if pending:
            # A last line without a trailing newline is still a token.
            self._process_line(pending)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def _process_line(self, line):
        """Advance the line counter and dispatch one newline-stripped line."""
        self._lineno = self._lineno + 1
        self._locator._lineno = self._lineno
        if not line:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)
        else:
            self._handle_token(line[0], line[1:])

    def _get_token(self, fp):
        """Read one line from *fp* and return it as ``(token, data)``.

        Returns ``(None, None)`` at end of input or after an I/O error has
        been passed to the error handler's fatalError(), and ``("", "")``
        for a blank line after it has been passed to error().
        """
        try:
            line = fp.readline()
        except IOError as e:
            e = xml.sax.SAXException("I/O error reading input stream", e)
            self.getErrorHandler().fatalError(e)
            return None, None
        if not line:
            return None, None
        if line[-1] == "\n":
            line = line[:-1]
        if not line:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)
            return "", ""
        return line[0], line[1:]

    def _handle_token(self, token, data):
        """Translate one ESIS record into the matching handler call.

        *token* is the first character of the line, *data* the rest.
        """
        handler = self.getContentHandler()
        if token == '-':
            # Character data.
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            if self._is_empty:
                self._empties[data] = 1
            if handler:
                handler.startElement(data, self._attributes)
            # Attributes and the empty flag apply to one start tag only.
            self._attrs.clear()
            self._is_empty = 0
        elif token == 'A':
            # "name IMPLIED" or "name type value"; IMPLIED is not recorded.
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                attr_type, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), attr_type)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                if ' ' in data:
                    # Pad the pieces so a target followed only by blanks
                    # still unpacks into (target, data).
                    target, data = (data.split(None, 1) + ["", ""])[:2]
                else:
                    target, data = data, ""
                handler.processingInstruction(target, decode(data))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            # Recognized but ignored.
            pass
        else:
            e = xml.sax.SAXParseException(
                "unknown ESIS token in event stream",
                None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        """Install *handler*, moving the document locator over to it."""
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        """Return the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property name.
        """
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException("unknown property %r"
                                                    % (property,))

    def setProperty(self, property, value):
        """Install *value* as the lexical or declaration handler.

        The previous handler (if any) has its locator cleared and the new
        one (if any) receives this reader's locator.  Raises
        SAXNotRecognizedException for any other property name.
        """
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            # SAXException requires a message; calling the constructor
            # without one raises TypeError instead of the SAX exception.
            raise xml.sax.SAXNotRecognizedException("unknown property %r"
                                                    % (property,))

    def getFeature(self, feature):
        """Report the namespaces feature as 1; defer everything else."""
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        """Accept (and ignore) the namespaces feature; defer everything else."""
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)
 | |
| 
 | |
| 
 | |
class Attributes(xml.sax.xmlreader.AttributesImpl):
    """Attribute collection whose entries carry both a value and a type.

    self._attrs has the form {name: (value, type)}, so the accessors that
    the base class implements for plain ``{name: value}`` mappings are
    overridden here to pick the right tuple member.
    """

    def getType(self, name):
        """Return the type stored for *name*; KeyError if absent."""
        return self._attrs[name][1]

    def getValue(self, name):
        """Return the value stored for *name*; KeyError if absent."""
        return self._attrs[name][0]

    def getValueByQName(self, name):
        """Return the value stored for *name*; KeyError if absent."""
        return self._attrs[name][0]

    def __getitem__(self, name):
        return self._attrs[name][0]

    def get(self, name, default=None):
        """Return the value stored for *name*, or *default* if absent."""
        # ``in`` replaces dict.has_key(), which no longer exists on Python 3.
        if name in self._attrs:
            return self._attrs[name][0]
        return default

    def items(self):
        """Return a list of (name, value) pairs, leaving the types out."""
        return [(name, entry[0]) for name, entry in self._attrs.items()]

    def values(self):
        """Return a list of the attribute values, leaving the types out."""
        return [entry[0] for entry in self._attrs.values()]
 | |
| 
 | |
| 
 | |
class Locator(xml.sax.xmlreader.Locator):
    """SAX locator backed by three plain attributes.

    The accessors simply report ``_lineno``, ``_public_id`` and
    ``_system_id``; whoever owns the locator assigns those attributes
    directly.
    """

    # Class-level defaults, reported until an instance attribute is set.
    _public_id = None
    _system_id = None
    _lineno = -1

    def getSystemId(self):
        """Return the current system identifier (None by default)."""
        return self._system_id

    def getPublicId(self):
        """Return the current public identifier (None by default)."""
        return self._public_id

    def getLineNumber(self):
        """Return the current line number (-1 by default)."""
        return self._lineno
 | |
| 
 | |
| 
 | |
def parse(stream_or_string, parser=None):
    """Return a pulldom DOMEventStream that reads ESIS data.

    stream_or_string -- either a file name, which is opened for reading,
                        or an already opened stream, which is used as is.
    parser           -- the reader to drive; a new ESISReader is created
                        when this is omitted (or otherwise false).
    """
    # isinstance() also accepts subclasses of the string types, which the
    # former exact ``type(...) in [...]`` comparison rejected.
    if isinstance(stream_or_string, (type(""), type(u""))):
        # NOTE: the file is handed to the event stream and is not closed
        # here; the caller owns it through the returned object's ``stream``.
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)
 | 
