fallback: add actual rollback and add a testcase for it

Signed-off-by: Bas Westerbaan <bas@westerbaan.name>
This commit is contained in:
Bas Westerbaan 2013-01-29 02:58:26 +01:00
parent fb81f80d14
commit d2f549a470
2 changed files with 28 additions and 8 deletions

View file

@ -208,17 +208,13 @@ class Unpacker(object):
def __iter__(self): def __iter__(self):
return self return self
def next(self):
try:
ret = self._fb_unpack(EX_CONSTRUCT, None)
self._fb_consume()
return ret
except OutOfData:
raise StopIteration
def read_bytes(self, n): def read_bytes(self, n):
return self._fb_read(n) return self._fb_read(n)
def _fb_rollback(self):
self._fb_buf_i = 0
self._fb_buf_o = 0
def _fb_get_extradata(self): def _fb_get_extradata(self):
bufs = self._fb_buffers[self._fb_buf_i:] bufs = self._fb_buffers[self._fb_buf_i:]
if bufs: if bufs:
@ -244,6 +240,7 @@ class Unpacker(object):
self._fb_buf_o = 0 self._fb_buf_o = 0
self._fb_buf_i += 1 self._fb_buf_i += 1
if len(ret) != n: if len(ret) != n:
self._fb_rollback()
raise OutOfData raise OutOfData
if write_bytes is not None: if write_bytes is not None:
write_bytes(ret) write_bytes(ret)
@ -363,6 +360,14 @@ class Unpacker(object):
assert typ == TYPE_IMMEDIATE assert typ == TYPE_IMMEDIATE
return obj return obj
def next(self):
try:
ret = self._fb_unpack(EX_CONSTRUCT, None)
self._fb_consume()
return ret
except OutOfData:
raise StopIteration
def skip(self, write_bytes=None): def skip(self, write_bytes=None):
self._fb_unpack(EX_SKIP, write_bytes) self._fb_unpack(EX_SKIP, write_bytes)
self._fb_consume() self._fb_consume()

View file

@ -7,6 +7,21 @@ from msgpack.exceptions import OutOfData
from pytest import raises from pytest import raises
def test_partialdata():
unpacker = Unpacker()
unpacker.feed(b'\xa5')
with raises(StopIteration): next(iter(unpacker))
unpacker.feed(b'h')
with raises(StopIteration): next(iter(unpacker))
unpacker.feed(b'a')
with raises(StopIteration): next(iter(unpacker))
unpacker.feed(b'l')
with raises(StopIteration): next(iter(unpacker))
unpacker.feed(b'l')
with raises(StopIteration): next(iter(unpacker))
unpacker.feed(b'o')
assert next(iter(unpacker)) == 'hallo'
def test_foobar(): def test_foobar():
unpacker = Unpacker(read_size=3, use_list=1) unpacker = Unpacker(read_size=3, use_list=1)
unpacker.feed(b'foobar') unpacker.feed(b'foobar')