blacken test

This commit is contained in:
Inada Naoki 2019-12-05 18:51:45 +09:00
parent e557e17cbd
commit 10e5e39ff9
16 changed files with 501 additions and 334 deletions

View file

@ -6,27 +6,28 @@ from msgpack import packb, unpackb
def test_unpack_buffer(): def test_unpack_buffer():
from array import array from array import array
buf = array('b')
buf = array("b")
try: try:
buf.frombytes(packb((b'foo', b'bar'))) buf.frombytes(packb((b"foo", b"bar")))
except AttributeError: # PY2 except AttributeError: # PY2
buf.fromstring(packb((b'foo', b'bar'))) buf.fromstring(packb((b"foo", b"bar")))
obj = unpackb(buf, use_list=1) obj = unpackb(buf, use_list=1)
assert [b'foo', b'bar'] == obj assert [b"foo", b"bar"] == obj
def test_unpack_bytearray(): def test_unpack_bytearray():
buf = bytearray(packb(('foo', 'bar'))) buf = bytearray(packb(("foo", "bar")))
obj = unpackb(buf, use_list=1) obj = unpackb(buf, use_list=1)
assert [b'foo', b'bar'] == obj assert [b"foo", b"bar"] == obj
expected_type = bytes expected_type = bytes
assert all(type(s) == expected_type for s in obj) assert all(type(s) == expected_type for s in obj)
def test_unpack_memoryview(): def test_unpack_memoryview():
buf = bytearray(packb(('foo', 'bar'))) buf = bytearray(packb(("foo", "bar")))
view = memoryview(buf) view = memoryview(buf)
obj = unpackb(view, use_list=1) obj = unpackb(view, use_list=1)
assert [b'foo', b'bar'] == obj assert [b"foo", b"bar"] == obj
expected_type = bytes expected_type = bytes
assert all(type(s) == expected_type for s in obj) assert all(type(s) == expected_type for s in obj)

View file

@ -6,97 +6,133 @@ from msgpack import packb, unpackb
def check(length, obj): def check(length, obj):
v = packb(obj) v = packb(obj)
assert len(v) == length, \ assert len(v) == length, "%r length should be %r but get %r" % (obj, length, len(v))
"%r length should be %r but get %r" % (obj, length, len(v))
assert unpackb(v, use_list=0) == obj assert unpackb(v, use_list=0) == obj
def test_1(): def test_1():
for o in [None, True, False, 0, 1, (1 << 6), (1 << 7) - 1, -1, for o in [
-((1<<5)-1), -(1<<5)]: None,
True,
False,
0,
1,
(1 << 6),
(1 << 7) - 1,
-1,
-((1 << 5) - 1),
-(1 << 5),
]:
check(1, o) check(1, o)
def test_2(): def test_2():
for o in [1 << 7, (1 << 8) - 1, for o in [1 << 7, (1 << 8) - 1, -((1 << 5) + 1), -(1 << 7)]:
-((1<<5)+1), -(1<<7)
]:
check(2, o) check(2, o)
def test_3(): def test_3():
for o in [1 << 8, (1 << 16) - 1, for o in [1 << 8, (1 << 16) - 1, -((1 << 7) + 1), -(1 << 15)]:
-((1<<7)+1), -(1<<15)]:
check(3, o) check(3, o)
def test_5(): def test_5():
for o in [1 << 16, (1 << 32) - 1, for o in [1 << 16, (1 << 32) - 1, -((1 << 15) + 1), -(1 << 31)]:
-((1<<15)+1), -(1<<31)]:
check(5, o) check(5, o)
def test_9(): def test_9():
for o in [1 << 32, (1 << 64) - 1, for o in [
-((1<<31)+1), -(1<<63), 1 << 32,
1.0, 0.1, -0.1, -1.0]: (1 << 64) - 1,
-((1 << 31) + 1),
-(1 << 63),
1.0,
0.1,
-0.1,
-1.0,
]:
check(9, o) check(9, o)
def check_raw(overhead, num): def check_raw(overhead, num):
check(num + overhead, b" " * num) check(num + overhead, b" " * num)
def test_fixraw(): def test_fixraw():
check_raw(1, 0) check_raw(1, 0)
check_raw(1, (1<<5) - 1) check_raw(1, (1 << 5) - 1)
def test_raw16(): def test_raw16():
check_raw(3, 1<<5) check_raw(3, 1 << 5)
check_raw(3, (1<<16) - 1) check_raw(3, (1 << 16) - 1)
def test_raw32(): def test_raw32():
check_raw(5, 1<<16) check_raw(5, 1 << 16)
def check_array(overhead, num): def check_array(overhead, num):
check(num + overhead, (None,) * num) check(num + overhead, (None,) * num)
def test_fixarray(): def test_fixarray():
check_array(1, 0) check_array(1, 0)
check_array(1, (1 << 4) - 1) check_array(1, (1 << 4) - 1)
def test_array16(): def test_array16():
check_array(3, 1 << 4) check_array(3, 1 << 4)
check_array(3, (1<<16)-1) check_array(3, (1 << 16) - 1)
def test_array32(): def test_array32():
check_array(5, (1<<16)) check_array(5, (1 << 16))
def match(obj, buf): def match(obj, buf):
assert packb(obj) == buf assert packb(obj) == buf
assert unpackb(buf, use_list=0) == obj assert unpackb(buf, use_list=0) == obj
def test_match(): def test_match():
cases = [ cases = [
(None, b'\xc0'), (None, b"\xc0"),
(False, b'\xc2'), (False, b"\xc2"),
(True, b'\xc3'), (True, b"\xc3"),
(0, b'\x00'), (0, b"\x00"),
(127, b'\x7f'), (127, b"\x7f"),
(128, b'\xcc\x80'), (128, b"\xcc\x80"),
(256, b'\xcd\x01\x00'), (256, b"\xcd\x01\x00"),
(-1, b'\xff'), (-1, b"\xff"),
(-33, b'\xd0\xdf'), (-33, b"\xd0\xdf"),
(-129, b'\xd1\xff\x7f'), (-129, b"\xd1\xff\x7f"),
({1:1}, b'\x81\x01\x01'), ({1: 1}, b"\x81\x01\x01"),
(1.0, b"\xcb\x3f\xf0\x00\x00\x00\x00\x00\x00"), (1.0, b"\xcb\x3f\xf0\x00\x00\x00\x00\x00\x00"),
((), b'\x90'), ((), b"\x90"),
(tuple(range(15)),b"\x9f\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e"), (
(tuple(range(16)),b"\xdc\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"), tuple(range(15)),
({}, b'\x80'), b"\x9f\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e",
(dict([(x,x) for x in range(15)]), b'\x8f\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e'), ),
(dict([(x,x) for x in range(16)]), b'\xde\x00\x10\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e\x0f\x0f'), (
tuple(range(16)),
b"\xdc\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
),
({}, b"\x80"),
(
dict([(x, x) for x in range(15)]),
b"\x8f\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e",
),
(
dict([(x, x) for x in range(16)]),
b"\xde\x00\x10\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e\x0f\x0f",
),
] ]
for v, p in cases: for v, p in cases:
match(v, p) match(v, p)
def test_unicode():
assert unpackb(packb('foobar'), use_list=1) == b'foobar'
def test_unicode():
assert unpackb(packb("foobar"), use_list=1) == b"foobar"

View file

