mirror of
				https://github.com/msgpack/msgpack-python.git
				synced 2025-10-26 07:04:10 +00:00 
			
		
		
		
	Implement streaming deserializer.
This commit is contained in:
		
							parent
							
								
									b944eefb96
								
							
						
					
					
						commit
						1781092bd8
					
				
					 2 changed files with 183 additions and 15 deletions
				
			
		|  | @ -6,13 +6,16 @@ cdef extern from "Python.h": | ||||||
|     ctypedef char* const_char_ptr "const char*" |     ctypedef char* const_char_ptr "const char*" | ||||||
|     ctypedef struct PyObject |     ctypedef struct PyObject | ||||||
|     cdef object PyString_FromStringAndSize(const_char_ptr b, Py_ssize_t len) |     cdef object PyString_FromStringAndSize(const_char_ptr b, Py_ssize_t len) | ||||||
|  |     char* PyString_AsString(object o) | ||||||
| 
 | 
 | ||||||
| cdef extern from "stdlib.h": | cdef extern from "stdlib.h": | ||||||
|     void* malloc(int) |     void* malloc(size_t) | ||||||
|  |     void* realloc(void*, size_t) | ||||||
|     void free(void*) |     void free(void*) | ||||||
| 
 | 
 | ||||||
| cdef extern from "string.h": | cdef extern from "string.h": | ||||||
|     int memcpy(char*dst, char*src, unsigned int size) |     void* memcpy(char* dst, char* src, size_t size) | ||||||
|  |     void* memmove(char* dst, char* src, size_t size) | ||||||
| 
 | 
 | ||||||
| cdef extern from "pack.h": | cdef extern from "pack.h": | ||||||
|     ctypedef int (*msgpack_packer_write)(void* data, const_char_ptr buf, unsigned int len) |     ctypedef int (*msgpack_packer_write)(void* data, const_char_ptr buf, unsigned int len) | ||||||
|  | @ -34,8 +37,6 @@ cdef extern from "pack.h": | ||||||
|     void msgpack_pack_raw_body(msgpack_packer* pk, char* body, size_t l) |     void msgpack_pack_raw_body(msgpack_packer* pk, char* body, size_t l) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| cdef int BUFF_SIZE=2*1024 |  | ||||||
| 
 |  | ||||||
| cdef class Packer: | cdef class Packer: | ||||||
|     """Packer that pack data into strm. |     """Packer that pack data into strm. | ||||||
| 
 | 
 | ||||||
|  | @ -48,10 +49,7 @@ cdef class Packer: | ||||||
|     cdef msgpack_packer pk |     cdef msgpack_packer pk | ||||||
|     cdef object strm |     cdef object strm | ||||||
| 
 | 
 | ||||||
|     def __init__(self, strm, int size=0): |     def __init__(self, strm, int size=4*1024): | ||||||
|         if size <= 0: |  | ||||||
|             size = BUFF_SIZE |  | ||||||
| 
 |  | ||||||
|         self.strm = strm |         self.strm = strm | ||||||
|         self.buff = <char*> malloc(size) |         self.buff = <char*> malloc(size) | ||||||
|         self.allocated = size |         self.allocated = size | ||||||
|  | @ -147,6 +145,8 @@ cdef class Packer: | ||||||
|         if flush: |         if flush: | ||||||
|             self.flush() |             self.flush() | ||||||
| 
 | 
 | ||||||
|  |     close = flush | ||||||
|  | 
 | ||||||
| cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l): | cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l): | ||||||
|     if packer.length + l > packer.allocated: |     if packer.length + l > packer.allocated: | ||||||
|         if packer.length > 0: |         if packer.length > 0: | ||||||
|  | @ -163,20 +163,28 @@ cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l): | ||||||
|     return 0 |     return 0 | ||||||
| 
 | 
 | ||||||
| def pack(object o, object stream): | def pack(object o, object stream): | ||||||
|  |     u"""pack o and write to stream).""" | ||||||
|     packer = Packer(stream) |     packer = Packer(stream) | ||||||
|     packer.pack(o) |     packer.pack(o) | ||||||
|     packer.flush() |     packer.flush() | ||||||
| 
 | 
 | ||||||
| def packs(object o): | def packb(object o): | ||||||
|  |     u"""pack o and return packed bytes.""" | ||||||
|     buf = StringIO() |     buf = StringIO() | ||||||
|     packer = Packer(buf) |     packer = Packer(buf) | ||||||
|     packer.pack(o) |     packer.pack(o) | ||||||
|     packer.flush() |     packer.flush() | ||||||
|     return buf.getvalue() |     return buf.getvalue() | ||||||
| 
 | 
 | ||||||
|  | packs = packb | ||||||
|  | 
 | ||||||
| cdef extern from "unpack.h": | cdef extern from "unpack.h": | ||||||
|     ctypedef struct template_context: |     ctypedef struct template_context: | ||||||
|         pass |         PyObject* obj | ||||||
|  |         size_t count | ||||||
|  |         unsigned int ct | ||||||
|  |         PyObject* key | ||||||
|  | 
 | ||||||
|     int template_execute(template_context* ctx, const_char_ptr data, |     int template_execute(template_context* ctx, const_char_ptr data, | ||||||
|             size_t len, size_t* off) |             size_t len, size_t* off) | ||||||
|     void template_init(template_context* ctx) |     void template_init(template_context* ctx) | ||||||
|  | @ -188,15 +196,139 @@ def unpacks(object packed_bytes): | ||||||
|     cdef const_char_ptr p = packed_bytes |     cdef const_char_ptr p = packed_bytes | ||||||
|     cdef template_context ctx |     cdef template_context ctx | ||||||
|     cdef size_t off = 0 |     cdef size_t off = 0 | ||||||
|  |     cdef int ret | ||||||
|     template_init(&ctx) |     template_init(&ctx) | ||||||
|     template_execute(&ctx, p, len(packed_bytes), &off) |     ret = template_execute(&ctx, p, len(packed_bytes), &off) | ||||||
|     return template_data(&ctx) |     if ret == 1: | ||||||
|  |         return template_data(&ctx) | ||||||
|  |     else: | ||||||
|  |         return None | ||||||
| 
 | 
 | ||||||
| def unpack(object stream): | def unpack(object stream): | ||||||
|     """unpack from stream.""" |     """unpack from stream.""" | ||||||
|     packed = stream.read() |     packed = stream.read() | ||||||
|     return unpacks(packed) |     return unpacks(packed) | ||||||
| 
 | 
 | ||||||
| cdef class Unpacker: | cdef class UnpackIterator(object): | ||||||
|     """Do nothing. This function is for symmetric to Packer""" |     cdef object unpacker | ||||||
|     unpack = staticmethod(unpacks) | 
 | ||||||
|  |     def __init__(self, unpacker): | ||||||
|  |         self.unpacker = unpacker | ||||||
|  | 
 | ||||||
|  |     def __next__(self): | ||||||
|  |         return self.unpacker.unpack() | ||||||
|  | 
 | ||||||
|  |     def __iter__(self): | ||||||
|  |         return self | ||||||
|  | 
 | ||||||
|  | cdef class Unpacker(object): | ||||||
|  |     """Unpacker(file_like=None, read_size=4096) | ||||||
|  | 
 | ||||||
|  |     Streaming unpacker. | ||||||
|  |     file_like must have read(n) method. | ||||||
|  |     read_size is used like file_like.read(read_size) | ||||||
|  | 
 | ||||||
|  |     If file_like is None, you can feed() bytes. feed() is useful | ||||||
|  |     for unpack from non-blocking stream. | ||||||
|  | 
 | ||||||
|  |     exsample 1: | ||||||
|  |         unpacker = Unpacker(afile) | ||||||
|  |         for o in unpacker: | ||||||
|  |             do_something(o) | ||||||
|  | 
 | ||||||
|  |     example 2: | ||||||
|  |         unpacker = Unpacker() | ||||||
|  |         while 1: | ||||||
|  |             buf = astream.read() | ||||||
|  |             unpacker.feed(buf) | ||||||
|  |             for o in unpacker: | ||||||
|  |                 do_something(o) | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     cdef template_context ctx | ||||||
|  |     cdef char* buf | ||||||
|  |     cdef size_t buf_size, buf_head, buf_tail | ||||||
|  |     cdef object file_like | ||||||
|  |     cdef int read_size | ||||||
|  |     cdef object waiting_bytes | ||||||
|  | 
 | ||||||
|  |     def __init__(self, file_like=None, int read_size=4096): | ||||||
|  |         self.file_like = file_like | ||||||
|  |         self.read_size = read_size | ||||||
|  |         self.waiting_bytes = [] | ||||||
|  |         self.buf = <char*>malloc(read_size) | ||||||
|  |         self.buf_size = read_size | ||||||
|  |         self.buf_head = 0 | ||||||
|  |         self.buf_tail = 0 | ||||||
|  |         template_init(&self.ctx) | ||||||
|  | 
 | ||||||
|  |     def feed(self, next_bytes): | ||||||
|  |         if not isinstance(next_bytes, str): | ||||||
|  |            raise ValueError, "Argument must be bytes object" | ||||||
|  |         self.waiting_bytes.append(next_bytes) | ||||||
|  | 
 | ||||||
|  |     cdef append_buffer(self): | ||||||
|  |         cdef char* buf = self.buf | ||||||
|  |         cdef Py_ssize_t tail = self.buf_tail | ||||||
|  |         cdef Py_ssize_t l | ||||||
|  | 
 | ||||||
|  |         for b in self.waiting_bytes: | ||||||
|  |             l = len(b) | ||||||
|  |             memcpy(buf + tail, PyString_AsString(b), l) | ||||||
|  |             tail += l | ||||||
|  |         self.buf_tail = tail | ||||||
|  |         del self.waiting_bytes[:] | ||||||
|  | 
 | ||||||
|  |     # prepare self.buf | ||||||
|  |     cdef fill_buffer(self): | ||||||
|  |         cdef Py_ssize_t add_size | ||||||
|  | 
 | ||||||
|  |         if self.file_like is not None: | ||||||
|  |             self.waiting_bytes.append(self.file_like.read(self.read_size)) | ||||||
|  | 
 | ||||||
|  |         if not self.waiting_bytes: | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|  |         add_size = 0 | ||||||
|  |         for b in self.waiting_bytes: | ||||||
|  |             add_size += len(b) | ||||||
|  | 
 | ||||||
|  |         cdef char* buf = self.buf | ||||||
|  |         cdef size_t head = self.buf_head | ||||||
|  |         cdef size_t tail = self.buf_tail | ||||||
|  |         cdef size_t size = self.buf_size | ||||||
|  | 
 | ||||||
|  |         if self.buf_tail + add_size <= self.buf_size: | ||||||
|  |             # do nothing. | ||||||
|  |             pass | ||||||
|  |         if self.buf_tail - self.buf_head + add_size < self.buf_size: | ||||||
|  |             # move to front. | ||||||
|  |             memmove(buf, buf + head, tail - head) | ||||||
|  |             tail -= head | ||||||
|  |             head = 0 | ||||||
|  |         else: | ||||||
|  |             # expand buffer | ||||||
|  |             size = tail + add_size | ||||||
|  |             buf = <char*>realloc(<void*>buf, size) | ||||||
|  | 
 | ||||||
|  |         self.buf = buf | ||||||
|  |         self.buf_head = head | ||||||
|  |         self.buf_tail = tail | ||||||
|  |         self.buf_size = size | ||||||
|  | 
 | ||||||
|  |         self.append_buffer() | ||||||
|  | 
 | ||||||
|  |     cpdef unpack(self): | ||||||
|  |         """unpack one object""" | ||||||
|  |         cdef int ret | ||||||
|  |         self.fill_buffer() | ||||||
|  |         ret = template_execute(&self.ctx, self.buf, self.buf_tail, &self.buf_head) | ||||||
|  |         if ret == 1: | ||||||
|  |             return template_data(&self.ctx) | ||||||
|  |         elif ret == 0: | ||||||
|  |             raise StopIteration, "No more unpack data." | ||||||
|  |         else: | ||||||
|  |             raise ValueError, "Unpack failed." | ||||||
|  | 
 | ||||||
|  |     def __iter__(self): | ||||||
|  |         return UnpackIterator(self) | ||||||
|  |  | ||||||
							
								
								
									
										36
									
								
								test_sequnpack.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								test_sequnpack.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,36 @@ | ||||||
|  | #!/usr/bin/env python | ||||||
|  | # coding: utf-8 | ||||||
|  | 
 | ||||||
|  | from __future__ import unicode_literals, print_function | ||||||
|  | 
 | ||||||
|  | from msgpack import Unpacker | ||||||
|  | 
 | ||||||
|  | def test_foobar(): | ||||||
|  |     unpacker = Unpacker(read_size=3) | ||||||
|  |     unpacker.feed(b'foobar') | ||||||
|  |     assert unpacker.unpack() == ord('f') | ||||||
|  |     assert unpacker.unpack() == ord('o') | ||||||
|  |     assert unpacker.unpack() == ord('o') | ||||||
|  |     assert unpacker.unpack() == ord('b') | ||||||
|  |     assert unpacker.unpack() == ord('a') | ||||||
|  |     assert unpacker.unpack() == ord('r') | ||||||
|  |     try: | ||||||
|  |         o = unpacker.unpack() | ||||||
|  |         print("Oops!", o) | ||||||
|  |         assert 0 | ||||||
|  |     except StopIteration: | ||||||
|  |         assert 1 | ||||||
|  |     else: | ||||||
|  |         assert 0 | ||||||
|  |     unpacker.feed(b'foo') | ||||||
|  |     unpacker.feed(b'bar') | ||||||
|  | 
 | ||||||
|  |     k = 0 | ||||||
|  |     for o, e in zip(unpacker, b'foobarbaz'): | ||||||
|  |         assert o == ord(e) | ||||||
|  |         k += 1 | ||||||
|  |     assert k == len(b'foobar') | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |     test_foobar() | ||||||
|  | 
 | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Naoki INADA
						Naoki INADA