msgpack-python/test/test_unpack.py

93 lines
2.4 KiB
Python
Raw Normal View History

2014-02-13 09:40:12 +09:00
from io import BytesIO
2014-02-13 09:55:17 +09:00
import sys
2014-03-24 15:31:06 +02:00
from msgpack import Unpacker, packb, OutOfData, ExtType
2014-02-13 09:55:17 +09:00
from pytest import raises, mark
2014-02-13 09:40:12 +09:00
try:
from itertools import izip as zip
except ImportError:
pass
2014-02-13 09:40:12 +09:00
def test_unpack_array_header_from_file():
    """Read an array header from a file-like source, then unpack its items one by one."""
    elements = [1, 2, 3, 4]
    stream = BytesIO(packb(elements))
    unpacker = Unpacker(stream)

    # The header is consumed first and reports how many items follow.
    assert unpacker.read_array_header() == len(elements)

    # Each following unpack() call yields the next array item.
    for expected in elements:
        assert unpacker.unpack() == expected

    # Nothing is left in the stream once every item has been read.
    with raises(OutOfData):
        unpacker.unpack()
2019-12-05 18:51:45 +09:00
@mark.skipif(
    not hasattr(sys, "getrefcount"),
    reason="sys.getrefcount() is needed to pass this test",
)
def test_unpacker_hook_refcnt():
    """Check that an Unpacker references its hooks while alive and releases them on deletion.

    The same callable is installed as both ``object_hook`` and ``list_hook``,
    so at least two extra references are expected while the unpacker exists,
    and the count must return to its baseline once the unpacker is deleted.
    """
    result = []

    def hook(x):
        # Record every object handed to the hook, then pass it through unchanged.
        result.append(x)
        return x

    # Baseline taken before any Unpacker has seen the hook.
    basecnt = sys.getrefcount(hook)

    up = Unpacker(object_hook=hook, list_hook=hook)
    assert sys.getrefcount(hook) >= basecnt + 2

    up.feed(packb([{}]))
    up.feed(packb([{}]))
    assert up.unpack() == [{}]
    assert up.unpack() == [{}]
    # For each message the hook receives the inner dict first, then the enclosing list.
    assert result == [{}, [{}], {}, [{}]]

    del up
    assert sys.getrefcount(hook) == basecnt
2014-03-24 15:31:06 +02:00
def test_unpacker_ext_hook():
    """Check that an ``ext_hook`` bound to an Unpacker subclass decodes ext values."""

    class MyUnpacker(Unpacker):
        def __init__(self):
            super().__init__(ext_hook=self._hook, raw=False)

        def _hook(self, code, data):
            # Only ext code 1 is decoded to an int; other codes stay wrapped.
            if code != 1:
                return ExtType(code, data)
            return int(data)

    unpacker = MyUnpacker()

    # (value to pack, value expected back from the unpacker)
    cases = (
        ({"a": 1}, {"a": 1}),
        ({"a": ExtType(1, b"123")}, {"a": 123}),
        ({"a": ExtType(2, b"321")}, {"a": ExtType(2, b"321")}),
    )
    for payload, expected in cases:
        unpacker.feed(packb(payload))
        assert unpacker.unpack() == expected
2014-03-24 15:31:06 +02:00
def test_unpacker_tell():
    """Check that ``tell()`` matches the expected offset after each unpacked object.

    ``positions`` holds the cumulative byte counts of the encoded objects in
    ``packed`` (1, 1, 4, 4 and 4 bytes respectively).
    """
    objects = 1, 2, "abc", "def", "ghi"
    packed = b"\x01\x02\xa3abc\xa3def\xa3ghi"
    positions = 1, 2, 6, 10, 14
    unpacker = Unpacker(BytesIO(packed))

    checked = 0
    for obj, unp, pos in zip(objects, unpacker, positions):
        assert obj == unp
        assert pos == unpacker.tell()
        checked += 1

    # zip() stops silently at the shortest input, so without this check the
    # test would pass vacuously if the unpacker yielded too few objects.
    assert checked == len(objects)
def test_unpacker_tell_read_bytes():
    """Check ``tell()`` when unpacking is interleaved with ``read_bytes()`` calls.

    After each unpacked object, ``read_bytes(n)`` is called and its result is
    compared with the expected raw slice of ``packed``. The last request
    (999 bytes) is expected to return ``b""``.
    """
    objects = 1, "abc", "ghi"
    packed = b"\x01\x02\xa3abc\xa3def\xa3ghi"
    raw_data = b"\x02", b"\xa3def", b""
    lengths = 1, 4, 999
    positions = 1, 6, 14
    unpacker = Unpacker(BytesIO(packed))

    checked = 0
    for obj, unp, pos, n, raw in zip(objects, unpacker, positions, lengths, raw_data):
        assert obj == unp
        assert pos == unpacker.tell()
        assert unpacker.read_bytes(n) == raw
        checked += 1

    # zip() stops silently at the shortest input, so without this check the
    # test would pass vacuously if the unpacker yielded too few objects.
    assert checked == len(objects)