// // server.cpp // ~~~~~~~~~~ // // Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // g++ mrvServer.cpp mrvClient.cpp -o server -lboost_system -lboost_thread -lpthread // #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mrvClient.h" #include "mrvServer.h" #include "mrViewer.h" #include "gui/mrvReel.h" #include "gui/mrvImageView.h" #include "gui/mrvImageBrowser.h" using boost::asio::deadline_timer; using boost::asio::ip::tcp; //---------------------------------------------------------------------- //---------------------------------------------------------------------- namespace mrv { ParseCommands::ParseCommands( mrv::ViewerUI* v ) : ui( v ) { ui->uiView->_client = this; } bool ParseCommands::parse( const std::string& m ) { std::istringstream is( m ); std::string cmd; is >> cmd; static mrv::Reel r; if ( cmd == "Reel" ) { std::string name; is >> name; std::cerr << "Reel " << name << std::endl; r = ui->uiReelWindow->uiBrowser->reel( name.c_str() ); if (!r) { r = ui->uiReelWindow->uiBrowser->new_reel( name.c_str() ); } return true; } else if ( cmd == "Image" ) { std::string imgname; is >> imgname; imgname = imgname.substr( 1, imgname.size() - 2 ); bool found = false; if ( r ) { mrv::MediaList::iterator j = r->images.begin(); mrv::MediaList::iterator e = r->images.end(); for ( ; j != e; ++j ) { if ( (*j)->image() && (*j)->image()->fileroot() == imgname ) { found = true; } } if (!found) { stringArray files; files.push_back( imgname ); std::cerr << "load " << imgname << std::endl; ui->uiReelWindow->uiBrowser->load( files, false ); } } return true; } if ( cmd == "sync_image" ) { unsigned num = ui->uiReelWindow->uiBrowser->number_of_reels(); for (unsigned i = 0; i < 
num; ++i ) { mrv::Reel r = ui->uiReelWindow->uiBrowser->reel( i ); std::string cmd = "Reel "; cmd += r->name; send( cmd ); mrv::MediaList::iterator j = r->images.begin(); mrv::MediaList::iterator e = r->images.end(); for ( ; j != e; ++j ) { cmd = "Image \""; cmd += (*j)->image()->fileroot(); cmd += "\""; send( cmd ); } } return true; } else if ( cmd == "stop" ) { mrv::ParseCommands* c = ui->uiView->_client; ui->uiView->_client = NULL; boost::int64_t f; is >> f; ui->uiView->stop(); ui->uiView->_client = c; return true; } else if ( cmd == "playfwd" ) { mrv::ParseCommands* c = ui->uiView->_client; ui->uiView->_client = NULL; ui->uiView->play_forwards(); ui->uiView->_client = c; return true; } else if ( cmd == "playback" ) { mrv::ParseCommands* c = ui->uiView->_client; ui->uiView->_client = NULL; ui->uiView->play_backwards(); ui->uiView->_client = c; return true; } else if ( cmd == "seek" ) { mrv::ParseCommands* c = ui->uiView->_client; ui->uiView->_client = NULL; boost::int64_t f; is >> f; ui->uiView->seek( f ); ui->uiView->_client = c; return true; } // else if ( cmd == "client" ) // { // ServerData* s = new ServerData; // is >> s->host; // is >> s->group; // s->port = atol( s->group.c_str() ); // s->ui = ui; // std::cerr << "START CLIENT THREAD " << std::endl; // boost::thread t( client_thread, s ); // return true; // } return false; } //---------------------------------------------------------------------- // // This class manages socket timeouts by applying the concept of a deadline. // Some asynchronous operations are given deadlines by which they must complete. 
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
//  +----------------+                     +----------------+
//  |                |                     |                |
//  | check_deadline |<---+                | check_deadline |<---+
//  |                |    | async_wait()   |                |    | async_wait()
//  +----------------+    |  on input      +----------------+    |  on output
//              |         |  deadline                  |         |  deadline
//              +---------+                            +---------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
//  +------------+
//  |            |
//  | start_read |<---+
//  |            |    |
//  +------------+    |
//          |         |
//  async_- |    +-------------+
//   read_- |    |             |
//  until() +--->| handle_read |
//               |             |
//               +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
//  +--------------+
//  |              |<---------------------+
//  | await_output |                      |
//  |              |<---+                 |
//  +--------------+    |                 |
//      |      |        | async_wait()    |
//      |      +--------+                 |
//      V                                 |
//  +-------------+               +--------------+
//  |             | async_write() |              |
//  | start_write |-------------->| handle_write |
//  |             |               |              |
//  +-------------+               +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a deadline_timer as an asynchronous condition variable. The
// deadline_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds.
After the message is successfully // sent, the output actor again waits for the output queue to become non-empty. // class tcp_session : public ParseCommands, public boost::enable_shared_from_this { public: tcp_session(boost::asio::io_service& io_service, mrv::ViewerUI* v) : ParseCommands( v ), socket_(io_service), input_deadline_(io_service), non_empty_output_queue_(io_service), output_deadline_(io_service) { input_deadline_.expires_at(boost::posix_time::pos_infin); output_deadline_.expires_at(boost::posix_time::pos_infin); // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever // the output queue is empty. This ensures that the output actor stays // asleep until a message is put into the queue. non_empty_output_queue_.expires_at(boost::posix_time::pos_infin); } tcp::socket& socket() { return socket_; } // Called by the server object to initiate the four actors. void start() { start_read(); input_deadline_.async_wait( boost::bind(&tcp_session::check_deadline, shared_from_this(), &input_deadline_)); await_output(); output_deadline_.async_wait( boost::bind(&tcp_session::check_deadline, shared_from_this(), &output_deadline_)); } virtual void send( const std::string s ) { std::cerr << "send " << s << " stopped? " << stopped() << std::endl; deliver( s ); } bool stopped() const { return !socket_.is_open(); } private: void stop() { std::cerr << "STOP" << std::endl; boost::system::error_code ignored_ec; socket_.close(ignored_ec); input_deadline_.cancel(); non_empty_output_queue_.cancel(); output_deadline_.cancel(); } void deliver(const std::string& msg) { std::cerr << "deliver " << msg << " stopped? " << stopped() << std::endl; output_queue_.push_back(msg + "\n"); std::cerr << "deliver qsize " << output_queue_.size() << std::endl; // Signal that the output queue contains messages. Modifying the expiry // will wake the output actor, if it is waiting on the timer. 
non_empty_output_queue_.expires_at(boost::posix_time::neg_infin); non_empty_output_queue_.async_wait( boost::bind(&tcp_session::await_output, shared_from_this())); } void start_read() { // Set a deadline for the read operation. std::cerr << "start read stopped? " << stopped() << std::endl; // input_deadline_.expires_from_now(boost::posix_time::seconds(30)); input_deadline_.expires_at(boost::posix_time::pos_infin); // Start an asynchronous operation to read a newline-delimited message. boost::asio::async_read_until(socket_, input_buffer_, '\n', boost::bind(&tcp_session::handle_read, shared_from_this(), boost::asio::placeholders::error)); } void handle_read(const boost::system::error_code& ec) { std::cerr << "handle read stopped " << stopped() << std::endl; if (stopped()) return; if (!ec) { // Extract the newline-delimited message from the buffer. std::string msg; std::istream is(&input_buffer_); is.exceptions( std::ifstream::failbit | std::ifstream::badbit | std::ifstream::eofbit ); try { while ( std::getline(is, msg) ) { if ( msg != "" && msg != "OK" && msg != "Not OK") { std::cerr << "MESSAGE IS " << msg << std::endl; if ( parse( msg ) ) deliver( "OK" ); else deliver( "Not OK" ); } } } catch ( std::ios_base::failure e ) { std::cerr << "getline failure" << std::endl; } start_read(); } else { std::cerr << "ERROR handle_read " << ec << std::endl; stop(); } } void await_output() { std::cerr << "await output " << output_queue_.size() << " stopped? " << stopped() << std::endl; //if (stopped()) // return; if (output_queue_.empty()) { std::cerr << "\tEmpty queue" << std::endl; // There are no messages that are ready to be sent. The actor goes to // sleep by waiting on the non_empty_output_queue_ timer. When a new // message is added, the timer will be modified and the actor will wake. 
non_empty_output_queue_.expires_at(boost::posix_time::pos_infin); } else { typedef std::deque veclist; veclist::const_iterator i = output_queue_.begin(); veclist::const_iterator e = output_queue_.end(); unsigned idx = 1; for ( ; i != e; ++i, ++idx ) { std::cerr << "#" << idx << " is " << *i; } std::cerr << "\tFull queue" << std::endl; start_write(); } } void start_write() { std::cerr << "WRITE " << output_queue_.front() << std::endl; // Set a deadline for the write operation. // output_deadline_.expires_from_now(boost::posix_time::seconds(240)); output_deadline_.expires_at(boost::posix_time::pos_infin); // Start an asynchronous operation to send a message. boost::asio::async_write(socket_, boost::asio::buffer(output_queue_.front()), boost::bind(&tcp_session::handle_write, shared_from_this(), _1)); } void handle_write(const boost::system::error_code& ec) { std::cerr << "handle write " << stopped() << std::endl; if (stopped()) return; if (!ec) { std::cerr << "pop front" << std::endl; if ( !output_queue_.empty() ) output_queue_.pop_front(); await_output(); } else { std::cerr << "error handle write" << std::endl; stop(); } } void check_deadline(deadline_timer* deadline) { std::cerr << "check deadline stopped? " << stopped() << std::endl; if (stopped()) return; // Check whether the deadline has passed. We compare the deadline against // the current time since a new asynchronous operation may have moved the // deadline before this actor had a chance to run. if (deadline->expires_at() <= deadline_timer::traits_type::now()) { // The deadline has passed. Stop the session. The other actors will // terminate as soon as possible. std::cerr << ">>>>>>>>>>>>> Error " << deadline << " " << &input_deadline_ << " " << &output_deadline_ << std::endl; stop(); } else { // Put the actor back to sleep. 
deadline->async_wait( boost::bind(&tcp_session::check_deadline, shared_from_this(), deadline)); } } boost::asio::streambuf input_buffer_; deadline_timer input_deadline_; std::deque output_queue_; deadline_timer non_empty_output_queue_; deadline_timer output_deadline_; tcp::socket socket_; }; typedef boost::shared_ptr tcp_session_ptr; //---------------------------------------------------------------------- //---------------------------------------------------------------------- class server { public: server(boost::asio::io_service& io_service, const tcp::endpoint& listen_endpoint, mrv::ViewerUI* v) : io_service_(io_service), acceptor_(io_service, listen_endpoint), ui_( v ) { start_accept(); } void start_accept() { tcp_session_ptr new_session(new tcp_session(io_service_, ui_)); acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, _1)); } void handle_accept(tcp_session_ptr session, const boost::system::error_code& ec) { if (!ec) { session->start(); } start_accept(); } private: boost::asio::io_service& io_service_; tcp::acceptor acceptor_; mrv::ViewerUI* ui_; }; //---------------------------------------------------------------------- void server_thread( const ServerData* s ) { try { boost::asio::io_service io_service; tcp::endpoint listen_endpoint(tcp::v4(), s->port); server rp(io_service, listen_endpoint, s->ui); io_service.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } s->ui->uiView->_client = NULL; } } // namespace mrv #if 0 int main( const int argc, const char** argv ) { mrv::ServerData* data = new mrv::ServerData; if ( argc > 1 ) { data->host = argv[1]; data->ui = NULL; boost::thread t( boost::bind( mrv::client_thread, data ) ); t.join(); } else { data->ui = NULL; boost::thread t( boost::bind( mrv::server_thread, data ) ); t.join(); } return 0; } #endif