This commit is contained in:
Amuthan Mannar 2025-12-08 06:10:38 +02:00 committed by GitHub
commit 12f5b4d781
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -66,6 +66,26 @@ def header_app(environ, start_response):
]).encode('iso-8859-1')] ]).encode('iso-8859-1')]
def input_app(func_name, *args):
def app(e,s):
req = getattr(e['wsgi.input'], func_name)(*args)
s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
if type(req) is list:
resp = b";".join(req)
else:
resp = req
return [resp]
return app
def errors_app(func_name, *args):
def app(e,s):
getattr(e['wsgi.errors'], func_name)(*args)
s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
return [b"data"]
return app
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
server = make_server("", 80, app, MockServer, MockHandler) server = make_server("", 80, app, MockServer, MockHandler)
inp = BufferedReader(BytesIO(data)) inp = BufferedReader(BytesIO(data))
@ -192,6 +212,94 @@ def bad_app(e,s):
err.splitlines()[-2], "AssertionError" err.splitlines()[-2], "AssertionError"
) )
def test_wsgi_input_read(self):
bad_app = input_app("read")
good_app = input_app("read", 5)
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2], "AssertionError"
)
out, err = run_amock(validator(good_app), b"GET / HTTP/1.0\n\nTest 1\nTest 2\n")
self.assertTrue(out.endswith(b"Test "))
def test_wsgi_input_readlines(self):
bad_app = input_app("readlines", 3, 5)
good_app = input_app("readlines", 1)
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2], "AssertionError"
)
out, err = run_amock(validator(good_app), b"GET / HTTP/1.0\n\nTest Line 1\nTest Line 2\n")
self.assertTrue(out.endswith(b"Test Line 1\n"))
def test_wsgi_input_readline(self):
bad_app = input_app("readline", 3, 4)
good_app = input_app("readline", 2)
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2], "AssertionError"
)
out, err = run_amock(validator(good_app), b"GET / HTTP/1.0\n\nTest 1\nTest 2\n")
self.assertTrue(out.endswith(b"Te"))
def test_wsgi_input_close(self):
app = input_app("close")
out, err = run_amock(validator(app), b"GET / HTTP/1.0\n\nTest 1\nTest 2\n")
self.assertEqual(err.splitlines()[-2], 'AssertionError: input.close() must not be called')
self.assertTrue(out.endswith(b"A server error occurred. Please contact the administrator."))
def test_wsgi_input_iter(self):
def app(e,s):
req = []
for line in e['wsgi.input']:
req.append(line)
s("200 OK", [('Content-Type', 'text/plain; charser=utf-8')])
return [b';'.join(req)]
out, err = run_amock(validator(app), b"GET / HTTP/1.0\n\nTest 1\nTest 2\n")
self.assertTrue(out.endswith(b"Test 1\n;Test 2\n"))
def test_wsgi_errors_write(self):
bad_app = errors_app("write", b"Test")
good_app = errors_app("write", "Test")
out, err = run_amock(validator(bad_app), b"GET / HTTP/1.0\n\n")
self.assertEqual(err.splitlines()[-2], 'AssertionError')
out, err = run_amock(validator(good_app), b"GET / HTTP/1.0\n\n")
self.assertTrue(err.startswith("Test"))
def test_wsgi_errors_writelines(self):
bad_app = errors_app("writelines", [1, "Test"])
good_app = errors_app("writelines", ["Test", "Test"])
out, err = run_amock(validator(bad_app), b"GET / HTTP/1.0\n\n")
self.assertEqual(err.splitlines()[-2], 'AssertionError')
out, err = run_amock(validator(good_app), b"GET / HTTP/1.0\n\n")
self.assertTrue(err.startswith("TestTest"))
def test_wsgi_errors_close(self):
app = errors_app("close")
out, err = run_amock(validator(app), b"GET / HTTP/1.0\n\n")
self.assertTrue(err.splitlines()[-2], 'AssertionError: errors.close() must not be called')
def test_bytes_validation(self): def test_bytes_validation(self):
def app(e, s): def app(e, s):
s("200 OK", [ s("200 OK", [