@ -9,35 +9,39 @@ def test_pack_ext_type():
packer = msgpack.Packer() packer = msgpack.Packer()
packer.pack_ext_type(0x42, s) packer.pack_ext_type(0x42, s)
return packer.bytes() return packer.bytes()
assert p(b'A') == b'\xd4\x42A' # fixext 1
assert p(b'AB') == b'\xd5\x42AB' # fixext 2 assert p(b"A") == b"\xd4\x42A" # fixext 1
assert p(b'ABCD') == b'\xd6\x42ABCD' # fixext 4 assert p(b"AB") == b"\xd5\x42AB" # fixext 2
assert p(b'ABCDEFGH') == b'\xd7\x42ABCDEFGH' # fixext 8 assert p(b"ABCD") == b"\xd6\x42ABCD" # fixext 4
assert p(b'A'*16) == b'\xd8\x42' + b'A'*16 # fixext 16 assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH" # fixext 8
assert p(b'ABC') == b'\xc7\x03\x42ABC' # ext 8 assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16 # fixext 16
assert p(b'A'*0x0123) == b'\xc8\x01\x23\x42' + b'A'*0x0123 # ext 16 assert p(b"ABC") == b"\xc7\x03\x42ABC" # ext 8
assert p(b'A'*0x00012345) == b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345 # ext 32 assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123 # ext 16
assert (
p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345
) # ext 32
def test_unpack_ext_type(): def test_unpack_ext_type():
def check(b, expected): def check(b, expected):
assert msgpack.unpackb(b) == expected assert msgpack.unpackb(b) == expected
check(b'\xd4\x42A', ExtType(0x42, b'A')) # fixext 1 check(b"\xd4\x42A", ExtType(0x42, b"A")) # fixext 1
check(b'\xd5\x42AB', ExtType(0x42, b'AB')) # fixext 2 check(b"\xd5\x42AB", ExtType(0x42, b"AB")) # fixext 2
check(b'\xd6\x42ABCD', ExtType(0x42, b'ABCD')) # fixext 4 check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD")) # fixext 4
check(b'\xd7\x42ABCDEFGH', ExtType(0x42, b'ABCDEFGH')) # fixext 8 check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH")) # fixext 8
check(b'\xd8\x42' + b'A'*16, ExtType(0x42, b'A'*16)) # fixext 16 check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16)) # fixext 16
check(b'\xc7\x03\x42ABC', ExtType(0x42, b'ABC')) # ext 8 check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC")) # ext 8
check(b'\xc8\x01\x23\x42' + b'A'*0x0123, check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123)) # ext 16
ExtType(0x42, b'A'*0x0123)) # ext 16 check(
check(b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345, b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345,
ExtType(0x42, b'A'*0x00012345)) # ext 32 ExtType(0x42, b"A" * 0x00012345),
) # ext 32
def test_extension_type(): def test_extension_type():
def default(obj): def default(obj):
print('default called', obj) print("default called", obj)
if isinstance(obj, array.array): if isinstance(obj, array.array):
typecode = 123 # application specific typecode typecode = 123 # application specific typecode
try: try:
@ -48,24 +52,27 @@ def test_extension_type():
raise TypeError("Unknown type object %r" % (obj,)) raise TypeError("Unknown type object %r" % (obj,))
def ext_hook(code, data): def ext_hook(code, data):
print('ext_hook called', code, data) print("ext_hook called", code, data)
assert code == 123 assert code == 123
obj = array.array('d') obj = array.array("d")
try: try:
obj.frombytes(data) obj.frombytes(data)
except AttributeError: # PY2 except AttributeError: # PY2
obj.fromstring(data) obj.fromstring(data)
return obj return obj
obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])] obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])]
s = msgpack.packb(obj, default=default) s = msgpack.packb(obj, default=default)
obj2 = msgpack.unpackb(s, ext_hook=ext_hook) obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
assert obj == obj2 assert obj == obj2
import sys import sys
if sys.version > '3':
if sys.version > "3":
long = int long = int
def test_overriding_hooks(): def test_overriding_hooks():
def default(obj): def default(obj):
if isinstance(obj, long): if isinstance(obj, long):

View file

@ -3,34 +3,38 @@
from msgpack import unpackb from msgpack import unpackb
def check(src, should, use_list=0): def check(src, should, use_list=0):
assert unpackb(src, use_list=use_list) == should assert unpackb(src, use_list=use_list) == should
def testSimpleValue(): def testSimpleValue():
check(b"\x93\xc0\xc2\xc3", check(b"\x93\xc0\xc2\xc3", (None, False, True,))
(None, False, True,))
def testFixnum(): def testFixnum():
check(b"\x92\x93\x00\x40\x7f\x93\xe0\xf0\xff", check(b"\x92\x93\x00\x40\x7f\x93\xe0\xf0\xff", ((0, 64, 127,), (-32, -16, -1,),))
((0,64,127,), (-32,-16,-1,),)
)
def testFixArray(): def testFixArray():
check(b"\x92\x90\x91\x91\xc0", check(
((),((None,),),), b"\x92\x90\x91\x91\xc0", ((), ((None,),),),
) )
def testFixRaw(): def testFixRaw():
check(b"\x94\xa0\xa1a\xa2bc\xa3def", check(
(b"", b"a", b"bc", b"def",), b"\x94\xa0\xa1a\xa2bc\xa3def", (b"", b"a", b"bc", b"def",),
) )
def testFixMap(): def testFixMap():
check( check(
b"\x82\xc2\x81\xc0\xc0\xc3\x81\xc0\x80", b"\x82\xc2\x81\xc0\xc0\xc3\x81\xc0\x80",
{False: {None: None}, True:{None:{}}}, {False: {None: None}, True: {None: {}}},
) )
def testUnsignedInt(): def testUnsignedInt():
check( check(
b"\x99\xcc\x00\xcc\x80\xcc\xff\xcd\x00\x00\xcd\x80\x00" b"\x99\xcc\x00\xcc\x80\xcc\xff\xcd\x00\x00\xcd\x80\x00"
@ -39,24 +43,33 @@ def testUnsignedInt():
(0, 128, 255, 0, 32768, 65535, 0, 2147483648, 4294967295,), (0, 128, 255, 0, 32768, 65535, 0, 2147483648, 4294967295,),
) )
def testSignedInt(): def testSignedInt():
check(b"\x99\xd0\x00\xd0\x80\xd0\xff\xd1\x00\x00\xd1\x80\x00" check(
b"\x99\xd0\x00\xd0\x80\xd0\xff\xd1\x00\x00\xd1\x80\x00"
b"\xd1\xff\xff\xd2\x00\x00\x00\x00\xd2\x80\x00\x00\x00" b"\xd1\xff\xff\xd2\x00\x00\x00\x00\xd2\x80\x00\x00\x00"
b"\xd2\xff\xff\xff\xff", b"\xd2\xff\xff\xff\xff",
(0, -128, -1, 0, -32768, -1, 0, -2147483648, -1,)) (0, -128, -1, 0, -32768, -1, 0, -2147483648, -1,),
)
def testRaw(): def testRaw():
check(b"\x96\xda\x00\x00\xda\x00\x01a\xda\x00\x02ab\xdb\x00\x00" check(
b"\x96\xda\x00\x00\xda\x00\x01a\xda\x00\x02ab\xdb\x00\x00"
b"\x00\x00\xdb\x00\x00\x00\x01a\xdb\x00\x00\x00\x02ab", b"\x00\x00\xdb\x00\x00\x00\x01a\xdb\x00\x00\x00\x02ab",
(b"", b"a", b"ab", b"", b"a", b"ab")) (b"", b"a", b"ab", b"", b"a", b"ab"),
)
def testArray(): def testArray():
check(b"\x96\xdc\x00\x00\xdc\x00\x01\xc0\xdc\x00\x02\xc2\xc3\xdd\x00" check(
b"\x96\xdc\x00\x00\xdc\x00\x01\xc0\xdc\x00\x02\xc2\xc3\xdd\x00"
b"\x00\x00\x00\xdd\x00\x00\x00\x01\xc0\xdd\x00\x00\x00\x02" b"\x00\x00\x00\xdd\x00\x00\x00\x01\xc0\xdd\x00\x00\x00\x02"
b"\xc2\xc3", b"\xc2\xc3",
((), (None,), (False,True), (), (None,), (False,True)) ((), (None,), (False, True), (), (None,), (False, True)),
) )
def testMap(): def testMap():
check( check(
b"\x96" b"\x96"
@ -66,5 +79,12 @@ def testMap():
b"\xdf\x00\x00\x00\x00" b"\xdf\x00\x00\x00\x00"
b"\xdf\x00\x00\x00\x01\xc0\xc2" b"\xdf\x00\x00\x00\x01\xc0\xc2"
b"\xdf\x00\x00\x00\x02\xc0\xc2\xc3\xc2", b"\xdf\x00\x00\x00\x02\xc0\xc2\xc3\xc2",
({}, {None: False}, {True: False, None: False}, {}, (
{None: False}, {True: False, None: False})) {},
{None: False},
{True: False, None: False},
{},
{None: False},
{True: False, None: False},
),
)

