2013-10-15 16:59:43 +02:00
|
|
|
import array
|
2013-10-18 15:45:50 +02:00
|
|
|
import struct
|
2013-10-15 16:59:43 +02:00
|
|
|
import msgpack
|
|
|
|
|
2013-10-18 15:45:50 +02:00
|
|
|
def test_pack_extended_type():
|
|
|
|
def p(s):
|
|
|
|
packer = msgpack.Packer()
|
|
|
|
packer.pack_extended_type(0x42, s)
|
|
|
|
return packer._buffer.getvalue()
|
|
|
|
assert p('A') == '\xd4\x42A' # fixext 1
|
|
|
|
assert p('AB') == '\xd5\x42AB' # fixext 2
|
|
|
|
assert p('ABCD') == '\xd6\x42ABCD' # fixext 4
|
|
|
|
assert p('ABCDEFGH') == '\xd7\x42ABCDEFGH' # fixext 8
|
|
|
|
assert p('A'*16) == '\xd8\x42' + 'A'*16 # fixext 16
|
|
|
|
assert p('ABC') == '\xc7\x03\x42ABC' # ext 8
|
|
|
|
assert p('A'*0x0123) == '\xc8\x01\x23\x42' + 'A'*0x0123 # ext 16
|
|
|
|
assert p('A'*0x00012345) == '\xc9\x00\x01\x23\x45\x42' + 'A'*0x00012345 # ext 32
|
|
|
|
|
2013-10-15 16:59:43 +02:00
|
|
|
def test_extension_type():
|
|
|
|
class MyPacker(msgpack.Packer):
|
2013-10-18 15:03:58 +02:00
|
|
|
def handle_unknown_type(self, obj):
|
2013-10-15 16:59:43 +02:00
|
|
|
if isinstance(obj, array.array):
|
|
|
|
typecode = 123 # application specific typecode
|
|
|
|
data = obj.tostring()
|
2013-10-18 15:45:50 +02:00
|
|
|
self.pack_extended_type(typecode, data)
|
2013-10-18 15:03:58 +02:00
|
|
|
return True
|
2013-10-15 16:59:43 +02:00
|
|
|
|
|
|
|
class MyUnpacker(msgpack.Unpacker):
|
2013-10-18 15:03:58 +02:00
|
|
|
def read_extended_type(self, typecode, data):
|
2013-10-15 16:59:43 +02:00
|
|
|
assert typecode == 123
|
|
|
|
obj = array.array('d')
|
|
|
|
obj.fromstring(data)
|
|
|
|
return obj
|
|
|
|
|
|
|
|
obj = [42, 'hello', array.array('d', [1.1, 2.2, 3.3])]
|
2013-10-18 14:38:52 +02:00
|
|
|
packer = MyPacker()
|
|
|
|
unpacker = MyUnpacker(None)
|
|
|
|
s = packer.pack(obj)
|
|
|
|
unpacker.feed(s)
|
|
|
|
obj2 = unpacker.unpack_one()
|
2013-10-15 16:59:43 +02:00
|
|
|
assert obj == obj2
|