Boost logo

Boost Users :

Subject: Re: [Boost-users] Boost-users Digest, Vol 3643, Issue 1
From: Damien Kick (dkick_at_[hidden])
Date: 2013-12-02 13:25:02


On Dec 1, 2013, at 11:00 AM, boost-users-request_at_[hidden] wrote:

> Date: Sun, 1 Dec 2013 20:38:32 +0530
> From: Rahul Mathur <srivmuk_at_[hidden]>
> Subject: [Boost-users] ZeroMQ integration with Boost-v1.54
> Message-ID:
> <CALtf4sBn6Dh0WVOgAfaXvSj4obMiMw335w88xDj5vguPOoHMaw_at_[hidden]>
[…]
> BTW, How do I proceed to have ZeroMQ integrated with Boost-v1.54 with
> GCC on Linux platform?

Here is some code that I've been using to integrate Boost.Asio and
0MQ. The OP question asked for GNU/Linux only but this code has been
working for me on both Windows and POSIX. Most of this comes from
comments on various Internet forums but integrates together a few
things which I personally found less than obvious on my first few
attempts. For example, most of the code snippets I found only
mentioned POSIX and I had to beat my head against a few walls to get
something that worked on both, though YMMV.

==> config.hpp.in <==
#ifndef INCLUDE_SHOR_DM_CONFIG_HPP
#define INCLUDE_SHOR_DM_CONFIG_HPP

// Boost C++ headers
#include <boost/filesystem.hpp>
// C++ headers
#include <string>

#ifndef _WIN32_WINNT
# define _WIN32_WINNT 0x0601
#endif

#ifdef _MSC_VER
# define SHOR_DM_CONSTEXPR
# define SHOR_DM_DEFAULT_CTOR(expr) expr { }
# define SHOR_DM_DEFAULT_COPY(expr)
# define SHOR_DM_DEFAULT_DTOR(expr) expr { }
# define SHOR_DM_DEFAULT_ASSIGN(expr)
# define SHOR_DM_DELETE(expr) expr
# define SHOR_DM_NOEXCEPT throw()
#else
# define SHOR_DM_CONSTEXPR constexpr
# define SHOR_DM_DEFAULT_CTOR(expr) expr = default
# define SHOR_DM_DEFAULT_COPY(expr) expr = default
# define SHOR_DM_DEFAULT_DTOR(expr) expr = default
# define SHOR_DM_DEFAULT_ASSIGN(expr) expr = default
# define SHOR_DM_DELETE(expr) expr = delete
# define SHOR_DM_NOEXCEPT noexcept
#endif

#endif

==> zee/basic_zmq_fd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_BASIC_ZMQ_FD_HPP
#define INCLUDE_SHOR_DM_ZEE_BASIC_ZMQ_FD_HPP

// 0MQ headers
#include <zmq.hpp>

namespace shor {
namespace dm {
namespace zee {
template<typename Native_handle>
struct Basic_zmq_fd {
    typedef Native_handle native_handle_type;
    typedef native_handle_type value_type;

private:
    zmq::socket_t& zmq_socket_;
    Native_handle zmq_fd_;

public:
    explicit Basic_zmq_fd(zmq::socket_t& zmq_socket)
        : zmq_socket_(zmq_socket),
          zmq_fd_()
        {
            std::size_t zmq_fd_size = sizeof(zmq_fd_);
            zmq_socket_.getsockopt(ZMQ_FD, &zmq_fd_, &zmq_fd_size);
        }
    value_type operator () () const { return zmq_fd_; }
};

} // namespace zee
} // namespace dm
} // namespace shor

#endif

==> zee/handler.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_HANDLER_HPP
#define INCLUDE_SHOR_DM_ZEE_HANDLER_HPP

// DM headers
#include "dm/zee/msg.hpp"
// Boost C++ headers
#include <boost/system/error_code.hpp>
#include <boost/function.hpp>
// C++ headers
#include <functional>

