diff options
| author | Lieuwe <lieuwemo@gmail.com> | 2011-03-18 16:04:51 (GMT) |
|---|---|---|
| committer | Lieuwe <lieuwemo@gmail.com> | 2011-03-18 16:04:51 (GMT) |
| commit | 9c39875ef09d439ef51716dba091188f72977f5e (patch) | |
| tree | fb0fd27a1bcd3e54a1913852c23aa137ee5eb30b /src/python/stdlib/test/test_socket.py | |
| parent | bc8af4e2101ac93bdc51b59ee64fa43ecb2442f3 (diff) | |
| download | powder-9c39875ef09d439ef51716dba091188f72977f5e.zip powder-9c39875ef09d439ef51716dba091188f72977f5e.tar.gz | |
...
Diffstat (limited to 'src/python/stdlib/test/test_socket.py')
| -rw-r--r-- | src/python/stdlib/test/test_socket.py | 1572 |
1 files changed, 1572 insertions, 0 deletions
diff --git a/src/python/stdlib/test/test_socket.py b/src/python/stdlib/test/test_socket.py new file mode 100644 index 0000000..d41bca5 --- /dev/null +++ b/src/python/stdlib/test/test_socket.py @@ -0,0 +1,1572 @@ +#!/usr/bin/env python + +import unittest +from test import test_support + +import errno +import socket +import select +import time +import traceback +import Queue +import sys +import os +import array +import contextlib +from weakref import proxy +import signal +import math + +def try_address(host, port=0, family=socket.AF_INET): + """Try to bind a socket on the given host:port and return True + if that has been possible.""" + try: + sock = socket.socket(family, socket.SOCK_STREAM) + sock.bind((host, port)) + except (socket.error, socket.gaierror): + return False + else: + sock.close() + return True + +HOST = test_support.HOST +MSG = b'Michael Gilfix was here\n' +SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) + +try: + import thread + import threading +except ImportError: + thread = None + threading = None + +HOST = test_support.HOST +MSG = 'Michael Gilfix was here\n' + +class SocketTCPTest(unittest.TestCase): + + def setUp(self): + self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port = test_support.bind_port(self.serv) + self.serv.listen(1) + + def tearDown(self): + self.serv.close() + self.serv = None + +class SocketUDPTest(unittest.TestCase): + + def setUp(self): + self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.port = test_support.bind_port(self.serv) + + def tearDown(self): + self.serv.close() + self.serv = None + +class ThreadableTest: + """Threadable Test class + + The ThreadableTest class makes it easy to create a threaded + client/server pair from an existing unit test. To create a + new threaded class from an existing unit test, use multiple + inheritance: + + class NewClass (OldClass, ThreadableTest): + pass + + This class defines two new fixture functions with obvious + purposes for overriding: + + clientSetUp () + clientTearDown () + + Any new test functions within the class must then define + tests in pairs, where the test name is preceeded with a + '_' to indicate the client portion of the test. Ex: + + def testFoo(self): + # Server portion + + def _testFoo(self): + # Client portion + + Any exceptions raised by the clients during their tests + are caught and transferred to the main thread to alert + the testing framework. + + Note, the server setup function cannot call any blocking + functions that rely on the client thread during setup, + unless serverExplicitReady() is called just before + the blocking call (such as in setting up a client/server + connection and performing the accept() in setUp(). + """ + + def __init__(self): + # Swap the true setup function + self.__setUp = self.setUp + self.__tearDown = self.tearDown + self.setUp = self._setUp + self.tearDown = self._tearDown + + def serverExplicitReady(self): + """This method allows the server to explicitly indicate that + it wants the client thread to proceed. This is useful if the + server is about to execute a blocking routine that is + dependent upon the client thread during its setup routine.""" + self.server_ready.set() + + def _setUp(self): + self.server_ready = threading.Event() + self.client_ready = threading.Event() + self.done = threading.Event() + self.queue = Queue.Queue(1) + + # Do some munging to start the client test. + methodname = self.id() + i = methodname.rfind('.') + methodname = methodname[i+1:] + test_method = getattr(self, '_' + methodname) + self.client_thread = thread.start_new_thread( + self.clientRun, (test_method,)) + + self.__setUp() + if not self.server_ready.is_set(): + self.server_ready.set() + self.client_ready.wait() + + def _tearDown(self): + self.__tearDown() + self.done.wait() + + if not self.queue.empty(): + msg = self.queue.get() + self.fail(msg) + + def clientRun(self, test_func): + self.server_ready.wait() + self.client_ready.set() + self.clientSetUp() + with test_support.check_py3k_warnings(): + if not callable(test_func): + raise TypeError("test_func must be a callable function.") + try: + test_func() + except Exception, strerror: + self.queue.put(strerror) + self.clientTearDown() + + def clientSetUp(self): + raise NotImplementedError("clientSetUp must be implemented.") + + def clientTearDown(self): + self.done.set() + thread.exit() + +class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): + + def __init__(self, methodName='runTest'): + SocketTCPTest.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def clientSetUp(self): + self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + +class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): + + def __init__(self, methodName='runTest'): + SocketUDPTest.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def clientSetUp(self): + self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + +class SocketConnectedTest(ThreadedTCPSocketTest): + + def __init__(self, methodName='runTest'): + ThreadedTCPSocketTest.__init__(self, methodName=methodName) + + def setUp(self): + ThreadedTCPSocketTest.setUp(self) + # Indicate explicitly we're ready for the client thread to + # proceed and then perform the blocking call to accept + self.serverExplicitReady() + conn, addr = self.serv.accept() + self.cli_conn = conn + + def tearDown(self): + self.cli_conn.close() + self.cli_conn = None + ThreadedTCPSocketTest.tearDown(self) + + def clientSetUp(self): + ThreadedTCPSocketTest.clientSetUp(self) + self.cli.connect((HOST, self.port)) + self.serv_conn = self.cli + + def clientTearDown(self): + self.serv_conn.close() + self.serv_conn = None + ThreadedTCPSocketTest.clientTearDown(self) + +class SocketPairTest(unittest.TestCase, ThreadableTest): + + def __init__(self, methodName='runTest'): + unittest.TestCase.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def setUp(self): + self.serv, self.cli = socket.socketpair() + + def tearDown(self): + self.serv.close() + self.serv = None + + def clientSetUp(self): + pass + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + + +####################################################################### +## Begin Tests + +class GeneralModuleTests(unittest.TestCase): + + def test_weakref(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + p = proxy(s) + self.assertEqual(p.fileno(), s.fileno()) + s.close() + s = None + try: + p.fileno() + except ReferenceError: + pass + else: + self.fail('Socket proxy still exists') + + def testSocketError(self): + # Testing socket module exceptions + def raise_error(*args, **kwargs): + raise socket.error + def raise_herror(*args, **kwargs): + raise socket.herror + def raise_gaierror(*args, **kwargs): + raise socket.gaierror + self.assertRaises(socket.error, raise_error, + "Error raising socket exception.") + self.assertRaises(socket.error, raise_herror, + "Error raising socket exception.") + self.assertRaises(socket.error, raise_gaierror, + "Error raising socket exception.") + + def testCrucialConstants(self): + # Testing for mission critical constants + socket.AF_INET + socket.SOCK_STREAM + socket.SOCK_DGRAM + socket.SOCK_RAW + socket.SOCK_RDM + socket.SOCK_SEQPACKET + socket.SOL_SOCKET + socket.SO_REUSEADDR + + def testHostnameRes(self): + # Testing hostname resolution mechanisms + hostname = socket.gethostname() + try: + ip = socket.gethostbyname(hostname) + except socket.error: + # Probably name lookup wasn't set up right; skip this test + return + self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") + try: + hname, aliases, ipaddrs = socket.gethostbyaddr(ip) + except socket.error: + # Probably a similar problem as above; skip this test + return + all_host_names = [hostname, hname] + aliases + fqhn = socket.getfqdn(ip) + if not fqhn in all_host_names: + self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) + + def testRefCountGetNameInfo(self): + # Testing reference count for getnameinfo + if hasattr(sys, "getrefcount"): + try: + # On some versions, this loses a reference + orig = sys.getrefcount(__name__) + socket.getnameinfo(__name__,0) + except TypeError: + self.assertEqual(sys.getrefcount(__name__), orig, + "socket.getnameinfo loses a reference") + + def testInterpreterCrash(self): + # Making sure getnameinfo doesn't crash the interpreter + try: + # On some versions, this crashes the interpreter. + socket.getnameinfo(('x', 0, 0, 0), 0) + except socket.error: + pass + + def testNtoH(self): + # This just checks that htons etc. are their own inverse, + # when looking at the lower 16 or 32 bits. + sizes = {socket.htonl: 32, socket.ntohl: 32, + socket.htons: 16, socket.ntohs: 16} + for func, size in sizes.items(): + mask = (1L<<size) - 1 + for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): + self.assertEqual(i & mask, func(func(i&mask)) & mask) + + swapped = func(mask) + self.assertEqual(swapped & mask, mask) + self.assertRaises(OverflowError, func, 1L<<34) + + def testNtoHErrors(self): + good_values = [ 1, 2, 3, 1L, 2L, 3L ] + bad_values = [ -1, -2, -3, -1L, -2L, -3L ] + for k in good_values: + socket.ntohl(k) + socket.ntohs(k) + socket.htonl(k) + socket.htons(k) + for k in bad_values: + self.assertRaises(OverflowError, socket.ntohl, k) + self.assertRaises(OverflowError, socket.ntohs, k) + self.assertRaises(OverflowError, socket.htonl, k) + self.assertRaises(OverflowError, socket.htons, k) + + def testGetServBy(self): + eq = self.assertEqual + # Find one service that exists, then check all the related interfaces. + # I've ordered this by protocols that have both a tcp and udp + # protocol, at least for modern Linuxes. + if (sys.platform.startswith('linux') or + sys.platform.startswith('freebsd') or + sys.platform.startswith('netbsd') or + sys.platform == 'darwin'): + # avoid the 'echo' service on this platform, as there is an + # assumption breaking non-standard port/protocol entry + services = ('daytime', 'qotd', 'domain') + else: + services = ('echo', 'daytime', 'domain') + for service in services: + try: + port = socket.getservbyname(service, 'tcp') + break + except socket.error: + pass + else: + raise socket.error + # Try same call with optional protocol omitted + port2 = socket.getservbyname(service) + eq(port, port2) + # Try udp, but don't barf it it doesn't exist + try: + udpport = socket.getservbyname(service, 'udp') + except socket.error: + udpport = None + else: + eq(udpport, port) + # Now make sure the lookup by port returns the same service name + eq(socket.getservbyport(port2), service) + eq(socket.getservbyport(port, 'tcp'), service) + if udpport is not None: + eq(socket.getservbyport(udpport, 'udp'), service) + # Make sure getservbyport does not accept out of range ports. + self.assertRaises(OverflowError, socket.getservbyport, -1) + self.assertRaises(OverflowError, socket.getservbyport, 65536) + + def testDefaultTimeout(self): + # Testing default timeout + # The default timeout should initially be None + self.assertEqual(socket.getdefaulttimeout(), None) + s = socket.socket() + self.assertEqual(s.gettimeout(), None) + s.close() + + # Set the default timeout to 10, and see if it propagates + socket.setdefaulttimeout(10) + self.assertEqual(socket.getdefaulttimeout(), 10) + s = socket.socket() + self.assertEqual(s.gettimeout(), 10) + s.close() + + # Reset the default timeout to None, and see if it propagates + socket.setdefaulttimeout(None) + self.assertEqual(socket.getdefaulttimeout(), None) + s = socket.socket() + self.assertEqual(s.gettimeout(), None) + s.close() + + # Check that setting it to an invalid value raises ValueError + self.assertRaises(ValueError, socket.setdefaulttimeout, -1) + + # Check that setting it to an invalid type raises TypeError + self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") + + def testIPv4_inet_aton_fourbytes(self): + if not hasattr(socket, 'inet_aton'): + return # No inet_aton, nothing to check + # Test that issue1008086 and issue767150 are fixed. + # It must return 4 bytes. + self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) + self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) + + def testIPv4toString(self): + if not hasattr(socket, 'inet_pton'): + return # No inet_pton() on this platform + from socket import inet_aton as f, inet_pton, AF_INET + g = lambda a: inet_pton(AF_INET, a) + + self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) + self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) + self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) + self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) + self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) + + self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) + self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) + self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) + self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) + + def testIPv6toString(self): + if not hasattr(socket, 'inet_pton'): + return # No inet_pton() on this platform + try: + from socket import inet_pton, AF_INET6, has_ipv6 + if not has_ipv6: + return + except ImportError: + return + f = lambda a: inet_pton(AF_INET6, a) + + self.assertEqual('\x00' * 16, f('::')) + self.assertEqual('\x00' * 16, f('0::0')) + self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) + self.assertEqual( + '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', + f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') + ) + + def testStringToIPv4(self): + if not hasattr(socket, 'inet_ntop'): + return # No inet_ntop() on this platform + from socket import inet_ntoa as f, inet_ntop, AF_INET + g = lambda a: inet_ntop(AF_INET, a) + + self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) + self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) + self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) + self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) + + self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) + self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) + self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) + + def testStringToIPv6(self): + if not hasattr(socket, 'inet_ntop'): + return # No inet_ntop() on this platform + try: + from socket import inet_ntop, AF_INET6, has_ipv6 + if not has_ipv6: + return + except ImportError: + return + f = lambda a: inet_ntop(AF_INET6, a) + + self.assertEqual('::', f('\x00' * 16)) + self.assertEqual('::1', f('\x00' * 15 + '\x01')) + self.assertEqual( + 'aef:b01:506:1001:ffff:9997:55:170', + f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') + ) + + # XXX The following don't test module-level functionality... + + def _get_unused_port(self, bind_address='0.0.0.0'): + """Use a temporary socket to elicit an unused ephemeral port. + + Args: + bind_address: Hostname or IP address to search for a port on. + + Returns: A most likely to be unused port. + """ + tempsock = socket.socket() + tempsock.bind((bind_address, 0)) + host, port = tempsock.getsockname() + tempsock.close() + return port + + def testSockName(self): + # Testing getsockname() + port = self._get_unused_port() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(sock.close) + sock.bind(("0.0.0.0", port)) + name = sock.getsockname() + # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate + # it reasonable to get the host's addr in addition to 0.0.0.0. + # At least for eCos. This is required for the S/390 to pass. + try: + my_ip_addr = socket.gethostbyname(socket.gethostname()) + except socket.error: + # Probably name lookup wasn't set up right; skip this test + return + self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) + self.assertEqual(name[1], port) + + def testGetSockOpt(self): + # Testing getsockopt() + # We know a socket should start without reuse==0 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(sock.close) + reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) + self.assertFalse(reuse != 0, "initial mode is reuse") + + def testSetSockOpt(self): + # Testing setsockopt() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(sock.close) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) + self.assertFalse(reuse == 0, "failed to set reuse mode") + + def testSendAfterClose(self): + # testing send() after close() with timeout + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + sock.close() + self.assertRaises(socket.error, sock.send, "spam") + + def testNewAttributes(self): + # testing .family, .type and .protocol + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.assertEqual(sock.family, socket.AF_INET) + self.assertEqual(sock.type, socket.SOCK_STREAM) + self.assertEqual(sock.proto, 0) + sock.close() + + def test_getsockaddrarg(self): + host = '0.0.0.0' + port = self._get_unused_port(bind_address=host) + big_port = port + 65536 + neg_port = port - 65536 + sock = socket.socket() + try: + self.assertRaises(OverflowError, sock.bind, (host, big_port)) + self.assertRaises(OverflowError, sock.bind, (host, neg_port)) + sock.bind((host, port)) + finally: + sock.close() + + @unittest.skipUnless(os.name == "nt", "Windows specific") + def test_sock_ioctl(self): + self.assertTrue(hasattr(socket.socket, 'ioctl')) + self.assertTrue(hasattr(socket, 'SIO_RCVALL')) + self.assertTrue(hasattr(socket, 'RCVALL_ON')) + self.assertTrue(hasattr(socket, 'RCVALL_OFF')) + self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) + s = socket.socket() + self.addCleanup(s.close) + self.assertRaises(ValueError, s.ioctl, -1, None) + s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) + + def testGetaddrinfo(self): + try: + socket.getaddrinfo('localhost', 80) + except socket.gaierror as err: + if err.errno == socket.EAI_SERVICE: + # see http://bugs.python.org/issue1282647 + self.skipTest("buggy libc version") + raise + # len of every sequence is supposed to be == 5 + for info in socket.getaddrinfo(HOST, None): + self.assertEqual(len(info), 5) + # host can be a domain name, a string representation of an + # IPv4/v6 address or None + socket.getaddrinfo('localhost', 80) + socket.getaddrinfo('127.0.0.1', 80) + socket.getaddrinfo(None, 80) + if SUPPORTS_IPV6: + socket.getaddrinfo('::1', 80) + # port can be a string service name such as "http", a numeric + # port number or None + socket.getaddrinfo(HOST, "http") + socket.getaddrinfo(HOST, 80) + socket.getaddrinfo(HOST, None) + # test family and socktype filters + infos = socket.getaddrinfo(HOST, None, socket.AF_INET) + for family, _, _, _, _ in infos: + self.assertEqual(family, socket.AF_INET) + infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) + for _, socktype, _, _, _ in infos: + self.assertEqual(socktype, socket.SOCK_STREAM) + # test proto and flags arguments + socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) + socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) + # a server willing to support both IPv4 and IPv6 will + # usually do this + socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, + socket.AI_PASSIVE) + + + def check_sendall_interrupted(self, with_timeout): + # socketpair() is not stricly required, but it makes things easier. + if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): + self.skipTest("signal.alarm and socket.socketpair required for this test") + # Our signal handlers clobber the C errno by calling a math function + # with an invalid domain value. + def ok_handler(*args): + self.assertRaises(ValueError, math.acosh, 0) + def raising_handler(*args): + self.assertRaises(ValueError, math.acosh, 0) + 1 // 0 + c, s = socket.socketpair() + old_alarm = signal.signal(signal.SIGALRM, raising_handler) + try: + if with_timeout: + # Just above the one second minimum for signal.alarm + c.settimeout(1.5) + with self.assertRaises(ZeroDivisionError): + signal.alarm(1) + c.sendall(b"x" * (1024**2)) + if with_timeout: + signal.signal(signal.SIGALRM, ok_handler) + signal.alarm(1) + self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2)) + finally: + signal.signal(signal.SIGALRM, old_alarm) + c.close() + s.close() + + def test_sendall_interrupted(self): + self.check_sendall_interrupted(False) + + def test_sendall_interrupted_with_timeout(self): + self.check_sendall_interrupted(True) + + +@unittest.skipUnless(thread, 'Threading required for this test.') +class BasicTCPTest(SocketConnectedTest): + + def __init__(self, methodName='runTest'): + SocketConnectedTest.__init__(self, methodName=methodName) + + def testRecv(self): + # Testing large receive over TCP + msg = self.cli_conn.recv(1024) + self.assertEqual(msg, MSG) + + def _testRecv(self): + self.serv_conn.send(MSG) + + def testOverFlowRecv(self): + # Testing receive in chunks over TCP + seg1 = self.cli_conn.recv(len(MSG) - 3) + seg2 = self.cli_conn.recv(1024) + msg = seg1 + seg2 + self.assertEqual(msg, MSG) + + def _testOverFlowRecv(self): + self.serv_conn.send(MSG) + + def testRecvFrom(self): + # Testing large recvfrom() over TCP + msg, addr = self.cli_conn.recvfrom(1024) + self.assertEqual(msg, MSG) + + def _testRecvFrom(self): + self.serv_conn.send(MSG) + + def testOverFlowRecvFrom(self): + # Testing recvfrom() in chunks over TCP + seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) + seg2, addr = self.cli_conn.recvfrom(1024) + msg = seg1 + seg2 + self.assertEqual(msg, MSG) + + def _testOverFlowRecvFrom(self): + self.serv_conn.send(MSG) + + def testSendAll(self): + # Testing sendall() with a 2048 byte string over TCP + msg = '' + while 1: + read = self.cli_conn.recv(1024) + if not read: + break + msg += read + self.assertEqual(msg, 'f' * 2048) + + def _testSendAll(self): + big_chunk = 'f' * 2048 + self.serv_conn.sendall(big_chunk) + + def testFromFd(self): + # Testing fromfd() + if not hasattr(socket, "fromfd"): + return # On Windows, this doesn't exist + fd = self.cli_conn.fileno() + sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(sock.close) + msg = sock.recv(1024) + self.assertEqual(msg, MSG) + + def _testFromFd(self): + self.serv_conn.send(MSG) + + def testDup(self): + # Testing dup() + sock = self.cli_conn.dup() + self.addCleanup(sock.close) + msg = sock.recv(1024) + self.assertEqual(msg, MSG) + + def _testDup(self): + self.serv_conn.send(MSG) + + def testShutdown(self): + # Testing shutdown() + msg = self.cli_conn.recv(1024) + self.assertEqual(msg, MSG) + # wait for _testShutdown to finish: on OS X, when the server + # closes the connection the client also becomes disconnected, + # and the client's shutdown call will fail. (Issue #4397.) + self.done.wait() + + def _testShutdown(self): + self.serv_conn.send(MSG) + self.serv_conn.shutdown(2) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class BasicUDPTest(ThreadedUDPSocketTest): + + def __init__(self, methodName='runTest'): + ThreadedUDPSocketTest.__init__(self, methodName=methodName) + + def testSendtoAndRecv(self): + # Testing sendto() and Recv() over UDP + msg = self.serv.recv(len(MSG)) + self.assertEqual(msg, MSG) + + def _testSendtoAndRecv(self): + self.cli.sendto(MSG, 0, (HOST, self.port)) + + def testRecvFrom(self): + # Testing recvfrom() over UDP + msg, addr = self.serv.recvfrom(len(MSG)) + self.assertEqual(msg, MSG) + + def _testRecvFrom(self): + self.cli.sendto(MSG, 0, (HOST, self.port)) + + def testRecvFromNegative(self): + # Negative lengths passed to recvfrom should give ValueError. + self.assertRaises(ValueError, self.serv.recvfrom, -1) + + def _testRecvFromNegative(self): + self.cli.sendto(MSG, 0, (HOST, self.port)) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class TCPCloserTest(ThreadedTCPSocketTest): + + def testClose(self): + conn, addr = self.serv.accept() + conn.close() + + sd = self.cli + read, write, err = select.select([sd], [], [], 1.0) + self.assertEqual(read, [sd]) + self.assertEqual(sd.recv(1), '') + + def _testClose(self): + self.cli.connect((HOST, self.port)) + time.sleep(1.0) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class BasicSocketPairTest(SocketPairTest): + + def __init__(self, methodName='runTest'): + SocketPairTest.__init__(self, methodName=methodName) + + def testRecv(self): + msg = self.serv.recv(1024) + self.assertEqual(msg, MSG) + + def _testRecv(self): + self.cli.send(MSG) + + def testSend(self): + self.serv.send(MSG) + + def _testSend(self): + msg = self.cli.recv(1024) + self.assertEqual(msg, MSG) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class NonBlockingTCPTests(ThreadedTCPSocketTest): + + def __init__(self, methodName='runTest'): + ThreadedTCPSocketTest.__init__(self, methodName=methodName) + + def testSetBlocking(self): + # Testing whether set blocking works + self.serv.setblocking(0) + start = time.time() + try: + self.serv.accept() + except socket.error: + pass + end = time.time() + self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") + + def _testSetBlocking(self): + pass + + def testAccept(self): + # Testing non-blocking accept + self.serv.setblocking(0) + try: + conn, addr = self.serv.accept() + except socket.error: + pass + else: + self.fail("Error trying to do non-blocking accept.") + read, write, err = select.select([self.serv], [], []) + if self.serv in read: + conn, addr = self.serv.accept() + conn.close() + else: + self.fail("Error trying to do accept after select.") + + def _testAccept(self): + time.sleep(0.1) + self.cli.connect((HOST, self.port)) + + def testConnect(self): + # Testing non-blocking connect + conn, addr = self.serv.accept() + conn.close() + + def _testConnect(self): + self.cli.settimeout(10) + self.cli.connect((HOST, self.port)) + + def testRecv(self): + # Testing non-blocking recv + conn, addr = self.serv.accept() + conn.setblocking(0) + try: + msg = conn.recv(len(MSG)) + except socket.error: + pass + else: + self.fail("Error trying to do non-blocking recv.") + read, write, err = select.select([conn], [], []) + if conn in read: + msg = conn.recv(len(MSG)) + conn.close() + self.assertEqual(msg, MSG) + else: + self.fail("Error during select call to non-blocking socket.") + + def _testRecv(self): + self.cli.connect((HOST, self.port)) + time.sleep(0.1) + self.cli.send(MSG) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class FileObjectClassTestCase(SocketConnectedTest): + + bufsize = -1 # Use default buffer size + + def __init__(self, methodName='runTest'): + SocketConnectedTest.__init__(self, methodName=methodName) + + def setUp(self): + SocketConnectedTest.setUp(self) + self.serv_file = self.cli_conn.makefile('rb', self.bufsize) + + def tearDown(self): + self.serv_file.close() + self.assertTrue(self.serv_file.closed) + self.serv_file = None + SocketConnectedTest.tearDown(self) + + def clientSetUp(self): + SocketConnectedTest.clientSetUp(self) + self.cli_file = self.serv_conn.makefile('wb') + + def clientTearDown(self): + self.cli_file.close() + self.assertTrue(self.cli_file.closed) + self.cli_file = None + SocketConnectedTest.clientTearDown(self) + + def testSmallRead(self): + # Performing small file read test + first_seg = self.serv_file.read(len(MSG)-3) + second_seg = self.serv_file.read(3) + msg = first_seg + second_seg + self.assertEqual(msg, MSG) + + def _testSmallRead(self): + self.cli_file.write(MSG) + self.cli_file.flush() + + def testFullRead(self): + # read until EOF + msg = self.serv_file.read() + self.assertEqual(msg, MSG) + + def _testFullRead(self): + self.cli_file.write(MSG) + self.cli_file.close() + + def testUnbufferedRead(self): + # Performing unbuffered file read test + buf = '' + while 1: + char = self.serv_file.read(1) + if not char: + break + buf += char + self.assertEqual(buf, MSG) + + def _testUnbufferedRead(self): + self.cli_file.write(MSG) + self.cli_file.flush() + + def testReadline(self): + # Performing file readline test + line = self.serv_file.readline() + self.assertEqual(line, MSG) + + def _testReadline(self): + self.cli_file.write(MSG) + self.cli_file.flush() + + def testReadlineAfterRead(self): + a_baloo_is = self.serv_file.read(len("A baloo is")) + self.assertEqual("A baloo is", a_baloo_is) + _a_bear = self.serv_file.read(len(" a bear")) + self.assertEqual(" a bear", _a_bear) + line = self.serv_file.readline() + self.assertEqual("\n", line) + line = self.serv_file.readline() + self.assertEqual("A BALOO IS A BEAR.\n", line) + line = self.serv_file.readline() + self.assertEqual(MSG, line) + + def _testReadlineAfterRead(self): + self.cli_file.write("A baloo is a bear\n") + self.cli_file.write("A BALOO IS A BEAR.\n") + self.cli_file.write(MSG) + self.cli_file.flush() + + def testReadlineAfterReadNoNewline(self): + end_of_ = self.serv_file.read(len("End Of ")) + self.assertEqual("End Of ", end_of_) + line = self.serv_file.readline() + self.assertEqual("Line", line) + + def _testReadlineAfterReadNoNewline(self): + self.cli_file.write("End Of Line") + + def testClosedAttr(self): + self.assertTrue(not self.serv_file.closed) + + def _testClosedAttr(self): + self.assertTrue(not self.cli_file.closed) + + +class FileObjectInterruptedTestCase(unittest.TestCase): + """Test that the file object correctly handles EINTR internally.""" + + class MockSocket(object): + def __init__(self, recv_funcs=()): + # A generator that returns callables that we'll call for each + # call to recv(). + self._recv_step = iter(recv_funcs) + + def recv(self, size): + return self._recv_step.next()() + + @staticmethod + def _raise_eintr(): + raise socket.error(errno.EINTR) + + def _test_readline(self, size=-1, **kwargs): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "This is the first line\nAnd the sec", + self._raise_eintr, + lambda : "ond line is here\n", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, **kwargs) + self.assertEqual(fo.readline(size), "This is the first line\n") + self.assertEqual(fo.readline(size), "And the second line is here\n") + + def _test_read(self, size=-1, **kwargs): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "This is the first line\nAnd the sec", + self._raise_eintr, + lambda : "ond line is here\n", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, **kwargs) + self.assertEqual(fo.read(size), "This is the first line\n" + "And the second line is here\n") + + def test_default(self): + self._test_readline() + self._test_readline(size=100) + self._test_read() + self._test_read(size=100) + + def test_with_1k_buffer(self): + self._test_readline(bufsize=1024) + self._test_readline(size=100, bufsize=1024) + self._test_read(bufsize=1024) + self._test_read(size=100, bufsize=1024) + + def _test_readline_no_buffer(self, size=-1): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "aa", + lambda : "\n", + lambda : "BB", + self._raise_eintr, + lambda : "bb", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, bufsize=0) + self.assertEqual(fo.readline(size), "aa\n") + self.assertEqual(fo.readline(size), "BBbb") + + def test_no_buffer(self): + self._test_readline_no_buffer() + self._test_readline_no_buffer(size=4) + self._test_read(bufsize=0) + self._test_read(size=100, bufsize=0) + + +class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): + + """Repeat the tests from FileObjectClassTestCase with bufsize==0. + + In this case (and in this case only), it should be possible to + create a file object, read a line from it, create another file + object, read another line from it, without loss of data in the + first file object's buffer. Note that httplib relies on this + when reading multiple requests from the same socket.""" + + bufsize = 0 # Use unbuffered mode + + def testUnbufferedReadline(self): + # Read a line, create a new file object, read another line with it + line = self.serv_file.readline() # first line + self.assertEqual(line, "A. " + MSG) # first line + self.serv_file = self.cli_conn.makefile('rb', 0) + line = self.serv_file.readline() # second line + self.assertEqual(line, "B. " + MSG) # second line + + def _testUnbufferedReadline(self): + self.cli_file.write("A. " + MSG) + self.cli_file.write("B. " + MSG) + self.cli_file.flush() + +class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): + + bufsize = 1 # Default-buffered for reading; line-buffered for writing + + +class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): + + bufsize = 2 # Exercise the buffering code + + +class NetworkConnectionTest(object): + """Prove network connection.""" + def clientSetUp(self): + # We're inherited below by BasicTCPTest2, which also inherits + # BasicTCPTest, which defines self.port referenced below. + self.cli = socket.create_connection((HOST, self.port)) + self.serv_conn = self.cli + +class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): + """Tests that NetworkConnection does not break existing TCP functionality. + """ + +class NetworkConnectionNoServer(unittest.TestCase): + class MockSocket(socket.socket): + def connect(self, *args): + raise socket.timeout('timed out') + + @contextlib.contextmanager + def mocked_socket_module(self): + """Return a socket which times out on connect""" + old_socket = socket.socket + socket.socket = self.MockSocket + try: + yield + finally: + socket.socket = old_socket + + def test_connect(self): + port = test_support.find_unused_port() + cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(cli.close) + with self.assertRaises(socket.error) as cm: + cli.connect((HOST, port)) + self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) + + def test_create_connection(self): + # Issue #9792: errors raised by create_connection() should have + # a proper errno attribute. + port = test_support.find_unused_port() + with self.assertRaises(socket.error) as cm: + socket.create_connection((HOST, port)) + self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) + + def test_create_connection_timeout(self): + # Issue #9792: create_connection() should not recast timeout errors + # as generic socket errors. + with self.mocked_socket_module(): + with self.assertRaises(socket.timeout): + socket.create_connection((HOST, 1234)) + + +@unittest.skipUnless(thread, 'Threading required for this test.') +class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): + + def __init__(self, methodName='runTest'): + SocketTCPTest.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def clientSetUp(self): + self.source_port = test_support.find_unused_port() + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + + def _justAccept(self): + conn, addr = self.serv.accept() + conn.close() + + testFamily = _justAccept + def _testFamily(self): + self.cli = socket.create_connection((HOST, self.port), timeout=30) + self.addCleanup(self.cli.close) + self.assertEqual(self.cli.family, 2) + + testSourceAddress = _justAccept + def _testSourceAddress(self): + self.cli = socket.create_connection((HOST, self.port), timeout=30, + source_address=('', self.source_port)) + self.addCleanup(self.cli.close) + self.assertEqual(self.cli.getsockname()[1], self.source_port) + # The port number being used is sufficient to show that the bind() + # call happened. + + testTimeoutDefault = _justAccept + def _testTimeoutDefault(self): + # passing no explicit timeout uses socket's global default + self.assertTrue(socket.getdefaulttimeout() is None) + socket.setdefaulttimeout(42) + try: + self.cli = socket.create_connection((HOST, self.port)) + self.addCleanup(self.cli.close) + finally: + socket.setdefaulttimeout(None) + self.assertEqual(self.cli.gettimeout(), 42) + + testTimeoutNone = _justAccept + def _testTimeoutNone(self): + # None timeout means the same as sock.settimeout(None) + self.assertTrue(socket.getdefaulttimeout() is None) + socket.setdefaulttimeout(30) + try: + self.cli = socket.create_connection((HOST, self.port), timeout=None) + self.addCleanup(self.cli.close) + finally: + socket.setdefaulttimeout(None) + self.assertEqual(self.cli.gettimeout(), None) + + testTimeoutValueNamed = _justAccept + def _testTimeoutValueNamed(self): + self.cli = socket.create_connection((HOST, self.port), timeout=30) + self.assertEqual(self.cli.gettimeout(), 30) + + testTimeoutValueNonamed = _justAccept + def _testTimeoutValueNonamed(self): + self.cli = socket.create_connection((HOST, self.port), 30) + self.addCleanup(self.cli.close) + self.assertEqual(self.cli.gettimeout(), 30) + +@unittest.skipUnless(thread, 'Threading required for this test.') +class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): + + def __init__(self, methodName='runTest'): + SocketTCPTest.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def clientSetUp(self): + pass + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + + def testInsideTimeout(self): + conn, addr = self.serv.accept() + self.addCleanup(conn.close) + time.sleep(3) + conn.send("done!") + testOutsideTimeout = testInsideTimeout + + def _testInsideTimeout(self): + self.cli = sock = socket.create_connection((HOST, self.port)) + data = sock.recv(5) + self.assertEqual(data, "done!") + + def _testOutsideTimeout(self): + self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) + self.assertRaises(socket.timeout, lambda: sock.recv(5)) + + +class Urllib2FileobjectTest(unittest.TestCase): + + # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that + # it close the socket if the close c'tor argument is true + + def testClose(self): + class MockSocket: + closed = False + def flush(self): pass + def close(self): self.closed = True + + # must not close unless we request it: the original use of _fileobject + # by module socket requires that the underlying socket not be closed until + # the _socketobject that created the _fileobject is closed + s = MockSocket() + f = socket._fileobject(s) + f.close() + self.assertTrue(not s.closed) + + s = MockSocket() + f = socket._fileobject(s, close=True) + f.close() + self.assertTrue(s.closed) + +class TCPTimeoutTest(SocketTCPTest): + + def testTCPTimeout(self): + def raise_timeout(*args, **kwargs): + self.serv.settimeout(1.0) + self.serv.accept() + self.assertRaises(socket.timeout, raise_timeout, + "Error generating a timeout exception (TCP)") + + def testTimeoutZero(self): + ok = False + try: + self.serv.settimeout(0.0) + foo = self.serv.accept() + except socket.timeout: + self.fail("caught timeout instead of error (TCP)") + except socket.error: + ok = True + except: + self.fail("caught unexpected exception (TCP)") + if not ok: + self.fail("accept() returned success when we did not expect it") + + def testInterruptedTimeout(self): + # XXX I don't know how to do this test on MSWindows or any other + # plaform that doesn't support signal.alarm() or os.kill(), though + # the bug should have existed on all platforms. + if not hasattr(signal, "alarm"): + return # can only test on *nix + self.serv.settimeout(5.0) # must be longer than alarm + class Alarm(Exception): + pass + def alarm_handler(signal, frame): + raise Alarm + old_alarm = signal.signal(signal.SIGALRM, alarm_handler) + try: + signal.alarm(2) # POSIX allows alarm to be up to 1 second early + try: + foo = self.serv.accept() + except socket.timeout: + self.fail("caught timeout instead of Alarm") + except Alarm: + pass + except: + self.fail("caught other exception instead of Alarm:" + " %s(%s):\n%s" % + (sys.exc_info()[:2] + (traceback.format_exc(),))) + else: + self.fail("nothing caught") + finally: + signal.alarm(0) # shut off alarm + except Alarm: + self.fail("got Alarm in wrong place") + finally: + # no alarm can be pending. Safe to restore old handler. + signal.signal(signal.SIGALRM, old_alarm) + +class UDPTimeoutTest(SocketTCPTest): + + def testUDPTimeout(self): + def raise_timeout(*args, **kwargs): + self.serv.settimeout(1.0) + self.serv.recv(1024) + self.assertRaises(socket.timeout, raise_timeout, + "Error generating a timeout exception (UDP)") + + def testTimeoutZero(self): + ok = False + try: + self.serv.settimeout(0.0) + foo = self.serv.recv(1024) + except socket.timeout: + self.fail("caught timeout instead of error (UDP)") + except socket.error: + ok = True + except: + self.fail("caught unexpected exception (UDP)") + if not ok: + self.fail("recv() returned success when we did not expect it") + +class TestExceptions(unittest.TestCase): + + def testExceptionTree(self): + self.assertTrue(issubclass(socket.error, Exception)) + self.assertTrue(issubclass(socket.herror, socket.error)) + self.assertTrue(issubclass(socket.gaierror, socket.error)) + self.assertTrue(issubclass(socket.timeout, socket.error)) + +class TestLinuxAbstractNamespace(unittest.TestCase): + + UNIX_PATH_MAX = 108 + + def testLinuxAbstractNamespace(self): + address = "\x00python-test-hello\x00\xff" + s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s1.bind(address) + s1.listen(1) + s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s2.connect(s1.getsockname()) + s1.accept() + self.assertEqual(s1.getsockname(), address) + self.assertEqual(s2.getpeername(), address) + + def testMaxName(self): + address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.bind(address) + self.assertEqual(s.getsockname(), address) + + def testNameOverflow(self): + address = "\x00" + "h" * self.UNIX_PATH_MAX + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.assertRaises(socket.error, s.bind, address) + + +@unittest.skipUnless(thread, 'Threading required for this test.') +class BufferIOTest(SocketConnectedTest): + """ + Test the buffer versions of socket.recv() and socket.send(). + """ + def __init__(self, methodName='runTest'): + SocketConnectedTest.__init__(self, methodName=methodName) + + def testRecvIntoArray(self): + buf = array.array('c', ' '*1024) + nbytes = self.cli_conn.recv_into(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf.tostring()[:len(MSG)] + self.assertEqual(msg, MSG) + + def _testRecvIntoArray(self): + with test_support.check_py3k_warnings(): + buf = buffer(MSG) + self.serv_conn.send(buf) + + def testRecvIntoBytearray(self): + buf = bytearray(1024) + nbytes = self.cli_conn.recv_into(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf[:len(MSG)] + self.assertEqual(msg, MSG) + + _testRecvIntoBytearray = _testRecvIntoArray + + def testRecvIntoMemoryview(self): + buf = bytearray(1024) + nbytes = self.cli_conn.recv_into(memoryview(buf)) + self.assertEqual(nbytes, len(MSG)) + msg = buf[:len(MSG)] + self.assertEqual(msg, MSG) + + _testRecvIntoMemoryview = _testRecvIntoArray + + def testRecvFromIntoArray(self): + buf = array.array('c', ' '*1024) + nbytes, addr = self.cli_conn.recvfrom_into(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf.tostring()[:len(MSG)] + self.assertEqual(msg, MSG) + + def _testRecvFromIntoArray(self): + with test_support.check_py3k_warnings(): + buf = buffer(MSG) + self.serv_conn.send(buf) + + def testRecvFromIntoBytearray(self): + buf = bytearray(1024) + nbytes, addr = self.cli_conn.recvfrom_into(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf[:len(MSG)] + self.assertEqual(msg, MSG) + + _testRecvFromIntoBytearray = _testRecvFromIntoArray + + def testRecvFromIntoMemoryview(self): + buf = bytearray(1024) + nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) + self.assertEqual(nbytes, len(MSG)) + msg = buf[:len(MSG)] + self.assertEqual(msg, MSG) + + _testRecvFromIntoMemoryview = _testRecvFromIntoArray + + +TIPC_STYPE = 2000 +TIPC_LOWER = 200 +TIPC_UPPER = 210 + +def isTipcAvailable(): + """Check if the TIPC module is loaded + + The TIPC module is not loaded automatically on Ubuntu and probably + other Linux distros. + """ + if not hasattr(socket, "AF_TIPC"): + return False + if not os.path.isfile("/proc/modules"): + return False + with open("/proc/modules") as f: + for line in f: + if line.startswith("tipc "): + return True + if test_support.verbose: + print "TIPC module is not loaded, please 'sudo modprobe tipc'" + return False + +class TIPCTest (unittest.TestCase): + def testRDM(self): + srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) + cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) + + srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, + TIPC_LOWER, TIPC_UPPER) + srv.bind(srvaddr) + + sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, + TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) + cli.sendto(MSG, sendaddr) + + msg, recvaddr = srv.recvfrom(1024) + + self.assertEqual(cli.getsockname(), recvaddr) + self.assertEqual(msg, MSG) + + +class TIPCThreadableTest (unittest.TestCase, ThreadableTest): + def __init__(self, methodName = 'runTest'): + unittest.TestCase.__init__(self, methodName = methodName) + ThreadableTest.__init__(self) + + def setUp(self): + self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) + self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, + TIPC_LOWER, TIPC_UPPER) + self.srv.bind(srvaddr) + self.srv.listen(5) + self.serverExplicitReady() + self.conn, self.connaddr = self.srv.accept() + + def clientSetUp(self): + # The is a hittable race between serverExplicitReady() and the + # accept() call; sleep a little while to avoid it, otherwise + # we could get an exception + time.sleep(0.1) + self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) + addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, + TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) + self.cli.connect(addr) + self.cliaddr = self.cli.getsockname() + + def testStream(self): + msg = self.conn.recv(1024) + self.assertEqual(msg, MSG) + self.assertEqual(self.cliaddr, self.connaddr) + + def _testStream(self): + self.cli.send(MSG) + self.cli.close() + + +def test_main(): + tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, + TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, + UDPTimeoutTest ] + + tests.extend([ + NonBlockingTCPTests, + FileObjectClassTestCase, + FileObjectInterruptedTestCase, + UnbufferedFileObjectClassTestCase, + LineBufferedFileObjectClassTestCase, + SmallBufferedFileObjectClassTestCase, + Urllib2FileobjectTest, + NetworkConnectionNoServer, + NetworkConnectionAttributesTest, + NetworkConnectionBehaviourTest, + ]) + if hasattr(socket, "socketpair"): + tests.append(BasicSocketPairTest) + if sys.platform == 'linux2': + tests.append(TestLinuxAbstractNamespace) + if isTipcAvailable(): + tests.append(TIPCTest) + tests.append(TIPCThreadableTest) + + thread_info = test_support.threading_setup() + test_support.run_unittest(*tests) + test_support.threading_cleanup(*thread_info) + +if __name__ == "__main__": + test_main() |
