mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	 303aac8c56
			
		
	
	
		303aac8c56
		
			
		
	
	
	
	
		
			
			I am re-submitting an older PR, #10783 by @timb07, which was abandoned but is still relevant. The issue it addresses is still open. The original PR #10783 was closed because the final requested changes were never applied and the PR was subsequently abandoned. In this new PR I have re-used the original patch and applied both review comments, from @maxking and @pganssle. For reference, here is the original PR description: In email.utils.parsedate_to_datetime(), a failure to parse the date, or invalid date components (such as an hour outside 0..23), raises an exception. Document this behaviour, and add tests to test_email/test_utils.py to confirm it. In email.headerregistry.DateHeader.parse(), check whether parsedate_to_datetime() raises an exception and, if so, add a new defect, InvalidDateDefect; preserve the invalid value as the string value of the header, but set the datetime attribute to None. Add tests to test_email/test_headerregistry.py to confirm this behaviour; also add a test to test_email/test_inversion.py to confirm that emails with such defective date headers round-trip successfully. This pull request incorporates feedback gratefully received from @bitdancer, @brettcannon, @Mariatta and @warsaw, and replaces the earlier PR #2254. Automerge-Triggered-By: GH:warsaw
		
			
				
	
	
		
			549 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			549 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (C) 2002-2007 Python Software Foundation
 | |
| # Contact: email-sig@python.org
 | |
| 
 | |
| """Email address parsing code.
 | |
| 
 | |
| Lifted directly from rfc822.py.  This should eventually be rewritten.
 | |
| """
 | |
| 
 | |
| __all__ = [
 | |
|     'mktime_tz',
 | |
|     'parsedate',
 | |
|     'parsedate_tz',
 | |
|     'quote',
 | |
|     ]
 | |
| 
 | |
| import time, calendar
 | |
| 
 | |
| SPACE = ' '
 | |
| EMPTYSTRING = ''
 | |
| COMMASPACE = ', '
 | |
| 
 | |
# Parse a date field

# Month names, lower-cased: the twelve abbreviations first, then the twelve
# full names.  The parser maps an index above 12 back onto 1..12.
_monthnames = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul',
               'aug', 'sep', 'oct', 'nov', 'dec',
               'january', 'february', 'march', 'april', 'may', 'june', 'july',
               'august', 'september', 'october', 'november', 'december']

# Day-of-week abbreviations, lower-cased; used only to recognise and skip a
# leading day name.
_daynames = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']

# The timezone table does not include the military time zones defined
# in RFC822, other than Z.  According to RFC1123, the description in
# RFC822 gets the signs wrong, so we can't rely on any such time
# zones.  RFC1123 recommends that numeric timezone indicators be used
# instead of timezone names.
#
# Values are written in +/-HHMM form (e.g. -500 means five hours west of
# UTC); _parsedate_tz() converts them to seconds.
_timezones = {'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
              'AST': -400, 'ADT': -300,  # Atlantic (used in Canada)
              'EST': -500, 'EDT': -400,  # Eastern
              'CST': -600, 'CDT': -500,  # Central
              'MST': -700, 'MDT': -600,  # Mountain
              'PST': -800, 'PDT': -700   # Pacific
              }


def parsedate_tz(data):
    """Convert a date string to a 10-element time tuple.

    The first nine elements are laid out like a time tuple (year, month,
    day, hour, minute, second, then three placeholder fields); the tenth is
    the timezone offset from UTC in seconds.  An offset that the parser
    reports as unknown (None) is returned as 0.

    Returns None when the string cannot be parsed.
    """
    res = _parsedate_tz(data)
    if not res:
        return None
    if res[9] is None:
        res[9] = 0
    return tuple(res)


