|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r59237 - in sandbox/task: boost boost/task boost/task/detail boost/task/spin libs/task/build libs/task/examples libs/task/src libs/task/src/detail
From: oliver.kowalke_at_[hidden]
Date: 2010-01-23 07:25:19
Author: olli
Date: 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
New Revision: 59237
URL: http://svn.boost.org/trac/boost/changeset/59237
Log:
- introduction of fast-semaphore used to notifyworker-threads in pool if new work-item is avaialble
Added:
sandbox/task/boost/task/fast_semaphore.hpp (contents, props changed)
sandbox/task/boost/task/semaphore.hpp (contents, props changed)
sandbox/task/libs/task/src/fast_semaphore.cpp (contents, props changed)
sandbox/task/libs/task/src/semaphore_posix.cpp (contents, props changed)
sandbox/task/libs/task/src/semaphore_windows.cpp (contents, props changed)
Removed:
sandbox/task/boost/task/detail/guard.hpp
sandbox/task/boost/task/scanns.hpp
sandbox/task/libs/task/src/detail/guard.cpp
sandbox/task/libs/task/src/scanns.cpp
Text files modified:
sandbox/task/boost/task.hpp | 3
sandbox/task/boost/task/bounded_fifo.hpp | 104 +++-------------------------
sandbox/task/boost/task/bounded_prio_queue.hpp | 144 +++------------------------------------
sandbox/task/boost/task/bounded_smart_queue.hpp | 140 +++-----------------------------------
sandbox/task/boost/task/detail/pool_base.hpp | 57 ++++-----------
sandbox/task/boost/task/detail/worker.hpp | 50 -------------
sandbox/task/boost/task/detail/wsq.hpp | 4
sandbox/task/boost/task/exceptions.hpp | 16 ----
sandbox/task/boost/task/spin/bounded_channel.hpp | 26 +-----
sandbox/task/boost/task/spin/unbounded_channel.hpp | 6 +
sandbox/task/boost/task/static_pool.hpp | 27 ++-----
sandbox/task/boost/task/unbounded_fifo.hpp | 66 +++---------------
sandbox/task/boost/task/unbounded_prio_queue.hpp | 94 ++-----------------------
sandbox/task/boost/task/unbounded_smart_queue.hpp | 90 ++----------------------
sandbox/task/libs/task/build/Jamfile.v2 | 8 +-
sandbox/task/libs/task/examples/bind_to_processors.cpp | 33 ++++++--
sandbox/task/libs/task/src/detail/wsq.cpp | 18 ++--
17 files changed, 152 insertions(+), 734 deletions(-)
Modified: sandbox/task/boost/task.hpp
==============================================================================
--- sandbox/task/boost/task.hpp (original)
+++ sandbox/task/boost/task.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -15,12 +15,13 @@
#include <boost/task/callable.hpp>
#include <boost/task/context.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/task/handle.hpp>
#include <boost/task/meta.hpp>
#include <boost/task/new_thread.hpp>
#include <boost/task/own_thread.hpp>
#include <boost/task/poolsize.hpp>
-#include <boost/task/scanns.hpp>
+#include <boost/task/semaphore.hpp>
#include <boost/task/spin/auto_reset_event.hpp>
#include <boost/task/spin/barrier.hpp>
#include <boost/task/spin/bounded_channel.hpp>
Modified: sandbox/task/boost/task/bounded_fifo.hpp
==============================================================================
--- sandbox/task/boost/task/bounded_fifo.hpp (original)
+++ sandbox/task/boost/task/bounded_fifo.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -20,6 +20,7 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/meta.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/task/watermark.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -54,10 +55,10 @@
mutable mutex head_mtx_;
node::sptr_t tail_;
mutable mutex tail_mtx_;
- condition not_empty_cond_;
condition not_full_cond_;
std::size_t hwm_;
std::size_t lwm_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -91,6 +92,7 @@
public:
bounded_fifo(
+ fast_semaphore & fsem,
high_watermark const& hwm,
low_watermark const& lwm) :
state_( ACTIVE),
@@ -99,33 +101,15 @@
head_mtx_(),
tail_( head_),
tail_mtx_(),
- not_empty_cond_(),
not_full_cond_(),
hwm_( hwm),
- lwm_( lwm)
+ lwm_( lwm),
+ fsem_( fsem)
{}
- void upper_bound_( std::size_t hwm)
- {
- if ( lwm_ > hwm )
- throw invalid_watermark();
- std::size_t tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
-
std::size_t upper_bound() const
{ return hwm_; }
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- std::size_t tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
std::size_t lower_bound() const
{ return lwm_; }
@@ -133,7 +117,11 @@
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< mutex > lk( head_mtx_);
+ deactivate_();
+ not_full_cond_.notify_all();
+ }
bool empty() const
{
@@ -160,7 +148,7 @@
tail_ = new_node;
count_.fetch_add( 1);
}
- not_empty_cond_.notify_one();
+ fsem_.post();
}
template< typename TimeDuration >
@@ -186,75 +174,7 @@
tail_ = new_node;
count_.fetch_add( 1);
}
- not_empty_cond_.notify_one();
- }
-
- bool take( value_type & va)
- {
- unique_lock< mutex > lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- va.swap( head_->va);
- pop_head_();
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
- }
- return ! va.empty();
- }
-
- template< typename TimeDuration >
- bool take(
- value_type & va,
- TimeDuration const& rel_time)
- {
- unique_lock< mutex > lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- if ( ! not_empty_cond_.timed_wait( lk, rel_time) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- va.swap( head_->va);
- pop_head_();
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
- }
- return ! va.empty();
+ fsem_.post();
}
bool try_take( value_type & va)
Modified: sandbox/task/boost/task/bounded_prio_queue.hpp
==============================================================================
--- sandbox/task/boost/task/bounded_prio_queue.hpp (original)
+++ sandbox/task/boost/task/bounded_prio_queue.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -23,6 +23,7 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/meta.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/task/watermark.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -80,10 +81,10 @@
atomic< state > state_;
queue_type queue_;
mutable shared_mutex mtx_;
- condition not_empty_cond_;
condition not_full_cond_;
std::size_t hwm_;
std::size_t lwm_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -100,24 +101,6 @@
std::size_t size_() const
{ return queue_.size(); }
- void upper_bound_( std::size_t hwm)
- {
- if ( lwm_ > hwm )
- throw invalid_watermark();
- std::size_t tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
-
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- std::size_t tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
void put_(
value_type const& va,
unique_lock< shared_mutex > & lk)
@@ -133,7 +116,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
queue_.push( va);
- not_empty_cond_.notify_one();
+ fsem_.post();
}
template< typename TimeDuration >
@@ -155,85 +138,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
queue_.push( va);
- not_empty_cond_.notify_one();
- }
-
- bool take_(
- callable & ca,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- not_empty_cond_.wait(
- lk,
- bind(
- & bounded_prio_queue::consumers_activate_,
- this) );
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- callable tmp( queue_.top().ca);
- queue_.pop();
- ca.swap( tmp);
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
- }
- return ! ca.empty();
- }
-
- template< typename TimeDuration >
- bool take_(
- callable & ca,
- TimeDuration const& rel_time,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- if ( ! not_empty_cond_.timed_wait(
- lk,
- rel_time,
- bind(
- & bounded_prio_queue::consumers_activate_,
- this) ) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- callable tmp( queue_.top().ca);
- queue_.pop();
- ca.swap( tmp);
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // in order to submit an task
- not_full_cond_.notify_all();
- }
- return ! ca.empty();
+ fsem_.post();
}
bool try_take_( callable & ca)
@@ -259,20 +164,18 @@
bool producers_activate_() const
{ return ! active_() || ! full_(); }
- bool consumers_activate_() const
- { return ! active_() || ! empty_(); }
-
public:
bounded_prio_queue(
+ fast_semaphore & fsem,
high_watermark const& hwm,
low_watermark const& lwm) :
state_( ACTIVE),
queue_(),
mtx_(),
- not_empty_cond_(),
not_full_cond_(),
hwm_( hwm),
- lwm_( lwm)
+ lwm_( lwm),
+ fsem_( fsem)
{
if ( lwm_ > hwm_ )
throw invalid_watermark();
@@ -282,7 +185,11 @@
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ not_full_cond_.notify_all();
+ }
bool empty() const
{
@@ -296,24 +203,12 @@
return hwm_;
}
- void upper_bound( std::size_t hwm)
- {
- unique_lock< shared_mutex > lk( mtx_);
- upper_bound_( hwm);
- }
-
std::size_t lower_bound() const
{
shared_lock< shared_mutex > lk( mtx_);
return lwm_;
}
- void lower_bound( std::size_t lwm)
- {
- unique_lock< shared_mutex > lk( mtx_);
- lower_bound_( lwm);
- }
-
void put( value_type const& va)
{
unique_lock< shared_mutex > lk( mtx_);
@@ -329,21 +224,6 @@
put_( va, rel_time, lk);
}
- bool take( callable & ca)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, lk);
- }
-
- template< typename TimeDuration >
- bool take(
- callable & ca,
- TimeDuration const& rel_time)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, rel_time, lk);
- }
-
bool try_take( callable & ca)
{
unique_lock< shared_mutex > lk( mtx_);
Modified: sandbox/task/boost/task/bounded_smart_queue.hpp
==============================================================================
--- sandbox/task/boost/task/bounded_smart_queue.hpp (original)
+++ sandbox/task/boost/task/bounded_smart_queue.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -25,6 +25,7 @@
#include <boost/task/detail/meta.hpp>
#include <boost/task/detail/smart.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/task/watermark.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -88,12 +89,12 @@
queue_type queue_;
queue_index & idx_;
mutable shared_mutex mtx_;
- condition not_empty_cond_;
condition not_full_cond_;
Enq enq_op_;
Deq deq_op_;
std::size_t hwm_;
std::size_t lwm_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -110,24 +111,6 @@
std::size_t size_() const
{ return queue_.size(); }
- void upper_bound_( std::size_t hwm)
- {
- if ( lwm_ > hwm )
- throw invalid_watermark();
- std::size_t tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
-
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- std::size_t tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
void put_(
value_type const& va,
unique_lock< shared_mutex > & lk)
@@ -143,7 +126,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
enq_op_( idx_, va);
- not_empty_cond_.notify_one();
+ fsem_.post();
}
template< typename TimeDuration >
@@ -165,81 +148,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
enq_op_( idx_, va);
- not_empty_cond_.notify_one();
- }
-
- bool take_(
- callable & ca,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- not_empty_cond_.wait(
- lk,
- bind(
- & bounded_smart_queue::consumers_activate_,
- this) );
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- deq_op_( idx_, ca);
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
- }
- return ! ca.empty();
- }
-
- template< typename TimeDuration >
- bool take_(
- callable & ca,
- TimeDuration const& rel_time,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- if ( ! not_empty_cond_.timed_wait(
- lk,
- rel_time,
- bind(
- & bounded_smart_queue::consumers_activate_,
- this) ) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- deq_op_( idx_, ca);
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // in order to submit an task
- not_full_cond_.notify_all();
- }
- return ! ca.empty();
+ fsem_.post();
}
bool try_take_( callable & ca)
@@ -263,23 +172,21 @@
bool producers_activate_() const
{ return ! active_() || ! full_(); }
- bool consumers_activate_() const
- { return ! active_() || ! empty_(); }
-
public:
bounded_smart_queue(
+ fast_semaphore & fsem,
high_watermark const& hwm,
low_watermark const& lwm) :
state_( ACTIVE),
queue_(),
idx_( queue_.get< 0 >() ),
mtx_(),
- not_empty_cond_(),
not_full_cond_(),
enq_op_(),
deq_op_(),
hwm_( hwm),
- lwm_( lwm)
+ lwm_( lwm),
+ fsem_( fsem)
{
if ( lwm_ > hwm_ )
throw invalid_watermark();
@@ -289,7 +196,11 @@
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ not_full_cond_.notify_all();
+ }
bool empty() const
{
@@ -303,24 +214,12 @@
return hwm_;
}
- void upper_bound( std::size_t hwm)
- {
- unique_lock< shared_mutex > lk( mtx_);
- upper_bound_( hwm);
- }
-
std::size_t lower_bound() const
{
shared_lock< shared_mutex > lk( mtx_);
return lwm_;
}
- void lower_bound( std::size_t lwm)
- {
- unique_lock< shared_mutex > lk( mtx_);
- lower_bound_( lwm);
- }
-
void put( value_type const& va)
{
unique_lock< shared_mutex > lk( mtx_);
@@ -336,21 +235,6 @@
put_( va, rel_time, lk);
}
- bool take( callable & ca)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, lk);
- }
-
- template< typename TimeDuration >
- bool take(
- callable & ca,
- TimeDuration const& rel_time)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, rel_time, lk);
- }
-
bool try_take( callable & ca)
{
unique_lock< shared_mutex > lk( mtx_);
Deleted: sandbox/task/boost/task/detail/guard.hpp
==============================================================================
--- sandbox/task/boost/task/detail/guard.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
+++ (empty file)
@@ -1,46 +0,0 @@
-
-// Copyright Oliver Kowalke 2009.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_TASKS_DETAIL_GUARD_H
-#define BOOST_TASKS_DETAIL_GUARD_H
-
-#include <boost/atomic.hpp>
-#include <boost/utility.hpp>
-
-#include <boost/task/detail/config.hpp>
-
-#include <boost/config/abi_prefix.hpp>
-
-# if defined(BOOST_MSVC)
-# pragma warning(push)
-# pragma warning(disable:4251 4275)
-# endif
-
-namespace boost {
-namespace tasks {
-namespace detail {
-
-class BOOST_TASKS_DECL guard : private noncopyable
-{
-private:
- atomic< unsigned int > & active_worker_;
-
-public:
- guard( atomic< unsigned int > & active_worker);
-
- ~guard();
-};
-
-}}}
-
-# if defined(BOOST_MSVC)
-# pragma warning(pop)
-# endif
-
-#include <boost/config/abi_suffix.hpp>
-
-#endif // BOOST_TASKS_DETAIL_GUARD_H
-
Modified: sandbox/task/boost/task/detail/pool_base.hpp
==============================================================================
--- sandbox/task/boost/task/detail/pool_base.hpp (original)
+++ sandbox/task/boost/task/detail/pool_base.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -25,9 +25,9 @@
#include <boost/task/detail/worker_group.hpp>
#include <boost/task/detail/worker.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/task/handle.hpp>
#include <boost/task/poolsize.hpp>
-#include <boost/task/scanns.hpp>
#include <boost/task/spin/future.hpp>
#include <boost/task/stacksize.hpp>
#include <boost/task/task.hpp>
@@ -62,12 +62,12 @@
DEACTIVE
};
+ fast_semaphore fsem_;
atomic< unsigned int > use_count_;
worker_group wg_;
shared_mutex mtx_wg_;
atomic< state > state_;
queue_type queue_;
- atomic< unsigned int > idle_worker_;
atomic< bool > shtdwn_;
atomic< bool > shtdwn_now_;
@@ -84,16 +84,12 @@
void create_worker_(
poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size)
{
wg_.insert(
worker(
* this,
psize,
- asleep,
- max_scns,
stack_size,
boost::bind(
& pool_base::worker_entry_,
@@ -109,8 +105,6 @@
void create_worker_(
poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size,
std::size_t n)
{
@@ -118,8 +112,6 @@
worker(
* this,
psize,
- asleep,
- max_scns,
stack_size,
boost::bind(
& pool_base::worker_entry_,
@@ -140,93 +132,76 @@
public:
explicit pool_base(
poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size) :
+ fsem_( 0),
use_count_( 0),
wg_(),
mtx_wg_(),
state_( ACTIVE),
- queue_(),
- idle_worker_( 0),
+ queue_( fsem_),
shtdwn_( false),
shtdwn_now_( false)
{
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
lock_guard< shared_mutex > lk( mtx_wg_);
for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, stack_size);
+ create_worker_( psize, stack_size);
}
explicit pool_base(
poolsize const& psize,
high_watermark const& hwm,
low_watermark const& lwm,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size) :
+ fsem_( 0),
use_count_( 0),
wg_(),
mtx_wg_(),
state_( ACTIVE),
- queue_( hwm, lwm),
- idle_worker_( 0),
+ queue_( fsem_, hwm, lwm),
shtdwn_( false),
shtdwn_now_( false)
{
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
lock_guard< shared_mutex > lk( mtx_wg_);
for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, stack_size);
+ create_worker_( psize, stack_size);
}
# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
- explicit pool_base(
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
- stacksize const& stack_size) :
+ explicit pool_base( stacksize const& stack_size) :
+ fsem_( 0),
use_count_( 0),
wg_(),
mtx_wg_(),
state_( ACTIVE),
- queue_(),
- idle_worker_( 0),
+ queue_( fsem_),
shtdwn_( false),
shtdwn_now_( false)
{
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
poolsize psize( thread::hardware_concurrency() );
BOOST_ASSERT( psize > 0);
lock_guard< shared_mutex > lk( mtx_wg_);
for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, stack_size, i);
+ create_worker_( psize, stack_size, i);
}
explicit pool_base(
high_watermark const& hwm,
low_watermark const& lwm,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size) :
+ fsem_( 0),
use_count_( 0),
wg_(),
mtx_wg_(),
state_( ACTIVE),
- queue_( hwm, lwm),
- idle_worker_( 0),
+ queue_( fsem_, hwm, lwm),
shtdwn_( false),
shtdwn_now_( false)
{
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
poolsize psize( thread::hardware_concurrency() );
BOOST_ASSERT( psize > 0);
lock_guard< shared_mutex > lk( mtx_wg_);
for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, stack_size, i);
+ create_worker_( psize, stack_size, i);
}
# endif
@@ -246,6 +221,7 @@
if ( deactivated_() || ! deactivate_() ) return;
queue_.deactivate();
+ fsem_.deactivate();
shared_lock< shared_mutex > lk( mtx_wg_);
shtdwn_.store( true);
wg_.join_all();
@@ -256,6 +232,7 @@
if ( deactivated_() || ! deactivate_() ) return;
queue_.deactivate();
+ fsem_.deactivate();
shared_lock< shared_mutex > lk( mtx_wg_);
shtdwn_now_.store( true);
wg_.interrupt_all();
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -20,10 +20,8 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/config.hpp>
-#include <boost/task/detail/guard.hpp>
#include <boost/task/detail/wsq.hpp>
#include <boost/task/poolsize.hpp>
-#include <boost/task/scanns.hpp>
#include <boost/task/stacksize.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -91,9 +89,6 @@
wsq wsq_;
bool shtdwn_;
- posix_time::time_duration asleep_;
- scanns max_scns_;
- std::size_t scns_;
std::size_t stack_size_;
random_idx rnd_idx_;
@@ -108,11 +103,6 @@
BOOST_ASSERT( ca.empty() );
}
- bool take_global_callable_(
- callable & ca,
- posix_time::time_duration const& asleep)
- { return pool_.queue_.take( ca, asleep); }
-
bool try_take_global_callable_( callable & ca)
{ return pool_.queue_.try_take( ca); }
@@ -143,37 +133,12 @@
try_take_global_callable_( ca) )
{
execute_( ca);
- scns_ = 0;
if ( 0 < sched_->ready() ) return;
}
else
{
- guard grd( pool_.idle_worker_);
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- if ( pool_.size_() > pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- {
- execute_( ca);
- if ( 0 < sched_->ready() ) return;
- }
- }
- else if ( 0 == sched_->ready() )
- {
- try
- { this_thread::sleep( asleep_); }
- catch ( thread_interrupted const&)
- { return; }
- }
- scns_ = 0;
- }
- else
- {
- if ( 0 < sched_->ready() ) return;
- this_thread::yield();
- }
+ if ( 0 < sched_->ready() ) return;
+ pool_.fsem_.wait();
}
}
}
@@ -201,18 +166,13 @@
worker_object(
Pool & pool,
poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size,
function< void() > const& fn) :
pool_( pool),
thrd_( new thread( fn) ),
sched_(),
- wsq_(),
+ wsq_( pool_.fsem_),
shtdwn_( false),
- asleep_( asleep),
- max_scns_( max_scns),
- scns_( 0),
stack_size_( stack_size),
rnd_idx_( psize)
{ BOOST_ASSERT( ! fn.empty() ); }
@@ -278,16 +238,12 @@
worker(
Pool & pool,
poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
stacksize const& stack_size,
function< void() > const& fn) :
impl_(
new worker_object< Pool, worker >(
pool,
psize,
- asleep,
- max_scns,
stack_size,
fn) )
{}
Modified: sandbox/task/boost/task/detail/wsq.hpp
==============================================================================
--- sandbox/task/boost/task/detail/wsq.hpp (original)
+++ sandbox/task/boost/task/detail/wsq.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -14,6 +14,7 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/config.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -36,9 +37,10 @@
atomic< unsigned int > head_idx_;
atomic< unsigned int > tail_idx_;
recursive_mutex mtx_;
+ fast_semaphore & fsem_;
public:
- wsq();
+ wsq( fast_semaphore & fsem);
bool empty() const;
Modified: sandbox/task/boost/task/exceptions.hpp
==============================================================================
--- sandbox/task/boost/task/exceptions.hpp (original)
+++ sandbox/task/boost/task/exceptions.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -23,14 +23,6 @@
{}
};
-class invalid_scanns : public std::invalid_argument
-{
-public:
- invalid_scanns() :
- std::invalid_argument("scanns must be greater than or equal to zero")
- {}
-};
-
class invalid_stacksize : public std::invalid_argument
{
public:
@@ -39,14 +31,6 @@
{}
};
-class invalid_timeduration : public std::invalid_argument
-{
-public:
- invalid_timeduration() :
- std::invalid_argument("argument asleep is not valid")
- {}
-};
-
class invalid_watermark : public std::invalid_argument
{
public:
Added: sandbox/task/boost/task/fast_semaphore.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/fast_semaphore.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -0,0 +1,48 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_FAST_SEMAPHORE_H
+#define BOOST_TASK_FAST_SEMAPHORE_H
+
+#include <boost/task/detail/config.hpp>
+
+#include <boost/atomic.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/task/semaphore.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost {
+namespace tasks {
+
+class BOOST_TASKS_DECL fast_semaphore : private boost::noncopyable
+{
+private:
+ unsigned int spin_count_;
+ atomic< int > sem_count_;
+ atomic< bool > sem_active_;
+ semaphore sem_;
+
+public:
+ fast_semaphore( int, unsigned int = 0);
+
+ ~fast_semaphore();
+
+ void post( int = 1);
+
+ void wait();
+
+ bool try_wait();
+
+ void deactivate();
+};
+
+}}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif // BOOST_TASK_FAST_SEMAPHORE_H
Deleted: sandbox/task/boost/task/scanns.hpp
==============================================================================
--- sandbox/task/boost/task/scanns.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
+++ (empty file)
@@ -1,43 +0,0 @@
-
-// Copyright Oliver Kowalke 2009.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_TASKS_SCANNS_H
-#define BOOST_TASKS_SCANNS_H
-
-#include <cstddef>
-
-#include <boost/task/detail/config.hpp>
-
-#include <boost/config/abi_prefix.hpp>
-
-# if defined(BOOST_MSVC)
-# pragma warning(push)
-# pragma warning(disable:4251 4275)
-# endif
-
-namespace boost {
-namespace tasks {
-
-class BOOST_TASKS_DECL scanns
-{
-private:
- std::size_t value_;
-
-public:
- explicit scanns( std::size_t value);
-
- operator std::size_t () const;
-};
-
-}}
-
-# if defined(BOOST_MSVC)
-# pragma warning(pop)
-# endif
-
-#include <boost/config/abi_suffix.hpp>
-
-#endif // BOOST_TASKS_SCANNS_H
Added: sandbox/task/boost/task/semaphore.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/semaphore.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -0,0 +1,64 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_SEMAPHORE_H
+#define BOOST_TASK_SEMAPHORE_H
+
+#include <boost/task/detail/config.hpp>
+
+extern "C"
+{
+# if defined(BOOST_WINDOWS_API)
+#include <Windows.h>
+# elif defined(BOOST_POSIX_API)
+#include <sys/sem.h>
+# endif
+}
+
+#include <boost/utility.hpp>
+
+#include <boost/task/detail/config.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+# if defined(BOOST_MSVC)
+# pragma warning(push)
+# pragma warning(disable:4251 4275)
+# endif
+
+namespace boost {
+namespace tasks {
+
+class BOOST_TASKS_DECL semaphore : private boost::noncopyable
+{
+private:
+# if defined(BOOST_WINDOWS_API)
+ HANDLE handle_;
+# elif defined(BOOST_POSIX_API)
+ int handle_;
+# endif
+public:
+ semaphore( int);
+
+ ~semaphore();
+
+ void post( int = 1);
+
+ void wait();
+
+ bool try_wait();
+};
+
+}}
+
+# if defined(BOOST_MSVC)
+# pragma warning(pop)
+# endif
+
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif // BOOST_TASK_SEMAPHORE_H
Modified: sandbox/task/boost/task/spin/bounded_channel.hpp
==============================================================================
--- sandbox/task/boost/task/spin/bounded_channel.hpp (original)
+++ sandbox/task/boost/task/spin/bounded_channel.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -143,27 +143,9 @@
use_count_( 0)
{}
- void upper_bound_( std::size_t hwm)
- {
- if ( hwm < lwm_)
- throw invalid_watermark();
- unsigned int tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
-
std::size_t upper_bound() const
{ return hwm_; }
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- unsigned int tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
std::size_t lower_bound() const
{ return lwm_; }
@@ -171,7 +153,13 @@
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ mutex::scoped_lock head_lk( head_mtx_);
+ mutex::scoped_lock tail_lk( tail_mtx_);
+ not_empty_cond_.notify_all();
+ not_full_cond_.notify_all();
+ deactivate_();
+ }
bool empty() const
{
Modified: sandbox/task/boost/task/spin/unbounded_channel.hpp
==============================================================================
--- sandbox/task/boost/task/spin/unbounded_channel.hpp (original)
+++ sandbox/task/boost/task/spin/unbounded_channel.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -114,7 +114,11 @@
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ mutex::scoped_lock lk( head_mtx_);
+ deactivate_();
+ not_empty_cond_.notify_all();
+ }
bool empty() const
{
Modified: sandbox/task/boost/task/static_pool.hpp
==============================================================================
--- sandbox/task/boost/task/static_pool.hpp (original)
+++ sandbox/task/boost/task/static_pool.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -10,7 +10,7 @@
#include <cstddef>
#include <boost/config.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/fiber/fiber.hpp>
#include <boost/fiber/round_robin.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/move/move.hpp>
@@ -21,7 +21,6 @@
#include <boost/task/exceptions.hpp>
#include <boost/task/handle.hpp>
#include <boost/task/poolsize.hpp>
-#include <boost/task/scanns.hpp>
#include <boost/task/stacksize.hpp>
#include <boost/task/task.hpp>
#include <boost/task/watermark.hpp>
@@ -62,39 +61,31 @@
explicit static_pool(
poolsize const& psize,
- posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& max_scns = scanns( 20),
- stacksize const& stack_size = stacksize( 64000) ) :
- pool_( new base_type( psize, asleep, max_scns, stack_size) )
+ stacksize const& stack_size = stacksize( fiber::default_stacksize) ) :
+ pool_( new base_type( psize, stack_size) )
{}
explicit static_pool(
poolsize const& psize,
high_watermark const& hwm,
low_watermark const& lwm,
- posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& max_scns = scanns( 20),
- stacksize const& stack_size = stacksize( 64000) ) :
- pool_( new base_type( psize, hwm, lwm, asleep, max_scns, stack_size) )
+ stacksize const& stack_size = stacksize( fiber::default_stacksize) ) :
+ pool_( new base_type( psize, hwm, lwm, stack_size) )
{}
# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
explicit static_pool(
tag_bind_to_processors,
- posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& max_scns = scanns( 20),
- stacksize const& stack_size = stacksize( 64000) ) :
- pool_( new base_type( asleep, max_scns, stack_size) )
+ stacksize const& stack_size = stacksize( fiber::default_stacksize) ) :
+ pool_( new base_type( stack_size) )
{}
explicit static_pool(
tag_bind_to_processors,
high_watermark const& hwm,
low_watermark const& lwm,
- posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& max_scns = scanns( 20),
- stacksize const& stack_size = stacksize( 64000) ) :
- pool_( new base_type( hwm, lwm, asleep, max_scns, stack_size) )
+ stacksize const& stack_size = stacksize( fiber::default_stacksize) ) :
+ pool_( new base_type( hwm, lwm, stack_size) )
{}
static tag_bind_to_processors bind_to_processors()
Modified: sandbox/task/boost/task/unbounded_fifo.hpp
==============================================================================
--- sandbox/task/boost/task/unbounded_fifo.hpp (original)
+++ sandbox/task/boost/task/unbounded_fifo.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -20,6 +20,7 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/meta.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -52,7 +53,7 @@
mutable mutex head_mtx_;
node::sptr_t tail_;
mutable mutex tail_mtx_;
- condition not_empty_cond_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -78,20 +79,23 @@
}
public:
- unbounded_fifo() :
+ unbounded_fifo( fast_semaphore & fsem) :
state_( ACTIVE),
head_( new node),
head_mtx_(),
tail_( head_),
tail_mtx_(),
- not_empty_cond_()
+ fsem_( fsem)
{}
bool active() const
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< mutex > lk( head_mtx_);
+ deactivate_();
+ }
bool empty() const
{
@@ -104,61 +108,13 @@
node::sptr_t new_node( new node);
{
unique_lock< mutex > lk( tail_mtx_);
+ if ( ! active_() )
+ throw task_rejected("queue is not active");
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
}
- not_empty_cond_.notify_one();
- }
-
- bool take( value_type & va)
- {
- unique_lock< mutex > lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- va.swap( head_->va);
- pop_head_();
- return ! va.empty();
- }
-
- template< typename TimeDuration >
- bool take(
- value_type & va,
- TimeDuration const& rel_time)
- {
- unique_lock< mutex > lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- if ( ! not_empty_cond_.timed_wait( lk, rel_time) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- va.swap( head_->va);
- pop_head_();
- return ! va.empty();
+ fsem_.post();
}
bool try_take( value_type & va)
Modified: sandbox/task/boost/task/unbounded_prio_queue.hpp
==============================================================================
--- sandbox/task/boost/task/unbounded_prio_queue.hpp (original)
+++ sandbox/task/boost/task/unbounded_prio_queue.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -23,6 +23,7 @@
#include <boost/task/callable.hpp>
#include <boost/task/detail/meta.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -68,7 +69,7 @@
value_type,
std::deque< value_type >,
compare
- > queue_type;
+ > queue_type;
enum state
{
@@ -79,7 +80,7 @@
atomic< state > state_;
queue_type queue_;
mutable shared_mutex mtx_;
- condition not_empty_cond_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -95,67 +96,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
queue_.push( va);
- not_empty_cond_.notify_one();
- }
-
- bool take_(
- callable & ca,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- not_empty_cond_.wait(
- lk,
- bind(
- & unbounded_prio_queue::consumers_activate_,
- this) );
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- callable tmp( queue_.top().ca);
- queue_.pop();
- ca.swap( tmp);
- return ! ca.empty();
- }
-
- template< typename TimeDuration >
- bool take_(
- callable & ca,
- TimeDuration const& rel_time,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- if ( ! not_empty_cond_.timed_wait(
- lk,
- rel_time,
- bind(
- & unbounded_prio_queue::consumers_activate_,
- this) ) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- callable tmp( queue_.top().ca);
- queue_.pop();
- ca.swap( tmp);
- return ! ca.empty();
+ fsem_.post();
}
bool try_take_( callable & ca)
@@ -168,22 +109,22 @@
return ! ca.empty();
}
- bool consumers_activate_() const
- { return ! active_() || ! empty_(); }
-
public:
- unbounded_prio_queue() :
+ unbounded_prio_queue( fast_semaphore & fsem) :
state_( ACTIVE),
queue_(),
mtx_(),
- not_empty_cond_()
+ fsem_( fsem)
{}
bool active() const
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ }
bool empty() const
{
@@ -197,21 +138,6 @@
put_( va);
}
- bool take( callable & ca)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, lk);
- }
-
- template< typename TimeDuration >
- bool take(
- callable & ca,
- TimeDuration const& rel_time)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, rel_time, lk);
- }
-
bool try_take( callable & ca)
{
unique_lock< shared_mutex > lk( mtx_);
Modified: sandbox/task/boost/task/unbounded_smart_queue.hpp
==============================================================================
--- sandbox/task/boost/task/unbounded_smart_queue.hpp (original)
+++ sandbox/task/boost/task/unbounded_smart_queue.hpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -25,6 +25,7 @@
#include <boost/task/detail/meta.hpp>
#include <boost/task/detail/smart.hpp>
#include <boost/task/exceptions.hpp>
+#include <boost/task/fast_semaphore.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -87,9 +88,9 @@
queue_type queue_;
queue_index & idx_;
mutable shared_mutex mtx_;
- condition not_empty_cond_;
Enq enq_op_;
Deq deq_op_;
+ fast_semaphore & fsem_;
bool active_() const
{ return ACTIVE == state_.load(); }
@@ -105,63 +106,7 @@
if ( ! active_() )
throw task_rejected("queue is not active");
enq_op_( idx_, va);
- not_empty_cond_.notify_one();
- }
-
- bool take_(
- callable & ca,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- not_empty_cond_.wait(
- lk,
- bind(
- & unbounded_smart_queue::consumers_activate_,
- this) );
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- deq_op_( idx_, ca);
- return ! ca.empty();
- }
-
- template< typename TimeDuration >
- bool take_(
- callable & ca,
- TimeDuration const& rel_time,
- unique_lock< shared_mutex > & lk)
- {
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- if ( ! not_empty_cond_.timed_wait(
- lk,
- rel_time,
- bind(
- & unbounded_smart_queue::consumers_activate_,
- this) ) )
- return false;
- }
- catch ( thread_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- deq_op_( idx_, ca);
- return ! ca.empty();
+ fsem_.post();
}
bool try_take_( callable & ca)
@@ -172,25 +117,25 @@
return ! ca.empty();
}
- bool consumers_activate_() const
- { return ! active_() || ! empty_(); }
-
public:
- unbounded_smart_queue() :
+ unbounded_smart_queue( fast_semaphore & fsem) :
state_( ACTIVE),
queue_(),
idx_( queue_.get< 0 >() ),
mtx_(),
- not_empty_cond_(),
enq_op_(),
- deq_op_()
+ deq_op_(),
+ fsem_( fsem)
{}
bool active() const
{ return active_(); }
void deactivate()
- { deactivate_(); }
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ }
bool empty() const
{
@@ -204,21 +149,6 @@
put_( va);
}
- bool take( callable & ca)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, lk);
- }
-
- template< typename TimeDuration >
- bool take(
- callable & ca,
- TimeDuration const& rel_time)
- {
- unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, rel_time, lk);
- }
-
bool try_take( callable & ca)
{
unique_lock< shared_mutex > lk( mtx_);
Modified: sandbox/task/libs/task/build/Jamfile.v2
==============================================================================
--- sandbox/task/libs/task/build/Jamfile.v2 (original)
+++ sandbox/task/libs/task/build/Jamfile.v2 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -38,9 +38,9 @@
: ## win32 sources ##
callable.cpp
context.cpp
- detail/guard.cpp
+ fast_semaphore.cpp
poolsize.cpp
- scanns.cpp
+ semaphore_windows.cpp
spin/auto_reset_event.cpp
spin/barrier.cpp
spin/condition.cpp
@@ -60,9 +60,9 @@
: ## posix sources ##
callable.cpp
context.cpp
- detail/guard.cpp
+ fast_semaphore.cpp
poolsize.cpp
- scanns.cpp
+ semaphore_posix.cpp
spin/auto_reset_event.cpp
spin/barrier.cpp
spin/condition.cpp
Modified: sandbox/task/libs/task/examples/bind_to_processors.cpp
==============================================================================
--- sandbox/task/libs/task/examples/bind_to_processors.cpp (original)
+++ sandbox/task/libs/task/examples/bind_to_processors.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -65,17 +65,34 @@
std::vector< tsk::handle< int > > results;
results.reserve( 10);
- for ( int i = 0; i < 10; ++i)
- results.push_back(
- tsk::async(
- tsk::make_task(
- parallel_fib,
- i,
- 5),
- pool) );
+ pt::ptime start = pt::microsec_clock::universal_time();
+
+// for ( int i = 0; i < 15; ++i)
+// results.push_back(
+// tsk::async(
+// tsk::make_task(
+// parallel_fib,
+// 10,
+// 5),
+// pool) );
+//
+
+ results.push_back(
+ tsk::async(
+ tsk::make_task(
+ parallel_fib,
+ 25,
+ 5),
+ pool) );
tsk::waitfor_all( results.begin(), results.end() );
+ pt::ptime stop = pt::microsec_clock::universal_time();
+
+ pt::time_duration elapsed = stop - start;
+ std::cout << "total microseconds == " << elapsed.total_microseconds() << std::endl;
+ std::cout << "total milliseconds == " << elapsed.total_milliseconds() << std::endl;
+
int k = 0;
std::vector< tsk::handle< int > >::iterator e( results.end() );
for (
Deleted: sandbox/task/libs/task/src/detail/guard.cpp
==============================================================================
--- sandbox/task/libs/task/src/detail/guard.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
+++ (empty file)
@@ -1,28 +0,0 @@
-
-// Copyright Oliver Kowalke 2009.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#include "boost/task/detail/guard.hpp"
-
-#include <boost/assert.hpp>
-
-namespace boost {
-namespace tasks {
-namespace detail {
-
-guard::guard( atomic< unsigned int > & active_worker) :
- active_worker_( active_worker)
-{
- BOOST_ASSERT( 0 <= active_worker_.load() );
- active_worker_.fetch_add( 1);
-}
-
-guard::~guard()
-{
- active_worker_.fetch_sub( 1);
- BOOST_ASSERT( 0 <= active_worker_.load() );
-}
-
-}}}
Modified: sandbox/task/libs/task/src/detail/wsq.cpp
==============================================================================
--- sandbox/task/libs/task/src/detail/wsq.cpp (original)
+++ sandbox/task/libs/task/src/detail/wsq.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -12,14 +12,15 @@
namespace tasks {
namespace detail {
-wsq::wsq() :
-initial_size_( 32),
-array_( new callable[ initial_size_]),
-capacity_( initial_size_),
-mask_( initial_size_ - 1),
-head_idx_( 0),
-tail_idx_( 0),
-mtx_()
+wsq::wsq( fast_semaphore & fsem) :
+ initial_size_( 32),
+ array_( new callable[ initial_size_]),
+ capacity_( initial_size_),
+ mask_( initial_size_ - 1),
+ head_idx_( 0),
+ tail_idx_( 0),
+ mtx_(),
+ fsem_( fsem)
{}
bool
@@ -60,6 +61,7 @@
array_[tail & mask_] = ca;
tail_idx_.fetch_add( 1);
}
+ fsem_.post();
}
bool
Added: sandbox/task/libs/task/src/fast_semaphore.cpp
==============================================================================
--- (empty file)
+++ sandbox/task/libs/task/src/fast_semaphore.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -0,0 +1,74 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/task/fast_semaphore.hpp"
+
+#include <limits>
+#include <stdexcept>
+
+namespace boost {
+namespace tasks {
+
+fast_semaphore::fast_semaphore( int sem_count, unsigned int spin_count) :
+ spin_count_( spin_count),
+ sem_count_( sem_count),
+ sem_active_( true),
+ sem_( sem_count)
+{
+ if ( 0 > sem_count_)
+ throw std::invalid_argument("count must not be negative");
+}
+
+fast_semaphore::~fast_semaphore()
+{}
+
+void
+fast_semaphore::post( int count)
+{
+ if ( 0 > count)
+ throw std::invalid_argument("count must not be negative");
+
+ int sem_count = sem_count_.fetch_add( count);
+
+ if ( sem_count < 0)
+ sem_.post( std::min( count, -sem_count) );
+}
+
+void
+fast_semaphore::wait()
+{
+ unsigned int spin_count( spin_count_);
+ while ( 0 < spin_count--)
+ if ( try_wait() ) return;
+
+ if ( 0 <= sem_count_.fetch_sub( 1) ) return;
+
+ if ( ! sem_active_.load() ) return;
+
+ sem_.wait();
+}
+
+bool
+fast_semaphore::try_wait()
+{
+ int sem_count;
+
+ do
+ {
+ sem_count = sem_count_.load();
+ if ( ! sem_active_.load() ) return false;
+ } while ( sem_count > 0 && ! sem_count_.compare_exchange_strong( sem_count, sem_count - 1) );
+
+ return sem_count > 0;
+}
+
+void
+fast_semaphore::deactivate() {
+ sem_active_.store( false);
+ post( std::abs( sem_count_.load() ) );
+}
+
+}}
Deleted: sandbox/task/libs/task/src/scanns.cpp
==============================================================================
--- sandbox/task/libs/task/src/scanns.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
+++ (empty file)
@@ -1,21 +0,0 @@
-
-// Copyright Oliver Kowalke 2009.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#include "boost/task/scanns.hpp"
-
-#include <boost/task/exceptions.hpp>
-
-namespace boost {
-namespace tasks {
-
-scanns::scanns( std::size_t value) :
- value_( value)
-{ if ( value < 0) throw invalid_scanns(); }
-
-scanns::operator std::size_t () const
-{ return value_; }
-
-}}
Added: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- (empty file)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -0,0 +1,97 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/task/semaphore.hpp"
+
+extern "C" {
+
+#include <sys/stat.h>
+
+}
+
+#include <cerrno>
+#include <stdexcept>
+
+#include <boost/assert.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace {
+
+union semun {
+ int val;
+ semid_ds * buf;
+ ushort * array;
+};
+
+}
+
+namespace boost {
+namespace tasks {
+
+semaphore::semaphore( int sem_count) :
+ handle_( -1)
+{
+ if ( 0 > sem_count)
+ throw std::invalid_argument("count must not be negative");
+
+ semun ctl;
+ ctl.val = sem_count;
+ BOOST_ASSERT( ctl.val == sem_count);
+
+ if ( ( handle_ = ::semget( IPC_PRIVATE, 1, S_IRUSR | S_IWUSR) ) == -1)
+ throw system::system_error( errno, system::system_category);
+
+ if ( ::semctl( handle_, 0, SETVAL, ctl) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+semaphore::~semaphore()
+{ ::semctl( handle_, 0, IPC_RMID); }
+
+void
+semaphore::post( int n)
+{
+ if ( 0 >= n)
+ throw std::invalid_argument("invalid post-argument");
+
+ sembuf op;
+
+ op.sem_num = 0;
+ op.sem_op = n;
+ op.sem_flg = 0;
+ BOOST_ASSERT( op.sem_op == n);
+
+ if ( ::semop( handle_, & op, 1) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+void
+semaphore::wait()
+{
+ sembuf op;
+
+ op.sem_num = 0;
+ op.sem_op = -1;
+ op.sem_flg = 0;
+
+ if ( ::semop( handle_, & op, 1) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+bool
+semaphore::try_wait()
+{
+ sembuf op;
+
+ op.sem_num = 0;
+ op.sem_op = -1;
+ op.sem_flg = IPC_NOWAIT; // Don't wait if we can't lock it now
+
+ return ::semop( handle_, & op, 1) == 0;
+}
+
+}}
Added: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- (empty file)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp 2010-01-23 07:25:17 EST (Sat, 23 Jan 2010)
@@ -0,0 +1,57 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/task/semaphore.hpp"
+
+#include <cerrno>
+#include <limits>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace boost {
+namespace tasks {
+
+semaphore::semaphore( int value) :
+ handle_()
+{
+ if ( ( handle_ = ::CreateSemaphore( 0, value, ( std::numeric_limits< int >::max)(), 0) ) == 0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+semaphore::~semaphore()
+{ ::CloseHandle( handle_); }
+
+void
+semaphore::post( int n)
+{
+ if ( ! ::ReleaseSemaphore( handle_, n, 0) )
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+void
+semaphore::wait()
+{
+ if ( ::WaitForSingleObject( handle_, INFINITE) != WAIT_OBJECT_0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+bool
+semaphore::try_wait()
+{
+ switch ( ::WaitForSingleObject( handle_, 0) )
+ {
+ case WAIT_OBJECT_0:
+ return true;
+ case WAIT_TIMEOUT:
+ return false;
+ default:
+ throw system::system_error( ::GetLastError(), system::system_category);
+ }
+ return true;
+}
+
+}}
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk