“It's not an easy task. You should catch the exception and then notify all connected processes that should disconnected from the segmenet (as only offline growing is supported). After that a process can call "grow"
to try extend the segment. Then, all processes can attach to the newly grown segment.
Think of that as a complex binary file that must be extended and internal structures changed. All users should close the file, the file should be changed by a process and then all users can start using the file again.”
OK Ion, I see.
1. Can I ask what mechanism you would suggest for notifying the other processes to disconnect before the grow() operation.
2. My wrapper class implementation is still in the experimental stages, I’m trying to build it up to operate in the way I need. I’ve hacked it around a bit to account for the fact that I cannot control the order in which the client & server instances will ultimately be launched by the 3rd party application I’m building a DLL for; so I’ve got to make the wrapper class as flexible as possible. What I’m trying to do is get my toy client process to wait, if it should happen to launch first, before the server, and then have the clients notified by the server process when data is ready to be accessed. So……..I can get the client to wait using a named_condition, but then I can’t get it to wake up. The notifyall() is not taking effect. P.S. In my tests I first launch the client and then the server process. Have I missed something fundamental here, is my notifyall() properly constructed and directed to the right destination?
3. Also, another thought; if the shared memory segment should happen to grow whilst a client is waiting then the client will need to remap the segment on waking, correct? But will it even get the notifyall() signal, because the client is still ‘looking’ at a different memory location? Could this be my problem? Not sure. Though I did test the code without the grow() commands and the client process still did not wake!
Many, many thanks for your help so far, just a few more hurdles to clear and this wrapper class will be operational.
Riskybiz.
#ifndef SHARED_MEMORY_WRAPPER_H//if not defined already
#define SHARED_MEMORY_WRAPPER_H//then define it
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <iostream>
#include <stdexcept>
using namespace boost::interprocess;
class SharedMemoryWrapper
{
public:
SharedMemoryWrapper(const std::string &name, bool server) : m_name(name), m_server(server)
{
try
{
//Open the shared segment. Will throw an interprocess exception if fails to open i.e. segment does not exist. Control jumps to the catch block.
m_segment = new managed_shared_memory(open_only, m_name.c_str());
//Find the vector using the c-string name and open it
sharedSegmentVector = m_segment->find<MyShmStringVector>("sharedSegmentVector").first;
//Open the underlying mutex
m_mutex = new named_mutex(open_only, "smw_mutex");
//Open the underlying condition variable
waitForData = new named_condition(open_only, "smw_named_condition");
if(m_server)//Flag to put client processes to sleep until ready for them
{
sleep = false;
}
else
{
sleep = true;
}
}
catch(interprocess_exception &ipe)//Catch the interprocess exception and instantiate the shared memory resources here.
{
try
{
std::cout << "Error: interprocess_exception: " << ipe.what() << "Going to instantiate shared memory resources now...." << std::endl;
//Clean up resources from any prior instances
named_mutex::remove("smw_mutex");
named_condition::remove("smw_named_condition");
shared_memory_object::remove(m_name.c_str());
//Create shared memory
m_segment = new managed_shared_memory(create_only, m_name.c_str(), 100000);
//Create allocators
CharAllocator charallocator(m_segment->get_segment_manager());
StringAllocator stringallocator(m_segment->get_segment_manager());
//This vector is fully constructed in shared memory. All pointers
//buffers are constructed in the same shared memory segment
//This vector can be safely accessed from other processes.
sharedSegmentVector = m_segment->construct<MyShmStringVector>("sharedSegmentVector")(stringallocator);
if(m_server)//Flag to put client processes to sleep until ready for them
{
sleep = false;
}
else
{
sleep = true;
}
//Instantiate the underlying mutex
m_mutex = new named_mutex(create_only, "smw_mutex");
//Instantiate the underlying condition variable
waitForData = new named_condition(create_only, "smw_named_condition");
//......despite the exception code continues executing to the end of the constructor.
}
catch(interprocess_exception &ipe1)
{
std::cout << "Error: Whilst instantiating shared memory resources: interprocess_exception: " << ipe1.what() << std::endl;
}
}
catch(...)
{
std::cout << "Error: Unknown Exception: " << std::endl;
}
}
~SharedMemoryWrapper()
{
/*
//Destructor: Will remove the shared memory resources
if (m_server)
{
named_mutex::remove("named_mutex");
//Destroy the vector from the managed_shared_memory
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings that the vector contains
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the vector
//Is it necessary to destroy the allocators here?
//Remove the managed_shared_memory; this may fail if the memory does not exist or is mapped or opened by another process
shared_memory_object::remove(m_name.c_str());
} */
//Delete instance members
delete m_mutex;
delete m_segment;
delete waitForData;
}
//Writer
void write(const std::string &inStr)
{
try
{
CharAllocator charallocator(m_segment->get_segment_manager());
scoped_lock<named_mutex> lock(*m_mutex);
try
{
if(sleep)//Put client process to sleep when it first tries to access.
{
std::cout << "Going to wait...." << std::endl;
sleep = false;
waitForData->wait(lock);//Set condition variable to wait. When ready to allow access then call notify().
}
}
catch(interprocess_exception &ex)
{
std::cout << "Error in Writer Process Wait: interprocess_exception: "<< ex.what() << std::endl;
}
MyShmString shmStr(inStr.c_str(), charallocator);
sharedSegmentVector->push_back(shmStr);
//Try to grow if free memory is running low
if(m_segment->get_free_memory() < 50000)
{
std::cout << "Free Memory Before: " << m_segment->get_free_memory() << " Elements: " << getSize() << std::endl;
delete m_mutex;//delete
delete waitForData;//delete
delete m_segment;//unmap
managed_shared_memory::grow(m_name.c_str(), 50000);//grow
m_segment = new managed_shared_memory(open_only, m_name.c_str());//remap
sharedSegmentVector = m_segment->find<MyShmStringVector>("sharedSegmentVector").first;//find
//Reopen the underlying mutex
m_mutex = new named_mutex(open_only, "smw_mutex");
//Reopen the underlying condition variable
waitForData = new named_condition(open_only, "smw_named_condition");
std::cout << "Free Memory After Grow: " << m_segment->get_free_memory() << " Elements: " << getSize() << std::endl;
}
}
catch(interprocess_exception ipe)
{
std::cout << "Error in SharedMemoryWrapper::write(): interprocess_exception: "<< ipe.what() << std::endl;
}
catch(...)
{
std::cout << "Error in SharedMemoryWrapper::write(): unknown error" << std::endl;
}
}
//Reader
std::string read(size_t &index)
{
try
{
scoped_lock<named_mutex> lock(*m_mutex);
try
{
if(sleep)//Put client process to sleep when it first tries to access.
{
std::cout << "Going to wait...." << std::endl;
sleep = false;
waitForData->wait(lock);//Set condition variable to wait. When ready to allow access then call notify().
}
}
catch(interprocess_exception &ex)
{
std::cout << "Error in Reader Process Wait: interprocess_exception: "<< ex.what() << std::endl;
}
const MyShmString &shmStr = (*sharedSegmentVector)[index];
std::string stdStr(shmStr.begin(), shmStr.end());
return stdStr;
}
catch(interprocess_exception &ipe)
{
std::cout << "Error in SharedMemoryWrapper::read(): interprocess_exception: "<< ipe.what() << std::endl;
}
catch(std::range_error &re)
{
std::cout << "Error in SharedMemoryWrapper::read(): range_error: " << re.what() << std::endl;
}
catch(...)
{
std::cout << "Error in SharedMemoryWrapper::read(): unknown error "<< std::endl;
}
return "Error in SharedMemoryWrapper::read()";
}
//Notifier
void notify()
{
try
{
scoped_lock<named_mutex> lock(*m_mutex);
std::cout << "Going to Notify...." << std::endl;
lock.unlock();
waitForData->notify_all();//notify to release the wait condition; when data is available and ready to be accessed by client processes
std::cout << "Notified" << std::endl;
}
catch(interprocess_exception &ipe)
{
std::cout << "Error: boost_interprocess exception: in notifier: " << ipe.what() << std::endl;
}
catch(...)
{
std::cout << "Unknown Error In Notifier: " << std::endl;
}
}
//Vector Size
size_t getSize() const
{
return this->sharedSegmentVector->size();
}
//Remover
void release()
{
try
{
std::cout << "Going to release shared memory resources...." << std::endl;
named_mutex::remove("named_mutex");
named_condition::remove("smw_named_condition");
//Destroy the vector from the managed_shared_memory
//m_segment->destroy_ptr(sharedSegmentVector);//This will free all strings that the vector contains
m_segment->destroy<MyShmStringVector>("sharedSegmentVector");//Destroy the vector
//Is it necessary to destroy the allocators here?
//Remove the managed_shared_memory; this may fail if the memory does not exist or is mapped or opened by another process
shared_memory_object::remove(m_name.c_str());
std::cout << "Resources Released" << std::endl;
}
catch(interprocess_exception ipe2)
{
std::cout << "Error: remove() interprocess_exception: " << ipe2.what() << std::endl;
}
}
private:
//Typedefs
typedef boost::interprocess::allocator<char, managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;
bool m_server;
std::string m_name;
bool sleep;
managed_shared_memory *m_segment;
MyShmStringVector *sharedSegmentVector;
named_mutex *m_mutex;
named_condition *waitForData;
};//class
#endif//header guard
Server program to prepare the shared vector of strings:
// PrepareSharedMemoryVectorOfStrings.cpp : Defines the entry point for the console application.
//
#include "SharedMemoryWrapper.h"
#include <iostream>
#include <string>
#include <sstream>
int main(int argc, char *argv[])
{
SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT", true);
size_t cnt(0);
while(cnt < 100)
{
//Convert number to string representation
std::string cntStr = static_cast<std::ostringstream*>( &(std::ostringstream() << cnt) )->str();
//Write to shared memory vector
smw->write(cntStr);
cnt++;
}
size_t vecSize = smw->getSize();
std::cout << "Elements Present: "<< vecSize << std::endl;
smw->notify();
//smw->release();
delete smw;
return 0;
}
Client program to access the shared memory vector of strings:
// AccessSharedMemoryVectorOfStrings.cpp : Defines the entry point for the console application.
//
#include "SharedMemoryWrapper.h"
#include <iostream>
#include <string>
int main(int argc, char *argv[])
{
SharedMemoryWrapper *smw = new SharedMemoryWrapper("SMT", false);
try
{
std::string outStr("Blank");
//if(smw->getSize() == 100)
//{
outStr = "";
for(size_t index = 0; index < 100; index++)
{
outStr += (smw->read(index) + " ");
}
//}
std::cout << outStr << std::endl;
}
catch(...)
{
std::cout << "Error: Unknown Exception." << std::endl;
}
smw->release();//removes shared memory resources from the system
delete smw;
return 0;
}