def _parsedate_tz(data):
    """Convert date to extended time tuple.

    Returns a 10-element list, or None if *data* is empty, whitespace-only
    or cannot be parsed.

    The last (additional) element is the time zone offset in seconds, except if
    the timezone was specified as -0000.  In that case the last element is
    None.  This indicates a UTC timestamp that explicitly disclaims knowledge of
    the source timezone, as opposed to a +0000 timestamp that indicates the
    source timezone really was UTC.  The element is also None when the
    timezone token is neither a known name nor an integer.
    """
    if not data:
        return None
    data = data.split()
    if not data:
        # Whitespace-only input: nothing to parse (and data[0] would fail).
        return None
    # The FWS after the comma after the day-of-week is optional, so search and
    # adjust for this.
    if data[0].endswith(',') or data[0].lower() in _daynames:
        # There's a dayname here. Skip it
        del data[0]
    else:
        i = data[0].rfind(',')
        if i >= 0:
            data[0] = data[0][i+1:]
    if len(data) == 3:  # RFC 850 date, deprecated
        stuff = data[0].split('-')
        if len(stuff) == 3:
            data = stuff + data[1:]
    if len(data) == 4:
        # The timezone may be glued onto the time ("10:00:00+0100").
        s = data[3]
        i = s.find('+')
        if i == -1:
            i = s.find('-')
        if i > 0:
            data[3:] = [s[:i], s[i:]]
        else:
            data.append('')  # Dummy tz
    if len(data) < 5:
        return None
    data = data[:5]
    [dd, mm, yy, tm, tz] = data
    if not (dd and mm and yy):
        # Splitting an RFC 850 date on '-' can produce empty components
        # (e.g. "-Apr-2002"); indexing them below would raise IndexError.
        return None
    mm = mm.lower()
    if mm not in _monthnames:
        # Tolerate "Apr 3" ordering by swapping day and month.
        dd, mm = mm, dd.lower()
        if mm not in _monthnames:
            return None
    mm = _monthnames.index(mm) + 1
    if mm > 12:
        # Full month names occupy slots 13..24 of the table.
        mm -= 12
    if dd[-1] == ',':
        dd = dd[:-1]
    i = yy.find(':')
    if i > 0:
        # Year and time appear in swapped positions.
        yy, tm = tm, yy
    if yy[-1] == ',':
        yy = yy[:-1]
        if not yy:
            return None
    if not yy[0].isdigit():
        # What we took for the year is really the zone, and vice versa.
        yy, tz = tz, yy
    if tm[-1] == ',':
        tm = tm[:-1]
    tm = tm.split(':')
    if len(tm) == 2:
        [thh, tmm] = tm
        tss = '0'
    elif len(tm) == 3:
        [thh, tmm, tss] = tm
    elif len(tm) == 1 and '.' in tm[0]:
        # Some non-compliant MUAs use '.' to separate time elements.
        tm = tm[0].split('.')
        if len(tm) == 2:
            [thh, tmm] = tm
            tss = '0'
        elif len(tm) == 3:
            [thh, tmm, tss] = tm
        else:
            # Anything else would leave thh/tmm/tss unbound.
            return None
    else:
        return None
    try:
        yy = int(yy)
        dd = int(dd)
        thh = int(thh)
        tmm = int(tmm)
        tss = int(tss)
    except ValueError:
        return None
    # Check for a yy specified in two-digit format, then convert it to the
    # appropriate four-digit format, according to the POSIX standard. RFC 822
    # calls for a two-digit yy, but RFC 2822 (which obsoletes RFC 822)
    # mandates a 4-digit yy. For more information, see the documentation for
    # the time module.
    if yy < 100:
        # The year is between 1969 and 1999 (inclusive).
        if yy > 68:
            yy += 1900
        # The year is between 2000 and 2068 (inclusive).
        else:
            yy += 2000
    tzoffset = None
    tz = tz.upper()
    if tz in _timezones:
        tzoffset = _timezones[tz]
    else:
        try:
            tzoffset = int(tz)
        except ValueError:
            pass
        if tzoffset == 0 and tz.startswith('-'):
            # "-0000": UTC time with no information about the local zone.
            tzoffset = None
    # Convert a timezone offset into seconds ; -0500 -> -18000
    if tzoffset:
        if tzoffset < 0:
            tzsign = -1
            tzoffset = -tzoffset
        else:
            tzsign = 1
        tzoffset = tzsign * ((tzoffset // 100) * 3600 + (tzoffset % 100) * 60)
    # Daylight Saving Time flag is set to -1, since DST is unknown.
    return [yy, mm, dd, thh, tmm, tss, 0, 1, -1, tzoffset]
 | |
| 
 | |
| 
 | |
def parsedate(data):
    """Convert a time string to a 9-element time tuple.

    This is parsedate_tz() with the trailing timezone element dropped.  Any
    non-tuple result from parsedate_tz() (such as None for unparseable
    input) is passed through unchanged.
    """
    parsed = parsedate_tz(data)
    if not isinstance(parsed, tuple):
        return parsed
    return parsed[:9]
 | |
| 
 | |
| 
 | |
def mktime_tz(data):
    """Turn a 10-tuple as returned by parsedate_tz() into a POSIX timestamp.

    When the tenth element (the zone offset in seconds) is None, the first
    eight fields are interpreted as local time via time.mktime() with the
    DST flag set to -1.  Otherwise the fields are treated as UTC via
    calendar.timegm() and the offset is subtracted.
    """
    offset = data[9]
    if offset is not None:
        return calendar.timegm(data) - offset
    # No zone info, so localtime is better assumption than GMT
    local_fields = data[:8] + (-1,)
    return time.mktime(local_fields)
 | |
| 
 | |
| 
 | |
def quote(str):
    """Escape a string for use inside a quoted string.

    Each backslash and each double quote is preceded by a backslash (i.e.
    turned into a quoted pair); no other character is changed.  The
    surrounding double quotes are not added.
    """
    # Backslashes must be doubled first so that the backslashes introduced
    # for the double quotes are not themselves escaped.
    backslashes_escaped = str.replace('\\', '\\\\')
    return backslashes_escaped.replace('"', '\\"')
 | |
| 
 | |
| 
 | |
class AddrlistClass:
    """Address parser class by Ben Escoto.

    To understand what this class does, it helps to have a copy of RFC 2822 in
    front of you.

    The parser is a hand-written scanner: `self.pos' is the current index
    into `self.field', and every get*() method consumes characters by
    advancing it.  Comments encountered along the way are accumulated in
    `self.commentlist'.

    Note: this class interface is deprecated and may be removed in the future.
    Use email.utils.AddressList instead.
    """

    def __init__(self, field):
        """Initialize a new instance.

        `field' is an unparsed address header field, containing
        one or more addresses.
        """
        self.specials = '()<>@,:;.\"[]'
        self.pos = 0
        self.LWS = ' \t'
        self.CR = '\r\n'
        self.FWS = self.LWS + self.CR
        self.atomends = self.specials + self.LWS + self.CR
        # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it
        # is obsolete syntax.  RFC 2822 requires that we recognize obsolete
        # syntax, so allow dots in phrases.
        self.phraseends = self.atomends.replace('.', '')
        self.field = field
        self.commentlist = []

    def gotonext(self):
        """Skip white space and extract comments.

        Returns the skipped spaces and tabs as a string (CR and LF are
        skipped but not included).  Comments are appended to
        self.commentlist.
        """
        wslist = []
        while self.pos < len(self.field):
            ch = self.field[self.pos]
            if ch in self.LWS + '\n\r':
                if ch not in '\n\r':
                    wslist.append(ch)
                self.pos += 1
            elif ch == '(':
                self.commentlist.append(self.getcomment())
            else:
                break
        return ''.join(wslist)

    def getaddrlist(self):
        """Parse all addresses.

        Returns a list containing all of the addresses, each as a
        (name, address) pair.  A position that yields no address is
        recorded as ('', '').
        """
        result = []
        while self.pos < len(self.field):
            ad = self.getaddress()
            if ad:
                result += ad
            else:
                result.append(('', ''))
        return result

    def getaddress(self):
        """Parse the next address.

        Returns a list of (name, address) pairs: empty when nothing could
        be parsed, one pair for a single address, and possibly several for
        a group.
        """
        self.commentlist = []
        self.gotonext()

        oldpos = self.pos
        # Take a copy: getphraselist() appends to self.commentlist in place,
        # so keeping a reference to the same list would make the rewind below
        # a no-op and comments would be recorded twice when the text is
        # re-scanned as an addr-spec.
        oldcl = self.commentlist[:]
        plist = self.getphraselist()

        self.gotonext()
        returnlist = []

        if self.pos >= len(self.field):
            # Bad email address technically, no domain.
            if plist:
                returnlist = [(' '.join(self.commentlist), plist[0])]

        elif self.field[self.pos] in '.@':
            # email address is just an addrspec
            # this isn't very efficient since we start over
            self.pos = oldpos
            self.commentlist = oldcl
            addrspec = self.getaddrspec()
            returnlist = [(' '.join(self.commentlist), addrspec)]

        elif self.field[self.pos] == ':':
            # address is a group
            returnlist = []

            fieldlen = len(self.field)
            self.pos += 1
            while self.pos < len(self.field):
                self.gotonext()
                if self.pos < fieldlen and self.field[self.pos] == ';':
                    self.pos += 1
                    break
                returnlist = returnlist + self.getaddress()

        elif self.field[self.pos] == '<':
            # Address is a phrase then a route addr
            routeaddr = self.getrouteaddr()

            if self.commentlist:
                returnlist = [(' '.join(plist) + ' (' +
                               ' '.join(self.commentlist) + ')', routeaddr)]
            else:
                returnlist = [(' '.join(plist), routeaddr)]

        else:
            if plist:
                returnlist = [(' '.join(self.commentlist), plist[0])]
            elif self.field[self.pos] in self.specials:
                # Step over a stray special so the caller makes progress.
                self.pos += 1

        self.gotonext()
        if self.pos < len(self.field) and self.field[self.pos] == ',':
            self.pos += 1
        return returnlist

    def getrouteaddr(self):
        """Parse a route address (Return-path value).

        This method just skips all the route stuff and returns the addrspec.
        Returns None if the current character is not `<'.
        """
        if self.field[self.pos] != '<':
            return

        expectroute = False
        self.pos += 1
        self.gotonext()
        adlist = ''
        while self.pos < len(self.field):
            if expectroute:
                # Domain of a source route; parsed and discarded.
                self.getdomain()
                expectroute = False
            elif self.field[self.pos] == '>':
                self.pos += 1
                break
            elif self.field[self.pos] == '@':
                self.pos += 1
                expectroute = True
            elif self.field[self.pos] == ':':
                self.pos += 1
            else:
                adlist = self.getaddrspec()
                # Step over the character following the addr-spec.
                self.pos += 1
                break
            self.gotonext()

        return adlist

    def getaddrspec(self):
        """Parse an RFC 2822 addr-spec.

        Returns the address as a string.  If there is no `@' the local part
        alone is returned; if the domain is empty or invalid the empty
        string is returned.
        """
        aslist = []

        self.gotonext()
        while self.pos < len(self.field):
            preserve_ws = True
            if self.field[self.pos] == '.':
                # Drop whitespace recorded just before the dot.
                if aslist and not aslist[-1].strip():
                    aslist.pop()
                aslist.append('.')
                self.pos += 1
                preserve_ws = False
            elif self.field[self.pos] == '"':
                aslist.append('"%s"' % quote(self.getquote()))
            elif self.field[self.pos] in self.atomends:
                # End of the local part; drop trailing whitespace.
                if aslist and not aslist[-1].strip():
                    aslist.pop()
                break
            else:
                aslist.append(self.getatom())
            ws = self.gotonext()
            if preserve_ws and ws:
                aslist.append(ws)

        if self.pos >= len(self.field) or self.field[self.pos] != '@':
            return ''.join(aslist)

        aslist.append('@')
        self.pos += 1
        self.gotonext()
        domain = self.getdomain()
        if not domain:
            # Invalid domain, return an empty address instead of returning a
            # local part to denote failed parsing.
            return ''
        return ''.join(aslist) + domain

    def getdomain(self):
        """Get the complete domain name from an address.

        Returns the empty string if a second `@' is found in the domain.
        """
        sdlist = []
        while self.pos < len(self.field):
            if self.field[self.pos] in self.LWS:
                self.pos += 1
            elif self.field[self.pos] == '(':
                self.commentlist.append(self.getcomment())
            elif self.field[self.pos] == '[':
                sdlist.append(self.getdomainliteral())
            elif self.field[self.pos] == '.':
                self.pos += 1
                sdlist.append('.')
            elif self.field[self.pos] == '@':
                # bpo-34155: Don't parse domains with two `@` like
                # `a@malicious.org@important.com`.
                return ''
            elif self.field[self.pos] in self.atomends:
                break
            else:
                sdlist.append(self.getatom())
        return ''.join(sdlist)

    def getdelimited(self, beginchar, endchars, allowcomments=True):
        """Parse a header fragment delimited by special characters.

        `beginchar' is the start character for the fragment.
        If self is not looking at an instance of `beginchar' then
        getdelimited returns the empty string.

        `endchars' is a sequence of allowable end-delimiting characters.
        Parsing stops when one of these is encountered.

        If `allowcomments' is non-zero, embedded RFC 2822 comments are allowed
        within the parsed fragment.

        A backslash causes the following character to be taken literally;
        the backslash itself is dropped from the result.
        """
        if self.field[self.pos] != beginchar:
            return ''

        slist = ['']
        # True while the previous character was an unescaped backslash.
        escaped = False
        self.pos += 1
        while self.pos < len(self.field):
            if escaped:
                slist.append(self.field[self.pos])
                escaped = False
            elif self.field[self.pos] in endchars:
                self.pos += 1
                break
            elif allowcomments and self.field[self.pos] == '(':
                slist.append(self.getcomment())
                continue        # have already advanced pos from getcomment
            elif self.field[self.pos] == '\\':
                escaped = True
            else:
                slist.append(self.field[self.pos])
            self.pos += 1

        return ''.join(slist)

    def getquote(self):
        """Get a quote-delimited fragment from self's field."""
        return self.getdelimited('"', '"\r', False)

    def getcomment(self):
        """Get a parenthesis-delimited fragment from self's field."""
        return self.getdelimited('(', ')\r', True)

    def getdomainliteral(self):
        """Parse an RFC 2822 domain-literal."""
        return '[%s]' % self.getdelimited('[', ']\r', False)

    def getatom(self, atomends=None):
        """Parse an RFC 2822 atom.

        Optional atomends specifies a different set of end token delimiters
        (the default is to use self.atomends).  This is used e.g. in
        getphraselist() since phrase endings must not include the `.' (which
        is legal in phrases)."""
        atomlist = ['']
        if atomends is None:
            atomends = self.atomends

        while self.pos < len(self.field):
            if self.field[self.pos] in atomends:
                break
            atomlist.append(self.field[self.pos])
            self.pos += 1

        return ''.join(atomlist)

    def getphraselist(self):
        """Parse a sequence of RFC 2822 phrases.

        A phrase is a sequence of words, which are in turn either RFC 2822
        atoms or quoted-strings.  Whitespace between words is skipped and
        the words are returned as a list; comments are appended to
        self.commentlist.
        """
        plist = []

        while self.pos < len(self.field):
            if self.field[self.pos] in self.FWS:
                self.pos += 1
            elif self.field[self.pos] == '"':
                plist.append(self.getquote())
            elif self.field[self.pos] == '(':
                self.commentlist.append(self.getcomment())
            elif self.field[self.pos] in self.phraseends:
                break
            else:
                plist.append(self.getatom(self.phraseends))

        return plist
 | |
| 
 | |
class AddressList(AddrlistClass):
    """An AddressList encapsulates a list of parsed RFC 2822 addresses.

    The parsed addresses are stored in `self.addresslist'.  The arithmetic
    operators treat two AddressLists as sets: `+' and `+=' form a union,
    `-' and `-=' a difference.
    """

    def __init__(self, field):
        """Parse `field' into self.addresslist; a false `field' gives []."""
        super().__init__(field)
        if field:
            self.addresslist = self.getaddrlist()
        else:
            self.addresslist = []

    def __len__(self):
        return len(self.addresslist)

    def __add__(self, other):
        # Set union
        newaddr = AddressList(None)
        newaddr.addresslist = self.addresslist[:]
        for x in other.addresslist:
            if x not in self.addresslist:
                newaddr.addresslist.append(x)
        return newaddr

    def __iadd__(self, other):
        # Set union, in-place
        for x in other.addresslist:
            if x not in self.addresslist:
                self.addresslist.append(x)
        return self

    def __sub__(self, other):
        # Set difference
        newaddr = AddressList(None)
        for x in self.addresslist:
            if x not in other.addresslist:
                newaddr.addresslist.append(x)
        return newaddr

    def __isub__(self, other):
        # Set difference, in-place
        # Iterate over a snapshot: when `other' is `self' (or shares its
        # list), removing while iterating the same list would skip entries.
        for x in list(other.addresslist):
            if x in self.addresslist:
                self.addresslist.remove(x)
        return self

    def __getitem__(self, index):
        # Make indexing, slices, and 'in' work
        return self.addresslist[index]
 |