mt_thread.cpp

Go to the documentation of this file.
00001 /*
00002  * Copyright (C) Sergey P. Derevyago, 2008-2009.
00003  *
00004  * Permission to copy, use, modify, sell and distribute this software is granted
00005  * provided this copyright notice appears in all copies.
00006  * This software is provided "as is" without express or implied warranty, and
00007  * with no claim as to its suitability for any purpose.
00008  *
00009  */
00010 
00016 #include <ders/thread.hpp>
00017 
00018 namespace {
00019 
00020 using namespace ders;
00021 
00022 struct long_mutex : public mutex {
00023        bool locked;
00024        sh_mutex mtx;
00025        sh_cond_var cv;
00026 
00027        long_mutex(mem_pool& mp);
00028        ~long_mutex();
00029 
00030        virtual void lock();
00031        virtual void unlock();
00032 
00033        virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00034 };
00035 
00036 long_mutex::long_mutex(mem_pool& mp) :
00037   locked(false), mtx(new_mutex(mp)), cv(new_cond_var(mp, *mtx))
00038 {
00039 }
00040 
00041 long_mutex::~long_mutex()
00042 {
00043  assert(!locked);
00044 }
00045 
00046 void long_mutex::lock()
00047 {
00048  auto_mutex amt(*mtx);
00049 
00050  while (locked)
00051        cv->wait();
00052 
00053  locked=true;
00054 }
00055 
00056 void long_mutex::unlock()
00057 {
00058  auto_mutex amt(*mtx);
00059 
00060  assert(locked);
00061  locked=false;
00062  cv->notify();
00063 }
00064 
00065 }  // unnamed
00066 
00067 namespace ders {  // ::ders
00068 
00069 sh_mutex new_long_mutex(mem_pool& mp)
00070 {
00071  mp_newbuf<long_mutex> buf(mp);
00072  return sh_mutex(buf.pool(), buf.rls(::new(buf.get()) long_mutex(mp)));
00073 }
00074 
00075 barrier::barrier(mem_pool& mp, int cnt) :
00076   mtx(new_mutex(mp)), cv1(new_cond_var(mp, *mtx)), cv2(new_cond_var(mp, *mtx)),
00077   maxCnt(cnt), wtCnt(0), relCnt(0), ret(false), blnr(0)
00078 {
00079  assert(maxCnt>=0);
00080 }
00081 
00082 barrier::~barrier()
00083 {
00084  assert(wtCnt==0);
00085 }
00086 
00087 int barrier::inc_count()
00088 {
00089  auto_mutex amt(*mtx);
00090  return ++maxCnt;
00091 }
00092 
00093 int barrier::dec_count()
00094 {
00095  auto_mutex amt(*mtx);
00096  assert(wtCnt<maxCnt);
00097 
00098  relCnt=--maxCnt;
00099  cv1->notify_all();
00100 
00101  return maxCnt;
00102 }
00103 
00104 int barrier::get_count()
00105 {
00106  auto_mutex amt(*mtx);
00107  assert(maxCnt>=0);
00108  return maxCnt;
00109 }
00110 
00111 bool barrier::wait()
00112 {
00113  auto_mutex amt(*mtx);
00114  assert(maxCnt>0);
00115 
00116  while (wtCnt>0 && relCnt>0)
00117        cv2->wait();
00118 
00119  assert(wtCnt<maxCnt);
00120  if (++wtCnt==maxCnt) {
00121     barrier_listener::action act=barrier_listener::release_all_true;
00122     if (blnr) {
00123        try { act=blnr->on_release(*this); }
00124        catch (...) { hard_assert(false); }
00125     }
00126 
00127     switch (act) {
00128            case barrier_listener::release_one_false: {
00129                 if (relCnt<maxCnt) relCnt++;
00130 
00131                 break;
00132            }
00133            case barrier_listener::release_all_false: {
00134                 relCnt=maxCnt;
00135                 cv1->notify_all();
00136 
00137                 break;
00138            }
00139            case barrier_listener::release_all_true: {
00140                 relCnt=maxCnt;
00141                 ret=true;
00142                 cv1->notify_all();
00143 
00144                 break;
00145            }
00146     }
00147  }
00148 
00149  for (;;) {
00150      if (relCnt>0) {
00151         relCnt--;
00152         break;
00153      }
00154 
00155      cv1->wait();
00156  }
00157 
00158  bool sav=ret;
00159  assert(wtCnt>0);
00160  if (--wtCnt==0) ret=false;
00161 
00162  if (wtCnt==0 || relCnt==0) cv2->notify_all();
00163 
00164  return sav;
00165 }
00166 
00167 void barrier::release_one()
00168 {
00169  auto_mutex amt(*mtx);
00170 
00171  if (relCnt<maxCnt) relCnt++;
00172  cv1->notify();
00173 }
00174 
00175 void barrier::release_all()
00176 {
00177  auto_mutex amt(*mtx);
00178 
00179  relCnt=maxCnt;
00180  cv1->notify_all();
00181 }
00182 
00183 barrier_listener* barrier::set_listener(barrier_listener* bl)
00184 {
00185  barrier_listener* ret=blnr;
00186  blnr=bl;
00187 
00188  return ret;
00189 }
00190 
00191 sh_barrier new_barrier(mem_pool& mp, int cnt)
00192 {
00193  mp_newbuf<barrier> buf(mp);
00194  return sh_barrier(buf.pool(), buf.rls(::new(buf.get()) barrier(mp, cnt)));
00195 }
00196 
00197 namespace detail {  // ::ders::detail
00198 
00199 thr_core::thr_core(void (*st)(void*), void* ar) :
00200   strt(st), arg(ar), ownmp(0, 1), mtx(new_mutex(ownmp)), cv(new_cond_var(ownmp,
00201   *mtx)), refs(0), rng(true)
00202 {
00203 }
00204 
00205 void thr_core::inc_refs()
00206 {
00207  auto_mutex amt(*mtx);
00208  refs++;
00209 }
00210 
00211 void thr_core::dec_refs(bool parent)
00212 {
00213  bool del=false;
00214 
00215  if (parent) {
00216     auto_mutex amt(*mtx);
00217     del=(--refs==0 && !rng);
00218  }
00219  else {
00220       auto_mutex amt(*mtx);
00221 
00222       del=(--refs==0);
00223 
00224       clear_rng();
00225       cv->notify_all();
00226  }
00227 
00228  if (del) delete this;
00229 }
00230 
00231 void thr_core::clear_rng()
00232 {
00233  rng=false;
00234 }
00235 
00236 void thr_core::start()
00237 {
00238  tc_ptr child(false, this);
00239  strt(arg);
00240 }
00241 
00242 bool thr_core::is_running()
00243 {
00244  auto_mutex amt(*mtx);
00245  return rng;
00246 }
00247 
00248 void thr_core::join()
00249 {
00250  auto_mutex amt(*mtx);
00251  while (rng) cv->wait();
00252 }
00253 
00254 }  // namespace ::ders::detail
00255 
00256 }  // namespace ::ders
00257 

Generated on Tue Dec 8 11:35:32 2009 for derslib by  doxygen 1.5.5