Boost logo

Boost Users :

Subject: Re: [Boost-users] Mutex synchronised wrapper class for boost::interprocess vector of strings
From: Riskybiz (riskybizLive_at_[hidden])
Date: 2013-06-28 08:26:47


"It's not an easy task. You should catch the exception and then notify all
connected processes that should disconnected from the segmenet (as only
offline growing is supported). After that a process can call "grow"

to try extend the segment. Then, all processes can attach to the newly grown
segment.

 

Think of that as a complex binary file that must be extended and internal
structures changed. All users should close the file, the file should be
changed by a process and then all users can start using the file again."

 

 

 

OK Ion, I see.

 

1. Can I ask what mechanism you would suggest for notifying the other
processes to disconnect before the grow() operation.

 

2. My wrapper class implementation is still in the experimental
stages, I'm trying to build it up to operate in the way I need. I've hacked
it around a bit to account for the fact that I cannot control the order in
which the client & server instances will ultimately be launched by the 3rd
party application I'm building a DLL for; so I've got to make the wrapper
class as flexible as possible. What I'm trying to do is get my toy client
process to wait, if it should happen to launch first, before the server, and
then have the clients notified by the server process when data is ready to
be accessed. So....I can get the client to wait using a named_condition,
but then I can't get it to wake up. The notifyall() is not taking effect.
P.S. In my tests I first launch the client and then the server process.
Have I missed something fundamental here, is my notifyall() properly
constructed and directed to the right destination?

 

3. Also, another thought; if the shared memory segment should happen
to grow whilst a client is waiting then the client will need to remap the
segment on waking, correct? But will it even get the notifyall() signal,
because the client is still 'looking' at a different memory location? Could
this be my problem? Not sure. Though I did test the code without the
grow() commands and the client process still did not wake!

 

Many, many thanks for your help so far, just a few more hurdles to clear and
this wrapper class will be operational.

 

Riskybiz.

 

#ifndef SHARED_MEMORY_WRAPPER_H//if not defined already

#define SHARED_MEMORY_WRAPPER_H//then define it

 

#include <boost/interprocess/managed_shared_memory.hpp>

#include <boost/interprocess/containers/vector.hpp>

#include <boost/interprocess/containers/string.hpp>

#include <boost/interprocess/allocators/allocator.hpp>

#include <boost/interprocess/sync/named_mutex.hpp>

#include <boost/interprocess/sync/named_condition.hpp>

 

#include <iostream>

#include <stdexcept>

 

using namespace boost::interprocess;

 

class SharedMemoryWrapper

{

public:

    SharedMemoryWrapper(const std::string &name, bool server) :
m_name(name), m_server(server)

    {

                                try

                                {

                                                //Open the shared segment.
Will throw an interprocess exception if fails to open i.e. segment does not
exist. Control jumps to the catch block.

                                                m_segment = new
managed_shared_memory(open_only, m_name.c_str());

                                                //Find the vector using the
c-string name and open it

                                                sharedSegmentVector =
m_segment->find<MyShmStringVector>("sharedSegmentVector").first;

                                                //Open the underlying mutex

                                                m_mutex = new
named_mutex(open_only, "smw_mutex");

                                                //Open the underlying
condition variable

                                                waitForData = new
named_condition(open_only, "smw_named_condition");

 

                                                if(m_server)//Flag to put
client processes to sleep until ready for them

                                                {

                                                                sleep =
false;

                                                }

                                                else

                                                {

                                                                sleep =
true;

                                                }

 

                                }

                                catch(interprocess_exception &ipe)//Catch
the interprocess exception and instantiate the shared memory resources here.

                                {

                                                try

                                                {

                                                std::cout << "Error:
interprocess_exception: " << ipe.what() << "Going to instantiate shared
memory resources now...." << std::endl;

                                                //Clean up resources from
any prior instances

 
named_mutex::remove("smw_mutex");

 
named_condition::remove("smw_named_condition");

 
shared_memory_object::remove(m_name.c_str());

         

                                                //Create shared memory

                                                m_segment = new
managed_shared_memory(create_only, m_name.c_str(), 100000);

                                                

                                                //Create allocators

                                                CharAllocator
charallocator(m_segment->get_segment_manager());

                                                StringAllocator
stringallocator(m_segment->get_segment_manager());

                                                

                                                //This vector is fully
constructed in shared memory. All pointers

                                                //buffers are constructed in
the same shared memory segment

                                                //This vector can be safely
accessed from other processes.

                                                sharedSegmentVector =
m_segment->construct<MyShmStringVector>("sharedSegmentVector")(stringallocat
or);

 

                                                if(m_server)//Flag to put
client processes to sleep until ready for them

                                                {

                                                                sleep =
false;

                                                }

                                                else

                                                {

                                                                sleep =
true;

                                                }

                                                

                                                //Instantiate the underlying
mutex

                                                m_mutex = new
named_mutex(create_only, "smw_mutex");

                                                //Instantiate the underlying
condition variable

                                                waitForData = new
named_condition(create_only, "smw_named_condition");

                                                //......despite the
exception code continues executing to the end of the constructor.

                                                }

                                catch(interprocess_exception &ipe1)

                                                {

                                                std::cout << "Error: Whilst
instantiating shared memory resources: interprocess_exception: " <<
ipe1.what() << std::endl;

                                                }

                                                

                                }

                                catch(...)

                                {

                                                std::cout << "Error: Unknown
Exception: " << std::endl;

                                }

                                

                }

 

                

