2009-06-29 09:31:43 +09:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
2019-01-25 20:52:57 +09:00
|
|
|
from collections import OrderedDict
|
|
|
|
from io import BytesIO
|
2012-09-21 14:15:30 +09:00
|
|
|
import struct
|
2019-12-03 20:53:11 +09:00
|
|
|
import sys
|
2019-01-25 20:52:57 +09:00
|
|
|
|
|
|
|
import pytest
|
2012-12-29 11:24:25 +09:00
|
|
|
from pytest import raises, xfail
|
2009-06-29 09:31:43 +09:00
|
|
|
|
2018-11-09 20:55:13 +09:00
|
|
|
from msgpack import packb, unpackb, Unpacker, Packer, pack
|
2009-06-29 09:31:43 +09:00
|
|
|
|
2011-08-22 01:52:45 +09:00
|
|
|
|
2012-09-24 02:12:55 +09:00
|
|
|
def check(data, use_list=False):
    """Assert that ``data`` is unchanged by a ``packb`` -> ``unpackb`` round trip.

    ``use_list`` is forwarded to ``unpackb``. Unpacking is done with
    ``strict_map_key=False``.
    """
    packed = packb(data)
    restored = unpackb(packed, strict_map_key=False, use_list=use_list)
    assert restored == data
|
2009-06-29 09:31:43 +09:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2009-06-29 09:31:43 +09:00
|
|
|
def testPack():
    """Round-trip a spread of scalars, bytes, tuples and a map through ``check``."""
    samples = [
        # Non-negative integers.
        0,
        1,
        127,
        128,
        255,
        256,
        65535,
        65536,
        4294967295,
        4294967296,
        # Negative integers.
        -1,
        -32,
        -33,
        -128,
        -129,
        -32768,
        -32769,
        -4294967296,
        -4294967297,
        # Float.
        1.0,
        # Bytes of several lengths.
        b"",
        b"a",
        b"a" * 31,
        b"a" * 32,
        # Singletons.
        None,
        True,
        False,
        # Tuples: check() unpacks with use_list=False by default.
        (),
        ((),),
        ((), None),
        # Map whose key is None.
        {None: 0},
        1 << 23,
    ]
    for sample in samples:
        check(sample)
|
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2011-04-15 17:36:17 +03:00
|
|
|
def testPackUnicode():
    """Round-trip text values through the one-shot and the object-based APIs."""
    for text in ("", "abcd", ["defgh"], "Русский текст"):
        # One-shot helpers: packb / unpackb.
        assert unpackb(packb(text), raw=False, use_list=1) == text

        # Packer output fed to an Unpacker that reads from a byte stream.
        stream = BytesIO(Packer().pack(text))
        unpacker = Unpacker(stream, use_list=1, raw=False)
        assert unpacker.unpack() == text
|
2011-04-15 17:36:17 +03:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2011-04-15 17:36:17 +03:00
|
|
|
def testPackBytes():
    """Round-trip bytes values, bare and nested in a tuple, through ``check``."""
    for payload in (b"", b"abcd", (b"defgh",)):
        check(payload)
|
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2017-05-18 13:03:15 +02:00
|
|
|
def testPackByteArrays():
    """Round-trip bytearray values, bare and nested in a tuple, through ``check``."""
    samples = (bytearray(b""), bytearray(b"abcd"), (bytearray(b"defgh"),))
    for sample in samples:
        check(sample)
|
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2019-12-03 20:53:11 +09:00
|
|
|
def testIgnoreUnicodeErrors():
    """With ``unicode_errors="ignore"`` the undecodable byte is dropped on unpack."""
    packed = packb(b"abc\xeddef", use_bin_type=False)
    decoded = unpackb(packed, unicode_errors="ignore", raw=False)
    assert decoded == "abcdef"
|
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2011-04-15 17:36:17 +03:00
|
|
|
def testStrictUnicodeUnpack():
    """Unpacking the same invalid bytes as text must raise ``UnicodeDecodeError``."""
    payload = packb(b"abc\xeddef", use_bin_type=False)
    # No unicode_errors override is passed here, unlike the "ignore" tests.
    with pytest.raises(UnicodeDecodeError):
        unpackb(payload, use_list=1, raw=False)
