mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	Added test for behavior of operations on an unconnected SMTP object,
and tests for NOOP, RSET, and VRFY. Corrected typo in a comment for testNonnumericPort. Added a check for constructing SMTP objects when non-numeric ports are included in the host name. Derived a server from SMTPServer to test various ESMTP/SMTP capabilities. Check that a second HELO to DebuggingServer returns an error. [GSoC - Alan McIntyre]
This commit is contained in:
		
							parent
							
								
									32008321f5
								
							
						
					
					
						commit
						1bc8d639df
					
				
					 1 changed files with 192 additions and 11 deletions
				
			
		| 
						 | 
					@ -1,4 +1,5 @@
 | 
				
			||||||
import asyncore
 | 
					import asyncore
 | 
				
			||||||
 | 
					import email.utils
 | 
				
			||||||
import socket
 | 
					import socket
 | 
				
			||||||
import threading
 | 
					import threading
 | 
				
			||||||
import smtpd
 | 
					import smtpd
 | 
				
			||||||
| 
						 | 
					@ -75,6 +76,15 @@ def testBasic2(self):
 | 
				
			||||||
        smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
 | 
					        smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
 | 
				
			||||||
        smtp.sock.close()
 | 
					        smtp.sock.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testNotConnected(self):
 | 
				
			||||||
 | 
					        # Test various operations on an unconnected SMTP object that
 | 
				
			||||||
 | 
					        # should raise exceptions (at present the attempt in SMTP.send
 | 
				
			||||||
 | 
					        # to reference the nonexistent 'sock' attribute of the SMTP object
 | 
				
			||||||
 | 
					        # causes an AttributeError)
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP()
 | 
				
			||||||
 | 
					        self.assertRaises(AttributeError, smtp.ehlo)
 | 
				
			||||||
 | 
					        self.assertRaises(AttributeError, smtp.send, 'test msg')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testLocalHostName(self):
 | 
					    def testLocalHostName(self):
 | 
				
			||||||
        # check that supplied local_hostname is used
 | 
					        # check that supplied local_hostname is used
 | 
				
			||||||
        smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
 | 
				
			||||||
| 
						 | 
					@ -82,9 +92,11 @@ def testLocalHostName(self):
 | 
				
			||||||
        smtp.sock.close()
 | 
					        smtp.sock.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testNonnumericPort(self):
 | 
					    def testNonnumericPort(self):
 | 
				
			||||||
        # check that non-numeric port raises ValueError
 | 
					        # check that non-numeric port raises socket.error
 | 
				
			||||||
        self.assertRaises(socket.error, smtplib.SMTP,
 | 
					        self.assertRaises(socket.error, smtplib.SMTP,
 | 
				
			||||||
                          "localhost", "bogus")
 | 
					                          "localhost", "bogus")
 | 
				
			||||||
 | 
					        self.assertRaises(socket.error, smtplib.SMTP,
 | 
				
			||||||
 | 
					                          "localhost:bogus")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testTimeoutDefault(self):
 | 
					    def testTimeoutDefault(self):
 | 
				
			||||||
        # default
 | 
					        # default
 | 
				
			||||||
| 
						 | 
					@ -110,9 +122,9 @@ def testTimeoutNone(self):
 | 
				
			||||||
        smtp.sock.close()
 | 
					        smtp.sock.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Test server using smtpd.DebuggingServer
 | 
					# Test server thread using the specified SMTP server class
 | 
				
			||||||
def debugging_server(serv_evt, client_evt):
 | 
					def debugging_server(server_class, serv_evt, client_evt):
 | 
				
			||||||
    serv = smtpd.DebuggingServer(("", 0), ('nowhere', -1))
 | 
					    serv = server_class(("", 0), ('nowhere', -1))
 | 
				
			||||||
    global PORT
 | 
					    global PORT
 | 
				
			||||||
    PORT = serv.getsockname()[1]
 | 
					    PORT = serv.getsockname()[1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -148,11 +160,12 @@ def debugging_server(serv_evt, client_evt):
 | 
				
			||||||
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
 | 
					MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
 | 
				
			||||||
MSG_END = '------------ END MESSAGE ------------\n'
 | 
					MSG_END = '------------ END MESSAGE ------------\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Test behavior of smtpd.DebuggingServer
 | 
					# NOTE: Some SMTP objects in the tests below are created with a non-default
 | 
				
			||||||
# NOTE: the SMTP objects are created with a non-default local_hostname
 | 
					# local_hostname argument to the constructor, since (on some systems) the FQDN
 | 
				
			||||||
# argument to the constructor, since (on some systems) the FQDN lookup
 | 
					# lookup caused by the default local_hostname sometimes takes so long that the
 | 
				
			||||||
# caused by the default local_hostname sometimes takes so long that the
 | 
					 | 
				
			||||||
# test server times out, causing the test to fail.
 | 
					# test server times out, causing the test to fail.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Test behavior of smtpd.DebuggingServer
 | 
				
			||||||
class DebuggingServerTests(TestCase):
 | 
					class DebuggingServerTests(TestCase):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def setUp(self):
 | 
					    def setUp(self):
 | 
				
			||||||
| 
						 | 
					@ -163,7 +176,7 @@ def setUp(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.serv_evt = threading.Event()
 | 
					        self.serv_evt = threading.Event()
 | 
				
			||||||
        self.client_evt = threading.Event()
 | 
					        self.client_evt = threading.Event()
 | 
				
			||||||
        serv_args = (self.serv_evt, self.client_evt)
 | 
					        serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
 | 
				
			||||||
        threading.Thread(target=debugging_server, args=serv_args).start()
 | 
					        threading.Thread(target=debugging_server, args=serv_args).start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # wait until server thread has assigned a port number
 | 
					        # wait until server thread has assigned a port number
 | 
				
			||||||
| 
						 | 
					@ -189,12 +202,42 @@ def testBasic(self):
 | 
				
			||||||
        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
        smtp.quit()
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testEHLO(self):
 | 
					    def testNOOP(self):
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					        expected = (250, 'Ok')
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.noop(), expected)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testRSET(self):
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					        expected = (250, 'Ok')
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.rset(), expected)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testNotImplemented(self):
 | 
				
			||||||
 | 
					        # EHLO isn't implemented in DebuggingServer
 | 
				
			||||||
        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
        expected = (502, 'Error: command "EHLO" not implemented')
 | 
					        expected = (502, 'Error: command "EHLO" not implemented')
 | 
				
			||||||
        self.assertEqual(smtp.ehlo(), expected)
 | 
					        self.assertEqual(smtp.ehlo(), expected)
 | 
				
			||||||
        smtp.quit()
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testVRFY(self):
 | 
				
			||||||
 | 
					        # VRFY isn't implemented in DebuggingServer
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					        expected = (502, 'Error: command "VRFY" not implemented')
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testSecondHELO(self):
 | 
				
			||||||
 | 
					        # check that a second HELO returns a message that it's a duplicate
 | 
				
			||||||
 | 
					        # (this behavior is specific to smtpd.SMTPChannel)
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					        smtp.helo()
 | 
				
			||||||
 | 
					        expected = (503, 'Duplicate HELO/EHLO')
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.helo(), expected)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testHELP(self):
 | 
					    def testHELP(self):
 | 
				
			||||||
        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
        self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
 | 
					        self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
 | 
				
			||||||
| 
						 | 
					@ -214,6 +257,7 @@ def testSend(self):
 | 
				
			||||||
        self.assertEqual(self.output.getvalue(), mexpect)
 | 
					        self.assertEqual(self.output.getvalue(), mexpect)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# test response of client to a non-successful HELO message
 | 
				
			||||||
class BadHELOServerTests(TestCase):
 | 
					class BadHELOServerTests(TestCase):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def setUp(self):
 | 
					    def setUp(self):
 | 
				
			||||||
| 
						 | 
					@ -243,9 +287,146 @@ def testFailingHELO(self):
 | 
				
			||||||
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
 | 
					        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
 | 
				
			||||||
                            HOST, PORT, 'localhost', 3)
 | 
					                            HOST, PORT, 'localhost', 3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					sim_users = {'Mr.A@somewhere.com':'John A',
 | 
				
			||||||
 | 
					             'Ms.B@somewhere.com':'Sally B',
 | 
				
			||||||
 | 
					             'Mrs.C@somewhereesle.com':'Ruth C',
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
 | 
				
			||||||
 | 
					             'list-2':['Ms.B@somewhere.com',],
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Simulated SMTP channel & server
 | 
				
			||||||
 | 
					class SimSMTPChannel(smtpd.SMTPChannel):
 | 
				
			||||||
 | 
					    def smtp_EHLO(self, arg):
 | 
				
			||||||
 | 
					        resp = '250-testhost\r\n' \
 | 
				
			||||||
 | 
					               '250-EXPN\r\n' \
 | 
				
			||||||
 | 
					               '250-SIZE 20000000\r\n' \
 | 
				
			||||||
 | 
					               '250-STARTTLS\r\n' \
 | 
				
			||||||
 | 
					               '250-DELIVERBY\r\n' \
 | 
				
			||||||
 | 
					               '250 HELP'
 | 
				
			||||||
 | 
					        self.push(resp)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def smtp_VRFY(self, arg):
 | 
				
			||||||
 | 
					#        print '\nsmtp_VRFY(%r)\n' % arg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        raw_addr = email.utils.parseaddr(arg)[1]
 | 
				
			||||||
 | 
					        quoted_addr = smtplib.quoteaddr(arg)
 | 
				
			||||||
 | 
					        if raw_addr in sim_users:
 | 
				
			||||||
 | 
					            self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            self.push('550 No such user: %s' % arg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def smtp_EXPN(self, arg):
 | 
				
			||||||
 | 
					#        print '\nsmtp_EXPN(%r)\n' % arg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        list_name = email.utils.parseaddr(arg)[1].lower()
 | 
				
			||||||
 | 
					        if list_name in sim_lists:
 | 
				
			||||||
 | 
					            user_list = sim_lists[list_name]
 | 
				
			||||||
 | 
					            for n, user_email in enumerate(user_list):
 | 
				
			||||||
 | 
					                quoted_addr = smtplib.quoteaddr(user_email)
 | 
				
			||||||
 | 
					                if n < len(user_list) - 1:
 | 
				
			||||||
 | 
					                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            self.push('550 No access for you!')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class SimSMTPServer(smtpd.SMTPServer):
 | 
				
			||||||
 | 
					    def handle_accept(self):
 | 
				
			||||||
 | 
					        conn, addr = self.accept()
 | 
				
			||||||
 | 
					        channel = SimSMTPChannel(self, conn, addr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def process_message(self, peer, mailfrom, rcpttos, data):
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Test various SMTP & ESMTP commands/behaviors that require a simulated server
 | 
				
			||||||
 | 
					# (i.e., something with more features than DebuggingServer)
 | 
				
			||||||
 | 
					class SMTPSimTests(TestCase):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        self.serv_evt = threading.Event()
 | 
				
			||||||
 | 
					        self.client_evt = threading.Event()
 | 
				
			||||||
 | 
					        serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
 | 
				
			||||||
 | 
					        threading.Thread(target=debugging_server, args=serv_args).start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # wait until server thread has assigned a port number
 | 
				
			||||||
 | 
					        n = 500
 | 
				
			||||||
 | 
					        while PORT is None and n > 0:
 | 
				
			||||||
 | 
					            time.sleep(0.01)
 | 
				
			||||||
 | 
					            n -= 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # wait a little longer (sometimes connections are refused
 | 
				
			||||||
 | 
					        # on slow machines without this additional wait)
 | 
				
			||||||
 | 
					        time.sleep(0.5)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def tearDown(self):
 | 
				
			||||||
 | 
					        # indicate that the client is finished
 | 
				
			||||||
 | 
					        self.client_evt.set()
 | 
				
			||||||
 | 
					        # wait for the server thread to terminate
 | 
				
			||||||
 | 
					        self.serv_evt.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testBasic(self):
 | 
				
			||||||
 | 
					        # smoke test
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testEHLO(self):
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # no features should be present before the EHLO
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.esmtp_features, {})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # features expected from the test server
 | 
				
			||||||
 | 
					        expected_features = {'expn':'',
 | 
				
			||||||
 | 
					                             'size': '20000000',
 | 
				
			||||||
 | 
					                             'starttls': '',
 | 
				
			||||||
 | 
					                             'deliverby': '',
 | 
				
			||||||
 | 
					                             'help': '',
 | 
				
			||||||
 | 
					                             }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        smtp.ehlo()
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.esmtp_features, expected_features)
 | 
				
			||||||
 | 
					        for k in expected_features:
 | 
				
			||||||
 | 
					            self.assertTrue(smtp.has_extn(k))
 | 
				
			||||||
 | 
					        self.assertFalse(smtp.has_extn('unsupported-feature'))
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testVRFY(self):
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for email, name in sim_users.items():
 | 
				
			||||||
 | 
					            expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
 | 
				
			||||||
 | 
					            self.assertEqual(smtp.vrfy(email), expected_known)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        u = 'nobody@nowhere.com'
 | 
				
			||||||
 | 
					        expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u))
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.vrfy(u), expected_unknown)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def testEXPN(self):
 | 
				
			||||||
 | 
					        smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for listname, members in sim_lists.items():
 | 
				
			||||||
 | 
					            users = []
 | 
				
			||||||
 | 
					            for m in members:
 | 
				
			||||||
 | 
					                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
 | 
				
			||||||
 | 
					            expected_known = (250, '\n'.join(users))
 | 
				
			||||||
 | 
					            self.assertEqual(smtp.expn(listname), expected_known)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        u = 'PSU-Members-List'
 | 
				
			||||||
 | 
					        expected_unknown = (550, 'No access for you!')
 | 
				
			||||||
 | 
					        self.assertEqual(smtp.expn(u), expected_unknown)
 | 
				
			||||||
 | 
					        smtp.quit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_main(verbose=None):
 | 
					def test_main(verbose=None):
 | 
				
			||||||
    test_support.run_unittest(GeneralTests, DebuggingServerTests,
 | 
					    test_support.run_unittest(GeneralTests, DebuggingServerTests,
 | 
				
			||||||
                              BadHELOServerTests)
 | 
					                              BadHELOServerTests, SMTPSimTests)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == '__main__':
 | 
					if __name__ == '__main__':
 | 
				
			||||||
    test_main()
 | 
					    test_main()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue