00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00015 #include <ders/thread_pool.hpp>
00016 #include <ders/thread.hpp>
00017
00018 namespace {
00019
00020 using namespace ders;
00021 using namespace std;
00022
00023 struct exec_data;
00024
00025 struct mt_dque : data_queue {
00026 exec_data& ed;
00027 sh_mutex mtx;
00028 data_queue* dq;
00029
00030 mt_dque(mem_pool& mp, exec_data& e, data_queue* d) : ed(e), mtx(
00031 new_mutex(mp)), dq(d) {}
00032
00033 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00034 virtual bool is_intr();
00035 virtual void set_intr(bool val);
00036 virtual result read_obj(object_io& rdr, bool wait);
00037 virtual result write_obj(const object_io& wrtr);
00038 };
00039
00040 struct exec_data : task_opers {
00041 task& tsk;
00042 vector<sh_data_queue> dqs;
00043 dq_vec dqv;
00044 sh_barrier bar;
00045 sh_mutex mtx;
00046
00047 exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv);
00048
00049 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00050 virtual void invoke(void (*fun)(void*), void* arg);
00051 };
00052
00053 typedef sh_ptr<exec_data> sh_exec_data;
00054
00055 struct mt_thrpl : thread_pool {
00056 mem_pool ownmp;
00057 bool dstr;
00058 sh_barrier bar;
00059 vector<sh_thread> tds;
00060 sh_exec_data ed;
00061
00062 mt_thrpl(int cnt);
00063 ~mt_thrpl();
00064
00065 virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00066 virtual void exec(task& tk, const dq_vec& dv);
00067
00068 void start();
00069 void do_exec();
00070 };
00071
00072 sh_data_queue new_mt_dque(mem_pool& mp, exec_data& e, data_queue* d)
00073 {
00074 mp_newbuf<mt_dque> buf(mp);
00075 return sh_data_queue(buf.pool(), buf.rls(::new(buf.get()) mt_dque(mp, e, d)));
00076 }
00077
00078 bool mt_dque::is_intr()
00079 {
00080 auto_mutex amt(*mtx);
00081 return dq->is_intr();
00082 }
00083
00084 void mt_dque::set_intr(bool val)
00085 {
00086 {
00087 auto_mutex amt(*mtx);
00088 dq->set_intr(val);
00089 }
00090
00091 ed.bar->release_all();
00092 }
00093
00094 data_queue::result mt_dque::read_obj(object_io& rdr, bool wait)
00095 {
00096 for (;;) {
00097 result rc;
00098 {
00099 auto_mutex amt(*mtx);
00100 rc=dq->read_obj(rdr, wait);
00101 }
00102
00103 if (!wait || rc!=empt) return rc;
00104
00105 if (ed.bar->wait()) return empt;
00106 }
00107 }
00108
00109 data_queue::result mt_dque::write_obj(const object_io& wrtr)
00110 {
00111 result rc;
00112 {
00113 auto_mutex amt(*mtx);
00114 rc=dq->write_obj(wrtr);
00115 }
00116 if (rc==ok) ed.bar->release_all();
00117
00118 return rc;
00119 }
00120
00121 sh_exec_data null_exec_data(mem_pool& mp)
00122 {
00123 return sh_exec_data(mp, 0);
00124 }
00125
00126 sh_exec_data new_exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv)
00127 {
00128 mp_newbuf<exec_data> buf(mp);
00129 return sh_exec_data(buf.pool(), buf.rls(::new(buf.get()) exec_data(mp, tk, bc,
00130 dv)));
00131 }
00132
00133 exec_data::exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv) :
00134 tsk(tk), bar(new_barrier(mp, bc)), mtx(new_mutex(mp))
00135 {
00136 int sz=dv.size();
00137 dqs.reserve(sz);
00138 dqv.reserve(sz);
00139
00140 for (int i=0; i<sz; i++) {
00141 dqs.push_back(new_mt_dque(mp, *this, dv[i]));
00142 dqv.push_back(dqs[i].get());
00143 }
00144 }
00145
00146 void exec_data::invoke(void (*fun)(void*), void* arg)
00147 {
00148 auto_mutex amt(*mtx);
00149 fun(arg);
00150 }
00151
00152 void start_fun(void* arg)
00153 {
00154 static_cast<mt_thrpl*>(arg)->start();
00155 }
00156
00157 mt_thrpl::mt_thrpl(int cnt) :
00158 dstr(false), bar(new_barrier(ownmp, cnt+1)), ed(null_exec_data(ownmp))
00159 {
00160 assert(cnt>0);
00161 for (int i=0; i<cnt; i++) tds.push_back(new_thread(ownmp, start_fun, this));
00162 }
00163
00164 mt_thrpl::~mt_thrpl()
00165 {
00166 assert(!ed.get());
00167
00168 assert(!dstr);
00169 dstr=true;
00170 bar->wait();
00171
00172 for (int i=0, end=tds.size(); i<end; i++) tds[i]->join();
00173 }
00174
00175 void mt_thrpl::exec(task& tk, const dq_vec& dv)
00176 {
00177 assert(!dstr);
00178
00179 assert(!ed.get());
00180 ed=new_exec_data(ownmp, tk, bar->get_count(), dv);
00181 bar->wait();
00182
00183 do_exec();
00184
00185 bar->wait();
00186 ed=null_exec_data(ownmp);
00187 }
00188
00189 void mt_thrpl::start()
00190 {
00191 for (;;) {
00192 bar->wait();
00193 if (dstr) break;
00194
00195 assert(ed.get());
00196 do_exec();
00197
00198 bar->wait();
00199 }
00200 }
00201
00202 void mt_thrpl::do_exec()
00203 {
00204 mem_pool mp;
00205
00206 void* arg;
00207 {
00208 auto_mutex amt(*ed->mtx);
00209 arg=ed->tsk.proc_arg();
00210 }
00211
00212 try { ed->tsk.proc(mp, ed->dqv, arg, *ed); }
00213 catch (...) { hard_assert(false); }
00214 }
00215
00216 }
00217
00218 namespace ders {
00219
00220 sh_thread_pool new_thread_pool(mem_pool& mp, int cnt)
00221 {
00222 mp_newbuf<mt_thrpl> buf(mp);
00223 return sh_thread_pool(buf.pool(), buf.rls(::new(buf.get()) mt_thrpl(cnt)));
00224 }
00225
00226 }
00227