namespace shor {
namespace dm {
namespace zee {
typedef std::function<
    void (const boost::system::error_code& err, shor::dm::zee::Msg& msg)
> Async_recv_handler;

} // namespace zee
} // namespace dm
} // namespace shor

#endif

==> zee/lossy_service.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_LOSSY_SERVICE_HPP
#define INCLUDE_SHOR_DM_ZEE_LOSSY_SERVICE_HPP

// Boost C++ headers
#include <boost/asio.hpp>

namespace shor {
namespace dm {
namespace zee {
template<typename Service>
struct Lossy_service : public Service {
    explicit Lossy_service(boost::asio::io_service& ios)
        : Service(ios)
        { }

    void destory(typename Service::implementation_type& impl)
        { }
};

} // namespace zee
} // namespace dm
} // namespace shor

#endif

==> zee/msg.cpp <==
#include "dm/zee/msg.hpp"

shor::dm::zee::detail::Msg::Frame::Frame(
    Frame&& that)
    : that_(std::move(that.that_))
{ }

shor::dm::zee::detail::Msg::Frame::Frame(
    zmq::message_t&& that)
    : that_(std::move(that))
{ }

shor::dm::zee::detail::Msg::Frame::Frame(
    const std::string& the_other)
    : that_(the_other.size())
{
    std::copy(the_other.begin(), the_other.end(),
              static_cast<char*>(that_.data()));
}

shor::dm::zee::detail::Msg::Frame&
shor::dm::zee::detail::Msg::Frame::operator = (
    Frame&& that)
{
    that_ = std::move(that.that_);
    return *this;
}

shor::dm::zee::detail::Msg::Frame&
shor::dm::zee::detail::Msg::Frame::operator = (
    const std::string& the_other)
{
    that_.rebuild(the_other.size());
    std::copy(the_other.begin(), the_other.end(),
              static_cast<char*>(that_.data()));
    return *this;
}

shor::dm::zee::detail::Msg::Msg(
    Msg&& that)
    : frames_(std::move(that.frames_))
{ }

shor::dm::zee::detail::Msg&
shor::dm::zee::detail::Msg::operator = (
    Msg&& that)
{
    frames_ = std::move(that.frames_);
    return *this;
}

==> zee/msg.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_HPP
#define INCLUDE_SHOR_DM_ZEE_MSG_HPP

// My header
#include "dm/zee/msg_fwd.hpp"
// DM headers
#include "dm/config.hpp"
// 0MQ headers
#include <zmq.hpp>
// C++ headers
#include <deque>

namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Msg {
    class Frame {
        zmq::message_t that_;

    public:
        SHOR_DM_DEFAULT_CTOR(Frame());
        Frame(Frame&& that);
        Frame(zmq::message_t&& that);
        Frame(const std::string& the_other);
        SHOR_DM_DEFAULT_DTOR(~Frame());

        Frame& operator = (Frame&& that);
        Frame& operator = (const std::string& the_other);
        operator zmq::message_t& ();
        operator const zmq::message_t& () const;

        zmq::message_t& get();

        const zmq::message_t& cget() const;
        const zmq::message_t& get() const;

    private:
        SHOR_DM_DELETE(Frame(const Frame& that));
        SHOR_DM_DELETE(Frame& operator = (const Frame& that));
    };

    typedef std::deque<Frame> Frames;

private:
    Frames frames_;

public:
    SHOR_DM_DEFAULT_CTOR(Msg());
    Msg(Msg&& that);
    SHOR_DM_DEFAULT_DTOR(~Msg());

    Msg& operator = (Msg&& that);

    Frames& frames();

    const Frames& frames() const;

private:
    SHOR_DM_DELETE(Msg(const Msg& that));
    SHOR_DM_DELETE(Msg& operator = (const Msg& that));
};

} // namespace detail
typedef detail::Msg Msg;

} // namespace zee
} // namespace dm
} // namespace shor

#include "dm/zee/msg.ipp"

#endif

==> zee/msg.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_IPP
#define INCLUDE_SHOR_DM_ZEE_MSG_IPP

