///////////////////////////////////////////////////////////////////////////////
// TITLE: Simulation of multi-threaded disk cache
// AUTHOR: Romulo Pinho
// CONTACT:
// VisionLab, Physics Dept.
// University of Antwerp, Belgium
// romulo.pinho@ua.ac.be
//
// DESCRIPTION:
// The code simulates the behaviour of a cache system implemented to
// optimise off-core image processing (IP) operations on large image files
// represented as a collection of image slices forming a 3d volume. The idea
// behind the cache is that of a sliding window that only "sees" the region
// of the image loaded in memory. As the image processing algorithm traverses
// the image in a well-known pattern, the window slides accordingly, freeing
// parts of the cache no longer necessary and asynchronously prefetching
// from disk other parts of the image to be used in the future, according to
// the traversal pattern.
//
// The cache here represented allows efficient execution of sequential
// traversal patterns. The cache is subdivided into blocks. The central block
// holds the region of the image being accessed by the IP algorithm. Next and
// previous blocks hold neighbour regions to be accessed in the future, whether
// moving backward or forward in the traversal. The scheme below illustrates
// the idea:
// 1) Start: F = first, C = central, L = last block
// -------------------------
// | F | ... | C | ... | L |
// -------------------------
// 2) Moving forward through the image: rotate blocks, discard current block F,
// update pointers, and prefetch the new block L. Prefetching occurs in a
// separate thread, thus not blocking the IP algorithm while reading data from
// C.
// -------------------------
// | L | F | ... | C | ... |
// -------------------------
// 3) Keep on moving forward and scheduling prefetches. If the prefetch takes
// too long, pointer C may reach a block that is not yet ready
// to be processed. The IP algorithm must stop and wait for the block update
// to complete.
// -------------------------
// | ... | L | F | ... | C |
// -------------------------
//
// The thread synchronization scheme guarantees that a block read is
// only allowed on blocks not running a prefetch. Since this prefetch is of a
// block to be used in the future, there is no per-pixel buffer
// access synchronization. The access pattern itself makes sure that the current
// read will not be overrun by an update.
//
// All cache blocks were implemented as Active Objects. At this moment,
// this is unnecessary because all disk accesses are serialized. In the future,
// parallel reads might be allowed and the Active Objects will pay off. As
// such, all blocks remain in a conditioned wait until the cache system
// signals that a block needs to be updated. The signaled block is then released,
// starts its update, and suspends possible reads until the update is finished.
//
// PROBLEMS:
// This program was originally written in VC++ 6.0, using WinAPI's thread
// library. For the purpose of performance evaluations, I made the code multiplatform
// and decided to use boost for the threaded code. After translating the code from
// Win32 to boost, I immediately encountered deadlock problems. Because of the
// complexity of the original program, I wrote the simplified version below to
// trace them. I eventually discovered that one or some of the Active Objects
// (threaded cache blocks) never leave their wait state, even after signaling their
// release with boost::condition::notify_one. With traces, I could verify that
// the block enters the wait state and that the signal is indeed triggered. Strangely
// enough, I later discovered that adding a system-related call (e.g. printf, sleep, ...)
// before entering the wait state would normalise the behaviour.
// I could not reproduce the deadlock using Windows threads nor pthreads so far. The
// code for the three different libraries is also implemented here.
Removing the // #define _USE_BOOST directive automatically enables pthreads in Linux and the // Windows API in windows. Compilation was in Suse 10.3 with KDevelop 3.5.7 on top // of g++ 4.2.1 and in WinXP Home SP2 with VC++ 6.0. The traces are represented // by printf commands in the relevant points of the code. // // #include #include #include #include #ifdef _WIN32 #include #else #define Sleep(x) usleep(x*1000) #endif #pragma warning (disable: 4355) //#define _USE_BOOST #if defined _USE_BOOST #pragma warning (disable: 4275) #pragma warning (disable: 4251) #include #include #include using namespace boost; #elif !defined _WIN32 #include #endif // _USE_BOOST #include using namespace std; const unsigned int gTh = 1; const unsigned int gTr = 30; // simulated disk read time of one image slice in miliseconds class Block { public: Block(size_t index, size_t s); ~Block(); void Read(); // simulate a pixel read from the cache block void TriggerUpdate(); // release the block from its waiting state void Update(); // synchronized update void Update2(); // non-synchronized update size_t Index() {return mIndex;}; bool Stopped() {return mStop;}; private: class AsyncFunc // thread function { public: AsyncFunc(Block* pMain) : mpMain(pMain) {}; ~AsyncFunc() {}; #if defined _USE_BOOST void operator()() { #elif defined _WIN32 static DWORD WINAPI func(LPVOID ptr) { Block* mpMain = (Block*)ptr; #else static void* func(void* ptr) { Block* mpMain = (Block*)ptr; #endif //printf("AsyncFunc(%d) - IN\n", mpMain->Index()); while (!mpMain->Stopped()) { // calls the synchronized update and wait mpMain->Update(); } //printf("AsyncFunc(%d) - OUT\n", mpMain->Index()); #ifndef _USE_BOOST return 0; #endif }; private: Block* mpMain; }; size_t mIndex; size_t mSize; bool mExec2; bool mStop; AsyncFunc mThreadFunc; #if defined _USE_BOOST boost::thread mThread; boost::mutex mMutex2; boost::condition mCond2; boost::mutex mMutex1; boost::condition mCond1; static boost::mutex mDataMutex; #elif defined _WIN32 
HANDLE mThread; HANDLE mMutex2; HANDLE mCond2; HANDLE mMutex1; HANDLE mCond1; static HANDLE mDataMutex; #else pthread_t mThread; pthread_mutex_t mMutex2; pthread_cond_t mCond2; pthread_mutex_t mMutex1; pthread_cond_t mCond1; static pthread_mutex_t mDataMutex; #endif void Wait2(); // wait for update signal void Exec2(); // actual execution of update void Release2(); // release update void Wait1(); // wait for read signal void Exec1(); // actual execution of read void Release1(); // release read }; // the data mutex serialises the update commands executed from // different threads. #ifdef _USE_BOOST boost::mutex Block::mDataMutex; #elif defined _WIN32 HANDLE Block::mDataMutex = CreateMutex(NULL, FALSE, NULL); #else pthread_mutex_t Block::mDataMutex = PTHREAD_MUTEX_INITIALIZER; #endif #if defined _USE_BOOST Block::Block(size_t index, size_t s) : mIndex(index), mSize(s), mExec2(false), mStop(false), mThreadFunc(this), mThread(mThreadFunc) { } #elif defined _WIN32 Block::Block(size_t index, size_t s) : mIndex(index), mSize(s), mExec2(false), mStop(false), mThreadFunc(this) { mThread = CreateThread(NULL, 0, AsyncFunc::func, this, 0, NULL); mMutex2 = CreateMutex(NULL, FALSE, NULL); mCond2 = CreateEvent(NULL, FALSE, FALSE, NULL); mMutex1 = CreateMutex(NULL, FALSE, NULL); mCond1 = CreateEvent(NULL, FALSE, FALSE, NULL); } #else Block::Block(size_t index, size_t s) : mIndex(index), mSize(s), mExec2(false), mStop(false), mThreadFunc(this) { pthread_create(&mThread, NULL, AsyncFunc::func, this); pthread_mutex_init(&mMutex2, NULL); pthread_cond_init(&mCond2, NULL); pthread_mutex_init(&mMutex1, NULL); pthread_cond_init(&mCond1, NULL); } #endif Block::~Block() { //printf("Stopping(%d)\n", mIndex); mStop = true; Release2(); #if defined _USE_BOOST mThread.join(); #elif defined _WIN32 WaitForSingleObject(mThread, INFINITE); CloseHandle(mMutex2); CloseHandle(mCond2); CloseHandle(mMutex1); CloseHandle(mCond1); #else pthread_join(mThread, NULL); pthread_mutex_destroy(&mMutex2); 
pthread_cond_destroy(&mCond2); pthread_mutex_destroy(&mMutex1); pthread_cond_destroy(&mCond1); #endif } void Block::Read() { // wait until the block is ready to be accessed, // ie until an updates has finished Wait1(); // execute related operation Exec1(); } void Block::TriggerUpdate() { // we must release the update thread if it is waiting. Release2(); } void Block::Update() { // wait until the block is released to run the update Wait2(); if (!Stopped()) { // if the process hasn't been interrupted, run // the update function. //printf("BlockReleased(%d)\n", Index()); Exec2(); } } void Block::Update2() { Exec2(); } void Block::Wait2() { // block execution of block's update opertation. // this function basically blocks the running // thread until an external event in the read // in the read thread releases it. //printf("WaitBufferReady(%d) - IN\n", mIndex); //system("echo off"); #if defined _USE_BOOST // ATTENTION: // with boost, the threads are only working // with the Sleep command below (or apparently // any other system-related call executed before the wait, // such as the //printf() above). basically, // the "notify" signaled in Block::Release2() // does not release the update thread. the // result is that the "wait" never finishes, // causing a deadlock when the program reaches // the "wait" call in Block::Read(). //Sleep(1); boost::mutex::scoped_lock lk(mMutex1, true); while (!mExec2) mCond2.wait(lk); #elif defined _WIN32 WaitForSingleObject(mCond2, INFINITE); #else pthread_mutex_lock(&mMutex1); while (!mExec2) pthread_cond_wait(&mCond2, &mMutex1); pthread_mutex_unlock(&mMutex1); #endif //printf("WaitBufferReady(%d) - OUT\n", mIndex); } void Block::Exec2() { #ifdef _USE_BOOST boost:mutex::scoped_lock lk(mDataMutex); #elif defined _WIN32 WaitForSingleObject(mDataMutex, INFINITE); #else pthread_mutex_lock(&mDataMutex); #endif // simulate execution of block's // update operation. 
//printf("Update(%d) - IN\n", mIndex); Sleep(mSize*gTr); //printf("Update(%d) - OUT\n", mIndex); #ifdef _USE_BOOST lk.unlock(); #elif defined _WIN32 ReleaseMutex(mDataMutex); #else pthread_mutex_unlock(&mDataMutex); #endif mExec2 = false; Release1(); } void Block::Release2() { // release the execution of block's update // operation. //printf("TriggerUpdate(%d) - IN\n", mIndex); mExec2 = true; #if defined _USE_BOOST mCond2.notify_one(); #elif defined _WIN32 SetEvent(mCond2); #else int sig = pthread_cond_signal(&mCond2); //printf("Sginal(%d) = %d\n", mIndex, sig); #endif //printf("TriggerUpdate(%d) - OUT\n", mIndex); } void Block::Wait1() { // suspend execution of block's read operation. if (mExec2) { // if the update operation is running on // the separate thread, we will wait until // it completes. //printf("WaitToRead(%d) - IN\n", mIndex); #if defined _USE_BOOST boost::mutex::scoped_lock lk(mMutex1); while (mExec2) mCond1.wait(lk); #elif defined _WIN32 while (mExec2) WaitForSingleObject(mCond1, INFINITE); #else pthread_mutex_lock(&mMutex1); while (mExec2) pthread_cond_wait(&mCond1, &mMutex1); pthread_mutex_unlock(&mMutex1); #endif //printf("WaitToRead(%d) - OUT\n", mIndex); } } void Block::Exec1() { // simulation of block's read operation //Sleep(gTh); } void Block::Release1() { // release the execution of block's // read operation. 
#if defined _USE_BOOST mCond1.notify_one(); #elif defined _WIN32 SetEvent(mCond1); #else pthread_cond_signal(&mCond1); #endif } /////////////////////////////////////////////////////////////////////////// class Cache { public: Cache(size_t num, size_t size, int sz); ~Cache(); void Read(int z); void Read2(int z); private: vector mBlocks; size_t mNumBlocks; size_t mSize; size_t mBlockSize; int mSz; size_t mC, mF, mL; }; Cache::Cache(size_t num, size_t size, int sz) : mBlocks(num*2 + 1), mNumBlocks(num), mSize(size), mSz(sz) { // create list of blocks, set the pointers to // centre, first, and last and instantiate // each block with its correponding size mF = 0; mC = num; mL = mBlocks.size() - 1; mBlockSize = mSize/mBlocks.size(); for (size_t i = 0; i < mBlocks.size(); i++) mBlocks[i] = new Block(i, mBlockSize); } Cache::~Cache() { for (size_t i = 0; i < mBlocks.size(); i++) delete mBlocks[i]; } void Cache::Read(int z) { static int prev_z = 0; if (z % mBlockSize == 0 && prev_z != z) { // whenever block boundary has been reached, // update pointers of circular block buffer. mC = (mC + 1) % mBlocks.size(); mF = (mF + 1) % mBlocks.size(); mL = (mL + 1) % mBlocks.size(); // the last block is now free to be // asynchronously updated. mBlocks[mL]->TriggerUpdate(); prev_z = z; } mBlocks[mC]->Read(); } void Cache::Read2(int z) { static int prev_z = 0; if (z % ((mNumBlocks+1)*mBlockSize) == 0 && prev_z != z) { // this second version of read invalidates the entire cache, // synchronously update the central block, and triggers the // update of the other blocks. mC = mNumBlocks; mF = 0; mL = mBlocks.size() - 1; mBlocks[mC]->Update2(); for (size_t i = 0; i < mBlocks.size(); i++) { if (i != mC) mBlocks[i]->TriggerUpdate(); } prev_z = z; } else if (z % mBlockSize == 0 && prev_z != z) { // whenever block boundary has been reached, // update pointers of circular block buffer. 
mC = (mC + 1) % mBlocks.size(); prev_z = z; } mBlocks[mC]->Read(); } /////////////////////////////////////////////////////////////////////////////// int main(int argc, char *argv[]) { int nb = 3; // # blocks in cache (radius) int sc = 128; // # slices in cache int sz = 997; // # slices to process int sxy = 4*4; // size of each slice Cache cache(nb, sc, sz); printf("begin\n"); clock_t c = times(NULL); for (int z = 0; z < sz; z++) { //printf("slice %03d\r\n", z); //if (z%sc == 0) printf("slice %03d\r\n", z); for (int xy = 0; xy < sxy; xy++) { cache.Read(z); } } double t = (double)(times(NULL) - c)/sysconf(_SC_CLK_TCK); printf("end\n"); printf("t = %lf; fps = %lf\n", t, sz/t); return 0; }