mirror of https://github.com/msgpack/msgpack-python.git (synced 2026-02-12 04:40:07 +00:00)
commit df4f23779d
16 changed files with 437 additions and 142 deletions
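The diff below threads an explicit use_list argument through the test suite. As context (not part of the commit), a minimal sketch of what the flag controls when unpacking a msgpack array, matching the assertions in the updated tests:

from msgpack import packb, unpackb

packed = packb([b'foo', b'bar'])
assert unpackb(packed, use_list=1) == [b'foo', b'bar']   # arrays come back as lists
assert unpackb(packed, use_list=0) == (b'foo', b'bar')   # ...or as tuples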
@@ -9,8 +9,8 @@ def test_unpack_buffer():
     from array import array
     buf = array('b')
     buf.fromstring(packb(('foo', 'bar')))
-    obj = unpackb(buf)
-    assert_equal((b'foo', b'bar'), obj)
+    obj = unpackb(buf, use_list=1)
+    assert_equal([b'foo', b'bar'], obj)
 
 if __name__ == '__main__':
     main()
@@ -9,7 +9,7 @@ from msgpack import packb, unpackb
 def check(length, obj):
     v = packb(obj)
     assert_equal(len(v), length, "%r length should be %r but get %r" % (obj, length, len(v)))
-    assert_equal(unpackb(v), obj)
+    assert_equal(unpackb(v, use_list=0), obj)
 
 def test_1():
     for o in [None, True, False, 0, 1, (1 << 6), (1 << 7) - 1, -1,
@@ -71,7 +71,7 @@ def test_array32():
 
 def match(obj, buf):
     assert_equal(packb(obj), buf)
-    assert_equal(unpackb(buf), obj)
+    assert_equal(unpackb(buf, use_list=0), obj)
 
 def test_match():
     cases = [
@@ -99,7 +99,7 @@ def test_match():
         match(v, p)
 
 def test_unicode():
-    assert_equal(b'foobar', unpackb(packb('foobar')))
+    assert_equal(b'foobar', unpackb(packb('foobar'), use_list=1))
 
 if __name__ == '__main__':
     main()
@@ -5,8 +5,8 @@ from nose import main
 from nose.tools import *
 from msgpack import unpackb
 
-def check(src, should):
-    assert_equal(unpackb(src), should)
+def check(src, should, use_list=0):
+    assert_equal(unpackb(src, use_list=use_list), should)
 
 def testSimpleValue():
     check(b"\x93\xc0\xc2\xc3",
@@ -18,25 +18,35 @@ def _encode_complex(obj):
 
 def test_encode_hook():
     packed = packb([3, 1+2j], default=_encode_complex)
-    unpacked = unpackb(packed)
+    unpacked = unpackb(packed, use_list=1)
     eq_(unpacked[1], {b'__complex__': True, b'real': 1, b'imag': 2})
 
 def test_decode_hook():
     packed = packb([3, {b'__complex__': True, b'real': 1, b'imag': 2}])
-    unpacked = unpackb(packed, object_hook=_decode_complex)
+    unpacked = unpackb(packed, object_hook=_decode_complex, use_list=1)
     eq_(unpacked[1], 1+2j)
 
+def test_decode_pairs_hook():
+    packed = packb([3, {1: 2, 3: 4}])
+    prod_sum = 1 * 2 + 3 * 4
+    unpacked = unpackb(packed, object_pairs_hook=lambda l: sum(k * v for k, v in l), use_list=1)
+    eq_(unpacked[1], prod_sum)
+
+@raises(ValueError)
+def test_only_one_obj_hook():
+    unpackb(b'', object_hook=lambda x: x, object_pairs_hook=lambda x: x)
+
 @raises(ValueError)
 def test_bad_hook():
     packed = packb([3, 1+2j], default=lambda o: o)
-    unpacked = unpackb(packed)
+    unpacked = unpackb(packed, use_list=1)
 
 def _arr_to_str(arr):
     return ''.join(str(c) for c in arr)
 
 def test_array_hook():
     packed = packb([1,2,3])
-    unpacked = unpackb(packed, list_hook=_arr_to_str)
+    unpacked = unpackb(packed, list_hook=_arr_to_str, use_list=1)
     eq_(unpacked, '123')
 
 if __name__ == '__main__':
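The _encode_complex and _decode_complex helpers used above are defined earlier in the file, outside this hunk. A self-contained sketch of the same default/object_hook round-trip, with hypothetical helper bodies standing in for the ones not shown here:

from msgpack import packb, unpackb

def encode_complex(obj):
    # stand-in for _encode_complex: serialize complex numbers as a tagged map
    if isinstance(obj, complex):
        return {b'__complex__': True, b'real': obj.real, b'imag': obj.imag}
    raise TypeError('cannot serialize %r' % (obj,))

def decode_complex(obj):
    # stand-in for _decode_complex: rebuild the complex number from the map
    if b'__complex__' in obj:
        return complex(obj[b'real'], obj[b'imag'])
    return obj

packed = packb(1 + 2j, default=encode_complex)
assert unpackb(packed, object_hook=decode_complex) == 1 + 2j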
@@ -11,8 +11,8 @@ from msgpack import packb, unpackb, Unpacker, Packer
 
 from io import BytesIO
 
-def check(data):
-    re = unpackb(packb(data))
+def check(data, use_list=False):
+    re = unpackb(packb(data), use_list=use_list)
     assert_equal(re, data)
 
 def testPack():
@@ -31,14 +31,14 @@ def testPack():
 
 def testPackUnicode():
     test_data = [
-        six.u(""), six.u("abcd"), (six.u("defgh"),), six.u("Русский текст"),
+        six.u(""), six.u("abcd"), [six.u("defgh")], six.u("Русский текст"),
         ]
     for td in test_data:
-        re = unpackb(packb(td, encoding='utf-8'), encoding='utf-8')
+        re = unpackb(packb(td, encoding='utf-8'), use_list=1, encoding='utf-8')
         assert_equal(re, td)
         packer = Packer(encoding='utf-8')
         data = packer.pack(td)
-        re = Unpacker(BytesIO(data), encoding='utf-8').unpack()
+        re = Unpacker(BytesIO(data), encoding='utf-8', use_list=1).unpack()
         assert_equal(re, td)
 
 def testPackUTF32():
@@ -46,11 +46,11 @@ def testPackUTF32():
         test_data = [
             six.u(""),
             six.u("abcd"),
-            (six.u("defgh"),),
+            [six.u("defgh")],
             six.u("Русский текст"),
         ]
         for td in test_data:
-            re = unpackb(packb(td, encoding='utf-32'), encoding='utf-32')
+            re = unpackb(packb(td, encoding='utf-32'), use_list=1, encoding='utf-32')
             assert_equal(re, td)
     except LookupError:
         raise SkipTest
@@ -63,20 +63,19 @@ def testPackBytes():
         check(td)
 
 def testIgnoreUnicodeErrors():
-    re = unpackb(packb(b'abc\xeddef'),
-                 encoding='utf-8', unicode_errors='ignore')
+    re = unpackb(packb(b'abc\xeddef'), encoding='utf-8', unicode_errors='ignore', use_list=1)
     assert_equal(re, "abcdef")
 
 @raises(UnicodeDecodeError)
 def testStrictUnicodeUnpack():
-    unpackb(packb(b'abc\xeddef'), encoding='utf-8')
+    unpackb(packb(b'abc\xeddef'), encoding='utf-8', use_list=1)
 
 @raises(UnicodeEncodeError)
 def testStrictUnicodePack():
     packb(six.u("abc\xeddef"), encoding='ascii', unicode_errors='strict')
 
 def testIgnoreErrorsPack():
-    re = unpackb(packb(six.u("abcФФФdef"), encoding='ascii', unicode_errors='ignore'), encoding='utf-8')
+    re = unpackb(packb(six.u("abcФФФdef"), encoding='ascii', unicode_errors='ignore'), encoding='utf-8', use_list=1)
     assert_equal(re, six.u("abcdef"))
 
 @raises(TypeError)
@@ -84,12 +83,66 @@ def testNoEncoding():
     packb(six.u("abc"), encoding=None)
 
 def testDecodeBinary():
-    re = unpackb(packb("abc"), encoding=None)
+    re = unpackb(packb("abc"), encoding=None, use_list=1)
     assert_equal(re, b"abc")
 
 def testPackFloat():
     assert_equal(packb(1.0, use_single_float=True), b'\xca' + struct.pack('>f', 1.0))
     assert_equal(packb(1.0, use_single_float=False), b'\xcb' + struct.pack('>d', 1.0))
 
+def testArraySize(sizes=[0, 5, 50, 1000]):
+    bio = six.BytesIO()
+    packer = Packer()
+    for size in sizes:
+        bio.write(packer.pack_array_header(size))
+        for i in range(size):
+            bio.write(packer.pack(i))
+
+    bio.seek(0)
+    unpacker = Unpacker(bio, use_list=1)
+    for size in sizes:
+        assert unpacker.unpack() == list(range(size))
+
+def testMapSize(sizes=[0, 5, 50, 1000]):
+    bio = six.BytesIO()
+    packer = Packer()
+    for size in sizes:
+        bio.write(packer.pack_map_header(size))
+        for i in range(size):
+            bio.write(packer.pack(i)) # key
+            bio.write(packer.pack(i * 2)) # value
+
+    bio.seek(0)
+    unpacker = Unpacker(bio)
+    for size in sizes:
+        assert unpacker.unpack() == dict((i, i * 2) for i in range(size))
+
+
+
+
+class odict(dict):
+    '''Reimplement OrderedDict to run test on Python 2.6'''
+    def __init__(self, seq):
+        self._seq = seq
+        dict.__init__(self, seq)
+
+    def items(self):
+        return self._seq[:]
+
+    def iteritems(self):
+        return iter(self._seq)
+
+    def keys(self):
+        return [x[0] for x in self._seq]
+
+def test_odict():
+    seq = [(b'one', 1), (b'two', 2), (b'three', 3), (b'four', 4)]
+    od = odict(seq)
+    assert_equal(unpackb(packb(od), use_list=1), dict(seq))
+    def pair_hook(seq):
+        return seq
+    assert_equal(unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1), seq)
+
+
 if __name__ == '__main__':
     main()
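The added testArraySize and testMapSize exercise the streaming side of the Packer API. A condensed sketch of the same pattern, using only the calls that appear above: write a header announcing the element count, then the elements, and read the result back as a single object.

from io import BytesIO
from msgpack import Packer, Unpacker

buf = BytesIO()
packer = Packer()
buf.write(packer.pack_array_header(3))   # declare a 3-element array
for i in range(3):
    buf.write(packer.pack(i))            # then stream the elements

buf.seek(0)
assert Unpacker(buf, use_list=1).unpack() == [0, 1, 2]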
test/test_read_size.py  (new file, 66 lines)
@@ -0,0 +1,66 @@
+"""Test Unpacker's read_array_header and read_map_header methods"""
+from msgpack import packb, Unpacker
+UnexpectedTypeException = ValueError
+
+def test_read_array_header():
+    unpacker = Unpacker()
+    unpacker.feed(packb(['a', 'b', 'c']))
+    assert unpacker.read_array_header() == 3
+    assert unpacker.unpack() == b'a'
+    assert unpacker.unpack() == b'b'
+    assert unpacker.unpack() == b'c'
+    try:
+        unpacker.unpack()
+        assert 0, 'should raise exception'
+    except StopIteration:
+        assert 1, 'okay'
+
+
+def test_read_map_header():
+    unpacker = Unpacker()
+    unpacker.feed(packb({'a': 'A'}))
+    assert unpacker.read_map_header() == 1
+    assert unpacker.unpack() == B'a'
+    assert unpacker.unpack() == B'A'
+    try:
+        unpacker.unpack()
+        assert 0, 'should raise exception'
+    except StopIteration:
+        assert 1, 'okay'
+
+def test_incorrect_type_array():
+    unpacker = Unpacker()
+    unpacker.feed(packb(1))
+    try:
+        unpacker.read_array_header()
+        assert 0, 'should raise exception'
+    except UnexpectedTypeException:
+        assert 1, 'okay'
+
+def test_incorrect_type_map():
+    unpacker = Unpacker()
+    unpacker.feed(packb(1))
+    try:
+        unpacker.read_map_header()
+        assert 0, 'should raise exception'
+    except UnexpectedTypeException:
+        assert 1, 'okay'
+
+def test_correct_type_nested_array():
+    unpacker = Unpacker()
+    unpacker.feed(packb({'a': ['b', 'c', 'd']}))
+    try:
+        unpacker.read_array_header()
+        assert 0, 'should raise exception'
+    except UnexpectedTypeException:
+        assert 1, 'okay'
+
+def test_incorrect_type_nested_map():
+    unpacker = Unpacker()
+    unpacker.feed(packb([{'a': 'b'}]))
+    try:
+        unpacker.read_map_header()
+        assert 0, 'should raise exception'
+    except UnexpectedTypeException:
+        assert 1, 'okay'
+
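The new file covers Unpacker.read_array_header and Unpacker.read_map_header. A condensed sketch of the pattern those tests exercise (not part of the commit): read only the container header, then pull the members one at a time instead of materializing the whole container.

from msgpack import packb, Unpacker

unpacker = Unpacker()
unpacker.feed(packb(['a', 'b', 'c']))
n = unpacker.read_array_header()          # consumes just the array header
items = [unpacker.unpack() for _ in range(n)]
assert n == 3 and len(items) == 3

unpacker = Unpacker()
unpacker.feed(packb({'a': 'A'}))
m = unpacker.read_map_header()            # likewise for maps; key/value pairs follow
pairs = [(unpacker.unpack(), unpacker.unpack()) for _ in range(m)]
assert m == 1 and len(pairs) == 1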
@@ -34,7 +34,7 @@ def test_exceeding_unpacker_read_size():
     f = io.BytesIO(dumpf.getvalue())
     dumpf.close()
 
-    unpacker = msgpack.Unpacker(f, read_size=read_size)
+    unpacker = msgpack.Unpacker(f, read_size=read_size, use_list=1)
 
     read_count = 0
     for idx, o in enumerate(unpacker):
@@ -6,7 +6,7 @@ from msgpack import Unpacker, BufferFull
 import nose
 
 def test_foobar():
-    unpacker = Unpacker(read_size=3)
+    unpacker = Unpacker(read_size=3, use_list=1)
     unpacker.feed(b'foobar')
     assert unpacker.unpack() == ord(b'f')
     assert unpacker.unpack() == ord(b'o')
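A note on the assertions in test_foobar: every byte of b'foobar' is below 0x80, so each one is a complete msgpack message on its own (a positive fixint), which is why unpack() returns the byte values one at a time:

from msgpack import Unpacker

unpacker = Unpacker(use_list=1)
unpacker.feed(b'foobar')
assert unpacker.unpack() == ord(b'f')   # 102, decoded as a positive fixint
assert unpacker.unpack() == ord(b'o')   # 111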
@@ -29,10 +29,24 @@ def test_foobar():
         k += 1
     assert k == len(b'foobar')
 
+def test_foobar_skip():
+    unpacker = Unpacker(read_size=3, use_list=1)
+    unpacker.feed(b'foobar')
+    assert unpacker.unpack() == ord(b'f')
+    unpacker.skip()
+    assert unpacker.unpack() == ord(b'o')
+    unpacker.skip()
+    assert unpacker.unpack() == ord(b'a')
+    unpacker.skip()
+    try:
+        o = unpacker.unpack()
+        assert 0, "should raise exception"
+    except StopIteration:
+        assert 1, "ok"
+
 def test_maxbuffersize():
     nose.tools.assert_raises(ValueError, Unpacker, read_size=5, max_buffer_size=3)
-    unpacker = Unpacker(read_size=3, max_buffer_size=3)
+    unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
     unpacker.feed(b'fo')
     nose.tools.assert_raises(BufferFull, unpacker.feed, b'ob')
     unpacker.feed(b'o')
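test_maxbuffersize pins down the back-pressure behaviour of the streaming Unpacker: feed raises BufferFull once the buffered, not-yet-consumed bytes would exceed max_buffer_size. A minimal sketch using only the calls that appear in the test:

from msgpack import Unpacker, BufferFull

unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
unpacker.feed(b'fo')              # 2 bytes buffered, under the 3-byte limit
try:
    unpacker.feed(b'ob')          # would grow the buffer past max_buffer_size
except BufferFull:
    pass                          # expected; the caller should drain first
unpacker.feed(b'o')               # one more byte still fits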