// My header
#include "dm/zee/msg.hpp"

inline shor::dm::zee::detail::Msg::Frame::operator
zmq::message_t& ()
{
    return that_;
}

inline shor::dm::zee::detail::Msg::Frame::operator
const zmq::message_t& () const
{
    return that_;
}

inline zmq::message_t&
shor::dm::zee::detail::Msg::Frame::get()
{
    return that_;
}

inline const zmq::message_t&
shor::dm::zee::detail::Msg::Frame::cget()
    const
{
    return that_;
}

inline const zmq::message_t&
shor::dm::zee::detail::Msg::Frame::get()
    const
{
    return that_;
}

inline shor::dm::zee::detail::Msg::Frames&
shor::dm::zee::detail::Msg::frames()
{
    return frames_;
}

inline const shor::dm::zee::detail::Msg::Frames&
shor::dm::zee::detail::Msg::frames()
    const
{
    return frames_;
}

#endif

==> zee/msg_fwd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_FWD_HPP
#define INCLUDE_SHOR_DM_ZEE_MSG_FWD_HPP

namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Msg;

} // namespace detail
typedef detail::Msg Msg;

} // namespace zee
} // namespace dm
} // namespace shor

#endif

==> zee/posix_zmq_socket.cpp <==
// My header
#include "dm/zee/posix_zmq_socket.hpp"
// DM headers
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/bind.hpp>

struct shor::dm::zee::detail::Posix_zmq_socket::Read_handler {
    typedef Posix_zmq_socket::implementation_type Io_type;
    typedef Posix_zmq_socket::Base_read_handler Base_read_handler;

private:
    Io_type& io_;
    Base_read_handler read_handler_;
    Async_recv_handler recv_handler_;

public:
    Read_handler(
        Io_type& io,
        Base_read_handler read_handler,
        Async_recv_handler recv_handler)
        : io_(io),
          read_handler_(read_handler),
          recv_handler_(recv_handler)
        { }

    void operator () (
        const boost::system::error_code& err,
        std::size_t /*bytes_transferred*/)
        {
            read_handler_(*this, err, recv_handler_);
        }

    void operator () ()
        {
            io_.async_read_some(boost::asio::null_buffers(), *this);
        }
};

shor::dm::zee::detail::Posix_zmq_socket::Posix_zmq_socket(
    boost::asio::io_service& ios,
    zmq::context_t& zenv,
    const int zmq_socket_type)
    : Zmq_socket_base(ios, zenv, zmq_socket_type),
      zmq_fd_(Zmq_socket_base::zmq_socket()),
      io_(ios, zmq_fd_()),
      io_release_(&io_, Release())
{ }

void
shor::dm::zee::detail::Posix_zmq_socket::async_recv(
    Async_recv_handler recv_handler)
{
    const auto base_read_handler_fn = boost::bind(
        &Posix_zmq_socket::read_handler, this, _1, _2, _3);
    Read_handler read_handler(
        std::ref(io_), base_read_handler_fn, recv_handler);
    read_handler();
}

void
shor::dm::zee::detail::Posix_zmq_socket::close_ext()
{
    this->cancel();
    // TODO: reset zmq_fd_
    io_.release();
}

==> zee/posix_zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_HPP

// Base class header
#include "dm/zee/zmq_socket_base.hpp"
// DM headers
#include "dm/zee/handler.hpp"
#include "dm/zee/lossy_service.hpp"
#include "dm/zee/basic_zmq_fd.hpp"
// Boost C++ headers
#include <boost/asio/posix/basic_stream_descriptor.hpp>
#include <boost/asio/posix/stream_descriptor_service.hpp>
#include <boost/asio.hpp>

namespace shor {
namespace dm {
namespace zee {
namespace detail {
class Posix_zmq_socket : public Zmq_socket_base {
    using Stream_descriptor_service =
        boost::asio::posix::stream_descriptor_service;

    template<typename Service> using Basic_stream_descriptor =
        boost::asio::posix::basic_stream_descriptor<Service>;

    // Borrowed from <http://preview.tinyurl.com/n6jh2ne> but it
    // doesn't seem to work; i.e. I was still finding that the file
    // descriptor was being closed, causing a crash at shutdown.
    // Because of this, I (re)introduced Release (see below).
    typedef Lossy_service<Stream_descriptor_service> service_type;
    typedef Basic_stream_descriptor<service_type> implementation_type;

    struct Read_handler;
    struct Release {
        void operator () (implementation_type* ptr)
            { if (ptr->is_open()) { ptr->release(); } }
    };

    Basic_zmq_fd<int> zmq_fd_;
    implementation_type io_;
    std::unique_ptr<implementation_type, Release> io_release_;

public:
    Posix_zmq_socket(
        boost::asio::io_service& ios, zmq::context_t& zenv,
        int zmq_socket_type);
    SHOR_DM_DEFAULT_DTOR(~Posix_zmq_socket());

    void async_recv(Async_recv_handler handler);
    void cancel();

private:
    SHOR_DM_DELETE(Posix_zmq_socket(const Posix_zmq_socket& that));
    SHOR_DM_DELETE(
        Posix_zmq_socket& operator = (const Posix_zmq_socket& that));

    virtual void close_ext() override;
};

} // namespace detail
typedef detail::Posix_zmq_socket Posix_zmq_socket;

} // namespace zee
} // namespace dm
} // namespace shor

#include "dm/zee/posix_zmq_socket.ipp"

#endif

==> zee/posix_zmq_socket.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_IPP
#define INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_IPP

// My header
#include "dm/zee/posix_zmq_socket.hpp"

inline void
shor::dm::zee::detail::Posix_zmq_socket::cancel()
{
    io_.cancel();
}

#endif

==> zee/windows_zmq_socket.cpp <==
// My header
#include "dm/zee/windows_zmq_socket.hpp"
// DM headers
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/error.hpp>
#include <boost/bind.hpp>

struct shor::dm::zee::detail::Windows_zmq_socket::Read_handler {
    typedef Windows_zmq_socket::implementation_type Io_type;
    typedef Windows_zmq_socket::Base_read_handler Base_read_handler;

private:
    Io_type& io_;
    Base_read_handler read_handler_;
    Async_recv_handler recv_handler_;

public:
    Read_handler(
        Io_type& io,
        Base_read_handler read_handler,
        Async_recv_handler recv_handler)
        : io_(io),
          read_handler_(read_handler),
          recv_handler_(recv_handler)
        { }

    void operator () (const boost::system::error_code& err)
        {
            read_handler_(*this, err, recv_handler_);
        }

    void operator () ()
        {
            io_.async_wait(*this);
        }
};

shor::dm::zee::detail::Windows_zmq_socket::Windows_zmq_socket(
    boost::asio::io_service& ios,
    zmq::context_t& zenv,
    const int zmq_socket_type)
    : Zmq_socket_base(ios, zenv, zmq_socket_type),
      zmq_fd_(Zmq_socket_base::zmq_socket()),
      io_(ios)
{
    auto zfd_event = CreateEvent(
        nullptr, // default security settings
        false, // auto reset
        false, // initial state: unsignalled
        nullptr); // anonymous object
    if (!zfd_event) {
        const boost::system::error_code an_error_code(
            GetLastError(),
            boost::asio::error::get_system_category());
        throw boost::system::system_error(an_error_code, "CreateEvent");
    }
    io_.assign(zfd_event);
    if (WSAEventSelect(zmq_fd_(), zfd_event, FD_READ) == SOCKET_ERROR) {
        const boost::system::error_code an_error_code(
            WSAGetLastError(),
            boost::asio::error::get_system_category());
        throw boost::system::system_error(
            an_error_code, "WSAEventSelect");
    }
}

void
shor::dm::zee::detail::Windows_zmq_socket::async_recv(
    Async_recv_handler recv_handler)
{
    const auto base_read_handler_fn = boost::bind(
        &Windows_zmq_socket::read_handler, this, _1, _2, _3);
    Read_handler read_handler(
        std::ref(io_), base_read_handler_fn, recv_handler);
    read_handler();
}

