mirror of
				https://github.com/msgpack/msgpack-python.git
				synced 2025-11-04 03:20:56 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			139 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
# coding: utf-8
 | 
						|
import io
 | 
						|
from msgpack import Unpacker, BufferFull
 | 
						|
from msgpack import pack
 | 
						|
from msgpack.exceptions import OutOfData
 | 
						|
from pytest import raises
 | 
						|
 | 
						|
 | 
						|
def test_partialdata():
 | 
						|
    unpacker = Unpacker()
 | 
						|
    unpacker.feed(b"\xa5")
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(iter(unpacker))
 | 
						|
    unpacker.feed(b"h")
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(iter(unpacker))
 | 
						|
    unpacker.feed(b"a")
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(iter(unpacker))
 | 
						|
    unpacker.feed(b"l")
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(iter(unpacker))
 | 
						|
    unpacker.feed(b"l")
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(iter(unpacker))
 | 
						|
    unpacker.feed(b"o")
 | 
						|
    assert next(iter(unpacker)) == "hallo"
 | 
						|
 | 
						|
 | 
						|
def test_foobar():
 | 
						|
    unpacker = Unpacker(read_size=3, use_list=1)
 | 
						|
    unpacker.feed(b"foobar")
 | 
						|
    assert unpacker.unpack() == ord(b"f")
 | 
						|
    assert unpacker.unpack() == ord(b"o")
 | 
						|
    assert unpacker.unpack() == ord(b"o")
 | 
						|
    assert unpacker.unpack() == ord(b"b")
 | 
						|
    assert unpacker.unpack() == ord(b"a")
 | 
						|
    assert unpacker.unpack() == ord(b"r")
 | 
						|
    with raises(OutOfData):
 | 
						|
        unpacker.unpack()
 | 
						|
 | 
						|
    unpacker.feed(b"foo")
 | 
						|
    unpacker.feed(b"bar")
 | 
						|
 | 
						|
    k = 0
 | 
						|
    for o, e in zip(unpacker, "foobarbaz"):
 | 
						|
        assert o == ord(e)
 | 
						|
        k += 1
 | 
						|
    assert k == len(b"foobar")
 | 
						|
 | 
						|
 | 
						|
def test_foobar_skip():
 | 
						|
    unpacker = Unpacker(read_size=3, use_list=1)
 | 
						|
    unpacker.feed(b"foobar")
 | 
						|
    assert unpacker.unpack() == ord(b"f")
 | 
						|
    unpacker.skip()
 | 
						|
    assert unpacker.unpack() == ord(b"o")
 | 
						|
    unpacker.skip()
 | 
						|
    assert unpacker.unpack() == ord(b"a")
 | 
						|
    unpacker.skip()
 | 
						|
    with raises(OutOfData):
 | 
						|
        unpacker.unpack()
 | 
						|
 | 
						|
 | 
						|
def test_maxbuffersize():
 | 
						|
    with raises(ValueError):
 | 
						|
        Unpacker(read_size=5, max_buffer_size=3)
 | 
						|
    unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
 | 
						|
    unpacker.feed(b"fo")
 | 
						|
    with raises(BufferFull):
 | 
						|
        unpacker.feed(b"ob")
 | 
						|
    unpacker.feed(b"o")
 | 
						|
    assert ord("f") == next(unpacker)
 | 
						|
    unpacker.feed(b"b")
 | 
						|
    assert ord("o") == next(unpacker)
 | 
						|
    assert ord("o") == next(unpacker)
 | 
						|
    assert ord("b") == next(unpacker)
 | 
						|
 | 
						|
 | 
						|
def test_readbytes():
 | 
						|
    unpacker = Unpacker(read_size=3)
 | 
						|
    unpacker.feed(b"foobar")
 | 
						|
    assert unpacker.unpack() == ord(b"f")
 | 
						|
    assert unpacker.read_bytes(3) == b"oob"
 | 
						|
    assert unpacker.unpack() == ord(b"a")
 | 
						|
    assert unpacker.unpack() == ord(b"r")
 | 
						|
 | 
						|
    # Test buffer refill
 | 
						|
    unpacker = Unpacker(io.BytesIO(b"foobar"), read_size=3)
 | 
						|
    assert unpacker.unpack() == ord(b"f")
 | 
						|
    assert unpacker.read_bytes(3) == b"oob"
 | 
						|
    assert unpacker.unpack() == ord(b"a")
 | 
						|
    assert unpacker.unpack() == ord(b"r")
 | 
						|
 | 
						|
    # Issue 352
 | 
						|
    u = Unpacker()
 | 
						|
    u.feed(b"x")
 | 
						|
    assert bytes(u.read_bytes(1)) == b"x"
 | 
						|
    with raises(StopIteration):
 | 
						|
        next(u)
 | 
						|
    u.feed(b"\1")
 | 
						|
    assert next(u) == 1
 | 
						|
 | 
						|
 | 
						|
def test_issue124():
 | 
						|
    unpacker = Unpacker()
 | 
						|
    unpacker.feed(b"\xa1?\xa1!")
 | 
						|
    assert tuple(unpacker) == ("?", "!")
 | 
						|
    assert tuple(unpacker) == ()
 | 
						|
    unpacker.feed(b"\xa1?\xa1")
 | 
						|
    assert tuple(unpacker) == ("?",)
 | 
						|
    assert tuple(unpacker) == ()
 | 
						|
    unpacker.feed(b"!")
 | 
						|
    assert tuple(unpacker) == ("!",)
 | 
						|
    assert tuple(unpacker) == ()
 | 
						|
 | 
						|
 | 
						|
def test_unpack_tell():
 | 
						|
    stream = io.BytesIO()
 | 
						|
    messages = [2 ** i - 1 for i in range(65)]
 | 
						|
    messages += [-(2 ** i) for i in range(1, 64)]
 | 
						|
    messages += [
 | 
						|
        b"hello",
 | 
						|
        b"hello" * 1000,
 | 
						|
        list(range(20)),
 | 
						|
        {i: bytes(i) * i for i in range(10)},
 | 
						|
        {i: bytes(i) * i for i in range(32)},
 | 
						|
    ]
 | 
						|
    offsets = []
 | 
						|
    for m in messages:
 | 
						|
        pack(m, stream)
 | 
						|
        offsets.append(stream.tell())
 | 
						|
    stream.seek(0)
 | 
						|
    unpacker = Unpacker(stream, strict_map_key=False)
 | 
						|
    for m, o in zip(messages, offsets):
 | 
						|
        m2 = next(unpacker)
 | 
						|
        assert m == m2
 | 
						|
        assert o == unpacker.tell()
 |