    ~SharedMemoryWrapper()

    {

       /*

                   //Destructor: Will remove the shared memory resources

                   if (m_server)

        {

            named_mutex::remove("named_mutex");

            //Destroy the vector from the managed_shared_memory

 
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings
that the vector contains

 
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the
vector

                                                //Is it necessary to destroy
the allocators here?

                                                

                                                //Remove the
managed_shared_memory; this may fail if the memory does not exist or is
mapped or opened by another process

 
shared_memory_object::remove(m_name.c_str());

                                

                                } */

                                //Delete instance members

        delete m_mutex;

        delete m_segment;

                                delete waitForData;

                }

                

    //Writer

                void write(const std::string &inStr)

    {

                                try

                                {

                                CharAllocator
charallocator(m_segment->get_segment_manager());

                                scoped_lock<named_mutex> lock(*m_mutex);

                                try

                                {

                                if(sleep)//Put client process to sleep when
it first tries to access.

                                                {

                                                                std::cout <<
"Going to wait...." << std::endl;

                                                                sleep =
false;

 
waitForData->wait(lock);//Set condition variable to wait. When ready to
allow access then call notify().

                                                }

                                }

                                catch(interprocess_exception &ex)

                                {

                                                std::cout << "Error in
Writer Process Wait: interprocess_exception: "<< ex.what() << std::endl;

                                }

                                

                                 MyShmString shmStr(inStr.c_str(),
charallocator);

                                sharedSegmentVector->push_back(shmStr);

                                

                                 

                                 //Try to grow if free memory is running low

                                if(m_segment->get_free_memory() < 50000)

                                {

                                                std::cout << "Free Memory
Before: " << m_segment->get_free_memory() << " Elements: " << getSize() <<
std::endl;

                                                delete m_mutex;//delete

                                                delete waitForData;//delete

                                                delete m_segment;//unmap

 
managed_shared_memory::grow(m_name.c_str(), 50000);//grow

                                                m_segment = new
managed_shared_memory(open_only, m_name.c_str());//remap

                                                sharedSegmentVector =
m_segment->find<MyShmStringVector>("sharedSegmentVector").first;//find

                                                //Reopen the underlying
mutex

                                                m_mutex = new
named_mutex(open_only, "smw_mutex");

                                                //Reopen the underlying
condition variable

                                                waitForData = new
named_condition(open_only, "smw_named_condition");

                                                std::cout << "Free Memory
After Grow: " << m_segment->get_free_memory() << " Elements: " << getSize()
<< std::endl;

                                }

                

                                 }

                                catch(interprocess_exception ipe)

                                {

                                std::cout << "Error in
SharedMemoryWrapper::write(): interprocess_exception: "<< ipe.what() <<
std::endl;

                                }

                                catch(...)

                                {

                                std::cout << "Error in
SharedMemoryWrapper::write(): unknown error" << std::endl;

                                }

                }

 

                //Reader

                std::string read(size_t &index)

                {

                                try

                                {

                                scoped_lock<named_mutex> lock(*m_mutex);

                                

                                 try

                                {

                                   if(sleep)//Put client process to sleep
when it first tries to access.

                                                {

                                                                std::cout <<
"Going to wait...." << std::endl;

                                                                sleep =
false;

 
waitForData->wait(lock);//Set condition variable to wait. When ready to
allow access then call notify().

                                                }

                                }

                                catch(interprocess_exception &ex)

                                {

                                                std::cout << "Error in
Reader Process Wait: interprocess_exception: "<< ex.what() << std::endl;

                                }

 

                                const MyShmString &shmStr =
(*sharedSegmentVector)[index];

                                std::string stdStr(shmStr.begin(),
shmStr.end());

                                return stdStr;

                                }

                                

                                catch(interprocess_exception &ipe)

                                {

                                std::cout << "Error in
SharedMemoryWrapper::read(): interprocess_exception: "<< ipe.what() <<
std::endl;

                                }

                                catch(std::range_error &re)

                                {

                                std::cout << "Error in
SharedMemoryWrapper::read(): range_error: " << re.what() << std::endl;

                                }

                                catch(...)

                                {

                                std::cout << "Error in
SharedMemoryWrapper::read(): unknown error "<< std::endl;

                                }

                                return "Error in
SharedMemoryWrapper::read()";

                }

 