|
2011-04-15 17:36:17 +03:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2019-12-03 20:53:11 +09:00
|
|
|
def testIgnoreErrorsPack():
    """Packing with ``unicode_errors="ignore"`` drops the lone surrogates."""
    packed = packb("abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors="ignore")
    decoded = unpackb(packed, use_list=1, raw=False)
    assert decoded == "abcdef"
|
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2011-04-15 17:36:17 +03:00
|
|
|
def testDecodeBinary():
    """Bytes packed with the default options come back as equal bytes."""
    packed = packb(b"abc")
    assert unpackb(packed, use_list=1) == b"abc"
|
2011-04-15 17:36:17 +03:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2012-09-21 14:15:30 +09:00
|
|
|
def testPackFloat():
    """``use_single_float`` switches between the 0xca and 0xcb float encodings."""
    # Expected wire form: one type byte followed by the big-endian IEEE value.
    single = b"\xca" + struct.pack(">f", 1.0)
    double = b"\xcb" + struct.pack(">d", 1.0)

    assert packb(1.0, use_single_float=True) == single
    assert packb(1.0, use_single_float=False) == double
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2012-09-21 14:15:30 +09:00
|
|
|
|
2012-09-23 17:26:16 +10:00
|
|
|
def testArraySize(sizes=(0, 5, 50, 1000)):
    """Write arrays as a header plus separately packed items, then read them back.

    For each entry in ``sizes`` an array header of that length is written to a
    stream, followed by the integers ``0 .. size-1`` packed one at a time.
    The stream is then decoded with an ``Unpacker`` and every array must equal
    ``list(range(size))``.

    ``sizes`` defaults to an immutable tuple so the default cannot be shared
    and mutated across calls; any iterable of non-negative ints is accepted.
    """
    bio = BytesIO()
    packer = Packer()
    for size in sizes:
        bio.write(packer.pack_array_header(size))
        for i in range(size):
            bio.write(packer.pack(i))

    # Rewind so the unpacker reads from the start of what was written.
    bio.seek(0)
    unpacker = Unpacker(bio, use_list=1)
    for size in sizes:
        assert unpacker.unpack() == list(range(size))
|
2012-09-23 17:26:16 +10:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2012-12-10 21:47:18 +09:00
|
|
|
def test_manualreset(sizes=(0, 5, 50, 1000)):
    """Check ``Packer(autoreset=False)``: ``bytes()`` holds all output until ``reset()``.

    Array headers and items are packed without using the return values; the
    content of ``packer.bytes()`` must then decode to one list per entry in
    ``sizes``. After ``packer.reset()`` the buffer must be empty.

    ``sizes`` defaults to an immutable tuple so the default cannot be shared
    and mutated across calls; any iterable of non-negative ints is accepted.
    """
    packer = Packer(autoreset=False)
    for size in sizes:
        packer.pack_array_header(size)
        for i in range(size):
            packer.pack(i)

    bio = BytesIO(packer.bytes())
    unpacker = Unpacker(bio, use_list=1)
    for size in sizes:
        assert unpacker.unpack() == list(range(size))

    packer.reset()
    assert packer.bytes() == b""
|
|
|
|
|
2012-12-10 21:47:18 +09:00
|
|
|
|
2012-09-23 17:26:16 +10:00
|
|
|
def testMapSize(sizes=(0, 5, 50, 1000)):
    """Write maps as a header plus separately packed key/value pairs, then read them back.

    For each entry in ``sizes`` a map header of that length is written to a
    stream, followed by ``size`` pairs ``i -> i * 2``. The stream is decoded
    with ``Unpacker(strict_map_key=False)`` (the keys are ints) and every map
    must equal ``{i: i * 2 for i in range(size)}``.

    ``sizes`` defaults to an immutable tuple so the default cannot be shared
    and mutated across calls; any iterable of non-negative ints is accepted.
    """
    bio = BytesIO()
    packer = Packer()
    for size in sizes:
        bio.write(packer.pack_map_header(size))
        for i in range(size):
            bio.write(packer.pack(i))  # key
            bio.write(packer.pack(i * 2))  # value

    # Rewind so the unpacker reads from the start of what was written.
    bio.seek(0)
    unpacker = Unpacker(bio, strict_map_key=False)
    for size in sizes:
        assert unpacker.unpack() == {i: i * 2 for i in range(size)}
|
2012-09-23 17:26:16 +10:00
|
|
|
|
|
|
|
|
2012-09-23 02:13:32 +09:00
|
|
|
def test_odict():
    """An ``OrderedDict`` packs as a map; ``object_pairs_hook`` sees pairs in order."""
    pairs = [(b"one", 1), (b"two", 2), (b"three", 3), (b"four", 4)]
    ordered = OrderedDict(pairs)

    # Without a hook the result compares equal to a plain dict.
    assert unpackb(packb(ordered), use_list=1) == dict(pairs)

    def as_pair_list(items):
        return list(items)

    # With a pairs hook the original (key, value) sequence is recovered.
    restored = unpackb(packb(ordered), use_list=1, object_pairs_hook=as_pair_list)
    assert restored == pairs
|
2012-09-23 02:13:32 +09:00
|
|
|
|
|
|
|
|
2012-12-10 21:26:41 +09:00
|
|
|
def test_pairlist():
    """``pack_map_pairs`` output decodes back to the same list of pairs."""
    pairlist = [(b"a", 1), (2, b"b"), (b"foo", b"bar")]
    encoded = Packer().pack_map_pairs(pairlist)
    # strict_map_key=False because one of the keys is an int.
    decoded = unpackb(encoded, strict_map_key=False, object_pairs_hook=list)
    assert pairlist == decoded
|
2018-11-09 20:55:13 +09:00
|
|
|
|
2019-12-05 18:51:45 +09:00
|
|
|
|
2018-11-09 20:55:13 +09:00
|
|
|
def test_get_buffer():
    """``Packer.getbuffer()`` exposes the same bytes that ``packb`` produces."""
    expected = packb([1, 2], use_bin_type=True)

    packer = Packer(autoreset=0, use_bin_type=True)
    packer.pack([1, 2])

    # Copy the packer's buffer into a stream and compare what landed there.
    sink = BytesIO()
    sink.write(packer.getbuffer())
    assert sink.getvalue() == expected
|