00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00016 #include <ders/thread.hpp>
00017 #include <errno.h>
00018 #include <pthread.h>
00019 #include <time.h>
00020 #include <ders/error.hpp>
00021
00022 namespace {
00023
00024 using namespace std;
00025 using namespace ders;
00026 using namespace ders::detail;
00027
00028 struct pos_thread : public thread {
00029 mem_pool& mp;
00030 tc_ptr core;
00031
00032 pos_thread(mem_pool& m, void (*strt)(void*), void* arg);
00033 ~pos_thread();
00034
00035 virtual bool is_running();
00036 virtual void join();
00037 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00038
00039 void start();
00040 };
00041
00042 struct pos_mutex : public mutex {
00043 pthread_mutex_t mtx;
00044
00045 pos_mutex();
00046 ~pos_mutex();
00047
00048 virtual void lock();
00049 virtual void unlock();
00050
00051 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00052 };
00053
00054 struct pos_cond_var : public cond_var {
00055 pos_mutex& pm;
00056 pthread_cond_t cv;
00057
00058 pos_cond_var(mutex& mtx);
00059 ~pos_cond_var();
00060
00061 virtual void wait(long timeout);
00062 virtual void notify();
00063 virtual void notify_all();
00064
00065 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00066 };
00067
00068 extern "C" void* pos_start(void* arg)
00069 {
00070 static_cast<thr_core*>(arg)->start();
00071 return 0;
00072 }
00073
00074 pos_thread::pos_thread(mem_pool& m, void (*strt)(void*), void* arg) :
00075 mp(m), core(true, new thr_core(strt, arg))
00076 {
00077 }
00078
00079 pos_thread::~pos_thread()
00080 {
00081 }
00082
00083 bool pos_thread::is_running()
00084 {
00085 return core->is_running();
00086 }
00087
00088 void pos_thread::join()
00089 {
00090 core->join();
00091 }
00092
00093 void pos_thread::start()
00094 {
00095 pthread_attr_t attr;
00096 int rc=pthread_attr_init(&attr);
00097 hard_assert(rc==0);
00098
00099 rc=pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00100 hard_assert(rc==0);
00101
00102 pthread_t thr;
00103 rc=pthread_create(&thr, &attr, pos_start, core.ptr);
00104
00105 int rc2=pthread_attr_destroy(&attr);
00106 hard_assert(rc2==0);
00107
00108 if (rc!=0) {
00109 core->clear_rng();
00110
00111 throw newErrorException(mp, _FLINE_, "Can't create thread", convert_errno(
00112 rc));
00113 }
00114 }
00115
00116 pos_mutex::pos_mutex()
00117 {
00118 int rc=pthread_mutex_init(&mtx, NULL);
00119 hard_assert(rc==0);
00120 }
00121
00122 pos_mutex::~pos_mutex()
00123 {
00124 int rc=pthread_mutex_destroy(&mtx);
00125 hard_assert(rc==0);
00126 }
00127
00128 void pos_mutex::lock()
00129 {
00130 int rc=pthread_mutex_lock(&mtx);
00131 hard_assert(rc==0);
00132 }
00133
00134 void pos_mutex::unlock()
00135 {
00136 int rc=pthread_mutex_unlock(&mtx);
00137 hard_assert(rc==0);
00138 }
00139
00140 pos_cond_var::pos_cond_var(mutex& mtx) : pm(dynamic_cast<pos_mutex&>(mtx))
00141 {
00142 int rc=pthread_cond_init(&cv, NULL);
00143 hard_assert(rc==0);
00144 }
00145
00146 pos_cond_var::~pos_cond_var()
00147 {
00148 int rc=pthread_cond_destroy(&cv);
00149 hard_assert(rc==0);
00150 }
00151
00152 void pos_cond_var::wait(long timeout)
00153 {
00154 if (timeout==infinite) {
00155 int rc=pthread_cond_wait(&cv, &pm.mtx);
00156 hard_assert(rc==0);
00157 }
00158 else {
00159 timespec ts;
00160 clock_gettime(CLOCK_REALTIME, &ts);
00161 ts.tv_sec+=timeout/1000;
00162 ts.tv_nsec+=timeout%1000*1000;
00163
00164 int rc=pthread_cond_timedwait(&cv, &pm.mtx, &ts);
00165 hard_assert(rc==0 || rc==ETIMEDOUT);
00166 }
00167 }
00168
00169 void pos_cond_var::notify()
00170 {
00171 int rc=pthread_cond_signal(&cv);
00172 hard_assert(rc==0);
00173 }
00174
00175 void pos_cond_var::notify_all()
00176 {
00177 int rc=pthread_cond_broadcast(&cv);
00178 hard_assert(rc==0);
00179 }
00180
00181 }
00182
00183 namespace ders {
00184
00185 sh_thread new_thread(mem_pool& mp, void (*start)(void*), void* arg)
00186 {
00187 mp_newbuf<pos_thread> buf(mp);
00188 sh_thread ret(buf.pool(), buf.rls(::new(buf.get()) pos_thread(buf.pool(),
00189 start, arg)));
00190
00191 static_cast<pos_thread*>(ret.get())->start();
00192
00193 return ret;
00194 }
00195
00196 sh_mutex new_mutex(mem_pool& mp)
00197 {
00198 mp_newbuf<pos_mutex> buf(mp);
00199 return sh_mutex(buf.pool(), buf.rls(::new(buf.get()) pos_mutex));
00200 }
00201
00202 sh_cond_var new_cond_var(mem_pool& mp, mutex& mtx)
00203 {
00204 mp_newbuf<pos_cond_var> buf(mp);
00205 return sh_cond_var(buf.pool(), buf.rls(::new(buf.get()) pos_cond_var(mtx)));
00206 }
00207
00208 }
00209