
Subject: Re: [boost] threadpool lockfree_channel
From: Vicente Botet Escriba (vicente.botet_at_[hidden])
Date: 2009-03-16 06:40:03


Anthony Williams-4 wrote:
>
> Tim Blechmann <tim_at_[hidden]> writes:
>
>> i just saw, that you checked in a lockfree fifo implementation to the
>> boost sandbox.
>> it looks like an implementation of the michael/scott queue, isn't it? if
>> so, i suppose, you are missing a thread-safe memory reclamation
>> mechanism ...
>> some time ago, i wrote a boost-style implementation of this data
>> structure, not sure, if you came across it [1] ... maybe it would make
>> sense to join our efforts in implementing a lockfree queue and get it
>> into boost?
>
>> [1] http://tim.klingt.org/git?p=boost_lockfree.git;a=summary
>
> Firstly, I'd like to help with this. I have a couple of lock-free queue
> implementations lying around from my book. I've attached a sample
> implementation that uses atomic reference-counting --- it would need to
> be "boostified", as it's written for C++0x, but that should be
> relatively straightforward. I don't make any claims for performance, but
> I believe it to be "correct", with no data races or undefined behaviour
> --- I *really* want to know if that's not the case, so I can correct it
> before the book goes to press.
>
> Secondly, both these queues (Tim's at the posted link, and Oliver's at
> https://svn.boost.org/trac/boost/browser/sandbox/threadpool/boost/tp/lockfree_channel.hpp)
> have race conditions in dequeue/take.
>
> First, let's look at Tim's implementation at
> http://tim.klingt.org/git?p=boost_lockfree.git;a=blob;f=boost/lockfree/fifo.hpp;h=066465af55e8f030100093742c8534b3fbb9ce40;hb=HEAD
>
> Suppose there are two threads calling dequeue() concurrently. If there is
> an item in the queue, both will proceed identically down to line 137, as
> neither thread modifies the queue data before then. Now suppose one
> thread (A) gets preempted, but the other (B) proceeds. Thread B will
> read next->data and assign it to ret. It will then update head_ with CAS
> (which will succeed as no other thread is modifying/has modified the
> data structure), and *deallocate the node* at line 141. This will
> destroy next->data. Now, when thread A wakes up it will try and read
> next->data => reading destroyed object, undefined behaviour.
>
> Oliver's take() suffers a similar problem. Suppose there are two threads
> calling take(). Both proceed as far as line 168, and then thread A gets
> pre-empted whilst thread B proceeds. There is an item on the queue, so
> val is non-NULL, and the queue hasn't been modified, so head_==head. No
> other thread is modifying the queue, so we end up at line 197. We then
> update head_ in line 198 and *delete head.ptr* at line 202 and set it to
> NULL at line 203. Now, when thread A wakes up the first thing it does is
> read head.ptr->prev, which is a dereference of a NULL ptr => undefined
> behaviour.
>
> Lock-free code is hard. Memory reclamation makes it doubly hard.
>
> Anthony
> --
> Author of C++ Concurrency in Action | http://www.manning.com/williams
> just::thread C++0x thread library | http://www.stdthread.co.uk
> Just Software Solutions Ltd |
> http://www.justsoftwaresolutions.co.uk
> 15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976
>
> template<typename T>
> class queue
> {
> private:
>     struct node;
>
>     struct counted_node_ptr
>     {
>         int external_count;
>         node* ptr;
>     };
>
>     struct node_counter
>     {
>         int internal_count:30;
>         unsigned external_counters:2;
>     };
>
>     struct node
>     {
>         std::atomic<T*> data;
>         std::atomic<node_counter> count;
>         std::atomic<counted_node_ptr> next;
>
>         node()
>         {
>             node_counter new_count;
>             new_count.internal_count=0;
>             new_count.external_counters=2;
>             count.store(new_count);
>
>             counted_node_ptr next_node={0};
>             next.store(next_node);
>         }
>
>         void release_ref()
>         {
>             node_counter old_counter=
>                 count.load(std::memory_order_relaxed);
>             node_counter new_counter;
>             do
>             {
>                 new_counter=old_counter;
>                 --new_counter.internal_count;
>             }
>             while(!count.compare_exchange_strong(
>                       old_counter,new_counter,
>                       std::memory_order_acquire,std::memory_order_relaxed));
>
>             if(!new_counter.internal_count &&
>                !new_counter.external_counters)
>             {
>                 delete this;
>             }
>         }
>     };
>
>     std::atomic<counted_node_ptr> head;
>     std::atomic<counted_node_ptr> tail;
>
>     static void increase_external_count(
>         std::atomic<counted_node_ptr>& counter,
>         counted_node_ptr& old_counter)
>     {
>         counted_node_ptr new_counter;
>
>         do
>         {
>             new_counter=old_counter;
>             ++new_counter.external_count;
>         }
>         while(!counter.compare_exchange_strong(
>                   old_counter,new_counter,
>                   std::memory_order_acquire,
>                   std::memory_order_relaxed));
>
>         old_counter.external_count=new_counter.external_count;
>     }
>
>     void set_new_tail(counted_node_ptr &old_tail,
>                       counted_node_ptr const &new_tail)
>     {
>         node* const current_tail_ptr=old_tail.ptr;
>         while(!tail.compare_exchange_weak(old_tail,new_tail) &&
>               old_tail.ptr==current_tail_ptr);
>         if(old_tail.ptr==current_tail_ptr)
>         {
>             free_external_counter(old_tail);
>         }
>         else
>         {
>             current_tail_ptr->release_ref();
>         }
>     }
>
>     static void free_external_counter(counted_node_ptr &old_node_ptr)
>     {
>         node* const ptr=old_node_ptr.ptr;
>         int const count_increase=old_node_ptr.external_count-2;
>
>         node_counter old_counter=
>             ptr->count.load(std::memory_order_relaxed);
>         node_counter new_counter;
>         do
>         {
>             new_counter=old_counter;
>             --new_counter.external_counters;
>             new_counter.internal_count+=count_increase;
>         }
>         while(!ptr->count.compare_exchange_strong(
>                   old_counter,new_counter,
>                   std::memory_order_acquire,std::memory_order_relaxed));
>
>         if(!new_counter.internal_count &&
>            !new_counter.external_counters)
>         {
>             delete ptr;
>         }
>     }
>
> public:
>     queue()
>     {
>         counted_node_ptr new_node;
>         new_node.external_count=1;
>         new_node.ptr=new node;
>
>         head.store(new_node);
>         tail.store(new_node);
>     }
>
>     queue(const queue& other)=delete;
>     queue& operator=(const queue& other)=delete;
>
>     ~queue()
>     {
>         while(pop());
>         delete head.load().ptr;
>     }
>
>     std::unique_ptr<T> pop()
>     {
>         counted_node_ptr old_head=head.load(std::memory_order_relaxed);
>         for(;;)
>         {
>             increase_external_count(head,old_head);
>             node* const ptr=old_head.ptr;
>             if(ptr==tail.load().ptr)
>             {
>                 return std::unique_ptr<T>();
>             }
>             counted_node_ptr next=ptr->next.load();
>             if(head.compare_exchange_strong(old_head,next))
>             {
>                 T* const res=ptr->data.exchange(NULL);
>                 free_external_counter(old_head);
>                 return std::unique_ptr<T>(res);
>             }
>             ptr->release_ref();
>         }
>     }
>
>     void push(T new_value)
>     {
>         std::unique_ptr<T> new_data(new T(new_value));
>         counted_node_ptr new_next;
>         new_next.ptr=new node;
>         new_next.external_count=1;
>         counted_node_ptr old_tail=tail.load();
>
>         for(;;)
>         {
>             increase_external_count(tail,old_tail);
>
>             T* old_data=NULL;
>             if(old_tail.ptr->data.compare_exchange_strong(
>                    old_data,new_data.get()))
>             {
>                 counted_node_ptr old_next={0};
>                 if(!old_tail.ptr->next.compare_exchange_strong(
>                        old_next,new_next))
>                 {
>                     delete new_next.ptr;
>                     new_next=old_next;
>                 }
>                 set_new_tail(old_tail, new_next);
>                 new_data.release();
>                 break;
>             }
>             else
>             {
>                 counted_node_ptr old_next={0};
>                 if(old_tail.ptr->next.compare_exchange_strong(
>                        old_next,new_next))
>                 {
>                     old_next=new_next;
>                     new_next.ptr=new node;
>                 }
>                 set_new_tail(old_tail, old_next);
>             }
>         }
>     }
> };
>
> _______________________________________________
> Unsubscribe & other changes:
> http://lists.boost.org/mailman/listinfo.cgi/boost
>

Hi,

I'm also interested in lock-free queues. In particular I need one that is
* single-writer single-reader

so there are no push/push or front/front race conditions.

I think we can extend this and provide implementations that are lock-free
for
* single-writer single-reader
* multiple-writer single-reader
* single-writer multiple-reader
* multiple-writer multiple-reader

so that we have the best implementation for each case.
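For the single-writer single-reader case, a bounded ring buffer is enough: each
index is modified by exactly one thread, so no CAS loop and no memory
reclamation scheme is needed. A minimal sketch (not Boost code; the class and
member names are illustrative only) could look like this:

```cpp
// Minimal single-producer single-consumer lock-free ring buffer (a sketch).
// Fixed capacity, non-blocking; one slot is left empty to distinguish
// a full buffer from an empty one.
#include <atomic>
#include <cstddef>

template<typename T, std::size_t Capacity>
class spsc_queue
{
    T buffer_[Capacity + 1];
    std::atomic<std::size_t> head_{0};  // read index, advanced only by the consumer
    std::atomic<std::size_t> tail_{0};  // write index, advanced only by the producer

    static std::size_t next(std::size_t i) { return (i + 1) % (Capacity + 1); }

public:
    bool push(const T& value)  // called from the single producer thread only
    {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t nt = next(tail);
        if(nt == head_.load(std::memory_order_acquire))
            return false;  // full
        buffer_[tail] = value;
        tail_.store(nt, std::memory_order_release);  // publish the element
        return true;
    }

    bool pop(T& value)  // called from the single consumer thread only
    {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if(head == tail_.load(std::memory_order_acquire))
            return false;  // empty
        value = buffer_[head];
        head_.store(next(head), std::memory_order_release);  // free the slot
        return true;
    }
};
```

Because a popped slot is only reused after the consumer has published the new
head index, the use-after-free hazard Anthony describes for the multi-reader
queues cannot arise here.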

I'm also interested in a queue that is
* multiple-writer (push) multiple-reader (pop) single-reader (front).

I think the thread pool has one like that for its internal queues, but
I need to verify.

Best,
Vicente

-- 
View this message in context: http://www.nabble.com/threadpool-lockfree_channel-tp22529985p22535465.html
Sent from the Boost - Dev mailing list archive at Nabble.com.

Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk