| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | #!/usr/bin/env python | 
					
						
							|  |  |  | # coding: utf-8 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-19 01:49:03 +02:00
										 |  |  | import py | 
					
						
							| 
									
										
										
										
											2012-09-23 11:13:44 +10:00
										 |  |  | import six | 
					
						
							| 
									
										
										
										
											2012-07-20 02:02:37 +09:00
										 |  |  | from msgpack import Unpacker, BufferFull | 
					
						
							| 
									
										
										
										
											2013-10-19 01:49:03 +02:00
										 |  |  | from msgpack.exceptions import OutOfData, ExtraData, UnpackValueError | 
					
						
							| 
									
										
										
										
											2012-12-29 11:24:25 +09:00
										 |  |  | from pytest import raises | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-01-29 02:58:26 +01:00
										 |  |  | def test_partialdata(): | 
					
						
							|  |  |  |     unpacker = Unpacker() | 
					
						
							|  |  |  |     unpacker.feed(b'\xa5') | 
					
						
							|  |  |  |     with raises(StopIteration): next(iter(unpacker)) | 
					
						
							|  |  |  |     unpacker.feed(b'h') | 
					
						
							|  |  |  |     with raises(StopIteration): next(iter(unpacker)) | 
					
						
							|  |  |  |     unpacker.feed(b'a') | 
					
						
							|  |  |  |     with raises(StopIteration): next(iter(unpacker)) | 
					
						
							|  |  |  |     unpacker.feed(b'l') | 
					
						
							|  |  |  |     with raises(StopIteration): next(iter(unpacker)) | 
					
						
							|  |  |  |     unpacker.feed(b'l') | 
					
						
							|  |  |  |     with raises(StopIteration): next(iter(unpacker)) | 
					
						
							|  |  |  |     unpacker.feed(b'o') | 
					
						
							| 
									
										
										
										
											2013-01-29 03:03:13 +01:00
										 |  |  |     assert next(iter(unpacker)) == b'hallo' | 
					
						
							| 
									
										
										
										
											2013-01-29 02:58:26 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | def test_foobar(): | 
					
						
							| 
									
										
										
										
											2012-09-24 02:12:55 +09:00
										 |  |  |     unpacker = Unpacker(read_size=3, use_list=1) | 
					
						
							| 
									
										
										
										
											2012-06-19 13:55:14 +09:00
										 |  |  |     unpacker.feed(b'foobar') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'f') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'o') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'o') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'b') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'a') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'r') | 
					
						
							| 
									
										
										
										
											2012-12-29 11:24:25 +09:00
										 |  |  |     with raises(OutOfData): | 
					
						
							|  |  |  |         unpacker.unpack() | 
					
						
							| 
									
										
										
										
											2012-06-19 14:20:56 +09:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-06-19 13:55:14 +09:00
										 |  |  |     unpacker.feed(b'foo') | 
					
						
							|  |  |  |     unpacker.feed(b'bar') | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  |     k = 0 | 
					
						
							| 
									
										
										
										
											2012-06-19 14:20:56 +09:00
										 |  |  |     for o, e in zip(unpacker, 'foobarbaz'): | 
					
						
							|  |  |  |         assert o == ord(e) | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  |         k += 1 | 
					
						
							| 
									
										
										
										
											2012-06-19 13:55:14 +09:00
										 |  |  |     assert k == len(b'foobar') | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-09-23 10:09:51 +09:00
										 |  |  | def test_foobar_skip(): | 
					
						
							| 
									
										
										
										
											2012-09-24 02:12:55 +09:00
										 |  |  |     unpacker = Unpacker(read_size=3, use_list=1) | 
					
						
							| 
									
										
										
										
											2012-09-23 10:09:51 +09:00
										 |  |  |     unpacker.feed(b'foobar') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'f') | 
					
						
							|  |  |  |     unpacker.skip() | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'o') | 
					
						
							|  |  |  |     unpacker.skip() | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'a') | 
					
						
							|  |  |  |     unpacker.skip() | 
					
						
							| 
									
										
										
										
											2012-12-29 11:24:25 +09:00
										 |  |  |     with raises(OutOfData): | 
					
						
							|  |  |  |         unpacker.unpack() | 
					
						
							| 
									
										
										
										
											2009-06-26 14:10:20 +09:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-07-20 02:02:37 +09:00
										 |  |  | def test_maxbuffersize(): | 
					
						
							| 
									
										
										
										
											2012-12-29 11:24:25 +09:00
										 |  |  |     with raises(ValueError): | 
					
						
							|  |  |  |         Unpacker(read_size=5, max_buffer_size=3) | 
					
						
							| 
									
										
										
										
											2012-09-24 02:12:55 +09:00
										 |  |  |     unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1) | 
					
						
							| 
									
										
										
										
											2012-07-20 02:02:37 +09:00
										 |  |  |     unpacker.feed(b'fo') | 
					
						
							| 
									
										
										
										
											2012-12-29 11:24:25 +09:00
										 |  |  |     with raises(BufferFull): | 
					
						
							|  |  |  |         unpacker.feed(b'ob') | 
					
						
							| 
									
										
										
										
											2012-07-20 02:02:37 +09:00
										 |  |  |     unpacker.feed(b'o') | 
					
						
							|  |  |  |     assert ord('f') == next(unpacker) | 
					
						
							|  |  |  |     unpacker.feed(b'b') | 
					
						
							|  |  |  |     assert ord('o') == next(unpacker) | 
					
						
							|  |  |  |     assert ord('o') == next(unpacker) | 
					
						
							|  |  |  |     assert ord('b') == next(unpacker) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-09-23 11:13:44 +10:00
										 |  |  | def test_readbytes(): | 
					
						
							|  |  |  |     unpacker = Unpacker(read_size=3) | 
					
						
							|  |  |  |     unpacker.feed(b'foobar') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'f') | 
					
						
							|  |  |  |     assert unpacker.read_bytes(3) == b'oob' | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'a') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'r') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Test buffer refill | 
					
						
							|  |  |  |     unpacker = Unpacker(six.BytesIO(b'foobar'), read_size=3) | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'f') | 
					
						
							|  |  |  |     assert unpacker.read_bytes(3) == b'oob' | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'a') | 
					
						
							|  |  |  |     assert unpacker.unpack() == ord(b'r') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-19 01:49:03 +02:00
										 |  |  | def test_unpack_one(): | 
					
						
							|  |  |  |     unpacker = Unpacker() | 
					
						
							|  |  |  |     unpacker.feed('\xda\x00\x03abc') | 
					
						
							|  |  |  |     assert unpacker.unpack_one() == 'abc' | 
					
						
							|  |  |  |     # | 
					
						
							|  |  |  |     unpacker = Unpacker() | 
					
						
							|  |  |  |     unpacker.feed('\xda\x00\x03abcd') | 
					
						
							|  |  |  |     py.test.raises(ExtraData, "unpacker.unpack_one()") | 
					
						
							|  |  |  |     # | 
					
						
							|  |  |  |     unpacker = Unpacker() | 
					
						
							|  |  |  |     unpacker.feed('\xda\x00\x03ab') | 
					
						
							|  |  |  |     py.test.raises(UnpackValueError, "unpacker.unpack_one()") |