|
Boost Users : |
Subject: Re: [Boost-users] Mutex synchronised wrapper class for boost::interprocess vector of strings
From: Riskybiz (riskybizLive_at_[hidden])
Date: 2013-06-28 08:26:47
"It's not an easy task. You should catch the exception and then notify all
connected processes that should disconnected from the segmenet (as only
offline growing is supported). After that a process can call "grow"
to try extend the segment. Then, all processes can attach to the newly grown
segment.
Think of that as a complex binary file that must be extended and internal
structures changed. All users should close the file, the file should be
changed by a process and then all users can start using the file again."
OK Ion, I see.
1. Can I ask what mechanism you would suggest for notifying the other
processes to disconnect before the grow() operation.
2. My wrapper class implementation is still in the experimental
stages, I'm trying to build it up to operate in the way I need. I've hacked
it around a bit to account for the fact that I cannot control the order in
which the client & server instances will ultimately be launched by the 3rd
party application I'm building a DLL for; so I've got to make the wrapper
class as flexible as possible. What I'm trying to do is get my toy client
process to wait, if it should happen to launch first, before the server, and
then have the clients notified by the server process when data is ready to
be accessed. So....I can get the client to wait using a named_condition,
but then I can't get it to wake up. The notifyall() is not taking effect.
P.S. In my tests I first launch the client and then the server process.
Have I missed something fundamental here, is my notifyall() properly
constructed and directed to the right destination?
3. Also, another thought; if the shared memory segment should happen
to grow whilst a client is waiting then the client will need to remap the
segment on waking, correct? But will it even get the notifyall() signal,
because the client is still 'looking' at a different memory location? Could
this be my problem? Not sure. Though I did test the code without the
grow() commands and the client process still did not wake!
Many, many thanks for your help so far, just a few more hurdles to clear and
this wrapper class will be operational.
Riskybiz.
#ifndef SHARED_MEMORY_WRAPPER_H//if not defined already
#define SHARED_MEMORY_WRAPPER_H//then define it
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <iostream>
#include <stdexcept>
using namespace boost::interprocess;
class SharedMemoryWrapper
{
public:
SharedMemoryWrapper(const std::string &name, bool server) :
m_name(name), m_server(server)
{
try
{
//Open the shared segment.
Will throw an interprocess exception if fails to open i.e. segment does not
exist. Control jumps to the catch block.
m_segment = new
managed_shared_memory(open_only, m_name.c_str());
//Find the vector using the
c-string name and open it
sharedSegmentVector =
m_segment->find<MyShmStringVector>("sharedSegmentVector").first;
//Open the underlying mutex
m_mutex = new
named_mutex(open_only, "smw_mutex");
//Open the underlying
condition variable
waitForData = new
named_condition(open_only, "smw_named_condition");
if(m_server)//Flag to put
client processes to sleep until ready for them
{
sleep =
false;
}
else
{
sleep =
true;
}
}
catch(interprocess_exception &ipe)//Catch
the interprocess exception and instantiate the shared memory resources here.
{
try
{
std::cout << "Error:
interprocess_exception: " << ipe.what() << "Going to instantiate shared
memory resources now...." << std::endl;
//Clean up resources from
any prior instances
named_mutex::remove("smw_mutex");
named_condition::remove("smw_named_condition");
shared_memory_object::remove(m_name.c_str());
//Create shared memory
m_segment = new
managed_shared_memory(create_only, m_name.c_str(), 100000);
//Create allocators
CharAllocator
charallocator(m_segment->get_segment_manager());
StringAllocator
stringallocator(m_segment->get_segment_manager());
//This vector is fully
constructed in shared memory. All pointers
//buffers are constructed in
the same shared memory segment
//This vector can be safely
accessed from other processes.
sharedSegmentVector =
m_segment->construct<MyShmStringVector>("sharedSegmentVector")(stringallocat
or);
if(m_server)//Flag to put
client processes to sleep until ready for them
{
sleep =
false;
}
else
{
sleep =
true;
}
//Instantiate the underlying
mutex
m_mutex = new
named_mutex(create_only, "smw_mutex");
//Instantiate the underlying
condition variable
waitForData = new
named_condition(create_only, "smw_named_condition");
//......despite the
exception code continues executing to the end of the constructor.
}
catch(interprocess_exception &ipe1)
{
std::cout << "Error: Whilst
instantiating shared memory resources: interprocess_exception: " <<
ipe1.what() << std::endl;
}
}
catch(...)
{
std::cout << "Error: Unknown
Exception: " << std::endl;
}
}
~SharedMemoryWrapper()
{
/*
//Destructor: Will remove the shared memory resources
if (m_server)
{
named_mutex::remove("named_mutex");
//Destroy the vector from the managed_shared_memory
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings
that the vector contains
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the
vector
//Is it necessary to destroy
the allocators here?
//Remove the
managed_shared_memory; this may fail if the memory does not exist or is
mapped or opened by another process
shared_memory_object::remove(m_name.c_str());
} */
//Delete instance members
delete m_mutex;
delete m_segment;
delete waitForData;
}
//Writer
void write(const std::string &inStr)
{
try
{
CharAllocator
charallocator(m_segment->get_segment_manager());
scoped_lock<named_mutex> lock(*m_mutex);
try
{
if(sleep)//Put client process to sleep when
it first tries to access.
{
std::cout <<
"Going to wait...." << std::endl;
sleep =
false;
waitForData->wait(lock);//Set condition variable to wait. When ready to
allow access then call notify().
}
}
catch(interprocess_exception &ex)
{
std::cout << "Error in
Writer Process Wait: interprocess_exception: "<< ex.what() << std::endl;
}
MyShmString shmStr(inStr.c_str(),
charallocator);
sharedSegmentVector->push_back(shmStr);
//Try to grow if free memory is running low
if(m_segment->get_free_memory() < 50000)
{
std::cout << "Free Memory
Before: " << m_segment->get_free_memory() << " Elements: " << getSize() <<
std::endl;
delete m_mutex;//delete
delete waitForData;//delete
delete m_segment;//unmap
managed_shared_memory::grow(m_name.c_str(), 50000);//grow
m_segment = new
managed_shared_memory(open_only, m_name.c_str());//remap
sharedSegmentVector =
m_segment->find<MyShmStringVector>("sharedSegmentVector").first;//find
//Reopen the underlying
mutex
m_mutex = new
named_mutex(open_only, "smw_mutex");
//Reopen the underlying
condition variable
waitForData = new
named_condition(open_only, "smw_named_condition");
std::cout << "Free Memory
After Grow: " << m_segment->get_free_memory() << " Elements: " << getSize()
<< std::endl;
}
}
catch(interprocess_exception ipe)
{
std::cout << "Error in
SharedMemoryWrapper::write(): interprocess_exception: "<< ipe.what() <<
std::endl;
}
catch(...)
{
std::cout << "Error in
SharedMemoryWrapper::write(): unknown error" << std::endl;
}
}
//Reader
std::string read(size_t &index)
{
try
{
scoped_lock<named_mutex> lock(*m_mutex);
try
{
if(sleep)//Put client process to sleep
when it first tries to access.
{
std::cout <<
"Going to wait...." << std::endl;
sleep =
false;
waitForData->wait(lock);//Set condition variable to wait. When ready to
allow access then call notify().
}
}
catch(interprocess_exception &ex)
{
std::cout << "Error in
Reader Process Wait: interprocess_exception: "<< ex.what() << std::endl;
}
const MyShmString &shmStr =
(*sharedSegmentVector)[index];
std::string stdStr(shmStr.begin(),
shmStr.end());
return stdStr;
}
catch(interprocess_exception &ipe)
{
std::cout << "Error in
SharedMemoryWrapper::read(): interprocess_exception: "<< ipe.what() <<
std::endl;
}
catch(std::range_error &re)
{
std::cout << "Error in
SharedMemoryWrapper::read(): range_error: " << re.what() << std::endl;
}
catch(...)
{
std::cout << "Error in
SharedMemoryWrapper::read(): unknown error "<< std::endl;
}
return "Error in
SharedMemoryWrapper::read()";
}
//Notifier
void notify()
{
try
{
scoped_lock<named_mutex> lock(*m_mutex);
std::cout << "Going to Notify...." <<
std::endl;
lock.unlock();
waitForData->notify_all();//notify to
release the wait condition; when data is available and ready to be accessed
by client processes
std::cout << "Notified" << std::endl;
}
catch(interprocess_exception &ipe)
{
std::cout << "Error: boost_interprocess
exception: in notifier: " << ipe.what() << std::endl;
}
catch(...)
{
std::cout << "Unknown Error In Notifier: "
<< std::endl;
}
}
//Vector Size
size_t getSize() const
{
return this->sharedSegmentVector->size();
}
//Remover
void release()
{
try
{
std::cout << "Going to release shared memory
resources...." << std::endl;
named_mutex::remove("named_mutex");
named_condition::remove("smw_named_condition");
//Destroy the vector from the
managed_shared_memory
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings
that the vector contains
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the
vector
//Is it necessary to destroy the allocators
here?
//Remove the managed_shared_memory; this may
fail if the memory does not exist or is mapped or opened by another process
shared_memory_object::remove(m_name.c_str());
std::cout << "Resources Released" <<
std::endl;
}
catch(interprocess_exception ipe2)
{
std::cout << "Error:
remove() interprocess_exception: " << ipe2.what() << std::endl;
}
}
private:
//Typedefs
typedef boost::interprocess::allocator<char,
managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char,
std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString,
managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString,
StringAllocator> MyShmStringVector;
bool m_server;
std::string m_name;
bool sleep;
managed_shared_memory *m_segment;
MyShmStringVector *sharedSegmentVector;
named_mutex *m_mutex;
named_condition *waitForData;
};//class
#endif//header guard
Server program to prepare the shared vector of strings:
// PrepareSharedMemoryVectorOfStrings.cpp : Defines the entry point for the
console application.
//
#include "SharedMemoryWrapper.h"
#include <iostream>
#include <string>
#include <sstream>
int main(int argc, char *argv[])
{
SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT",
true);
size_t cnt(0);
while(cnt < 100)
{
//Convert number to string representation
std::string cntStr =
static_cast<std::ostringstream*>( &(std::ostringstream() << cnt) )->str();
//Write to shared memory vector
smw->write(cntStr);
cnt++;
}
size_t vecSize = smw->getSize();
std::cout << "Elements Present: "<< vecSize << std::endl;
smw->notify();
//smw->release();
delete smw;
return 0;
}
Client program to access the shared memory vector of strings:
// AccessSharedMemoryVectorOfStrings.cpp : Defines the entry point for the
console application.
//
#include "SharedMemoryWrapper.h"
#include <iostream>
#include <string>
int main(int argc, char *argv[])
{
SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT",
false);
try
{
std::string outStr("Blank");
//if(smw->getSize() == 100)
//{
outStr = "";
for(size_t index = 0; index < 100; index++)
{
outStr += (smw->read(index) + " ");
}
//}
std::cout << outStr << std::endl;
}
catch(...)
{
std::cout << "Error: Unknown Exception." << std::endl;
}
smw->release();//removes shared memory resources from the
system
delete smw;
return 0;
}
Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net