#include #include #include #include #include #include #include namespace mpl = boost::mpl; namespace msmb = boost::msm::back; namespace msmf = boost::msm::front; size_t gProcessed = 0; namespace { namespace events { struct DataReceived { int index; }; struct StartBatch {}; } /** * State Machine Definition */ template struct ActionWithEvent { template void operator()(const Evt& evt, Fsm& fsm, Source& src, Target& tgt) { Action()(evt, fsm, src, tgt); fsm.process_event(Event()); } }; struct Batcher_ : public msmf::state_machine_def { explicit Batcher_(size_t batch_size) : batch_size_(batch_size) , count_(0) {} // Enable deferred events using activate_deferred_events = int; // States struct Holding; struct Publishing; // Initial using initial_state = mpl::vector; // Transition Actions struct EnqueueEvent; using EnqueueAndBatch = ActionWithEvent; struct PublishEvent; // Transition Guards struct CountNotEqualToBatch; struct CountEqualToBatch; struct CountNotZero; struct CountIsZero; // Transition Table struct transition_table : mpl::vector< // +------------+----------------------+------------+-----------------+----------------------+ // | Start | Event | Target | Action | Guard | // +------------+----------------------+------------+-----------------+----------------------+ msmf::Row< Holding , events::DataReceived , msmf::none , EnqueueEvent , CountNotEqualToBatch >, msmf::Row< Holding , events::DataReceived , msmf::none , EnqueueAndBatch , CountEqualToBatch >, msmf::Row< Holding , events::StartBatch , Publishing , msmf::none , msmf::none >, msmf::Row< Publishing , events::DataReceived , msmf::none , PublishEvent , CountNotZero >, msmf::Row< Publishing , msmf::none , Holding , msmf::none , CountIsZero > // +------------+----------------------+------------+-----------------+----------------------+ > {}; // State Variables size_t batch_size_; size_t count_; }; /** * States */ struct Batcher_::Holding : public msmf::state<> {}; struct Batcher_::Publishing : public msmf::state<> { template void on_entry(const Event&, Fsm& fsm) { std::cout << "Starting Batch" << std::endl; } template void on_exit(const Event&, Fsm&) { std::cout << "Ending Batch" << std::endl; } }; /** * Transition Actions */ struct Batcher_::EnqueueEvent { using deferring_action = int; template void operator()(const Event& e, Fsm& fsm, Source&, Target&) { fsm.defer_event(e); std::cout << "Queue Incremented [" << ++fsm.count_ << "/" << fsm.batch_size_ << "] - (" << "I: " << e.index << " | " << "MQ: " << fsm.get_message_queue().size() << " | " << "DQ: " << fsm.get_deferred_queue().size() << ")" << std::endl; } }; struct Batcher_::PublishEvent { template void operator()(const Event& e, Fsm& fsm, Source&, Target&) { std::cout << "Queue Decremented [" << --fsm.count_ << "/" << fsm.batch_size_ << "] - (" << "I: " << e.index << " | " << "MQ: " << fsm.get_message_queue().size() << " | " << "DQ: " << fsm.get_deferred_queue().size() << ")" << std::endl; ++gProcessed; } }; /** * Transition Guards */ struct Batcher_::CountNotEqualToBatch { template bool operator()(const Event&, Fsm& fsm, Source&, Target&) { return fsm.count_ < (fsm.batch_size_ - 1); } }; struct Batcher_::CountEqualToBatch { template bool operator()(const Event&, Fsm& fsm, Source&, Target&) { return fsm.count_ == (fsm.batch_size_ - 1); } }; struct Batcher_::CountNotZero { template bool operator()(const Event&, Fsm& fsm, Source&, Target&) { return fsm.count_ != 0; } }; struct Batcher_::CountIsZero { template bool operator()(const Event&, Fsm& fsm, Source&, Target&) { return fsm.count_ == 0; } }; // State Machine Declaration using Batcher = msmb::state_machine; } // anonymous namespace int main(int argc, char** argv) { if(argc < 2) { std::cerr << "usage: " << argv[0] << " [total_size=100000]" << std::endl; exit(1); } auto batch_size = atoi(argv[1]); auto total_size = argc > 2 ? atoi(argv[2]) : 100000; Batcher b(batch_size); b.start(); for(int i = 1; i <= total_size; ++i) { b.process_event(events::DataReceived{i}); } std::cout << "Total: " << gProcessed << " (" << total_size - gProcessed << " unprocessed)" << std::endl; }