View file

@ -4,8 +4,14 @@ from __future__ import absolute_import, division, print_function, unicode_litera
import pytest import pytest
from msgpack import ( from msgpack import (
packb, unpackb, Packer, Unpacker, ExtType, packb,
PackOverflowError, PackValueError, UnpackValueError, unpackb,
Packer,
Unpacker,
ExtType,
PackOverflowError,
PackValueError,
UnpackValueError,
) )
@ -13,30 +19,30 @@ def test_integer():
x = -(2 ** 63) x = -(2 ** 63)
assert unpackb(packb(x)) == x assert unpackb(packb(x)) == x
with pytest.raises(PackOverflowError): with pytest.raises(PackOverflowError):
packb(x-1) packb(x - 1)
x = 2 ** 64 - 1 x = 2 ** 64 - 1
assert unpackb(packb(x)) == x assert unpackb(packb(x)) == x
with pytest.raises(PackOverflowError): with pytest.raises(PackOverflowError):
packb(x+1) packb(x + 1)
def test_array_header(): def test_array_header():
packer = Packer() packer = Packer()
packer.pack_array_header(2**32-1) packer.pack_array_header(2 ** 32 - 1)
with pytest.raises(PackValueError): with pytest.raises(PackValueError):
packer.pack_array_header(2**32) packer.pack_array_header(2 ** 32)
def test_map_header(): def test_map_header():
packer = Packer() packer = Packer()
packer.pack_map_header(2**32-1) packer.pack_map_header(2 ** 32 - 1)
with pytest.raises(PackValueError): with pytest.raises(PackValueError):
packer.pack_array_header(2**32) packer.pack_array_header(2 ** 32)
def test_max_str_len(): def test_max_str_len():
d = 'x' * 3 d = "x" * 3
packed = packb(d) packed = packb(d)
unpacker = Unpacker(max_str_len=3, raw=False) unpacker = Unpacker(max_str_len=3, raw=False)
@ -50,7 +56,7 @@ def test_max_str_len():
def test_max_bin_len(): def test_max_bin_len():
d = b'x' * 3 d = b"x" * 3
packed = packb(d, use_bin_type=True) packed = packb(d, use_bin_type=True)
unpacker = Unpacker(max_bin_len=3) unpacker = Unpacker(max_bin_len=3)
@ -64,7 +70,7 @@ def test_max_bin_len():
def test_max_array_len(): def test_max_array_len():
d = [1,2,3] d = [1, 2, 3]
packed = packb(d) packed = packb(d)
unpacker = Unpacker(max_array_len=3) unpacker = Unpacker(max_array_len=3)
@ -107,8 +113,8 @@ def test_max_ext_len():
# PyPy fails following tests because of constant folding? # PyPy fails following tests because of constant folding?
# https://bugs.pypy.org/issue1721 # https://bugs.pypy.org/issue1721
#@pytest.mark.skipif(True, reason="Requires very large memory.") # @pytest.mark.skipif(True, reason="Requires very large memory.")
#def test_binary(): # def test_binary():
# x = b'x' * (2**32 - 1) # x = b'x' * (2**32 - 1)
# assert unpackb(packb(x)) == x # assert unpackb(packb(x)) == x
# del x # del x
@ -117,8 +123,8 @@ def test_max_ext_len():
# packb(x) # packb(x)
# #
# #
#@pytest.mark.skipif(True, reason="Requires very large memory.") # @pytest.mark.skipif(True, reason="Requires very large memory.")
#def test_string(): # def test_string():
# x = 'x' * (2**32 - 1) # x = 'x' * (2**32 - 1)
# assert unpackb(packb(x)) == x # assert unpackb(packb(x)) == x
# x += 'y' # x += 'y'
@ -126,8 +132,8 @@ def test_max_ext_len():
# packb(x) # packb(x)
# #
# #
#@pytest.mark.skipif(True, reason="Requires very large memory.") # @pytest.mark.skipif(True, reason="Requires very large memory.")
#def test_array(): # def test_array():
# x = [0] * (2**32 - 1) # x = [0] * (2**32 - 1)
# assert unpackb(packb(x)) == x # assert unpackb(packb(x)) == x
# x.append(0) # x.append(0)
@ -137,8 +143,9 @@ def test_max_ext_len():
# auto max len # auto max len
def test_auto_max_array_len(): def test_auto_max_array_len():
packed = b'\xde\x00\x06zz' packed = b"\xde\x00\x06zz"
with pytest.raises(UnpackValueError): with pytest.raises(UnpackValueError):
unpackb(packed, raw=False) unpackb(packed, raw=False)
@ -147,9 +154,10 @@ def test_auto_max_array_len():
with pytest.raises(UnpackValueError): with pytest.raises(UnpackValueError):
unpacker.unpack() unpacker.unpack()
def test_auto_max_map_len(): def test_auto_max_map_len():
# len(packed) == 6 -> max_map_len == 3 # len(packed) == 6 -> max_map_len == 3
packed = b'\xde\x00\x04zzz' packed = b"\xde\x00\x04zzz"
with pytest.raises(UnpackValueError): with pytest.raises(UnpackValueError):
unpackb(packed, raw=False) unpackb(packed, raw=False)

View file

@ -10,6 +10,7 @@ import sys
# - array type only supports old buffer interface # - array type only supports old buffer interface
# - array.frombytes is not available, must use deprecated array.fromstring # - array.frombytes is not available, must use deprecated array.fromstring
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
def make_memoryview(obj): def make_memoryview(obj):
return memoryview(buffer(obj)) return memoryview(buffer(obj))
@ -20,6 +21,8 @@ if sys.version_info[0] < 3:
def get_data(a): def get_data(a):
return a.tostring() return a.tostring()
else: else:
make_memoryview = memoryview make_memoryview = memoryview
@ -49,64 +52,64 @@ def _runtest(format, nbytes, expected_header, expected_prefix, use_bin_type):
# check packed header # check packed header
assert packed[:1] == expected_header assert packed[:1] == expected_header
# check packed length prefix, if any # check packed length prefix, if any
assert packed[1:1+len(expected_prefix)] == expected_prefix assert packed[1 : 1 + len(expected_prefix)] == expected_prefix
# check packed data # check packed data
assert packed[1+len(expected_prefix):] == original_data assert packed[1 + len(expected_prefix) :] == original_data
# check array unpacked correctly # check array unpacked correctly
assert original_array == reconstructed_array assert original_array == reconstructed_array
def test_fixstr_from_byte(): def test_fixstr_from_byte():
_runtest('B', 1, b'\xa1', b'', False) _runtest("B", 1, b"\xa1", b"", False)
_runtest('B', 31, b'\xbf', b'', False) _runtest("B", 31, b"\xbf", b"", False)
def test_fixstr_from_float(): def test_fixstr_from_float():
_runtest('f', 4, b'\xa4', b'', False) _runtest("f", 4, b"\xa4", b"", False)
_runtest('f', 28, b'\xbc', b'', False) _runtest("f", 28, b"\xbc", b"", False)
def test_str16_from_byte(): def test_str16_from_byte():
_runtest('B', 2**8, b'\xda', b'\x01\x00', False) _runtest("B", 2 ** 8, b"\xda", b"\x01\x00", False)
_runtest('B', 2**16-1, b'\xda', b'\xff\xff', False) _runtest("B", 2 ** 16 - 1, b"\xda", b"\xff\xff", False)
def test_str16_from_float(): def test_str16_from_float():
_runtest('f', 2**8, b'\xda', b'\x01\x00', False) _runtest("f", 2 ** 8, b"\xda", b"\x01\x00", False)
_runtest('f', 2**16-4, b'\xda', b'\xff\xfc', False) _runtest("f", 2 ** 16 - 4, b"\xda", b"\xff\xfc", False)
def test_str32_from_byte(): def test_str32_from_byte():
_runtest('B', 2**16, b'\xdb', b'\x00\x01\x00\x00', False) _runtest("B", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False)
def test_str32_from_float(): def test_str32_from_float():
_runtest('f', 2**16, b'\xdb', b'\x00\x01\x00\x00', False) _runtest("f", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False)
def test_bin8_from_byte(): def test_bin8_from_byte():
_runtest('B', 1, b'\xc4', b'\x01', True) _runtest("B", 1, b"\xc4", b"\x01", True)
_runtest('B', 2**8-1, b'\xc4', b'\xff', True) _runtest("B", 2 ** 8 - 1, b"\xc4", b"\xff", True)
def test_bin8_from_float(): def test_bin8_from_float():
_runtest('f', 4, b'\xc4', b'\x04', True) _runtest("f", 4, b"\xc4", b"\x04", True)
_runtest('f', 2**8-4, b'\xc4', b'\xfc', True) _runtest("f", 2 ** 8 - 4, b"\xc4", b"\xfc", True)
def test_bin16_from_byte(): def test_bin16_from_byte():
_runtest('B', 2**8, b'\xc5', b'\x01\x00', True) _runtest("B", 2 ** 8, b"\xc5", b"\x01\x00", True)
_runtest('B', 2**16-1, b'\xc5', b'\xff\xff', True) _runtest("B", 2 ** 16 - 1, b"\xc5", b"\xff\xff", True)
def test_bin16_from_float(): def test_bin16_from_float():
_runtest('f', 2**8, b'\xc5', b'\x01\x00', True) _runtest("f", 2 ** 8, b"\xc5", b"\x01\x00", True)
_runtest('f', 2**16-4, b'\xc5', b'\xff\xfc', True) _runtest("f", 2 ** 16 - 4, b"\xc5", b"\xff\xfc", True)
def test_bin32_from_byte(): def test_bin32_from_byte():
_runtest('B', 2**16, b'\xc6', b'\x00\x01\x00\x00', True) _runtest("B", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True)
def test_bin32_from_float(): def test_bin32_from_float():
_runtest('f', 2**16, b'\xc6', b'\x00\x01\x00\x00', True) _runtest("f", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True)

View file

@ -4,85 +4,87 @@ from msgpack import packb, unpackb, ExtType
def test_str8(): def test_str8():
header = b'\xd9' header = b"\xd9"
data = b'x' * 32 data = b"x" * 32
b = packb(data.decode(), use_bin_type=True) b = packb(data.decode(), use_bin_type=True)
assert len(b) == len(data) + 2 assert len(b) == len(data) + 2
assert b[0:2] == header + b'\x20' assert b[0:2] == header + b"\x20"
assert b[2:] == data assert b[2:] == data
assert unpackb(b) == data assert unpackb(b) == data
data = b'x' * 255 data = b"x" * 255
b = packb(data.decode(), use_bin_type=True) b = packb(data.decode(), use_bin_type=True)
assert len(b) == len(data) + 2 assert len(b) == len(data) + 2
assert b[0:2] == header + b'\xff' assert b[0:2] == header + b"\xff"
assert b[2:] == data assert b[2:] == data
assert unpackb(b) == data assert unpackb(b) == data
def test_bin8(): def test_bin8():
header = b'\xc4' header = b"\xc4"
data = b'' data = b""
b = packb(data, use_bin_type=True) b = packb(data, use_bin_type=True)
assert len(b) == len(data) + 2 assert len(b) == len(data) + 2
assert b[0:2] == header + b'\x00' assert b[0:2] == header + b"\x00"
assert b[2:] == data assert b[2:] == data
assert unpackb(b) == data assert unpackb(b) == data
data = b'x' * 255 data = b"x" * 255
b = packb(data, use_bin_type=True) b = packb(data, use_bin_type=True)
assert len(b) == len(data) + 2 assert len(b) == len(data) + 2
assert b[0:2] == header + b'\xff' assert b[0:2] == header + b"\xff"
assert b[2:] == data assert b[2:] == data
assert unpackb(b) == data assert unpackb(b) == data
def test_bin16(): def test_bin16():
header = b'\xc5' header = b"\xc5"
data = b'x' * 256 data = b"x" * 256
b = packb(data, use_bin_type=True) b = packb(data, use_bin_type=True)
assert len(b) == len(data) + 3 assert len(b) == len(data) + 3
assert b[0:1] == header assert b[0:1] == header
assert b[1:3] == b'\x01\x00' assert b[1:3] == b"\x01\x00"
assert b[3:] == data assert b[3:] == data
assert unpackb(b) == data assert unpackb(b) == data
data = b'x' * 65535 data = b"x" * 65535
b = packb(data, use_bin_type=True) b = packb(data, use_bin_type=True)
assert len(b) == len(data) + 3 assert len(b) == len(data) + 3
assert b[0:1] == header assert b[0:1] == header
assert b[1:3] == b'\xff\xff' assert b[1:3] == b"\xff\xff"
assert b[3:] == data assert b[3:] == data
assert unpackb(b) == data assert unpackb(b) == data
def test_bin32(): def test_bin32():
header = b'\xc6' header = b"\xc6"
data = b'x' * 65536 data = b"x" * 65536
b = packb(data, use_bin_type=True) b = packb(data, use_bin_type=True)
assert len(b) == len(data) + 5 assert len(b) == len(data) + 5
assert b[0:1] == header assert b[0:1] == header
assert b[1:5] == b'\x00\x01\x00\x00' assert b[1:5] == b"\x00\x01\x00\x00"
assert b[5:] == data assert b[5:] == data
assert unpackb(b) == data assert unpackb(b) == data
def test_ext(): def test_ext():
def check(ext, packed): def check(ext, packed):
assert packb(ext) == packed assert packb(ext) == packed
assert unpackb(packed) == ext assert unpackb(packed) == ext
check(ExtType(0x42, b'Z'), b'\xd4\x42Z') # fixext 1
check(ExtType(0x42, b'ZZ'), b'\xd5\x42ZZ') # fixext 2 check(ExtType(0x42, b"Z"), b"\xd4\x42Z") # fixext 1
check(ExtType(0x42, b'Z'*4), b'\xd6\x42' + b'Z'*4) # fixext 4 check(ExtType(0x42, b"ZZ"), b"\xd5\x42ZZ") # fixext 2
check(ExtType(0x42, b'Z'*8), b'\xd7\x42' + b'Z'*8) # fixext 8 check(ExtType(0x42, b"Z" * 4), b"\xd6\x42" + b"Z" * 4) # fixext 4
check(ExtType(0x42, b'Z'*16), b'\xd8\x42' + b'Z'*16) # fixext 16 check(ExtType(0x42, b"Z" * 8), b"\xd7\x42" + b"Z" * 8) # fixext 8
check(ExtType(0x42, b"Z" * 16), b"\xd8\x42" + b"Z" * 16) # fixext 16
# ext 8 # ext 8
check(ExtType(0x42, b''), b'\xc7\x00\x42') check(ExtType(0x42, b""), b"\xc7\x00\x42")
check(ExtType(0x42, b'Z'*255), b'\xc7\xff\x42' + b'Z'*255) check(ExtType(0x42, b"Z" * 255), b"\xc7\xff\x42" + b"Z" * 255)
# ext 16 # ext 16
check(ExtType(0x42, b'Z'*256), b'\xc8\x01\x00\x42' + b'Z'*256) check(ExtType(0x42, b"Z" * 256), b"\xc8\x01\x00\x42" + b"Z" * 256)
check(ExtType(0x42, b'Z'*0xffff), b'\xc8\xff\xff\x42' + b'Z'*0xffff) check(ExtType(0x42, b"Z" * 0xFFFF), b"\xc8\xff\xff\x42" + b"Z" * 0xFFFF)
# ext 32 # ext 32
check(ExtType(0x42, b'Z'*0x10000), b'\xc9\x00\x01\x00\x00\x42' + b'Z'*0x10000) check(ExtType(0x42, b"Z" * 0x10000), b"\xc9\x00\x01\x00\x00\x42" + b"Z" * 0x10000)
# needs large memory # needs large memory
#check(ExtType(0x42, b'Z'*0xffffffff), # check(ExtType(0x42, b'Z'*0xffffffff),
# b'\xc9\xff\xff\xff\xff\x42' + b'Z'*0xffffffff) # b'\xc9\xff\xff\xff\xff\x42' + b'Z'*0xffffffff)

