// Thread pool that hands queued Netxx::Peer connections to worker functors.
//
// NOTE(review): this file appears to have been damaged in extraction and will
// not compile as shown -- restore it from the original before building:
//   * all six #include directives have lost their header names;
//   * `std::list requests` has lost its template argument (presumably
//     std::list<Netxx::Peer>, since addRequest() pushes a Netxx::Peer and
//     nextRequest() copies front() into one -- TODO confirm);
//   * in the ThreadPool constructor the text between `for(int i=0; i` and
//     `join(); }` is missing, i.e. the loop condition and body, the end of the
//     constructor, and the start of whatever member contains the join() call;
//   * everything is on one physical line, so the leading `#include` turns the
//     whole line into a single preprocessor directive.
//
// Processor
//   Runnable functor holding a ThreadPool* and a bool* shutdown flag.
//   operator()() loops while *shutdown is false: it takes the next peer from
//   pool->nextRequest(), constructs a Middleware::RequestProcessor from that
//   peer and the shutdown pointer, and invokes it.
//   NOTE(review): *shutdown is read here without holding pool_mutex.
//
// ThreadPool
//   ThreadPool(n, sflag): stores n and sflag, constructs a scoped_lock on
//     pool_mutex with `false` as second argument (presumably "not initially
//     locked" -- confirm against the Boost.Thread version in use), allocates
//     an array of n boost::thread pointers and sets active to 0. The rest of
//     the constructor is lost (see above).
//   addRequest(p): under pool_mutex, appends p to the request list and calls
//     has_requests.notify_one().
//   nextRequest(): under pool_mutex, while the list is empty it decrements
//     active, waits on has_requests, then increments active again; afterwards
//     it removes and returns the front request.
//     NOTE(review): the wait loop tests only the queue size, not *shutdown; in
//     the visible code only addRequest() notifies has_requests -- verify how
//     waiting workers are released at shutdown.
//   getActiveCount(): returns active under pool_mutex.
//     NOTE(review): in the visible code active starts at 0 and is only
//     decremented/re-incremented around the wait, so it would never exceed 0;
//     the missing constructor text may adjust it -- TODO confirm.
#include #include #include #include #include #include using Middleware::Runnable; using Middleware::ThreadDelegate; using Middleware::RequestProcessor; class ThreadPool; class Processor : public Runnable { ThreadPool *pool; bool *shutdown; public: Processor(ThreadPool *p, bool *s); void operator()(); }; class ThreadPool { public: ThreadPool(int n, bool *sflag) : nthreads(n), shutdown(sflag) { boost::recursive_mutex::scoped_lock lk(pool_mutex, false); threads = new boost::thread*[n]; nthreads = n; active = 0; for(int i=0; ijoin(); } } void addRequest(Netxx::Peer &p) { boost::recursive_mutex::scoped_lock lk(pool_mutex); requests.push_back(p); has_requests.notify_one(); } Netxx::Peer nextRequest() { boost::recursive_mutex::scoped_lock lk(pool_mutex); while(requests.size() == 0) { active--; has_requests.wait(lk); active++; } Netxx::Peer client = requests.front(); requests.pop_front(); return client; } int getActiveCount() { boost::recursive_mutex::scoped_lock lk(pool_mutex); return active; } private: boost::thread **threads; std::list requests; boost::recursive_mutex pool_mutex; boost::condition has_requests; bool *shutdown; int nthreads; int active; }; inline Processor::Processor(ThreadPool *p, bool *s) : pool(p), shutdown(s) {} inline void Processor::operator()() { while(!*shutdown) { Netxx::Peer client = pool->nextRequest(); Middleware::RequestProcessor process(client, shutdown); process(); } }