mt_thread_pool.cpp

Go to the documentation of this file.
00001 /*
00002  * Copyright (C) Sergey P. Derevyago, 2008.
00003  *
00004  * Permission to copy, use, modify, sell and distribute this software is granted
00005  * provided this copyright notice appears in all copies.
00006  * This software is provided "as is" without express or implied warranty, and
00007  * with no claim as to its suitability for any purpose.
00008  *
00009  */
00010 
00015 #include <ders/thread_pool.hpp>
00016 #include <ders/thread.hpp>
00017 
00018 namespace {
00019 
00020 using namespace ders;
00021 using namespace std;
00022 
00023 struct exec_data;
00024 
00025 struct mt_dque : data_queue {
00026        exec_data& ed;
00027        sh_mutex mtx;
00028        data_queue* dq;
00029 
00030        mt_dque(mem_pool& mp, exec_data& e, data_queue* d) : ed(e), mtx(
00031          new_mutex(mp)), dq(d) {}
00032 
00033        virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00034        virtual bool is_intr();
00035        virtual void set_intr(bool val);
00036        virtual result read_obj(object_io& rdr, bool wait);
00037        virtual result write_obj(const object_io& wrtr);
00038 };
00039 
00040 struct exec_data : task_opers {
00041        task& tsk;
00042        vector<sh_data_queue> dqs;
00043        dq_vec dqv;
00044        sh_barrier bar;
00045        sh_mutex mtx;
00046 
00047        exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv);
00048 
00049        virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00050        virtual void invoke(void (*fun)(void*), void* arg);
00051 };
00052 
00053 typedef sh_ptr<exec_data> sh_exec_data;
00054 
00055 struct mt_thrpl : thread_pool {
00056        mem_pool ownmp;
00057        bool dstr;
00058        sh_barrier bar;
00059        vector<sh_thread> tds;
00060        sh_exec_data ed;
00061 
00062        mt_thrpl(int cnt);
00063        ~mt_thrpl();
00064 
00065        virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }
00066        virtual void exec(task& tk, const dq_vec& dv);
00067 
00068        void start();
00069        void do_exec();
00070 };
00071 
00072 sh_data_queue new_mt_dque(mem_pool& mp, exec_data& e, data_queue* d)
00073 {
00074  mp_newbuf<mt_dque> buf(mp);
00075  return sh_data_queue(buf.pool(), buf.rls(::new(buf.get()) mt_dque(mp, e, d)));
00076 }
00077 
00078 bool mt_dque::is_intr()
00079 {
00080  auto_mutex amt(*mtx);
00081  return dq->is_intr();
00082 }
00083 
00084 void mt_dque::set_intr(bool val)
00085 {
00086  {
00087   auto_mutex amt(*mtx);
00088   dq->set_intr(val);
00089  }
00090 
00091  ed.bar->release_all();
00092 }
00093 
00094 data_queue::result mt_dque::read_obj(object_io& rdr, bool wait)
00095 {
00096  for (;;) {
00097      result rc;
00098      {
00099       auto_mutex amt(*mtx);
00100       rc=dq->read_obj(rdr, wait);
00101      }
00102 
00103      if (!wait || rc!=empt) return rc;
00104 
00105      if (ed.bar->wait()) return empt;
00106  }
00107 }
00108 
00109 data_queue::result mt_dque::write_obj(const object_io& wrtr)
00110 {
00111  result rc;
00112  {
00113   auto_mutex amt(*mtx);
00114   rc=dq->write_obj(wrtr);
00115  }
00116  if (rc==ok) ed.bar->release_all();
00117 
00118  return rc;
00119 }
00120 
00121 sh_exec_data null_exec_data(mem_pool& mp)
00122 {
00123  return sh_exec_data(mp, 0);
00124 }
00125 
00126 sh_exec_data new_exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv)
00127 {
00128  mp_newbuf<exec_data> buf(mp);
00129  return sh_exec_data(buf.pool(), buf.rls(::new(buf.get()) exec_data(mp, tk, bc,
00130    dv)));
00131 }
00132 
00133 exec_data::exec_data(mem_pool& mp, task& tk, int bc, const dq_vec& dv) :
00134   tsk(tk), bar(new_barrier(mp, bc)), mtx(new_mutex(mp))
00135 {
00136  int sz=dv.size();
00137  dqs.reserve(sz);
00138  dqv.reserve(sz);
00139 
00140  for (int i=0; i<sz; i++) {
00141      dqs.push_back(new_mt_dque(mp, *this, dv[i]));
00142      dqv.push_back(dqs[i].get());
00143  }
00144 }
00145 
00146 void exec_data::invoke(void (*fun)(void*), void* arg)
00147 {
00148  auto_mutex amt(*mtx);
00149  fun(arg);
00150 }
00151 
00152 void start_fun(void* arg)
00153 {
00154  static_cast<mt_thrpl*>(arg)->start();
00155 }
00156 
00157 mt_thrpl::mt_thrpl(int cnt) :
00158   dstr(false), bar(new_barrier(ownmp, cnt+1)), ed(null_exec_data(ownmp))
00159 {
00160  assert(cnt>0);
00161  for (int i=0; i<cnt; i++) tds.push_back(new_thread(ownmp, start_fun, this));
00162 }
00163 
00164 mt_thrpl::~mt_thrpl()
00165 {
00166  assert(!ed.get());
00167 
00168  assert(!dstr);
00169  dstr=true;
00170  bar->wait();
00171 
00172  for (int i=0, end=tds.size(); i<end; i++) tds[i]->join();
00173 }
00174 
00175 void mt_thrpl::exec(task& tk, const dq_vec& dv)
00176 {
00177  assert(!dstr);
00178 
00179  assert(!ed.get());
00180  ed=new_exec_data(ownmp, tk, bar->get_count(), dv);
00181  bar->wait();
00182 
00183  do_exec();
00184 
00185  bar->wait();
00186  ed=null_exec_data(ownmp);
00187 }
00188 
00189 void mt_thrpl::start()
00190 {
00191  for (;;) {
00192      bar->wait();
00193      if (dstr) break;
00194 
00195      assert(ed.get());
00196      do_exec();
00197 
00198      bar->wait();
00199  }
00200 }
00201 
00202 void mt_thrpl::do_exec()
00203 {
00204  mem_pool mp;
00205 
00206  void* arg;
00207  {
00208   auto_mutex amt(*ed->mtx);
00209   arg=ed->tsk.proc_arg();
00210  }
00211 
00212  try { ed->tsk.proc(mp, ed->dqv, arg, *ed); }
00213  catch (...) { hard_assert(false); }
00214 }
00215 
00216 }  // unnamed
00217 
00218 namespace ders {  // ::ders
00219 
00220 sh_thread_pool new_thread_pool(mem_pool& mp, int cnt)
00221 {
00222  mp_newbuf<mt_thrpl> buf(mp);
00223  return sh_thread_pool(buf.pool(), buf.rls(::new(buf.get()) mt_thrpl(cnt)));
00224 }
00225 
00226 }  // namespace ::ders
00227 

Generated on Tue Dec 8 11:35:32 2009 for derslib by  doxygen 1.5.5