                //Notifier

                void notify()

                {

                                try

                                {

                                scoped_lock<named_mutex> lock(*m_mutex);

                                std::cout << "Going to Notify...." <<
std::endl;

                                lock.unlock();

                                waitForData->notify_all();//notify to
release the wait condition; when data is available and ready to be accessed
by client processes

                                std::cout << "Notified" << std::endl;

                                }

                                catch(interprocess_exception &ipe)

                                {

                                std::cout << "Error: boost_interprocess
exception: in notifier: " << ipe.what() << std::endl;

                                }

                                catch(...)

                                {

                                std::cout << "Unknown Error In Notifier: "
<< std::endl;

                                }

                }

                

                //Vector Size

                size_t getSize() const

                {

                                return this->sharedSegmentVector->size();

                }

 

                //Remover

                void release()

                {

                                try

                                {

                                std::cout << "Going to release shared memory
resources...." << std::endl;

                                named_mutex::remove("named_mutex");

 
named_condition::remove("smw_named_condition");

                                //Destroy the vector from the
managed_shared_memory

 
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings
that the vector contains

 
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the
vector

                                //Is it necessary to destroy the allocators
here?

                                //Remove the managed_shared_memory; this may
fail if the memory does not exist or is mapped or opened by another process

 
shared_memory_object::remove(m_name.c_str());

                                std::cout << "Resources Released" <<
std::endl;

                                }

                                catch(interprocess_exception ipe2)

                                {

                                                std::cout << "Error:
remove() interprocess_exception: " << ipe2.what() << std::endl;

                                }

                }

 

private:

    //Typedefs

                typedef boost::interprocess::allocator<char,
managed_shared_memory::segment_manager> CharAllocator;

                typedef boost::interprocess::basic_string<char,
std::char_traits<char>, CharAllocator> MyShmString;

                typedef boost::interprocess::allocator<MyShmString,
managed_shared_memory::segment_manager> StringAllocator;

                typedef boost::interprocess::vector<MyShmString,
StringAllocator> MyShmStringVector;

                

                bool m_server;

    std::string m_name;

                bool sleep;

 

                managed_shared_memory *m_segment;

                MyShmStringVector *sharedSegmentVector;

                named_mutex *m_mutex;

                named_condition *waitForData;

};//class

#endif//header guard

 

Server program to prepare the shared vector of strings:

 

// PrepareSharedMemoryVectorOfStrings.cpp : Defines the entry point for the
console application.

//

 

#include "SharedMemoryWrapper.h"

#include <iostream>

#include <string>

#include <sstream>

 

int main(int argc, char *argv[])

{

                SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT",
true);

                                

                size_t cnt(0);

                

                while(cnt < 100)

                {

                                //Convert number to string representation

                                std::string cntStr =
static_cast<std::ostringstream*>( &(std::ostringstream() << cnt) )->str();

                                //Write to shared memory vector

                                smw->write(cntStr);

                                cnt++;

                }

                

                size_t vecSize = smw->getSize();

                std::cout << "Elements Present: "<< vecSize << std::endl;

                smw->notify();

                //smw->release();

                delete smw;

                

                return 0;

}

 

Client program to access the shared memory vector of strings:

 

// AccessSharedMemoryVectorOfStrings.cpp : Defines the entry point for the
console application.

//

 

#include "SharedMemoryWrapper.h"

#include <iostream>

#include <string>

 

int main(int argc, char *argv[])

{

                SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT",
false);

                

                try

                {

                std::string outStr("Blank");

 

                //if(smw->getSize() == 100)

                //{

                                outStr = "";

 

                                for(size_t index = 0; index < 100; index++)

                                {

                                outStr += (smw->read(index) + " ");

                                }

                

                //}

                std::cout << outStr << std::endl;

                }

                catch(...)

                {

                std::cout << "Error: Unknown Exception." << std::endl;

                }

                

                smw->release();//removes shared memory resources from the
system

                delete smw;

                

                return 0;

}



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net