#include "WorkerThreadManager.h"
#include "gateway_log.h"

// NOTE(review): the third #include of the original file had lost its target.
// The headers below cover the names this file uses directly (errno, strerror,
// std::stringstream, std::runtime_error, boost::bind) - confirm against
// version control.
#include <errno.h>
#include <string.h>

#include <sstream>
#include <stdexcept>

#include <boost/bind.hpp>

/**
 * @brief Creates the task semaphore and starts the worker threads.
 *
 * Every worker runs WorkerThreadManager::WorkerThread with this instance as
 * its argument and is detached right after it has been created.
 *
 * @param threadsNumber number of worker threads to start
 * @throws std::runtime_error if the semaphore cannot be initialized or a
 *         worker thread cannot be created. In the latter case the workers
 *         started so far are cancelled, the semaphore is destroyed and the
 *         thread array is released before throwing.
 */
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber)
    : m_threadsNumber(threadsNumber)
{
    if ( sem_init(&m_tasksSem, 0, 0) )
    {
        // Capture errno first so that formatting cannot overwrite it.
        const int err = errno;
        std::stringstream ss;
        ss << "Semaphore could not be initialized: " << err << " - " << strerror(err);
        LOG_FATAL(ss.str());
        throw std::runtime_error(ss.str());
    }

    m_pthreads = new pthread_t[m_threadsNumber];

    for (unsigned i = 0; i < m_threadsNumber; ++i)
    {
        const int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, this);
        if (rc)
        {
            // pthread_create reports the failure through its return value,
            // it does not set errno.
            std::stringstream ss;
            ss << "Pthread could not be started: " << rc << " - " << strerror(rc);
            LOG_FATAL(ss.str());

            // The destructor does not run for a partially constructed object,
            // so the workers that are already running have to be stopped here.
            for (unsigned started = 0; started < i; ++started)
            {
                if ( pthread_cancel(m_pthreads[started]) )
                {
                    LOG_ERROR("Worker thread cancellation failed");
                }
            }

            if ( sem_destroy(&m_tasksSem) )
            {
                LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));
            }
            delete [] m_pthreads;
            throw std::runtime_error(ss.str());
        }

        LOG_DEBUG("Worker thread started " << m_pthreads[i]);
        if ( pthread_detach(m_pthreads[i]) )
        {
            LOG_WARN("Failed to detach worker thread");
        }
    }
}

/**
 * @brief Cancels all worker threads and destroys the task semaphore.
 *
 * NOTE(review): the workers are detached, so cancellation is only requested
 * here and never waited for. A worker may still be using this object (or be
 * blocked on the semaphore) when it is destroyed - consider joinable threads.
 */
WorkerThreadManager::~WorkerThreadManager()
{
    LOG_DEBUG("~WorkerThreadManager()");

    for (unsigned i = 0; i < m_threadsNumber; ++i)
    {
        if ( pthread_cancel(m_pthreads[i]) )
        {
            LOG_ERROR("Worker thread cancellation failed");
        }
    }

    if ( sem_destroy(&m_tasksSem) )
    {
        LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno));
    }
    delete [] m_pthreads;
}

/**
 * @brief Adds new task to queue, so worker threads can execute it.
 * @param A_threadFun function which will be executed by thread
 * @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter
 *        after worker thread executes A_threadFun.
*/ void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun) { utils::ScopedLock mutex(m_tasksMutex); worker_thread_data_t data(A_threadFun, A_resultFun); m_tasks.push( data ); sem_post(&m_tasksSem); LOG_DEBUG("Task for worker threads has been added to queue"); } /** * @brief Executes result functions (if there are any) to give feedback * to classes which requested task execution in worker thread. */ void WorkerThreadManager::SignalResults() { while(true) { signal_function_t signal; { utils::ScopedLock mutex(m_resultsMutex); if(m_results.size()) { signal = m_results.front(); m_results.pop(); } else return; } signal(); } } /** * @brief Enqueues result of function executed in worker thread. * @param A_result return value of function executed in worker thread * @param A_resultFun function which will be enqueued for calling with A_result as a parameter. */ void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun) { utils::ScopedLock mutex(m_resultsMutex); signal_function_t signal = boost::bind(A_resultFun, A_result); m_results.push( signal ); } /** * @brief worker thread body * @param A_data pointer to WorkerThreadManager instance */ void* WorkerThreadManager::WorkerThread(void* A_data) { WorkerThreadManager* manager = reinterpret_cast(A_data); LOG_DEBUG("Starting worker thread loop"); while (1) { if ( -1 == sem_wait(&manager->m_tasksSem) && errno == EINTR ) { LOG_DEBUG("sem_wait interrupted with signal"); continue; } LOG_DEBUG("WorkerThread:::::: about to call lock mutex"); worker_thread_data_t data; { utils::ScopedLock mutex(manager->m_tasksMutex); data = manager->m_tasks.front(); manager->m_results.pop(); } LOG_DEBUG("WorkerThread:::::: about to call resultFun"); int result = data.threadFun(); LOG_DEBUG("WorkerThread:::::: after call resultFun"); pthread_testcancel(); manager->PushResult(result, data.resultFun); } return NULL; }