View file

@ -4,64 +4,76 @@
from pytest import raises from pytest import raises
from msgpack import packb, unpackb from msgpack import packb, unpackb
def _decode_complex(obj): def _decode_complex(obj):
if b'__complex__' in obj: if b"__complex__" in obj:
return complex(obj[b'real'], obj[b'imag']) return complex(obj[b"real"], obj[b"imag"])
return obj return obj
def _encode_complex(obj): def _encode_complex(obj):
if isinstance(obj, complex): if isinstance(obj, complex):
return {b'__complex__': True, b'real': 1, b'imag': 2} return {b"__complex__": True, b"real": 1, b"imag": 2}
return obj return obj
def test_encode_hook(): def test_encode_hook():
packed = packb([3, 1+2j], default=_encode_complex) packed = packb([3, 1 + 2j], default=_encode_complex)
unpacked = unpackb(packed, use_list=1) unpacked = unpackb(packed, use_list=1)
assert unpacked[1] == {b'__complex__': True, b'real': 1, b'imag': 2} assert unpacked[1] == {b"__complex__": True, b"real": 1, b"imag": 2}
def test_decode_hook(): def test_decode_hook():
packed = packb([3, {b'__complex__': True, b'real': 1, b'imag': 2}]) packed = packb([3, {b"__complex__": True, b"real": 1, b"imag": 2}])
unpacked = unpackb(packed, object_hook=_decode_complex, use_list=1) unpacked = unpackb(packed, object_hook=_decode_complex, use_list=1)
assert unpacked[1] == 1+2j assert unpacked[1] == 1 + 2j
def test_decode_pairs_hook(): def test_decode_pairs_hook():
packed = packb([3, {1: 2, 3: 4}]) packed = packb([3, {1: 2, 3: 4}])
prod_sum = 1 * 2 + 3 * 4 prod_sum = 1 * 2 + 3 * 4
unpacked = unpackb(packed, object_pairs_hook=lambda l: sum(k * v for k, v in l), use_list=1) unpacked = unpackb(
packed, object_pairs_hook=lambda l: sum(k * v for k, v in l), use_list=1
)
assert unpacked[1] == prod_sum assert unpacked[1] == prod_sum
def test_only_one_obj_hook(): def test_only_one_obj_hook():
with raises(TypeError): with raises(TypeError):
unpackb(b'', object_hook=lambda x: x, object_pairs_hook=lambda x: x) unpackb(b"", object_hook=lambda x: x, object_pairs_hook=lambda x: x)
def test_bad_hook(): def test_bad_hook():
with raises(TypeError): with raises(TypeError):
packed = packb([3, 1+2j], default=lambda o: o) packed = packb([3, 1 + 2j], default=lambda o: o)
unpacked = unpackb(packed, use_list=1) unpacked = unpackb(packed, use_list=1)
def _arr_to_str(arr): def _arr_to_str(arr):
return ''.join(str(c) for c in arr) return "".join(str(c) for c in arr)
def test_array_hook(): def test_array_hook():
packed = packb([1,2,3]) packed = packb([1, 2, 3])
unpacked = unpackb(packed, list_hook=_arr_to_str, use_list=1) unpacked = unpackb(packed, list_hook=_arr_to_str, use_list=1)
assert unpacked == '123' assert unpacked == "123"
class DecodeError(Exception): class DecodeError(Exception):
pass pass
def bad_complex_decoder(o): def bad_complex_decoder(o):
raise DecodeError("Ooops!") raise DecodeError("Ooops!")
def test_an_exception_in_objecthook1(): def test_an_exception_in_objecthook1():
with raises(DecodeError): with raises(DecodeError):
packed = packb({1: {'__complex__': True, 'real': 1, 'imag': 2}}) packed = packb({1: {"__complex__": True, "real": 1, "imag": 2}})
unpackb(packed, object_hook=bad_complex_decoder) unpackb(packed, object_hook=bad_complex_decoder)
def test_an_exception_in_objecthook2(): def test_an_exception_in_objecthook2():
with raises(DecodeError): with raises(DecodeError):
packed = packb({1: [{'__complex__': True, 'real': 1, 'imag': 2}]}) packed = packb({1: [{"__complex__": True, "real": 1, "imag": 2}]})
unpackb(packed, list_hook=bad_complex_decoder, use_list=1) unpackb(packed, list_hook=bad_complex_decoder, use_list=1)

View file