void
shor::dm::zee::detail::Windows_zmq_socket::close_ext()
{
    this->cancel();
    // TODO: reset zmq_fd_
    io_.close();
}

==> zee/windows_zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_HPP

// Base class header
#include "dm/zee/zmq_socket_base.hpp"
// DM headers
#include "dm/zee/basic_zmq_fd.hpp"
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/asio/windows/object_handle.hpp>
#include <boost/asio.hpp>

namespace shor {
namespace dm {
namespace zee {
namespace detail {
class Windows_zmq_socket : public Zmq_socket_base {
    typedef boost::asio::windows::object_handle implementation_type;

    struct Read_handler;

    Basic_zmq_fd<SOCKET> zmq_fd_;
    implementation_type io_;

public:
    Windows_zmq_socket(
        boost::asio::io_service& ios, zmq::context_t& zenv,
        int zmq_socket_type);
    SHOR_DM_DEFAULT_DTOR(~Windows_zmq_socket());

    void async_recv(Async_recv_handler handler);
    void cancel();

private:
    SHOR_DM_DELETE(Windows_zmq_socket(const Windows_zmq_socket& that));
    SHOR_DM_DELETE(
        Windows_zmq_socket& operator = (const Windows_zmq_socket& that));

    virtual void close_ext() override;
};

} // namespace detail
typedef detail::Windows_zmq_socket Windows_zmq_socket;

} // namespace zee
} // namespace dm
} // namespace shor

#include "dm/zee/windows_zmq_socket.ipp"

#endif

==> zee/windows_zmq_socket.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_IPP
#define INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_IPP

// My header
#include "dm/zee/windows_zmq_socket.hpp"

inline void
shor::dm::zee::detail::Windows_zmq_socket::cancel()
{
    io_.cancel();
}

#endif

==> zee/zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_HPP

// My header
#include "dm/zee/zmq_socket_fwd.hpp"
// Conditional headers
#ifndef _MSC_VER
# include "dm/zee/posix_zmq_socket.hpp"
#else
# include "dm/zee/windows_zmq_socket.hpp"
#endif
// Boost C++ headers
#include <boost/asio.hpp>

namespace shor {
namespace dm {
namespace zee {
#ifndef _MSC_VER
typedef Posix_zmq_socket Zmq_socket;
#else
typedef Windows_zmq_socket Zmq_socket;
#endif

} // namespace zee
} // namespace dm
} // namespace shor

#endif

==> zee/zmq_socket_base.cpp <==
// My headers
#include "dm/zee/zmq_socket_base.hpp"
// C++ headers
#include <cassert>

shor::dm::zee::detail::Zmq_socket_base::Opts::Opts(
    zmq::socket_t& zmq_socket)
    : zmq_socket_(zmq_socket)
{ }

shor::dm::zee::detail::Zmq_socket_base::Zmq_socket_base(
    boost::asio::io_service& ios,
    zmq::context_t& zenv,
    const int zmq_socket_type)
    : ios_(ios),
      zenv_(zenv),
      zmq_socket_(zenv, zmq_socket_type),
      opts_(zmq_socket_)
{
    opts_.linger(0);
}

bool
shor::dm::zee::detail::Zmq_socket_base::send(
    Msg msg)
{
    if (msg.frames().empty()) {
        auto frame = Msg::Frame();
        const auto send_p = zmq_socket_.send(frame);
        static_cast<void>(opts_.events());
        return send_p;
    }
    const auto first = msg.frames().begin();
    const auto last = std::prev(msg.frames().end());
    for (auto pos = first; pos != last; ++pos) {
        const auto send_p = zmq_socket_.send(*pos, ZMQ_SNDMORE);
        static_cast<void>(opts_.events());
        if (!send_p) {
            return false;
        }
    }
    const auto send_p = zmq_socket_.send(msg.frames().back(), 0);
    static_cast<void>(opts_.events());
    return send_p;
}

shor::dm::zee::Msg
shor::dm::zee::detail::Zmq_socket_base::recv()
{
    Msg my_value;
    int more_p = 0;
    do {
        zmq::message_t frame;
        const int flags = 0;
        zmq_socket_.recv(&frame, flags);
        static_cast<void>(opts_.events());
        my_value.frames().push_back(std::move(frame));
        more_p = opts_.rcvmore();
    } while (more_p);
    return my_value;
}

void
shor::dm::zee::detail::Zmq_socket_base::read_handler(
    std::function<void ()> io_fn,
    const boost::system::error_code& err,
    const Async_recv_handler recv_handler)
{
    if (err == boost::asio::error::operation_aborted) {
        auto msg = Msg();
        recv_handler(err, msg);
    } else if (opts_.events() & ZMQ_POLLIN) {
        do {
            auto msg = this->recv();
            recv_handler(err, msg);
        } while (opts_.events() & ZMQ_POLLIN);
    } else {
        io_fn();
    }
}

==> zee/zmq_socket_base.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_HPP

// DM headers
#include "dm/zee/handler.hpp"
#include "dm/zee/msg.hpp"
// 0MQ headers
#include <zmq.hpp>
// Boost C++ headers
#include <boost/asio.hpp>
// C++ headers
#include <string>

namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Zmq_socket_base {
    class Opts {
        zmq::socket_t& zmq_socket_;

    public:
        explicit Opts(zmq::socket_t& zmq_socket);
        SHOR_DM_DEFAULT_DTOR(~Opts());

        void linger(int value);
        void subscribe(const std::string& value);

        int events() const;
        int linger() const;
        int rcvmore() const;

    private:
        template<typename Opt_value>
        Opt_value getsockopt(int opt_name) const;

        void setsockopt(int opt_name, const std::string& opt_value);

        template<typename Opt_value>
        void setsockopt(int opt_name, const Opt_value& opt_value);

        SHOR_DM_DELETE(Opts(const Opts& that));
        SHOR_DM_DELETE(Opts& operator = (const Opts& that));
    };

private:
    boost::asio::io_service& ios_;
    zmq::context_t& zenv_;
    zmq::socket_t zmq_socket_;
    Opts opts_;

public:
    void bind(const char* addr);
    void bind(const std::string& addr);
    void close();
    void connect(const char* addr);
    void connect(const std::string& addr);
    boost::asio::io_service& get_io_service();
    Opts& opts();
    bool send(Msg msg);
    Msg recv();

    bool connected() const;
    const boost::asio::io_service& get_const_io_service() const;
    const boost::asio::io_service& get_io_service() const;
    const Opts& opts() const;

protected:
    typedef std::function<void ()> Void_fn;
    typedef std::function<
        void (Void_fn, const boost::system::error_code& err,
              Async_recv_handler recv_handler)
> Base_read_handler;

    Zmq_socket_base(
        boost::asio::io_service& ios, zmq::context_t& zenv,
        int zmq_socket_type);
    SHOR_DM_DEFAULT_DTOR(virtual ~Zmq_socket_base());

    void read_handler(
        std::function<void ()> io_fn,
        const boost::system::error_code& err,
        Async_recv_handler handler);
    zmq::context_t& zenv();
    zmq::socket_t& zmq_socket();

    const zmq::context_t& const_zenv() const;
    const zmq::socket_t& const_zmq_socket() const;
    const zmq::context_t& zenv() const;
    const zmq::socket_t& zmq_socket() const;

private:
    SHOR_DM_DELETE(Zmq_socket_base(const Zmq_socket_base& that));
    SHOR_DM_DELETE(
        Zmq_socket_base& operator = (const Zmq_socket_base& that));

    virtual void close_ext() = 0;
};

} // namespace detail
typedef detail::Zmq_socket_base Zmq_socket_base;

} // namespace zee
} // namespace dm
} // namespace shor

#include "dm/zee/zmq_socket_base.ipp"

#endif

==> zee/zmq_socket_base.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_IPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_IPP

// My header
#include "dm/zee/zmq_socket_base.hpp"

inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::linger(
    const int value)
{
    this->setsockopt(ZMQ_LINGER, value);
}

inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::subscribe(
    const std::string& value)
{
    this->setsockopt(ZMQ_SUBSCRIBE, value);
}

inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::events()
    const
{
    return this->getsockopt<int>(ZMQ_EVENTS);
}

inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::linger()
    const
{
    return this->getsockopt<int>(ZMQ_LINGER);
}

inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::rcvmore()
    const
{
    return this->getsockopt<int>(ZMQ_RCVMORE);
}

template<typename Opt_value>
Opt_value
shor::dm::zee::detail::Zmq_socket_base::Opts::getsockopt(
    const int opt_name)
    const
{
    Opt_value opt_value;
    std::size_t opt_size = sizeof(Opt_value);
    zmq_socket_.getsockopt(opt_name, &opt_value, &opt_size);
    return opt_value;
}

inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::setsockopt(
    const int opt_name,
    const std::string& opt_value)
{
    zmq_socket_.setsockopt(opt_name, opt_value.c_str(), opt_value.size());
}

template<typename Opt_value>
inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::setsockopt(
    const int opt_name,
    const Opt_value& opt_value)
{
    const std::size_t opt_size = sizeof(Opt_value);
    zmq_socket_.setsockopt(opt_name, &opt_value, opt_size);
}

inline void
shor::dm::zee::detail::Zmq_socket_base::bind(
    const char* addr)
{
    zmq_socket_.bind(addr);
}

inline void
shor::dm::zee::detail::Zmq_socket_base::bind(
    const std::string& addr)
{
    zmq_socket_.bind(addr.c_str());
}

inline void
shor::dm::zee::detail::Zmq_socket_base::close()
{
    this->close_ext();
    zmq_socket_.close();
}

inline void
shor::dm::zee::detail::Zmq_socket_base::connect(
    const char* addr)
{
    zmq_socket_.connect(addr);
}

inline void
shor::dm::zee::detail::Zmq_socket_base::connect(
    const std::string& addr)
{
    zmq_socket_.connect(addr.c_str());
}

inline boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_io_service()
{
    return ios_;
}

inline shor::dm::zee::detail::Zmq_socket_base::Opts&
shor::dm::zee::detail::Zmq_socket_base::opts()
{
    return opts_;
}

inline bool
shor::dm::zee::detail::Zmq_socket_base::connected()
    const
{
    // This function should have been const in zmq::socket_t
    return const_cast<zmq::socket_t&>(zmq_socket_).connected();
}

inline const boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_const_io_service()
    const
{
    return ios_;
}

inline const boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_io_service()
    const
{
    return ios_;
}

inline zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::zenv()
{
    return zenv_;
}

inline zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::zmq_socket()
{
    return zmq_socket_;
}

inline const zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::const_zenv()
    const
{
    return zenv_;
}

inline const zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::const_zmq_socket()
    const
{
    return zmq_socket_;
}

inline const zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::zenv()
    const
{
    return zenv_;
}

inline const zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::zmq_socket()
    const
{
    return zmq_socket_;
}

#endif

==> zee/zmq_socket_fwd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_FWD_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_FWD_HPP

namespace shor {
namespace dm {
namespace zee {
namespace detail {
#ifndef _MSC_VER
class Posix_zmq_socket;
#else
class Windows_zmq_socket;
#endif

} // namespace detail
#ifndef _MSC_VER
typedef detail::Posix_zmq_socket Posix_zmq_socket;
typedef Posix_zmq_socket Zmq_socket;
#else
typedef detail::Windows_zmq_socket Windows_zmq_socket;
typedef Windows_zmq_socket Zmq_socket;
#endif

} // namespace zee
} // namespace dm
} // namespace shor

#endif

________________________________

This e-mail and any attachments are confidential. If it is not intended for you, please notify the sender, and please erase and ignore the contents.


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net