mirror of
https://github.com/python/cpython.git
synced 2026-01-04 06:22:20 +00:00
Add tests for Bluetooth RFCOMM, HCI and SCO (GH-132023)
This commit is contained in:
parent
6ab4acecf9
commit
2ccd6aae4d
1 changed files with 102 additions and 0 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import unittest
|
||||
from unittest import mock
|
||||
from test import support
|
||||
from test.support import (
|
||||
is_apple, os_helper, refleak_helper, socket_helper, threading_helper
|
||||
|
|
@ -2670,6 +2671,107 @@ def testBindBrEdrL2capSocket(self):
|
|||
addr = f.getsockname()
|
||||
self.assertEqual(addr, (socket.BDADDR_ANY, psm))
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test')
|
||||
def testBadL2capAddr(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f:
|
||||
with self.assertRaises(OSError):
|
||||
f.bind((socket.BDADDR_ANY, 0, 0, socket.BDADDR_BREDR, 0))
|
||||
with self.assertRaises(OSError):
|
||||
f.bind((socket.BDADDR_ANY,))
|
||||
with self.assertRaises(OSError):
|
||||
f.bind(socket.BDADDR_ANY)
|
||||
with self.assertRaises(OSError):
|
||||
f.bind((socket.BDADDR_ANY.encode(), 0x1001))
|
||||
|
||||
def testBindRfcommSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
|
||||
channel = 0
|
||||
try:
|
||||
s.bind((socket.BDADDR_ANY, channel))
|
||||
except OSError as err:
|
||||
if sys.platform == 'win32' and err.winerror == 10050:
|
||||
self.skipTest(str(err))
|
||||
raise
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, (mock.ANY, channel))
|
||||
self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}')
|
||||
if sys.platform != 'win32':
|
||||
self.assertEqual(addr, (socket.BDADDR_ANY, channel))
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
|
||||
s.bind(addr)
|
||||
addr2 = s.getsockname()
|
||||
self.assertEqual(addr2, addr)
|
||||
|
||||
def testBadRfcommAddr(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
|
||||
channel = 0
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY.encode(), channel))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY,))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY, channel, 0))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY + '\0', channel))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(('invalid', channel))
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
|
||||
def testBindHciSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
|
||||
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, socket.BDADDR_ANY)
|
||||
else:
|
||||
dev = 0
|
||||
s.bind((dev,))
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, dev)
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
|
||||
def testBadHciAddr(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
|
||||
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY.encode(),))
|
||||
if sys.platform.startswith('freebsd'):
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY.encode() + b'\0')
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY.encode() + b' '*100)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(b'invalid')
|
||||
else:
|
||||
dev = 0
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(())
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((dev, 0))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(dev)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
|
||||
def testBindScoSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, socket.BDADDR_ANY)
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
|
||||
def testBadScoAddr(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY.encode(),))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(b'invalid')
|
||||
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_HYPERV,
|
||||
'Hyper-V sockets required for this test.')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue