//C++ Includes #include #include //Boost Includes #include #include #include #include #include #include //Rdss Include #include class ReceiveProcessSend { public: struct Arguments { public: Arguments() { using boost::asio::ip::udp; using boost::asio::ip::address; receive_interface = udp::endpoint(address::from_string("127.0.0.1"), 36000); receive_multicast = boost::optional
(); send_interface = udp::endpoint(address::from_string("127.0.0.1"), 35000); destination_ip = udp::endpoint(address::from_string("127.0.0.1"), 36000); } virtual ~Arguments() {} boost::asio::ip::udp::endpoint receive_interface; ///< ip receive interface boost::optional receive_multicast; ///< ip receive multicast boost::asio::ip::udp::endpoint send_interface; ///< ip send interface boost::asio::ip::udp::endpoint destination_ip; ///< destination Ip }; private: enum { max_length = 1450 }; ///buffer to receive data typedef std::vector t_buffer; typedef boost::shared_ptr t_boostVector; typedef std::list t_boostList; typedef rdss::pool::Object t_buffer_pool; ///< call back process function typedef boost::function< t_boostList( t_boostVector, t_buffer::size_type)> t_ProcessFunction; public: ReceiveProcessSend( boost::asio::io_service& io_service, t_ProcessFunction func, Arguments const& arguments = Arguments()) : m_ReceiveSocket(io_service) , m_SendSocket(io_service) , m_DestinationIP(arguments.destination_ip) , m_ProcessFunction(func) { m_Pool = t_buffer_pool::create(2, t_buffer(max_length, 0)); // Create the socket so that multiple may be bound to the same address. m_ReceiveSocket.open(arguments.receive_interface.protocol()); m_ReceiveSocket.set_option(boost::asio::ip::udp::socket::reuse_address(true)); m_ReceiveSocket.bind(arguments.receive_interface); // Join the multicast group. if (arguments.receive_multicast) { m_ReceiveSocket.set_option( boost::asio::ip::multicast::join_group( arguments.getReceiveMulticast()->to_v4(), m_ReceiveSocket.local_endpoint().address().to_v4())); } // Create the sending socket. m_SendSocket.open(arguments.send_interface.protocol()); m_SendSocket.set_option(boost::asio::ip::udp::socket::reuse_address(true)); m_SendSocket.bind(arguments.send_interface); //Want to replace this call with a specific way to add handle_receive //directly to io_service. t_buffer_pool::boost_pointer newData = m_Pool->getObject(); m_ReceiveSocket.async_receive( boost::asio::buffer(boost::asio::buffer(*newData)), boost::bind(&ReceiveProcessSend::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, newData)); } private: void handle_receive(boost::system::error_code const& error, size_t bytes_recvd, t_buffer_pool::boost_pointer data) { if (!error) { try { t_boostList sendlist = m_ProcessFunction(data, bytes_recvd); for (t_boostList::iterator iter = sendlist.begin(); iter != sendlist.end(); ++iter) { m_SendSocket.async_send_to( boost::asio::buffer(**iter), m_DestinationIP, boost::bind(&ReceiveProcessSend::handle_send_to, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, *iter)); } } catch (std::exception& exception) { std::cerr << exception.what() << std::endl; } catch (...) {} } //Want to replace this call with a specific way to add handle_receive //directly to io_service. t_buffer_pool::boost_pointer newData = m_Pool->getObject(); m_ReceiveSocket.async_receive( boost::asio::buffer(boost::asio::buffer(*newData)), boost::bind(&ReceiveProcessSend::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, newData)); } void handle_send_to( boost::system::error_code const& error, size_t, t_boostVector data) { if (!error) { } } private: Arguments const m_Arguments; ///< internal argument Structure boost::asio::ip::udp::socket m_ReceiveSocket; ///< udp receive socket boost::asio::ip::udp::socket m_SendSocket; ///< udp send socket boost::asio::ip::udp::endpoint m_DestinationIP; ///< ip destination boost::shared_ptr m_Pool; ///< buffer t_ProcessFunction m_ProcessFunction; ///< call back process function };