Boost logo

Boost :

From: Kim Barrett (kab_at_[hidden])
Date: 2006-03-20 12:11:48


At 12:31 AM +0000 3/18/06, Marcin Kalicinski wrote:
>I'm using serialization library to create UDP packets. My program is
>sending/receiving hundreds of these each second. I'm currently creating
>iarchive/oarchive for every packet sent/received. This is quite a
>performance bottleneck. Is it possible to reuse single binary archive object
>for all UDP packets? I.e. somehow reinitialize underlying stream with new
>data and read/write to the same archive again?

At 6:35 PM -0800 3/17/06, Robert Ramey wrote:
>I would guess its possible but I don't know for sure that's were any
>performance bottleneck is.

This was an issue I brought up last July. At the time I mentioned
that I'd done some measurements and modified the serialization
library by adding a reset member function to basic_iarchive_impl
and basic_oarchive_impl so that they could be reused, and got a
significant performance improvement by doing so. Unfortunately I
didn't follow up more fully at the time (I was doing the relevant
tests just before going on vacation) and haven't had time to do
anything about it since then. Below is a description of the test
and the relevant numbers. I'm presently attempting to ensure that
my project's schedule has some time allotted to my working on this,
possibly in April (I hope), with the intent of submitting a patch
back to you.

I've cleaned up the test and re-ran it with my current
configuration, with the results provided below. I'm also appending
the test code to the end of this message, in case you want to have
a look, or run it against a different configuration.

First, here is the timing breakdown.

Construct stream and archive on each iteration. All times are in
microseconds (averaged over 1 million loop executions).

   19.9 mutex, condition, and process switching overhead
    9.3 stream construct
   23.0 archive construct
   14.1 serialize data
    4.4 polymorphic archive overhead
  -----
   70.7

This breakdown was produced by running different variants of the
test which remove or modify its behavior according to command-line
arguments. The variant checks are inline in the main loop rather
than having different loop variants in order to prevent the
compiler from optimizing away dead code in variants that don't do
much.

Tests were run on 2GHz Pentium laptop, SuSE 9.3 (gcc 3.3.5) against
Boost 1.33.1 with the addition of the review version (0.9) of
Boost.Shmem from the vault.

The streams used for archive input and output are
stream<array_sink> and stream<array_source> from the
boost.iostreams library. The associated buffers are in shared
memory.

The archives are constructed with no_header and no_codecvt options.
The no_codecvt option has a very large impact (roughly 50usec). I
think I looked into that once and found a data structure that was
being initialized at archive construction time. If so, for an
application that can't use that option archive reuse becomes still
more desirable.

So stream construction/destruction is accounting for 13%, and
archive construction/destruction is accounting for 1/3 (!) of the
time in this test. As you can see, for small transfers like this
the construction / destruction time for the archive and stream is
pretty significant. Some additional hacked up tests and discussion
on the mailing list back in July suggested that the dominating
factor was probably memory allocation by various standard container
members of the archives, though I haven't actually tried to track
it down to that level.

I haven't run my tests against your recent changes to use
stream_buffers instead of streams internally in the the archive
implementation. I expect that will help some too (though one might
hope that boost.iostreams array_sink and array_source are pretty
efficient already). But even if your changes made the serialization
time drop to zero, it would still be a smaller improvement for this
situation than being able to reuse the archives. (This is not at
all intended as a criticism of those changes, merely noting that
this is not the biggest factor for this specific use case.)

Back in July I hacked in a patch to the serialization library to
add a reset operation (described in email back then, though the
details are probably not that important). Using that and resetting
the streams (via seekp/seekg) dropped the construction time down
into the noise. I haven't repeated that, since I'm planning to do
something better than a quick hack soon anyway, but if one
substitutes say 1usec for each for a reset rather than construction
(which is probably pessimistic), one gets:

Reset and reuse stream and archive

   19.9 mutex, condition, and process switching overhead
    1.0 stream reset
    1.0 archive reset
   14.1 serialize data
    4.4 polymorphic archive overhead
  -----
   40.4

And at this point the recent stream_buffer improvements start
looking highly productive.

Test code below:

==============================================================================

// Copyright 2006, Kim Barrett.
// Distributed under the Boost Software License, Version 1.0
// (see http://www.boost.org/LICENSE_1_1.txt)
//
// assumes boost installed in /opt/irobot, this file called new-shm-test.cpp
//
// g++ -Wall -fPIC -pthread -D_REENTRANT -ftemplate-depth-256 -O3
// -I/opt/irobot/include -L/opt/irobot/lib -o new-shm-test
// new-shm-test.cpp -lboost_serialization -lpthread
//
// in one shell, run this program as server, specifying arguments zero or one
// of the following arguments:
// --no_polymorphic : don't use polymorphic archive
// --no_transport : don't transport data, only construct archive objects
// --no_archive : don't construct archives, only streams
// --no_stream : don't construct streams
//
// in another shell, run this program as client, by specifying arguments:
// --iterations <count>
//
// Tests were run on kab's Dell Inspirion 8500 laptop, running SuSE9.3
// (gcc 3.3.5) against Boost 1.33.1 with the addition of the review version
// (0.9) of Boost.Shmem from the boost vault.
//
// Data, iterations = 1,000,000: run 5, discard high&low, average remaining
// poly, transport: 70.645, 70.837, 70.678, 71.193, 71.363 :: 70.903
// non-poly, transport: 66.357, 66.535, 61.596, 66.737, 66.995 :: 66.523
// poly, no-transport: 52.204, 52.460, 54.251, 52.480, 52.309 :: 52.416
// non-poly, no-trans: 52.609, 52.286, 51.957, 52.123, 52.226 :: 52.212
// no-archive: 29.049, 29.230, 30.194, 29.205, 29.298 :: 29.244
// no-stream: 20.374, 19.936, 19.684, 20.216, 19.665 :: 19.945
//
// 4.4 polymorphic transport overhead
// 14.1 non-polymorphic transport
// 23.0 archive construction
// 9.3 stream construction
// 19.9 mutex, condition, process switching &etc
// -----
// 70.7 total (slightly less than actual, due to rounding)

#define NEW_SHM_TEST_DEBUG_PRINT 0
//#define NEW_SHM_TEST_DEBUG_PRINT 1

#if NEW_SHM_TEST_DEBUG_PRINT
#include <iostream>
#endif

#include <string>
#include <boost/lexical_cast.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/shmem/named_shared_object.hpp>
#include <boost/shmem/sync/shared_mutex.hpp>
#include <boost/shmem/sync/shared_condition.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/polymorphic_binary_iarchive.hpp>
#include <boost/archive/polymorphic_binary_oarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/nvp.hpp>

#undef NDEBUG
#include <cassert>

namespace io = boost::iostreams;
namespace shmem = boost::shmem;
namespace archive = boost::archive;
namespace serial = boost::serialization;

class test_struct {
     friend class boost::serialization::access;

     double d;
     float f;
     signed long sl;
     unsigned long ul;
     signed short ss;
     unsigned short us;

     template<typename Archive>
     void serialize(Archive& ar, unsigned int /* version */) {
        ar & BOOST_SERIALIZATION_NVP(d);
        ar & BOOST_SERIALIZATION_NVP(f);
        ar & BOOST_SERIALIZATION_NVP(sl);
        ar & BOOST_SERIALIZATION_NVP(ul);
        ar & BOOST_SERIALIZATION_NVP(ss);
        ar & BOOST_SERIALIZATION_NVP(us);
     }

public:
     test_struct() { }
     test_struct(double d_,
                float f_,
                signed long sl_,
                unsigned long ul_,
                signed short ss_,
                unsigned short us_)
        : d(d_), f(f_), sl(sl_), ul(ul_), ss(ss_), us(us_) {
     }

     bool operator==(const test_struct& rhs) {
        return d == rhs.d
            && f == rhs.f
            && sl == rhs.sl
            && ul == rhs.ul
            && ss == rhs.ss
            && us == rhs.us;
     }

};

test_struct expected(2.0, 2.0, -10, 10, -5, 5);

// arguments:
// --iterations count
// --no_polymorphic
// --no_transport
// --no_archive
// --no_stream

const int archive_flags = archive::no_header | archive::no_codecvt;

struct parsed_args {
     const char* segment_name;
     size_t segment_size;
     size_t iterations;
     size_t buffer_size;
     bool transport;
     bool polymorphic;
     bool make_archive;
     bool make_stream;
     bool server;
};

const parsed_args parse_args(int argc, char* argv[]) {
     parsed_args result;
     memset(&result, 0, sizeof result);
     result.segment_name = "/archive_test";
     result.segment_size = 2000;
     result.buffer_size = 1000;
     result.transport = true;
     result.polymorphic = true;
     result.make_archive = true;
     result.make_stream = true;
     result.server = true;
     int i = 1;
     while (i < argc) {
        const std::string arg(argv[i]);
        if (arg == "--iterations") {
            assert(++i < argc);
            result.iterations = boost::lexical_cast<size_t>(argv[i]);
            result.server = false;
        } else if (arg == "--no_transport") {
            result.transport = false;
        } else if (arg == "--no_polymorphic") {
            result.polymorphic = false;
        } else if (arg == "--no_archive") {
            result.make_archive = false;
        } else if (arg == "--no_stream") {
            result.make_stream = false;
        } else {
            assert(false);
        }
        i += 1;
     }
     return result;
}

void test_server(const parsed_args& args) {
     shmem::named_shared_object segment;
     assert(segment.create(args.segment_name, args.segment_size));
     shmem::shared_mutex* mutex(
        segment.construct<shmem::shared_mutex>("mutex")());
     assert(mutex != 0);
     shmem::shared_condition* condition(
        segment.construct<shmem::shared_condition>("condition")());
     assert(condition != 0);
     char* shared_buffer(segment.construct<char>("buffer")[args.buffer_size]());
     assert(shared_buffer != 0);
     parsed_args* shared_args(segment.construct<parsed_args>("arguments")());
     assert(shared_args != 0);
     volatile int* value_count(segment.construct<int>("value_count")());
     assert(value_count != 0);
     volatile int* reply_count(segment.construct<int>("reply_count")());
     assert(reply_count != 0);
     volatile int* stop_request(segment.construct<int>("stop_request")());
     assert(stop_request != 0);

     *value_count = 0;
     *reply_count = 0;
     *shared_args = args;
     *stop_request = 0;

     int last_value_count = 0;
     test_struct local_struct;

     shmem::shared_mutex::scoped_lock lock(*mutex);
     while (!*stop_request) {
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Waiting for a value after " << *value_count << std::endl;
#endif
        while (!*stop_request && (*value_count == last_value_count)) {
            condition->wait(lock);
        }
        if (*stop_request) break;
        last_value_count = *value_count;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Got a value, reading it\n";
#endif
        if (args.make_stream) {
            io::stream<io::array_source> in(shared_buffer, args.buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_iarchive ar(in, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_iarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_iarchive ar(in, archive_flags);
                if (args.transport) ar & local_struct;
            }
           
#if NEW_SHM_TEST_DEBUG_PRINT
            std::cerr << "Writing reply\n";
#endif
            io::stream<io::array_sink> out(shared_buffer, args.buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_oarchive ar(out, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_oarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_oarchive ar(out, archive_flags);
                if (args.transport) ar & local_struct;
            }
        }
        *reply_count += 1;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "Notifying of reply " << *reply_count << std::endl;
#endif
        condition->notify_all();
     }
}

void test_client(parsed_args args) {
     size_t iterations = args.iterations;
     shmem::named_shared_object segment;
     assert(segment.open(args.segment_name));
     shmem::shared_mutex* mutex(
        segment.find<shmem::shared_mutex>("mutex").first);
     assert(mutex != 0);
     shmem::shared_condition* condition(
        segment.find<shmem::shared_condition>("condition").first);
     assert(condition != 0);
     std::pair<char*, size_t> buffer_info(segment.find<char>("buffer"));
     char* shared_buffer(buffer_info.first);
     assert(shared_buffer != 0);
     size_t buffer_size(buffer_info.second);
     assert(buffer_size != 0);
     parsed_args* argsp(segment.find<parsed_args>("arguments").first);
     assert(argsp != 0);
     args = *argsp; // overwrite from shared after getting iters
     volatile int* value_count(segment.find<int>("value_count").first);
     assert(value_count != 0);
     volatile int* reply_count(segment.find<int>("reply_count").first);
     assert(reply_count != 0);
     volatile int* stop_request(segment.find<int>("stop_request").first);
     assert(stop_request != 0);

     test_struct local_struct;

     shmem::shared_mutex::scoped_lock lock(*mutex);
     int last_reply_count = *reply_count;
     for (size_t i = 0; i < iterations; i++) {
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "writing value\n";
#endif
        if (args.make_stream) {
            io::stream<io::array_sink> out(shared_buffer, buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_oarchive ar(out, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_oarchive*>(&ar)
                        & expected;
                }
            } else {
                archive::binary_oarchive ar(out, archive_flags);
                if (args.transport) ar & expected;
            }
        }
        *value_count += 1;
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "notifying of value " << *value_count << std::endl;
#endif
        condition->notify_all();
#if NEW_SHM_TEST_DEBUG_PRINT
        std::cerr << "waiting for reply from " << *reply_count << std::endl;
#endif
        while (*reply_count == last_reply_count) {
            condition->wait(lock);
        }
        last_reply_count = *reply_count;
        if (args.make_stream) {
            io::stream<io::array_source> in(shared_buffer, buffer_size);
            if (!args.make_archive) {
                // do nothing
            } else if (args.polymorphic) {
                archive::polymorphic_binary_iarchive ar(in, archive_flags);
                if (args.transport) {
                    *static_cast<archive::polymorphic_iarchive*>(&ar)
                        & local_struct;
                }
            } else {
                archive::binary_iarchive ar(in, archive_flags);
                if (args.transport) ar & local_struct;
            }
        }
     }
     *stop_request = 1;
     condition->notify_all();
}

int main(int argc, char* argv[]) {
     const parsed_args args(parse_args(argc, argv));
     if (args.server) {
        test_server(args);
     } else {
        test_client(args);
     }
     return 0;
}


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk