mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1104 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1104 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
import unittest
 | 
						|
from test import test_support
 | 
						|
 | 
						|
import socket
 | 
						|
import select
 | 
						|
import time
 | 
						|
import thread, threading
 | 
						|
import Queue
 | 
						|
import sys
 | 
						|
import array
 | 
						|
from weakref import proxy
 | 
						|
import signal
 | 
						|
 | 
						|
PORT = 50007
 | 
						|
HOST = 'localhost'
 | 
						|
MSG = 'Michael Gilfix was here\n'
 | 
						|
 | 
						|
class SocketTCPTest(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
						|
        global PORT
 | 
						|
        PORT = test_support.bind_port(self.serv, HOST, PORT)
 | 
						|
        self.serv.listen(1)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.serv.close()
 | 
						|
        self.serv = None
 | 
						|
 | 
						|
class SocketUDPTest(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
						|
        self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
						|
        global PORT
 | 
						|
        PORT = test_support.bind_port(self.serv, HOST, PORT)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.serv.close()
 | 
						|
        self.serv = None
 | 
						|
 | 
						|
class ThreadableTest:
 | 
						|
    """Threadable Test class
 | 
						|
 | 
						|
    The ThreadableTest class makes it easy to create a threaded
 | 
						|
    client/server pair from an existing unit test. To create a
 | 
						|
    new threaded class from an existing unit test, use multiple
 | 
						|
    inheritance:
 | 
						|
 | 
						|
        class NewClass (OldClass, ThreadableTest):
 | 
						|
            pass
 | 
						|
 | 
						|
    This class defines two new fixture functions with obvious
 | 
						|
    purposes for overriding:
 | 
						|
 | 
						|
        clientSetUp ()
 | 
						|
        clientTearDown ()
 | 
						|
 | 
						|
    Any new test functions within the class must then define
 | 
						|
    tests in pairs, where the test name is preceeded with a
 | 
						|
    '_' to indicate the client portion of the test. Ex:
 | 
						|
 | 
						|
        def testFoo(self):
 | 
						|
            # Server portion
 | 
						|
 | 
						|
        def _testFoo(self):
 | 
						|
            # Client portion
 | 
						|
 | 
						|
    Any exceptions raised by the clients during their tests
 | 
						|
    are caught and transferred to the main thread to alert
 | 
						|
    the testing framework.
 | 
						|
 | 
						|
    Note, the server setup function cannot call any blocking
 | 
						|
    functions that rely on the client thread during setup,
 | 
						|
    unless serverExplicitReady() is called just before
 | 
						|
    the blocking call (such as in setting up a client/server
 | 
						|
    connection and performing the accept() in setUp().
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        # Swap the true setup function
 | 
						|
        self.__setUp = self.setUp
 | 
						|
        self.__tearDown = self.tearDown
 | 
						|
        self.setUp = self._setUp
 | 
						|
        self.tearDown = self._tearDown
 | 
						|
 | 
						|
    def serverExplicitReady(self):
 | 
						|
        """This method allows the server to explicitly indicate that
 | 
						|
        it wants the client thread to proceed. This is useful if the
 | 
						|
        server is about to execute a blocking routine that is
 | 
						|
        dependent upon the client thread during its setup routine."""
 | 
						|
        self.server_ready.set()
 | 
						|
 | 
						|
    def _setUp(self):
 | 
						|
        self.server_ready = threading.Event()
 | 
						|
        self.client_ready = threading.Event()
 | 
						|
        self.done = threading.Event()
 | 
						|
        self.queue = Queue.Queue(1)
 | 
						|
 | 
						|
        # Do some munging to start the client test.
 | 
						|
        methodname = self.id()
 | 
						|
        i = methodname.rfind('.')
 | 
						|
        methodname = methodname[i+1:]
 | 
						|
        test_method = getattr(self, '_' + methodname)
 | 
						|
        self.client_thread = thread.start_new_thread(
 | 
						|
            self.clientRun, (test_method,))
 | 
						|
 | 
						|
        self.__setUp()
 | 
						|
        if not self.server_ready.isSet():
 | 
						|
            self.server_ready.set()
 | 
						|
        self.client_ready.wait()
 | 
						|
 | 
						|
    def _tearDown(self):
 | 
						|
        self.__tearDown()
 | 
						|
        self.done.wait()
 | 
						|
 | 
						|
        if not self.queue.empty():
 | 
						|
            msg = self.queue.get()
 | 
						|
            self.fail(msg)
 | 
						|
 | 
						|
    def clientRun(self, test_func):
 | 
						|
        self.server_ready.wait()
 | 
						|
        self.client_ready.set()
 | 
						|
        self.clientSetUp()
 | 
						|
        if not callable(test_func):
 | 
						|
            raise TypeError, "test_func must be a callable function"
 | 
						|
        try:
 | 
						|
            test_func()
 | 
						|
        except Exception, strerror:
 | 
						|
            self.queue.put(strerror)
 | 
						|
        self.clientTearDown()
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        raise NotImplementedError, "clientSetUp must be implemented."
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.done.set()
 | 
						|
        thread.exit()
 | 
						|
 | 
						|
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketTCPTest.__init__(self, methodName=methodName)
 | 
						|
        ThreadableTest.__init__(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.cli.close()
 | 
						|
        self.cli = None
 | 
						|
        ThreadableTest.clientTearDown(self)
 | 
						|
 | 
						|
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketUDPTest.__init__(self, methodName=methodName)
 | 
						|
        ThreadableTest.__init__(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
						|
 | 
						|
class SocketConnectedTest(ThreadedTCPSocketTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        ThreadedTCPSocketTest.setUp(self)
 | 
						|
        # Indicate explicitly we're ready for the client thread to
 | 
						|
        # proceed and then perform the blocking call to accept
 | 
						|
        self.serverExplicitReady()
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
        self.cli_conn = conn
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.cli_conn.close()
 | 
						|
        self.cli_conn = None
 | 
						|
        ThreadedTCPSocketTest.tearDown(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        ThreadedTCPSocketTest.clientSetUp(self)
 | 
						|
        self.cli.connect((HOST, PORT))
 | 
						|
        self.serv_conn = self.cli
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.serv_conn.close()
 | 
						|
        self.serv_conn = None
 | 
						|
        ThreadedTCPSocketTest.clientTearDown(self)
 | 
						|
 | 
						|
class SocketPairTest(unittest.TestCase, ThreadableTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        unittest.TestCase.__init__(self, methodName=methodName)
 | 
						|
        ThreadableTest.__init__(self)
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.serv, self.cli = socket.socketpair()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.serv.close()
 | 
						|
        self.serv = None
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.cli.close()
 | 
						|
        self.cli = None
 | 
						|
        ThreadableTest.clientTearDown(self)
 | 
						|
 | 
						|
 | 
						|
#######################################################################
 | 
						|
## Begin Tests
 | 
						|
 | 
						|
class GeneralModuleTests(unittest.TestCase):
 | 
						|
 | 
						|
    def test_weakref(self):
 | 
						|
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        p = proxy(s)
 | 
						|
        self.assertEqual(p.fileno(), s.fileno())
 | 
						|
        s.close()
 | 
						|
        s = None
 | 
						|
        try:
 | 
						|
            p.fileno()
 | 
						|
        except ReferenceError:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail('Socket proxy still exists')
 | 
						|
 | 
						|
    def testSocketError(self):
 | 
						|
        # Testing socket module exceptions
 | 
						|
        def raise_error(*args, **kwargs):
 | 
						|
            raise socket.error
 | 
						|
        def raise_herror(*args, **kwargs):
 | 
						|
            raise socket.herror
 | 
						|
        def raise_gaierror(*args, **kwargs):
 | 
						|
            raise socket.gaierror
 | 
						|
        self.failUnlessRaises(socket.error, raise_error,
 | 
						|
                              "Error raising socket exception.")
 | 
						|
        self.failUnlessRaises(socket.error, raise_herror,
 | 
						|
                              "Error raising socket exception.")
 | 
						|
        self.failUnlessRaises(socket.error, raise_gaierror,
 | 
						|
                              "Error raising socket exception.")
 | 
						|
 | 
						|
    def testCrucialConstants(self):
 | 
						|
        # Testing for mission critical constants
 | 
						|
        socket.AF_INET
 | 
						|
        socket.SOCK_STREAM
 | 
						|
        socket.SOCK_DGRAM
 | 
						|
        socket.SOCK_RAW
 | 
						|
        socket.SOCK_RDM
 | 
						|
        socket.SOCK_SEQPACKET
 | 
						|
        socket.SOL_SOCKET
 | 
						|
        socket.SO_REUSEADDR
 | 
						|
 | 
						|
    def testHostnameRes(self):
 | 
						|
        # Testing hostname resolution mechanisms
 | 
						|
        hostname = socket.gethostname()
 | 
						|
        try:
 | 
						|
            ip = socket.gethostbyname(hostname)
 | 
						|
        except socket.error:
 | 
						|
            # Probably name lookup wasn't set up right; skip this test
 | 
						|
            return
 | 
						|
        self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
 | 
						|
        try:
 | 
						|
            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
 | 
						|
        except socket.error:
 | 
						|
            # Probably a similar problem as above; skip this test
 | 
						|
            return
 | 
						|
        all_host_names = [hostname, hname] + aliases
 | 
						|
        fqhn = socket.getfqdn(ip)
 | 
						|
        if not fqhn in all_host_names:
 | 
						|
            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
 | 
						|
 | 
						|
    def testRefCountGetNameInfo(self):
 | 
						|
        # Testing reference count for getnameinfo
 | 
						|
        import sys
 | 
						|
        if hasattr(sys, "getrefcount"):
 | 
						|
            try:
 | 
						|
                # On some versions, this loses a reference
 | 
						|
                orig = sys.getrefcount(__name__)
 | 
						|
                socket.getnameinfo(__name__,0)
 | 
						|
            except SystemError:
 | 
						|
                if sys.getrefcount(__name__) <> orig:
 | 
						|
                    self.fail("socket.getnameinfo loses a reference")
 | 
						|
 | 
						|
    def testInterpreterCrash(self):
 | 
						|
        # Making sure getnameinfo doesn't crash the interpreter
 | 
						|
        try:
 | 
						|
            # On some versions, this crashes the interpreter.
 | 
						|
            socket.getnameinfo(('x', 0, 0, 0), 0)
 | 
						|
        except socket.error:
 | 
						|
            pass
 | 
						|
 | 
						|
    def testNtoH(self):
 | 
						|
        # This just checks that htons etc. are their own inverse,
 | 
						|
        # when looking at the lower 16 or 32 bits.
 | 
						|
        sizes = {socket.htonl: 32, socket.ntohl: 32,
 | 
						|
                 socket.htons: 16, socket.ntohs: 16}
 | 
						|
        for func, size in sizes.items():
 | 
						|
            mask = (1L<<size) - 1
 | 
						|
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
 | 
						|
                self.assertEqual(i & mask, func(func(i&mask)) & mask)
 | 
						|
 | 
						|
            swapped = func(mask)
 | 
						|
            self.assertEqual(swapped & mask, mask)
 | 
						|
            self.assertRaises(OverflowError, func, 1L<<34)
 | 
						|
 | 
						|
    def testNtoHErrors(self):
 | 
						|
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
 | 
						|
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
 | 
						|
        for k in good_values:
 | 
						|
            socket.ntohl(k)
 | 
						|
            socket.ntohs(k)
 | 
						|
            socket.htonl(k)
 | 
						|
            socket.htons(k)
 | 
						|
        for k in bad_values:
 | 
						|
            self.assertRaises(OverflowError, socket.ntohl, k)
 | 
						|
            self.assertRaises(OverflowError, socket.ntohs, k)
 | 
						|
            self.assertRaises(OverflowError, socket.htonl, k)
 | 
						|
            self.assertRaises(OverflowError, socket.htons, k)
 | 
						|
 | 
						|
    def testGetServBy(self):
 | 
						|
        eq = self.assertEqual
 | 
						|
        # Find one service that exists, then check all the related interfaces.
 | 
						|
        # I've ordered this by protocols that have both a tcp and udp
 | 
						|
        # protocol, at least for modern Linuxes.
 | 
						|
        if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
 | 
						|
                            'freebsd7', 'darwin'):
 | 
						|
            # avoid the 'echo' service on this platform, as there is an
 | 
						|
            # assumption breaking non-standard port/protocol entry
 | 
						|
            services = ('daytime', 'qotd', 'domain')
 | 
						|
        else:
 | 
						|
            services = ('echo', 'daytime', 'domain')
 | 
						|
        for service in services:
 | 
						|
            try:
 | 
						|
                port = socket.getservbyname(service, 'tcp')
 | 
						|
                break
 | 
						|
            except socket.error:
 | 
						|
                pass
 | 
						|
        else:
 | 
						|
            raise socket.error
 | 
						|
        # Try same call with optional protocol omitted
 | 
						|
        port2 = socket.getservbyname(service)
 | 
						|
        eq(port, port2)
 | 
						|
        # Try udp, but don't barf it it doesn't exist
 | 
						|
        try:
 | 
						|
            udpport = socket.getservbyname(service, 'udp')
 | 
						|
        except socket.error:
 | 
						|
            udpport = None
 | 
						|
        else:
 | 
						|
            eq(udpport, port)
 | 
						|
        # Now make sure the lookup by port returns the same service name
 | 
						|
        eq(socket.getservbyport(port2), service)
 | 
						|
        eq(socket.getservbyport(port, 'tcp'), service)
 | 
						|
        if udpport is not None:
 | 
						|
            eq(socket.getservbyport(udpport, 'udp'), service)
 | 
						|
 | 
						|
    def testDefaultTimeout(self):
 | 
						|
        # Testing default timeout
 | 
						|
        # The default timeout should initially be None
 | 
						|
        self.assertEqual(socket.getdefaulttimeout(), None)
 | 
						|
        s = socket.socket()
 | 
						|
        self.assertEqual(s.gettimeout(), None)
 | 
						|
        s.close()
 | 
						|
 | 
						|
        # Set the default timeout to 10, and see if it propagates
 | 
						|
        socket.setdefaulttimeout(10)
 | 
						|
        self.assertEqual(socket.getdefaulttimeout(), 10)
 | 
						|
        s = socket.socket()
 | 
						|
        self.assertEqual(s.gettimeout(), 10)
 | 
						|
        s.close()
 | 
						|
 | 
						|
        # Reset the default timeout to None, and see if it propagates
 | 
						|
        socket.setdefaulttimeout(None)
 | 
						|
        self.assertEqual(socket.getdefaulttimeout(), None)
 | 
						|
        s = socket.socket()
 | 
						|
        self.assertEqual(s.gettimeout(), None)
 | 
						|
        s.close()
 | 
						|
 | 
						|
        # Check that setting it to an invalid value raises ValueError
 | 
						|
        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
 | 
						|
 | 
						|
        # Check that setting it to an invalid type raises TypeError
 | 
						|
        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
 | 
						|
 | 
						|
    def testIPv4toString(self):
 | 
						|
        if not hasattr(socket, 'inet_pton'):
 | 
						|
            return # No inet_pton() on this platform
 | 
						|
        from socket import inet_aton as f, inet_pton, AF_INET
 | 
						|
        g = lambda a: inet_pton(AF_INET, a)
 | 
						|
 | 
						|
        self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
 | 
						|
        self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
 | 
						|
        self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
 | 
						|
        self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
 | 
						|
        self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
 | 
						|
 | 
						|
        self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
 | 
						|
        self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
 | 
						|
        self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
 | 
						|
        self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
 | 
						|
 | 
						|
    def testIPv6toString(self):
 | 
						|
        if not hasattr(socket, 'inet_pton'):
 | 
						|
            return # No inet_pton() on this platform
 | 
						|
        try:
 | 
						|
            from socket import inet_pton, AF_INET6, has_ipv6
 | 
						|
            if not has_ipv6:
 | 
						|
                return
 | 
						|
        except ImportError:
 | 
						|
            return
 | 
						|
        f = lambda a: inet_pton(AF_INET6, a)
 | 
						|
 | 
						|
        self.assertEquals('\x00' * 16, f('::'))
 | 
						|
        self.assertEquals('\x00' * 16, f('0::0'))
 | 
						|
        self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
 | 
						|
        self.assertEquals(
 | 
						|
            '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
 | 
						|
            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
 | 
						|
        )
 | 
						|
 | 
						|
    def testStringToIPv4(self):
 | 
						|
        if not hasattr(socket, 'inet_ntop'):
 | 
						|
            return # No inet_ntop() on this platform
 | 
						|
        from socket import inet_ntoa as f, inet_ntop, AF_INET
 | 
						|
        g = lambda a: inet_ntop(AF_INET, a)
 | 
						|
 | 
						|
        self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
 | 
						|
        self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
 | 
						|
        self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
 | 
						|
        self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
 | 
						|
 | 
						|
        self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
 | 
						|
        self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
 | 
						|
        self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
 | 
						|
 | 
						|
    def testStringToIPv6(self):
 | 
						|
        if not hasattr(socket, 'inet_ntop'):
 | 
						|
            return # No inet_ntop() on this platform
 | 
						|
        try:
 | 
						|
            from socket import inet_ntop, AF_INET6, has_ipv6
 | 
						|
            if not has_ipv6:
 | 
						|
                return
 | 
						|
        except ImportError:
 | 
						|
            return
 | 
						|
        f = lambda a: inet_ntop(AF_INET6, a)
 | 
						|
 | 
						|
        self.assertEquals('::', f('\x00' * 16))
 | 
						|
        self.assertEquals('::1', f('\x00' * 15 + '\x01'))
 | 
						|
        self.assertEquals(
 | 
						|
            'aef:b01:506:1001:ffff:9997:55:170',
 | 
						|
            f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
 | 
						|
        )
 | 
						|
 | 
						|
    # XXX The following don't test module-level functionality...
 | 
						|
 | 
						|
    def testSockName(self):
 | 
						|
        # Testing getsockname()
 | 
						|
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        sock.bind(("0.0.0.0", PORT+1))
 | 
						|
        name = sock.getsockname()
 | 
						|
        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
 | 
						|
        # it reasonable to get the host's addr in addition to 0.0.0.0.
 | 
						|
        # At least for eCos.  This is required for the S/390 to pass.
 | 
						|
        my_ip_addr = socket.gethostbyname(socket.gethostname())
 | 
						|
        self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
 | 
						|
        self.assertEqual(name[1], PORT+1)
 | 
						|
 | 
						|
    def testGetSockOpt(self):
 | 
						|
        # Testing getsockopt()
 | 
						|
        # We know a socket should start without reuse==0
 | 
						|
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
 | 
						|
        self.failIf(reuse != 0, "initial mode is reuse")
 | 
						|
 | 
						|
    def testSetSockOpt(self):
 | 
						|
        # Testing setsockopt()
 | 
						|
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
						|
        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
 | 
						|
        self.failIf(reuse == 0, "failed to set reuse mode")
 | 
						|
 | 
						|
    def testSendAfterClose(self):
 | 
						|
        # testing send() after close() with timeout
 | 
						|
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        sock.settimeout(1)
 | 
						|
        sock.close()
 | 
						|
        self.assertRaises(socket.error, sock.send, "spam")
 | 
						|
 | 
						|
    def testNewAttributes(self):
 | 
						|
        # testing .family, .type and .protocol
 | 
						|
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        self.assertEqual(sock.family, socket.AF_INET)
 | 
						|
        self.assertEqual(sock.type, socket.SOCK_STREAM)
 | 
						|
        self.assertEqual(sock.proto, 0)
 | 
						|
        sock.close()
 | 
						|
 | 
						|
class BasicTCPTest(SocketConnectedTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketConnectedTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def testRecv(self):
 | 
						|
        # Testing large receive over TCP
 | 
						|
        msg = self.cli_conn.recv(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecv(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
 | 
						|
    def testOverFlowRecv(self):
 | 
						|
        # Testing receive in chunks over TCP
 | 
						|
        seg1 = self.cli_conn.recv(len(MSG) - 3)
 | 
						|
        seg2 = self.cli_conn.recv(1024)
 | 
						|
        msg = seg1 + seg2
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testOverFlowRecv(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
 | 
						|
    def testRecvFrom(self):
 | 
						|
        # Testing large recvfrom() over TCP
 | 
						|
        msg, addr = self.cli_conn.recvfrom(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecvFrom(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
 | 
						|
    def testOverFlowRecvFrom(self):
 | 
						|
        # Testing recvfrom() in chunks over TCP
 | 
						|
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
 | 
						|
        seg2, addr = self.cli_conn.recvfrom(1024)
 | 
						|
        msg = seg1 + seg2
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testOverFlowRecvFrom(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
 | 
						|
    def testSendAll(self):
 | 
						|
        # Testing sendall() with a 2048 byte string over TCP
 | 
						|
        msg = ''
 | 
						|
        while 1:
 | 
						|
            read = self.cli_conn.recv(1024)
 | 
						|
            if not read:
 | 
						|
                break
 | 
						|
            msg += read
 | 
						|
        self.assertEqual(msg, 'f' * 2048)
 | 
						|
 | 
						|
    def _testSendAll(self):
 | 
						|
        big_chunk = 'f' * 2048
 | 
						|
        self.serv_conn.sendall(big_chunk)
 | 
						|
 | 
						|
    def testFromFd(self):
 | 
						|
        # Testing fromfd()
 | 
						|
        if not hasattr(socket, "fromfd"):
 | 
						|
            return # On Windows, this doesn't exist
 | 
						|
        fd = self.cli_conn.fileno()
 | 
						|
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        msg = sock.recv(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testFromFd(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
 | 
						|
    def testShutdown(self):
 | 
						|
        # Testing shutdown()
 | 
						|
        msg = self.cli_conn.recv(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testShutdown(self):
 | 
						|
        self.serv_conn.send(MSG)
 | 
						|
        self.serv_conn.shutdown(2)
 | 
						|
 | 
						|
class BasicUDPTest(ThreadedUDPSocketTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def testSendtoAndRecv(self):
 | 
						|
        # Testing sendto() and Recv() over UDP
 | 
						|
        msg = self.serv.recv(len(MSG))
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testSendtoAndRecv(self):
 | 
						|
        self.cli.sendto(MSG, 0, (HOST, PORT))
 | 
						|
 | 
						|
    def testRecvFrom(self):
 | 
						|
        # Testing recvfrom() over UDP
 | 
						|
        msg, addr = self.serv.recvfrom(len(MSG))
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecvFrom(self):
 | 
						|
        self.cli.sendto(MSG, 0, (HOST, PORT))
 | 
						|
 | 
						|
    def testRecvFromNegative(self):
 | 
						|
        # Negative lengths passed to recvfrom should give ValueError.
 | 
						|
        self.assertRaises(ValueError, self.serv.recvfrom, -1)
 | 
						|
 | 
						|
    def _testRecvFromNegative(self):
 | 
						|
        self.cli.sendto(MSG, 0, (HOST, PORT))
 | 
						|
 | 
						|
class TCPCloserTest(ThreadedTCPSocketTest):
 | 
						|
 | 
						|
    def testClose(self):
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
        conn.close()
 | 
						|
 | 
						|
        sd = self.cli
 | 
						|
        read, write, err = select.select([sd], [], [], 1.0)
 | 
						|
        self.assertEqual(read, [sd])
 | 
						|
        self.assertEqual(sd.recv(1), '')
 | 
						|
 | 
						|
    def _testClose(self):
 | 
						|
        self.cli.connect((HOST, PORT))
 | 
						|
        time.sleep(1.0)
 | 
						|
 | 
						|
class BasicSocketPairTest(SocketPairTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketPairTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def testRecv(self):
 | 
						|
        msg = self.serv.recv(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecv(self):
 | 
						|
        self.cli.send(MSG)
 | 
						|
 | 
						|
    def testSend(self):
 | 
						|
        self.serv.send(MSG)
 | 
						|
 | 
						|
    def _testSend(self):
 | 
						|
        msg = self.cli.recv(1024)
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
class NonBlockingTCPTests(ThreadedTCPSocketTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def testSetBlocking(self):
 | 
						|
        # Testing whether set blocking works
 | 
						|
        self.serv.setblocking(0)
 | 
						|
        start = time.time()
 | 
						|
        try:
 | 
						|
            self.serv.accept()
 | 
						|
        except socket.error:
 | 
						|
            pass
 | 
						|
        end = time.time()
 | 
						|
        self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
 | 
						|
 | 
						|
    def _testSetBlocking(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def testAccept(self):
 | 
						|
        # Testing non-blocking accept
 | 
						|
        self.serv.setblocking(0)
 | 
						|
        try:
 | 
						|
            conn, addr = self.serv.accept()
 | 
						|
        except socket.error:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail("Error trying to do non-blocking accept.")
 | 
						|
        read, write, err = select.select([self.serv], [], [])
 | 
						|
        if self.serv in read:
 | 
						|
            conn, addr = self.serv.accept()
 | 
						|
        else:
 | 
						|
            self.fail("Error trying to do accept after select.")
 | 
						|
 | 
						|
    def _testAccept(self):
 | 
						|
        time.sleep(0.1)
 | 
						|
        self.cli.connect((HOST, PORT))
 | 
						|
 | 
						|
    def testConnect(self):
 | 
						|
        # Testing non-blocking connect
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
 | 
						|
    def _testConnect(self):
 | 
						|
        self.cli.settimeout(10)
 | 
						|
        self.cli.connect((HOST, PORT))
 | 
						|
 | 
						|
    def testRecv(self):
 | 
						|
        # Testing non-blocking recv
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
        conn.setblocking(0)
 | 
						|
        try:
 | 
						|
            msg = conn.recv(len(MSG))
 | 
						|
        except socket.error:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail("Error trying to do non-blocking recv.")
 | 
						|
        read, write, err = select.select([conn], [], [])
 | 
						|
        if conn in read:
 | 
						|
            msg = conn.recv(len(MSG))
 | 
						|
            self.assertEqual(msg, MSG)
 | 
						|
        else:
 | 
						|
            self.fail("Error during select call to non-blocking socket.")
 | 
						|
 | 
						|
    def _testRecv(self):
 | 
						|
        self.cli.connect((HOST, PORT))
 | 
						|
        time.sleep(0.1)
 | 
						|
        self.cli.send(MSG)
 | 
						|
 | 
						|
class FileObjectClassTestCase(SocketConnectedTest):
 | 
						|
 | 
						|
    bufsize = -1 # Use default buffer size
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketConnectedTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        SocketConnectedTest.setUp(self)
 | 
						|
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.serv_file.close()
 | 
						|
        self.assert_(self.serv_file.closed)
 | 
						|
        self.serv_file = None
 | 
						|
        SocketConnectedTest.tearDown(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        SocketConnectedTest.clientSetUp(self)
 | 
						|
        self.cli_file = self.serv_conn.makefile('wb')
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.cli_file.close()
 | 
						|
        self.assert_(self.cli_file.closed)
 | 
						|
        self.cli_file = None
 | 
						|
        SocketConnectedTest.clientTearDown(self)
 | 
						|
 | 
						|
    def testSmallRead(self):
 | 
						|
        # Performing small file read test
 | 
						|
        first_seg = self.serv_file.read(len(MSG)-3)
 | 
						|
        second_seg = self.serv_file.read(3)
 | 
						|
        msg = first_seg + second_seg
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testSmallRead(self):
 | 
						|
        self.cli_file.write(MSG)
 | 
						|
        self.cli_file.flush()
 | 
						|
 | 
						|
    def testFullRead(self):
 | 
						|
        # read until EOF
 | 
						|
        msg = self.serv_file.read()
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testFullRead(self):
 | 
						|
        self.cli_file.write(MSG)
 | 
						|
        self.cli_file.close()
 | 
						|
 | 
						|
    def testUnbufferedRead(self):
 | 
						|
        # Performing unbuffered file read test
 | 
						|
        buf = ''
 | 
						|
        while 1:
 | 
						|
            char = self.serv_file.read(1)
 | 
						|
            if not char:
 | 
						|
                break
 | 
						|
            buf += char
 | 
						|
        self.assertEqual(buf, MSG)
 | 
						|
 | 
						|
    def _testUnbufferedRead(self):
 | 
						|
        self.cli_file.write(MSG)
 | 
						|
        self.cli_file.flush()
 | 
						|
 | 
						|
    def testReadline(self):
 | 
						|
        # Performing file readline test
 | 
						|
        line = self.serv_file.readline()
 | 
						|
        self.assertEqual(line, MSG)
 | 
						|
 | 
						|
    def _testReadline(self):
 | 
						|
        self.cli_file.write(MSG)
 | 
						|
        self.cli_file.flush()
 | 
						|
 | 
						|
    def testClosedAttr(self):
 | 
						|
        self.assert_(not self.serv_file.closed)
 | 
						|
 | 
						|
    def _testClosedAttr(self):
 | 
						|
        self.assert_(not self.cli_file.closed)
 | 
						|
 | 
						|
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
 | 
						|
 | 
						|
    """Repeat the tests from FileObjectClassTestCase with bufsize==0.
 | 
						|
 | 
						|
    In this case (and in this case only), it should be possible to
 | 
						|
    create a file object, read a line from it, create another file
 | 
						|
    object, read another line from it, without loss of data in the
 | 
						|
    first file object's buffer.  Note that httplib relies on this
 | 
						|
    when reading multiple requests from the same socket."""
 | 
						|
 | 
						|
    bufsize = 0 # Use unbuffered mode
 | 
						|
 | 
						|
    def testUnbufferedReadline(self):
 | 
						|
        # Read a line, create a new file object, read another line with it
 | 
						|
        line = self.serv_file.readline() # first line
 | 
						|
        self.assertEqual(line, "A. " + MSG) # first line
 | 
						|
        self.serv_file = self.cli_conn.makefile('rb', 0)
 | 
						|
        line = self.serv_file.readline() # second line
 | 
						|
        self.assertEqual(line, "B. " + MSG) # second line
 | 
						|
 | 
						|
    def _testUnbufferedReadline(self):
 | 
						|
        self.cli_file.write("A. " + MSG)
 | 
						|
        self.cli_file.write("B. " + MSG)
 | 
						|
        self.cli_file.flush()
 | 
						|
 | 
						|
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
 | 
						|
 | 
						|
    bufsize = 1 # Default-buffered for reading; line-buffered for writing
 | 
						|
 | 
						|
 | 
						|
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
 | 
						|
 | 
						|
    bufsize = 2 # Exercise the buffering code
 | 
						|
 | 
						|
 | 
						|
class NetworkConnectionTest(object):
 | 
						|
    """Prove network connection."""
 | 
						|
    def clientSetUp(self):
 | 
						|
        self.cli = socket.create_connection((HOST, PORT))
 | 
						|
        self.serv_conn = self.cli
 | 
						|
 | 
						|
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
 | 
						|
    """Tests that NetworkConnection does not break existing TCP functionality.
 | 
						|
    """
 | 
						|
 | 
						|
class NetworkConnectionNoServer(unittest.TestCase):
 | 
						|
    def testWithoutServer(self):
 | 
						|
        self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
 | 
						|
 | 
						|
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketTCPTest.__init__(self, methodName=methodName)
 | 
						|
        ThreadableTest.__init__(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.cli.close()
 | 
						|
        self.cli = None
 | 
						|
        ThreadableTest.clientTearDown(self)
 | 
						|
 | 
						|
    def _justAccept(self):
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
 | 
						|
    testFamily = _justAccept
 | 
						|
    def _testFamily(self):
 | 
						|
        self.cli = socket.create_connection((HOST, PORT), timeout=30)
 | 
						|
        self.assertEqual(self.cli.family, 2)
 | 
						|
 | 
						|
    testTimeoutDefault = _justAccept
 | 
						|
    def _testTimeoutDefault(self):
 | 
						|
        self.cli = socket.create_connection((HOST, PORT))
 | 
						|
        self.assertTrue(self.cli.gettimeout() is None)
 | 
						|
 | 
						|
    testTimeoutValueNamed = _justAccept
 | 
						|
    def _testTimeoutValueNamed(self):
 | 
						|
        self.cli = socket.create_connection((HOST, PORT), timeout=30)
 | 
						|
        self.assertEqual(self.cli.gettimeout(), 30)
 | 
						|
 | 
						|
    testTimeoutValueNonamed = _justAccept
 | 
						|
    def _testTimeoutValueNonamed(self):
 | 
						|
        self.cli = socket.create_connection((HOST, PORT), 30)
 | 
						|
        self.assertEqual(self.cli.gettimeout(), 30)
 | 
						|
 | 
						|
    testTimeoutNone = _justAccept
 | 
						|
    def _testTimeoutNone(self):
 | 
						|
        previous = socket.getdefaulttimeout()
 | 
						|
        socket.setdefaulttimeout(30)
 | 
						|
        try:
 | 
						|
            self.cli = socket.create_connection((HOST, PORT), timeout=None)
 | 
						|
        finally:
 | 
						|
            socket.setdefaulttimeout(previous)
 | 
						|
        self.assertEqual(self.cli.gettimeout(), 30)
 | 
						|
 | 
						|
 | 
						|
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
 | 
						|
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketTCPTest.__init__(self, methodName=methodName)
 | 
						|
        ThreadableTest.__init__(self)
 | 
						|
 | 
						|
    def clientSetUp(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def clientTearDown(self):
 | 
						|
        self.cli.close()
 | 
						|
        self.cli = None
 | 
						|
        ThreadableTest.clientTearDown(self)
 | 
						|
 | 
						|
    def testInsideTimeout(self):
 | 
						|
        conn, addr = self.serv.accept()
 | 
						|
        time.sleep(3)
 | 
						|
        conn.send("done!")
 | 
						|
    testOutsideTimeout = testInsideTimeout
 | 
						|
 | 
						|
    def _testInsideTimeout(self):
 | 
						|
        self.cli = sock = socket.create_connection((HOST, PORT))
 | 
						|
        data = sock.recv(5)
 | 
						|
        self.assertEqual(data, "done!")
 | 
						|
 | 
						|
    def _testOutsideTimeout(self):
 | 
						|
        self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
 | 
						|
        self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
 | 
						|
 | 
						|
 | 
						|
class Urllib2FileobjectTest(unittest.TestCase):
 | 
						|
 | 
						|
    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
 | 
						|
    # it close the socket if the close c'tor argument is true
 | 
						|
 | 
						|
    def testClose(self):
 | 
						|
        class MockSocket:
 | 
						|
            closed = False
 | 
						|
            def flush(self): pass
 | 
						|
            def close(self): self.closed = True
 | 
						|
 | 
						|
        # must not close unless we request it: the original use of _fileobject
 | 
						|
        # by module socket requires that the underlying socket not be closed until
 | 
						|
        # the _socketobject that created the _fileobject is closed
 | 
						|
        s = MockSocket()
 | 
						|
        f = socket._fileobject(s)
 | 
						|
        f.close()
 | 
						|
        self.assert_(not s.closed)
 | 
						|
 | 
						|
        s = MockSocket()
 | 
						|
        f = socket._fileobject(s, close=True)
 | 
						|
        f.close()
 | 
						|
        self.assert_(s.closed)
 | 
						|
 | 
						|
class TCPTimeoutTest(SocketTCPTest):
 | 
						|
 | 
						|
    def testTCPTimeout(self):
 | 
						|
        def raise_timeout(*args, **kwargs):
 | 
						|
            self.serv.settimeout(1.0)
 | 
						|
            self.serv.accept()
 | 
						|
        self.failUnlessRaises(socket.timeout, raise_timeout,
 | 
						|
                              "Error generating a timeout exception (TCP)")
 | 
						|
 | 
						|
    def testTimeoutZero(self):
 | 
						|
        ok = False
 | 
						|
        try:
 | 
						|
            self.serv.settimeout(0.0)
 | 
						|
            foo = self.serv.accept()
 | 
						|
        except socket.timeout:
 | 
						|
            self.fail("caught timeout instead of error (TCP)")
 | 
						|
        except socket.error:
 | 
						|
            ok = True
 | 
						|
        except:
 | 
						|
            self.fail("caught unexpected exception (TCP)")
 | 
						|
        if not ok:
 | 
						|
            self.fail("accept() returned success when we did not expect it")
 | 
						|
 | 
						|
    def testInterruptedTimeout(self):
 | 
						|
        # XXX I don't know how to do this test on MSWindows or any other
 | 
						|
        # plaform that doesn't support signal.alarm() or os.kill(), though
 | 
						|
        # the bug should have existed on all platforms.
 | 
						|
        if not hasattr(signal, "alarm"):
 | 
						|
            return                  # can only test on *nix
 | 
						|
        self.serv.settimeout(5.0)   # must be longer than alarm
 | 
						|
        class Alarm(Exception):
 | 
						|
            pass
 | 
						|
        def alarm_handler(signal, frame):
 | 
						|
            raise Alarm
 | 
						|
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
 | 
						|
        try:
 | 
						|
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
 | 
						|
            try:
 | 
						|
                foo = self.serv.accept()
 | 
						|
            except socket.timeout:
 | 
						|
                self.fail("caught timeout instead of Alarm")
 | 
						|
            except Alarm:
 | 
						|
                pass
 | 
						|
            except:
 | 
						|
                self.fail("caught other exception instead of Alarm")
 | 
						|
            else:
 | 
						|
                self.fail("nothing caught")
 | 
						|
            signal.alarm(0)         # shut off alarm
 | 
						|
        except Alarm:
 | 
						|
            self.fail("got Alarm in wrong place")
 | 
						|
        finally:
 | 
						|
            # no alarm can be pending.  Safe to restore old handler.
 | 
						|
            signal.signal(signal.SIGALRM, old_alarm)
 | 
						|
 | 
						|
class UDPTimeoutTest(SocketTCPTest):
 | 
						|
 | 
						|
    def testUDPTimeout(self):
 | 
						|
        def raise_timeout(*args, **kwargs):
 | 
						|
            self.serv.settimeout(1.0)
 | 
						|
            self.serv.recv(1024)
 | 
						|
        self.failUnlessRaises(socket.timeout, raise_timeout,
 | 
						|
                              "Error generating a timeout exception (UDP)")
 | 
						|
 | 
						|
    def testTimeoutZero(self):
 | 
						|
        ok = False
 | 
						|
        try:
 | 
						|
            self.serv.settimeout(0.0)
 | 
						|
            foo = self.serv.recv(1024)
 | 
						|
        except socket.timeout:
 | 
						|
            self.fail("caught timeout instead of error (UDP)")
 | 
						|
        except socket.error:
 | 
						|
            ok = True
 | 
						|
        except:
 | 
						|
            self.fail("caught unexpected exception (UDP)")
 | 
						|
        if not ok:
 | 
						|
            self.fail("recv() returned success when we did not expect it")
 | 
						|
 | 
						|
class TestExceptions(unittest.TestCase):
 | 
						|
 | 
						|
    def testExceptionTree(self):
 | 
						|
        self.assert_(issubclass(socket.error, Exception))
 | 
						|
        self.assert_(issubclass(socket.herror, socket.error))
 | 
						|
        self.assert_(issubclass(socket.gaierror, socket.error))
 | 
						|
        self.assert_(issubclass(socket.timeout, socket.error))
 | 
						|
 | 
						|
class TestLinuxAbstractNamespace(unittest.TestCase):
 | 
						|
 | 
						|
    UNIX_PATH_MAX = 108
 | 
						|
 | 
						|
    def testLinuxAbstractNamespace(self):
 | 
						|
        address = "\x00python-test-hello\x00\xff"
 | 
						|
        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
        s1.bind(address)
 | 
						|
        s1.listen(1)
 | 
						|
        s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
        s2.connect(s1.getsockname())
 | 
						|
        s1.accept()
 | 
						|
        self.assertEqual(s1.getsockname(), address)
 | 
						|
        self.assertEqual(s2.getpeername(), address)
 | 
						|
 | 
						|
    def testMaxName(self):
 | 
						|
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
 | 
						|
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
        s.bind(address)
 | 
						|
        self.assertEqual(s.getsockname(), address)
 | 
						|
 | 
						|
    def testNameOverflow(self):
 | 
						|
        address = "\x00" + "h" * self.UNIX_PATH_MAX
 | 
						|
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
        self.assertRaises(socket.error, s.bind, address)
 | 
						|
 | 
						|
 | 
						|
class BufferIOTest(SocketConnectedTest):
 | 
						|
    """
 | 
						|
    Test the buffer versions of socket.recv() and socket.send().
 | 
						|
    """
 | 
						|
    def __init__(self, methodName='runTest'):
 | 
						|
        SocketConnectedTest.__init__(self, methodName=methodName)
 | 
						|
 | 
						|
    def testRecvInto(self):
 | 
						|
        buf = array.array('c', ' '*1024)
 | 
						|
        nbytes = self.cli_conn.recv_into(buf)
 | 
						|
        self.assertEqual(nbytes, len(MSG))
 | 
						|
        msg = buf.tostring()[:len(MSG)]
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecvInto(self):
 | 
						|
        buf = buffer(MSG)
 | 
						|
        self.serv_conn.send(buf)
 | 
						|
 | 
						|
    def testRecvFromInto(self):
 | 
						|
        buf = array.array('c', ' '*1024)
 | 
						|
        nbytes, addr = self.cli_conn.recvfrom_into(buf)
 | 
						|
        self.assertEqual(nbytes, len(MSG))
 | 
						|
        msg = buf.tostring()[:len(MSG)]
 | 
						|
        self.assertEqual(msg, MSG)
 | 
						|
 | 
						|
    def _testRecvFromInto(self):
 | 
						|
        buf = buffer(MSG)
 | 
						|
        self.serv_conn.send(buf)
 | 
						|
 | 
						|
def test_main():
 | 
						|
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
 | 
						|
             TestExceptions, BufferIOTest, BasicTCPTest2]
 | 
						|
    if sys.platform != 'mac':
 | 
						|
        tests.extend([ BasicUDPTest, UDPTimeoutTest ])
 | 
						|
 | 
						|
    tests.extend([
 | 
						|
        NonBlockingTCPTests,
 | 
						|
        FileObjectClassTestCase,
 | 
						|
        UnbufferedFileObjectClassTestCase,
 | 
						|
        LineBufferedFileObjectClassTestCase,
 | 
						|
        SmallBufferedFileObjectClassTestCase,
 | 
						|
        Urllib2FileobjectTest,
 | 
						|
        NetworkConnectionNoServer,
 | 
						|
        NetworkConnectionAttributesTest,
 | 
						|
        NetworkConnectionBehaviourTest,
 | 
						|
    ])
 | 
						|
    if hasattr(socket, "socketpair"):
 | 
						|
        tests.append(BasicSocketPairTest)
 | 
						|
    if sys.platform == 'linux2':
 | 
						|
        tests.append(TestLinuxAbstractNamespace)
 | 
						|
 | 
						|
    thread_info = test_support.threading_setup()
 | 
						|
    test_support.run_unittest(*tests)
 | 
						|
    test_support.threading_cleanup(*thread_info)
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    test_main()
 |