|
Boost Users : |
Subject: Re: [Boost-users] Boost-users Digest, Vol 3643, Issue 1
From: Damien Kick (dkick_at_[hidden])
Date: 2013-12-02 13:25:02
On Dec 1, 2013, at 11:00 AM, boost-users-request_at_[hidden] wrote:
> Date: Sun, 1 Dec 2013 20:38:32 +0530
> From: Rahul Mathur <srivmuk_at_[hidden]>
> Subject: [Boost-users] ZeroMQ integration with Boost-v1.54
> Message-ID:
> <CALtf4sBn6Dh0WVOgAfaXvSj4obMiMw335w88xDj5vguPOoHMaw_at_[hidden]>
[
]
> BTW, How do I proceed to have ZeroMQ integrated with Boost-v1.54 with
> GCC on Linux platform?
Here is some code that I've been using to integrate Boost.Asio and
0MQ. The OP question asked for GNU/Linux only but this code has been
working for me on both Windows and POSIX. Most of this comes from
comments on various Internet forums but integrates together a few
things which I personally found less than obvious on my first few
attempts. For example, most of the code snippets I found only
mentioned POSIX and I had to beat my head against a few walls to get
something that worked on both, though YMMV.
==> config.hpp.in <==
#ifndef INCLUDE_SHOR_DM_CONFIG_HPP
#define INCLUDE_SHOR_DM_CONFIG_HPP
// Boost C++ headers
#include <boost/filesystem.hpp>
// C++ headers
#include <string>
#ifndef _WIN32_WINNT
# define _WIN32_WINNT 0x0601
#endif
#ifdef _MSC_VER
# define SHOR_DM_CONSTEXPR
# define SHOR_DM_DEFAULT_CTOR(expr) expr { }
# define SHOR_DM_DEFAULT_COPY(expr)
# define SHOR_DM_DEFAULT_DTOR(expr) expr { }
# define SHOR_DM_DEFAULT_ASSIGN(expr)
# define SHOR_DM_DELETE(expr) expr
# define SHOR_DM_NOEXCEPT throw()
#else
# define SHOR_DM_CONSTEXPR constexpr
# define SHOR_DM_DEFAULT_CTOR(expr) expr = default
# define SHOR_DM_DEFAULT_COPY(expr) expr = default
# define SHOR_DM_DEFAULT_DTOR(expr) expr = default
# define SHOR_DM_DEFAULT_ASSIGN(expr) expr = default
# define SHOR_DM_DELETE(expr) expr = delete
# define SHOR_DM_NOEXCEPT noexcept
#endif
#endif
==> zee/basic_zmq_fd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_BASIC_ZMQ_FD_HPP
#define INCLUDE_SHOR_DM_ZEE_BASIC_ZMQ_FD_HPP
// 0MQ headers
#include <zmq.hpp>
namespace shor {
namespace dm {
namespace zee {
template<typename Native_handle>
struct Basic_zmq_fd {
typedef Native_handle native_handle_type;
typedef native_handle_type value_type;
private:
zmq::socket_t& zmq_socket_;
Native_handle zmq_fd_;
public:
explicit Basic_zmq_fd(zmq::socket_t& zmq_socket)
: zmq_socket_(zmq_socket),
zmq_fd_()
{
std::size_t zmq_fd_size = sizeof(zmq_fd_);
zmq_socket_.getsockopt(ZMQ_FD, &zmq_fd_, &zmq_fd_size);
}
value_type operator () () const { return zmq_fd_; }
};
} // namespace zee
} // namespace dm
} // namespace shor
#endif
==> zee/handler.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_HANDLER_HPP
#define INCLUDE_SHOR_DM_ZEE_HANDLER_HPP
// DM headers
#include "dm/zee/msg.hpp"
// Boost C++ headers
#include <boost/system/error_code.hpp>
#include <boost/function.hpp>
// C++ headers
#include <functional>
namespace shor {
namespace dm {
namespace zee {
typedef std::function<
void (const boost::system::error_code& err, shor::dm::zee::Msg& msg)
> Async_recv_handler;
} // namespace zee
} // namespace dm
} // namespace shor
#endif
==> zee/lossy_service.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_LOSSY_SERVICE_HPP
#define INCLUDE_SHOR_DM_ZEE_LOSSY_SERVICE_HPP
// Boost C++ headers
#include <boost/asio.hpp>
namespace shor {
namespace dm {
namespace zee {
template<typename Service>
struct Lossy_service : public Service {
explicit Lossy_service(boost::asio::io_service& ios)
: Service(ios)
{ }
void destory(typename Service::implementation_type& impl)
{ }
};
} // namespace zee
} // namespace dm
} // namespace shor
#endif
==> zee/msg.cpp <==
#include "dm/zee/msg.hpp"
shor::dm::zee::detail::Msg::Frame::Frame(
Frame&& that)
: that_(std::move(that.that_))
{ }
shor::dm::zee::detail::Msg::Frame::Frame(
zmq::message_t&& that)
: that_(std::move(that))
{ }
shor::dm::zee::detail::Msg::Frame::Frame(
const std::string& the_other)
: that_(the_other.size())
{
std::copy(the_other.begin(), the_other.end(),
static_cast<char*>(that_.data()));
}
shor::dm::zee::detail::Msg::Frame&
shor::dm::zee::detail::Msg::Frame::operator = (
Frame&& that)
{
that_ = std::move(that.that_);
return *this;
}
shor::dm::zee::detail::Msg::Frame&
shor::dm::zee::detail::Msg::Frame::operator = (
const std::string& the_other)
{
that_.rebuild(the_other.size());
std::copy(the_other.begin(), the_other.end(),
static_cast<char*>(that_.data()));
return *this;
}
shor::dm::zee::detail::Msg::Msg(
Msg&& that)
: frames_(std::move(that.frames_))
{ }
shor::dm::zee::detail::Msg&
shor::dm::zee::detail::Msg::operator = (
Msg&& that)
{
frames_ = std::move(that.frames_);
return *this;
}
==> zee/msg.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_HPP
#define INCLUDE_SHOR_DM_ZEE_MSG_HPP
// My header
#include "dm/zee/msg_fwd.hpp"
// DM headers
#include "dm/config.hpp"
// 0MQ headers
#include <zmq.hpp>
// C++ headers
#include <deque>
namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Msg {
class Frame {
zmq::message_t that_;
public:
SHOR_DM_DEFAULT_CTOR(Frame());
Frame(Frame&& that);
Frame(zmq::message_t&& that);
Frame(const std::string& the_other);
SHOR_DM_DEFAULT_DTOR(~Frame());
Frame& operator = (Frame&& that);
Frame& operator = (const std::string& the_other);
operator zmq::message_t& ();
operator const zmq::message_t& () const;
zmq::message_t& get();
const zmq::message_t& cget() const;
const zmq::message_t& get() const;
private:
SHOR_DM_DELETE(Frame(const Frame& that));
SHOR_DM_DELETE(Frame& operator = (const Frame& that));
};
typedef std::deque<Frame> Frames;
private:
Frames frames_;
public:
SHOR_DM_DEFAULT_CTOR(Msg());
Msg(Msg&& that);
SHOR_DM_DEFAULT_DTOR(~Msg());
Msg& operator = (Msg&& that);
Frames& frames();
const Frames& frames() const;
private:
SHOR_DM_DELETE(Msg(const Msg& that));
SHOR_DM_DELETE(Msg& operator = (const Msg& that));
};
} // namespace detail
typedef detail::Msg Msg;
} // namespace zee
} // namespace dm
} // namespace shor
#include "dm/zee/msg.ipp"
#endif
==> zee/msg.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_IPP
#define INCLUDE_SHOR_DM_ZEE_MSG_IPP
// My header
#include "dm/zee/msg.hpp"
inline shor::dm::zee::detail::Msg::Frame::operator
zmq::message_t& ()
{
return that_;
}
inline shor::dm::zee::detail::Msg::Frame::operator
const zmq::message_t& () const
{
return that_;
}
inline zmq::message_t&
shor::dm::zee::detail::Msg::Frame::get()
{
return that_;
}
inline const zmq::message_t&
shor::dm::zee::detail::Msg::Frame::cget()
const
{
return that_;
}
inline const zmq::message_t&
shor::dm::zee::detail::Msg::Frame::get()
const
{
return that_;
}
inline shor::dm::zee::detail::Msg::Frames&
shor::dm::zee::detail::Msg::frames()
{
return frames_;
}
inline const shor::dm::zee::detail::Msg::Frames&
shor::dm::zee::detail::Msg::frames()
const
{
return frames_;
}
#endif
==> zee/msg_fwd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_MSG_FWD_HPP
#define INCLUDE_SHOR_DM_ZEE_MSG_FWD_HPP
namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Msg;
} // namespace detail
typedef detail::Msg Msg;
} // namespace zee
} // namespace dm
} // namespace shor
#endif
==> zee/posix_zmq_socket.cpp <==
// My header
#include "dm/zee/posix_zmq_socket.hpp"
// DM headers
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/bind.hpp>
struct shor::dm::zee::detail::Posix_zmq_socket::Read_handler {
typedef Posix_zmq_socket::implementation_type Io_type;
typedef Posix_zmq_socket::Base_read_handler Base_read_handler;
private:
Io_type& io_;
Base_read_handler read_handler_;
Async_recv_handler recv_handler_;
public:
Read_handler(
Io_type& io,
Base_read_handler read_handler,
Async_recv_handler recv_handler)
: io_(io),
read_handler_(read_handler),
recv_handler_(recv_handler)
{ }
void operator () (
const boost::system::error_code& err,
std::size_t /*bytes_transferred*/)
{
read_handler_(*this, err, recv_handler_);
}
void operator () ()
{
io_.async_read_some(boost::asio::null_buffers(), *this);
}
};
shor::dm::zee::detail::Posix_zmq_socket::Posix_zmq_socket(
boost::asio::io_service& ios,
zmq::context_t& zenv,
const int zmq_socket_type)
: Zmq_socket_base(ios, zenv, zmq_socket_type),
zmq_fd_(Zmq_socket_base::zmq_socket()),
io_(ios, zmq_fd_()),
io_release_(&io_, Release())
{ }
void
shor::dm::zee::detail::Posix_zmq_socket::async_recv(
Async_recv_handler recv_handler)
{
const auto base_read_handler_fn = boost::bind(
&Posix_zmq_socket::read_handler, this, _1, _2, _3);
Read_handler read_handler(
std::ref(io_), base_read_handler_fn, recv_handler);
read_handler();
}
void
shor::dm::zee::detail::Posix_zmq_socket::close_ext()
{
this->cancel();
// TODO: reset zmq_fd_
io_.release();
}
==> zee/posix_zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_HPP
// Base class header
#include "dm/zee/zmq_socket_base.hpp"
// DM headers
#include "dm/zee/handler.hpp"
#include "dm/zee/lossy_service.hpp"
#include "dm/zee/basic_zmq_fd.hpp"
// Boost C++ headers
#include <boost/asio/posix/basic_stream_descriptor.hpp>
#include <boost/asio/posix/stream_descriptor_service.hpp>
#include <boost/asio.hpp>
namespace shor {
namespace dm {
namespace zee {
namespace detail {
class Posix_zmq_socket : public Zmq_socket_base {
using Stream_descriptor_service =
boost::asio::posix::stream_descriptor_service;
template<typename Service> using Basic_stream_descriptor =
boost::asio::posix::basic_stream_descriptor<Service>;
// Borrowed from <http://preview.tinyurl.com/n6jh2ne> but it
// doesn't seem to work; i.e. I was still finding that the file
// descriptor was being closed, causing a crash at shutdown.
// Because of this, I (re)introduced Release (see below).
typedef Lossy_service<Stream_descriptor_service> service_type;
typedef Basic_stream_descriptor<service_type> implementation_type;
struct Read_handler;
struct Release {
void operator () (implementation_type* ptr)
{ if (ptr->is_open()) { ptr->release(); } }
};
Basic_zmq_fd<int> zmq_fd_;
implementation_type io_;
std::unique_ptr<implementation_type, Release> io_release_;
public:
Posix_zmq_socket(
boost::asio::io_service& ios, zmq::context_t& zenv,
int zmq_socket_type);
SHOR_DM_DEFAULT_DTOR(~Posix_zmq_socket());
void async_recv(Async_recv_handler handler);
void cancel();
private:
SHOR_DM_DELETE(Posix_zmq_socket(const Posix_zmq_socket& that));
SHOR_DM_DELETE(
Posix_zmq_socket& operator = (const Posix_zmq_socket& that));
virtual void close_ext() override;
};
} // namespace detail
typedef detail::Posix_zmq_socket Posix_zmq_socket;
} // namespace zee
} // namespace dm
} // namespace shor
#include "dm/zee/posix_zmq_socket.ipp"
#endif
==> zee/posix_zmq_socket.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_IPP
#define INCLUDE_SHOR_DM_ZEE_POSIX_ZMQ_SOCKET_IPP
// My header
#include "dm/zee/posix_zmq_socket.hpp"
inline void
shor::dm::zee::detail::Posix_zmq_socket::cancel()
{
io_.cancel();
}
#endif
==> zee/windows_zmq_socket.cpp <==
// My header
#include "dm/zee/windows_zmq_socket.hpp"
// DM headers
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/error.hpp>
#include <boost/bind.hpp>
struct shor::dm::zee::detail::Windows_zmq_socket::Read_handler {
typedef Windows_zmq_socket::implementation_type Io_type;
typedef Windows_zmq_socket::Base_read_handler Base_read_handler;
private:
Io_type& io_;
Base_read_handler read_handler_;
Async_recv_handler recv_handler_;
public:
Read_handler(
Io_type& io,
Base_read_handler read_handler,
Async_recv_handler recv_handler)
: io_(io),
read_handler_(read_handler),
recv_handler_(recv_handler)
{ }
void operator () (const boost::system::error_code& err)
{
read_handler_(*this, err, recv_handler_);
}
void operator () ()
{
io_.async_wait(*this);
}
};
shor::dm::zee::detail::Windows_zmq_socket::Windows_zmq_socket(
boost::asio::io_service& ios,
zmq::context_t& zenv,
const int zmq_socket_type)
: Zmq_socket_base(ios, zenv, zmq_socket_type),
zmq_fd_(Zmq_socket_base::zmq_socket()),
io_(ios)
{
auto zfd_event = CreateEvent(
nullptr, // default security settings
false, // auto reset
false, // initial state: unsignalled
nullptr); // anonymous object
if (!zfd_event) {
const boost::system::error_code an_error_code(
GetLastError(),
boost::asio::error::get_system_category());
throw boost::system::system_error(an_error_code, "CreateEvent");
}
io_.assign(zfd_event);
if (WSAEventSelect(zmq_fd_(), zfd_event, FD_READ) == SOCKET_ERROR) {
const boost::system::error_code an_error_code(
WSAGetLastError(),
boost::asio::error::get_system_category());
throw boost::system::system_error(
an_error_code, "WSAEventSelect");
}
}
void
shor::dm::zee::detail::Windows_zmq_socket::async_recv(
Async_recv_handler recv_handler)
{
const auto base_read_handler_fn = boost::bind(
&Windows_zmq_socket::read_handler, this, _1, _2, _3);
Read_handler read_handler(
std::ref(io_), base_read_handler_fn, recv_handler);
read_handler();
}
void
shor::dm::zee::detail::Windows_zmq_socket::close_ext()
{
this->cancel();
// TODO: reset zmq_fd_
io_.close();
}
==> zee/windows_zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_HPP
// Base class header
#include "dm/zee/zmq_socket_base.hpp"
// DM headers
#include "dm/zee/basic_zmq_fd.hpp"
#include "dm/zee/handler.hpp"
// Boost C++ headers
#include <boost/asio/windows/object_handle.hpp>
#include <boost/asio.hpp>
namespace shor {
namespace dm {
namespace zee {
namespace detail {
class Windows_zmq_socket : public Zmq_socket_base {
typedef boost::asio::windows::object_handle implementation_type;
struct Read_handler;
Basic_zmq_fd<SOCKET> zmq_fd_;
implementation_type io_;
public:
Windows_zmq_socket(
boost::asio::io_service& ios, zmq::context_t& zenv,
int zmq_socket_type);
SHOR_DM_DEFAULT_DTOR(~Windows_zmq_socket());
void async_recv(Async_recv_handler handler);
void cancel();
private:
SHOR_DM_DELETE(Windows_zmq_socket(const Windows_zmq_socket& that));
SHOR_DM_DELETE(
Windows_zmq_socket& operator = (const Windows_zmq_socket& that));
virtual void close_ext() override;
};
} // namespace detail
typedef detail::Windows_zmq_socket Windows_zmq_socket;
} // namespace zee
} // namespace dm
} // namespace shor
#include "dm/zee/windows_zmq_socket.ipp"
#endif
==> zee/windows_zmq_socket.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_IPP
#define INCLUDE_SHOR_DM_ZEE_WINDOWS_ZMQ_SOCKET_IPP
// My header
#include "dm/zee/windows_zmq_socket.hpp"
inline void
shor::dm::zee::detail::Windows_zmq_socket::cancel()
{
io_.cancel();
}
#endif
==> zee/zmq_socket.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_HPP
// My header
#include "dm/zee/zmq_socket_fwd.hpp"
// Conditional headers
#ifndef _MSC_VER
# include "dm/zee/posix_zmq_socket.hpp"
#else
# include "dm/zee/windows_zmq_socket.hpp"
#endif
// Boost C++ headers
#include <boost/asio.hpp>
namespace shor {
namespace dm {
namespace zee {
#ifndef _MSC_VER
typedef Posix_zmq_socket Zmq_socket;
#else
typedef Windows_zmq_socket Zmq_socket;
#endif
} // namespace zee
} // namespace dm
} // namespace shor
#endif
==> zee/zmq_socket_base.cpp <==
// My headers
#include "dm/zee/zmq_socket_base.hpp"
// C++ headers
#include <cassert>
shor::dm::zee::detail::Zmq_socket_base::Opts::Opts(
zmq::socket_t& zmq_socket)
: zmq_socket_(zmq_socket)
{ }
shor::dm::zee::detail::Zmq_socket_base::Zmq_socket_base(
boost::asio::io_service& ios,
zmq::context_t& zenv,
const int zmq_socket_type)
: ios_(ios),
zenv_(zenv),
zmq_socket_(zenv, zmq_socket_type),
opts_(zmq_socket_)
{
opts_.linger(0);
}
bool
shor::dm::zee::detail::Zmq_socket_base::send(
Msg msg)
{
if (msg.frames().empty()) {
auto frame = Msg::Frame();
const auto send_p = zmq_socket_.send(frame);
static_cast<void>(opts_.events());
return send_p;
}
const auto first = msg.frames().begin();
const auto last = std::prev(msg.frames().end());
for (auto pos = first; pos != last; ++pos) {
const auto send_p = zmq_socket_.send(*pos, ZMQ_SNDMORE);
static_cast<void>(opts_.events());
if (!send_p) {
return false;
}
}
const auto send_p = zmq_socket_.send(msg.frames().back(), 0);
static_cast<void>(opts_.events());
return send_p;
}
shor::dm::zee::Msg
shor::dm::zee::detail::Zmq_socket_base::recv()
{
Msg my_value;
int more_p = 0;
do {
zmq::message_t frame;
const int flags = 0;
zmq_socket_.recv(&frame, flags);
static_cast<void>(opts_.events());
my_value.frames().push_back(std::move(frame));
more_p = opts_.rcvmore();
} while (more_p);
return my_value;
}
void
shor::dm::zee::detail::Zmq_socket_base::read_handler(
std::function<void ()> io_fn,
const boost::system::error_code& err,
const Async_recv_handler recv_handler)
{
if (err == boost::asio::error::operation_aborted) {
auto msg = Msg();
recv_handler(err, msg);
} else if (opts_.events() & ZMQ_POLLIN) {
do {
auto msg = this->recv();
recv_handler(err, msg);
} while (opts_.events() & ZMQ_POLLIN);
} else {
io_fn();
}
}
==> zee/zmq_socket_base.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_HPP
// DM headers
#include "dm/zee/handler.hpp"
#include "dm/zee/msg.hpp"
// 0MQ headers
#include <zmq.hpp>
// Boost C++ headers
#include <boost/asio.hpp>
// C++ headers
#include <string>
namespace shor {
namespace dm {
namespace zee {
namespace detail {
struct Zmq_socket_base {
class Opts {
zmq::socket_t& zmq_socket_;
public:
explicit Opts(zmq::socket_t& zmq_socket);
SHOR_DM_DEFAULT_DTOR(~Opts());
void linger(int value);
void subscribe(const std::string& value);
int events() const;
int linger() const;
int rcvmore() const;
private:
template<typename Opt_value>
Opt_value getsockopt(int opt_name) const;
void setsockopt(int opt_name, const std::string& opt_value);
template<typename Opt_value>
void setsockopt(int opt_name, const Opt_value& opt_value);
SHOR_DM_DELETE(Opts(const Opts& that));
SHOR_DM_DELETE(Opts& operator = (const Opts& that));
};
private:
boost::asio::io_service& ios_;
zmq::context_t& zenv_;
zmq::socket_t zmq_socket_;
Opts opts_;
public:
void bind(const char* addr);
void bind(const std::string& addr);
void close();
void connect(const char* addr);
void connect(const std::string& addr);
boost::asio::io_service& get_io_service();
Opts& opts();
bool send(Msg msg);
Msg recv();
bool connected() const;
const boost::asio::io_service& get_const_io_service() const;
const boost::asio::io_service& get_io_service() const;
const Opts& opts() const;
protected:
typedef std::function<void ()> Void_fn;
typedef std::function<
void (Void_fn, const boost::system::error_code& err,
Async_recv_handler recv_handler)
> Base_read_handler;
Zmq_socket_base(
boost::asio::io_service& ios, zmq::context_t& zenv,
int zmq_socket_type);
SHOR_DM_DEFAULT_DTOR(virtual ~Zmq_socket_base());
void read_handler(
std::function<void ()> io_fn,
const boost::system::error_code& err,
Async_recv_handler handler);
zmq::context_t& zenv();
zmq::socket_t& zmq_socket();
const zmq::context_t& const_zenv() const;
const zmq::socket_t& const_zmq_socket() const;
const zmq::context_t& zenv() const;
const zmq::socket_t& zmq_socket() const;
private:
SHOR_DM_DELETE(Zmq_socket_base(const Zmq_socket_base& that));
SHOR_DM_DELETE(
Zmq_socket_base& operator = (const Zmq_socket_base& that));
virtual void close_ext() = 0;
};
} // namespace detail
typedef detail::Zmq_socket_base Zmq_socket_base;
} // namespace zee
} // namespace dm
} // namespace shor
#include "dm/zee/zmq_socket_base.ipp"
#endif
==> zee/zmq_socket_base.ipp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_IPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_BASE_IPP
// My header
#include "dm/zee/zmq_socket_base.hpp"
inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::linger(
const int value)
{
this->setsockopt(ZMQ_LINGER, value);
}
inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::subscribe(
const std::string& value)
{
this->setsockopt(ZMQ_SUBSCRIBE, value);
}
inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::events()
const
{
return this->getsockopt<int>(ZMQ_EVENTS);
}
inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::linger()
const
{
return this->getsockopt<int>(ZMQ_LINGER);
}
inline int
shor::dm::zee::detail::Zmq_socket_base::Opts::rcvmore()
const
{
return this->getsockopt<int>(ZMQ_RCVMORE);
}
template<typename Opt_value>
Opt_value
shor::dm::zee::detail::Zmq_socket_base::Opts::getsockopt(
const int opt_name)
const
{
Opt_value opt_value;
std::size_t opt_size = sizeof(Opt_value);
zmq_socket_.getsockopt(opt_name, &opt_value, &opt_size);
return opt_value;
}
inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::setsockopt(
const int opt_name,
const std::string& opt_value)
{
zmq_socket_.setsockopt(opt_name, opt_value.c_str(), opt_value.size());
}
template<typename Opt_value>
inline void
shor::dm::zee::detail::Zmq_socket_base::Opts::setsockopt(
const int opt_name,
const Opt_value& opt_value)
{
const std::size_t opt_size = sizeof(Opt_value);
zmq_socket_.setsockopt(opt_name, &opt_value, opt_size);
}
inline void
shor::dm::zee::detail::Zmq_socket_base::bind(
const char* addr)
{
zmq_socket_.bind(addr);
}
inline void
shor::dm::zee::detail::Zmq_socket_base::bind(
const std::string& addr)
{
zmq_socket_.bind(addr.c_str());
}
inline void
shor::dm::zee::detail::Zmq_socket_base::close()
{
this->close_ext();
zmq_socket_.close();
}
inline void
shor::dm::zee::detail::Zmq_socket_base::connect(
const char* addr)
{
zmq_socket_.connect(addr);
}
inline void
shor::dm::zee::detail::Zmq_socket_base::connect(
const std::string& addr)
{
zmq_socket_.connect(addr.c_str());
}
inline boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_io_service()
{
return ios_;
}
inline shor::dm::zee::detail::Zmq_socket_base::Opts&
shor::dm::zee::detail::Zmq_socket_base::opts()
{
return opts_;
}
inline bool
shor::dm::zee::detail::Zmq_socket_base::connected()
const
{
// This function should have been const in zmq::socket_t
return const_cast<zmq::socket_t&>(zmq_socket_).connected();
}
inline const boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_const_io_service()
const
{
return ios_;
}
inline const boost::asio::io_service&
shor::dm::zee::detail::Zmq_socket_base::get_io_service()
const
{
return ios_;
}
inline zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::zenv()
{
return zenv_;
}
inline zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::zmq_socket()
{
return zmq_socket_;
}
inline const zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::const_zenv()
const
{
return zenv_;
}
inline const zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::const_zmq_socket()
const
{
return zmq_socket_;
}
inline const zmq::context_t&
shor::dm::zee::detail::Zmq_socket_base::zenv()
const
{
return zenv_;
}
inline const zmq::socket_t&
shor::dm::zee::detail::Zmq_socket_base::zmq_socket()
const
{
return zmq_socket_;
}
#endif
==> zee/zmq_socket_fwd.hpp <==
#ifndef INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_FWD_HPP
#define INCLUDE_SHOR_DM_ZEE_ZMQ_SOCKET_FWD_HPP
namespace shor {
namespace dm {
namespace zee {
namespace detail {
#ifndef _MSC_VER
class Posix_zmq_socket;
#else
class Windows_zmq_socket;
#endif
} // namespace detail
#ifndef _MSC_VER
typedef detail::Posix_zmq_socket Posix_zmq_socket;
typedef Posix_zmq_socket Zmq_socket;
#else
typedef detail::Windows_zmq_socket Windows_zmq_socket;
typedef Windows_zmq_socket Zmq_socket;
#endif
} // namespace zee
} // namespace dm
} // namespace shor
#endif
________________________________
This e-mail and any attachments are confidential. If it is not intended for you, please notify the sender, and please erase and ignore the contents.
Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net