00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00016 #include <ders/thread.hpp>
00017
00018 namespace {
00019
00020 using namespace ders;
00021
00022 struct long_mutex : public mutex {
00023 bool locked;
00024 sh_mutex mtx;
00025 sh_cond_var cv;
00026
00027 long_mutex(mem_pool& mp);
00028 ~long_mutex();
00029
00030 virtual void lock();
00031 virtual void unlock();
00032
00033 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00034 };
00035
00036 long_mutex::long_mutex(mem_pool& mp) :
00037 locked(false), mtx(new_mutex(mp)), cv(new_cond_var(mp, *mtx))
00038 {
00039 }
00040
00041 long_mutex::~long_mutex()
00042 {
00043 assert(!locked);
00044 }
00045
00046 void long_mutex::lock()
00047 {
00048 auto_mutex amt(*mtx);
00049
00050 while (locked)
00051 cv->wait();
00052
00053 locked=true;
00054 }
00055
00056 void long_mutex::unlock()
00057 {
00058 auto_mutex amt(*mtx);
00059
00060 assert(locked);
00061 locked=false;
00062 cv->notify();
00063 }
00064
00065 }
00066
00067 namespace ders {
00068
00069 sh_mutex new_long_mutex(mem_pool& mp)
00070 {
00071 mp_newbuf<long_mutex> buf(mp);
00072 return sh_mutex(buf.pool(), buf.rls(::new(buf.get()) long_mutex(mp)));
00073 }
00074
00075 barrier::barrier(mem_pool& mp, int cnt) :
00076 mtx(new_mutex(mp)), cv1(new_cond_var(mp, *mtx)), cv2(new_cond_var(mp, *mtx)),
00077 maxCnt(cnt), wtCnt(0), relCnt(0), ret(false), blnr(0)
00078 {
00079 assert(maxCnt>=0);
00080 }
00081
00082 barrier::~barrier()
00083 {
00084 assert(wtCnt==0);
00085 }
00086
00087 int barrier::inc_count()
00088 {
00089 auto_mutex amt(*mtx);
00090 return ++maxCnt;
00091 }
00092
00093 int barrier::dec_count()
00094 {
00095 auto_mutex amt(*mtx);
00096 assert(wtCnt<maxCnt);
00097
00098 relCnt=--maxCnt;
00099 cv1->notify_all();
00100
00101 return maxCnt;
00102 }
00103
00104 int barrier::get_count()
00105 {
00106 auto_mutex amt(*mtx);
00107 assert(maxCnt>=0);
00108 return maxCnt;
00109 }
00110
00111 bool barrier::wait()
00112 {
00113 auto_mutex amt(*mtx);
00114 assert(maxCnt>0);
00115
00116 while (wtCnt>0 && relCnt>0)
00117 cv2->wait();
00118
00119 assert(wtCnt<maxCnt);
00120 if (++wtCnt==maxCnt) {
00121 barrier_listener::action act=barrier_listener::release_all_true;
00122 if (blnr) {
00123 try { act=blnr->on_release(*this); }
00124 catch (...) { hard_assert(false); }
00125 }
00126
00127 switch (act) {
00128 case barrier_listener::release_one_false: {
00129 if (relCnt<maxCnt) relCnt++;
00130
00131 break;
00132 }
00133 case barrier_listener::release_all_false: {
00134 relCnt=maxCnt;
00135 cv1->notify_all();
00136
00137 break;
00138 }
00139 case barrier_listener::release_all_true: {
00140 relCnt=maxCnt;
00141 ret=true;
00142 cv1->notify_all();
00143
00144 break;
00145 }
00146 }
00147 }
00148
00149 for (;;) {
00150 if (relCnt>0) {
00151 relCnt--;
00152 break;
00153 }
00154
00155 cv1->wait();
00156 }
00157
00158 bool sav=ret;
00159 assert(wtCnt>0);
00160 if (--wtCnt==0) ret=false;
00161
00162 if (wtCnt==0 || relCnt==0) cv2->notify_all();
00163
00164 return sav;
00165 }
00166
00167 void barrier::release_one()
00168 {
00169 auto_mutex amt(*mtx);
00170
00171 if (relCnt<maxCnt) relCnt++;
00172 cv1->notify();
00173 }
00174
00175 void barrier::release_all()
00176 {
00177 auto_mutex amt(*mtx);
00178
00179 relCnt=maxCnt;
00180 cv1->notify_all();
00181 }
00182
00183 barrier_listener* barrier::set_listener(barrier_listener* bl)
00184 {
00185 barrier_listener* ret=blnr;
00186 blnr=bl;
00187
00188 return ret;
00189 }
00190
00191 sh_barrier new_barrier(mem_pool& mp, int cnt)
00192 {
00193 mp_newbuf<barrier> buf(mp);
00194 return sh_barrier(buf.pool(), buf.rls(::new(buf.get()) barrier(mp, cnt)));
00195 }
00196
00197 namespace detail {
00198
00199 thr_core::thr_core(void (*st)(void*), void* ar) :
00200 strt(st), arg(ar), ownmp(0, 1), mtx(new_mutex(ownmp)), cv(new_cond_var(ownmp,
00201 *mtx)), refs(0), rng(true)
00202 {
00203 }
00204
00205 void thr_core::inc_refs()
00206 {
00207 auto_mutex amt(*mtx);
00208 refs++;
00209 }
00210
00211 void thr_core::dec_refs(bool parent)
00212 {
00213 bool del=false;
00214
00215 if (parent) {
00216 auto_mutex amt(*mtx);
00217 del=(--refs==0 && !rng);
00218 }
00219 else {
00220 auto_mutex amt(*mtx);
00221
00222 del=(--refs==0);
00223
00224 clear_rng();
00225 cv->notify_all();
00226 }
00227
00228 if (del) delete this;
00229 }
00230
00231 void thr_core::clear_rng()
00232 {
00233 rng=false;
00234 }
00235
00236 void thr_core::start()
00237 {
00238 tc_ptr child(false, this);
00239 strt(arg);
00240 }
00241
00242 bool thr_core::is_running()
00243 {
00244 auto_mutex amt(*mtx);
00245 return rng;
00246 }
00247
00248 void thr_core::join()
00249 {
00250 auto_mutex amt(*mtx);
00251 while (rng) cv->wait();
00252 }
00253
00254 }
00255
00256 }
00257