mirror of
https://github.com/msgpack/msgpack-python.git
synced 2025-10-19 20:03:16 +00:00
71 lines
1.8 KiB
Python
71 lines
1.8 KiB
Python
from io import BytesIO
|
|
import sys
|
|
from msgpack import Unpacker, packb, OutOfData, ExtType
|
|
from pytest import raises, mark
|
|
|
|
|
|
def test_unpack_array_header_from_file():
|
|
f = BytesIO(packb([1,2,3,4]))
|
|
unpacker = Unpacker(f)
|
|
assert unpacker.read_array_header() == 4
|
|
assert unpacker.unpack() == 1
|
|
assert unpacker.unpack() == 2
|
|
assert unpacker.unpack() == 3
|
|
assert unpacker.unpack() == 4
|
|
with raises(OutOfData):
|
|
unpacker.unpack()
|
|
|
|
|
|
@mark.skipif("not hasattr(sys, 'getrefcount') == True",
|
|
reason='sys.getrefcount() is needed to pass this test')
|
|
def test_unpacker_hook_refcnt():
|
|
result = []
|
|
|
|
def hook(x):
|
|
result.append(x)
|
|
return x
|
|
|
|
basecnt = sys.getrefcount(hook)
|
|
|
|
up = Unpacker(object_hook=hook, list_hook=hook)
|
|
|
|
assert sys.getrefcount(hook) >= basecnt + 2
|
|
|
|
up.feed(packb([{}]))
|
|
up.feed(packb([{}]))
|
|
assert up.unpack() == [{}]
|
|
assert up.unpack() == [{}]
|
|
assert result == [{}, [{}], {}, [{}]]
|
|
|
|
del up
|
|
|
|
assert sys.getrefcount(hook) == basecnt
|
|
|
|
|
|
def test_unpacker_ext_hook():
|
|
|
|
class MyUnpacker(Unpacker):
|
|
|
|
def __init__(self):
|
|
super(MyUnpacker, self).__init__(
|
|
ext_hook=self._hook, raw=False)
|
|
|
|
def _hook(self, code, data):
|
|
if code == 1:
|
|
return int(data)
|
|
else:
|
|
return ExtType(code, data)
|
|
|
|
unpacker = MyUnpacker()
|
|
unpacker.feed(packb({'a': 1}))
|
|
assert unpacker.unpack() == {'a': 1}
|
|
unpacker.feed(packb({'a': ExtType(1, b'123')}))
|
|
assert unpacker.unpack() == {'a': 123}
|
|
unpacker.feed(packb({'a': ExtType(2, b'321')}))
|
|
assert unpacker.unpack() == {'a': ExtType(2, b'321')}
|
|
|
|
|
|
if __name__ == '__main__':
|
|
test_unpack_array_header_from_file()
|
|
test_unpacker_hook_refcnt()
|
|
test_unpacker_ext_hook()
|