mirror of
https://github.com/python/cpython.git
synced 2025-10-31 21:51:50 +00:00
GH-84850: Remove urllib.request.URLopener and FancyURLopener (#125739)
This commit is contained in:
parent
a99dd23c1f
commit
4d771977b1
7 changed files with 44 additions and 987 deletions
|
|
@ -7,11 +7,9 @@
|
|||
import email.message
|
||||
import io
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
from test import support
|
||||
from test.support import os_helper
|
||||
from test.support import socket_helper
|
||||
from test.support import warnings_helper
|
||||
import os
|
||||
try:
|
||||
import ssl
|
||||
|
|
@ -20,7 +18,6 @@
|
|||
import sys
|
||||
import tempfile
|
||||
|
||||
from base64 import b64encode
|
||||
import collections
|
||||
|
||||
|
||||
|
|
@ -35,32 +32,6 @@ def hexescape(char):
|
|||
hex_repr = "0%s" % hex_repr
|
||||
return "%" + hex_repr
|
||||
|
||||
# Shortcut for testing FancyURLopener
|
||||
_urlopener = None
|
||||
|
||||
|
||||
def urlopen(url, data=None, proxies=None):
|
||||
"""urlopen(url [, data]) -> open file-like object"""
|
||||
global _urlopener
|
||||
if proxies is not None:
|
||||
opener = urllib.request.FancyURLopener(proxies=proxies)
|
||||
elif not _urlopener:
|
||||
opener = FancyURLopener()
|
||||
_urlopener = opener
|
||||
else:
|
||||
opener = _urlopener
|
||||
if data is None:
|
||||
return opener.open(url)
|
||||
else:
|
||||
return opener.open(url, data)
|
||||
|
||||
|
||||
def FancyURLopener():
|
||||
with warnings_helper.check_warnings(
|
||||
('FancyURLopener style of invoking requests is deprecated.',
|
||||
DeprecationWarning)):
|
||||
return urllib.request.FancyURLopener()
|
||||
|
||||
|
||||
def fakehttp(fakedata, mock_close=False):
|
||||
class FakeSocket(io.BytesIO):
|
||||
|
|
@ -119,26 +90,6 @@ def unfakehttp(self):
|
|||
http.client.HTTPConnection = self._connection_class
|
||||
|
||||
|
||||
class FakeFTPMixin(object):
|
||||
def fakeftp(self):
|
||||
class FakeFtpWrapper(object):
|
||||
def __init__(self, user, passwd, host, port, dirs, timeout=None,
|
||||
persistent=True):
|
||||
pass
|
||||
|
||||
def retrfile(self, file, type):
|
||||
return io.BytesIO(), 0
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
self._ftpwrapper_class = urllib.request.ftpwrapper
|
||||
urllib.request.ftpwrapper = FakeFtpWrapper
|
||||
|
||||
def unfakeftp(self):
|
||||
urllib.request.ftpwrapper = self._ftpwrapper_class
|
||||
|
||||
|
||||
class urlopen_FileTests(unittest.TestCase):
|
||||
"""Test urlopen() opening a temporary file.
|
||||
|
||||
|
|
@ -158,7 +109,7 @@ def setUp(self):
|
|||
f.close()
|
||||
self.pathname = os_helper.TESTFN
|
||||
self.quoted_pathname = urllib.parse.quote(self.pathname)
|
||||
self.returned_obj = urlopen("file:%s" % self.quoted_pathname)
|
||||
self.returned_obj = urllib.request.urlopen("file:%s" % self.quoted_pathname)
|
||||
|
||||
def tearDown(self):
|
||||
"""Shut down the open object"""
|
||||
|
|
@ -205,7 +156,7 @@ def test_headers(self):
|
|||
self.assertIsInstance(self.returned_obj.headers, email.message.Message)
|
||||
|
||||
def test_url(self):
|
||||
self.assertEqual(self.returned_obj.url, self.quoted_pathname)
|
||||
self.assertEqual(self.returned_obj.url, "file://" + self.quoted_pathname)
|
||||
|
||||
def test_status(self):
|
||||
self.assertIsNone(self.returned_obj.status)
|
||||
|
|
@ -214,7 +165,7 @@ def test_info(self):
|
|||
self.assertIsInstance(self.returned_obj.info(), email.message.Message)
|
||||
|
||||
def test_geturl(self):
|
||||
self.assertEqual(self.returned_obj.geturl(), self.quoted_pathname)
|
||||
self.assertEqual(self.returned_obj.geturl(), "file://" + self.quoted_pathname)
|
||||
|
||||
def test_getcode(self):
|
||||
self.assertIsNone(self.returned_obj.getcode())
|
||||
|
|
@ -339,13 +290,13 @@ def test_getproxies_environment_prefer_lowercase(self):
|
|||
self.assertEqual('http://somewhere:3128', proxies['http'])
|
||||
|
||||
|
||||
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
|
||||
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
|
||||
"""Test urlopen() opening a fake http connection."""
|
||||
|
||||
def check_read(self, ver):
|
||||
self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!")
|
||||
try:
|
||||
fp = urlopen("http://python.org/")
|
||||
fp = urllib.request.urlopen("http://python.org/")
|
||||
self.assertEqual(fp.readline(), b"Hello!")
|
||||
self.assertEqual(fp.readline(), b"")
|
||||
self.assertEqual(fp.geturl(), 'http://python.org/')
|
||||
|
|
@ -366,8 +317,8 @@ def test_url_fragment(self):
|
|||
def test_willclose(self):
|
||||
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
|
||||
try:
|
||||
resp = urlopen("http://www.python.org")
|
||||
self.assertTrue(resp.fp.will_close)
|
||||
resp = urllib.request.urlopen("http://www.python.org")
|
||||
self.assertTrue(resp.will_close)
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -392,9 +343,6 @@ def test_url_path_with_control_char_rejected(self):
|
|||
with self.assertRaisesRegex(
|
||||
InvalidURL, f"contain control.*{escaped_char_repr}"):
|
||||
urllib.request.urlopen(f"https:{schemeless_url}")
|
||||
# This code path quotes the URL so there is no injection.
|
||||
resp = urlopen(f"http:{schemeless_url}")
|
||||
self.assertNotIn(char, resp.geturl())
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -416,11 +364,6 @@ def test_url_path_with_newline_header_injection_rejected(self):
|
|||
urllib.request.urlopen(f"http:{schemeless_url}")
|
||||
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
|
||||
urllib.request.urlopen(f"https:{schemeless_url}")
|
||||
# This code path quotes the URL so there is no injection.
|
||||
resp = urlopen(f"http:{schemeless_url}")
|
||||
self.assertNotIn(' ', resp.geturl())
|
||||
self.assertNotIn('\r', resp.geturl())
|
||||
self.assertNotIn('\n', resp.geturl())
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -435,9 +378,9 @@ def test_url_host_with_control_char_rejected(self):
|
|||
InvalidURL = http.client.InvalidURL
|
||||
with self.assertRaisesRegex(
|
||||
InvalidURL, f"contain control.*{escaped_char_repr}"):
|
||||
urlopen(f"http:{schemeless_url}")
|
||||
urllib.request.urlopen(f"http:{schemeless_url}")
|
||||
with self.assertRaisesRegex(InvalidURL, f"contain control.*{escaped_char_repr}"):
|
||||
urlopen(f"https:{schemeless_url}")
|
||||
urllib.request.urlopen(f"https:{schemeless_url}")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -450,9 +393,9 @@ def test_url_host_with_newline_header_injection_rejected(self):
|
|||
InvalidURL = http.client.InvalidURL
|
||||
with self.assertRaisesRegex(
|
||||
InvalidURL, r"contain control.*\\r"):
|
||||
urlopen(f"http:{schemeless_url}")
|
||||
urllib.request.urlopen(f"http:{schemeless_url}")
|
||||
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
|
||||
urlopen(f"https:{schemeless_url}")
|
||||
urllib.request.urlopen(f"https:{schemeless_url}")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -476,7 +419,7 @@ def test_read_bogus(self):
|
|||
Content-Type: text/html; charset=iso-8859-1
|
||||
''', mock_close=True)
|
||||
try:
|
||||
self.assertRaises(OSError, urlopen, "http://python.org/")
|
||||
self.assertRaises(OSError, urllib.request.urlopen, "http://python.org/")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
|
|
@ -492,20 +435,20 @@ def test_invalid_redirect(self):
|
|||
try:
|
||||
msg = "Redirection to url 'file:"
|
||||
with self.assertRaisesRegex(urllib.error.HTTPError, msg):
|
||||
urlopen("http://python.org/")
|
||||
urllib.request.urlopen("http://python.org/")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
def test_redirect_limit_independent(self):
|
||||
# Ticket #12923: make sure independent requests each use their
|
||||
# own retry limit.
|
||||
for i in range(FancyURLopener().maxtries):
|
||||
for i in range(urllib.request.HTTPRedirectHandler.max_redirections):
|
||||
self.fakehttp(b'''HTTP/1.1 302 Found
|
||||
Location: file://guidocomputer.athome.com:/python/license
|
||||
Connection: close
|
||||
''', mock_close=True)
|
||||
try:
|
||||
self.assertRaises(urllib.error.HTTPError, urlopen,
|
||||
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen,
|
||||
"http://something")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
|
@ -515,14 +458,14 @@ def test_empty_socket(self):
|
|||
# data. (#1680230)
|
||||
self.fakehttp(b'')
|
||||
try:
|
||||
self.assertRaises(OSError, urlopen, "http://something")
|
||||
self.assertRaises(OSError, urllib.request.urlopen, "http://something")
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
def test_missing_localfile(self):
|
||||
# Test for #10836
|
||||
with self.assertRaises(urllib.error.URLError) as e:
|
||||
urlopen('file://localhost/a/file/which/doesnot/exists.py')
|
||||
urllib.request.urlopen('file://localhost/a/file/which/doesnot/exists.py')
|
||||
self.assertTrue(e.exception.filename)
|
||||
self.assertTrue(e.exception.reason)
|
||||
|
||||
|
|
@ -531,71 +474,28 @@ def test_file_notexists(self):
|
|||
tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
|
||||
try:
|
||||
self.assertTrue(os.path.exists(tmp_file))
|
||||
with urlopen(tmp_fileurl) as fobj:
|
||||
with urllib.request.urlopen(tmp_fileurl) as fobj:
|
||||
self.assertTrue(fobj)
|
||||
finally:
|
||||
os.close(fd)
|
||||
os.unlink(tmp_file)
|
||||
self.assertFalse(os.path.exists(tmp_file))
|
||||
with self.assertRaises(urllib.error.URLError):
|
||||
urlopen(tmp_fileurl)
|
||||
urllib.request.urlopen(tmp_fileurl)
|
||||
|
||||
def test_ftp_nohost(self):
|
||||
test_ftp_url = 'ftp:///path'
|
||||
with self.assertRaises(urllib.error.URLError) as e:
|
||||
urlopen(test_ftp_url)
|
||||
urllib.request.urlopen(test_ftp_url)
|
||||
self.assertFalse(e.exception.filename)
|
||||
self.assertTrue(e.exception.reason)
|
||||
|
||||
def test_ftp_nonexisting(self):
|
||||
with self.assertRaises(urllib.error.URLError) as e:
|
||||
urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
|
||||
urllib.request.urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
|
||||
self.assertFalse(e.exception.filename)
|
||||
self.assertTrue(e.exception.reason)
|
||||
|
||||
@patch.object(urllib.request, 'MAXFTPCACHE', 0)
|
||||
def test_ftp_cache_pruning(self):
|
||||
self.fakeftp()
|
||||
try:
|
||||
urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, [])
|
||||
urlopen('ftp://localhost')
|
||||
finally:
|
||||
self.unfakeftp()
|
||||
|
||||
def test_userpass_inurl(self):
|
||||
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
|
||||
try:
|
||||
fp = urlopen("http://user:pass@python.org/")
|
||||
self.assertEqual(fp.readline(), b"Hello!")
|
||||
self.assertEqual(fp.readline(), b"")
|
||||
self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
|
||||
self.assertEqual(fp.getcode(), 200)
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
def test_userpass_inurl_w_spaces(self):
|
||||
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
|
||||
try:
|
||||
userpass = "a b:c d"
|
||||
url = "http://{}@python.org/".format(userpass)
|
||||
fakehttp_wrapper = http.client.HTTPConnection
|
||||
authorization = ("Authorization: Basic %s\r\n" %
|
||||
b64encode(userpass.encode("ASCII")).decode("ASCII"))
|
||||
fp = urlopen(url)
|
||||
# The authorization header must be in place
|
||||
self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8"))
|
||||
self.assertEqual(fp.readline(), b"Hello!")
|
||||
self.assertEqual(fp.readline(), b"")
|
||||
# the spaces are quoted in URL so no match
|
||||
self.assertNotEqual(fp.geturl(), url)
|
||||
self.assertEqual(fp.getcode(), 200)
|
||||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
def test_URLopener_deprecation(self):
|
||||
with warnings_helper.check_warnings(('',DeprecationWarning)):
|
||||
urllib.request.URLopener()
|
||||
|
||||
|
||||
class urlopen_DataTests(unittest.TestCase):
|
||||
"""Test urlopen() opening a data URL."""
|
||||
|
|
@ -1620,56 +1520,6 @@ def test_thishost(self):
|
|||
self.assertIsInstance(urllib.request.thishost(), tuple)
|
||||
|
||||
|
||||
class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
|
||||
"""Testcase to test the open method of URLopener class."""
|
||||
|
||||
def test_quoted_open(self):
|
||||
class DummyURLopener(urllib.request.URLopener):
|
||||
def open_spam(self, url):
|
||||
return url
|
||||
with warnings_helper.check_warnings(
|
||||
('DummyURLopener style of invoking requests is deprecated.',
|
||||
DeprecationWarning)):
|
||||
self.assertEqual(DummyURLopener().open(
|
||||
'spam://example/ /'),'//example/%20/')
|
||||
|
||||
# test the safe characters are not quoted by urlopen
|
||||
self.assertEqual(DummyURLopener().open(
|
||||
"spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"),
|
||||
"//c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
|
||||
|
||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||
def test_urlopener_retrieve_file(self):
|
||||
with os_helper.temp_dir() as tmpdir:
|
||||
fd, tmpfile = tempfile.mkstemp(dir=tmpdir)
|
||||
os.close(fd)
|
||||
fileurl = "file:" + urllib.request.pathname2url(tmpfile)
|
||||
filename, _ = urllib.request.URLopener().retrieve(fileurl)
|
||||
# Some buildbots have TEMP folder that uses a lowercase drive letter.
|
||||
self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile))
|
||||
|
||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||
def test_urlopener_retrieve_remote(self):
|
||||
url = "http://www.python.org/file.txt"
|
||||
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
|
||||
self.addCleanup(self.unfakehttp)
|
||||
filename, _ = urllib.request.URLopener().retrieve(url)
|
||||
self.assertEqual(os.path.splitext(filename)[1], ".txt")
|
||||
|
||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||
def test_local_file_open(self):
|
||||
# bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme
|
||||
class DummyURLopener(urllib.request.URLopener):
|
||||
def open_local_file(self, url):
|
||||
return url
|
||||
for url in ('local_file://example', 'local-file://example'):
|
||||
self.assertRaises(OSError, urllib.request.urlopen, url)
|
||||
self.assertRaises(OSError, urllib.request.URLopener().open, url)
|
||||
self.assertRaises(OSError, urllib.request.URLopener().retrieve, url)
|
||||
self.assertRaises(OSError, DummyURLopener().open, url)
|
||||
self.assertRaises(OSError, DummyURLopener().retrieve, url)
|
||||
|
||||
|
||||
class RequestTests(unittest.TestCase):
|
||||
"""Unit tests for urllib.request.Request."""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue