From 91a1f8d9e121c1b1379476aa61426dd8a3366987 Mon Sep 17 00:00:00 2001 From: frsyuki Date: Thu, 29 Apr 2010 22:15:03 +0900 Subject: [PATCH] cpp: new streaming deserialier API. --- cpp/msgpack/unpack.hpp | 149 ++++++++++++++++++++++++++++++---------- cpp/test/pack_unpack.cc | 26 +++++++ cpp/test/streaming.cc | 114 ++++++++++++++++++++++++++++-- 3 files changed, 247 insertions(+), 42 deletions(-) diff --git a/cpp/msgpack/unpack.hpp b/cpp/msgpack/unpack.hpp index d39b5df..dbe7777 100644 --- a/cpp/msgpack/unpack.hpp +++ b/cpp/msgpack/unpack.hpp @@ -37,6 +37,31 @@ struct unpack_error : public std::runtime_error { }; +class unpacked { +public: + unpacked() { } + + unpacked(object obj, std::auto_ptr z) : + m_obj(obj), m_zone(z) { } + + object& get() + { return m_obj; } + + const object& get() const + { return m_obj; } + + std::auto_ptr& zone() + { return m_zone; } + + const std::auto_ptr& zone() const + { return m_zone; } + +private: + object m_obj; + std::auto_ptr m_zone; +}; + + class unpacker : public msgpack_unpacker { public: unpacker(size_t init_buffer_size = MSGPACK_UNPACKER_DEFAULT_INITIAL_BUFFER_SIZE); @@ -53,39 +78,22 @@ public: /*! 3. specify the number of bytes actually copied */ void buffer_consumed(size_t size); - /*! 4. repeat execute() until it retunrs false */ - bool execute(); + /*! 4. repeat next() until it retunrs false */ + bool next(unpacked* result); - /*! 5.1. if execute() returns true, take out the parsed object */ - object data(); - - /*! 5.2. the object is valid until the zone is deleted */ - // Note that once release_zone() from unpacker, you must delete it - // otherwise the memrory will leak. - zone* release_zone(); - - /*! 5.2. this method is equivalence to `delete release_zone()` */ - void reset_zone(); - - /*! 5.3. after release_zone(), re-initialize unpacker */ - void reset(); - - /*! 6. check if the size of message doesn't exceed assumption. */ + /*! 5. check if the size of message doesn't exceed assumption. */ size_t message_size() const; - // Basic usage of the unpacker is as following: // // msgpack::unpacker pac; - // - // while( /* readable */ ) { + // while( /* input is readable */ ) { // // // 1. - // pac.reserve_buffer(1024); + // pac.reserve_buffer(32*1024); // // // 2. - // ssize_t bytes = - // read(the_source, pac.buffer(), pac.buffer_capacity()); + // size_t bytes = input.readsome(pac.buffer(), pac.buffer_capacity()); // // // error handling ... // @@ -93,25 +101,40 @@ public: // pac.buffer_consumed(bytes); // // // 4. - // while(pac.execute()) { - // // 5.1 - // object o = pac.data(); + // msgpack::unpacked result; + // while(pac.next(&result)) { + // // do some with the object with the zone. + // msgpack::object obj = result.get(); + // std::auto_ptr z = result.zone(); + // on_message(obj, z); // - // // 5.2 - // std::auto_ptr olife( pac.release_zone() ); + // //// boost::shared_ptr is also usable: + // // boost::shared_ptr life(z.release()); + // // on_message(result.get(), life); + // } // - // // boost::shared_ptr is also usable: - // // boost::shared_ptr olife( pac.release_zone() ); - // - // // 5.3 - // pac.reset(); - // - // // do some with the object with the old zone. - // do_something(o, olife); + // // 5. + // if(pac.message_size() > 10*1024*1024) { + // throw std::runtime_error("message is too large"); // } // } // + /*! for backward compatibility */ + bool execute(); + + /*! for backward compatibility */ + object data(); + + /*! for backward compatibility */ + zone* release_zone(); + + /*! for backward compatibility */ + void reset_zone(); + + /*! for backward compatibility */ + void reset(); + public: // These functions are usable when non-MessagePack message follows after // MessagePack message. @@ -137,6 +160,11 @@ private: }; +static bool unpack(unpacked* result, + const char* data, size_t len, size_t* offset = NULL); + + +// obsolete typedef enum { UNPACK_SUCCESS = 2, UNPACK_EXTRA_BYTES = 1, @@ -144,6 +172,7 @@ typedef enum { UNPACK_PARSE_ERROR = -1, } unpack_return; +// obsolete static unpack_return unpack(const char* data, size_t len, size_t* off, zone* z, object* result); @@ -187,6 +216,20 @@ inline void unpacker::buffer_consumed(size_t size) return msgpack_unpacker_buffer_consumed(this, size); } +inline bool unpacker::next(unpacked* result) +{ + int ret = msgpack_unpacker_execute(this); + if(ret < 0) { + throw unpack_error("parse error"); + } + + result->zone().reset( release_zone() ); + result->get() = data(); + reset(); + + return ret > 0; +} + inline bool unpacker::execute() { @@ -230,12 +273,12 @@ inline void unpacker::reset() msgpack_unpacker_reset(this); } + inline size_t unpacker::message_size() const { return msgpack_unpacker_message_size(this); } - inline size_t unpacker::parsed_size() const { return msgpack_unpacker_parsed_size(this); @@ -262,6 +305,38 @@ inline void unpacker::remove_nonparsed_buffer() } +inline bool unpack(unpacked* result, + const char* data, size_t len, size_t* offset) +{ + msgpack::object obj; + std::auto_ptr z(new zone()); + + unpack_return ret = (unpack_return)msgpack_unpack( + data, len, offset, z.get(), + reinterpret_cast(&obj)); + + switch(ret) { + case UNPACK_SUCCESS: + result->get() = obj; + result->zone() = z; + return false; + + case UNPACK_EXTRA_BYTES: + result->get() = obj; + result->zone() = z; + return true; + + case UNPACK_CONTINUE: + throw unpack_error("insufficient bytes"); + + case UNPACK_PARSE_ERROR: + default: + throw unpack_error("parse error"); + } +} + + +// obsolete inline unpack_return unpack(const char* data, size_t len, size_t* off, zone* z, object* result) { diff --git a/cpp/test/pack_unpack.cc b/cpp/test/pack_unpack.cc index ecf52c5..ca9b7d5 100644 --- a/cpp/test/pack_unpack.cc +++ b/cpp/test/pack_unpack.cc @@ -77,6 +77,32 @@ TEST(unpack, sequence) msgpack::pack(sbuf, 2); msgpack::pack(sbuf, 3); + bool cont; + size_t offset = 0; + + msgpack::unpacked msg; + + cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset); + EXPECT_TRUE(cont); + EXPECT_EQ(1, msg.get().as()); + + cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset); + EXPECT_TRUE(cont); + EXPECT_EQ(2, msg.get().as()); + + cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset); + EXPECT_FALSE(cont); + EXPECT_EQ(3, msg.get().as()); +} + + +TEST(unpack, sequence_compat) +{ + msgpack::sbuffer sbuf; + msgpack::pack(sbuf, 1); + msgpack::pack(sbuf, 2); + msgpack::pack(sbuf, 3); + size_t offset = 0; msgpack::zone z; diff --git a/cpp/test/streaming.cc b/cpp/test/streaming.cc index 2d03976..c01b8be 100644 --- a/cpp/test/streaming.cc +++ b/cpp/test/streaming.cc @@ -2,6 +2,7 @@ #include #include + TEST(streaming, basic) { std::ostringstream stream; @@ -15,6 +16,108 @@ TEST(streaming, basic) msgpack::unpacker pac; + int count = 0; + while(count < 3) { + pac.reserve_buffer(32*1024); + + size_t len = input.readsome(pac.buffer(), pac.buffer_capacity()); + pac.buffer_consumed(len); + + msgpack::unpacked result; + while(pac.next(&result)) { + msgpack::object obj = result.get(); + switch(count++) { + case 0: + EXPECT_EQ(1, obj.as()); + break; + case 1: + EXPECT_EQ(2, obj.as()); + break; + case 2: + EXPECT_EQ(3, obj.as()); + return; + } + } + } +} + + +class event_handler { +public: + event_handler(std::istream& input) : input(input) { } + ~event_handler() { } + + void on_read() + { + while(true) { + pac.reserve_buffer(32*1024); + + size_t len = input.readsome(pac.buffer(), pac.buffer_capacity()); + + if(len == 0) { + return; + } + + pac.buffer_consumed(len); + + msgpack::unpacked result; + while(pac.next(&result)) { + on_message(result.get(), result.zone()); + } + + if(pac.message_size() > 10*1024*1024) { + throw std::runtime_error("message is too large"); + } + } + } + + void on_message(msgpack::object obj, std::auto_ptr z) + { + EXPECT_EQ(expect, obj.as()); + } + + int expect; + +private: + std::istream& input; + msgpack::unpacker pac; +}; + +TEST(streaming, event) +{ + std::stringstream stream; + msgpack::packer pk(&stream); + + event_handler handler(stream); + + pk.pack(1); + handler.expect = 1; + handler.on_read(); + + pk.pack(2); + handler.expect = 2; + handler.on_read(); + + pk.pack(3); + handler.expect = 3; + handler.on_read(); +} + + +// backward compatibility +TEST(streaming, basic_compat) +{ + std::ostringstream stream; + msgpack::packer pk(&stream); + + pk.pack(1); + pk.pack(2); + pk.pack(3); + + std::istringstream input(stream.str()); + + msgpack::unpacker pac; + int count = 0; while(count < 3) { pac.reserve_buffer(32*1024); @@ -44,10 +147,11 @@ TEST(streaming, basic) } -class event_handler { +// backward compatibility +class event_handler_compat { public: - event_handler(std::istream& input) : input(input) { } - ~event_handler() { } + event_handler_compat(std::istream& input) : input(input) { } + ~event_handler_compat() { } void on_read() { @@ -87,12 +191,12 @@ private: msgpack::unpacker pac; }; -TEST(streaming, event) +TEST(streaming, event_compat) { std::stringstream stream; msgpack::packer pk(&stream); - event_handler handler(stream); + event_handler_compat handler(stream); pk.pack(1); handler.expect = 1;