mirror of
https://github.com/msgpack/msgpack-python.git
synced 2025-10-19 20:03:16 +00:00

The following steps have been taken: 1. Black was updated to latest version. The code has been formatted with the new version. 2. The pyupgrade utility is installed. This helped to remove all the code that was needed to support Python < 3.7. Fix #541. Co-authored-by: Inada Naoki <songofacandy@gmail.com>
147 lines
4 KiB
Python
147 lines
4 KiB
Python
#!/usr/bin/env python
|
|
import io
|
|
from msgpack import Unpacker, BufferFull
|
|
from msgpack import pack, packb
|
|
from msgpack.exceptions import OutOfData
|
|
from pytest import raises
|
|
|
|
|
|
def test_partialdata():
|
|
unpacker = Unpacker()
|
|
unpacker.feed(b"\xa5")
|
|
with raises(StopIteration):
|
|
next(iter(unpacker))
|
|
unpacker.feed(b"h")
|
|
with raises(StopIteration):
|
|
next(iter(unpacker))
|
|
unpacker.feed(b"a")
|
|
with raises(StopIteration):
|
|
next(iter(unpacker))
|
|
unpacker.feed(b"l")
|
|
with raises(StopIteration):
|
|
next(iter(unpacker))
|
|
unpacker.feed(b"l")
|
|
with raises(StopIteration):
|
|
next(iter(unpacker))
|
|
unpacker.feed(b"o")
|
|
assert next(iter(unpacker)) == "hallo"
|
|
|
|
|
|
def test_foobar():
|
|
unpacker = Unpacker(read_size=3, use_list=1)
|
|
unpacker.feed(b"foobar")
|
|
assert unpacker.unpack() == ord(b"f")
|
|
assert unpacker.unpack() == ord(b"o")
|
|
assert unpacker.unpack() == ord(b"o")
|
|
assert unpacker.unpack() == ord(b"b")
|
|
assert unpacker.unpack() == ord(b"a")
|
|
assert unpacker.unpack() == ord(b"r")
|
|
with raises(OutOfData):
|
|
unpacker.unpack()
|
|
|
|
unpacker.feed(b"foo")
|
|
unpacker.feed(b"bar")
|
|
|
|
k = 0
|
|
for o, e in zip(unpacker, "foobarbaz"):
|
|
assert o == ord(e)
|
|
k += 1
|
|
assert k == len(b"foobar")
|
|
|
|
|
|
def test_foobar_skip():
|
|
unpacker = Unpacker(read_size=3, use_list=1)
|
|
unpacker.feed(b"foobar")
|
|
assert unpacker.unpack() == ord(b"f")
|
|
unpacker.skip()
|
|
assert unpacker.unpack() == ord(b"o")
|
|
unpacker.skip()
|
|
assert unpacker.unpack() == ord(b"a")
|
|
unpacker.skip()
|
|
with raises(OutOfData):
|
|
unpacker.unpack()
|
|
|
|
|
|
def test_maxbuffersize():
|
|
with raises(ValueError):
|
|
Unpacker(read_size=5, max_buffer_size=3)
|
|
unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
|
|
unpacker.feed(b"fo")
|
|
with raises(BufferFull):
|
|
unpacker.feed(b"ob")
|
|
unpacker.feed(b"o")
|
|
assert ord("f") == next(unpacker)
|
|
unpacker.feed(b"b")
|
|
assert ord("o") == next(unpacker)
|
|
assert ord("o") == next(unpacker)
|
|
assert ord("b") == next(unpacker)
|
|
|
|
|
|
def test_maxbuffersize_file():
|
|
buff = io.BytesIO(packb(b"a" * 10) + packb([b"a" * 20] * 2))
|
|
unpacker = Unpacker(buff, read_size=1, max_buffer_size=19, max_bin_len=20)
|
|
assert unpacker.unpack() == b"a" * 10
|
|
# assert unpacker.unpack() == [b"a" * 20]*2
|
|
with raises(BufferFull):
|
|
print(unpacker.unpack())
|
|
|
|
|
|
def test_readbytes():
|
|
unpacker = Unpacker(read_size=3)
|
|
unpacker.feed(b"foobar")
|
|
assert unpacker.unpack() == ord(b"f")
|
|
assert unpacker.read_bytes(3) == b"oob"
|
|
assert unpacker.unpack() == ord(b"a")
|
|
assert unpacker.unpack() == ord(b"r")
|
|
|
|
# Test buffer refill
|
|
unpacker = Unpacker(io.BytesIO(b"foobar"), read_size=3)
|
|
assert unpacker.unpack() == ord(b"f")
|
|
assert unpacker.read_bytes(3) == b"oob"
|
|
assert unpacker.unpack() == ord(b"a")
|
|
assert unpacker.unpack() == ord(b"r")
|
|
|
|
# Issue 352
|
|
u = Unpacker()
|
|
u.feed(b"x")
|
|
assert bytes(u.read_bytes(1)) == b"x"
|
|
with raises(StopIteration):
|
|
next(u)
|
|
u.feed(b"\1")
|
|
assert next(u) == 1
|
|
|
|
|
|
def test_issue124():
|
|
unpacker = Unpacker()
|
|
unpacker.feed(b"\xa1?\xa1!")
|
|
assert tuple(unpacker) == ("?", "!")
|
|
assert tuple(unpacker) == ()
|
|
unpacker.feed(b"\xa1?\xa1")
|
|
assert tuple(unpacker) == ("?",)
|
|
assert tuple(unpacker) == ()
|
|
unpacker.feed(b"!")
|
|
assert tuple(unpacker) == ("!",)
|
|
assert tuple(unpacker) == ()
|
|
|
|
|
|
def test_unpack_tell():
|
|
stream = io.BytesIO()
|
|
messages = [2**i - 1 for i in range(65)]
|
|
messages += [-(2**i) for i in range(1, 64)]
|
|
messages += [
|
|
b"hello",
|
|
b"hello" * 1000,
|
|
list(range(20)),
|
|
{i: bytes(i) * i for i in range(10)},
|
|
{i: bytes(i) * i for i in range(32)},
|
|
]
|
|
offsets = []
|
|
for m in messages:
|
|
pack(m, stream)
|
|
offsets.append(stream.tell())
|
|
stream.seek(0)
|
|
unpacker = Unpacker(stream, strict_map_key=False)
|
|
for m, o in zip(messages, offsets):
|
|
m2 = next(unpacker)
|
|
assert m == m2
|
|
assert o == unpacker.tell()
|