@ -17,20 +17,46 @@ def check(data, use_list=False):
re = unpackb(packb(data), use_list=use_list) re = unpackb(packb(data), use_list=use_list)
assert re == data assert re == data
def testPack(): def testPack():
test_data = [ test_data = [
0, 1, 127, 128, 255, 256, 65535, 65536, 4294967295, 4294967296, 0,
-1, -32, -33, -128, -129, -32768, -32769, -4294967296, -4294967297, 1,
127,
128,
255,
256,
65535,
65536,
4294967295,
4294967296,
-1,
-32,
-33,
-128,
-129,
-32768,
-32769,
-4294967296,
-4294967297,
1.0, 1.0,
b"", b"a", b"a"*31, b"a"*32, b"",
None, True, False, b"a",
(), ((),), ((), None,), b"a" * 31,
b"a" * 32,
None,
True,
False,
(),
((),),
((), None,),
{None: 0}, {None: 0},
(1<<23), (1 << 23),
] ]
for td in test_data: for td in test_data:
check(td) check(td)
def testPackUnicode(): def testPackUnicode():
test_data = ["", "abcd", ["defgh"], "Русский текст"] test_data = ["", "abcd", ["defgh"], "Русский текст"]
for td in test_data: for td in test_data:
@ -41,43 +67,64 @@ def testPackUnicode():
re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack() re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack()
assert re == td assert re == td
def testPackBytes(): def testPackBytes():
test_data = [ test_data = [
b"", b"abcd", (b"defgh",), b"",
b"abcd",
(b"defgh",),
] ]
for td in test_data: for td in test_data:
check(td) check(td)
def testPackByteArrays(): def testPackByteArrays():
test_data = [ test_data = [
bytearray(b""), bytearray(b"abcd"), (bytearray(b"defgh"),), bytearray(b""),
bytearray(b"abcd"),
(bytearray(b"defgh"),),
] ]
for td in test_data: for td in test_data:
check(td) check(td)
@pytest.mark.skipif(sys.version_info < (3,0), reason="Python 2 passes invalid surrogates")
@pytest.mark.skipif(
sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates"
)
def testIgnoreUnicodeErrors(): def testIgnoreUnicodeErrors():
re = unpackb(packb(b'abc\xeddef', use_bin_type=False), re = unpackb(
raw=False, unicode_errors='ignore') packb(b"abc\xeddef", use_bin_type=False), raw=False, unicode_errors="ignore"
)
assert re == "abcdef" assert re == "abcdef"
def testStrictUnicodeUnpack(): def testStrictUnicodeUnpack():
packed = packb(b'abc\xeddef', use_bin_type=False) packed = packb(b"abc\xeddef", use_bin_type=False)
with pytest.raises(UnicodeDecodeError): with pytest.raises(UnicodeDecodeError):
unpackb(packed, raw=False, use_list=1) unpackb(packed, raw=False, use_list=1)
@pytest.mark.skipif(sys.version_info < (3,0), reason="Python 2 passes invalid surrogates")
@pytest.mark.skipif(
sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates"
)
def testIgnoreErrorsPack(): def testIgnoreErrorsPack():
re = unpackb(packb(u"abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors='ignore'), raw=False, use_list=1) re = unpackb(
packb("abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors="ignore"),
raw=False,
use_list=1,
)
assert re == "abcdef" assert re == "abcdef"
def testDecodeBinary(): def testDecodeBinary():
re = unpackb(packb(b"abc"), use_list=1) re = unpackb(packb(b"abc"), use_list=1)
assert re == b"abc" assert re == b"abc"
def testPackFloat(): def testPackFloat():
assert packb(1.0, use_single_float=True) == b'\xca' + struct.pack(str('>f'), 1.0) assert packb(1.0, use_single_float=True) == b"\xca" + struct.pack(str(">f"), 1.0)
assert packb(1.0, use_single_float=False) == b'\xcb' + struct.pack(str('>d'), 1.0) assert packb(1.0, use_single_float=False) == b"\xcb" + struct.pack(str(">d"), 1.0)
def testArraySize(sizes=[0, 5, 50, 1000]): def testArraySize(sizes=[0, 5, 50, 1000]):
bio = BytesIO() bio = BytesIO()
@ -92,6 +139,7 @@ def testArraySize(sizes=[0, 5, 50, 1000]):
for size in sizes: for size in sizes:
assert unpacker.unpack() == list(range(size)) assert unpacker.unpack() == list(range(size))
def test_manualreset(sizes=[0, 5, 50, 1000]): def test_manualreset(sizes=[0, 5, 50, 1000]):
packer = Packer(autoreset=False) packer = Packer(autoreset=False)
for size in sizes: for size in sizes:
@ -105,7 +153,8 @@ def test_manualreset(sizes=[0, 5, 50, 1000]):
assert unpacker.unpack() == list(range(size)) assert unpacker.unpack() == list(range(size))
packer.reset() packer.reset()
assert packer.bytes() == b'' assert packer.bytes() == b""
def testMapSize(sizes=[0, 5, 50, 1000]): def testMapSize(sizes=[0, 5, 50, 1000]):
bio = BytesIO() bio = BytesIO()
@ -123,21 +172,24 @@ def testMapSize(sizes=[0, 5, 50, 1000]):
def test_odict(): def test_odict():
seq = [(b'one', 1), (b'two', 2), (b'three', 3), (b'four', 4)] seq = [(b"one", 1), (b"two", 2), (b"three", 3), (b"four", 4)]
od = OrderedDict(seq) od = OrderedDict(seq)
assert unpackb(packb(od), use_list=1) == dict(seq) assert unpackb(packb(od), use_list=1) == dict(seq)
def pair_hook(seq): def pair_hook(seq):
return list(seq) return list(seq)
assert unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1) == seq assert unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1) == seq
def test_pairlist(): def test_pairlist():
pairlist = [(b'a', 1), (2, b'b'), (b'foo', b'bar')] pairlist = [(b"a", 1), (2, b"b"), (b"foo", b"bar")]
packer = Packer() packer = Packer()
packed = packer.pack_map_pairs(pairlist) packed = packer.pack_map_pairs(pairlist)
unpacked = unpackb(packed, object_pairs_hook=list) unpacked = unpackb(packed, object_pairs_hook=list)
assert pairlist == unpacked assert pairlist == unpacked
def test_get_buffer(): def test_get_buffer():
packer = Packer(autoreset=0, use_bin_type=True) packer = Packer(autoreset=0, use_bin_type=True)
packer.pack([1, 2]) packer.pack([1, 2])

View file

@ -1,66 +1,71 @@
"""Test Unpacker's read_array_header and read_map_header methods""" """Test Unpacker's read_array_header and read_map_header methods"""
from msgpack import packb, Unpacker, OutOfData from msgpack import packb, Unpacker, OutOfData
UnexpectedTypeException = ValueError UnexpectedTypeException = ValueError
def test_read_array_header(): def test_read_array_header():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb(['a', 'b', 'c'])) unpacker.feed(packb(["a", "b", "c"]))
assert unpacker.read_array_header() == 3 assert unpacker.read_array_header() == 3
assert unpacker.unpack() == b'a' assert unpacker.unpack() == b"a"
assert unpacker.unpack() == b'b' assert unpacker.unpack() == b"b"
assert unpacker.unpack() == b'c' assert unpacker.unpack() == b"c"
try: try:
unpacker.unpack() unpacker.unpack()
assert 0, 'should raise exception' assert 0, "should raise exception"
except OutOfData: except OutOfData:
assert 1, 'okay' assert 1, "okay"
def test_read_map_header(): def test_read_map_header():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb({'a': 'A'})) unpacker.feed(packb({"a": "A"}))
assert unpacker.read_map_header() == 1 assert unpacker.read_map_header() == 1
assert unpacker.unpack() == B'a' assert unpacker.unpack() == b"a"
assert unpacker.unpack() == B'A' assert unpacker.unpack() == b"A"
try: try:
unpacker.unpack() unpacker.unpack()
assert 0, 'should raise exception' assert 0, "should raise exception"
except OutOfData: except OutOfData:
assert 1, 'okay' assert 1, "okay"
def test_incorrect_type_array(): def test_incorrect_type_array():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb(1)) unpacker.feed(packb(1))
try: try:
unpacker.read_array_header() unpacker.read_array_header()
assert 0, 'should raise exception' assert 0, "should raise exception"
except UnexpectedTypeException: except UnexpectedTypeException:
assert 1, 'okay' assert 1, "okay"
def test_incorrect_type_map(): def test_incorrect_type_map():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb(1)) unpacker.feed(packb(1))
try: try:
unpacker.read_map_header() unpacker.read_map_header()
assert 0, 'should raise exception' assert 0, "should raise exception"
except UnexpectedTypeException: except UnexpectedTypeException:
assert 1, 'okay' assert 1, "okay"
def test_correct_type_nested_array(): def test_correct_type_nested_array():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb({'a': ['b', 'c', 'd']})) unpacker.feed(packb({"a": ["b", "c", "d"]}))
try: try:
unpacker.read_array_header() unpacker.read_array_header()
assert 0, 'should raise exception' assert 0, "should raise exception"
except UnexpectedTypeException: except UnexpectedTypeException:
assert 1, 'okay' assert 1, "okay"
def test_incorrect_type_nested_map(): def test_incorrect_type_nested_map():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(packb([{'a': 'b'}])) unpacker.feed(packb([{"a": "b"}]))
try: try:
unpacker.read_map_header() unpacker.read_map_header()
assert 0, 'should raise exception' assert 0, "should raise exception"
except UnexpectedTypeException: except UnexpectedTypeException:
assert 1, 'okay' assert 1, "okay"

View file

@ -7,8 +7,9 @@ import msgpack
binarydata = bytes(bytearray(range(256))) binarydata = bytes(bytearray(range(256)))
def gen_binary_data(idx): def gen_binary_data(idx):
return binarydata[:idx % 300] return binarydata[: idx % 300]
def test_exceeding_unpacker_read_size(): def test_exceeding_unpacker_read_size():

View file

