|
Boost-Commit : |
From: stipe_at_[hidden]
Date: 2008-08-24 21:21:46
Author: srajko
Date: 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
New Revision: 48360
URL: http://svn.boost.org/trac/boost/changeset/48360
Log:
added glv gui example, updated docs for review
Added:
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/glvgui_vtk.cpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/.DS_Store (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr_impl.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_detail.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_exceptions.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_group.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_stream.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adaptive.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adjustment.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/interrupter.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock_guard.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/exceptions.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fixed.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lazy.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/poolsize.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/task.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/watermark.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_example.cpp (contents, props changed)
Text files modified:
sandbox/SOC/2007/signals/boost/dataflow/signals/component/condition.hpp | 2
sandbox/SOC/2007/signals/boost/dataflow/signals/component/multiplexer.hpp | 2
sandbox/SOC/2007/signals/boost/dataflow/vtk/support.hpp | 1
sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj | 102 ++++++++++++++++++++++++++++++++++++++++
sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 | 36 ++++++++++++++
sandbox/SOC/2007/signals/libs/dataflow/doc/introduction/introduction.qbk | 20 ++++---
sandbox/SOC/2007/signals/libs/dataflow/doc/signals.qbk | 2
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk | 25 ++++++++-
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/concepts.qbk | 7 +
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/connections.qbk | 8 +-
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/introduction.qbk | 2
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk | 14 +++-
sandbox/SOC/2007/signals/libs/dataflow/doc/support/examples/vtk_example.qbk | 18 +++---
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_bank.cpp | 5 +
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.cpp | 71 ++++++++++++++++++---------
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.hpp | 14 +++--
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.cpp | 40 +++++++++++---
sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.hpp | 7 +-
sandbox/SOC/2007/signals/libs/dataflow/example/signals/gil_example.cpp | 2
sandbox/SOC/2007/signals/libs/dataflow/example/signals/quick_start_examples.cpp | 2
sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_distributed_example.cpp | 40 +++++++-------
sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_example_components.hpp | 8 +-
sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_same_type.cpp | 30 +++++------
sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_storage.cpp | 4 +
sandbox/SOC/2007/signals/libs/glv/src/glv_pimpl_binding_glut.cpp | 1
sandbox/SOC/2007/signals/libs/glv/src/glv_view.cpp | 13 +++-
26 files changed, 347 insertions(+), 129 deletions(-)
Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/condition.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/condition.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/condition.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -48,8 +48,6 @@
condition<Signature, OutSignal, SignalArgs>,
detail::cond_and_mutex, detail::notify_all, Signature, OutSignal, SignalArgs> base_type;
public:
- /** Initializes the internal counter to 0.
- */
condition(boost::condition &cond, boost::mutex &m)
: base_type(detail::cond_and_mutex(cond, m))
{}
Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/multiplexer.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/multiplexer.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/multiplexer.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -41,7 +41,7 @@
*/
multiplexer(int selector=0)
{ select(selector); }
- /** Enables the junction (signals will be forwarded).
+ /** Sets the multiplexer to forward the specified input.
*/
void select(int selector)
{ multiplexer::member = selector; }
Modified: sandbox/SOC/2007/signals/boost/dataflow/vtk/support.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/vtk/support.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/vtk/support.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -17,6 +17,7 @@
#include <boost/dataflow/support/port/port_adapter.hpp>
#include <boost/dataflow/support/fusion_component.hpp>
#include <boost/dataflow/support/fusion_keyed_port.hpp>
+#include <boost/dataflow/signals/support.hpp>
#include <boost/assert.hpp>
#include <boost/mpl/and.hpp>
Modified: sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -114,6 +114,8 @@
086231D10D1F42B60068A238 /* BlueprintWindow.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = BlueprintWindow.cpp; sourceTree = "<group>"; };
08668C4E0C19A16300ACB19A /* simple_distributed_example.cpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = simple_distributed_example.cpp; sourceTree = "<group>"; };
08668C4F0C19A16300ACB19A /* Jamfile.v2 */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = text; path = Jamfile.v2; sourceTree = "<group>"; };
+ 0889996E0E478BA6009B59E2 /* glv_buttons.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = glv_buttons.h; sourceTree = "<group>"; };
+ 0889996F0E478BA6009B59E2 /* glv_widget.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = glv_widget.h; sourceTree = "<group>"; };
088FC6BF0D5A6EAD004F0E76 /* Jamfile.v2 */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = Jamfile.v2; sourceTree = "<group>"; };
08920DCE0D84514B00E148EF /* test_bind_mem_fn_overload.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_bind_mem_fn_overload.cpp; sourceTree = "<group>"; };
089321AD0D7BB18300F16965 /* vtk_example.qbk */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = vtk_example.qbk; sourceTree = "<group>"; };
@@ -225,6 +227,34 @@
08D2C7390D1DDEB8008388D7 /* fltk_gui_example.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = fltk_gui_example.cpp; sourceTree = "<group>"; };
08DC14FC0C951C4800B96B2E /* Cone.cxx */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = Cone.cxx; sourceTree = "<group>"; };
08E180320E2C863D00F4BF04 /* test_vector_same_initialization.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_vector_same_initialization.cpp; sourceTree = "<group>"; };
+ 08E228E10E6207EC00D1C2AF /* exception_ptr.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = exception_ptr.hpp; sourceTree = "<group>"; };
+ 08E228E20E6207EC00D1C2AF /* exception_ptr_impl.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = exception_ptr_impl.hpp; sourceTree = "<group>"; };
+ 08E228E30E6207EC00D1C2AF /* future.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = future.hpp; sourceTree = "<group>"; };
+ 08E228E40E6207EC00D1C2AF /* future_detail.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = future_detail.hpp; sourceTree = "<group>"; };
+ 08E228E50E6207EC00D1C2AF /* future_exceptions.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = future_exceptions.hpp; sourceTree = "<group>"; };
+ 08E228E60E6207EC00D1C2AF /* future_group.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = future_group.hpp; sourceTree = "<group>"; };
+ 08E228E70E6207EC00D1C2AF /* future_stream.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = future_stream.hpp; sourceTree = "<group>"; };
+ 08E228E90E6207EC00D1C2AF /* adaptive.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = adaptive.hpp; sourceTree = "<group>"; };
+ 08E228EA0E6207EC00D1C2AF /* adjustment.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = adjustment.hpp; sourceTree = "<group>"; };
+ 08E228EB0E6207EC00D1C2AF /* bounded_channel.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = bounded_channel.hpp; sourceTree = "<group>"; };
+ 08E228ED0E6207EC00D1C2AF /* interrupter.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = interrupter.hpp; sourceTree = "<group>"; };
+ 08E228EE0E6207EC00D1C2AF /* lock.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lock.hpp; sourceTree = "<group>"; };
+ 08E228EF0E6207EC00D1C2AF /* lock_guard.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lock_guard.hpp; sourceTree = "<group>"; };
+ 08E228F00E6207EC00D1C2AF /* exceptions.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = exceptions.hpp; sourceTree = "<group>"; };
+ 08E228F10E6207EC00D1C2AF /* fifo.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = fifo.hpp; sourceTree = "<group>"; };
+ 08E228F20E6207EC00D1C2AF /* fixed.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = fixed.hpp; sourceTree = "<group>"; };
+ 08E228F30E6207EC00D1C2AF /* lazy.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lazy.hpp; sourceTree = "<group>"; };
+ 08E228F40E6207EC00D1C2AF /* lifo.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lifo.hpp; sourceTree = "<group>"; };
+ 08E228F50E6207EC00D1C2AF /* pool.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = pool.hpp; sourceTree = "<group>"; };
+ 08E228F60E6207EC00D1C2AF /* poolsize.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = poolsize.hpp; sourceTree = "<group>"; };
+ 08E228F70E6207EC00D1C2AF /* priority.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = priority.hpp; sourceTree = "<group>"; };
+ 08E228F80E6207EC00D1C2AF /* rendezvous_channel.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = rendezvous_channel.hpp; sourceTree = "<group>"; };
+ 08E228F90E6207EC00D1C2AF /* smart.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = smart.hpp; sourceTree = "<group>"; };
+ 08E228FA0E6207EC00D1C2AF /* task.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = task.hpp; sourceTree = "<group>"; };
+ 08E228FB0E6207EC00D1C2AF /* unbounded_channel.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = unbounded_channel.hpp; sourceTree = "<group>"; };
+ 08E228FC0E6207EC00D1C2AF /* watermark.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = watermark.hpp; sourceTree = "<group>"; };
+ 08E228FE0E62084100D1C2AF /* Jamfile */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.jam; path = Jamfile; sourceTree = "<group>"; };
+ 08E229020E6208AC00D1C2AF /* threadpool_example.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = threadpool_example.cpp; sourceTree = "<group>"; };
08EBA7590CFF8B6D0080E225 /* example.cpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = example.cpp; sourceTree = "<group>"; };
08EF045E0CEBF1AD002ABBBC /* port_t.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = port_t.hpp; sourceTree = "<group>"; };
08EF9B220C5D506A00D4D206 /* applicator.hpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.h; path = applicator.hpp; sourceTree = "<group>"; };
@@ -349,6 +379,7 @@
08F71D4E0CA3547C0010099E /* test_same_type.cpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = test_same_type.cpp; sourceTree = "<group>"; };
08F71D4F0CA3547C0010099E /* test_socket.cpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = test_socket.cpp; sourceTree = "<group>"; };
08F71D500CA3547C0010099E /* test_storage.cpp */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = test_storage.cpp; sourceTree = "<group>"; };
+ 08F72E6C0E464418001A58ED /* glv_pimpl_binding_glut.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = glv_pimpl_binding_glut.cpp; sourceTree = "<group>"; };
08F9459A0C46A86E00E224E4 /* components.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = components.hpp; sourceTree = "<group>"; };
08F9462E0C46C2F000E224E4 /* fibonacci.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = fibonacci.cpp; sourceTree = "<group>"; };
08F972FB0D0471FA00ABF6C1 /* port_adapter.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = port_adapter.hpp; sourceTree = "<group>"; };
@@ -435,6 +466,7 @@
08668C4D0C19A16300ACB19A /* example */ = {
isa = PBXGroup;
children = (
+ 08E228DE0E6207EC00D1C2AF /* threadpool */,
08A6B2640E25A566005539F2 /* glv_gui */,
08668C4F0C19A16300ACB19A /* Jamfile.v2 */,
08EBA7590CFF8B6D0080E225 /* example.cpp */,
@@ -562,6 +594,8 @@
08A2D5050E2A728A00D8FA04 /* include */ = {
isa = PBXGroup;
children = (
+ 0889996E0E478BA6009B59E2 /* glv_buttons.h */,
+ 0889996F0E478BA6009B59E2 /* glv_widget.h */,
08A2D5060E2A728A00D8FA04 /* COPYRIGHT */,
08A2D5070E2A728A00D8FA04 /* glv.h */,
08A2D5080E2A728A00D8FA04 /* glv_abstract_binding.h */,
@@ -587,6 +621,7 @@
08A2D5190E2A728A00D8FA04 /* src */ = {
isa = PBXGroup;
children = (
+ 08F72E6C0E464418001A58ED /* glv_pimpl_binding_glut.cpp */,
08A2D51A0E2A728A00D8FA04 /* COPYRIGHT */,
08A2D51B0E2A728A00D8FA04 /* FontGL */,
08A2D51E0E2A728A00D8FA04 /* glv_abstract_binding.cpp */,
@@ -710,6 +745,73 @@
path = fltk_gui;
sourceTree = "<group>";
};
+ 08E228DE0E6207EC00D1C2AF /* threadpool */ = {
+ isa = PBXGroup;
+ children = (
+ 08E228DF0E6207EC00D1C2AF /* boost */,
+ 08E228FE0E62084100D1C2AF /* Jamfile */,
+ 08E229020E6208AC00D1C2AF /* threadpool_example.cpp */,
+ );
+ path = threadpool;
+ sourceTree = "<group>";
+ };
+ 08E228DF0E6207EC00D1C2AF /* boost */ = {
+ isa = PBXGroup;
+ children = (
+ 08E228E00E6207EC00D1C2AF /* future */,
+ 08E228E80E6207EC00D1C2AF /* tp */,
+ );
+ path = boost;
+ sourceTree = "<group>";
+ };
+ 08E228E00E6207EC00D1C2AF /* future */ = {
+ isa = PBXGroup;
+ children = (
+ 08E228E10E6207EC00D1C2AF /* exception_ptr.hpp */,
+ 08E228E20E6207EC00D1C2AF /* exception_ptr_impl.hpp */,
+ 08E228E30E6207EC00D1C2AF /* future.hpp */,
+ 08E228E40E6207EC00D1C2AF /* future_detail.hpp */,
+ 08E228E50E6207EC00D1C2AF /* future_exceptions.hpp */,
+ 08E228E60E6207EC00D1C2AF /* future_group.hpp */,
+ 08E228E70E6207EC00D1C2AF /* future_stream.hpp */,
+ );
+ path = future;
+ sourceTree = "<group>";
+ };
+ 08E228E80E6207EC00D1C2AF /* tp */ = {
+ isa = PBXGroup;
+ children = (
+ 08E228E90E6207EC00D1C2AF /* adaptive.hpp */,
+ 08E228EA0E6207EC00D1C2AF /* adjustment.hpp */,
+ 08E228EB0E6207EC00D1C2AF /* bounded_channel.hpp */,
+ 08E228EC0E6207EC00D1C2AF /* detail */,
+ 08E228F00E6207EC00D1C2AF /* exceptions.hpp */,
+ 08E228F10E6207EC00D1C2AF /* fifo.hpp */,
+ 08E228F20E6207EC00D1C2AF /* fixed.hpp */,
+ 08E228F30E6207EC00D1C2AF /* lazy.hpp */,
+ 08E228F40E6207EC00D1C2AF /* lifo.hpp */,
+ 08E228F50E6207EC00D1C2AF /* pool.hpp */,
+ 08E228F60E6207EC00D1C2AF /* poolsize.hpp */,
+ 08E228F70E6207EC00D1C2AF /* priority.hpp */,
+ 08E228F80E6207EC00D1C2AF /* rendezvous_channel.hpp */,
+ 08E228F90E6207EC00D1C2AF /* smart.hpp */,
+ 08E228FA0E6207EC00D1C2AF /* task.hpp */,
+ 08E228FB0E6207EC00D1C2AF /* unbounded_channel.hpp */,
+ 08E228FC0E6207EC00D1C2AF /* watermark.hpp */,
+ );
+ path = tp;
+ sourceTree = "<group>";
+ };
+ 08E228EC0E6207EC00D1C2AF /* detail */ = {
+ isa = PBXGroup;
+ children = (
+ 08E228ED0E6207EC00D1C2AF /* interrupter.hpp */,
+ 08E228EE0E6207EC00D1C2AF /* lock.hpp */,
+ 08E228EF0E6207EC00D1C2AF /* lock_guard.hpp */,
+ );
+ path = detail;
+ sourceTree = "<group>";
+ };
08EF9B200C5D506A00D4D206 /* signals */ = {
isa = PBXGroup;
children = (
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -14,6 +14,41 @@
xml dataflow_xml : dataflow.qbk ;
+doxygen dataflow_signals_doxygen
+ :
+ [ glob ../../../boost/dataflow/signals/component/filter.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/storage.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/counter.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/junction.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/multiplexer.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/mutex.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/condition.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/function.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/socket_sender.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/socket_receiver.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/applicator.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/conditional.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/conditional_modifier.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/instantiator.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/timed_generator.hpp ]
+ :
+ <doxygen:param>TAB_SIZE=4
+ <doxygen:param>EXAMPLE_PATH=../test
+ <doxygen:param>EXAMPLE_PATH=../example
+ <doxygen:param>STRIP_FROM_PATH=$(root)
+ <doxygen:param>STRIP_FROM_INC_PATH=$(root)
+ <doxygen:param>EXTRACT_ALL=NO
+ <doxygen:param>HIDE_UNDOC_MEMBERS=YES
+ <doxygen:param>INHERIT_DOCS=YES
+ <doxygen:param>EXTRACT_PRIVATE=NO
+ <doxygen:param>ENABLE_PREPROCESSING=YES
+ <doxygen:param>MACRO_EXPANSION=YES
+ <doxygen:param>SEARCH_INCLUDES=YES
+ <doxygen:param>INCLUDE_PATH=../../..
+ <doxygen:param>INCLUDE_PATH=$(BOOST_ROOT)
+ <doxygen:param>PREDEFINED=DOXYGEN_DOCS_BUILD
+ ;
+
doxygen dataflow_blueprint_doxygen
:
[ glob ../../../boost/dataflow/blueprint/port.hpp ]
@@ -95,6 +130,7 @@
dataflow_utility_doxygen
dataflow_support_doxygen
dataflow_blueprint_doxygen
+ dataflow_signals_doxygen
:
# pull in the online .css and images
<xsl:param>project.root=http://dancinghacker.com
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/introduction/introduction.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/introduction/introduction.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/introduction/introduction.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -49,17 +49,18 @@
as well as the connection and data extraction functionality
(the [rationale] section discusses why this is the initial focus).
-[DataflowSignals] has a (very thin) support layer which makes `boost::signal`
-and `boost::function` model the Dataflow [concepts], and there is an
-[vtk_example example] showing how to do the same for [VTK]. Support layers
-for other dataflow frameworks will be added in the future.
-
-Developing a Dataflow support layer for a dataflow framework allows
+Dataflow frameworks can be made to model the [concepts] of the generic dataflow
+layer by developing a Dataflow support layer for the framework.
+This allows
anything that is built on top of the generic layer (e.g., [DataflowBlueprint],
a [gui_example GUI dataflow editor], and operators which can be used
to connect components in a clean, readable manner) to be used with the
framework. See the [link dataflow.future future directions] section
-for an idea of about other things in planning.
+for an idea of about other things in planning. See [DataflowSignals]
+for an example of a (very thin) support layer which makes `boost::signal`
+and `boost::function` model the Dataflow [concepts]. There is also a
+[vtk_example example] showing how to do the same for [VTK]. Support layers
+for other dataflow frameworks will be added in the future.
[h5 Dataflow.Signals layer]
@@ -88,7 +89,10 @@
dataflow framework of a different kind - one that allows runtime specification
of port/component characteristics, offers memory management for the data being
exchanged between the components, as well as flexible scheduling and optimized
-evaluation. At this point, however, such a framework is missing.
+evaluation. At this point, some work in this direction is being done in implementing
+a framework that keeps track of the entire
+network of components and takes care of component invocation scheduling.
+The framework is called Dataflow.Managed, but is currently completely undocumented.
[heading Where to go from here]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -8,4 +8,6 @@
[include signals/components.qbk]
+[xinclude dataflow_signals_doxygen.xml]
+
[endsect]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -130,6 +130,7 @@
[endsect][/generic]
[section:properties Signal Properties]
+
[section:storage storage]
[*See also]: [classref boost::signals::storage storage class reference.]
@@ -141,6 +142,8 @@
The [storage] class can store the arguments it receives from a signal,
as well as transmit the stored argument values through its own signal.
+[heading Example]
+
[test_storage_unfused]
[endsect][/storage]
@@ -157,6 +160,8 @@
receives a signal, it will increment an internal
counter and forward the signal.
+[heading Example]
+
[test_counter_unfused]
[endsect][/counter]
@@ -174,6 +179,8 @@
The [junction] class can be used to bring in multiple signals, and forward
them all to the same set of output signals.
+[heading Example]
+
[test_junction_unfused]
[endsect][/junction]
@@ -189,6 +196,8 @@
The [multiplexer] class can be used to bring in multiple signals, and forward
them all to the same set of output signals.
+[heading Example]
+
[test_multiplexer_unfused]
[endsect][/multiplexer]
@@ -208,12 +217,14 @@
The [mutex] class locks an internal mutex when it receives a signal, and then forwards the signal.
[mutex] is an __instantiator__ with the Instantiation boost::mutex::scoped_lock and Member boost::mutex.
+[heading Example]
+
[test_mutex_unfused]
[endsect][/mutex]
[section:condition condition]
-[*See also]: [classref boost::signals::mutex mutex class reference.]
+[*See also]: [classref boost::signals::condition condition class reference.]
[heading Model of]
* [SignalFilterComponent]
@@ -222,7 +233,9 @@
The [condition] class signals a threading condition when receiving a signal.
-[test_mutex_unfused]
+[heading Example]
+
+[test_socket]
[endsect][/condition]
@@ -240,6 +253,8 @@
The [function] class can be used to apply a function to an incoming signal and output the result.
[function] is a __modifier__ with the Modifier set to an adapter for the provided function.
+[heading Example]
+
[test_function_unfused]
[endsect][/function]
@@ -257,6 +272,8 @@
The [chain] class chains together multiple copies of the same component.
+[heading Example]
+
[test_chain_unfused]
The example above uses the following classes:
@@ -281,7 +298,7 @@
network connection. Any signal going to the [socket_sender] will be serialized and sent over a provided
socket. A [socket_receiver] can be used on the other end to unserialize the signal and forward it.
-Example:
+[heading Example]
[test_socket]
@@ -300,7 +317,7 @@
network connection. Any signal going to the [socket_sender] will be serialized and sent over a provided
socket. A [socket_receiver] can be used on the other end to unserialize the signal and forward it.
-Example:
+[heading Example]
[test_socket]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/concepts.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/concepts.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/concepts.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -1,5 +1,8 @@
[section Concepts]
+This section describes certain concepts used by the [DataflowSignals] layer.
+The section is under development.
+
[section:signaltraits SignalTraits]
A [Traits] type `T` is a ['[SignalTraits]] if
its tag is `dataflow::signals::tag`.
@@ -105,8 +108,8 @@
[section SignalConsumer]
A consumer [Port] type `P` is a ['[SignalConsumer]] if its
-[PortTraits] is a [SignalPortTraits]. A [SignalProducer] must also
-match the concepts modeled by the [boost_signal] class.
+[PortTraits] is a [SignalPortTraits]. A [SignalConsumer] must also
+match the concepts modeled by the [boost_function] class.
[heading Refinement of]
* consumer [Port]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/connections.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/connections.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/connections.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -6,7 +6,7 @@
[DataflowSignals] provides a [connect] function in the `boost::signals`
namespace, which connects a [SignalProducer] and [SignalConsumer]
-which are [Connectable].
+which are [Connectable].
Given a [SignalProducer] `p` and [Connectable] [SignalConsumer] `c`,
@@ -14,12 +14,12 @@
will create a connection between the two.
-Since [DataflowSignals] [components] provide default ports, they can be used
-with the [connect] function directly. The default [SignalProducer] is the
+The same function can also connect [components] which provide default ports.
+With [DataflowSignals] [components], the default [SignalProducer] is the
[boost_signal] provided by each of the [components], and the default [SignalConsumer]
is the call operator ([^operator()]).
-[note The call operator acts as a [KeyedPort] - an appropriate overload will
+[note The call operator ([^operator()]) acts as a [KeyedPort] - an appropriate overload will
be selected depending on the signature of the signal being connected]
The [link dataflow.signals.introduction.quick_start quick start] section
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/introduction.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/introduction.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/introduction.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -28,7 +28,7 @@
Here is a simple example that illustrates this:
[signal_function_connect]
-If you are familiar with [BoostSignals], will notice that the [^connect]
+If you are familiar with [BoostSignals], you will notice that the [^connect]
free function is something new - it is provided by [DataflowSignals].
[DataflowSignals] offers other utilities that make connection making easier,
like the [bind_mem_fn] function which binds a member function to an object:
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -32,11 +32,15 @@
[section:disconnect Disconnecting]
-Disconnection capability is not yet fully supported by [DataflowSignals],
-although you can always use disconnection features of [BoostSignals].
-Currently, the only [DataflowSignals] method of disconnecting is the
+There are two ways to disconnect signals in [DataflowSignals]. One is the
[disconnect_all] function, which can be applied to any component based on
-[filter]. Here is an example:
+[filter], and will disconnect everything connected to the default output signal
+of that component (it will not disconnect anything connected to the consumer
+ports of that component).
+The other way involves storing the [BoostSignals] connection object,
+which is returned by the [connect] function.
+
+Here is an example showing both:
[test_disconnect_unfused]
@@ -59,7 +63,7 @@
[test_multi_type_unfused]
-[endsect][/multiple_different]
+[endsect][/disconnect]
[section:multiple_same Multiple inputs of the same signature]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/support/examples/vtk_example.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/support/examples/vtk_example.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/support/examples/vtk_example.qbk 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -48,7 +48,7 @@
the [gui_example GUI editor] currently under development.
[heading Next]
-[link dataflow.introduction.examples.new_layer.tag
+[link dataflow.support.examples.new_layer.tag
Setting up the Tag]
[section:tag Setting up the Tag]
@@ -62,7 +62,7 @@
We can now use it to tag all of our [VTK] support classes.
[heading Next]
-[link dataflow.introduction.examples.new_layer.producerconsumer
+[link dataflow.support.examples.new_layer.producerconsumer
Setting up a producer Port and a consumer Port]
[endsect][/mechanism]
@@ -95,7 +95,7 @@
[heading Next]
-[link dataflow.introduction.examples.new_layer.connectable
+[link dataflow.support.examples.new_layer.connectable
Making things Connectable]
[endsect][/producerconsumer]
@@ -119,7 +119,7 @@
[vtk_connect_unforwarded]
[heading Next]
-[link dataflow.introduction.examples.new_layer.forwarding
+[link dataflow.support.examples.new_layer.forwarding
Defining forwarding functions and operators]
[endsect][/connectable]
@@ -149,7 +149,7 @@
[vtk_connect_forwarded]
[heading Next]
-[link dataflow.introduction.examples.new_layer.component
+[link dataflow.support.examples.new_layer.component
Setting up a Component]
[endsect][/forwarding]
@@ -176,7 +176,7 @@
```
[heading Next]
-[link dataflow.introduction.examples.new_layer.filter
+[link dataflow.support.examples.new_layer.filter
Setting up a filter (ProducerPort+ConsumerPort)]
[endsect][/component]
@@ -190,7 +190,7 @@
[vtk_actor_filter]
[heading Next]
-[link dataflow.introduction.examples.new_layer.producermap
+[link dataflow.support.examples.new_layer.producermap
Setting up a KeyedPort]
[endsect][/filter]
@@ -210,7 +210,7 @@
[vtk_mapper_producer]
[heading Next]
-[link dataflow.introduction.examples.new_layer.remaining
+[link dataflow.support.examples.new_layer.remaining
Setting up the remaining components (more of the same)]
[endsect][/producermap]
@@ -220,7 +220,7 @@
[vtk_setup_rest]
[heading Next]
-[link dataflow.introduction.examples.new_layer.using_support_layer
+[link dataflow.support.examples.new_layer.using_support_layer
Using the VTK support layer]
[endsect][/remaining]
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_bank.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_bank.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_bank.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -15,8 +15,9 @@
: glv::Button(glv::Rect(60, 60), false), m_label(label), m_bank(bank)
{
glv::Label *l = new glv::Label(label);
- l->pos(glv::Place::TL,0,20);
- l->size(20);
+ l->pos(glv::Place::TL,0,5);
+ l->size(1);
+ l->size(60/l->width());
*this << *l;
}
void spawn()
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -5,7 +5,7 @@
#include <glv.h>
#include "blueprint_component.hpp"
-
+#include <iostream>
namespace boost { namespace dataflow { namespace glv_gui {
bool blueprint_invoke_button::onEvent(glv::Event::t e, glv::GLV& glv)
@@ -31,51 +31,74 @@
blueprint_component::blueprint_component(const std::string &name, blueprint::component &c, id_type id)
: glv::View(glv::Rect(100, 100))
- , m_left_buttons(glv::Rect(20, 100))
- , m_right_buttons(glv::Rect(20, 100))
- , m_layout(*this, glv::Direction::E, glv::Place::CL, 0, 50, 0)
- , m_left_layout(m_left_buttons,glv::Direction::S, glv::Place::TL, 0, 0, 0)
- , m_right_layout(m_right_buttons,glv::Direction::S, glv::Place::TL, 0, 0, 0)
+ , m_label(name)
+ , m_layout_style(0)
{
- glv::Label *l = new glv::Label(name);
- l->size(20);
- l->pos(glv::Place::TL,20,20);
- *this << *l;
- m_layout << m_left_buttons << m_invoke_button << m_right_buttons;
(*this) (glv::Event::MouseDrag, glv::Behavior::mouseMove);
- // BlueprintComponent_(label);
set_component(c, id);
+ arrange_layout();
}
void blueprint_component::set_component(blueprint::component &c, id_type id)
{
m_component=&c;
m_id = id;
-
+
if(c.is_invocable())
m_invoke_button.enable(glv::Visible);
else
m_invoke_button.disable(glv::Visible);
+
+ for (size_t i=0; i<c.num_ports(); i++)
+ m_ports.push_back(new blueprint_component_port(i));
+}
+
+void blueprint_component::arrange_layout()
+{
+ glv::Placer m_layout, m_left_layout, m_right_layout;
- blueprint::based_component<glv::View> *view_component = dynamic_cast<blueprint::based_component<glv::View> *>(&c);
+ switch(m_layout_style)
+ {
+ case 1:
+ m_layout = glv::Placer(*this, glv::Direction::E, glv::Place::CL, 0, 50, 0);
+ m_left_buttons.extent(20, 100);
+ m_right_buttons.extent(20, 100);
+ m_invoke_button.extent(60, 60);
+ m_left_layout = glv::Placer(m_left_buttons,glv::Direction::S, glv::Place::TL, 0, 0, 0);
+ m_right_layout = glv::Placer(m_right_buttons,glv::Direction::S, glv::Place::TL, 0, 80, 0);
+ break;
+ default:
+ m_layout = glv::Placer(*this, glv::Direction::S, glv::Place::TC, 50, 0, 0);
+ m_left_buttons.extent(100, 20);
+ m_right_buttons.extent(100, 20);
+ m_invoke_button.extent(95,60);
+ m_left_layout = glv::Placer(m_left_buttons,glv::Direction::E, glv::Place::TL, 0, 0, 0);
+ m_right_layout = glv::Placer(m_right_buttons,glv::Direction::E, glv::Place::TL, 80, 0, 0);
+ }
+
+ m_label.size(1);
+ m_label.size(100/m_label.width());
+ m_label.pos(glv::Place::TL,0,20);
+ *this << m_label;
+
+ m_layout << m_left_buttons << m_invoke_button << m_right_buttons;
+
+ for (size_t i=0; i<m_component->num_ports(); i++)
+ if(m_component->get_port(i).traits().category().is<ports::producer>())
+ m_right_layout << *m_ports[i];
+ else
+ m_left_layout << *m_ports[i];
+
+ blueprint::based_component<glv::View> *view_component = dynamic_cast<blueprint::based_component<glv::View> *>(m_component);
if(view_component)
{
- std::cout << "View" << std::endl;
glv::View *view = view_component->get_pointer();
*this << *view;
extent(100,200);
}
else
- std::cout << "Not View" << std::endl;
-
- for (size_t i=0; i<c.num_ports(); i++)
- if (c.get_port(i).traits().category().is<ports::producer>())
- m_right_layout << *(new blueprint_component_port(i));
- else
- m_left_layout << *(new blueprint_component_port(i));
+ extent(100, 100);
}
-
-blueprint_component *blueprint_component::m_currently_constructed;
} } }
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_component.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -15,7 +15,7 @@
{
public:
blueprint_invoke_button()
- : glv::Button(glv::Rect(60,60), false)
+ : glv::Button(glv::Rect(95,60), false)
{}
private:
bool onEvent(glv::Event::t e, glv::GLV& glv);
@@ -27,18 +27,20 @@
public:
blueprint_component(const std::string &name, blueprint::component &c, id_type id);
void set_component(blueprint::component &c, id_type id);
- void gui_begin();
- void gui_end();
void invoke() {m_component->invoke();}
id_type id() {return m_id;}
+ void arrange_layout();
+ void layout_style(int style) {m_layout_style = style;}
+ int layout_style() {return m_layout_style;}
private:
- static blueprint_component *m_currently_constructed;
blueprint::component *m_component;
std::string m_name;
id_type m_id;
glv::View m_left_buttons, m_right_buttons;
- glv::Placer m_layout, m_left_layout, m_right_layout;
- blueprint_invoke_button m_invoke_button;
+ glv::Label m_label;
+ blueprint_invoke_button m_invoke_button;
+ std::vector<blueprint_component_port *> m_ports;
+ int m_layout_style;
};
class blueprint_component_port
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -6,54 +6,74 @@
#include <glv.h>
#include "blueprint_window.hpp"
#include "blueprint_component.hpp"
+#include <boost/foreach.hpp>
namespace boost { namespace dataflow { namespace glv_gui {
blueprint_window::blueprint_window()
: glv::View(glv::Rect(800, 600))
, m_dragged(NULL)
- , m_layout(*this, glv::Direction::E, glv::Place::TL, 5, 5, 5)
-{
+ , m_layout(100, 200, glv::Rect(600, 800), 2, 5)
+ , m_status("Status bar...", 0, 580)
+ , m_rotate_component_layout(glv::Rect(50, 50), false)
+{
+ *this << m_status;
+ m_rotate_component_layout << *new glv::Label("Change\nStyle");
+ m_rotate_component_layout.pos(glv::Place::TL, 520, 550);
+ *this << m_rotate_component_layout;
+ m_rotate_component_layout.attachHandler(style_change_handler, this);
+ m_layout.parent = this;
this->colors().back.set(0);
- m_next_created_x = 100;
- m_next_created_y = 50;
}
+void blueprint_window::style_change_handler(glv::Notifier *sender, void *this__)
+{
+ blueprint_window *this_(reinterpret_cast<blueprint_window *>(this__));
+
+ BOOST_FOREACH(blueprint_component *c, this_->m_components)
+ {
+ c->layout_style(!c->layout_style());
+ c->arrange_layout();
+ this_->m_status.label("Switched style to " + boost::lexical_cast<std::string>(c->layout_style()));
+ }
+}
+
+
void blueprint_window::add_component(std::auto_ptr<blueprint::component> c, const std::string &name)
{
- std::cout << "Adding component" << name << std::endl;
+ m_status.label("Adding component");
blueprint::component &cr = *c;
blueprint_component *bc = new blueprint_component(name, cr, m_network.add_component(c));
- if((m_components.size()%5)==0)
- m_layout.pos(0, m_components.size()/5 * 220);
m_components.push_back(bc);
m_layout << *bc;
+ bc->arrange_layout();
}
void blueprint_window::register_port_click(blueprint_component_port *port)
{
if(m_dragged)
{
- std::cout << "Attempting connect..." << std::endl;
if (m_network.are_connectable(m_dragged->component().id(), m_dragged->id(), port->component().id(), port->id()))
{
m_network.add_connection(m_dragged->component().id(), m_dragged->id(), port->component().id(), port->id(), true);
m_connections.push_back(connection(m_dragged, port));
+ m_status.label("Ports connected...");
}
else
- std::cout << "Ports not connectable" << std::endl;
+ m_status.label("Ports not connectable");
m_dragged = NULL;
}
else
{
- std::cout << "Initiating connect - click on destination port to make connection" << std::endl;
+ m_status.label("Initiating connect - click on destination port to make connection");
m_dragged = port;
}
}
void blueprint_window::onDraw()
{
+ glv::draw::lineWidth(5);
glv::draw::begin(glv::draw::Lines);
/* if(m_dragged)
{
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/blueprint_window.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -34,15 +34,16 @@
void add_component(std::auto_ptr<blueprint::component> c, const std::string &name);
void register_port_click(blueprint_component_port *port);
private:
+ static void style_change_handler(glv::Notifier *sender, void *this_);
void onDraw();
blueprint::network m_network;
std::vector<blueprint_component *> m_components;
blueprint_component_port *m_dragged;
std::vector<connection> m_connections;
std::vector<connection>::iterator m_selected;
- int m_next_created_x;
- int m_next_created_y;
- glv::Placer m_layout;
+ glv::LayoutGrid m_layout;
+ glv::Label m_status;
+ glv::Button m_rotate_component_layout;
};
} } }
Added: sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/glvgui_vtk.cpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/glv_gui/glvgui_vtk.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,88 @@
+// Copyright 2007 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <glv.h>
+#include <glv_pimpl_binding.h>
+
+#include "blueprint_bank.hpp"
+#include "blueprint_window.hpp"
+
+#include <boost/dataflow/vtk/support.hpp>
+#include "vtkPolyDataMapper.h"
+#include "vtkConeSource.h"
+#include <boost/dataflow/blueprint/component_bank.hpp>
+
+DATAFLOW_RUNTIME_PROPERTY(vtk::tag, const char *, "Dataflow.Graph")
+// 40 us supposed to be a UUID :-)
+DATAFLOW_RUNTIME_PROPERTY(vtk::tag, int, 40)
+
+namespace df=boost::dataflow;
+namespace blueprint=boost::dataflow::blueprint;
+
+template<typename T>
+struct vtk_smart_ptr
+{
+ typedef T element_type;
+
+ vtk_smart_ptr()
+ {
+ m_ptr = T::New();
+ }
+ vtk_smart_ptr(const vtk_smart_ptr &other)
+ {
+ m_ptr = other.m_ptr;
+ other.m_ptr = NULL;
+ }
+ ~vtk_smart_ptr()
+ {
+ if(m_ptr)
+ m_ptr->Delete();
+ }
+ T &operator*()
+ {
+ return *m_ptr;
+ }
+private:
+ mutable T *m_ptr;
+};
+
+// A component_bank with some components
+class example_bank : public blueprint::tag_component_bank<df::vtk::tag>
+{
+public:
+ example_bank()
+ {
+ add_component<vtk_smart_ptr<vtkRenderWindow> >("Render\nWindow");
+ add_component<vtk_smart_ptr<vtkRenderer> >("Renderer");
+ add_component<vtk_smart_ptr<vtkActor> >("Actor");
+ add_component<vtk_smart_ptr<vtkPolyDataMapper> >("PolyData\nMapper");
+ add_component<vtk_smart_ptr<vtkConeSource> >("Cone\nSource");
+ }
+private:
+};
+
+int main()
+{
+ glv::GLV top;
+
+ glv::Window win(640, 640, "GLV Blueprint GUI", &top);
+ glv::Placer placer(top, glv::Direction::E, glv::Place::TL, 0, 0, 0);
+
+ using namespace boost::dataflow::glv_gui;
+
+ // create the blueprint and component windows
+ blueprint_bank bank;
+ blueprint_window window;
+
+ // initialize the bank
+ bank.set_bank(example_bank());
+ bank.set_blueprint(window);
+
+ placer << bank.view() << window;
+ glv::Application::run();
+}
+
+//]
+
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/signals/gil_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/signals/gil_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/signals/gil_example.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -79,7 +79,7 @@
gil::gray8_pixel_t operator()(const gil::gray8_pixel_t& p) const
{
- return gil::gray8_pixel_t(p + gil::bits8(generator()));
+ return gil::gray8_pixel_t(p + gil::bits8(generator()));
}
variate_generator<mt19937&, boost::normal_distribution<> > &generator;
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/signals/quick_start_examples.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/signals/quick_start_examples.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/signals/quick_start_examples.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -73,7 +73,7 @@
// Inheriting from signals::filter or signals::consumer allows us to implement
// signal consumer ports as overloads of operator() - later you will see that
-// that allows us to access them more easily.
+// this approach allows us to access such ports more easily.
class consumer_component
: public signals::consumer<consumer_component>
{
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_distributed_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_distributed_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_distributed_example.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -31,15 +31,15 @@
// its final signal through the socket.
void asio_server()
{
- // set up the socket
- asio::ip::tcp::acceptor acceptor(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 1097));
- asio::ip::tcp::socket socket(io_service);
- {
- boost::mutex::scoped_lock lock(mutex_);
- acceptor.listen();
- cond.notify_all();
- }
- acceptor.accept(socket);
+ // set up the socket
+ asio::ip::tcp::acceptor acceptor(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 1097));
+ asio::ip::tcp::socket socket(io_service);
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+ acceptor.listen();
+ cond.notify_all();
+ }
+ acceptor.accept(socket);
// For our data source, we will use timed_generator,
// which creates its own thread and outputs it's stored value
@@ -47,7 +47,7 @@
// The signature void(double) specifies that the signal carries a double,
// and that there is no return value.
signals::timed_generator<void (double)> input(0);
- // To have our dataflow network straddle a network connection,
+ // To have our dataflow network straddle a network connection,
// we need a socket_sender
signals::socket_sender<void (double)> sender(socket);
@@ -74,15 +74,15 @@
int main(int, char* [])
{
- // start the server in a separate thread, and wait until it is listening
- boost::mutex::scoped_lock lock(mutex_);
- boost::thread t(asio_server);
- cond.wait(lock);
-
- // set up the socket
- asio::ip::tcp::endpoint endpoint_recv(asio::ip::address::from_string("127.0.0.1"), 1097);
- asio::ip::tcp::socket socket(io_service);
- socket.connect(endpoint_recv);
+ // start the server in a separate thread, and wait until it is listening
+ boost::mutex::scoped_lock lock(mutex_);
+ boost::thread t(asio_server);
+ cond.wait(lock);
+
+ // set up the socket
+ asio::ip::tcp::endpoint endpoint_recv(asio::ip::address::from_string("127.0.0.1"), 1097);
+ asio::ip::tcp::socket socket(io_service);
+ socket.connect(endpoint_recv);
// Setup data processor and output:
processor proc;
@@ -103,7 +103,7 @@
boost::thread receive_thread(boost::bind(&asio::io_service::run, boost::ref(io_service)));
// and wait until the server is done sending
- t.join();
+ t.join();
io_service.stop();
receive_thread.join();
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_example_components.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_example_components.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/signals/simple_example_components.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -35,13 +35,11 @@
boost::variate_generator<mt19937&, boost::normal_distribution<> > generator;
};
-// This will be our data output. We just need to make a function object,
-// and specify that it is a signals::call_consumer.
-class output
+// This will be our data output. We just make a function object, and specify
+// that it is a signal consumer by inheriting the signal::consumer class.
+class output : public signals::consumer<output>
{
public:
- typedef dataflow::signals::call_consumer<> dataflow_traits;
-
void operator()(double x)
{
std::cout << x << std::endl;
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,14 @@
+# Copyright 2007 Stjepan Rajko.
+# Distributed under the Boost Software License, Version 1.0. (See
+# accompanying file LICENSE_1_0.txt or copy at
+# http://www.boost.org/LICENSE_1_0.txt)
+
+project dataflow/example/threadpool
+ : requirements
+ <include>../../../..
+ <include>.
+ <library>/boost/signals//boost_signals/<link>static
+ <define>BOOST_ALL_NO_LIB=1
+ ;
+
+exe threadpool_example : threadpool_example.cpp ;
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/.DS_Store
==============================================================================
Binary file. No diff available.
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,38 @@
+#ifndef EXCEPTION_PTR_HPP_INCLUDED
+#define EXCEPTION_PTR_HPP_INCLUDED
+
+//By Peter Dimov, example code from propsal N2179
+// Copyright (c) 2007 Peter Dimov
+//
+// Distributed under the Boost Software License, Version 1.0.
+// http://www.boost.org/LICENSE_1_0.txt
+//
+#include <boost/shared_ptr.hpp>
+
+namespace boost {
+ namespace detail {
+
+class _exp_throwable;
+
+typedef boost::shared_ptr< _exp_throwable > exception_ptr;
+
+exception_ptr current_exception();
+void rethrow_exception( exception_ptr p );
+
+template< class E > exception_ptr copy_exception( E const & e )
+{
+ try
+ {
+ throw e;
+ }
+ catch( ... )
+ {
+ return current_exception();
+ }
+}
+
+} //namespace detail
+} //namespace boost
+
+#include "exception_ptr_impl.hpp"
+#endif // #ifndef EXCEPTION_PTR_HPP_INCLUDED
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr_impl.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/exception_ptr_impl.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,127 @@
+//By Peter Dimov, example code from propsal N2179
+
+#include "exception_ptr.hpp"
+#include <exception>
+#include <stdexcept>
+#include <ios> //for ios_base::failure declaration
+#include <boost/thread/exceptions.hpp>
+#include <boost/version.hpp>
+#include "future.hpp"
+#include "future_exceptions.hpp"
+
+namespace boost {
+ namespace detail {
+ class _exp_throwable
+ {
+ protected:
+
+ virtual ~_exp_throwable() {}
+
+ public:
+
+ virtual void rethrow() = 0;
+ };
+
+ template< class E > class _exp_throwable_impl: public _exp_throwable
+ {
+ private:
+
+ E e_;
+
+ public:
+
+ _exp_throwable_impl()
+ {
+ }
+
+ template< class A > _exp_throwable_impl( A const & a ): e_( a )
+ {
+ }
+
+ void rethrow()
+ {
+ throw e_;
+ }
+ virtual ~_exp_throwable_impl() {}
+ };
+
+#define _CATCH_AND_RETURN( E ) catch( E const & e ) { return exception_ptr( new _exp_throwable_impl< E >( e ) ); }
+
+ inline static exception_ptr _exp_current_exception()
+ {
+ try
+ {
+ throw;
+ }
+
+ // runtime_error standard subclasses
+ _CATCH_AND_RETURN( std::overflow_error )
+ _CATCH_AND_RETURN( std::range_error )
+ _CATCH_AND_RETURN( std::underflow_error)
+
+ _CATCH_AND_RETURN( std::runtime_error )
+
+ // logic_error standard subclasses
+ _CATCH_AND_RETURN( std::domain_error )
+ _CATCH_AND_RETURN( std::invalid_argument )
+ _CATCH_AND_RETURN( std::length_error )
+ _CATCH_AND_RETURN( std::out_of_range )
+ _CATCH_AND_RETURN( std::logic_error )
+
+ _CATCH_AND_RETURN( std::bad_alloc )
+ _CATCH_AND_RETURN( std::bad_cast )
+ _CATCH_AND_RETURN( std::bad_typeid )
+ _CATCH_AND_RETURN( std::bad_exception )
+
+ // iostreams library
+ _CATCH_AND_RETURN( std::ios_base::failure )
+
+ // boost.thread exceptions
+ _CATCH_AND_RETURN( boost::lock_error )
+ _CATCH_AND_RETURN( boost::thread_resource_error )
+ _CATCH_AND_RETURN( boost::unsupported_thread_option )
+ _CATCH_AND_RETURN( boost::thread_permission_error )
+ _CATCH_AND_RETURN( boost::invalid_thread_argument )
+ _CATCH_AND_RETURN( boost::thread_exception )
+#if BOOST_VERSION >= 103500
+ _CATCH_AND_RETURN( boost::thread_interrupted ) // boost 1.5
+#endif
+ _CATCH_AND_RETURN( boost::broken_promise )
+ _CATCH_AND_RETURN( boost::future_already_set )
+ _CATCH_AND_RETURN( boost::future_cancel )
+
+ catch( std::exception const & e )
+ {
+ return exception_ptr( new _exp_throwable_impl<std::runtime_error>( e.what() ) );
+ }
+ catch( ... )
+ {
+ return exception_ptr( new _exp_throwable_impl<std::bad_exception>() );
+ }
+ }
+
+ // FIXME: can't have this global static as header-only. -braddock
+ //static exception_ptr s_bad_alloc( new _exp_throwable_impl< std::bad_alloc > );
+
+ inline exception_ptr current_exception()
+ {
+ try
+ {
+ return _exp_current_exception();
+ }
+ catch( std::bad_alloc const & )
+ {
+ // FIXME: see above note about static s_bad_alloc in header-only -braddock
+ //return s_bad_alloc;
+ return exception_ptr( new _exp_throwable_impl< std::bad_alloc > ); //PROBABLY WILL FAIL -braddock
+ }
+ }
+
+ inline void rethrow_exception( exception_ptr p )
+ {
+ p->rethrow();
+ }
+
+ } //detail
+} //namespace boost
+
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,371 @@
+#ifndef BOOST_FUTURE_HPP
+#define BOOST_FUTURE_HPP 1
+
+// Copyright (c) 2007 Braddock Gaskill Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+// exception_ptr.hpp/cpp copyright Peter Dimov
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/xtime.hpp>
+#include <boost/utility/result_of.hpp>
+
+#include "exception_ptr.hpp"
+#include "future_exceptions.hpp"
+#include "future_detail.hpp"
+
+#ifdef __GNUC__
+#define DEPRECATED __attribute__ ((deprecated))
+#else
+#define DEPRECATED
+#endif
+
+namespace boost {
+
+ class callback_reference;
+
+ class untyped_promise {
+ public:
+ untyped_promise() {}
+ explicit untyped_promise(const shared_ptr<detail::future_impl> &fimpl) : f_(fimpl) {}
+ untyped_promise(const untyped_promise &other) : f_(other.f_) {}
+ untyped_promise& operator=(const untyped_promise& t) {f_ = t.f_; return *this;}
+ template<typename E> void set_exception( E const & e ) { // stores the exception e and transitions to ready()
+ // f_->set_exception(detail::copy_exception(e));
+ f_->set_exception(detail::exception_ptr(new detail::_exp_throwable_impl<E>(e)));
+ }
+
+ // Attempt's a 'throw', assuming there is an exception set
+ void set_current_exception() {
+ f_->set_exception(detail::current_exception());
+ }
+
+ void set_exception( const detail::exception_ptr & e) {
+ f_->set_exception(e);
+ }
+
+ // Add a cancel handler, which will be invoked if
+ // future<T>::cancel() is ever called
+ // Note: If the future is ALREADY canceled, then get_cancel_handler()
+ // will call the handler immediately.
+ // There is only one cancel_handler() for an underlying Future instance
+ void DEPRECATED set_cancel_handler( const boost::function<void (void)> &f ) {
+ f_->set_cancel_handler(f);
+ }
+
+ void DEPRECATED unset_cancel_handler() {
+ f_->set_cancel_handler(boost::function<void (void)>());
+ }
+
+ protected:
+ shared_ptr<detail::future_impl> f_;
+ };
+
+ template<typename R> class promise : public untyped_promise
+ {
+ public:
+ promise() : impl_(new detail::promise_impl<R>) {f_ = impl_->f_;}; // creates an empty future
+ promise(const promise& t) : untyped_promise(t.impl_->f_), impl_(t.impl_) {}
+ promise& operator=(const promise& t) {
+ impl_ = t.impl_;
+ untyped_promise::operator=(t);
+ return *this;
+ }
+
+ void set( R const & r ) { // sets the value r and transitions to ready()
+ impl_->f_->set_value(r, *impl_->value_);
+ }
+
+ void set_or_throw( R const & r) {
+ if (!impl_->f_->set_value(r, *impl_->value_))
+ throw future_already_set();
+ }
+
+ bool is_needed() {return impl_->f_->is_needed();}
+ void wait_until_needed() {return impl_->f_->wait_until_needed();}
+ shared_ptr<detail::future_impl> get_needed_future() {return impl_->f_->get_needed_future();}
+
+ void reset() {
+ impl_.reset();
+ f_.reset();
+ }
+ private:
+ template <typename Y> friend class future;
+ shared_ptr<detail::promise_impl<R> > impl_;
+ };
+
+ template<typename R> class future
+ {
+ public:
+ //future() : impl_(new detail::future_impl<R>) {}; // creates an empty future
+
+ // Default constructor will create a future, and will immediately set a
+ // broken_promise exception.
+ // A default-constructed future is only good for equality assignment to a
+ // valid future.
+ future() : impl_(), value_() {
+ promise<R> p;
+ impl_ = p.impl_->f_;
+ value_ = p.impl_->value_;
+ }
+
+ future(const future& t) : impl_(t.impl_), value_(t.value_) {}
+
+ template<typename T>
+ future(const future<T>& t) :
+ impl_(t.impl_),
+ value_(new detail::return_value_type_adaptor<R,T>(t.value_))
+ {}
+
+ future(const promise<R>& p) : impl_(p.impl_->f_), value_(p.impl_->value_) {}
+
+ template<typename T>
+ future(const promise<T>& p)
+ : impl_(p.impl_->f_),
+ value_(new detail::return_value_type_adaptor<R,T>(p.impl_->value_))
+ {}
+
+ protected:
+ // used by future<void> for typeless futures
+ future(const boost::shared_ptr<detail::future_impl> &impl) :
+ impl_(impl),
+ value_(new detail::return_value_real<R>(-999)) //value is never used
+ {}
+ public:
+
+ future& operator=(const future& t) {
+ impl_ = t.impl_;
+ value_ = t.value_;
+ return *this;
+ }
+
+ template<typename T>
+ future<R>& operator=(const future<T>& t) {
+ impl_ = t.impl_;
+ value_ = shared_ptr<detail::return_value_base<R> >(new detail::return_value_type_adaptor<R,T>(t.value_));
+ return *this;
+ }
+ ~future() {};
+
+ bool has_value() const { // newer Dimov proposal N2185
+ return impl_->has_value();
+ }
+
+ bool has_exception() const { // N2185
+ return impl_->has_exception();
+ }
+
+ bool ready() const { // queries whether the future contains a value or an exception
+ return impl_->ready();
+ }
+
+ void wait() { // wait for ready()
+ return impl_->wait();
+ }
+
+ bool timed_wait( boost::xtime const & abstime ) {
+ return impl_->timed_wait(abstime);
+ }
+
+ operator R() const { // N2185
+ return impl_->get(*value_);
+ }
+
+ R get() const {
+ return impl_->get(*value_);
+ }
+
+ R operator()() const { // waits for a value, then returns it
+ return impl_->get(*value_);
+ }
+
+ void set_needed() const {
+ impl_->set_needed();
+ }
+
+ // set future exception to boost::future_cancel, and call
+ // the cancel handler if one has been set by the user
+ void DEPRECATED cancel() {
+ impl_->cancel();
+ }
+
+ callback_reference add_callback(const boost::function<void (void)> &f) {
+ return impl_->add_callback(f);
+ }
+
+ // remove_callback will remove a registered callback
+ // Calling with an invalid callback_reference, or a
+ // callback_reference which has already been removed is
+ // undefined.
+ // This function is guaranteed not to return until the
+ // callback is removed.
+ // This can block if callbacks are already in progress
+ void remove_callback(callback_reference &ref) {
+ impl_->remove_callback(ref);
+ }
+ private:
+ template <typename Y> friend class future;
+ shared_ptr<detail::future_impl > impl_;
+ shared_ptr<detail::return_value_base<R> > value_;
+ };
+
+ // note, promise<int> must be public for friend to work in specialization (?)
+ template<> class promise<void> : public promise<int> {
+ private:
+ typedef promise<int> base_type;
+ public:
+ promise() : promise<int>() {}
+ promise(const promise& t) : promise<int>(t) {}
+ promise& operator=(const promise& t) {
+ base_type::operator=(t);
+ return *this;
+ }
+ using base_type::set_exception;
+ using base_type::set_cancel_handler;
+ using base_type::is_needed;
+ using base_type::wait_until_needed;
+ void set() {
+ base_type::set(0);
+ }
+ void set_or_throw() {
+ base_type::set_or_throw(0);
+ }
+ };
+
+ // void specialization, based on Peter Dimov's example
+ template<> class future<void> : private future<int> {
+ private:
+ typedef future<int> base_type;
+ public:
+ future() : base_type() {}
+ future(const future& t) : base_type((const future<int>&)t) {}
+ future(const promise<void> &p) : base_type((const promise<int>&)p) {}
+ future(const boost::shared_ptr<detail::future_impl> &impl) : base_type(impl) {}
+
+ template<typename T>
+ future(const future<T> &t) : base_type(t.impl_) {}
+
+ template<typename T>
+ future(const promise<T> &t) : base_type(t.impl_->f_) {}
+
+ future& operator=(const future& t) {
+ base_type::operator=((const future<int>&)t);
+ return *this;
+ }
+
+ template<typename T>
+ future& operator=(const future<T>& t) {
+ future<void> tmp(t);
+ base_type::operator=((const future<int>&)tmp);
+ return *this;
+ }
+
+ using base_type::has_value;
+ using base_type::has_exception;
+ using base_type::timed_wait;
+ using base_type::cancel;
+ using base_type::ready;
+ using base_type::wait;
+ using base_type::set_needed;
+ using base_type::add_callback;
+ using base_type::remove_callback;
+
+ void get() const {
+ base_type::get();
+ }
+ };
+
+ template <typename R > class promise< R& > : public promise<R*> {
+ private:
+ typedef promise< R* > base_type;
+ public:
+ promise() : promise<R*>() {}
+ promise(const promise& t) : promise<R*>(t) {}
+ promise& operator=(const promise& t) {
+ base_type::operator=(t);
+ return *this;
+ }
+ using base_type::set_exception;
+ using base_type::set_cancel_handler;
+ using base_type::is_needed;
+ using base_type::wait_until_needed;
+ void set(R &r) {
+ base_type::set(&r);
+ }
+ void set_or_throw(R &r) {
+ base_type::set_or_throw(&r);
+ }
+ };
+
+ // reference passing specialization, based on Peter Dimov's example
+ template<typename R > class future< R& >: private future< R* >
+ {
+ private:
+ typedef future< R* > base_type;
+ public:
+ future() : base_type() {}
+ future(const future& t) : base_type((const future<R*> &) t) {}
+ future(const promise<R*> &p) : base_type((const promise<R*> &) p) {}
+ future& operator=(const future& t) {
+ base_type::operator=(t);
+ return *this;
+ }
+ using base_type::has_value;
+ using base_type::has_exception;
+ using base_type::timed_wait;
+ using base_type::cancel;
+ using base_type::ready;
+ using base_type::wait;
+ using base_type::add_callback;
+ using base_type::set_needed;
+
+ operator R&() const {
+ return *base_type::get();
+ }
+
+ R& get() const {
+ return *base_type::get();
+ }
+ };
+
+ template<typename R> class future_wrapper
+ {
+ public:
+ future_wrapper(const boost::function<R (void)> &fn, const promise<R> &ft ) : fn_(fn), ft_(ft) {}; // stores fn and ft
+ void operator()() throw() { // executes fn() and places the outcome into ft
+ try {
+ ft_.set(fn_());
+ } catch (...) {
+ ft_.set_exception(detail::current_exception());
+ }
+ }
+ future<R> get_future() const {return future<R>(ft_);}
+ private:
+ boost::function<R (void)> fn_;
+ promise<R> ft_;
+ };
+
+ // void specialization
+ template<> class future_wrapper<void>
+ {
+ public:
+ future_wrapper(const boost::function<void (void)> &fn, const promise<void> &ft ) : ft_(ft), fn_(fn) {}; // stores fn and ft
+ void operator()() throw() { // executes fn() and places the outcome into ft
+ try {
+ fn_();
+ ft_.set();
+ } catch (...) {
+ ft_.set_exception(detail::current_exception());
+ }
+ }
+ future<void> get_future() const {return future<void>(ft_);}
+ private:
+ promise<void> ft_;
+ boost::function<void (void)> fn_;
+ };
+} // end of namespace boost
+
+#endif // BOOST_FUTURE_HPP
+
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_detail.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_detail.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,261 @@
+#ifndef BOOST_FUTURE_DETAIL_HPP
+#define BOOST_FUTURE_DETAIL_HPP 1
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/xtime.hpp>
+#include <boost/function.hpp>
+#include <vector>
+#include <list>
+
+// DEBUG
+#include <iostream>
+
+namespace boost {
+ namespace detail {
+ class future_impl;
+ }
+
+ class callback_reference {
+ public:
+ callback_reference() : empty_(true) {}
+ callback_reference(const callback_reference &t) : empty_(t.empty_), iter_(t.iter_) {}
+ bool operator==(callback_reference const& t) {
+ if (t.empty_ != empty_) return false;
+ if (t.empty_ && empty_) return true;
+ return (t.iter_ == iter_);
+ }
+ private:
+ friend class detail::future_impl;
+ bool empty_;
+ std::list<boost::function<void (void)> >::iterator iter_;
+ };
+
+ template<typename R> class future;
+ template<typename R> class promise;
+ namespace detail {
+ template<typename T>
+ class return_value_base {
+ public:
+ virtual T get() const = 0;
+ virtual ~return_value_base() {}
+ };
+
+ template<typename T>
+ class return_value_real : public return_value_base<T> {
+ public:
+ return_value_real() {}
+ explicit return_value_real(const T &value)
+ : value_(new T(value)) {}
+
+ virtual T get() const {
+ return *value_;
+ }
+ void set(const T& value) {
+ value_ = shared_ptr<T>(new T(value));
+ }
+ virtual ~return_value_real() {}
+ private:
+ shared_ptr<T> value_;
+ };
+
+ template<typename T,typename U>
+ class return_value_type_adaptor : public return_value_base<T> {
+ public:
+ return_value_type_adaptor(const shared_ptr<return_value_base<U> > &real_value) : value_(real_value) {}
+ virtual T get() const {
+ return value_->get();
+ }
+ virtual ~return_value_type_adaptor() {}
+ private:
+ shared_ptr<return_value_base<U> > value_;
+ };
+
+ class future_impl {
+ public:
+ future_impl() : has_value_(false), has_exception_(false), is_canceled_(false), callbacks_in_progress_(false), is_needed_() {}
+ bool has_value() const {
+ boost::mutex::scoped_lock lck(mutex_);
+ return has_value_;
+ }
+ bool has_exception() const {
+ boost::mutex::scoped_lock lck(mutex_);
+ return has_exception_;
+ }
+ bool ready() const {
+ boost::mutex::scoped_lock lck(mutex_);
+ return (has_value_ || has_exception_);
+ }
+ void wait() {
+ set_needed();
+ boost::mutex::scoped_lock lck(mutex_);
+ while (!has_value_ && !has_exception_)
+ cond_.wait(lck);
+ return;
+ }
+ bool timed_wait( boost::xtime const & abstime ) {
+ set_needed();
+ mutex::scoped_lock lck(mutex_);
+ while (!has_value_ && !has_exception_)
+ if (!cond_.timed_wait(lck, abstime))
+ return false; /* timeout */
+ return true;
+ }
+ // Could return by-ref if set_value only called once
+ template <typename R>
+ R get(const return_value_base<R> &value) {
+ set_needed();
+ boost::mutex::scoped_lock lck(mutex_);
+ while (!has_value_ && !has_exception_)
+ cond_.wait(lck);
+ if (has_exception_)
+ rethrow_exception(exception_);
+ return value.get();
+ }
+
+ template <typename R>
+ bool set_value( const R &r, return_value_real<R> &value) {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) return false;
+ value.set(r);
+ has_value_ = true;
+ notify(lck);
+ return true;
+ }
+
+ void set_exception( const exception_ptr &e) {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) return;
+ exception_ = e;
+ has_exception_ = true;
+ notify(lck);
+ }
+ void cancel() {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) return; // ignore
+ exception_ = detail::exception_ptr(new detail::_exp_throwable_impl<future_cancel>(future_cancel()));
+ has_exception_ = true;
+ is_canceled_ = true;
+ boost::function<void (void)> canhan = cancel_handler_;
+ notify(lck); //unlocks mutex, also deletes cancel_handler_
+ canhan();
+ }
+ void end_promise() {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) return; // ignore
+ exception_ = detail::exception_ptr(new detail::_exp_throwable_impl<broken_promise>(broken_promise()));
+ has_exception_ = true;
+ notify(lck);
+ }
+ boost::callback_reference add_callback(const boost::function<void (void)> f) {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) {
+ lck.unlock(); // never call a callback within the mutex
+ f(); // future already fulfilled. Call the callback immediately.
+ return callback_reference(); //return empty callback_reference
+ }
+ callbacks_.push_front(f);
+ boost::callback_reference cb_ref;
+ cb_ref.iter_ = callbacks_.begin();
+ cb_ref.empty_ = false;
+ return cb_ref;
+ }
+ void remove_callback(const boost::callback_reference &ref) {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (callbacks_in_progress_) {
+ while (callbacks_in_progress_)
+ cond_.wait(lck);
+ //notify already removed all callbacks
+ return;
+ }
+ if (has_value_ || has_exception_) return; //ignore, already set, and automatically removed
+ callbacks_.erase(ref.iter_);
+ }
+ bool set_cancel_handler( const boost::function<void (void)> &f ) {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (is_canceled_) {
+ lck.unlock();
+ f();
+ return false;
+ }
+ if (has_value_ || has_exception_ || callbacks_in_progress_)
+ return false; //ignore, future already set, cancel will never happen
+ cancel_handler_ = f;
+ return true;
+ }
+
+ shared_ptr<future_impl> get_needed_future() const {
+ boost::mutex::scoped_lock lck(mutex_);
+ if (!is_needed_) // allocate if desired
+ is_needed_.reset(new future_impl);
+ return is_needed_;
+ }
+
+ // as-needed functionality permits lazy eval and as-needed producer/consumer
+ void set_needed() {
+ shared_ptr<future_impl> n = get_needed_future();
+ n->set();
+ }
+
+ bool is_needed() const {
+ boost::mutex::scoped_lock lck(mutex_);
+ // if we are bound, we always say we are already needed
+ return ((is_needed_ && is_needed_->ready()) || has_value_ || has_exception_);
+ }
+ void wait_until_needed() const {
+ shared_ptr<future_impl> n = get_needed_future();
+ n->wait();
+ }
+ private:
+ typedef std::list<boost::function<void (void)> > func_list_t;
+ void notify(boost::mutex::scoped_lock &lck) {
+ callbacks_in_progress_ = true;
+ cond_.notify_all();
+ func_list_t cb(callbacks_);
+ lck.unlock();
+ func_list_t::iterator it;
+ for (it = cb.begin(); it != cb.end(); ++it)
+ (*it)();
+ // delete all callbacks - they will never be needed again
+ // that is also why this clear is thread-safe outside the mutex
+ callbacks_.clear();
+ cancel_handler_ = boost::function<void (void)>();
+ // the below is in case someone tried to remove while we are calling
+ boost::mutex::scoped_lock lck2(mutex_);
+ callbacks_in_progress_ = false;
+ cond_.notify_all();
+ }
+ bool set() { // a very simple set, used for as_needed_ future
+ boost::mutex::scoped_lock lck(mutex_);
+ if (has_value_ || has_exception_) return false;
+ has_value_ = true;
+ notify(lck);
+ return true;
+ }
+ bool has_value_;
+ bool has_exception_;
+ bool is_canceled_;
+ exception_ptr exception_;
+ mutable boost::mutex mutex_;
+ mutable boost::condition cond_;
+ func_list_t callbacks_;
+ bool callbacks_in_progress_;
+ mutable shared_ptr<future_impl> is_needed_;
+ boost::function<void (void)> cancel_handler_;
+ };
+
+ template<typename R>
+ class promise_impl {
+ public:
+ promise_impl() : f_(new future_impl), value_(new return_value_real<R>) {};
+ ~promise_impl() {
+ f_->end_promise();
+ }
+ shared_ptr<detail::future_impl> f_;
+ shared_ptr<return_value_real<R> > value_;
+ };
+
+ } // namespace detail
+} // end of namespace boost
+#endif // BOOST_FUTURE_DETAIL_HPP
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_exceptions.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_exceptions.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,26 @@
+#ifndef FUTURE_EXCEPTIONS_HPP_
+#define FUTURE_EXCEPTIONS_HPP_
+
+#include <stdexcept>
+
+namespace boost {
+ struct broken_promise : public std::exception {
+ virtual const char *what() const throw () {
+ return "Broken Promise Exception";
+ }
+ };
+
+ struct future_already_set : public std::exception {
+ virtual const char *what() const throw () {
+ return "Future Already Set Exception";
+ }
+ };
+
+ struct future_cancel : public std::exception {
+ virtual const char *what() const throw () {
+ return "Future Canceled Exception";
+ }
+ };
+} // namespace
+
+#endif //FUTURE_EXCEPTIONS_HPP_
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_group.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_group.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,164 @@
+#ifndef FUTURE_GROUP_HPP_
+#define FUTURE_GROUP_HPP_ 1
+
+#include "future.hpp"
+
+namespace boost {
+
+
+ class future_or_func_impl {
+ public:
+ future_or_func_impl(future<void> a, future<void> b) : a_(a), b_(b) {}
+ future_or_func_impl(const future_or_func_impl &f) : a_(f.a_), b_(f.b_), p_(f.p_) {}
+ struct future_or_failed : public std::exception { };
+ void operator()() {
+ boost::mutex::scoped_lock lck(mutex_);
+ try {
+ if (a_.ready()) {
+ a_.get(); //throw if you have any
+ p_.set();
+ } else if (b_.ready()) {
+ b_.get();
+ p_.set();
+ }
+ } catch (...) {
+ p_.set_exception(detail::current_exception());
+ }
+ }
+ future<void> a_,b_;
+ promise<void> p_;
+ optional<callback_reference> a_cb_;
+ optional<callback_reference> b_cb_;
+ boost::mutex mutex_;
+ };
+
+ class future_or_func {
+ public:
+ future_or_func(future<void> a, future<void> b) : impl_(new future_or_func_impl(a, b)) {}
+ future_or_func(const future_or_func &f) : impl_(new future_or_func_impl(*f.impl_)) {}
+ void operator()() {
+ (*impl_)();
+ }
+ promise<void> get_promise() {return impl_->p_;}
+ protected:
+ boost::shared_ptr<future_or_func_impl> impl_;
+ };
+
+ class future_and_func_impl {
+ public:
+ future_and_func_impl(future<void> a, future<void> b) : a_(a), b_(b), got_a_(false), got_b_(false) {}
+ future_and_func_impl(const future_and_func_impl &f) : a_(f.a_), b_(f.b_), p_(f.p_), got_a_(false), got_b_(false) {}
+ struct future_and_failed : public std::exception { };
+ void operator()() {
+ boost::mutex::scoped_lock lck(mutex_);
+ try {
+ if ((!got_a_) && a_.ready()) {
+ a_.get(); //throw if you have any
+ got_a_ = true;
+ }
+ if ((!got_b_) && b_.ready()) {
+ b_.get(); //throw if you have any
+ got_b_ = true;
+ }
+ } catch (...) {
+ p_.set_exception(detail::current_exception());
+ }
+ if (got_a_ && got_b_) {
+ p_.set(); //completed
+ }
+ }
+ future<void> a_,b_;
+ promise<void> p_;
+ bool got_a_, got_b_;
+ boost::mutex mutex_;
+ };
+
+ class future_and_func {
+ public:
+ future_and_func(future<void> a, future<void> b) : impl_(new future_and_func_impl(a, b)) {}
+ future_and_func(const future_and_func &f) : impl_(new future_and_func_impl(*f.impl_)) {}
+ void operator()() {
+ (*impl_)();
+ }
+ promise<void> get_promise() {return impl_->p_;}
+ protected:
+ boost::shared_ptr<future_and_func_impl> impl_;
+ };
+
+ template<typename T>
+ class comb {
+ public:
+ comb(const comb &t) : f_(t.f_) {}
+ comb(const future<T> &f) : f_(f) {}
+ comb(const promise<T> &p) : f_(p) {}
+ operator future<T> () const {
+ return f_;
+ }
+ const future<T> &get() const {
+ return f_;
+ }
+ private:
+ future<T> f_;
+ };
+
+ template<typename T, typename U>
+ future<void> future_or(future<T> &a, future<U> &b) {
+ //future_or_func fa(future<void>(a), future<void>(b));
+ future<void> va(a);
+ future<void> vb(b);
+ future_or_func fa(va,vb);
+
+ a.add_callback(fa);
+ b.add_callback(fa);
+ fa(); // check if already satisfied
+ return fa.get_promise();
+ }
+
+ template<typename T, typename U>
+ comb<void> operator||(const comb<T> &a, const comb<U> &b) {
+ future<T> fa = a.get();
+ future<U> fb = b.get();
+ return future_or(fa, fb);
+ }
+
+ template<typename T, typename U>
+ future<void> future_and(future<T> &a, future<U> &b) {
+ //future_or_func fa(future<void>(a), future<void>(b));
+ future<void> va(a);
+ future<void> vb(b);
+ future_and_func fa(va,vb);
+
+ a.add_callback(fa);
+ b.add_callback(fa);
+ fa(); // check if already satisfied
+ return fa.get_promise();
+ }
+
+ template<typename T, typename U>
+ comb<void> operator&&(const comb<T> &a, const comb<U> &b) {
+ future<T> fa = a.get();
+ future<U> fb = b.get();
+ return future_and(fa, fb);
+ }
+
+ class future_group {
+ public:
+ future_group(const future<void> &f) : fut_(f) {}
+ operator future<void> () {
+ return fut_;
+ }
+ private:
+ future<void> fut_;
+ };
+
+ template<typename T>
+ comb<T> op(const future<T> &x) {
+ return comb<T>(x);
+ }
+
+ template<typename T>
+ comb<T> op(const promise<T> &x) {
+ return comb<T>(x);
+ }
+} //namespace boost
+#endif // FUTURE_GROUP_HPP_
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_stream.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/future/future_stream.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,224 @@
+#ifndef FUTURE_STREAM_HPP_
+#define FUTURE_STREAM_HPP_ 1
+
+#include "future.hpp"
+#include <boost/iterator/iterator_facade.hpp>
+#include <boost/optional.hpp>
+
+namespace boost {
+ class end_of_stream : public std::runtime_error {
+ public:
+ end_of_stream() : std::runtime_error("End of Future Stream") {}
+ };
+
+ template <typename T>
+ class future_stream {
+ protected:
+ class item;
+ public:
+ class iterator;
+ typedef boost::shared_ptr<item> item_p;
+ protected:
+ struct item {
+ item(const T &value, const promise<item_p> &p) : value_(value), next_(p) {}
+ T value_;
+ future<item_p> next_;
+ };
+
+ struct stream_impl {
+ stream_impl(const promise<item_p> &p) : head_(p) {}
+ future<item_p> head_; //the promise of the next element
+ mutable boost::mutex mutex_;
+ };
+#if 1
+ struct hold_impl {
+ hold_impl(const future<item_p> &itm) : item_(itm) {}
+ boost::optional<future<item_p> > item_;
+ mutable boost::mutex mutex_;
+ };
+#endif
+ public:
+ class iterator : public boost::iterator_facade<
+ iterator, T, boost::forward_traversal_tag>
+ {
+ public:
+ iterator() : item_(promise<item_p>()) {}
+ iterator(const iterator &t) : item_(t.item_) {}
+ iterator(const future<item_p> &head) : item_(head) {}
+
+ T recv() {
+ if (!item_.get()) throw (end_of_stream());
+ T tmp = item_.get()->value_;
+ item_ = item_.get()->next_;
+ return tmp;
+ }
+
+ bool ready() const {
+ return item_.ready();
+ }
+
+ bool closed() const {
+ if (!item_.ready()) return false;
+ if (item_.has_exception()) return true;
+ if (!item_.get()) return true; // closed
+ return false;
+ }
+
+ void set_needed() {
+ item_.set_needed();
+ }
+
+ void reset() {
+ item_ = promise<item_p>(); // will set broken promise exception
+ }
+
+ future<item_p> next() {
+ item_p ip = item_.get();
+ if (!ip) throw (end_of_stream());
+ return item_.get()->next_;
+ }
+ private:
+ void increment() { item_ = item_.get()->next_; }
+
+ // NOTE this blocks!
+ bool equal(iterator const& t) const {
+ return (item_.get() == t.item_.get());
+ }
+
+ T& dereference() const {
+ if (!item_.get()) { //DEBUG!
+ std::cout << "dref got eos, item_.get() = " << item_.get() << "\n";
+ throw (end_of_stream());
+ }
+ return item_.get()->value_;
+ }
+
+ future<item_p> item_;
+ friend class boost::iterator_core_access;
+ };
+
+#if 1
+ class hold {
+ public:
+ class hold_already_released : public std::exception {
+ virtual char *what() {
+ return "Attempt to release future_stream::hold more than once";
+ }
+ };
+ hold(const hold &t) : impl_(t.impl_) {}
+ hold(const future<item_p> &head) : impl_(new hold_impl(head)) {}
+ iterator release() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ if (!impl_->item_) throw hold_already_released();
+ iterator iter(impl_->item_.get());
+ impl_->item_.reset();
+ return iter;
+ }
+ private:
+ boost::shared_ptr<hold_impl> impl_;
+ };
+#endif
+
+ future_stream(const future_stream &ps) : impl_(ps.impl_) {}
+
+ future_stream &operator=(const future_stream& t) {
+ impl_ = t.impl_;
+ return *this;
+ }
+
+ future<item_p> head() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ return impl_->head_; //iterator(impl_->head_);
+ }
+
+ void reset() {
+ impl_.reset();
+ }
+
+#if 1
+ hold take_hold() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ return hold(impl_->head_);
+ }
+#endif
+
+ iterator begin() {
+ return head();
+ }
+
+ iterator end() {
+ boost::promise<item_p> p;
+ p.set(item_p());
+ return iterator(p); // empty iterator
+ }
+ protected:
+ future_stream() : impl_() {}
+ boost::shared_ptr<stream_impl> impl_;
+ };
+
+ template<typename T>
+ class promise_stream : public future_stream<T> {
+ private:
+ typedef future_stream<T> base_type;
+ typedef typename base_type::item_p item_p;
+ typedef typename base_type::item item;
+ typedef typename base_type::stream_impl stream_impl;
+ using base_type::impl_;
+ public:
+ typedef typename base_type::iterator iterator;
+ promise_stream()
+ : future_stream<T>(), prom_(new promise<item_p>)
+ {
+ impl_.reset(new stream_impl(*prom_));
+ }
+
+ promise_stream(const promise_stream<T> &ps)
+ : future_stream<T>(ps), prom_(ps.prom_)
+ {}
+
+ promise_stream &operator=(const promise_stream& t) {
+ base_type::operator=((base_type&)t);
+ prom_ = t.prom_;
+ }
+
+ void send(const T &value) {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ promise<item_p> p; //create next promise
+ prom_->set(item_p(new item(value, p))); //fulfill current promise
+ impl_->head_ = p; //set the head
+ (*prom_) = p; //remember our next promise
+ }
+
+ void reset() {
+ base_type::reset();
+ prom_.reset();
+ }
+
+ void close() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ prom_->set(item_p()); //fulfill current promise with 0x0
+ }
+
+ void wait_until_needed() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ promise<item_p> p = *prom_;
+ lck.unlock();
+ p.wait_until_needed();
+ }
+
+ bool is_needed() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ return prom_->is_needed();
+ }
+
+ future<void> get_needed_future() {
+ boost::mutex::scoped_lock lck(impl_->mutex_);
+ return prom_->get_needed_future();
+ }
+ private:
+ boost::shared_ptr<promise<item_p> > prom_;
+ };
+
+} //namespace boost
+
+#endif //FUTURE_STREAM_HPP_
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adaptive.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adaptive.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,909 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_ADAPTIVE_H
+#define BOOST_TP_ADAPTIVE_H
+
+#include <algorithm>
+#include <cstddef>
+#include <functional>
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/adjustment.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/lock.hpp>
+#include <boost/tp/detail/lock_guard.hpp>
+#include <boost/tp/fifo.hpp>
+#include <boost/tp/poolsize.hpp>
+#include <boost/tp/task.hpp>
+#include <boost/tp/unbounded_channel.hpp>
+#include <boost/tp/watermark.hpp>
+
+namespace boost { namespace tp
+{
+struct keep_core_size {};
+struct less_than_core_size {};
+
+template<
+ typename AdjustmentPolicy = depend_on_pending,
+ typename RecreatePolicy = less_than_core_size
+>
+struct adaptive
+{
+ template<
+ typename Channel = unbounded_channel< fifo >
+ >
+ class impl
+ : private noncopyable
+ {
+ private:
+ typedef AdjustmentPolicy adjustment_policy;
+ typedef function< void() > callable;
+ typedef Channel channel;
+ typedef typename channel::item channel_item;
+ typedef typename channel::iterator channel_iterator;
+ typedef RecreatePolicy recreate_policy;
+
+ enum state
+ {
+ active_state,
+ terminateing_state,
+ terminated_state
+ };
+
+ class worker
+ {
+ public:
+ enum state
+ {
+ running_state,
+ terminate_state
+ };
+
+ private:
+ shared_ptr< thread > thrd_;
+ state state_;
+
+ public:
+ worker( callable const& ca)
+ :
+ thrd_( new thread( ca) ),
+ state_( running_state)
+ { BOOST_ASSERT( ! ca.empty() ); }
+
+ const shared_ptr< thread > thrd() const
+ { return thrd_; }
+
+ const thread::id get_id() const
+ { return thrd_->get_id(); }
+
+ void join() const
+ { thrd_->join(); }
+
+ void interrupt() const
+ { thrd_->interrupt(); }
+
+ state current_state() const
+ { return state_; }
+
+ void current_state( state value)
+ { state_ = value; }
+ };
+
+ struct id_idx_tag {};
+ struct state_idx_tag {};
+
+ typedef multi_index::multi_index_container<
+ worker,
+ multi_index::indexed_by<
+ multi_index::ordered_unique<
+ multi_index::tag< id_idx_tag >,
+ multi_index::const_mem_fun<
+ worker,
+ const thread::id,
+ & worker::get_id
+ >
+ >,
+ multi_index::ordered_non_unique<
+ multi_index::tag< state_idx_tag >,
+ multi_index::const_mem_fun<
+ worker,
+ typename worker::state,
+ & worker::current_state
+ >
+ >
+ >
+ > worker_list;
+
+ typedef typename worker_list::template index<
+ id_idx_tag >::type id_idx;
+ typedef typename worker_list::template index<
+ state_idx_tag >::type state_idx;
+
+ class active_guard
+ {
+ private:
+ std::size_t & active_;
+ shared_mutex & mtx_worker_;
+
+ public:
+ active_guard(
+ std::size_t & active__,
+ shared_mutex & mtx_worker__)
+ : active_( active__), mtx_worker_( mtx_worker__)
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ BOOST_ASSERT( active_ >= 0);
+ ++active_;
+ }
+
+ ~active_guard()
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ --active_;
+ BOOST_ASSERT( active_ >= 0);
+ }
+ };
+
+ class change_state
+ {
+ private:
+ typename worker::state state_;
+
+ public:
+ change_state( typename worker::state value)
+ : state_( value)
+ {}
+
+ void operator()( worker & w)
+ { w.current_state( state_); }
+ };
+
+ class state_guard
+ : private noncopyable
+ {
+ private:
+ shared_mutex & mtx_worker_;
+ id_idx & id_idx_;
+ state_idx & state_idx_;
+
+ public:
+ state_guard(
+ shared_mutex & mtx,
+ id_idx & id_idx__,
+ state_idx & state_idx__)
+ :
+ mtx_worker_( mtx),
+ id_idx_( id_idx__),
+ state_idx_( state_idx__)
+ {}
+
+ ~state_guard()
+ {
+ upgrade_lock< shared_mutex > lk1( mtx_worker_);
+ std::for_each(
+ state_idx_.lower_bound( worker::terminate_state),
+ state_idx_.upper_bound( worker::terminate_state),
+ std::mem_fun_ref( & worker::join) );
+ unique_lock< shared_mutex > lk2( lk1.move() );
+ state_idx_.erase( worker::terminate_state);
+ typename id_idx::iterator i( id_idx_.find( this_thread::get_id() ) );
+ if ( i != id_idx_.end() ) id_idx_.modify( i, change_state( worker::terminate_state) );
+ }
+ };
+
+ std::size_t core_size_;
+ std::size_t max_size_;
+ posix_time::time_duration keep_alive_;
+ worker_list worker_;
+ id_idx & iidx_;
+ state_idx & sidx_;
+ std::size_t active_worker_;
+ shared_mutex mtx_worker_;
+ state state_;
+ shared_mutex mtx_state_;
+ channel channel_;
+ adjustment_policy adjust_pol_;
+
+ void entry_()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ id_idx & iidx( worker_.get< id_idx_tag >() );
+ typename id_idx::iterator i( iidx.end() );
+ typename id_idx::iterator e( iidx.end() );
+ lk.unlock();
+ while ( i == e)
+ {
+ lk.lock();
+ i = iidx.find( this_thread::get_id() );
+ lk.unlock();
+ }
+ BOOST_ASSERT( i != e);
+
+ shared_ptr< thread > thrd( i->thrd() );
+ BOOST_ASSERT( thrd);
+ callable ca;
+ detail::interrupter intr;
+ while ( channel_.take( ca, intr, keep_alive_) )
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ active_guard guard(
+ active_worker_,
+ mtx_worker_);
+ shared_ptr< void > ig(
+ static_cast< void * >( 0),
+ bind(
+ & detail::interrupter::reset,
+ intr) );
+ intr.set( thrd);
+ ca();
+ ca.clear();
+ BOOST_ASSERT( ca.empty() );
+ }
+ }
+
+ void entry_( keep_core_size)
+ {
+ entry_();
+ detail::lockguard< shared_mutex, shared_mutex > lk( mtx_state_, mtx_worker_);
+ if ( ! terminateing_() && ! terminated_() && size_() < core_size_)
+ create_worker_();
+ }
+
+ void entry_( less_than_core_size)
+ { entry_(); }
+
+ void create_worker_()
+ {
+ BOOST_ASSERT( ! terminateing_() && ! terminated_() );
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ worker_.insert(
+ worker(
+ bind(
+ ( void ( impl::*) ( recreate_policy) ) & impl::entry_,
+ this,
+ recreate_policy() ) ) );
+ lk.unlock();
+ }
+
+ std::size_t active_() const
+ { return active_worker_; }
+
+ std::size_t idle_() const
+ { return size_() - active_(); }
+
+ std::size_t size_() const
+ {
+ state_idx const& idx( worker_.get< state_idx_tag >() );
+ return std::distance(
+ idx.find( worker::running_state),
+ idx.end() );
+ }
+
+ void adjust_pool_()
+ {
+ BOOST_ASSERT( ! terminateing_() && ! terminated_() );
+ std::size_t s( size() );
+ if ( s < max_size_ &&
+ adjust_pol_(
+ poolsize( s),
+ core_poolsize( core_size() ),
+ max_poolsize( max_size_),
+ channel_.size(),
+ channel_.full() ) )
+ create_worker_();
+ BOOST_ASSERT( size() <= max_size_ );
+ }
+
+ bool terminated_() const
+ { return state_ == terminated_state; }
+
+ bool terminateing_() const
+ { return state_ == terminateing_state; }
+
+ public:
+ explicit impl(
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ posix_time::time_duration const& keep_alive,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ keep_alive_( keep_alive),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ sidx_( worker_.get< state_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( keep_alive_.is_special() )
+ throw invalid_timeout("keep_alive is not valid");
+ if ( keep_alive_.is_negative() )
+ throw invalid_timeout("keep_alive is negative");
+ channel_.activate();
+ }
+
+ explicit impl(
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ posix_time::time_duration const& keep_alive,
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ keep_alive_( keep_alive),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ sidx_( worker_.get< state_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(
+ hwm,
+ lwm),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( keep_alive_.is_special() )
+ throw invalid_timeout("keep_alive is not valid");
+ if ( keep_alive_.is_negative() )
+ throw invalid_timeout("keep_alive is negative");
+ channel_.activate();
+ }
+
+ explicit impl(
+ preallocate const& pre,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ posix_time::time_duration const& keep_alive,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ keep_alive_( keep_alive),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ sidx_( worker_.get< state_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( pre > max_size_)
+ throw invalid_poolsize("preallocated poolsize must be less than or equal to max poolsize");
+ if ( keep_alive_.is_special() )
+ throw invalid_timeout("keep_alive is not valid");
+ if ( keep_alive_.is_negative() )
+ throw invalid_timeout("keep_alive is negative");
+ channel_.activate();
+ for ( std::size_t i( 0); i < pre; ++i)
+ create_worker_();
+ }
+
+ explicit impl(
+ preallocate const& pre,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ posix_time::time_duration const& keep_alive,
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ keep_alive_( keep_alive),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ sidx_( worker_.get< state_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(
+ hwm,
+ lwm),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( pre > max_size_)
+ throw invalid_poolsize("preallocated poolsize must be less than or equal to max poolsize");
+ if ( keep_alive_.is_special() )
+ throw invalid_timeout("keep_alive is not valid");
+ if ( keep_alive_.is_negative() )
+ throw invalid_timeout("keep_alive is negative");
+ channel_.activate();
+ for ( std::size_t i( 0); i < pre; ++i)
+ create_worker_();
+ }
+
+ ~impl()
+ { shutdown(); }
+
+ std::size_t active()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return active_();
+ }
+
+ std::size_t idle()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return idle_();
+ }
+
+ const posix_time::time_duration keep_alive()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return keep_alive_;
+ }
+
+ void keep_alive( posix_time::time_duration const& value)
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ return keep_alive_ = value;
+ }
+
+ void shutdown()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return;
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+ }
+
+ const std::vector< callable > shutdown_now()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return std::vector< callable >();
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::for_each(
+ worker_.begin(),
+ worker_.end(),
+ std::mem_fun_ref( & worker::interrupt) );
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ std::vector< callable > drain( channel_.drain() );
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+
+ return drain;
+ }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return size_();
+ }
+
+ std::size_t core_size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return core_size_;
+ }
+
+ void core_size( std::size_t size)
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ core_size_ = size;
+ }
+
+ std::size_t max_size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return max_size_;
+ }
+
+ bool terminated()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminated_();
+ }
+
+ bool terminateing()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminateing_();
+ }
+
+ void clear()
+ { channel_.clear(); }
+
+ bool empty()
+ { return channel_.empty(); }
+
+ std::size_t pending()
+ { return channel_.size(); }
+
+ const std::size_t upper_bound()
+ { return channel_.upper_bound(); }
+
+ void upper_bound( std::size_t hwm)
+ { return channel_.upper_bound( hwm); }
+
+ const std::size_t lower_bound()
+ { return channel_.lower_bound(); }
+
+ void lower_bound( std::size_t lwm)
+ { return channel_.lower_bound( lwm); }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ Attr const& attr,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > lazy_submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > lazy_submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+ };
+};
+} }
+
+#endif // BOOST_TP_ADAPTIVE_H
+
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adjustment.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/adjustment.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,48 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_ADJUSTMENT_H
+#define BOOST_TP_ADJUSTMENT_H
+
+#include <cstddef>
+
+#include <boost/tp/poolsize.hpp>
+
+namespace boost { namespace tp
+{
+struct depend_on_core
+{
+ bool operator ()(
+ poolsize size,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ std::size_t,
+ bool full) const
+ { return size < core_size || full; }
+};
+
+struct depend_on_nothing
+{
+ bool operator ()(
+ poolsize,
+ core_poolsize const&,
+ max_poolsize const&,
+ std::size_t,
+ bool) const
+ { return true; }
+};
+
+struct depend_on_pending
+{
+ bool operator ()(
+ poolsize size,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ std::size_t pending,
+ bool full) const
+ { return size < core_size || pending > 0; }
+};
+} }
+
+#endif // BOOST_TP_ADJUSTMENT_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,370 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_BOUNDED_CHANNEL_H
+#define BOOST_TP_BOUNDED_CHANNEL_H
+
+#include <cstddef>
+#include <list>
+#include <utility>
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/exceptions.hpp>
+#include <boost/tp/fifo.hpp>
+#include <boost/tp/watermark.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename QueueingPolicy = fifo
+>
+class bounded_channel
+{
+private:
+ typedef function< void() > callable;
+ typedef QueueingPolicy queueing_policy;
+ typedef typename queueing_policy::template impl<
+ callable
+ > queue;
+
+public:
+ typedef typename queue::item item;
+ typedef typename queue::iterator iterator;
+
+private:
+ bool active_;
+ queue queue_;
+ shared_mutex mtx_;
+ condition not_empty_cond_;
+ condition not_full_cond_;
+ std::size_t hwm_;
+ std::size_t lwm_;
+
+ void activate_()
+ { active_ = true; }
+
+ void clear_()
+ {
+ BOOST_ASSERT( ! active_);
+ queue_.clear();
+ BOOST_ASSERT( queue_.empty() );
+ }
+
+ void deactivate_()
+ {
+ if ( active_)
+ {
+ active_ = false;
+ not_empty_cond_.notify_all();
+ not_full_cond_.notify_all();
+ }
+
+ BOOST_ASSERT( ! active_);
+ }
+
+ const std::vector< callable > drain_()
+ {
+ BOOST_ASSERT( ! active_);
+ std::vector< callable > unprocessed;
+ unprocessed.reserve( queue_.size() );
+ BOOST_FOREACH( item itm, queue_)
+ { unprocessed.push_back( itm.ca() ); }
+ clear_();
+ return unprocessed;
+ }
+
+ bool empty_() const
+ { return queue_.empty(); }
+
+ bool erase_(
+ iterator & i,
+ future< void > & f)
+ { return queue_.erase( i, f); }
+
+ bool full_() const
+ { return size_() >= hwm_; }
+
+ std::size_t size_() const
+ { return queue_.size(); }
+
+ void upper_bound_( std::size_t hwm)
+ {
+ if ( lwm_ > hwm )
+ throw invalid_watermark("low watermark must be less than or equal to high watermark");
+ std::size_t tmp( hwm_);
+ hwm_ = hwm;
+ if ( hwm_ > tmp) not_full_cond_.notify_one();
+ }
+
+ void lower_bound_( std::size_t lwm)
+ {
+ if ( lwm > hwm_ )
+ throw invalid_watermark("low watermark must be less than or equal to high watermark");
+ std::size_t tmp( lwm_);
+ lwm_ = lwm;
+ if ( lwm_ > tmp) not_full_cond_.notify_one();
+ }
+
+ iterator put_(
+ item const& itm,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_)
+ throw task_rejected("channel is not active");
+ not_full_cond_.wait(
+ lk,
+ bind(
+ & bounded_channel::producers_activate_,
+ this) );
+ if ( ! active_)
+ throw task_rejected("channel is not active");
+ iterator i( queue_.push( itm) );
+ not_empty_cond_.notify_one();
+ return i;
+ }
+
+ template< typename Tm >
+ iterator put_(
+ item const& itm,
+ Tm const& tm,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_)
+ throw task_rejected("channel is not active");
+ if ( ! not_full_cond_.timed_wait(
+ lk,
+ tm,
+ bind(
+ & bounded_channel::producers_activate_,
+ this) ) )
+ throw task_rejected("timed out");
+ if ( ! active_)
+ throw task_rejected("channel is not active");
+ iterator i( queue_.push( itm) );
+ not_empty_cond_.notify_one();
+ return i;
+ }
+
+ bool take_(
+ callable & ca,
+ detail::interrupter & intr,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_ && empty_() )
+ return false;
+ try
+ {
+ not_empty_cond_.wait(
+ lk,
+ bind(
+ & bounded_channel::consumers_activate_,
+ this) );
+ }
+ catch ( thread_interrupted const& e)
+ { return false; }
+ if ( ! active_ && empty_() )
+ return false;
+ item itm( queue_.pop() );
+ ca = itm.ca();
+ intr = itm.intr();
+ if ( size_() <= lwm_)
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // for submiting an action object
+ not_full_cond_.notify_all();
+ }
+ return ! ca.empty();
+ }
+
+ template< typename Tm >
+ bool take_(
+ callable & ca,
+ detail::interrupter & intr,
+ Tm const& tm,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_ && empty_() )
+ return false;
+ try
+ {
+ if ( ! not_empty_cond_.timed_wait(
+ lk,
+ tm,
+ bind(
+ & bounded_channel::consumers_activate_,
+ this) ) )
+ return false;
+ }
+ catch ( thread_interrupted const& e)
+ { return false; }
+ if ( ! active_ && empty_() )
+ return false;
+ item itm( queue_.pop() );
+ ca = itm.ca();
+ intr = itm.intr();
+ if ( size_() <= lwm_)
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // for submiting an action object
+ not_full_cond_.notify_all();
+ }
+ return ! ca.empty();
+ }
+
+ bool producers_activate_() const
+ { return ! active_ || ! full_(); }
+
+ bool consumers_activate_() const
+ { return ! active_ || ! empty_(); }
+
+public:
+ bounded_channel(
+ high_watermark const& hwm,
+ low_watermark const& lwm)
+ :
+ active_( false),
+ queue_(),
+ mtx_(),
+ not_empty_cond_(),
+ not_full_cond_(),
+ hwm_( hwm),
+ lwm_( lwm)
+ {
+ if ( lwm_ > hwm_ )
+ throw invalid_watermark("low watermark must be less than or equal to high watermark");
+ }
+
+ bool active()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return active_;
+ }
+
+ void activate()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ activate_();
+ }
+
+ void clear()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ clear_();
+ }
+
+ bool deactive()
+ { return ! active(); }
+
+ void deactivate()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ }
+
+ const std::vector< callable > drain()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return drain_();
+ }
+
+ bool empty()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return empty_();
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > f)
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return erase_( i, f);
+ }
+
+ bool full()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return full_();
+ }
+
+ const std::size_t upper_bound()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return hwm_;
+ }
+
+ void upper_bound( std::size_t hwm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ upper_bound_( hwm);
+ }
+
+ const std::size_t lower_bound()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return lwm_;
+ }
+
+ void lower_bound( std::size_t lwm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ lower_bound_( lwm);
+ }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return size_();
+ }
+
+ iterator put( item const& itm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return put_( itm, lk);
+ }
+
+ template< typename Tm >
+ iterator put(
+ item const& itm,
+ Tm const& tm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return put_( itm, tm, lk);
+ }
+
+ bool take(
+ callable & ca,
+ detail::interrupter & intr)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return take_( ca, intr, lk);
+ }
+
+ template< typename Tm >
+ bool take(
+ callable & ca,
+ detail::interrupter & intr,
+ Tm const& tm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return take_( ca, intr, tm, lk);
+ }
+};
+} }
+
+#endif // BOOST_TP_BOUNDED_CHANNEL_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/interrupter.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/interrupter.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,86 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_INTERRUPTER_H
+#define BOOST_TP_DETAIL_INTERRUPTER_H
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/utility.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+class interrupter
+{
+private:
+ class impl
+ : private noncopyable
+ {
+ private:
+ bool is_requested_;
+ mutex mtx_;
+ shared_ptr< thread > thrd_;
+
+ public:
+ impl()
+ :
+ is_requested_( false),
+ mtx_(),
+ thrd_()
+ {}
+
+ void set( shared_ptr< thread > const& thrd)
+ {
+ BOOST_ASSERT( thrd);
+ unique_lock< mutex > lk( mtx_);
+ thrd_ = thrd;
+ BOOST_ASSERT( thrd_);
+ if ( is_requested_) thrd_->interrupt();
+ }
+
+ void reset()
+ {
+ unique_lock< mutex > lk( mtx_);
+ thrd_.reset();
+ BOOST_ASSERT( ! thrd_);
+ try
+ { this_thread::interruption_point(); }
+ catch ( thread_interrupted const&)
+ {}
+ }
+
+ void interrupt()
+ {
+ unique_lock< mutex > lk( mtx_);
+ if ( ! is_requested_)
+ {
+ is_requested_ = true;
+ if ( thrd_) thrd_->interrupt();
+ }
+ }
+ };
+
+ shared_ptr< impl > impl_;
+
+public:
+ interrupter()
+ : impl_( new impl() )
+ {}
+
+ void set( shared_ptr< thread > const& thrd)
+ { impl_->set( thrd); }
+
+ void reset()
+ { impl_->reset(); }
+
+ void interrupt()
+ { impl_->interrupt(); }
+};
+}
+} }
+
+#endif // BOOST_TP_DETAIL_INTERRUPTER_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,152 @@
+// Copyright Howard Hinnant 2007. Distributed under the Boost // Software License, Version 1.0. (see http://www.boost.org/LICENSE_1_0.txt)
+#ifndef BOOST_TP_DETAIL_LOCK_H
+#define BOOST_TP_DETAIL_LOCK_H
+
+#include <boost/thread.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+// returns index of failed lock or -1 on success
+template< class L1, class L2 >
+int try_lock( L1 & l1, L2 & l2)
+{
+ unique_lock< L1 > u( l1, try_to_lock);
+ if ( u.owns_lock())
+ {
+ if ( l2.try_lock() )
+ {
+ u.release();
+ return -1;
+ }
+ return 1;
+ }
+ return 0;
+}
+
+template< class L1, class L2, class L3 >
+int try_lock( L1 & l1, L2 & l2, L3 & l3)
+{
+ unsigned int r = 0;
+ unique_lock< L1 > u( l1, try_to_lock);
+ if ( u.owns_lock() )
+ {
+ r = try_lock( l2, l3);
+ if ( r == -1)
+ u.release();
+ else
+ ++r;
+ }
+ return r;
+}
+
+template< class L1, class L2, class L3, class L4 >
+int try_lock( L1 & l1, L2 & l2, L3 & l3, L4 & l4)
+{
+ unsigned int r = 0;
+ unique_lock< L1 > u( l1, try_to_lock);
+ if ( u.owns_lock() )
+ {
+ r = try_lock( l2, l3, l4);
+ if ( r == -1)
+ u.release();
+ else
+ ++r;
+ }
+ return r;
+}
+
+template< class L1, class L2, class L3, class L4, class L5 >
+int try_lock( L1 & l1, L2 & l2, L3 & l3, L4 & l4, L5 & l5)
+{
+ unsigned int r = 0;
+ unique_lock< L1 > u( l1, try_to_lock);
+ if ( u.owns_lock() )
+ {
+ r = try_lock( l2, l3, l4, l5);
+ if ( r == -1)
+ u.release();
+ else
+ ++r;
+ }
+ return r;
+}
+
+template< class L1, class L2, class L3 >
+void __lock_first( int __i, L1 & l1, L2 & l2, L3 & l3)
+{
+ while ( true)
+ {
+ switch ( __i)
+ {
+ case 0:
+ {
+ unique_lock< L1 > __u1( l1);
+ __i = try_lock( l2, l3);
+ if ( __i == -1)
+ {
+ __u1.release();
+ return;
+ }
+ }
+ ++__i;
+ boost::thread::yield();
+ break;
+ case 1:
+ {
+ unique_lock< L2 > __u2( l2);
+ __i = try_lock( l3, l1);
+ if ( __i == -1)
+ {
+ __u2.release();
+ return;
+ }
+ }
+ if ( __i == sizeof( L3) )
+ __i = 0;
+ else
+ __i += 2;
+ boost::thread::yield();
+ break;
+ default:
+ __lock_first( __i - 2, l3, l1, l2);
+ return;
+ }
+ }
+}
+
+template< class L1, class L2 >
+void lock( L1 & l1, L2 & l2)
+{
+ while ( true)
+ {
+ {
+ unique_lock< L1 > __u1( l1);
+ if ( l2.try_lock() )
+ {
+ __u1.release();
+ break;
+ }
+ }
+ boost::thread::yield();
+
+ {
+ unique_lock< L2 > __u2( l2);
+ if ( l1.try_lock())
+ {
+ __u2.release();
+ break;
+ }
+ }
+ boost::thread::yield();
+ }
+}
+
+template< class L1, class L2, class L3 >
+void lock( L1 & l1, L2 & l2, L3 & l3)
+{ __lock_first( 0, l1, l2, l3); }
+
+} }
+}
+
+#endif // BOOST_TP_DETAIL_LOCK_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock_guard.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/lock_guard.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,39 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_LOCK_GUARD_H
+#define BOOST_TP_DETAIL_LOCK_GUARD_H
+
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/lock.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+ template< typename M0, typename M1 >
+ class lockguard
+ : private boost::noncopyable
+ {
+ private:
+ M0 & m0_;
+ M1 & m1_;
+
+ public:
+ lockguard(
+ M0 & m0,
+ M1 & m1)
+ : m0_( m0), m1_( m1)
+ { lock( m0_, m1_); }
+
+ ~lockguard()
+ {
+ m0_.unlock();
+ m1_.unlock();
+ }
+ };
+}
+} }
+
+#endif // BOOST_TP_DETAIL_LOCK_GUARD_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/exceptions.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/exceptions.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,68 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_EXCEPTIONS_H
+#define BOOST_TP_EXCEPTIONS_H
+
+#include <stdexcept>
+#include <string>
+
+namespace boost { namespace tp
+{
+class invalid_poolsize
+: public std::invalid_argument
+{
+public:
+ invalid_poolsize( std::string const& msg)
+ : std::invalid_argument( msg)
+ {}
+};
+
+class invalid_timeout
+: public std::invalid_argument
+{
+public:
+ invalid_timeout( std::string const& msg)
+ : std::invalid_argument( msg)
+ {}
+};
+
+class invalid_watermark
+: public std::invalid_argument
+{
+public:
+ invalid_watermark( std::string const& msg)
+ : std::invalid_argument( msg)
+ {}
+};
+
+class task_interrupted
+: public std::runtime_error
+{
+public:
+ task_interrupted( std::string const& msg)
+ : std::runtime_error( msg)
+ {}
+};
+
+class task_rejected
+: public std::runtime_error
+{
+public:
+ task_rejected( std::string const& msg)
+ : std::runtime_error( msg)
+ {}
+};
+
+class task_removed
+: public std::runtime_error
+{
+public:
+ task_removed( std::string const& msg)
+ : std::runtime_error( msg)
+ {}
+};
+} }
+
+#endif // BOOST_TP_EXCEPTIONS_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,110 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_FIFO_H
+#define BOOST_TP_FIFO_H
+
+#include <cstddef>
+#include <list>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp
+{
+struct fifo
+{
+ template< typename Callable >
+ class impl
+ {
+ private:
+ typedef Callable callable;
+
+ public:
+ class item
+ {
+ private:
+ callable ca_;
+ detail::interrupter intr_;
+ promise< void > prom_;
+
+ public:
+ item(
+ callable const& ca,
+ detail::interrupter const& intr)
+ : ca_( ca), intr_( intr), prom_()
+ {}
+
+ const callable ca() const
+ { return ca_; }
+
+ const detail::interrupter intr() const
+ { return intr_; }
+
+ promise< void > & prom()
+ { return prom_; }
+ };
+
+ typedef typename std::list< item >::iterator iterator;
+ typedef typename std::list< item >::const_iterator const_iterator;
+
+ private:
+ std::list< item > lst_;
+
+ public:
+ iterator push( item const& itm)
+ {
+ lst_.push_back( itm);
+ return ( ++lst_.rbegin() ).base();
+ }
+
+ const item pop()
+ {
+ item itm( lst_.front() );
+ lst_.pop_front();
+ itm.prom().set();
+ return itm;
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > & f)
+ {
+ if ( f.ready() ) return false;
+ item itm( * i);
+ itm.prom().set();
+ lst_.erase( i);
+ BOOST_ASSERT( f.ready() );
+ return true;
+ }
+
+ std::size_t size() const
+ { return lst_.size(); }
+
+ bool empty() const
+ { return lst_.empty(); }
+
+ void clear()
+ { lst_.clear(); }
+
+ const iterator begin()
+ { return lst_.begin(); }
+
+ const const_iterator begin() const
+ { return lst_.begin(); }
+
+ const iterator end()
+ { return lst_.end(); }
+
+ const const_iterator end() const
+ { return lst_.end(); }
+ };
+};
+} }
+
+#endif // BOOST_TP_FIFO_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fixed.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fixed.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,645 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_FIXED_H
+#define BOOST_TP_FIXED_H
+
+#include <algorithm>
+#include <cstddef>
+#include <functional>
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/ref.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/fifo.hpp>
+#include <boost/tp/poolsize.hpp>
+#include <boost/tp/task.hpp>
+#include <boost/tp/unbounded_channel.hpp>
+#include <boost/tp/watermark.hpp>
+
+namespace boost { namespace tp
+{
+struct fixed
+{
+ template<
+ typename Channel = unbounded_channel< fifo >
+ >
+ class impl
+ : private noncopyable
+ {
+ private:
+ typedef function< void() > callable;
+ typedef Channel channel;
+ typedef typename channel::item channel_item;
+ typedef typename channel::iterator channel_iterator;
+
+ enum state
+ {
+ active_state,
+ terminateing_state,
+ terminated_state
+ };
+
+ class active_guard
+ {
+ private:
+ std::size_t & active_;
+ shared_mutex & mtx_worker_;
+
+ public:
+ active_guard(
+ std::size_t & active__,
+ shared_mutex & mtx_worker__)
+ : active_( active__), mtx_worker_( mtx_worker__)
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ ++active_;
+ BOOST_ASSERT( active_ > 0);
+ }
+
+ ~active_guard()
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ --active_;
+ BOOST_ASSERT( active_ >= 0);
+ }
+ };
+
+ class worker
+ {
+ private:
+ shared_ptr< thread > thrd_;
+
+ public:
+ worker( callable const& ca)
+ : thrd_( new thread( ca) )
+ { BOOST_ASSERT( ! ca.empty() ); }
+
+ const shared_ptr< thread > thrd() const
+ { return thrd_; }
+
+ const thread::id get_id() const
+ { return thrd_->get_id(); }
+
+ void join() const
+ { thrd_->join(); }
+
+ void interrupt() const
+ { thrd_->interrupt(); }
+ };
+
+ struct id_idx_tag {};
+
+ typedef multi_index::multi_index_container<
+ worker,
+ multi_index::indexed_by<
+ multi_index::ordered_unique<
+ multi_index::tag< id_idx_tag >,
+ multi_index::const_mem_fun<
+ worker,
+ const thread::id,
+ & worker::get_id
+ >
+ >
+ >
+ > worker_list;
+
+ typedef typename worker_list::template index<
+ id_idx_tag >::type id_idx;
+
+ std::size_t max_size_;
+ worker_list worker_;
+ id_idx & iidx_;
+ std::size_t active_worker_;
+ shared_mutex mtx_worker_;
+ state state_;
+ shared_mutex mtx_state_;
+ channel channel_;
+
+ void entry_()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ id_idx & iidx( worker_.get< id_idx_tag >() );
+ typename id_idx::iterator i( iidx.end() );
+ typename id_idx::iterator e( iidx.end() );
+ lk.unlock();
+ while ( i == e)
+ {
+ lk.lock();
+ i = iidx.find( this_thread::get_id() );
+ lk.unlock();
+ }
+ BOOST_ASSERT( i != e);
+
+ shared_ptr< thread > thrd( i->thrd() );
+ BOOST_ASSERT( thrd);
+ callable ca;
+ detail::interrupter intr;
+ while ( channel_.take( ca, intr) )
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ active_guard ag(
+ active_worker_,
+ mtx_worker_);
+ shared_ptr< void > ig(
+ static_cast< void * >( 0),
+ bind(
+ & detail::interrupter::reset,
+ intr) );
+ intr.set( thrd);
+ ca();
+ ca.clear();
+ BOOST_ASSERT( ca.empty() );
+ }
+ }
+
+ void create_worker_()
+ {
+ BOOST_ASSERT( ! terminateing_() && ! terminated_() );
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ worker_.insert(
+ worker(
+ bind(
+ & impl::entry_,
+ this) ) );
+ lk.unlock();
+ }
+
+ std::size_t active_() const
+ { return active_worker_; }
+
+ std::size_t size_()
+ { return worker_.size(); }
+
+ bool terminated_() const
+ { return state_ == terminated_state; }
+
+ bool terminateing_() const
+ { return state_ == terminateing_state; }
+
+ public:
+ explicit impl( max_poolsize const& max_size)
+ :
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_()
+ {
+ channel_.activate();
+ for ( std::size_t i( 0); i < max_size; ++i)
+ create_worker_();
+ }
+
+ explicit impl(
+ max_poolsize const& max_size,
+ high_watermark const& hwm,
+ low_watermark const& lwm)
+ :
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(
+ hwm,
+ lwm)
+ {
+ BOOST_ASSERT( lwm <= hwm);
+ channel_.activate();
+ for ( std::size_t i( 0); i < max_size; ++i)
+ create_worker_();
+ }
+
+ ~impl()
+ { shutdown(); }
+
+ std::size_t active()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return active_();
+ }
+
+ std::size_t idle()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return size_() - active_();
+ }
+
+ void shutdown()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return;
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+ }
+
+ const std::vector< callable > shutdown_now()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return std::vector< callable >();
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::for_each(
+ worker_.begin(),
+ worker_.end(),
+ std::mem_fun_ref( & worker::interrupt) );
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ std::vector< callable > drain( channel_.drain() );
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+
+ return drain;
+ }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return size_();
+ }
+
+ std::size_t max_size()
+ { return max_size_; }
+
+ bool terminated()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminated_();
+ }
+
+ bool terminateing()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminateing_();
+ }
+
+ void clear()
+ { channel_.clear(); }
+
+ bool empty()
+ { return channel_.empty(); }
+
+ std::size_t pending()
+ { return channel_.size(); }
+
+ const std::size_t upper_bound()
+ { return channel_.upper_bound(); }
+
+ void upper_bound( std::size_t hwm)
+ { return channel_.upper_bound( hwm); }
+
+ const std::size_t lower_bound()
+ { return channel_.lower_bound(); }
+
+ void lower_bound( std::size_t lwm)
+ { return channel_.lower_bound( lwm); }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ Attr const& attr,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > lazy_submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > lazy_submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+ };
+};
+} }
+
+#endif // BOOST_TP_FIXED_H
+
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lazy.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lazy.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,766 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_LAZY_H
+#define BOOST_TP_LAZY_H
+
+#include <algorithm>
+#include <cstddef>
+#include <functional>
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/adjustment.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/fifo.hpp>
+#include <boost/tp/poolsize.hpp>
+#include <boost/tp/task.hpp>
+#include <boost/tp/unbounded_channel.hpp>
+#include <boost/tp/watermark.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename AdjustmentPolicy = depend_on_pending
+>
+struct lazy
+{
+ template<
+ typename Channel = unbounded_channel< fifo >
+ >
+ class impl
+ : private noncopyable
+ {
+ private:
+ typedef AdjustmentPolicy adjustment_policy;
+ typedef function< void() > callable;
+ typedef Channel channel;
+ typedef typename channel::item channel_item;
+ typedef typename channel::iterator channel_iterator;
+
+ enum state
+ {
+ active_state,
+ terminateing_state,
+ terminated_state
+ };
+
+ class active_guard
+ {
+ private:
+ std::size_t & active_;
+ shared_mutex & mtx_worker_;
+
+ public:
+ active_guard(
+ std::size_t & active__,
+ shared_mutex & mtx_worker__)
+ : active_( active__), mtx_worker_( mtx_worker__)
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ BOOST_ASSERT( active_ >= 0);
+ ++active_;
+ }
+
+ ~active_guard()
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ --active_;
+ BOOST_ASSERT( active_ >= 0);
+ }
+ };
+
+ class worker
+ {
+ private:
+ shared_ptr< thread > thrd_;
+
+ public:
+ worker( callable const& ca)
+ : thrd_( new thread( ca) )
+ { BOOST_ASSERT( ! ca.empty() ); }
+
+ const shared_ptr< thread > thrd() const
+ { return thrd_; }
+
+ const thread::id get_id() const
+ { return thrd_->get_id(); }
+
+ void join() const
+ { thrd_->join(); }
+
+ void interrupt() const
+ { thrd_->interrupt(); }
+ };
+
+ struct id_idx_tag {};
+
+ typedef multi_index::multi_index_container<
+ worker,
+ multi_index::indexed_by<
+ multi_index::ordered_unique<
+ multi_index::tag< id_idx_tag >,
+ multi_index::const_mem_fun<
+ worker,
+ const thread::id,
+ & worker::get_id
+ >
+ >
+ >
+ > worker_list;
+
+ typedef typename worker_list::template index<
+ id_idx_tag >::type id_idx;
+
+ std::size_t core_size_;
+ std::size_t max_size_;
+ worker_list worker_;
+ id_idx & iidx_;
+ std::size_t active_worker_;
+ shared_mutex mtx_worker_;
+ state state_;
+ shared_mutex mtx_state_;
+ channel channel_;
+ adjustment_policy adjust_pol_;
+
+ void entry_()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ id_idx & iidx( worker_.get< id_idx_tag >() );
+ typename id_idx::iterator i( iidx.end() );
+ typename id_idx::iterator e( iidx.end() );
+ lk.unlock();
+ while ( i == e)
+ {
+ lk.lock();
+ i = iidx.find( this_thread::get_id() );
+ lk.unlock();
+ }
+ BOOST_ASSERT( i != e);
+
+ shared_ptr< thread > thrd( i->thrd() );
+ BOOST_ASSERT( thrd);
+ callable ca;
+ detail::interrupter intr;
+ while ( channel_.take( ca, intr) )
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ active_guard guard(
+ active_worker_,
+ mtx_worker_);
+ shared_ptr< void > ig(
+ static_cast< void * >( 0),
+ bind(
+ & detail::interrupter::reset,
+ intr) );
+ intr.set( thrd);
+ ca();
+ ca.clear();
+ BOOST_ASSERT( ca.empty() );
+ }
+ }
+
+ void create_worker_()
+ {
+ BOOST_ASSERT( ! terminateing_() && ! terminated_() );
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ worker_.insert(
+ worker(
+ bind(
+ & impl::entry_,
+ this) ) );
+ lk.unlock();
+ }
+
+ std::size_t active_() const
+ { return active_worker_; }
+
+ std::size_t size_()
+ { return worker_.size(); }
+
+ void adjust_pool_()
+ {
+ BOOST_ASSERT( ! terminateing_() && ! terminated_() );
+ std::size_t s( size() );
+ if ( s < max_size_ &&
+ adjust_pol_(
+ poolsize( s),
+ core_poolsize( core_size() ),
+ max_poolsize( max_size_),
+ channel_.size(),
+ channel_.full() ) )
+ create_worker_();
+ BOOST_ASSERT( size() <= max_size_ );
+ }
+
+ bool terminated_() const
+ { return state_ == terminated_state; }
+
+ bool terminateing_() const
+ { return state_ == terminateing_state; }
+
+ channel_iterator submit_( channel_item const& itm)
+ { return channel_.put( itm); }
+
+ public:
+ explicit impl(
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ channel_.activate();
+ }
+
+ explicit impl(
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(
+ hwm,
+ lwm),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ channel_.activate();
+ }
+
+ explicit impl(
+ preallocate const& pre,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( pre > max_size_)
+ throw invalid_poolsize("preallocated poolsize must be less than or equal to max poolsize");
+ channel_.activate();
+ for ( std::size_t i( 0); i < pre; ++i)
+ create_worker_();
+ }
+
+ explicit impl(
+ preallocate const& pre,
+ core_poolsize const& core_size,
+ max_poolsize const& max_size,
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ adjustment_policy const& adjust_pol = adjustment_policy() )
+ :
+ core_size_( core_size),
+ max_size_( max_size),
+ worker_(),
+ iidx_( worker_.get< id_idx_tag >() ),
+ active_worker_( 0),
+ mtx_worker_(),
+ state_( active_state),
+ mtx_state_(),
+ channel_(
+ hwm,
+ lwm),
+ adjust_pol_( adjust_pol)
+ {
+ if ( core_size > max_size_)
+ throw invalid_poolsize("core poolsize must be less than or equal to max poolsize");
+ if ( pre > max_size_)
+ throw invalid_poolsize("preallocated poolsize must be less than or equal to max poolsize");
+ channel_.activate();
+ for ( std::size_t i( 0); i < pre; ++i)
+ create_worker_();
+ }
+
+ ~impl()
+ { shutdown(); }
+
+ std::size_t active()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return active_();
+ }
+
+ std::size_t idle()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return size_() - active_();
+ }
+
+ void shutdown()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return;
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+ }
+
+ const std::vector< callable > shutdown_now()
+ {
+ unique_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminateing_() || terminated_() ) return std::vector< callable >();
+ state_ = terminateing_state;
+ lk1.unlock();
+
+ unique_lock< shared_mutex > lk2( mtx_worker_);
+ channel_.deactivate();
+ std::for_each(
+ worker_.begin(),
+ worker_.end(),
+ std::mem_fun_ref( & worker::interrupt) );
+ std::vector< worker > lst(
+ worker_.begin(),
+ worker_.end() );
+ worker_.clear();
+ std::vector< callable > drain( channel_.drain() );
+ lk2.unlock();
+
+ std::for_each(
+ lst.begin(),
+ lst.end(),
+ std::mem_fun_ref( & worker::join) );
+
+ lk1.lock();
+ state_ = terminated_state;
+ lk1.unlock();
+
+ return drain;
+ }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return size_();
+ }
+
+ std::size_t core_size()
+ {
+ shared_lock< shared_mutex > lk( mtx_worker_);
+ return core_size_;
+ }
+
+ void core_size( std::size_t size)
+ {
+ unique_lock< shared_mutex > lk( mtx_worker_);
+ core_size_ = size;
+ }
+
+ std::size_t max_size()
+ { return max_size_; }
+
+ bool terminated()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminated_();
+ }
+
+ bool terminateing()
+ {
+ shared_lock< shared_mutex > lk( mtx_state_);
+ return terminateing_();
+ }
+
+ void clear()
+ { channel_.clear(); }
+
+ bool empty()
+ { return channel_.empty(); }
+
+ std::size_t pending()
+ { return channel_.size(); }
+
+ const std::size_t upper_bound()
+ { return channel_.upper_bound(); }
+
+ void upper_bound( std::size_t hwm)
+ { return channel_.upper_bound( hwm); }
+
+ const std::size_t lower_bound()
+ { return channel_.lower_bound(); }
+
+ void lower_bound( std::size_t lwm)
+ { return channel_.lower_bound( lwm); }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename T
+ >
+ task< typename result_of< Act() >::type > chained_submit(
+ Act const& act,
+ Attr const& attr,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template< typename Act >
+ task< typename result_of< Act() >::type > lazy_submit( Act const& act)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr
+ >
+ task< typename result_of< Act() >::type > lazy_submit(
+ Act const& act,
+ Attr const& attr)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ future< void > fg( prom.get_needed_future() );
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ fg.add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&) ) & channel::put,
+ ref( channel_),
+ itm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm
+ >
+ task< typename result_of< Act() >::type > timed_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ channel_.put( itm, tm);
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+
+ template<
+ typename Act,
+ typename Attr,
+ typename Tm,
+ typename T
+ >
+ task< typename result_of< Act() >::type > timed_chained_submit(
+ Act const& act,
+ Attr const& attr,
+ Tm const& tm,
+ task< T > & tsk)
+ {
+ shared_lock< shared_mutex > lk1( mtx_state_);
+ if ( terminated_() )
+ throw task_rejected("pool ist terminated");
+ if ( terminateing_() )
+ throw task_rejected("pool ist terminateing");
+
+ adjust_pool_();
+
+ typedef typename result_of< Act() >::type R;
+ detail::interrupter intr;
+ promise< R > prom;
+ channel_item itm(
+ future_wrapper< R >(
+ act,
+ prom),
+ attr,
+ intr);
+ tsk.get_future().add_callback(
+ bind(
+ ( channel_iterator ( channel::*)( channel_item const&, Tm const&) ) & channel::put,
+ ref( channel_),
+ itm,
+ tm) );
+ return task< R >( prom, intr);
+ }
+ };
+};
+} }
+
+#endif // BOOST_TP_LAZY_H
+
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,110 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_LIFO_H
+#define BOOST_TP_LIFO_H
+
+#include <cstddef>
+#include <list>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp
+{
+struct lifo
+{
+ template< typename Callable >
+ class impl
+ {
+ private:
+ typedef Callable callable;
+
+ public:
+ class item
+ {
+ private:
+ callable ca_;
+ detail::interrupter intr_;
+ promise< void > prom_;
+
+ public:
+ item(
+ callable const& ca,
+ detail::interrupter const& intr)
+ : ca_( ca), intr_( intr), prom_()
+ {}
+
+ const callable ca() const
+ { return ca_; }
+
+ const detail::interrupter intr() const
+ { return intr_; }
+
+ promise< void > & prom()
+ { return prom_; }
+ };
+
+ typedef typename std::list< item >::iterator iterator;
+ typedef typename std::list< item >::const_iterator const_iterator;
+
+ private:
+ std::list< item > lst_;
+
+ public:
+ iterator push( item const& itm)
+ {
+ lst_.push_front( itm);
+ return lst_.begin();
+ }
+
+ const item pop()
+ {
+ item itm( lst_.front() );
+ lst_.pop_front();
+ itm.prom().set();
+ return itm;
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > & f)
+ {
+ if ( f.ready() ) return false;
+ item itm( * i);
+ itm.prom().set();
+ lst_.erase( i);
+ BOOST_ASSERT( f.ready() );
+ return true;
+ }
+
+ std::size_t size() const
+ { return lst_.size(); }
+
+ bool empty() const
+ { return lst_.empty(); }
+
+ void clear()
+ { lst_.clear(); }
+
+ const iterator begin()
+ { return lst_.begin(); }
+
+ const const_iterator begin() const
+ { return lst_.begin(); }
+
+ const iterator end()
+ { return lst_.end(); }
+
+ const const_iterator end() const
+ { return lst_.end(); }
+ };
+};
+} }
+
+#endif // BOOST_TP_LIFO_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,45 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_POOL_H
+#define BOOST_TP_POOL_H
+
+#include <boost/preprocessor/repetition.hpp>
+
+#include <boost/tp/fixed.hpp>
+#include <boost/tp/unbounded_channel.hpp>
+
+namespace boost { namespace tp
+{
+#ifndef BOOST_TP_MAX_ARITY
+# define BOOST_TP_MAX_ARITY 8
+#endif
+
+// Expands as: A0& a0, A1& a1, A2& a2, ... AN& aN
+#define BOOST_TP_CTOR_ARG(z, n, A) \
+ BOOST_PP_CAT(A, n) const& BOOST_PP_CAT(a, n)
+#define BOOST_ENUM_TP_CTOR_ARGS(n, A) BOOST_PP_ENUM(n, BOOST_TP_CTOR_ARG, A)
+
+template<
+ typename Strategy = fixed,
+ typename Channel = unbounded_channel<>
+>
+struct pool
+: public Strategy::template impl< Channel >
+{
+#define BOOST_TP_CTOR(z, n, A) \
+ template< BOOST_PP_ENUM_PARAMS(n, typename A) > \
+ pool(BOOST_ENUM_TP_CTOR_ARGS(n, A)) \
+ : Strategy::template impl< Channel >(BOOST_PP_ENUM_PARAMS(n, a)) \
+ {}
+ BOOST_PP_REPEAT_FROM_TO( 1, BOOST_TP_MAX_ARITY, BOOST_TP_CTOR, A)
+#undef BOOST_TP_CTOR
+};
+
+#undef BOOST_ENUM_TP_CTOR_ARGS
+#undef BOOST_TP_CTOR_ARG
+#undef BOOST_TP_MAX_ARITY
+} }
+
+#endif // BOOST_TP_POOL_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/poolsize.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/poolsize.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,71 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_POOLSIZE_H
+#define BOOST_TP_POOLSIZE_H
+
+#include <cstddef>
+
+#include <boost/tp/exceptions.hpp>
+
+namespace boost { namespace tp
+{
+class core_poolsize
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit core_poolsize( std::size_t value)
+ : value_( value)
+ { if ( value < 0) throw invalid_poolsize("min poolsize must be greater than or equal to zero"); }
+
+ operator std::size_t () const
+ { return value_; }
+};
+
+class max_poolsize
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit max_poolsize( std::size_t value)
+ : value_( value)
+ { if ( value <= 0) throw invalid_poolsize("max poolsize must be greater than zero"); }
+
+ operator std::size_t () const
+ { return value_; }
+};
+
+class poolsize
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit poolsize( std::size_t value)
+ : value_( value)
+ { if ( value < 0) throw invalid_poolsize("core poolsize must be greater than or equal to zero"); }
+
+ operator std::size_t () const
+ { return value_; }
+};
+
+class preallocate
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit preallocate( std::size_t value)
+ : value_( value)
+ { if ( value < 0) throw invalid_poolsize("preallocated poolsize must be greater than or equal to zero"); }
+
+ operator std::size_t () const
+ { return value_; }
+};
+} }
+
+#endif // BOOST_TP_POOLSIZE_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,148 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_PRIORITY_H
+#define BOOST_TP_PRIORITY_H
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename Attr,
+ typename Ord = std::less< Attr >
+>
+struct priority
+{
+ template< typename Callable >
+ class impl
+ {
+ private:
+ typedef Attr attribute;
+ typedef Callable callable;
+ typedef Ord ordering;
+
+ public:
+ class item
+ {
+ private:
+ callable ca_;
+ attribute attr_;
+ detail::interrupter intr_;
+ promise< void > prom_;
+
+ public:
+ item(
+ callable const& ca,
+ attribute const& attr,
+ detail::interrupter const& intr)
+ : ca_( ca), attr_( attr), intr_( intr), prom_()
+ {}
+
+ const callable ca() const
+ { return ca_; }
+
+ const attribute attr() const
+ { return attr_; }
+
+ const detail::interrupter intr() const
+ { return intr_; }
+
+ promise< void > & prom()
+ { return prom_; }
+ };
+
+ private:
+ typedef multi_index::multi_index_container<
+ item,
+ multi_index::indexed_by<
+ multi_index::ordered_non_unique<
+ multi_index::const_mem_fun<
+ item,
+ const attribute,
+ & item::attr
+ >,
+ ordering
+ >
+ >
+ > list;
+ typedef typename list::template nth_index< 0 >::type index;
+
+ list lst_;
+ index & idx_;
+
+ public:
+ typedef typename list::iterator iterator;
+ typedef typename list::const_iterator const_iterator;
+
+ impl()
+ :
+ lst_(),
+ idx_( lst_.get< 0 >() )
+ {}
+
+ iterator push( item const& itm)
+ {
+ std::pair< typename index::iterator, bool > p( idx_.insert( itm) );
+ BOOST_ASSERT( p.second);
+ return p.first;
+ }
+
+ const item pop()
+ {
+ iterator i( lst_.begin() );
+ BOOST_ASSERT( i != lst_.end() );
+ item itm( * i);
+ lst_.erase( i);
+ itm.prom().set();
+ return itm;
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > & f)
+ {
+ if ( f.ready() ) return false;
+ item itm( * i);
+ itm.prom().set();
+ idx_.erase( i);
+ BOOST_ASSERT( f.ready() );
+ return true;
+ }
+
+ std::size_t size() const
+ { return lst_.size(); }
+
+ bool empty() const
+ { return lst_.empty(); }
+
+ void clear()
+ { lst_.clear(); }
+
+ const iterator begin()
+ { return lst_.begin(); }
+
+ const const_iterator begin() const
+ { return lst_.begin(); }
+
+ const iterator end()
+ { return lst_.end(); }
+
+ const const_iterator end() const
+ { return lst_.end(); }
+ };
+};
+} }
+
+#endif // BOOST_TP_PRIORITY_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,514 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_RENDEZVOUS_CHANNEL_H
+#define BOOST_TP_RENDEZVOUS_CHANNEL_H
+
+#include <cstddef>
+#include <list>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <boost/tp/exceptions.hpp>
+#include <boost/tp/fifo.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/lock.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename QueueingPolicy = fifo
+>
+class rendezvous_channel
+{
+private:
+ typedef boost::function< void() > callable;
+ typedef QueueingPolicy queueing_policy;
+
+ template< typename M0, typename M1 >
+ class lock_guard
+ : private boost::noncopyable
+ {
+ private:
+ M0 & m0_;
+ M1 & m1_;
+
+ public:
+ lock_guard(
+ M0 & m0,
+ M1 & m1)
+ : m0_( m0), m1_( m1)
+ { detail::lock( m0_, m1_); }
+
+ ~lock_guard()
+ {
+ m0_.unlock();
+ m1_.unlock();
+ }
+ };
+
+ class producer_slot
+ {
+ private:
+ class impl
+ {
+ private:
+ condition_variable cond_;
+ mutex mtx_;
+ callable ca_;
+ detail::interrupter intr_;
+ bool transfered_;
+
+ bool pred_() const
+ { return transfered_; }
+
+ public:
+ impl()
+ :
+ cond_(),
+ mtx_(),
+ ca_(),
+ intr_(),
+ transfered_( false)
+ {}
+
+ impl( callable const& ca)
+ :
+ cond_(),
+ mtx_(),
+ ca_( ca),
+ intr_(),
+ transfered_( false)
+ {}
+
+ void notify()
+ {
+ unique_lock< mutex > lk( mtx_);
+ transfered_ = true;
+ cond_.notify_all();
+ }
+
+ void transfer(
+ callable & ca,
+ detail::interrupter & intr)
+ {
+ unique_lock< mutex > lk( mtx_);
+ ca = ca_;
+ intr = intr_;
+ transfered_ = true;
+ cond_.notify_one();
+ }
+
+ const callable get() const
+ { return ca_; }
+
+ void wait( detail::interrupter const& intr)
+ {
+ unique_lock< mutex > lk( mtx_);
+ intr_ = intr;
+ cond_.wait(
+ lk,
+ bind(
+ & impl::pred_,
+ this) );
+ }
+
+ template< typename timed_type >
+ bool wait(
+ detail::interrupter const& intr,
+ timed_type const& dt)
+ {
+ unique_lock< mutex > lk( mtx_);
+ intr_ = intr;
+ return cond_.timed_wait(
+ lk,
+ dt,
+ bind(
+ & impl::pred_,
+ this) );
+ }
+ };
+
+ shared_ptr< impl > impl_;
+
+ public:
+ producer_slot()
+ : impl_( new impl)
+ {}
+
+ template< typename T >
+ producer_slot( T const& t)
+ : impl_( new impl( t) )
+ {}
+
+ producer_slot( producer_slot const& rh)
+ : impl_( rh.impl_)
+ {}
+
+ producer_slot &
+ operator=( producer_slot const& rh)
+ {
+ if ( & rh == this) return * this;
+ impl_->notify();
+ impl_ = rh.impl_;
+ return * this;
+ }
+
+ void notify()
+ { impl_->notify(); }
+
+ void transfer(
+ callable & ca,
+ detail::interrupter & intr)
+ { impl_->transfer( ca, intr); }
+
+ const callable get() const
+ { return impl_->get(); }
+
+ void wait( detail::interrupter const& intr)
+ { impl_->wait( intr); }
+
+ template< typename timed_type >
+ bool wait(
+ detail::interrupter const& intr,
+ timed_type const& dt)
+ { return impl_->wait( intr, dt); }
+ };
+
+ class consumer_slot
+ {
+ private:
+ class impl
+ {
+ private:
+ condition_variable cond_;
+ mutex mtx_;
+ callable ca_;
+ detail::interrupter intr_;
+ bool transfered_;
+
+ bool pred_() const
+ { return transfered_; }
+
+ public:
+ impl()
+ :
+ cond_(),
+ mtx_(),
+ ca_(),
+ intr_(),
+ transfered_( false)
+ {}
+
+ void notify()
+ {
+ unique_lock< mutex > lk( mtx_);
+ transfered_ = true;
+ cond_.notify_all();
+ }
+
+ void transfer(
+ callable const& ca,
+ detail::interrupter const& intr)
+ {
+ unique_lock< mutex > lk( mtx_);
+ ca_ = ca;
+ intr_ = intr;
+ transfered_ = true;
+ cond_.notify_one();
+ }
+
+ void wait(
+ callable & ca,
+ detail::interrupter & intr)
+ {
+ unique_lock< mutex > lk( mtx_);
+ cond_.wait(
+ lk,
+ bind(
+ & impl::pred_,
+ this) );
+ ca = ca_;
+ intr = intr_;
+ }
+
+ template< typename timed_type >
+ bool wait(
+ callable & ca,
+ detail::interrupter & intr,
+ timed_type const& dt)
+ {
+ unique_lock< mutex > lk( mtx_);
+ if ( cond_.timed_wait(
+ lk,
+ dt,
+ bind(
+ & impl::pred_,
+ this) ) )
+ {
+ ca = ca_;
+ intr = intr_;
+ return true;
+ }
+ return false;
+ }
+ };
+
+ shared_ptr< impl > impl_;
+
+ public:
+ consumer_slot()
+ : impl_( new impl() )
+ {}
+
+ void notify()
+ { impl_->notify(); }
+
+ void transfer(
+ callable const& ca,
+ detail::interrupter const& intr)
+ { impl_->transfer( ca, intr); }
+
+ void wait(
+ callable & ca,
+ detail::interrupter & intr)
+ { impl_->wait( ca, intr); }
+
+ template< typename timed_type >
+ bool wait(
+ callable & ca,
+ detail::interrupter & intr,
+ timed_type const& dt)
+ { return impl_->wait( ca, intr, dt); }
+ };
+
+ typedef typename queueing_policy::template impl<
+ producer_slot
+ > producer_queue;
+ typedef std::list< consumer_slot > consumer_queue;
+
+public:
+ typedef typename producer_queue::item item;
+ typedef typename producer_queue::iterator iterator;
+
+private:
+ bool active_;
+ producer_queue producer_queue_;
+ shared_mutex producer_mtx_;
+ consumer_queue consumer_queue_;
+ shared_mutex consumer_mtx_;
+
+ void activate_()
+ { active_ = true; }
+
+ void clear_()
+ {
+ BOOST_ASSERT( ! active_);
+ producer_queue_.clear();
+ consumer_queue_.clear();
+ BOOST_ASSERT( producer_queue_.empty() );
+ BOOST_ASSERT( consumer_queue_.empty() );
+ }
+
+ void deactivate_()
+ {
+ if ( active_)
+ {
+ active_ = false;
+ BOOST_FOREACH( item itm, producer_queue_)
+ {
+ producer_slot slot( itm.ca() );
+ slot.notify();
+ }
+ BOOST_FOREACH( consumer_slot slot, consumer_queue_)
+ { slot.notify(); }
+ }
+
+ BOOST_ASSERT( ! active_);
+ }
+
+ const std::vector< callable > drain_()
+ {
+ BOOST_ASSERT( ! active_);
+ std::vector< callable > unprocessed;
+ unprocessed.reserve( producer_queue_.size() );
+ BOOST_FOREACH( item itm, producer_queue_)
+ { if ( ! itm.ca().get().empty() ) unprocessed.push_back( itm.ca().get() ); }
+ clear_();
+ return unprocessed;
+ }
+
+ bool empty_() const
+ { return producer_queue_.empty(); }
+
+ std::size_t size_() const
+ { return producer_queue_.size(); }
+
+public:
+ rendezvous_channel()
+ :
+ active_( false),
+ producer_queue_(),
+ producer_mtx_(),
+ consumer_queue_(),
+ consumer_mtx_()
+ {}
+
+ bool active()
+ {
+ lock_guard< shared_mutex, shared_mutex > lk( producer_mtx_, consumer_mtx_);
+ return active_;
+ }
+
+ void activate()
+ {
+ lock_guard< shared_mutex, shared_mutex > lk( producer_mtx_, consumer_mtx_);
+ activate_();
+ }
+
+ void clear()
+ {
+ lock_guard< shared_mutex, shared_mutex > lk( producer_mtx_, consumer_mtx_);
+ clear_();
+ }
+
+ bool deactive()
+ { return ! active(); }
+
+ void deactivate()
+ {
+ lock_guard< shared_mutex, shared_mutex > lk( producer_mtx_, consumer_mtx_);
+ deactivate_();
+ }
+
+ const std::vector< callable > drain()
+ {
+ unique_lock< shared_mutex > lk( producer_mtx_);
+ return drain_();
+ }
+
+ bool empty()
+ {
+ unique_lock< shared_mutex > lk( producer_mtx_);
+ return empty_();
+ }
+
+ bool erase(
+ iterator &,
+ future< void >)
+ { return false; }
+
+ bool full()
+ { return false; }
+
+ std::size_t size()
+ {
+ unique_lock< shared_mutex > lk( producer_mtx_);
+ return size_();
+ }
+
+ iterator put( item const& itm)
+ {
+ unique_lock< shared_mutex > lk1( consumer_mtx_);
+ if ( ! active_) throw task_rejected("channel is not active");
+ if ( ! consumer_queue_.empty() )
+ {
+ consumer_queue_.front().transfer( itm.ca().get(), itm.intr() );
+ consumer_queue_.pop_front();
+ return producer_queue_.end();
+ }
+ lk1.unlock();
+ unique_lock< shared_mutex > lk2( producer_mtx_);
+ producer_queue_.push( itm);
+ lk2.unlock();
+ producer_slot slot( itm.ca() );
+ slot.wait( itm.intr() );
+ return producer_queue_.end();
+ }
+
+ template< typename Tm >
+ iterator put(
+ item const& itm,
+ Tm const& tm)
+ {
+ unique_lock< shared_mutex > lk1( consumer_mtx_);
+ if ( ! active_) throw task_rejected("channel is not active");
+ if ( ! consumer_queue_.empty() )
+ {
+ consumer_queue_.front().transfer( itm.ca().get(), itm.intr() );
+ consumer_queue_.pop_front();
+ return producer_queue_.end();
+ }
+ lk1.unlock();
+ unique_lock< shared_mutex > lk2( producer_mtx_);
+ producer_queue_.push( itm);
+ lk2.unlock();
+ producer_slot slot( itm.ca() );
+ if ( ! slot.wait( itm.intr(), tm) ) throw task_rejected("timed out");
+ return producer_queue_.end();
+ }
+
+ bool take(
+ callable & ca,
+ detail::interrupter & intr)
+ {
+ unique_lock< shared_mutex > lk1( producer_mtx_);
+ if ( ! active_) return false;
+ if ( ! producer_queue_.empty() )
+ {
+ producer_slot slot( producer_queue_.pop().ca() );
+ slot.transfer( ca, intr);
+ return true;
+ }
+ lk1.unlock();
+ consumer_slot slot;
+ unique_lock< shared_mutex > lk2( consumer_mtx_);
+ consumer_queue_.push_back( slot);
+ lk2.unlock();
+ try
+ {
+ slot.wait( ca, intr);
+ return ! ca.empty();
+ }
+ catch ( thread_interrupted const& e)
+ { printf("take: thread_interrupted\n"); return false; }
+ }
+
+ template< typename Tm >
+ bool take(
+ callable & ca,
+ detail::interrupter & intr,
+ Tm const& tm)
+ {
+ unique_lock< shared_mutex > lk1( producer_mtx_);
+ if ( ! active_) return false;
+ if ( ! producer_queue_.empty() )
+ {
+ producer_slot slot( producer_queue_.pop().ca() );
+ slot.transfer( ca, intr);
+ return true;
+ }
+ lk1.unlock();
+ consumer_slot slot;
+ unique_lock< shared_mutex > lk2( consumer_mtx_);
+ consumer_queue_.push_back( slot);
+ lk2.unlock();
+ try
+ { return slot.wait( ca, intr, tm) && ! ca.empty(); }
+ catch ( thread_interrupted const& e)
+ { return false; }
+ }
+};
+} }
+
+#endif // BOOST_TP_RENDEZVOUS_CHANNEL_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,196 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_SMART_H
+#define BOOST_TP_SMART_H
+
+#include <cstddef>
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename Attr,
+ typename Ord,
+ typename Enq,
+ typename Deq
+>
+struct smart
+{
+ template< typename Callable >
+ class impl
+ {
+ private:
+ typedef Attr attribute;
+ typedef Callable callable;
+ typedef Deq dequeue_op;
+ typedef Enq enqueue_op;
+ typedef Ord ordering;
+
+ public:
+ class item
+ {
+ private:
+ callable ca_;
+ attribute attr_;
+ detail::interrupter intr_;
+ promise< void > prom_;
+
+ public:
+ item()
+ : ca_(), attr_(), intr_(), prom_()
+ {}
+
+ item(
+ callable const& ca,
+ attribute const& attr,
+ detail::interrupter const& intr)
+ : ca_( ca), attr_( attr), intr_( intr), prom_()
+ {}
+
+ const callable ca() const
+ { return ca_; }
+
+ const attribute attr() const
+ { return attr_; }
+
+ const detail::interrupter intr() const
+ { return intr_; }
+
+ promise< void > & prom()
+ { return prom_; }
+ };
+
+ private:
+ typedef multi_index::multi_index_container<
+ item,
+ multi_index::indexed_by<
+ multi_index::ordered_non_unique<
+ multi_index::const_mem_fun<
+ item,
+ const attribute,
+ & item::attr
+ >,
+ ordering
+ >
+ >
+ > list;
+ typedef typename list::template nth_index< 0 >::type index;
+
+ list lst_;
+ index & idx_;
+ enqueue_op enq_op_;
+ dequeue_op deq_op_;
+
+ public:
+ typedef typename index::iterator iterator;
+ typedef typename index::const_iterator const_iterator;
+
+ impl(
+ enqueue_op const& enq_op = enqueue_op(),
+ dequeue_op const& deq_op = dequeue_op() )
+ :
+ lst_(),
+ idx_( lst_.get< 0 >() ),
+ enq_op_( enq_op),
+ deq_op_( deq_op)
+ {}
+
+ iterator push( item const& itm)
+ { return enq_op_( idx_, itm); }
+
+ const item pop()
+ {
+ item itm;
+ deq_op_( idx_, itm);
+ itm.prom().set();
+ return itm;
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > & f)
+ {
+ if ( f.ready() ) return false;
+ item itm( * i);
+ itm.prom().set();
+ idx_.erase( i);
+ BOOST_ASSERT( f.ready() );
+ return true;
+ }
+
+ std::size_t size() const
+ { return lst_.size(); }
+
+ bool empty() const
+ { return lst_.empty(); }
+
+ void clear()
+ { lst_.clear(); }
+
+ const iterator begin()
+ { return lst_.begin(); }
+
+ const const_iterator begin() const
+ { return lst_.begin(); }
+
+ const iterator end()
+ { return lst_.end(); }
+
+ const const_iterator end() const
+ { return lst_.end(); }
+ };
+};
+
+struct replace_oldest
+{
+ template<
+ typename Index,
+ typename Item
+ >
+ typename Index::iterator operator()( Index & idx, Item const& itm)
+ {
+ typedef typename Index::iterator iterator;
+ iterator i( idx.find( itm.attr() ) );
+ if ( i == idx.end() )
+ {
+ std::pair< iterator, bool > p( idx.insert( itm) );
+ BOOST_ASSERT( p.second);
+ return p.first;
+ }
+ else
+ {
+ bool result( idx.replace( i, itm) );
+ BOOST_ASSERT( result);
+ return i;
+ }
+ }
+};
+
+struct take_oldest
+{
+ template<
+ typename Index,
+ typename Item
+ >
+ void operator()( Index & idx, Item & itm)
+ {
+ typedef typename Index::iterator iterator;
+ iterator i( idx.begin() );
+ BOOST_ASSERT( i != idx.end() );
+ itm = * i;
+ idx.erase( i);
+ }
+};
+} }
+
+#endif // BOOST_TP_SMART_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/task.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/task.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,62 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_TASK_H
+#define BOOST_TP_TASK_H
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/future/future.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp
+{
+template< typename R >
+class task
+{
+private:
+ class impl
+ : private noncopyable
+ {
+ private:
+ boost::future< R > fut_;
+ detail::interrupter intr_;
+
+ public:
+ impl(
+ boost::future< R > const& fut,
+ detail::interrupter const& intr)
+ :
+ fut_( fut),
+ intr_( intr)
+ {}
+
+ boost::future< R > & get_future()
+ { return fut_; }
+
+ void interrupt()
+ { intr_.interrupt(); }
+ };
+
+ shared_ptr< impl > impl_;
+
+public:
+ task(
+ boost::future< R > const& fut,
+ detail::interrupter const& intr)
+ : impl_( new impl( fut, intr) )
+ {}
+
+ boost::future< R > & get_future()
+ { return impl_->get_future(); }
+
+ void interrupt()
+ { impl_->interrupt(); }
+};
+} }
+
+#endif // BOOST_TP_TASK_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,261 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_UNBOUNDED_CHANNEL_H
+#define BOOST_TP_UNBOUNDED_CHANNEL_H
+
+#include <cstddef>
+#include <list>
+#include <utility>
+#include <vector>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/ref.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/exceptions.hpp>
+#include <boost/tp/fifo.hpp>
+
+namespace boost { namespace tp
+{
+template<
+ typename QueueingPolicy = fifo
+>
+class unbounded_channel
+{
+private:
+ typedef function< void() > callable;
+ typedef QueueingPolicy queueing_policy;
+ typedef typename queueing_policy::template impl<
+ callable
+ > queue;
+
+public:
+ typedef typename queue::item item;
+ typedef typename queue::iterator iterator;
+
+private:
+ bool active_;
+ queue queue_;
+ shared_mutex mtx_;
+ condition not_empty_cond_;
+
+ void activate_()
+ { active_ = true; }
+
+ void clear_()
+ {
+ BOOST_ASSERT( ! active_);
+ queue_.clear();
+ BOOST_ASSERT( queue_.empty() );
+ }
+
+ void deactivate_()
+ {
+ if ( active_)
+ {
+ active_ = false;
+ not_empty_cond_.notify_all();
+ }
+
+ BOOST_ASSERT( ! active_);
+ }
+
+ const std::vector< callable > drain_()
+ {
+ BOOST_ASSERT( ! active_);
+ std::vector< callable > unprocessed;
+ unprocessed.reserve( queue_.size() );
+ BOOST_FOREACH( item itm, queue_)
+ { unprocessed.push_back( itm.ca() ); }
+ clear_();
+ return unprocessed;
+ }
+
+ bool empty_() const
+ { return queue_.empty(); }
+
+ bool erase_(
+ iterator & i,
+ future< void > & f)
+ { return queue_.erase( i, f); }
+
+ std::size_t size_() const
+ { return queue_.size(); }
+
+ iterator put_(
+ item const& itm,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_)
+ throw task_rejected("channel is not active");
+ iterator i( queue_.push( itm) );
+ not_empty_cond_.notify_one();
+ return i;
+ }
+
+ bool take_(
+ callable & ca,
+ detail::interrupter & intr,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_ && empty_() )
+ return false;
+ try
+ {
+ not_empty_cond_.wait(
+ lk,
+ bind(
+ & unbounded_channel::consumers_activate_,
+ this) );
+ }
+ catch ( thread_interrupted const& e)
+ { return false; }
+ if ( ! active_ && empty_() )
+ return false;
+ item itm( queue_.pop() );
+ ca = itm.ca();
+ intr = itm.intr();
+ return ! ca.empty();
+ }
+
+ template< typename Tm >
+ bool take_(
+ callable & ca,
+ detail::interrupter & intr,
+ Tm const& tm,
+ unique_lock< shared_mutex > & lk)
+ {
+ if ( ! active_ && empty_() )
+ return false;
+ try
+ {
+ if ( ! not_empty_cond_.timed_wait(
+ lk,
+ tm,
+ bind(
+ & unbounded_channel::consumers_activate_,
+ this) ) )
+ return false;
+ }
+ catch ( thread_interrupted const& e)
+ { return false; }
+ if ( ! active_ && empty_() )
+ return false;
+ item itm( queue_.pop() );
+ ca = itm.ca();
+ intr = itm.intr();
+ return ! ca.empty();
+ }
+
+ bool consumers_activate_() const
+ { return ! active_ || ! empty_(); }
+
+public:
+ unbounded_channel()
+ :
+ active_( false),
+ queue_(),
+ mtx_(),
+ not_empty_cond_()
+ {}
+
+ bool active()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return active_;
+ }
+
+ void activate()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ activate_();
+ }
+
+ void clear()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ clear_();
+ }
+
+ bool deactive()
+ { return ! active(); }
+
+ void deactivate()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ deactivate_();
+ }
+
+ const std::vector< callable > drain()
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return drain_();
+ }
+
+ bool empty()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return empty_();
+ }
+
+ bool erase(
+ iterator & i,
+ future< void > f)
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return erase_( i, f);
+ }
+
+ bool full()
+ { return false; }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_);
+ return size_();
+ }
+
+ iterator put( item const& itm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return put_( itm, lk);
+ }
+
+ template< typename Tm >
+ iterator put(
+ item const& itm,
+ Tm const&)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return put_( itm, lk);
+ }
+
+ bool take(
+ callable & ca,
+ detail::interrupter & intr)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return take_( ca, intr, lk);
+ }
+
+ template< typename Tm >
+ bool take(
+ callable & ca,
+ detail::interrupter & intr,
+ Tm const& tm)
+ {
+ unique_lock< shared_mutex > lk( mtx_);
+ return take_( ca, intr, tm, lk);
+ }
+};
+} }
+
+#endif // BOOST_TP_UNBOUNDED_CHANNEL_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/watermark.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/watermark.hpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,49 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_WATER_MARK_H
+#define BOOST_TP_WATER_MARK_H
+
+#include <cstddef>
+
+#include <boost/tp/exceptions.hpp>
+
+namespace boost { namespace tp
+{
+class high_watermark
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit high_watermark( std::size_t value)
+ : value_( value)
+ {
+ if ( value <= 0)
+ throw invalid_watermark("high watermark must be greater than zero");
+ }
+
+ operator std::size_t () const
+ { return value_; }
+};
+
+class low_watermark
+{
+private:
+ std::size_t value_;
+
+public:
+ explicit low_watermark( std::size_t value)
+ : value_( value)
+ {
+ if ( value < 0)
+ throw invalid_watermark("low watermark must be greater than or equal to zero");
+ }
+
+ operator std::size_t () const
+ { return value_; }
+};
+} }
+
+#endif // BOOST_TP_WATER_MARK_H
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_example.cpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_example.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -0,0 +1,27 @@
+// Copyright 2008 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <boost/dataflow/signals/component/storage.hpp>
+
+template<class T>
+class scheduler : public signals::filter<delay, void (double)>
+{
+public:
+ void operator(double x)
+ {
+ out(x);
+ }
+};
+
+int main()
+{
+ using namespace boost::signals;
+
+ storage<void(double)> source;
+
+ source >>=
+
+ return 0;
+}
\ No newline at end of file
Modified: sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_same_type.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_same_type.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_same_type.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -17,23 +17,21 @@
: public signals::consumer<Signal2VoidInputs>
{
public:
- typedef void result_type;
-
- Signal2VoidInputs() : result(0) {};
- void operator()()
- {
- result++;
- }
- void AltInput()
- {
- result+=10;
- }
- int GetResult()
- {
- return result;
- }
+ Signal2VoidInputs() : result(0) {};
+ void operator()()
+ {
+ result++;
+ }
+ void AltInput()
+ {
+ result+=10;
+ }
+ int GetResult()
+ {
+ return result;
+ }
private:
- int result;
+ int result;
}; // end class Signal2VoidInputs
//]
Modified: sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_storage.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_storage.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/test/signals/test_storage.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -43,7 +43,9 @@
floater.close();
floater(1.5f); // change the value in floater
- invoke(floater); // we can also signal floater directly
+ // we can also signal floater directly, which will again cause it to
+ // output its stored value:
+ invoke(floater);
BOOST_CHECK_EQUAL(collector.at<0>(), 1.5f);
//]
}
Modified: sandbox/SOC/2007/signals/libs/glv/src/glv_pimpl_binding_glut.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/glv/src/glv_pimpl_binding_glut.cpp (original)
+++ sandbox/SOC/2007/signals/libs/glv/src/glv_pimpl_binding_glut.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -285,7 +285,6 @@
void Window::platformShowHide(){ }
void WindowImpl::draw(){
- std::cout << "draw" << std::endl;
if(mWindow->shouldDraw()){
mWindow->glv->drawGLV(mWindow->w, mWindow->h);
glutSwapBuffers();
Modified: sandbox/SOC/2007/signals/libs/glv/src/glv_view.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/glv/src/glv_view.cpp (original)
+++ sandbox/SOC/2007/signals/libs/glv/src/glv_view.cpp 2008-08-24 21:21:41 EDT (Sun, 24 Aug 2008)
@@ -57,9 +57,9 @@
newchild.reanchor(op ? w-op->w : w, op ? h-op->h : h);
//if(op) newchild.reanchor(w - op->w, h - op->h);
- newchild.parent = this;
+ newchild.parent = this;
newchild.sibling = 0;
-
+
//newchild.constrainWithinParent(); // keep within the bounds of the parent's rect
if (child == 0) // I didn't have any children until now
@@ -69,7 +69,14 @@
// I have children already... so go to the end and add there
// default behaviour is to add at the end of the list, to be drawn last
View * lastChild = child;
- while (lastChild->sibling != 0) lastChild = lastChild->sibling;
+ if(lastChild==&newchild)
+ return;
+ while (lastChild->sibling != 0)
+ {
+ lastChild = lastChild->sibling;
+ if(lastChild==&newchild)
+ return;
+ }
lastChild->sibling = &newchild;
}
}
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk