Subject: Re: [Boost-users] [asio] Async (or sync) in threads
From: Michael Powell (mwpowellhtx_at_[hidden])
Date: 2013-07-24 21:08:07


On Wed, Jul 24, 2013 at 6:57 PM, Michael Powell <mwpowellhtx_at_[hidden]> wrote:
> Hello,
>
> I am having a hard time getting my Asio usage off the ground.
>
> I started with what I thought was a nominal (knock on wood)
> asynchronous server, which runs in a worker thread, but was running
> into seg fault issues.

Taking a step back...

I want to accomplish some IPC through socket channels, as a sort of
event broker type architecture.

So internal callers need to publish and subscribe to mutex-protected
event channels, whose events are then serialized and deserialized by
the running socket worker thread.
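
To make the idea concrete, here is a minimal sketch of such a channel using only the standard library. The names event_channel and run_broker_demo are hypothetical, events are plain strings, and the "socket worker" just appends to a vector where real code would serialize and write to a socket.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mutex-protected channel: any thread may publish,
// a single worker thread pops events in FIFO order.
class event_channel {
public:
    void publish(std::string event) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            assert(!m_closed && "publish after close");
            m_queue.push_back(std::move(event));
        }
        m_ready.notify_one();
    }

    // Tells the worker that no further events will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_ready.notify_all();
    }

    // Blocks until an event is available. Returns false once the
    // channel is closed and every queued event has been consumed.
    bool wait_and_pop(std::string& out) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this]() { return m_closed || !m_queue.empty(); });
        if (m_queue.empty()) return false;
        out = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::deque<std::string> m_queue;
    bool m_closed = false;
};

// Stand-in for the socket worker: drains the channel on its own thread.
// Assumption: appending to `sent` replaces the real socket write.
std::vector<std::string> run_broker_demo() {
    event_channel channel;
    std::vector<std::string> sent;
    std::thread worker([&channel, &sent]() {
        std::string event;
        while (channel.wait_and_pop(event)) sent.push_back(event);
    });
    channel.publish("started");
    channel.publish("stopped");
    channel.close();
    worker.join();
    return sent;
}
```

Closing the channel, rather than interrupting the thread, gives the worker a well-defined point at which to finish and be joined.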

But all the Asio examples I am reading seem to keep the io_service
instance in main, and main's sole purpose there is to run the
server.

I am beginning to wonder whether that is a requirement here. Does
io_service at least need to be available to the main thread?

A bit puzzled. I'm sure it is straightforward once you get past these
couple of hurdles.

> In the interest of "simplifying" the issue, I decided to simplify to a
> "blocking" server, hopefully also hosted in a thread, but am also
> running into seg fault issues.
>
> First I am trying to capture the server instance in a lambda passing
> it to a thread as a runner. Then I am trying to capture a shared
> pointer to the server thinking there is some move semantic or
> something. Seg fault in either case.
>
> Most of the examples are one main being the sole main thread hosting
> the io_service. Would that this were the case with me. This has to be
> one worker among several other workers running in the same process.
>
> Wouldn't mind if there were a couple of ideas recommending what I
> might explore next.
>
> Thank you...
>
> In the diagnostic routine I've got wired up:
>
> boost::shared_ptr<dchem::asio::ip::tcp::server2> server_
>     = boost::make_shared<dchem::asio::ip::tcp::server2>(17017);
>
> boost::thread runner_([&server_]() {
>     server_->start_server();
> });
> runner_.start_thread();
>
> while (!timer_.has_elapsed_query_wait(wait_timeout_,
>         elapsed_milliseconds_)) {
>     //...
> }
>
> runner_.interrupt();
> runner_.join();
>
> And the server itself, header first:
>
> class server2 {
> public:
>
>     /**
>      * \brief Endpoint type definition.
>      */
>     typedef boost::asio::ip::tcp::endpoint endpoint_type;
>
>     /**
>      * \brief Acceptor type definition.
>      */
>     typedef boost::asio::ip::tcp::acceptor acceptor_type;
>
>     /**
>      * \brief Service type definition.
>      */
>     typedef boost::asio::io_service service_type;
>
>     /**
>      * \brief Socket type definition.
>      */
>     typedef boost::asio::ip::tcp::socket socket_type;
>
>     /**
>      * \brief Socket pointer type definition.
>      */
>     typedef boost::shared_ptr<socket_type> socket_pointer;
>
>     /**
>      * \brief Port type definition.
>      */
>     typedef unsigned int port_type;
>
> protected:
>
>     /**
>      * \brief Returns whether bytes are available.
>      * \param s A socket reference.
>      */
>     static bool are_bytes_readable(socket_type& s);
>
>     /**
>      * \brief Starts a new session.
>      * \param s Starts running a new session.
>      */
>     void session(socket_pointer s);
>
> public:
>
>     /**
>      * \brief Constructor
>      * \param port A TCP port.
>      */
>     server2(port_type port);
>
>     /**
>      * \brief Destructor
>      */
>     virtual ~server2();
>
>     /**
>      * \brief Starts the server.
>      */
>     void start_server();
>
> private:
>
>     /**
>      * \brief Service.
>      */
>     service_type m_service;
>
>     /**
>      * \brief Port.
>      */
>     port_type m_port;
> };
>
> /////////////////////////////////////////////////////////////////////
> server2::server2(port_type port)
>     : m_service(),
>       m_port(port) {
> }
>
> /////////////////////////////////////////////////////////////////////
> server2::~server2() {
> }
>
> /////////////////////////////////////////////////////////////////////
> bool server2::are_bytes_readable(socket_type& s) {
>
>     boost::asio::socket_base::bytes_readable cmd_(true);
>     s.io_control(cmd_);
>     std::size_t cmd_result_ = cmd_.get();
>     return cmd_result_ > 0;
> }
>
> /////////////////////////////////////////////////////////////////////
> void server2::start_server() {
>
>     try {
>
>         endpoint_type ep_(boost::asio::ip::tcp::v4(), m_port);
>         acceptor_type a_(m_service, ep_);
>
>         std::cout << "accepting connection" << std::endl;
>
>         boost::system::error_code ec_;
>
>         auto s_ = boost::make_shared<socket_type>(m_service);
>         a_.accept((*s_), ec_);
>
>         if (!ec_) return;
>
>         std::cout << "starting session" << std::endl;
>
>         session(s_);
>     }
>     catch (boost::thread_interrupted& tiex) {
>     }
>     catch (...) {
>     }
> }
>
> /////////////////////////////////////////////////////////////////////
> void server2::session(socket_pointer s) {
>
>     auto& s_ = (*s);
>
>     while (true) {
>
>         {
>             utils::sleep_for_milliseconds(100);
>
>             threading::thread_interruption_disabler tid;
>
>             std::cout << "are bytes readable" << std::endl;
>
>             if (!are_bytes_readable(s_)) continue;
>
>             std::cout << "reading some" << std::endl;
>
>             boost::system::error_code ec_;
>             std::vector<byte> buffer_;
>
>             size_t length_ = s_.read_some(boost::asio::buffer(buffer_), ec_);
>
>             //Connection reset cleanly by peer.
>             if (ec_ == boost::asio::error::eof)
>                 break;
>             else if (ec_) //Some other error.
>                 throw boost::system::system_error(ec_);
>
>             boost::asio::write(s_, boost::asio::buffer(buffer_, length_));
>         }
>     }
> }
>
>
>
> Regards,
>
> Michael Powell