@ -10,102 +10,115 @@ from pytest import raises
def test_partialdata(): def test_partialdata():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(b'\xa5') unpacker.feed(b"\xa5")
with raises(StopIteration): next(iter(unpacker)) with raises(StopIteration):
unpacker.feed(b'h') next(iter(unpacker))
with raises(StopIteration): next(iter(unpacker)) unpacker.feed(b"h")
unpacker.feed(b'a') with raises(StopIteration):
with raises(StopIteration): next(iter(unpacker)) next(iter(unpacker))
unpacker.feed(b'l') unpacker.feed(b"a")
with raises(StopIteration): next(iter(unpacker)) with raises(StopIteration):
unpacker.feed(b'l') next(iter(unpacker))
with raises(StopIteration): next(iter(unpacker)) unpacker.feed(b"l")
unpacker.feed(b'o') with raises(StopIteration):
assert next(iter(unpacker)) == b'hallo' next(iter(unpacker))
unpacker.feed(b"l")
with raises(StopIteration):
next(iter(unpacker))
unpacker.feed(b"o")
assert next(iter(unpacker)) == b"hallo"
def test_foobar(): def test_foobar():
unpacker = Unpacker(read_size=3, use_list=1) unpacker = Unpacker(read_size=3, use_list=1)
unpacker.feed(b'foobar') unpacker.feed(b"foobar")
assert unpacker.unpack() == ord(b'f') assert unpacker.unpack() == ord(b"f")
assert unpacker.unpack() == ord(b'o') assert unpacker.unpack() == ord(b"o")
assert unpacker.unpack() == ord(b'o') assert unpacker.unpack() == ord(b"o")
assert unpacker.unpack() == ord(b'b') assert unpacker.unpack() == ord(b"b")
assert unpacker.unpack() == ord(b'a') assert unpacker.unpack() == ord(b"a")
assert unpacker.unpack() == ord(b'r') assert unpacker.unpack() == ord(b"r")
with raises(OutOfData): with raises(OutOfData):
unpacker.unpack() unpacker.unpack()
unpacker.feed(b'foo') unpacker.feed(b"foo")
unpacker.feed(b'bar') unpacker.feed(b"bar")
k = 0 k = 0
for o, e in zip(unpacker, 'foobarbaz'): for o, e in zip(unpacker, "foobarbaz"):
assert o == ord(e) assert o == ord(e)
k += 1 k += 1
assert k == len(b'foobar') assert k == len(b"foobar")
def test_foobar_skip(): def test_foobar_skip():
unpacker = Unpacker(read_size=3, use_list=1) unpacker = Unpacker(read_size=3, use_list=1)
unpacker.feed(b'foobar') unpacker.feed(b"foobar")
assert unpacker.unpack() == ord(b'f') assert unpacker.unpack() == ord(b"f")
unpacker.skip() unpacker.skip()
assert unpacker.unpack() == ord(b'o') assert unpacker.unpack() == ord(b"o")
unpacker.skip() unpacker.skip()
assert unpacker.unpack() == ord(b'a') assert unpacker.unpack() == ord(b"a")
unpacker.skip() unpacker.skip()
with raises(OutOfData): with raises(OutOfData):
unpacker.unpack() unpacker.unpack()
def test_maxbuffersize(): def test_maxbuffersize():
with raises(ValueError): with raises(ValueError):
Unpacker(read_size=5, max_buffer_size=3) Unpacker(read_size=5, max_buffer_size=3)
unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1) unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
unpacker.feed(b'fo') unpacker.feed(b"fo")
with raises(BufferFull): with raises(BufferFull):
unpacker.feed(b'ob') unpacker.feed(b"ob")
unpacker.feed(b'o') unpacker.feed(b"o")
assert ord('f') == next(unpacker) assert ord("f") == next(unpacker)
unpacker.feed(b'b') unpacker.feed(b"b")
assert ord('o') == next(unpacker) assert ord("o") == next(unpacker)
assert ord('o') == next(unpacker) assert ord("o") == next(unpacker)
assert ord('b') == next(unpacker) assert ord("b") == next(unpacker)
def test_readbytes(): def test_readbytes():
unpacker = Unpacker(read_size=3) unpacker = Unpacker(read_size=3)
unpacker.feed(b'foobar') unpacker.feed(b"foobar")
assert unpacker.unpack() == ord(b'f') assert unpacker.unpack() == ord(b"f")
assert unpacker.read_bytes(3) == b'oob' assert unpacker.read_bytes(3) == b"oob"
assert unpacker.unpack() == ord(b'a') assert unpacker.unpack() == ord(b"a")
assert unpacker.unpack() == ord(b'r') assert unpacker.unpack() == ord(b"r")
# Test buffer refill # Test buffer refill
unpacker = Unpacker(io.BytesIO(b'foobar'), read_size=3) unpacker = Unpacker(io.BytesIO(b"foobar"), read_size=3)
assert unpacker.unpack() == ord(b'f') assert unpacker.unpack() == ord(b"f")
assert unpacker.read_bytes(3) == b'oob' assert unpacker.read_bytes(3) == b"oob"
assert unpacker.unpack() == ord(b'a') assert unpacker.unpack() == ord(b"a")
assert unpacker.unpack() == ord(b'r') assert unpacker.unpack() == ord(b"r")
def test_issue124(): def test_issue124():
unpacker = Unpacker() unpacker = Unpacker()
unpacker.feed(b'\xa1?\xa1!') unpacker.feed(b"\xa1?\xa1!")
assert tuple(unpacker) == (b'?', b'!') assert tuple(unpacker) == (b"?", b"!")
assert tuple(unpacker) == () assert tuple(unpacker) == ()
unpacker.feed(b"\xa1?\xa1") unpacker.feed(b"\xa1?\xa1")
assert tuple(unpacker) == (b'?',) assert tuple(unpacker) == (b"?",)
assert tuple(unpacker) == () assert tuple(unpacker) == ()
unpacker.feed(b"!") unpacker.feed(b"!")
assert tuple(unpacker) == (b'!',) assert tuple(unpacker) == (b"!",)
assert tuple(unpacker) == () assert tuple(unpacker) == ()
def test_unpack_tell(): def test_unpack_tell():
stream = io.BytesIO() stream = io.BytesIO()
messages = [2**i-1 for i in range(65)] messages = [2 ** i - 1 for i in range(65)]
messages += [-(2**i) for i in range(1, 64)] messages += [-(2 ** i) for i in range(1, 64)]
messages += [b'hello', b'hello'*1000, list(range(20)), messages += [
{i: bytes(i)*i for i in range(10)}, b"hello",
{i: bytes(i)*i for i in range(32)}] b"hello" * 1000,
list(range(20)),
{i: bytes(i) * i for i in range(10)},
{i: bytes(i) * i for i in range(32)},
]
offsets = [] offsets = []
for m in messages: for m in messages:
pack(m, stream) pack(m, stream)

View file

@ -5,30 +5,32 @@ from msgpack import packb, unpackb, ExtType
def test_namedtuple(): def test_namedtuple():
T = namedtuple('T', "foo bar") T = namedtuple("T", "foo bar")
def default(o): def default(o):
if isinstance(o, T): if isinstance(o, T):
return dict(o._asdict()) return dict(o._asdict())
raise TypeError('Unsupported type %s' % (type(o),)) raise TypeError("Unsupported type %s" % (type(o),))
packed = packb(T(1, 42), strict_types=True, use_bin_type=True, default=default) packed = packb(T(1, 42), strict_types=True, use_bin_type=True, default=default)
unpacked = unpackb(packed, raw=False) unpacked = unpackb(packed, raw=False)
assert unpacked == {'foo': 1, 'bar': 42} assert unpacked == {"foo": 1, "bar": 42}
def test_tuple(): def test_tuple():
t = ('one', 2, b'three', (4, )) t = ("one", 2, b"three", (4,))
def default(o): def default(o):
if isinstance(o, tuple): if isinstance(o, tuple):
return { return {
'__type__': 'tuple', "__type__": "tuple",
'value': list(o), "value": list(o),
} }
raise TypeError('Unsupported type %s' % (type(o),)) raise TypeError("Unsupported type %s" % (type(o),))
def convert(o): def convert(o):
if o.get('__type__') == 'tuple': if o.get("__type__") == "tuple":
return tuple(o['value']) return tuple(o["value"])
return o return o
data = packb(t, strict_types=True, use_bin_type=True, default=default) data = packb(t, strict_types=True, use_bin_type=True, default=default)
@ -38,7 +40,7 @@ def test_tuple():
def test_tuple_ext(): def test_tuple_ext():
t = ('one', 2, b'three', (4, )) t = ("one", 2, b"three", (4,))
MSGPACK_EXT_TYPE_TUPLE = 0 MSGPACK_EXT_TYPE_TUPLE = 0
@ -46,7 +48,8 @@ def test_tuple_ext():
if isinstance(o, tuple): if isinstance(o, tuple):
# Convert to list and pack # Convert to list and pack
payload = packb( payload = packb(
list(o), strict_types=True, use_bin_type=True, default=default) list(o), strict_types=True, use_bin_type=True, default=default
)
return ExtType(MSGPACK_EXT_TYPE_TUPLE, payload) return ExtType(MSGPACK_EXT_TYPE_TUPLE, payload)
raise TypeError(repr(o)) raise TypeError(repr(o))
@ -54,7 +57,7 @@ def test_tuple_ext():
if code == MSGPACK_EXT_TYPE_TUPLE: if code == MSGPACK_EXT_TYPE_TUPLE:
# Unpack and convert to tuple # Unpack and convert to tuple
return tuple(unpackb(payload, raw=False, ext_hook=convert)) return tuple(unpackb(payload, raw=False, ext_hook=convert))
raise ValueError('Unknown Ext code {}'.format(code)) raise ValueError("Unknown Ext code {}".format(code))
data = packb(t, strict_types=True, use_bin_type=True, default=default) data = packb(t, strict_types=True, use_bin_type=True, default=default)
expected = unpackb(data, raw=False, ext_hook=convert) expected = unpackb(data, raw=False, ext_hook=convert)

View file

@ -4,16 +4,21 @@
from msgpack import packb, unpackb from msgpack import packb, unpackb
from collections import namedtuple from collections import namedtuple
class MyList(list): class MyList(list):
pass pass
class MyDict(dict): class MyDict(dict):
pass pass
class MyTuple(tuple): class MyTuple(tuple):
pass pass
MyNamedTuple = namedtuple('MyNamedTuple', 'x y')
MyNamedTuple = namedtuple("MyNamedTuple", "x y")
def test_types(): def test_types():
assert packb(MyDict()) == packb(dict()) assert packb(MyDict()) == packb(dict())

View file

@ -4,34 +4,34 @@ from msgpack import Timestamp
def test_timestamp(): def test_timestamp():
# timestamp32 # timestamp32
ts = Timestamp(2**32 - 1) ts = Timestamp(2 ** 32 - 1)
assert ts.to_bytes() == b"\xff\xff\xff\xff" assert ts.to_bytes() == b"\xff\xff\xff\xff"
packed = msgpack.packb(ts) packed = msgpack.packb(ts)
assert packed == b"\xd6\xff" + ts.to_bytes() assert packed == b"\xd6\xff" + ts.to_bytes()
unpacked = msgpack.unpackb(packed) unpacked = msgpack.unpackb(packed)
assert ts == unpacked assert ts == unpacked
assert ts.seconds == 2**32 - 1 and ts.nanoseconds == 0 assert ts.seconds == 2 ** 32 - 1 and ts.nanoseconds == 0
# timestamp64 # timestamp64
ts = Timestamp(2**34 - 1, 999999999) ts = Timestamp(2 ** 34 - 1, 999999999)
assert ts.to_bytes() == b"\xee\x6b\x27\xff\xff\xff\xff\xff" assert ts.to_bytes() == b"\xee\x6b\x27\xff\xff\xff\xff\xff"
packed = msgpack.packb(ts) packed = msgpack.packb(ts)
assert packed == b"\xd7\xff" + ts.to_bytes() assert packed == b"\xd7\xff" + ts.to_bytes()
unpacked = msgpack.unpackb(packed) unpacked = msgpack.unpackb(packed)
assert ts == unpacked assert ts == unpacked
assert ts.seconds == 2**34 - 1 and ts.nanoseconds == 999999999 assert ts.seconds == 2 ** 34 - 1 and ts.nanoseconds == 999999999
# timestamp96 # timestamp96
ts = Timestamp(2**63 - 1, 999999999) ts = Timestamp(2 ** 63 - 1, 999999999)
assert ts.to_bytes() == b"\x3b\x9a\xc9\xff\x7f\xff\xff\xff\xff\xff\xff\xff" assert ts.to_bytes() == b"\x3b\x9a\xc9\xff\x7f\xff\xff\xff\xff\xff\xff\xff"
packed = msgpack.packb(ts) packed = msgpack.packb(ts)
assert packed == b"\xc7\x0c\xff" + ts.to_bytes() assert packed == b"\xc7\x0c\xff" + ts.to_bytes()
unpacked = msgpack.unpackb(packed) unpacked = msgpack.unpackb(packed)
assert ts == unpacked assert ts == unpacked
assert ts.seconds == 2**63 - 1 and ts.nanoseconds == 999999999 assert ts.seconds == 2 ** 63 - 1 and ts.nanoseconds == 999999999
# negative fractional # negative fractional
ts = Timestamp(-2.3) #s: -3, ns: 700000000 ts = Timestamp(-2.3) # s: -3, ns: 700000000
assert ts.to_bytes() == b"\x29\xb9\x27\x00\xff\xff\xff\xff\xff\xff\xff\xfd" assert ts.to_bytes() == b"\x29\xb9\x27\x00\xff\xff\xff\xff\xff\xff\xff\xfd"
packed = msgpack.packb(ts) packed = msgpack.packb(ts)
assert packed == b"\xc7\x0c\xff" + ts.to_bytes() assert packed == b"\xc7\x0c\xff" + ts.to_bytes()

View file

@ -5,7 +5,7 @@ from pytest import raises, mark
def test_unpack_array_header_from_file(): def test_unpack_array_header_from_file():
f = BytesIO(packb([1,2,3,4])) f = BytesIO(packb([1, 2, 3, 4]))
unpacker = Unpacker(f) unpacker = Unpacker(f)
assert unpacker.read_array_header() == 4 assert unpacker.read_array_header() == 4
assert unpacker.unpack() == 1 assert unpacker.unpack() == 1
@ -16,8 +16,10 @@ def test_unpack_array_header_from_file():
unpacker.unpack() unpacker.unpack()
@mark.skipif("not hasattr(sys, 'getrefcount') == True", @mark.skipif(
reason='sys.getrefcount() is needed to pass this test') "not hasattr(sys, 'getrefcount') == True",
reason="sys.getrefcount() is needed to pass this test",
)
def test_unpacker_hook_refcnt(): def test_unpacker_hook_refcnt():
result = [] result = []
@ -43,12 +45,9 @@ def test_unpacker_hook_refcnt():
def test_unpacker_ext_hook(): def test_unpacker_ext_hook():
class MyUnpacker(Unpacker): class MyUnpacker(Unpacker):
def __init__(self): def __init__(self):
super(MyUnpacker, self).__init__( super(MyUnpacker, self).__init__(ext_hook=self._hook, raw=False)
ext_hook=self._hook, raw=False)
def _hook(self, code, data): def _hook(self, code, data):
if code == 1: if code == 1:
@ -57,15 +56,15 @@ def test_unpacker_ext_hook():
return ExtType(code, data) return ExtType(code, data)
unpacker = MyUnpacker() unpacker = MyUnpacker()
unpacker.feed(packb({'a': 1})) unpacker.feed(packb({"a": 1}))
assert unpacker.unpack() == {'a': 1} assert unpacker.unpack() == {"a": 1}
unpacker.feed(packb({'a': ExtType(1, b'123')})) unpacker.feed(packb({"a": ExtType(1, b"123")}))
assert unpacker.unpack() == {'a': 123} assert unpacker.unpack() == {"a": 123}
unpacker.feed(packb({'a': ExtType(2, b'321')})) unpacker.feed(packb({"a": ExtType(2, b"321")}))
assert unpacker.unpack() == {'a': ExtType(2, b'321')} assert unpacker.unpack() == {"a": ExtType(2, b"321")}
if __name__ == '__main__': if __name__ == "__main__":
test_unpack_array_header_from_file() test_unpack_array_header_from_file()
test_unpacker_hook_refcnt() test_unpacker_hook_refcnt()
test_unpacker_ext_hook() test_unpacker_ext_hook()